diff --git a/README.md b/README.md index b2f6d64..82f1e66 100644 --- a/README.md +++ b/README.md @@ -27,3 +27,14 @@ app/ Add new routers under `app/api/v1`, repositories under `app/repositories`, and keep business rules inside `app/services`. +## Redis analytics cache + +Analytics endpoints can use a Redis cache (TTL 120 seconds). The cache is disabled by default, so the service falls back to the database. + +1. Start Redis and set the following variables: + - `REDIS_ENABLED=true` + - `REDIS_URL=redis://localhost:6379/0` + - `ANALYTICS_CACHE_TTL_SECONDS` (optional, defaults to 120) + - `ANALYTICS_CACHE_BACKOFF_MS` (max delay for write/delete retries, defaults to 200) +2. When Redis becomes unavailable, middleware logs the degradation and responses transparently fall back to database queries until connectivity is restored. + diff --git a/app/api/deps.py b/app/api/deps.py index d55d4a0..6dba5ef 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -6,6 +6,7 @@ from fastapi import Depends, Header, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.ext.asyncio import AsyncSession +from app.core.cache import get_cache_client from app.core.config import settings from app.core.database import get_session from app.core.security import jwt_service, password_hasher @@ -29,6 +30,7 @@ from app.services.organization_service import ( OrganizationService, ) from app.services.task_service import TaskService +from redis.asyncio.client import Redis oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api_v1_prefix}/auth/token") @@ -67,8 +69,19 @@ def get_analytics_repository(session: AsyncSession = Depends(get_db_session)) -> return AnalyticsRepository(session=session) -def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService: - return DealService(repository=repo) +def get_cache_backend() -> Redis | None: + return get_cache_client() + + +def get_deal_service( + repo: DealRepository = Depends(get_deal_repository), + cache: Redis | None = Depends(get_cache_backend), +) -> DealService: + return DealService( + repository=repo, + cache=cache, + cache_backoff_ms=settings.analytics_cache_backoff_ms, + ) def get_auth_service( @@ -95,8 +108,14 @@ def get_activity_service( def get_analytics_service( repo: AnalyticsRepository = Depends(get_analytics_repository), + cache: Redis | None = Depends(get_cache_backend), ) -> AnalyticsService: - return AnalyticsService(repository=repo) + return AnalyticsService( + repository=repo, + cache=cache, + ttl_seconds=settings.analytics_cache_ttl_seconds, + backoff_ms=settings.analytics_cache_backoff_ms, + ) def get_contact_service( diff --git a/app/core/cache.py b/app/core/cache.py new file mode 100644 index 0000000..d879f40 --- /dev/null +++ b/app/core/cache.py @@ -0,0 +1,160 @@ +"""Redis cache utilities and availability tracking.""" +from __future__ import annotations + +import asyncio +import json +import logging +from typing import Any, Awaitable, Callable, Optional + +import redis.asyncio as redis +from redis.asyncio.client import Redis +from redis.exceptions import RedisError + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class RedisCacheManager: + """Manages lifecycle and availability of the Redis cache client.""" + + def __init__(self) -> None: + self._client: Redis | None = None + self._available: bool = False + self._lock = asyncio.Lock() + + @property + def is_enabled(self) -> bool: + return settings.redis_enabled + + @property + def is_available(self) -> bool: + return self._available and self._client is not None + + def get_client(self) -> Redis | None: + if not self.is_enabled: + return None + if self.is_available: + return self._client + return None + + async def startup(self) -> None: + if not self.is_enabled: + return + async with self._lock: + if self._client is not None: + return + self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False) + await self._refresh_availability() + + async def shutdown(self) -> None: + async with self._lock: + if self._client is not None: + await self._client.close() + self._client = None + self._available = False + + async def reconnect(self) -> None: + if not self.is_enabled: + return + async with self._lock: + if self._client is None: + self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False) + await self._refresh_availability() + + async def _refresh_availability(self) -> None: + if self._client is None: + self._available = False + return + try: + await self._client.ping() + except RedisError as exc: # pragma: no cover - logging only + self._available = False + logger.warning("Redis ping failed: %s", exc) + else: + self._available = True + + def mark_unavailable(self) -> None: + self._available = False + + def mark_available(self) -> None: + if self._client is not None: + self._available = True + + +cache_manager = RedisCacheManager() + + +async def init_cache() -> None: + """Initialize Redis cache connection if enabled.""" + await cache_manager.startup() + + +async def shutdown_cache() -> None: + """Close Redis cache connection.""" + await cache_manager.shutdown() + + +def get_cache_client() -> Optional[Redis]: + """Expose the active Redis client for dependency injection.""" + return cache_manager.get_client() + + +async def read_json(client: Redis, key: str) -> Any | None: + """Read and decode JSON payload from Redis.""" + try: + raw = await client.get(key) + except RedisError as exc: # pragma: no cover - network errors + cache_manager.mark_unavailable() + logger.debug("Redis GET failed for %s: %s", key, exc) + return None + if raw is None: + return None + cache_manager.mark_available() + try: + return json.loads(raw.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: # pragma: no cover - malformed payloads + logger.warning("Discarding malformed cache entry %s: %s", key, exc) + return None + + +async def write_json(client: Redis, key: str, value: Any, ttl_seconds: int, backoff_ms: int) -> None: + """Serialize data to JSON and store it with TTL using retry/backoff.""" + payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True).encode("utf-8") + + async def _operation() -> Any: + return await client.set(name=key, value=payload, ex=ttl_seconds) + + await _run_with_retry(_operation, backoff_ms) + + +async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None: + """Delete cache keys with retry/backoff semantics.""" + if not keys: + return + + async def _operation() -> Any: + return await client.delete(*keys) + + await _run_with_retry(_operation, backoff_ms) + + +async def _run_with_retry(operation: Callable[[], Awaitable[Any]], max_sleep_ms: int) -> None: + try: + await operation() + cache_manager.mark_available() + return + except RedisError as exc: # pragma: no cover - network errors + cache_manager.mark_unavailable() + logger.debug("Redis cache operation failed: %s", exc) + if max_sleep_ms <= 0: + return + sleep_seconds = min(max_sleep_ms / 1000, 0.1) + await asyncio.sleep(sleep_seconds) + await cache_manager.reconnect() + try: + await operation() + cache_manager.mark_available() + except RedisError as exc: # pragma: no cover - repeated network errors + cache_manager.mark_unavailable() + logger.warning("Redis cache operation failed after retry: %s", exc) diff --git a/app/core/config.py b/app/core/config.py index e838665..08cd870 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -20,6 +20,14 @@ class Settings(BaseSettings): jwt_algorithm: str = "HS256" access_token_expire_minutes: int = 30 refresh_token_expire_days: int = 7 + redis_enabled: bool = Field(default=False, description="Toggle Redis-backed cache usage") + redis_url: str = Field(default="redis://localhost:6379/0", description="Redis connection URL") + analytics_cache_ttl_seconds: int = Field(default=120, ge=1, description="TTL for cached analytics responses") + analytics_cache_backoff_ms: int = Field( + default=200, + ge=0, + description="Maximum backoff (ms) for retrying cache writes/invalidation", + ) settings = Settings() diff --git a/app/core/middleware/__init__.py b/app/core/middleware/__init__.py new file mode 100644 index 0000000..0178ac9 --- /dev/null +++ b/app/core/middleware/__init__.py @@ -0,0 +1 @@ +"""Application middleware components.""" diff --git a/app/core/middleware/cache_monitor.py b/app/core/middleware/cache_monitor.py new file mode 100644 index 0000000..4c17b9a --- /dev/null +++ b/app/core/middleware/cache_monitor.py @@ -0,0 +1,38 @@ +"""Middleware that logs cache availability transitions.""" +from __future__ import annotations + +import logging +from starlette.types import ASGIApp, Receive, Scope, Send + +from app.core.cache import cache_manager +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class CacheAvailabilityMiddleware: + """Logs when Redis cache becomes unavailable or recovers.""" + + def __init__(self, app: ASGIApp) -> None: + self.app = app + self._last_state: bool | None = None + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] == "http" and settings.redis_enabled: + self._log_transition() + await self.app(scope, receive, send) + + def _log_transition(self) -> None: + available = cache_manager.is_available + if self._last_state is None: + self._last_state = available + if not available: + logger.warning("Redis cache unavailable, serving responses without cache") + return + if available == self._last_state: + return + if available: + logger.info("Redis cache connectivity restored; caching re-enabled") + else: + logger.warning("Redis cache unavailable, serving responses without cache") + self._last_state = available diff --git a/app/main.py b/app/main.py index 8416969..89780dc 100644 --- a/app/main.py +++ b/app/main.py @@ -2,13 +2,25 @@ from fastapi import FastAPI from app.api.routes import api_router +from app.core.cache import init_cache, shutdown_cache from app.core.config import settings +from app.core.middleware.cache_monitor import CacheAvailabilityMiddleware def create_app() -> FastAPI: """Build FastAPI application instance.""" application = FastAPI(title=settings.project_name, version=settings.version) application.include_router(api_router) + application.add_middleware(CacheAvailabilityMiddleware) + + @application.on_event("startup") + async def _startup() -> None: + await init_cache() + + @application.on_event("shutdown") + async def _shutdown() -> None: + await shutdown_cache() + return application diff --git a/app/services/analytics_service.py b/app/services/analytics_service.py index 8fc9e46..7bf1c52 100644 --- a/app/services/analytics_service.py +++ b/app/services/analytics_service.py @@ -1,11 +1,16 @@ """Analytics-related business logic.""" from __future__ import annotations +import logging from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from decimal import Decimal -from typing import Iterable +from decimal import Decimal, InvalidOperation +from typing import Any, Iterable +from redis.asyncio.client import Redis +from redis.exceptions import RedisError + +from app.core.cache import cache_manager, delete_keys, read_json, write_json from app.models.deal import DealStage, DealStatus from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup @@ -53,13 +58,33 @@ class StageBreakdown: conversion_to_next: float | None +logger = logging.getLogger(__name__) + +_SUMMARY_CACHE_PREFIX = "analytics:summary" +_FUNNEL_CACHE_PREFIX = "analytics:funnel" + + class AnalyticsService: """Provides aggregated analytics for deals.""" - def __init__(self, repository: AnalyticsRepository) -> None: + def __init__( + self, + repository: AnalyticsRepository, + cache: Redis | None = None, + *, + ttl_seconds: int = 0, + backoff_ms: int = 0, + ) -> None: self._repository = repository + self._cache = cache + self._ttl_seconds = ttl_seconds + self._backoff_ms = backoff_ms async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary: + cached = await self._fetch_cached_summary(organization_id, days) + if cached is not None: + return cached + status_rollup = await self._repository.fetch_status_rollup(organization_id) status_map = {item.status: item for item in status_rollup} @@ -87,7 +112,7 @@ class AnalyticsService: window_threshold = _threshold_from_days(days) new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold) - return DealSummary( + summary = DealSummary( by_status=summaries, won=WonStatistics( count=won_count, @@ -98,7 +123,14 @@ class AnalyticsService: total_deals=total_deals, ) + await self._store_summary_cache(organization_id, days, summary) + return summary + async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]: + cached = await self._fetch_cached_funnel(organization_id) + if cached is not None: + return cached + rollup = await self._repository.fetch_stage_status_rollup(organization_id) stage_map = _build_stage_map(rollup) @@ -121,8 +153,44 @@ class AnalyticsService: conversion_to_next=conversion, ) ) + await self._store_funnel_cache(organization_id, breakdowns) return breakdowns + def _is_cache_enabled(self) -> bool: + return self._cache is not None and self._ttl_seconds > 0 + + async def _fetch_cached_summary(self, organization_id: int, days: int) -> DealSummary | None: + if not self._is_cache_enabled() or self._cache is None: + return None + key = _summary_cache_key(organization_id, days) + payload = await read_json(self._cache, key) + if payload is None: + return None + return _deserialize_summary(payload) + + async def _store_summary_cache(self, organization_id: int, days: int, summary: DealSummary) -> None: + if not self._is_cache_enabled() or self._cache is None: + return + key = _summary_cache_key(organization_id, days) + payload = _serialize_summary(summary) + await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms) + + async def _fetch_cached_funnel(self, organization_id: int) -> list[StageBreakdown] | None: + if not self._is_cache_enabled() or self._cache is None: + return None + key = _funnel_cache_key(organization_id) + payload = await read_json(self._cache, key) + if payload is None: + return None + return _deserialize_funnel(payload) + + async def _store_funnel_cache(self, organization_id: int, breakdowns: list[StageBreakdown]) -> None: + if not self._is_cache_enabled() or self._cache is None: + return + key = _funnel_cache_key(organization_id) + payload = _serialize_funnel(breakdowns) + await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms) + def _threshold_from_days(days: int) -> datetime: return datetime.now(timezone.utc) - timedelta(days=days) @@ -137,3 +205,137 @@ def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dic stage_map.setdefault(item.stage, {status: 0 for status in DealStatus}) stage_map[item.stage][item.status] = item.deal_count return stage_map + + +def _summary_cache_key(organization_id: int, days: int) -> str: + return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:{days}" + + +def summary_cache_pattern(organization_id: int) -> str: + return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:*" + + +def _funnel_cache_key(organization_id: int) -> str: + return f"{_FUNNEL_CACHE_PREFIX}:{organization_id}" + + +def funnel_cache_key(organization_id: int) -> str: + return _funnel_cache_key(organization_id) + + +def _serialize_summary(summary: DealSummary) -> dict[str, Any]: + return { + "by_status": [ + { + "status": item.status.value, + "count": item.count, + "amount_sum": str(item.amount_sum), + } + for item in summary.by_status + ], + "won": { + "count": summary.won.count, + "amount_sum": str(summary.won.amount_sum), + "average_amount": str(summary.won.average_amount), + }, + "new_deals": { + "days": summary.new_deals.days, + "count": summary.new_deals.count, + }, + "total_deals": summary.total_deals, + } + + +def _deserialize_summary(payload: Any) -> DealSummary | None: + try: + by_status_payload = payload["by_status"] + won_payload = payload["won"] + new_deals_payload = payload["new_deals"] + total_deals = int(payload["total_deals"]) + except (KeyError, TypeError, ValueError): + return None + + summaries: list[StatusSummary] = [] + try: + for item in by_status_payload: + summaries.append( + StatusSummary( + status=DealStatus(item["status"]), + count=int(item["count"]), + amount_sum=Decimal(item["amount_sum"]), + ) + ) + won = WonStatistics( + count=int(won_payload["count"]), + amount_sum=Decimal(won_payload["amount_sum"]), + average_amount=Decimal(won_payload["average_amount"]), + ) + new_deals = NewDealsWindow( + days=int(new_deals_payload["days"]), + count=int(new_deals_payload["count"]), + ) + except (KeyError, TypeError, ValueError, InvalidOperation): + return None + + return DealSummary(by_status=summaries, won=won, new_deals=new_deals, total_deals=total_deals) + + +def _serialize_funnel(breakdowns: list[StageBreakdown]) -> list[dict[str, Any]]: + serialized: list[dict[str, Any]] = [] + for item in breakdowns: + serialized.append( + { + "stage": item.stage.value, + "total": item.total, + "by_status": {status.value: count for status, count in item.by_status.items()}, + "conversion_to_next": item.conversion_to_next, + } + ) + return serialized + + +def _deserialize_funnel(payload: Any) -> list[StageBreakdown] | None: + if not isinstance(payload, list): + return None + breakdowns: list[StageBreakdown] = [] + try: + for item in payload: + by_status_payload = item["by_status"] + by_status = {DealStatus(key): int(value) for key, value in by_status_payload.items()} + breakdowns.append( + StageBreakdown( + stage=DealStage(item["stage"]), + total=int(item["total"]), + by_status=by_status, + conversion_to_next=float(item["conversion_to_next"]) if item["conversion_to_next"] is not None else None, + ) + ) + except (KeyError, TypeError, ValueError): + return None + return breakdowns + + +async def invalidate_analytics_cache(cache: Redis | None, organization_id: int, backoff_ms: int) -> None: + """Remove cached analytics payloads for the organization.""" + + if cache is None: + return + + summary_pattern = summary_cache_pattern(organization_id) + keys: list[str] = [funnel_cache_key(organization_id)] + try: + async for raw_key in cache.scan_iter(match=summary_pattern): + if isinstance(raw_key, bytes): + keys.append(raw_key.decode("utf-8")) + else: + keys.append(str(raw_key)) + except RedisError as exc: # pragma: no cover - network errors + cache_manager.mark_unavailable() + logger.warning( + "Failed to enumerate summary cache keys for organization %s: %s", + organization_id, + exc, + ) + return + + await delete_keys(cache, keys, backoff_ms) diff --git a/app/services/deal_service.py b/app/services/deal_service.py index 281811a..6b0b8b1 100644 --- a/app/services/deal_service.py +++ b/app/services/deal_service.py @@ -5,6 +5,7 @@ from collections.abc import Iterable from dataclasses import dataclass from decimal import Decimal +from redis.asyncio.client import Redis from sqlalchemy import func, select from app.models.activity import Activity, ActivityType @@ -12,6 +13,7 @@ from app.models.contact import Contact from app.models.deal import Deal, DealCreate, DealStage, DealStatus from app.models.organization_member import OrganizationRole from app.repositories.deal_repo import DealRepository +from app.services.analytics_service import invalidate_analytics_cache from app.services.organization_service import OrganizationContext @@ -61,13 +63,23 @@ class DealUpdateData: class DealService: """Encapsulates deal workflows and validations.""" - def __init__(self, repository: DealRepository) -> None: + def __init__( + self, + repository: DealRepository, + cache: Redis | None = None, + *, + cache_backoff_ms: int = 0, + ) -> None: self._repository = repository + self._cache = cache + self._cache_backoff_ms = cache_backoff_ms async def create_deal(self, data: DealCreate, *, context: OrganizationContext) -> Deal: self._ensure_same_organization(data.organization_id, context) await self._ensure_contact_in_organization(data.contact_id, context.organization_id) - return await self._repository.create(data=data, role=context.role, user_id=context.user_id) + deal = await self._repository.create(data=data, role=context.role, user_id=context.user_id) + await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms) + return deal async def update_deal( self, @@ -111,6 +123,7 @@ class DealService: author_id=context.user_id, activities=[activity for activity in [stage_activity, status_activity] if activity], ) + await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms) return updated async def ensure_contact_can_be_deleted(self, contact_id: int) -> None: diff --git a/pyproject.toml b/pyproject.toml index 139936b..7eb324f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "pyjwt>=2.9.0", "pydantic-settings>=2.12.0", "sqlalchemy>=2.0.44", + "redis>=5.2.0", ] [dependency-groups] diff --git a/tests/api/v1/conftest.py b/tests/api/v1/conftest.py index 8c8fcb6..89cafa5 100644 --- a/tests/api/v1/conftest.py +++ b/tests/api/v1/conftest.py @@ -8,10 +8,11 @@ import pytest_asyncio from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine -from app.api.deps import get_db_session +from app.api.deps import get_cache_backend, get_db_session from app.core.security import password_hasher from app.main import create_app from app.models import Base +from tests.utils.fake_redis import InMemoryRedis @pytest.fixture(autouse=True) @@ -41,6 +42,7 @@ async def session_factory() -> AsyncGenerator[async_sessionmaker[AsyncSession], @pytest_asyncio.fixture() async def client( session_factory: async_sessionmaker[AsyncSession], + cache_stub: InMemoryRedis, ) -> AsyncGenerator[AsyncClient, None]: app = create_app() @@ -54,6 +56,12 @@ async def client( raise app.dependency_overrides[get_db_session] = _get_session_override + app.dependency_overrides[get_cache_backend] = lambda: cache_stub transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as test_client: yield test_client + + +@pytest.fixture() +def cache_stub() -> InMemoryRedis: + return InMemoryRedis() diff --git a/tests/api/v1/test_analytics.py b/tests/api/v1/test_analytics.py index 6656a7c..d45d89b 100644 --- a/tests/api/v1/test_analytics.py +++ b/tests/api/v1/test_analytics.py @@ -23,6 +23,7 @@ class AnalyticsScenario: user_id: int user_email: str token: str + in_progress_deal_id: int async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSession]) -> AnalyticsScenario: @@ -102,6 +103,7 @@ async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSe user_id=user.id, user_email=user.email, token=token, + in_progress_deal_id=next(deal.id for deal in deals if deal.status is DealStatus.IN_PROGRESS), ) @@ -163,4 +165,37 @@ async def test_deals_funnel_returns_breakdown( qualification = next(item for item in payload["stages"] if item["stage"] == DealStage.QUALIFICATION.value) assert qualification["total"] == 1 proposal = next(item for item in payload["stages"] if item["stage"] == DealStage.PROPOSAL.value) - assert proposal["conversion_to_next"] == 100.0 \ No newline at end of file + assert proposal["conversion_to_next"] == 100.0 + + +@pytest.mark.asyncio +async def test_deal_update_invalidates_cached_summary( + session_factory: async_sessionmaker[AsyncSession], + client: AsyncClient, + cache_stub, +) -> None: + scenario = await prepare_analytics_scenario(session_factory) + headers = _headers(scenario.token, scenario.organization_id) + + first = await client.get( + "/api/v1/analytics/deals/summary?days=30", + headers=headers, + ) + assert first.status_code == 200 + keys = [key async for key in cache_stub.scan_iter("analytics:summary:*")] + assert keys, "cache should contain warmed summary" + + patch_response = await client.patch( + f"/api/v1/deals/{scenario.in_progress_deal_id}", + json={"status": DealStatus.WON.value, "stage": DealStage.CLOSED.value}, + headers=headers, + ) + assert patch_response.status_code == 200 + + refreshed = await client.get( + "/api/v1/analytics/deals/summary?days=30", + headers=headers, + ) + assert refreshed.status_code == 200 + payload = refreshed.json() + assert payload["won"]["count"] == 2 \ No newline at end of file diff --git a/tests/services/test_analytics_service.py b/tests/services/test_analytics_service.py index d8514b9..a672d79 100644 --- a/tests/services/test_analytics_service.py +++ b/tests/services/test_analytics_service.py @@ -17,7 +17,8 @@ from app.models.organization import Organization from app.models.organization_member import OrganizationMember, OrganizationRole from app.models.user import User from app.repositories.analytics_repo import AnalyticsRepository -from app.services.analytics_service import AnalyticsService +from app.services.analytics_service import AnalyticsService, invalidate_analytics_cache +from tests.utils.fake_redis import InMemoryRedis @pytest_asyncio.fixture() @@ -149,4 +150,85 @@ async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession assert proposal.conversion_to_next == 200.0 last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED) - assert last_stage.conversion_to_next is None \ No newline at end of file + assert last_stage.conversion_to_next is None + + +class _ExplodingRepository(AnalyticsRepository): + async def fetch_status_rollup(self, organization_id: int): # type: ignore[override] + raise AssertionError("cache not used for status rollup") + + async def count_new_deals_since(self, organization_id: int, threshold): # type: ignore[override] + raise AssertionError("cache not used for new deal count") + + async def fetch_stage_status_rollup(self, organization_id: int): # type: ignore[override] + raise AssertionError("cache not used for funnel rollup") + + +@pytest.mark.asyncio +async def test_summary_reads_from_cache_when_available(session: AsyncSession) -> None: + org_id, _, _ = await _seed_data(session) + cache = InMemoryRedis() + service = AnalyticsService( + repository=AnalyticsRepository(session), + cache=cache, + ttl_seconds=60, + backoff_ms=0, + ) + + await service.get_deal_summary(org_id, days=30) + service._repository = _ExplodingRepository(session) + + cached = await service.get_deal_summary(org_id, days=30) + assert cached.total_deals == 6 + + +@pytest.mark.asyncio +async def test_invalidation_refreshes_cached_summary(session: AsyncSession) -> None: + org_id, _, contact_id = await _seed_data(session) + cache = InMemoryRedis() + service = AnalyticsService( + repository=AnalyticsRepository(session), + cache=cache, + ttl_seconds=60, + backoff_ms=0, + ) + + await service.get_deal_summary(org_id, days=30) + + deal = Deal( + organization_id=org_id, + contact_id=contact_id, + owner_id=1, + title="New", + amount=Decimal("50"), + status=DealStatus.NEW, + stage=DealStage.QUALIFICATION, + created_at=datetime.now(timezone.utc), + ) + session.add(deal) + await session.commit() + + cached = await service.get_deal_summary(org_id, days=30) + assert cached.total_deals == 6 + + await invalidate_analytics_cache(cache, org_id, backoff_ms=0) + refreshed = await service.get_deal_summary(org_id, days=30) + assert refreshed.total_deals == 7 + + +@pytest.mark.asyncio +async def test_funnel_reads_from_cache_when_available(session: AsyncSession) -> None: + org_id, _, _ = await _seed_data(session) + cache = InMemoryRedis() + service = AnalyticsService( + repository=AnalyticsRepository(session), + cache=cache, + ttl_seconds=60, + backoff_ms=0, + ) + + await service.get_deal_funnel(org_id) + service._repository = _ExplodingRepository(session) + + cached = await service.get_deal_funnel(org_id) + assert len(cached) == 4 \ No newline at end of file diff --git a/tests/utils/fake_redis.py b/tests/utils/fake_redis.py new file mode 100644 index 0000000..8d6e605 --- /dev/null +++ b/tests/utils/fake_redis.py @@ -0,0 +1,57 @@ +"""Simple in-memory Redis replacement for tests.""" +from __future__ import annotations + +import fnmatch +import time +from collections.abc import AsyncIterator + + +class InMemoryRedis: + """Subset of redis.asyncio.Redis API backed by an in-memory dict.""" + + def __init__(self) -> None: + self._store: dict[str, bytes] = {} + self._expirations: dict[str, float] = {} + + async def ping(self) -> bool: # pragma: no cover - compatibility shim + return True + + async def get(self, name: str) -> bytes | None: + self._purge_if_expired(name) + return self._store.get(name) + + async def set(self, name: str, value: bytes, ex: int | None = None) -> None: + self._store[name] = value + if ex is not None: + self._expirations[name] = time.monotonic() + ex + elif name in self._expirations: + self._expirations.pop(name, None) + + async def delete(self, *names: str) -> int: + removed = 0 + for name in names: + if name in self._store: + del self._store[name] + removed += 1 + self._expirations.pop(name, None) + return removed + + async def close(self) -> None: # pragma: no cover - interface completeness + self._store.clear() + self._expirations.clear() + + async def scan_iter(self, match: str) -> AsyncIterator[str]: + pattern = match or "*" + for key in list(self._store.keys()): + self._purge_if_expired(key) + for key in self._store.keys(): + if fnmatch.fnmatch(key, pattern): + yield key + + def _purge_if_expired(self, name: str) -> None: + expires_at = self._expirations.get(name) + if expires_at is None: + return + if expires_at <= time.monotonic(): + self._store.pop(name, None) + self._expirations.pop(name, None) diff --git a/uv.lock b/uv.lock index 1c2d9fb..26c0f59 100644 --- a/uv.lock +++ b/uv.lock @@ -692,6 +692,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" }, ] +[[package]] +name = "redis" +version = "7.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/43/c8/983d5c6579a411d8a99bc5823cc5712768859b5ce2c8afe1a65b37832c81/redis-7.1.0.tar.gz", hash = "sha256:b1cc3cfa5a2cb9c2ab3ba700864fb0ad75617b41f01352ce5779dabf6d5f9c3c", size = 4796669, upload-time = "2025-11-19T15:54:39.961Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/89/f0/8956f8a86b20d7bb9d6ac0187cf4cd54d8065bc9a1a09eb8011d4d326596/redis-7.1.0-py3-none-any.whl", hash = "sha256:23c52b208f92b56103e17c5d06bdc1a6c2c0b3106583985a76a18f83b265de2b", size = 354159, upload-time = "2025-11-19T15:54:38.064Z" }, +] + [[package]] name = "rich" version = "14.2.0" @@ -850,6 +859,7 @@ dependencies = [ { name = "passlib", extra = ["bcrypt"] }, { name = "pydantic-settings" }, { name = "pyjwt" }, + { name = "redis" }, { name = "sqlalchemy" }, ] @@ -871,6 +881,7 @@ requires-dist = [ { name = "passlib", extras = ["bcrypt"], specifier = ">=1.7.4" }, { name = "pydantic-settings", specifier = ">=2.12.0" }, { name = "pyjwt", specifier = ">=2.9.0" }, + { name = "redis", specifier = ">=5.2.0" }, { name = "sqlalchemy", specifier = ">=2.0.44" }, ]