cache #9
11
README.md
11
README.md
|
|
@ -27,3 +27,14 @@ app/
|
|||
|
||||
Add new routers under `app/api/v1`, repositories under `app/repositories`, and keep business rules inside `app/services`.
|
||||
|
||||
## Redis analytics cache
|
||||
|
||||
Analytics endpoints can use a Redis cache (TTL 120 seconds). The cache is disabled by default, so the service falls back to the database.
|
||||
|
||||
1. Start Redis and set the following variables:
|
||||
- `REDIS_ENABLED=true`
|
||||
- `REDIS_URL=redis://localhost:6379/0`
|
||||
- `ANALYTICS_CACHE_TTL_SECONDS` (optional, defaults to 120)
|
||||
- `ANALYTICS_CACHE_BACKOFF_MS` (max delay for write/delete retries, defaults to 200)
|
||||
2. When Redis becomes unavailable, middleware logs the degradation and responses transparently fall back to database queries until connectivity is restored.
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ from fastapi import Depends, Header, HTTPException, status
|
|||
from fastapi.security import OAuth2PasswordBearer
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.cache import get_cache_client
|
||||
from app.core.config import settings
|
||||
from app.core.database import get_session
|
||||
from app.core.security import jwt_service, password_hasher
|
||||
|
|
@ -29,6 +30,7 @@ from app.services.organization_service import (
|
|||
OrganizationService,
|
||||
)
|
||||
from app.services.task_service import TaskService
|
||||
from redis.asyncio.client import Redis
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api_v1_prefix}/auth/token")
|
||||
|
||||
|
|
@ -67,8 +69,19 @@ def get_analytics_repository(session: AsyncSession = Depends(get_db_session)) ->
|
|||
return AnalyticsRepository(session=session)
|
||||
|
||||
|
||||
def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService:
|
||||
return DealService(repository=repo)
|
||||
def get_cache_backend() -> Redis | None:
|
||||
return get_cache_client()
|
||||
|
||||
|
||||
def get_deal_service(
|
||||
repo: DealRepository = Depends(get_deal_repository),
|
||||
cache: Redis | None = Depends(get_cache_backend),
|
||||
) -> DealService:
|
||||
return DealService(
|
||||
repository=repo,
|
||||
cache=cache,
|
||||
cache_backoff_ms=settings.analytics_cache_backoff_ms,
|
||||
)
|
||||
|
||||
|
||||
def get_auth_service(
|
||||
|
|
@ -95,8 +108,14 @@ def get_activity_service(
|
|||
|
||||
def get_analytics_service(
|
||||
repo: AnalyticsRepository = Depends(get_analytics_repository),
|
||||
cache: Redis | None = Depends(get_cache_backend),
|
||||
) -> AnalyticsService:
|
||||
return AnalyticsService(repository=repo)
|
||||
return AnalyticsService(
|
||||
repository=repo,
|
||||
cache=cache,
|
||||
ttl_seconds=settings.analytics_cache_ttl_seconds,
|
||||
backoff_ms=settings.analytics_cache_backoff_ms,
|
||||
)
|
||||
|
||||
|
||||
def get_contact_service(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,160 @@
|
|||
"""Redis cache utilities and availability tracking."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Awaitable, Callable, Optional
|
||||
|
||||
import redis.asyncio as redis
|
||||
from redis.asyncio.client import Redis
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedisCacheManager:
|
||||
"""Manages lifecycle and availability of the Redis cache client."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._client: Redis | None = None
|
||||
self._available: bool = False
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
@property
|
||||
def is_enabled(self) -> bool:
|
||||
return settings.redis_enabled
|
||||
|
||||
@property
|
||||
def is_available(self) -> bool:
|
||||
return self._available and self._client is not None
|
||||
|
||||
def get_client(self) -> Redis | None:
|
||||
if not self.is_enabled:
|
||||
return None
|
||||
if self.is_available:
|
||||
return self._client
|
||||
return None
|
||||
|
||||
async def startup(self) -> None:
|
||||
if not self.is_enabled:
|
||||
return
|
||||
async with self._lock:
|
||||
if self._client is not None:
|
||||
return
|
||||
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
|
||||
await self._refresh_availability()
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
async with self._lock:
|
||||
if self._client is not None:
|
||||
await self._client.close()
|
||||
self._client = None
|
||||
self._available = False
|
||||
|
||||
async def reconnect(self) -> None:
|
||||
if not self.is_enabled:
|
||||
return
|
||||
async with self._lock:
|
||||
if self._client is None:
|
||||
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
|
||||
await self._refresh_availability()
|
||||
|
||||
async def _refresh_availability(self) -> None:
|
||||
if self._client is None:
|
||||
self._available = False
|
||||
return
|
||||
try:
|
||||
await self._client.ping()
|
||||
except RedisError as exc: # pragma: no cover - logging only
|
||||
self._available = False
|
||||
logger.warning("Redis ping failed: %s", exc)
|
||||
else:
|
||||
self._available = True
|
||||
|
||||
def mark_unavailable(self) -> None:
|
||||
self._available = False
|
||||
|
||||
def mark_available(self) -> None:
|
||||
if self._client is not None:
|
||||
self._available = True
|
||||
|
||||
|
||||
cache_manager = RedisCacheManager()
|
||||
|
||||
|
||||
async def init_cache() -> None:
|
||||
"""Initialize Redis cache connection if enabled."""
|
||||
await cache_manager.startup()
|
||||
|
||||
|
||||
async def shutdown_cache() -> None:
|
||||
"""Close Redis cache connection."""
|
||||
await cache_manager.shutdown()
|
||||
|
||||
|
||||
def get_cache_client() -> Optional[Redis]:
|
||||
"""Expose the active Redis client for dependency injection."""
|
||||
return cache_manager.get_client()
|
||||
|
||||
|
||||
async def read_json(client: Redis, key: str) -> Any | None:
|
||||
"""Read and decode JSON payload from Redis."""
|
||||
try:
|
||||
raw = await client.get(key)
|
||||
except RedisError as exc: # pragma: no cover - network errors
|
||||
cache_manager.mark_unavailable()
|
||||
logger.debug("Redis GET failed for %s: %s", key, exc)
|
||||
return None
|
||||
if raw is None:
|
||||
return None
|
||||
cache_manager.mark_available()
|
||||
try:
|
||||
return json.loads(raw.decode("utf-8"))
|
||||
except (UnicodeDecodeError, json.JSONDecodeError) as exc: # pragma: no cover - malformed payloads
|
||||
logger.warning("Discarding malformed cache entry %s: %s", key, exc)
|
||||
return None
|
||||
|
||||
|
||||
async def write_json(client: Redis, key: str, value: Any, ttl_seconds: int, backoff_ms: int) -> None:
|
||||
"""Serialize data to JSON and store it with TTL using retry/backoff."""
|
||||
payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
|
||||
|
||||
async def _operation() -> Any:
|
||||
return await client.set(name=key, value=payload, ex=ttl_seconds)
|
||||
|
||||
await _run_with_retry(_operation, backoff_ms)
|
||||
|
||||
|
||||
async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None:
|
||||
"""Delete cache keys with retry/backoff semantics."""
|
||||
if not keys:
|
||||
return
|
||||
|
||||
async def _operation() -> Any:
|
||||
return await client.delete(*keys)
|
||||
|
||||
await _run_with_retry(_operation, backoff_ms)
|
||||
|
||||
|
||||
async def _run_with_retry(operation: Callable[[], Awaitable[Any]], max_sleep_ms: int) -> None:
|
||||
try:
|
||||
await operation()
|
||||
cache_manager.mark_available()
|
||||
return
|
||||
except RedisError as exc: # pragma: no cover - network errors
|
||||
cache_manager.mark_unavailable()
|
||||
logger.debug("Redis cache operation failed: %s", exc)
|
||||
if max_sleep_ms <= 0:
|
||||
return
|
||||
sleep_seconds = min(max_sleep_ms / 1000, 0.1)
|
||||
await asyncio.sleep(sleep_seconds)
|
||||
await cache_manager.reconnect()
|
||||
try:
|
||||
await operation()
|
||||
cache_manager.mark_available()
|
||||
except RedisError as exc: # pragma: no cover - repeated network errors
|
||||
cache_manager.mark_unavailable()
|
||||
logger.warning("Redis cache operation failed after retry: %s", exc)
|
||||
|
|
@ -20,6 +20,14 @@ class Settings(BaseSettings):
|
|||
jwt_algorithm: str = "HS256"
|
||||
access_token_expire_minutes: int = 30
|
||||
refresh_token_expire_days: int = 7
|
||||
redis_enabled: bool = Field(default=False, description="Toggle Redis-backed cache usage")
|
||||
redis_url: str = Field(default="redis://localhost:6379/0", description="Redis connection URL")
|
||||
analytics_cache_ttl_seconds: int = Field(default=120, ge=1, description="TTL for cached analytics responses")
|
||||
analytics_cache_backoff_ms: int = Field(
|
||||
default=200,
|
||||
ge=0,
|
||||
description="Maximum backoff (ms) for retrying cache writes/invalidation",
|
||||
)
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
"""Application middleware components."""
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
"""Middleware that logs cache availability transitions."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from starlette.types import ASGIApp, Receive, Scope, Send
|
||||
|
||||
from app.core.cache import cache_manager
|
||||
from app.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CacheAvailabilityMiddleware:
|
||||
"""Logs when Redis cache becomes unavailable or recovers."""
|
||||
|
||||
def __init__(self, app: ASGIApp) -> None:
|
||||
self.app = app
|
||||
self._last_state: bool | None = None
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
|
||||
if scope["type"] == "http" and settings.redis_enabled:
|
||||
self._log_transition()
|
||||
await self.app(scope, receive, send)
|
||||
|
||||
def _log_transition(self) -> None:
|
||||
available = cache_manager.is_available
|
||||
if self._last_state is None:
|
||||
self._last_state = available
|
||||
if not available:
|
||||
logger.warning("Redis cache unavailable, serving responses without cache")
|
||||
return
|
||||
if available == self._last_state:
|
||||
return
|
||||
if available:
|
||||
logger.info("Redis cache connectivity restored; caching re-enabled")
|
||||
else:
|
||||
logger.warning("Redis cache unavailable, serving responses without cache")
|
||||
self._last_state = available
|
||||
18
app/main.py
18
app/main.py
|
|
@ -1,14 +1,30 @@
|
|||
"""FastAPI application factory."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
from app.api.routes import api_router
|
||||
from app.core.cache import init_cache, shutdown_cache
|
||||
from app.core.config import settings
|
||||
from app.core.middleware.cache_monitor import CacheAvailabilityMiddleware
|
||||
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
"""Build FastAPI application instance."""
|
||||
application = FastAPI(title=settings.project_name, version=settings.version)
|
||||
@asynccontextmanager
|
||||
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
|
||||
await init_cache()
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
await shutdown_cache()
|
||||
|
||||
application = FastAPI(title=settings.project_name, version=settings.version, lifespan=lifespan)
|
||||
application.include_router(api_router)
|
||||
application.add_middleware(CacheAvailabilityMiddleware)
|
||||
return application
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,11 +1,16 @@
|
|||
"""Analytics-related business logic."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from decimal import Decimal
|
||||
from typing import Iterable
|
||||
from decimal import Decimal, InvalidOperation
|
||||
from typing import Any, Iterable
|
||||
|
||||
from redis.asyncio.client import Redis
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from app.core.cache import cache_manager, delete_keys, read_json, write_json
|
||||
from app.models.deal import DealStage, DealStatus
|
||||
from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup
|
||||
|
||||
|
|
@ -53,13 +58,33 @@ class StageBreakdown:
|
|||
conversion_to_next: float | None
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SUMMARY_CACHE_PREFIX = "analytics:summary"
|
||||
_FUNNEL_CACHE_PREFIX = "analytics:funnel"
|
||||
|
||||
|
||||
class AnalyticsService:
|
||||
"""Provides aggregated analytics for deals."""
|
||||
|
||||
def __init__(self, repository: AnalyticsRepository) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
repository: AnalyticsRepository,
|
||||
cache: Redis | None = None,
|
||||
*,
|
||||
ttl_seconds: int = 0,
|
||||
backoff_ms: int = 0,
|
||||
) -> None:
|
||||
self._repository = repository
|
||||
self._cache = cache
|
||||
self._ttl_seconds = ttl_seconds
|
||||
self._backoff_ms = backoff_ms
|
||||
|
||||
async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary:
|
||||
cached = await self._fetch_cached_summary(organization_id, days)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
status_rollup = await self._repository.fetch_status_rollup(organization_id)
|
||||
status_map = {item.status: item for item in status_rollup}
|
||||
|
||||
|
|
@ -87,7 +112,7 @@ class AnalyticsService:
|
|||
window_threshold = _threshold_from_days(days)
|
||||
new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold)
|
||||
|
||||
return DealSummary(
|
||||
summary = DealSummary(
|
||||
by_status=summaries,
|
||||
won=WonStatistics(
|
||||
count=won_count,
|
||||
|
|
@ -98,7 +123,14 @@ class AnalyticsService:
|
|||
total_deals=total_deals,
|
||||
)
|
||||
|
||||
await self._store_summary_cache(organization_id, days, summary)
|
||||
return summary
|
||||
|
||||
async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]:
|
||||
cached = await self._fetch_cached_funnel(organization_id)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
rollup = await self._repository.fetch_stage_status_rollup(organization_id)
|
||||
stage_map = _build_stage_map(rollup)
|
||||
|
||||
|
|
@ -121,8 +153,44 @@ class AnalyticsService:
|
|||
conversion_to_next=conversion,
|
||||
)
|
||||
)
|
||||
await self._store_funnel_cache(organization_id, breakdowns)
|
||||
return breakdowns
|
||||
|
||||
def _is_cache_enabled(self) -> bool:
|
||||
return self._cache is not None and self._ttl_seconds > 0
|
||||
|
||||
async def _fetch_cached_summary(self, organization_id: int, days: int) -> DealSummary | None:
|
||||
if not self._is_cache_enabled() or self._cache is None:
|
||||
return None
|
||||
key = _summary_cache_key(organization_id, days)
|
||||
payload = await read_json(self._cache, key)
|
||||
if payload is None:
|
||||
return None
|
||||
return _deserialize_summary(payload)
|
||||
|
||||
async def _store_summary_cache(self, organization_id: int, days: int, summary: DealSummary) -> None:
|
||||
if not self._is_cache_enabled() or self._cache is None:
|
||||
return
|
||||
key = _summary_cache_key(organization_id, days)
|
||||
payload = _serialize_summary(summary)
|
||||
await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms)
|
||||
|
||||
async def _fetch_cached_funnel(self, organization_id: int) -> list[StageBreakdown] | None:
|
||||
if not self._is_cache_enabled() or self._cache is None:
|
||||
return None
|
||||
key = _funnel_cache_key(organization_id)
|
||||
payload = await read_json(self._cache, key)
|
||||
if payload is None:
|
||||
return None
|
||||
return _deserialize_funnel(payload)
|
||||
|
||||
async def _store_funnel_cache(self, organization_id: int, breakdowns: list[StageBreakdown]) -> None:
|
||||
if not self._is_cache_enabled() or self._cache is None:
|
||||
return
|
||||
key = _funnel_cache_key(organization_id)
|
||||
payload = _serialize_funnel(breakdowns)
|
||||
await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms)
|
||||
|
||||
|
||||
def _threshold_from_days(days: int) -> datetime:
|
||||
return datetime.now(timezone.utc) - timedelta(days=days)
|
||||
|
|
@ -137,3 +205,137 @@ def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dic
|
|||
stage_map.setdefault(item.stage, {status: 0 for status in DealStatus})
|
||||
stage_map[item.stage][item.status] = item.deal_count
|
||||
return stage_map
|
||||
|
||||
|
||||
def _summary_cache_key(organization_id: int, days: int) -> str:
|
||||
return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:{days}"
|
||||
|
||||
|
||||
def summary_cache_pattern(organization_id: int) -> str:
|
||||
return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:*"
|
||||
|
||||
|
||||
def _funnel_cache_key(organization_id: int) -> str:
|
||||
return f"{_FUNNEL_CACHE_PREFIX}:{organization_id}"
|
||||
|
||||
|
||||
def funnel_cache_key(organization_id: int) -> str:
|
||||
return _funnel_cache_key(organization_id)
|
||||
|
||||
|
||||
def _serialize_summary(summary: DealSummary) -> dict[str, Any]:
|
||||
return {
|
||||
"by_status": [
|
||||
{
|
||||
"status": item.status.value,
|
||||
"count": item.count,
|
||||
"amount_sum": str(item.amount_sum),
|
||||
}
|
||||
for item in summary.by_status
|
||||
],
|
||||
"won": {
|
||||
"count": summary.won.count,
|
||||
"amount_sum": str(summary.won.amount_sum),
|
||||
"average_amount": str(summary.won.average_amount),
|
||||
},
|
||||
"new_deals": {
|
||||
"days": summary.new_deals.days,
|
||||
"count": summary.new_deals.count,
|
||||
},
|
||||
"total_deals": summary.total_deals,
|
||||
}
|
||||
|
||||
|
||||
def _deserialize_summary(payload: Any) -> DealSummary | None:
|
||||
try:
|
||||
by_status_payload = payload["by_status"]
|
||||
won_payload = payload["won"]
|
||||
new_deals_payload = payload["new_deals"]
|
||||
total_deals = int(payload["total_deals"])
|
||||
except (KeyError, TypeError, ValueError):
|
||||
return None
|
||||
|
||||
summaries: list[StatusSummary] = []
|
||||
try:
|
||||
for item in by_status_payload:
|
||||
summaries.append(
|
||||
StatusSummary(
|
||||
status=DealStatus(item["status"]),
|
||||
count=int(item["count"]),
|
||||
amount_sum=Decimal(item["amount_sum"]),
|
||||
)
|
||||
)
|
||||
won = WonStatistics(
|
||||
count=int(won_payload["count"]),
|
||||
amount_sum=Decimal(won_payload["amount_sum"]),
|
||||
average_amount=Decimal(won_payload["average_amount"]),
|
||||
)
|
||||
new_deals = NewDealsWindow(
|
||||
days=int(new_deals_payload["days"]),
|
||||
count=int(new_deals_payload["count"]),
|
||||
)
|
||||
except (KeyError, TypeError, ValueError, InvalidOperation):
|
||||
return None
|
||||
|
||||
return DealSummary(by_status=summaries, won=won, new_deals=new_deals, total_deals=total_deals)
|
||||
|
||||
|
||||
def _serialize_funnel(breakdowns: list[StageBreakdown]) -> list[dict[str, Any]]:
|
||||
serialized: list[dict[str, Any]] = []
|
||||
for item in breakdowns:
|
||||
serialized.append(
|
||||
{
|
||||
"stage": item.stage.value,
|
||||
"total": item.total,
|
||||
"by_status": {status.value: count for status, count in item.by_status.items()},
|
||||
"conversion_to_next": item.conversion_to_next,
|
||||
}
|
||||
)
|
||||
return serialized
|
||||
|
||||
|
||||
def _deserialize_funnel(payload: Any) -> list[StageBreakdown] | None:
|
||||
if not isinstance(payload, list):
|
||||
return None
|
||||
breakdowns: list[StageBreakdown] = []
|
||||
try:
|
||||
for item in payload:
|
||||
by_status_payload = item["by_status"]
|
||||
by_status = {DealStatus(key): int(value) for key, value in by_status_payload.items()}
|
||||
breakdowns.append(
|
||||
StageBreakdown(
|
||||
stage=DealStage(item["stage"]),
|
||||
total=int(item["total"]),
|
||||
by_status=by_status,
|
||||
conversion_to_next=float(item["conversion_to_next"]) if item["conversion_to_next"] is not None else None,
|
||||
)
|
||||
)
|
||||
except (KeyError, TypeError, ValueError):
|
||||
return None
|
||||
return breakdowns
|
||||
|
||||
|
||||
async def invalidate_analytics_cache(cache: Redis | None, organization_id: int, backoff_ms: int) -> None:
|
||||
"""Remove cached analytics payloads for the organization."""
|
||||
|
||||
if cache is None:
|
||||
return
|
||||
|
||||
summary_pattern = summary_cache_pattern(organization_id)
|
||||
keys: list[str] = [funnel_cache_key(organization_id)]
|
||||
try:
|
||||
async for raw_key in cache.scan_iter(match=summary_pattern):
|
||||
if isinstance(raw_key, bytes):
|
||||
keys.append(raw_key.decode("utf-8"))
|
||||
else:
|
||||
keys.append(str(raw_key))
|
||||
except RedisError as exc: # pragma: no cover - network errors
|
||||
cache_manager.mark_unavailable()
|
||||
logger.warning(
|
||||
"Failed to enumerate summary cache keys for organization %s: %s",
|
||||
organization_id,
|
||||
exc,
|
||||
)
|
||||
return
|
||||
|
||||
await delete_keys(cache, keys, backoff_ms)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ from collections.abc import Iterable
|
|||
from dataclasses import dataclass
|
||||
from decimal import Decimal
|
||||
|
||||
from redis.asyncio.client import Redis
|
||||
from sqlalchemy import func, select
|
||||
|
||||
from app.models.activity import Activity, ActivityType
|
||||
|
|
@ -12,6 +13,7 @@ from app.models.contact import Contact
|
|||
from app.models.deal import Deal, DealCreate, DealStage, DealStatus
|
||||
from app.models.organization_member import OrganizationRole
|
||||
from app.repositories.deal_repo import DealRepository
|
||||
from app.services.analytics_service import invalidate_analytics_cache
|
||||
from app.services.organization_service import OrganizationContext
|
||||
|
||||
|
||||
|
|
@ -61,13 +63,23 @@ class DealUpdateData:
|
|||
class DealService:
|
||||
"""Encapsulates deal workflows and validations."""
|
||||
|
||||
def __init__(self, repository: DealRepository) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
repository: DealRepository,
|
||||
cache: Redis | None = None,
|
||||
*,
|
||||
cache_backoff_ms: int = 0,
|
||||
) -> None:
|
||||
self._repository = repository
|
||||
self._cache = cache
|
||||
self._cache_backoff_ms = cache_backoff_ms
|
||||
|
||||
async def create_deal(self, data: DealCreate, *, context: OrganizationContext) -> Deal:
|
||||
self._ensure_same_organization(data.organization_id, context)
|
||||
await self._ensure_contact_in_organization(data.contact_id, context.organization_id)
|
||||
return await self._repository.create(data=data, role=context.role, user_id=context.user_id)
|
||||
deal = await self._repository.create(data=data, role=context.role, user_id=context.user_id)
|
||||
await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms)
|
||||
return deal
|
||||
|
||||
async def update_deal(
|
||||
self,
|
||||
|
|
@ -111,6 +123,7 @@ class DealService:
|
|||
author_id=context.user_id,
|
||||
activities=[activity for activity in [stage_activity, status_activity] if activity],
|
||||
)
|
||||
await invalidate_analytics_cache(self._cache, context.organization_id, self._cache_backoff_ms)
|
||||
return updated
|
||||
|
||||
async def ensure_contact_can_be_deleted(self, contact_id: int) -> None:
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ dependencies = [
|
|||
"pyjwt>=2.9.0",
|
||||
"pydantic-settings>=2.12.0",
|
||||
"sqlalchemy>=2.0.44",
|
||||
"redis>=5.2.0",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
|
|
|
|||
|
|
@ -8,10 +8,11 @@ import pytest_asyncio
|
|||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from app.api.deps import get_db_session
|
||||
from app.api.deps import get_cache_backend, get_db_session
|
||||
from app.core.security import password_hasher
|
||||
from app.main import create_app
|
||||
from app.models import Base
|
||||
from tests.utils.fake_redis import InMemoryRedis
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
|
|
@ -41,6 +42,7 @@ async def session_factory() -> AsyncGenerator[async_sessionmaker[AsyncSession],
|
|||
@pytest_asyncio.fixture()
|
||||
async def client(
|
||||
session_factory: async_sessionmaker[AsyncSession],
|
||||
cache_stub: InMemoryRedis,
|
||||
) -> AsyncGenerator[AsyncClient, None]:
|
||||
app = create_app()
|
||||
|
||||
|
|
@ -54,6 +56,12 @@ async def client(
|
|||
raise
|
||||
|
||||
app.dependency_overrides[get_db_session] = _get_session_override
|
||||
app.dependency_overrides[get_cache_backend] = lambda: cache_stub
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://testserver") as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def cache_stub() -> InMemoryRedis:
|
||||
return InMemoryRedis()
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ class AnalyticsScenario:
|
|||
user_id: int
|
||||
user_email: str
|
||||
token: str
|
||||
in_progress_deal_id: int
|
||||
|
||||
|
||||
async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSession]) -> AnalyticsScenario:
|
||||
|
|
@ -102,6 +103,7 @@ async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSe
|
|||
user_id=user.id,
|
||||
user_email=user.email,
|
||||
token=token,
|
||||
in_progress_deal_id=next(deal.id for deal in deals if deal.status is DealStatus.IN_PROGRESS),
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -163,4 +165,37 @@ async def test_deals_funnel_returns_breakdown(
|
|||
qualification = next(item for item in payload["stages"] if item["stage"] == DealStage.QUALIFICATION.value)
|
||||
assert qualification["total"] == 1
|
||||
proposal = next(item for item in payload["stages"] if item["stage"] == DealStage.PROPOSAL.value)
|
||||
assert proposal["conversion_to_next"] == 100.0
|
||||
assert proposal["conversion_to_next"] == 100.0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deal_update_invalidates_cached_summary(
|
||||
session_factory: async_sessionmaker[AsyncSession],
|
||||
client: AsyncClient,
|
||||
cache_stub,
|
||||
) -> None:
|
||||
scenario = await prepare_analytics_scenario(session_factory)
|
||||
headers = _headers(scenario.token, scenario.organization_id)
|
||||
|
||||
first = await client.get(
|
||||
"/api/v1/analytics/deals/summary?days=30",
|
||||
headers=headers,
|
||||
)
|
||||
assert first.status_code == 200
|
||||
keys = [key async for key in cache_stub.scan_iter("analytics:summary:*")]
|
||||
assert keys, "cache should contain warmed summary"
|
||||
|
||||
patch_response = await client.patch(
|
||||
f"/api/v1/deals/{scenario.in_progress_deal_id}",
|
||||
json={"status": DealStatus.WON.value, "stage": DealStage.CLOSED.value},
|
||||
headers=headers,
|
||||
)
|
||||
assert patch_response.status_code == 200
|
||||
|
||||
refreshed = await client.get(
|
||||
"/api/v1/analytics/deals/summary?days=30",
|
||||
headers=headers,
|
||||
)
|
||||
assert refreshed.status_code == 200
|
||||
payload = refreshed.json()
|
||||
assert payload["won"]["count"] == 2
|
||||
|
|
@ -17,7 +17,8 @@ from app.models.organization import Organization
|
|||
from app.models.organization_member import OrganizationMember, OrganizationRole
|
||||
from app.models.user import User
|
||||
from app.repositories.analytics_repo import AnalyticsRepository
|
||||
from app.services.analytics_service import AnalyticsService
|
||||
from app.services.analytics_service import AnalyticsService, invalidate_analytics_cache
|
||||
from tests.utils.fake_redis import InMemoryRedis
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
|
|
@ -149,4 +150,85 @@ async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession
|
|||
assert proposal.conversion_to_next == 200.0
|
||||
|
||||
last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED)
|
||||
assert last_stage.conversion_to_next is None
|
||||
assert last_stage.conversion_to_next is None
|
||||
|
||||
|
||||
class _ExplodingRepository(AnalyticsRepository):
|
||||
async def fetch_status_rollup(self, organization_id: int): # type: ignore[override]
|
||||
raise AssertionError("cache not used for status rollup")
|
||||
|
||||
async def count_new_deals_since(self, organization_id: int, threshold): # type: ignore[override]
|
||||
raise AssertionError("cache not used for new deal count")
|
||||
|
||||
async def fetch_stage_status_rollup(self, organization_id: int): # type: ignore[override]
|
||||
raise AssertionError("cache not used for funnel rollup")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_summary_reads_from_cache_when_available(session: AsyncSession) -> None:
|
||||
org_id, _, _ = await _seed_data(session)
|
||||
cache = InMemoryRedis()
|
||||
service = AnalyticsService(
|
||||
repository=AnalyticsRepository(session),
|
||||
cache=cache,
|
||||
ttl_seconds=60,
|
||||
backoff_ms=0,
|
||||
)
|
||||
|
||||
await service.get_deal_summary(org_id, days=30)
|
||||
service._repository = _ExplodingRepository(session)
|
||||
|
||||
cached = await service.get_deal_summary(org_id, days=30)
|
||||
assert cached.total_deals == 6
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalidation_refreshes_cached_summary(session: AsyncSession) -> None:
|
||||
org_id, _, contact_id = await _seed_data(session)
|
||||
cache = InMemoryRedis()
|
||||
service = AnalyticsService(
|
||||
repository=AnalyticsRepository(session),
|
||||
cache=cache,
|
||||
ttl_seconds=60,
|
||||
backoff_ms=0,
|
||||
)
|
||||
|
||||
await service.get_deal_summary(org_id, days=30)
|
||||
|
||||
deal = Deal(
|
||||
organization_id=org_id,
|
||||
contact_id=contact_id,
|
||||
owner_id=1,
|
||||
title="New",
|
||||
amount=Decimal("50"),
|
||||
status=DealStatus.NEW,
|
||||
stage=DealStage.QUALIFICATION,
|
||||
created_at=datetime.now(timezone.utc),
|
||||
)
|
||||
session.add(deal)
|
||||
await session.commit()
|
||||
|
||||
cached = await service.get_deal_summary(org_id, days=30)
|
||||
assert cached.total_deals == 6
|
||||
|
||||
await invalidate_analytics_cache(cache, org_id, backoff_ms=0)
|
||||
refreshed = await service.get_deal_summary(org_id, days=30)
|
||||
assert refreshed.total_deals == 7
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_funnel_reads_from_cache_when_available(session: AsyncSession) -> None:
|
||||
org_id, _, _ = await _seed_data(session)
|
||||
cache = InMemoryRedis()
|
||||
service = AnalyticsService(
|
||||
repository=AnalyticsRepository(session),
|
||||
cache=cache,
|
||||
ttl_seconds=60,
|
||||
backoff_ms=0,
|
||||
)
|
||||
|
||||
await service.get_deal_funnel(org_id)
|
||||
service._repository = _ExplodingRepository(session)
|
||||
|
||||
cached = await service.get_deal_funnel(org_id)
|
||||
assert len(cached) == 4
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
"""Simple in-memory Redis replacement for tests."""
|
||||
from __future__ import annotations
|
||||
|
||||
import fnmatch
|
||||
import time
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
|
||||
class InMemoryRedis:
|
||||
"""Subset of redis.asyncio.Redis API backed by an in-memory dict."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._store: dict[str, bytes] = {}
|
||||
self._expirations: dict[str, float] = {}
|
||||
|
||||
async def ping(self) -> bool: # pragma: no cover - compatibility shim
|
||||
return True
|
||||
|
||||
async def get(self, name: str) -> bytes | None:
|
||||
self._purge_if_expired(name)
|
||||
return self._store.get(name)
|
||||
|
||||
async def set(self, name: str, value: bytes, ex: int | None = None) -> None:
|
||||
self._store[name] = value
|
||||
if ex is not None:
|
||||
self._expirations[name] = time.monotonic() + ex
|
||||
elif name in self._expirations:
|
||||
self._expirations.pop(name, None)
|
||||
|
||||
async def delete(self, *names: str) -> int:
|
||||
removed = 0
|
||||
for name in names:
|
||||
if name in self._store:
|
||||
del self._store[name]
|
||||
removed += 1
|
||||
self._expirations.pop(name, None)
|
||||
return removed
|
||||
|
||||
async def close(self) -> None: # pragma: no cover - interface completeness
|
||||
self._store.clear()
|
||||
self._expirations.clear()
|
||||
|
||||
async def scan_iter(self, match: str) -> AsyncIterator[str]:
|
||||
pattern = match or "*"
|
||||
for key in list(self._store.keys()):
|
||||
self._purge_if_expired(key)
|
||||
for key in self._store.keys():
|
||||
if fnmatch.fnmatch(key, pattern):
|
||||
yield key
|
||||
|
||||
def _purge_if_expired(self, name: str) -> None:
|
||||
expires_at = self._expirations.get(name)
|
||||
if expires_at is None:
|
||||
return
|
||||
if expires_at <= time.monotonic():
|
||||
self._store.pop(name, None)
|
||||
self._expirations.pop(name, None)
|
||||
11
uv.lock
11
uv.lock
|
|
@ -692,6 +692,15 @@ wheels = [
|
|||
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis"
|
||||
version = "7.1.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/43/c8/983d5c6579a411d8a99bc5823cc5712768859b5ce2c8afe1a65b37832c81/redis-7.1.0.tar.gz", hash = "sha256:b1cc3cfa5a2cb9c2ab3ba700864fb0ad75617b41f01352ce5779dabf6d5f9c3c", size = 4796669, upload-time = "2025-11-19T15:54:39.961Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/89/f0/8956f8a86b20d7bb9d6ac0187cf4cd54d8065bc9a1a09eb8011d4d326596/redis-7.1.0-py3-none-any.whl", hash = "sha256:23c52b208f92b56103e17c5d06bdc1a6c2c0b3106583985a76a18f83b265de2b", size = 354159, upload-time = "2025-11-19T15:54:38.064Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rich"
|
||||
version = "14.2.0"
|
||||
|
|
@ -850,6 +859,7 @@ dependencies = [
|
|||
{ name = "passlib", extra = ["bcrypt"] },
|
||||
{ name = "pydantic-settings" },
|
||||
{ name = "pyjwt" },
|
||||
{ name = "redis" },
|
||||
{ name = "sqlalchemy" },
|
||||
]
|
||||
|
||||
|
|
@ -871,6 +881,7 @@ requires-dist = [
|
|||
{ name = "passlib", extras = ["bcrypt"], specifier = ">=1.7.4" },
|
||||
{ name = "pydantic-settings", specifier = ">=2.12.0" },
|
||||
{ name = "pyjwt", specifier = ">=2.9.0" },
|
||||
{ name = "redis", specifier = ">=5.2.0" },
|
||||
{ name = "sqlalchemy", specifier = ">=2.0.44" },
|
||||
]
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue