cache #9

Merged
k1nq merged 2 commits from cache into dev 2025-11-29 04:51:18 +00:00
15 changed files with 676 additions and 14 deletions

@@ -27,3 +27,14 @@ app/
 Add new routers under `app/api/v1`, repositories under `app/repositories`, and keep business rules inside `app/services`.
+
+## Redis analytics cache
+
+Analytics endpoints can use a Redis response cache (default TTL: 120 seconds). Caching is disabled by default; when it is off, or Redis is unreachable, the service reads directly from the database.
+
+1. Start Redis and set the following variables:
+   - `REDIS_ENABLED=true`
+   - `REDIS_URL=redis://localhost:6379/0`
+   - `ANALYTICS_CACHE_TTL_SECONDS` (optional, defaults to 120)
+   - `ANALYTICS_CACHE_BACKOFF_MS` (maximum delay in ms for write/delete retries, defaults to 200)
+2. If Redis becomes unavailable at runtime, middleware logs the degradation and responses transparently fall back to database queries until connectivity is restored.
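A minimal local bootstrap under these settings might look like the sketch below (an illustration only; it assumes the remaining settings resolve from defaults or the environment, and that Redis is already listening at the given URL):

```python
# sketch: enable the analytics cache for a local run; the env vars must be
# set before app.core.config is imported, because `settings = Settings()`
# runs at import time
import os

os.environ["REDIS_ENABLED"] = "true"
os.environ["REDIS_URL"] = "redis://localhost:6379/0"
os.environ["ANALYTICS_CACHE_TTL_SECONDS"] = "60"  # optional override

from app.main import create_app

app = create_app()  # the lifespan hook calls init_cache()/shutdown_cache()
```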

@@ -6,6 +6,7 @@ from fastapi import Depends, Header, HTTPException, status
 from fastapi.security import OAuth2PasswordBearer
 from sqlalchemy.ext.asyncio import AsyncSession

+from app.core.cache import get_cache_client
 from app.core.config import settings
 from app.core.database import get_session
 from app.core.security import jwt_service, password_hasher
@@ -29,6 +30,7 @@ from app.services.organization_service import (
     OrganizationService,
 )
 from app.services.task_service import TaskService
+from redis.asyncio.client import Redis

 oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api_v1_prefix}/auth/token")
@@ -67,8 +69,19 @@ def get_analytics_repository(session: AsyncSession = Depends(get_db_session)) ->
     return AnalyticsRepository(session=session)


-def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService:
-    return DealService(repository=repo)
+def get_cache_backend() -> Redis | None:
+    return get_cache_client()
+
+
+def get_deal_service(
+    repo: DealRepository = Depends(get_deal_repository),
+    cache: Redis | None = Depends(get_cache_backend),
+) -> DealService:
+    return DealService(
+        repository=repo,
+        cache=cache,
+        cache_backoff_ms=settings.analytics_cache_backoff_ms,
+    )


 def get_auth_service(
@@ -95,8 +108,14 @@ def get_activity_service(
 def get_analytics_service(
     repo: AnalyticsRepository = Depends(get_analytics_repository),
+    cache: Redis | None = Depends(get_cache_backend),
 ) -> AnalyticsService:
-    return AnalyticsService(repository=repo)
+    return AnalyticsService(
+        repository=repo,
+        cache=cache,
+        ttl_seconds=settings.analytics_cache_ttl_seconds,
+        backoff_ms=settings.analytics_cache_backoff_ms,
+    )


 def get_contact_service(

app/core/cache.py Normal file

@@ -0,0 +1,160 @@
"""Redis cache utilities and availability tracking."""
from __future__ import annotations

import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Optional

import redis.asyncio as redis
from redis.asyncio.client import Redis
from redis.exceptions import RedisError

from app.core.config import settings

logger = logging.getLogger(__name__)


class RedisCacheManager:
    """Manages lifecycle and availability of the Redis cache client."""

    def __init__(self) -> None:
        self._client: Redis | None = None
        self._available: bool = False
        self._lock = asyncio.Lock()

    @property
    def is_enabled(self) -> bool:
        return settings.redis_enabled

    @property
    def is_available(self) -> bool:
        return self._available and self._client is not None

    def get_client(self) -> Redis | None:
        if not self.is_enabled:
            return None
        if self.is_available:
            return self._client
        return None

    async def startup(self) -> None:
        if not self.is_enabled:
            return
        async with self._lock:
            if self._client is not None:
                return
            self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
            await self._refresh_availability()

    async def shutdown(self) -> None:
        async with self._lock:
            if self._client is not None:
                await self._client.close()
                self._client = None
            self._available = False

    async def reconnect(self) -> None:
        if not self.is_enabled:
            return
        async with self._lock:
            if self._client is None:
                self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
            await self._refresh_availability()

    async def _refresh_availability(self) -> None:
        if self._client is None:
            self._available = False
            return
        try:
            await self._client.ping()
        except RedisError as exc:  # pragma: no cover - logging only
            self._available = False
            logger.warning("Redis ping failed: %s", exc)
        else:
            self._available = True

    def mark_unavailable(self) -> None:
        self._available = False

    def mark_available(self) -> None:
        if self._client is not None:
            self._available = True


cache_manager = RedisCacheManager()


async def init_cache() -> None:
    """Initialize Redis cache connection if enabled."""
    await cache_manager.startup()


async def shutdown_cache() -> None:
    """Close Redis cache connection."""
    await cache_manager.shutdown()


def get_cache_client() -> Optional[Redis]:
    """Expose the active Redis client for dependency injection."""
    return cache_manager.get_client()


async def read_json(client: Redis, key: str) -> Any | None:
    """Read and decode JSON payload from Redis."""
    try:
        raw = await client.get(key)
    except RedisError as exc:  # pragma: no cover - network errors
        cache_manager.mark_unavailable()
        logger.debug("Redis GET failed for %s: %s", key, exc)
        return None
    if raw is None:
        return None
    cache_manager.mark_available()
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:  # pragma: no cover - malformed payloads
        logger.warning("Discarding malformed cache entry %s: %s", key, exc)
        return None


async def write_json(client: Redis, key: str, value: Any, ttl_seconds: int, backoff_ms: int) -> None:
    """Serialize data to JSON and store it with TTL using retry/backoff."""
    payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True).encode("utf-8")

    async def _operation() -> Any:
        return await client.set(name=key, value=payload, ex=ttl_seconds)

    await _run_with_retry(_operation, backoff_ms)


async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None:
    """Delete cache keys with retry/backoff semantics."""
    if not keys:
        return

    async def _operation() -> Any:
        return await client.delete(*keys)

    await _run_with_retry(_operation, backoff_ms)


async def _run_with_retry(operation: Callable[[], Awaitable[Any]], max_sleep_ms: int) -> None:
    try:
        await operation()
        cache_manager.mark_available()
        return
    except RedisError as exc:  # pragma: no cover - network errors
        cache_manager.mark_unavailable()
        logger.debug("Redis cache operation failed: %s", exc)
    if max_sleep_ms <= 0:
        return
    sleep_seconds = min(max_sleep_ms / 1000, 0.1)
    await asyncio.sleep(sleep_seconds)
    await cache_manager.reconnect()
    try:
        await operation()
        cache_manager.mark_available()
    except RedisError as exc:  # pragma: no cover - repeated network errors
        cache_manager.mark_unavailable()
        logger.warning("Redis cache operation failed after retry: %s", exc)

@@ -20,6 +20,14 @@ class Settings(BaseSettings):
     jwt_algorithm: str = "HS256"
     access_token_expire_minutes: int = 30
     refresh_token_expire_days: int = 7
+    redis_enabled: bool = Field(default=False, description="Toggle Redis-backed cache usage")
+    redis_url: str = Field(default="redis://localhost:6379/0", description="Redis connection URL")
+    analytics_cache_ttl_seconds: int = Field(default=120, ge=1, description="TTL for cached analytics responses")
+    analytics_cache_backoff_ms: int = Field(
+        default=200,
+        ge=0,
+        description="Maximum backoff (ms) for retrying cache writes/invalidation",
+    )


 settings = Settings()
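The `ge` bounds are enforced when `Settings` is constructed; a quick illustration (assuming the remaining fields resolve from defaults or the environment):

```python
# sketch: Field(ge=1) rejects an out-of-range TTL at construction time
from pydantic import ValidationError

from app.core.config import Settings

try:
    Settings(analytics_cache_ttl_seconds=0)  # violates ge=1
except ValidationError as exc:
    print(exc.errors()[0]["loc"])  # ('analytics_cache_ttl_seconds',)
```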

@@ -0,0 +1 @@
"""Application middleware components."""

@@ -0,0 +1,38 @@
"""Middleware that logs cache availability transitions."""
from __future__ import annotations

import logging

from starlette.types import ASGIApp, Receive, Scope, Send

from app.core.cache import cache_manager
from app.core.config import settings

logger = logging.getLogger(__name__)


class CacheAvailabilityMiddleware:
    """Logs when Redis cache becomes unavailable or recovers."""

    def __init__(self, app: ASGIApp) -> None:
        self.app = app
        self._last_state: bool | None = None

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "http" and settings.redis_enabled:
            self._log_transition()
        await self.app(scope, receive, send)

    def _log_transition(self) -> None:
        available = cache_manager.is_available
        if self._last_state is None:
            self._last_state = available
            if not available:
                logger.warning("Redis cache unavailable, serving responses without cache")
            return
        if available == self._last_state:
            return
        if available:
            logger.info("Redis cache connectivity restored; caching re-enabled")
        else:
            logger.warning("Redis cache unavailable, serving responses without cache")
        self._last_state = available

@@ -1,14 +1,30 @@
 """FastAPI application factory."""
+from __future__ import annotations
+
+from collections.abc import AsyncIterator
+from contextlib import asynccontextmanager

 from fastapi import FastAPI

 from app.api.routes import api_router
+from app.core.cache import init_cache, shutdown_cache
 from app.core.config import settings
+from app.core.middleware.cache_monitor import CacheAvailabilityMiddleware


 def create_app() -> FastAPI:
     """Build FastAPI application instance."""
-    application = FastAPI(title=settings.project_name, version=settings.version)
+
+    @asynccontextmanager
+    async def lifespan(_: FastAPI) -> AsyncIterator[None]:
+        await init_cache()
+        try:
+            yield
+        finally:
+            await shutdown_cache()
+
+    application = FastAPI(title=settings.project_name, version=settings.version, lifespan=lifespan)
     application.include_router(api_router)
+    application.add_middleware(CacheAvailabilityMiddleware)
     return application

@@ -1,11 +1,16 @@
 """Analytics-related business logic."""
 from __future__ import annotations

+import logging
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
-from decimal import Decimal
-from typing import Iterable
+from decimal import Decimal, InvalidOperation
+from typing import Any, Iterable

+from redis.asyncio.client import Redis
+from redis.exceptions import RedisError
+
+from app.core.cache import cache_manager, delete_keys, read_json, write_json
 from app.models.deal import DealStage, DealStatus
 from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup
@@ -53,13 +58,33 @@ class StageBreakdown:
     conversion_to_next: float | None


+logger = logging.getLogger(__name__)
+
+_SUMMARY_CACHE_PREFIX = "analytics:summary"
+_FUNNEL_CACHE_PREFIX = "analytics:funnel"
+
+
 class AnalyticsService:
     """Provides aggregated analytics for deals."""

-    def __init__(self, repository: AnalyticsRepository) -> None:
+    def __init__(
+        self,
+        repository: AnalyticsRepository,
+        cache: Redis | None = None,
+        *,
+        ttl_seconds: int = 0,
+        backoff_ms: int = 0,
+    ) -> None:
         self._repository = repository
+        self._cache = cache
+        self._ttl_seconds = ttl_seconds
+        self._backoff_ms = backoff_ms

     async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary:
+        cached = await self._fetch_cached_summary(organization_id, days)
+        if cached is not None:
+            return cached
         status_rollup = await self._repository.fetch_status_rollup(organization_id)
         status_map = {item.status: item for item in status_rollup}
@@ -87,7 +112,7 @@ class AnalyticsService:
         window_threshold = _threshold_from_days(days)
         new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold)

-        return DealSummary(
+        summary = DealSummary(
             by_status=summaries,
             won=WonStatistics(
                 count=won_count,
@@ -98,7 +123,14 @@ class AnalyticsService:
             total_deals=total_deals,
         )
+        await self._store_summary_cache(organization_id, days, summary)
+        return summary

     async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]:
+        cached = await self._fetch_cached_funnel(organization_id)
+        if cached is not None:
+            return cached
         rollup = await self._repository.fetch_stage_status_rollup(organization_id)
         stage_map = _build_stage_map(rollup)
@@ -121,8 +153,44 @@ class AnalyticsService:
                     conversion_to_next=conversion,
                 )
             )
+        await self._store_funnel_cache(organization_id, breakdowns)
         return breakdowns

+    def _is_cache_enabled(self) -> bool:
+        return self._cache is not None and self._ttl_seconds > 0
+
+    async def _fetch_cached_summary(self, organization_id: int, days: int) -> DealSummary | None:
+        if not self._is_cache_enabled() or self._cache is None:
+            return None
+        key = _summary_cache_key(organization_id, days)
+        payload = await read_json(self._cache, key)
+        if payload is None:
+            return None
+        return _deserialize_summary(payload)
+
+    async def _store_summary_cache(self, organization_id: int, days: int, summary: DealSummary) -> None:
+        if not self._is_cache_enabled() or self._cache is None:
+            return
+        key = _summary_cache_key(organization_id, days)
+        payload = _serialize_summary(summary)
+        await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms)
+
+    async def _fetch_cached_funnel(self, organization_id: int) -> list[StageBreakdown] | None:
+        if not self._is_cache_enabled() or self._cache is None:
+            return None
+        key = _funnel_cache_key(organization_id)
+        payload = await read_json(self._cache, key)
+        if payload is None:
+            return None
+        return _deserialize_funnel(payload)
+
+    async def _store_funnel_cache(self, organization_id: int, breakdowns: list[StageBreakdown]) -> None:
+        if not self._is_cache_enabled() or self._cache is None:
+            return
+        key = _funnel_cache_key(organization_id)
+        payload = _serialize_funnel(breakdowns)
+        await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms)


 def _threshold_from_days(days: int) -> datetime:
     return datetime.now(timezone.utc) - timedelta(days=days)
@@ -137,3 +205,137 @@ def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dic
         stage_map.setdefault(item.stage, {status: 0 for status in DealStatus})
         stage_map[item.stage][item.status] = item.deal_count
     return stage_map
+
+
+def _summary_cache_key(organization_id: int, days: int) -> str:
+    return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:{days}"
+
+
+def summary_cache_pattern(organization_id: int) -> str:
+    return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:*"
+
+
+def _funnel_cache_key(organization_id: int) -> str:
+    return f"{_FUNNEL_CACHE_PREFIX}:{organization_id}"
+
+
+def funnel_cache_key(organization_id: int) -> str:
+    return _funnel_cache_key(organization_id)
+
+
+def _serialize_summary(summary: DealSummary) -> dict[str, Any]:
+    return {
+        "by_status": [
+            {
+                "status": item.status.value,
+                "count": item.count,
+                "amount_sum": str(item.amount_sum),
+            }
+            for item in summary.by_status
+        ],
+        "won": {
+            "count": summary.won.count,
+            "amount_sum": str(summary.won.amount_sum),
+            "average_amount": str(summary.won.average_amount),
+        },
+        "new_deals": {
+            "days": summary.new_deals.days,
+            "count": summary.new_deals.count,
+        },
+        "total_deals": summary.total_deals,
+    }
+
+
+def _deserialize_summary(payload: Any) -> DealSummary | None:
+    try:
+        by_status_payload = payload["by_status"]
+        won_payload = payload["won"]
+        new_deals_payload = payload["new_deals"]
+        total_deals = int(payload["total_deals"])
+    except (KeyError, TypeError, ValueError):
+        return None
+    summaries: list[StatusSummary] = []
+    try:
+        for item in by_status_payload:
+            summaries.append(
+                StatusSummary(
+                    status=DealStatus(item["status"]),
+                    count=int(item["count"]),
+                    amount_sum=Decimal(item["amount_sum"]),
+                )
+            )
+        won = WonStatistics(
+            count=int(won_payload["count"]),
+            amount_sum=Decimal(won_payload["amount_sum"]),
+            average_amount=Decimal(won_payload["average_amount"]),
+        )
+        new_deals = NewDealsWindow(
+            days=int(new_deals_payload["days"]),
+            count=int(new_deals_payload["count"]),
+        )
+    except (KeyError, TypeError, ValueError, InvalidOperation):
+        return None
+    return DealSummary(by_status=summaries, won=won, new_deals=new_deals, total_deals=total_deals)
+
+
+def _serialize_funnel(breakdowns: list[StageBreakdown]) -> list[dict[str, Any]]:
+    serialized: list[dict[str, Any]] = []
+    for item in breakdowns:
+        serialized.append(
+            {
+                "stage": item.stage.value,
+                "total": item.total,
+                "by_status": {status.value: count for status, count in item.by_status.items()},
+                "conversion_to_next": item.conversion_to_next,
+            }
+        )
+    return serialized
+
+
+def _deserialize_funnel(payload: Any) -> list[StageBreakdown] | None:
+    if not isinstance(payload, list):
+        return None
+    breakdowns: list[StageBreakdown] = []
+    try:
+        for item in payload:
+            by_status_payload = item["by_status"]
+            by_status = {DealStatus(key): int(value) for key, value in by_status_payload.items()}
+            breakdowns.append(
+                StageBreakdown(
+                    stage=DealStage(item["stage"]),
+                    total=int(item["total"]),
+                    by_status=by_status,
+                    conversion_to_next=float(item["conversion_to_next"]) if item["conversion_to_next"] is not None else None,
+                )
+            )
+    except (KeyError, TypeError, ValueError):
+        return None
+    return breakdowns
+
+
+async def invalidate_analytics_cache(cache: Redis | None, organization_id: int, backoff_ms: int) -> None:
+    """Remove cached analytics payloads for the organization."""
+    if cache is None:
+        return
+    summary_pattern = summary_cache_pattern(organization_id)
+    keys: list[str] = [funnel_cache_key(organization_id)]
+    try:
+        async for raw_key in cache.scan_iter(match=summary_pattern):
+            if isinstance(raw_key, bytes):
+                keys.append(raw_key.decode("utf-8"))
+            else:
+                keys.append(str(raw_key))
+    except RedisError as exc:  # pragma: no cover - network errors
+        cache_manager.mark_unavailable()
+        logger.warning(
+            "Failed to enumerate summary cache keys for organization %s: %s",
+            organization_id,
+            exc,
+        )
+        return
+    await delete_keys(cache, keys, backoff_ms)
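Invalidation leans on the key layout above: summary keys vary by the `days` window, so they are enumerated by pattern, while each organization has exactly one funnel key. Purely illustrative:

```python
# sketch: the cache key scheme the analytics service reads and invalidates
from app.services.analytics_service import funnel_cache_key, summary_cache_pattern

print(summary_cache_pattern(42))  # analytics:summary:42:*  (one key per `days` window)
print(funnel_cache_key(42))       # analytics:funnel:42
```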

@@ -5,6 +5,7 @@ from collections.abc import Iterable
 from dataclasses import dataclass
 from decimal import Decimal

+from redis.asyncio.client import Redis
 from sqlalchemy import func, select

 from app.models.activity import Activity, ActivityType
@@ -12,6 +13,7 @@ from app.models.contact import Contact
 from app.models.deal import Deal, DealCreate, DealStage, DealStatus
 from app.models.organization_member import OrganizationRole
 from app.repositories.deal_repo import DealRepository
+from app.services.analytics_service import invalidate_analytics_cache
 from app.services.organization_service import OrganizationContext
@@ -61,13 +63,23 @@ class DealUpdateData:
 class DealService:
     """Encapsulates deal workflows and validations."""

-    def __init__(self, repository: DealRepository) -> None:
+    def __init__(
+        self,
+        repository: DealRepository,
+        cache: Redis | None = None,
+        *,
+        cache_backoff_ms: int = 0,
+    ) -> None:
         self._repository = repository
+        self._cache = cache
+        self._cache_backoff_ms = cache_backoff_ms

     async def create_deal(self, data: DealCreate, *, context: OrganizationContext) -> Deal:
         self._ensure_same_organization(data.organization_id, context)
         await self._ensure_contact_in_organization(data.contact_id, context.organization_id)
-        return await self._repository.create(data=data, role=context.role, user_id=context.user_id)
+        deal = await self._repository.create(data=data, role=context.role, user_id=context.user_id)
+        await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms)
+        return deal

     async def update_deal(
         self,
@@ -111,6 +123,7 @@ class DealService:
             author_id=context.user_id,
             activities=[activity for activity in [stage_activity, status_activity] if activity],
         )
+        await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms)
         return updated

     async def ensure_contact_can_be_deleted(self, contact_id: int) -> None:

@@ -12,6 +12,7 @@ dependencies = [
     "pyjwt>=2.9.0",
     "pydantic-settings>=2.12.0",
     "sqlalchemy>=2.0.44",
+    "redis>=5.2.0",
 ]

 [dependency-groups]

@@ -8,10 +8,11 @@ import pytest_asyncio
 from httpx import ASGITransport, AsyncClient
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

-from app.api.deps import get_db_session
+from app.api.deps import get_cache_backend, get_db_session
 from app.core.security import password_hasher
 from app.main import create_app
 from app.models import Base
+from tests.utils.fake_redis import InMemoryRedis


 @pytest.fixture(autouse=True)
@@ -41,6 +42,7 @@ async def session_factory() -> AsyncGenerator[async_sessionmaker[AsyncSession],
 @pytest_asyncio.fixture()
 async def client(
     session_factory: async_sessionmaker[AsyncSession],
+    cache_stub: InMemoryRedis,
 ) -> AsyncGenerator[AsyncClient, None]:
     app = create_app()
@@ -54,6 +56,12 @@ async def client(
             raise

     app.dependency_overrides[get_db_session] = _get_session_override
+    app.dependency_overrides[get_cache_backend] = lambda: cache_stub

     transport = ASGITransport(app=app)
     async with AsyncClient(transport=transport, base_url="http://testserver") as test_client:
         yield test_client
+
+
+@pytest.fixture()
+def cache_stub() -> InMemoryRedis:
+    return InMemoryRedis()

@@ -23,6 +23,7 @@ class AnalyticsScenario:
     user_id: int
     user_email: str
     token: str
+    in_progress_deal_id: int


 async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSession]) -> AnalyticsScenario:
@@ -102,6 +103,7 @@ async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSe
         user_id=user.id,
         user_email=user.email,
         token=token,
+        in_progress_deal_id=next(deal.id for deal in deals if deal.status is DealStatus.IN_PROGRESS),
     )
@@ -163,4 +165,37 @@ async def test_deals_funnel_returns_breakdown(
     qualification = next(item for item in payload["stages"] if item["stage"] == DealStage.QUALIFICATION.value)
     assert qualification["total"] == 1
     proposal = next(item for item in payload["stages"] if item["stage"] == DealStage.PROPOSAL.value)
     assert proposal["conversion_to_next"] == 100.0
+
+
+@pytest.mark.asyncio
+async def test_deal_update_invalidates_cached_summary(
+    session_factory: async_sessionmaker[AsyncSession],
+    client: AsyncClient,
+    cache_stub,
+) -> None:
+    scenario = await prepare_analytics_scenario(session_factory)
+    headers = _headers(scenario.token, scenario.organization_id)
+
+    first = await client.get(
+        "/api/v1/analytics/deals/summary?days=30",
+        headers=headers,
+    )
+    assert first.status_code == 200
+    keys = [key async for key in cache_stub.scan_iter("analytics:summary:*")]
+    assert keys, "cache should contain warmed summary"
+
+    patch_response = await client.patch(
+        f"/api/v1/deals/{scenario.in_progress_deal_id}",
+        json={"status": DealStatus.WON.value, "stage": DealStage.CLOSED.value},
+        headers=headers,
+    )
+    assert patch_response.status_code == 200
+
+    refreshed = await client.get(
+        "/api/v1/analytics/deals/summary?days=30",
+        headers=headers,
+    )
+    assert refreshed.status_code == 200
+    payload = refreshed.json()
+    assert payload["won"]["count"] == 2

@@ -17,7 +17,8 @@ from app.models.organization import Organization
 from app.models.organization_member import OrganizationMember, OrganizationRole
 from app.models.user import User
 from app.repositories.analytics_repo import AnalyticsRepository
-from app.services.analytics_service import AnalyticsService
+from app.services.analytics_service import AnalyticsService, invalidate_analytics_cache
+from tests.utils.fake_redis import InMemoryRedis


 @pytest_asyncio.fixture()
@@ -149,4 +150,85 @@ async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession
     assert proposal.conversion_to_next == 200.0
     last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED)
     assert last_stage.conversion_to_next is None
+
+
+class _ExplodingRepository(AnalyticsRepository):
+    async def fetch_status_rollup(self, organization_id: int):  # type: ignore[override]
+        raise AssertionError("cache not used for status rollup")
+
+    async def count_new_deals_since(self, organization_id: int, threshold):  # type: ignore[override]
+        raise AssertionError("cache not used for new deal count")
+
+    async def fetch_stage_status_rollup(self, organization_id: int):  # type: ignore[override]
+        raise AssertionError("cache not used for funnel rollup")
+
+
+@pytest.mark.asyncio
+async def test_summary_reads_from_cache_when_available(session: AsyncSession) -> None:
+    org_id, _, _ = await _seed_data(session)
+    cache = InMemoryRedis()
+    service = AnalyticsService(
+        repository=AnalyticsRepository(session),
+        cache=cache,
+        ttl_seconds=60,
+        backoff_ms=0,
+    )
+
+    await service.get_deal_summary(org_id, days=30)
+    service._repository = _ExplodingRepository(session)
+
+    cached = await service.get_deal_summary(org_id, days=30)
+    assert cached.total_deals == 6
+
+
+@pytest.mark.asyncio
+async def test_invalidation_refreshes_cached_summary(session: AsyncSession) -> None:
+    org_id, _, contact_id = await _seed_data(session)
+    cache = InMemoryRedis()
+    service = AnalyticsService(
+        repository=AnalyticsRepository(session),
+        cache=cache,
+        ttl_seconds=60,
+        backoff_ms=0,
+    )
+
+    await service.get_deal_summary(org_id, days=30)
+
+    deal = Deal(
+        organization_id=org_id,
+        contact_id=contact_id,
+        owner_id=1,
+        title="New",
+        amount=Decimal("50"),
+        status=DealStatus.NEW,
+        stage=DealStage.QUALIFICATION,
+        created_at=datetime.now(timezone.utc),
+    )
+    session.add(deal)
+    await session.commit()
+
+    cached = await service.get_deal_summary(org_id, days=30)
+    assert cached.total_deals == 6
+
+    await invalidate_analytics_cache(cache, org_id, backoff_ms=0)
+
+    refreshed = await service.get_deal_summary(org_id, days=30)
+    assert refreshed.total_deals == 7
+
+
+@pytest.mark.asyncio
+async def test_funnel_reads_from_cache_when_available(session: AsyncSession) -> None:
+    org_id, _, _ = await _seed_data(session)
+    cache = InMemoryRedis()
+    service = AnalyticsService(
+        repository=AnalyticsRepository(session),
+        cache=cache,
+        ttl_seconds=60,
+        backoff_ms=0,
+    )
+
+    await service.get_deal_funnel(org_id)
+    service._repository = _ExplodingRepository(session)
+
+    cached = await service.get_deal_funnel(org_id)
+    assert len(cached) == 4

tests/utils/fake_redis.py Normal file

@@ -0,0 +1,57 @@
"""Simple in-memory Redis replacement for tests."""
from __future__ import annotations

import fnmatch
import time
from collections.abc import AsyncIterator


class InMemoryRedis:
    """Subset of redis.asyncio.Redis API backed by an in-memory dict."""

    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}
        self._expirations: dict[str, float] = {}

    async def ping(self) -> bool:  # pragma: no cover - compatibility shim
        return True

    async def get(self, name: str) -> bytes | None:
        self._purge_if_expired(name)
        return self._store.get(name)

    async def set(self, name: str, value: bytes, ex: int | None = None) -> None:
        self._store[name] = value
        if ex is not None:
            self._expirations[name] = time.monotonic() + ex
        elif name in self._expirations:
            self._expirations.pop(name, None)

    async def delete(self, *names: str) -> int:
        removed = 0
        for name in names:
            if name in self._store:
                del self._store[name]
                removed += 1
            self._expirations.pop(name, None)
        return removed

    async def close(self) -> None:  # pragma: no cover - interface completeness
        self._store.clear()
        self._expirations.clear()

    async def scan_iter(self, match: str) -> AsyncIterator[str]:
        pattern = match or "*"
        for key in list(self._store.keys()):
            self._purge_if_expired(key)
        for key in self._store.keys():
            if fnmatch.fnmatch(key, pattern):
                yield key

    def _purge_if_expired(self, name: str) -> None:
        expires_at = self._expirations.get(name)
        if expires_at is None:
            return
        if expires_at <= time.monotonic():
            self._store.pop(name, None)
            self._expirations.pop(name, None)
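A quick check of the stub's contract (illustrative only):

```python
# sketch: InMemoryRedis covers the small Redis surface the cache helpers use
import asyncio

from tests.utils.fake_redis import InMemoryRedis

async def demo() -> None:
    r = InMemoryRedis()
    await r.set("analytics:summary:1:30", b"{}", ex=120)
    assert await r.get("analytics:summary:1:30") == b"{}"
    keys = [k async for k in r.scan_iter("analytics:summary:*")]
    assert keys == ["analytics:summary:1:30"]
    assert await r.delete(*keys) == 1

asyncio.run(demo())
```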

uv.lock

@@ -692,6 +692,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
 ]

+[[package]]
+name = "redis"
+version = "7.1.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/43/c8/983d5c6579a411d8a99bc5823cc5712768859b5ce2c8afe1a65b37832c81/redis-7.1.0.tar.gz", hash = "sha256:b1cc3cfa5a2cb9c2ab3ba700864fb0ad75617b41f01352ce5779dabf6d5f9c3c", size = 4796669, upload-time = "2025-11-19T15:54:39.961Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/89/f0/8956f8a86b20d7bb9d6ac0187cf4cd54d8065bc9a1a09eb8011d4d326596/redis-7.1.0-py3-none-any.whl", hash = "sha256:23c52b208f92b56103e17c5d06bdc1a6c2c0b3106583985a76a18f83b265de2b", size = 354159, upload-time = "2025-11-19T15:54:38.064Z" },
+]
+
 [[package]]
 name = "rich"
 version = "14.2.0"
@@ -850,6 +859,7 @@ dependencies = [
     { name = "passlib", extra = ["bcrypt"] },
     { name = "pydantic-settings" },
     { name = "pyjwt" },
+    { name = "redis" },
     { name = "sqlalchemy" },
 ]
@@ -871,6 +881,7 @@ requires-dist = [
     { name = "passlib", extras = ["bcrypt"], specifier = ">=1.7.4" },
     { name = "pydantic-settings", specifier = ">=2.12.0" },
     { name = "pyjwt", specifier = ">=2.9.0" },
+    { name = "redis", specifier = ">=5.2.0" },
     { name = "sqlalchemy", specifier = ">=2.0.44" },
 ]