"""Analytics-related business logic.""" from __future__ import annotations import logging from dataclasses import dataclass from datetime import datetime, timedelta, timezone from decimal import Decimal, InvalidOperation from typing import Any, Iterable from redis.asyncio.client import Redis from redis.exceptions import RedisError from app.core.cache import cache_manager, delete_keys, read_json, write_json from app.models.deal import DealStage, DealStatus from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup _STAGE_ORDER: list[DealStage] = [ DealStage.QUALIFICATION, DealStage.PROPOSAL, DealStage.NEGOTIATION, DealStage.CLOSED, ] @dataclass(slots=True, frozen=True) class StatusSummary: status: DealStatus count: int amount_sum: Decimal @dataclass(slots=True, frozen=True) class WonStatistics: count: int amount_sum: Decimal average_amount: Decimal @dataclass(slots=True, frozen=True) class NewDealsWindow: days: int count: int @dataclass(slots=True, frozen=True) class DealSummary: by_status: list[StatusSummary] won: WonStatistics new_deals: NewDealsWindow total_deals: int @dataclass(slots=True, frozen=True) class StageBreakdown: stage: DealStage total: int by_status: dict[DealStatus, int] conversion_to_next: float | None logger = logging.getLogger(__name__) _SUMMARY_CACHE_PREFIX = "analytics:summary" _FUNNEL_CACHE_PREFIX = "analytics:funnel" class AnalyticsService: """Provides aggregated analytics for deals.""" def __init__( self, repository: AnalyticsRepository, cache: Redis | None = None, *, ttl_seconds: int = 0, backoff_ms: int = 0, ) -> None: self._repository = repository self._cache = cache self._ttl_seconds = ttl_seconds self._backoff_ms = backoff_ms async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary: cached = await self._fetch_cached_summary(organization_id, days) if cached is not None: return cached status_rollup = await self._repository.fetch_status_rollup(organization_id) status_map = {item.status: item for item in status_rollup} summaries: list[StatusSummary] = [] total_deals = 0 won_amount_sum = Decimal("0") won_amount_count = 0 won_count = 0 for status in DealStatus: row = status_map.get(status) count = row.deal_count if row else 0 amount_sum = row.amount_sum if row else Decimal("0") summaries.append(StatusSummary(status=status, count=count, amount_sum=amount_sum)) total_deals += count if status is DealStatus.WON and row: won_amount_sum = row.amount_sum won_amount_count = row.amount_count won_count = row.deal_count won_average = ( (won_amount_sum / won_amount_count) if won_amount_count > 0 else Decimal("0") ) window_threshold = _threshold_from_days(days) new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold) summary = DealSummary( by_status=summaries, won=WonStatistics( count=won_count, amount_sum=won_amount_sum, average_amount=won_average, ), new_deals=NewDealsWindow(days=days, count=new_deals), total_deals=total_deals, ) await self._store_summary_cache(organization_id, days, summary) return summary async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]: cached = await self._fetch_cached_funnel(organization_id) if cached is not None: return cached rollup = await self._repository.fetch_stage_status_rollup(organization_id) stage_map = _build_stage_map(rollup) breakdowns: list[StageBreakdown] = [] totals = {stage: sum(by_status.values()) for stage, by_status in stage_map.items()} for index, stage in enumerate(_STAGE_ORDER): by_status = stage_map.get(stage, {status: 0 for status in DealStatus}) total = totals.get(stage, 0) conversion = None if index < len(_STAGE_ORDER) - 1: next_stage = _STAGE_ORDER[index + 1] next_total = totals.get(next_stage, 0) if total > 0: conversion = float(round((next_total / total) * 100, 2)) breakdowns.append( StageBreakdown( stage=stage, total=total, by_status=by_status, conversion_to_next=conversion, ) ) await self._store_funnel_cache(organization_id, breakdowns) return breakdowns def _is_cache_enabled(self) -> bool: return self._cache is not None and self._ttl_seconds > 0 async def _fetch_cached_summary(self, organization_id: int, days: int) -> DealSummary | None: if not self._is_cache_enabled() or self._cache is None: return None key = _summary_cache_key(organization_id, days) payload = await read_json(self._cache, key) if payload is None: return None return _deserialize_summary(payload) async def _store_summary_cache(self, organization_id: int, days: int, summary: DealSummary) -> None: if not self._is_cache_enabled() or self._cache is None: return key = _summary_cache_key(organization_id, days) payload = _serialize_summary(summary) await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms) async def _fetch_cached_funnel(self, organization_id: int) -> list[StageBreakdown] | None: if not self._is_cache_enabled() or self._cache is None: return None key = _funnel_cache_key(organization_id) payload = await read_json(self._cache, key) if payload is None: return None return _deserialize_funnel(payload) async def _store_funnel_cache(self, organization_id: int, breakdowns: list[StageBreakdown]) -> None: if not self._is_cache_enabled() or self._cache is None: return key = _funnel_cache_key(organization_id) payload = _serialize_funnel(breakdowns) await write_json(self._cache, key, payload, self._ttl_seconds, self._backoff_ms) def _threshold_from_days(days: int) -> datetime: return datetime.now(timezone.utc) - timedelta(days=days) def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dict[DealStatus, int]]: stage_map: dict[DealStage, dict[DealStatus, int]] = { stage: {status: 0 for status in DealStatus} for stage in _STAGE_ORDER } for item in rollup: stage_map.setdefault(item.stage, {status: 0 for status in DealStatus}) stage_map[item.stage][item.status] = item.deal_count return stage_map def _summary_cache_key(organization_id: int, days: int) -> str: return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:{days}" def summary_cache_pattern(organization_id: int) -> str: return f"{_SUMMARY_CACHE_PREFIX}:{organization_id}:*" def _funnel_cache_key(organization_id: int) -> str: return f"{_FUNNEL_CACHE_PREFIX}:{organization_id}" def funnel_cache_key(organization_id: int) -> str: return _funnel_cache_key(organization_id) def _serialize_summary(summary: DealSummary) -> dict[str, Any]: return { "by_status": [ { "status": item.status.value, "count": item.count, "amount_sum": str(item.amount_sum), } for item in summary.by_status ], "won": { "count": summary.won.count, "amount_sum": str(summary.won.amount_sum), "average_amount": str(summary.won.average_amount), }, "new_deals": { "days": summary.new_deals.days, "count": summary.new_deals.count, }, "total_deals": summary.total_deals, } def _deserialize_summary(payload: Any) -> DealSummary | None: try: by_status_payload = payload["by_status"] won_payload = payload["won"] new_deals_payload = payload["new_deals"] total_deals = int(payload["total_deals"]) except (KeyError, TypeError, ValueError): return None summaries: list[StatusSummary] = [] try: for item in by_status_payload: summaries.append( StatusSummary( status=DealStatus(item["status"]), count=int(item["count"]), amount_sum=Decimal(item["amount_sum"]), ) ) won = WonStatistics( count=int(won_payload["count"]), amount_sum=Decimal(won_payload["amount_sum"]), average_amount=Decimal(won_payload["average_amount"]), ) new_deals = NewDealsWindow( days=int(new_deals_payload["days"]), count=int(new_deals_payload["count"]), ) except (KeyError, TypeError, ValueError, InvalidOperation): return None return DealSummary(by_status=summaries, won=won, new_deals=new_deals, total_deals=total_deals) def _serialize_funnel(breakdowns: list[StageBreakdown]) -> list[dict[str, Any]]: serialized: list[dict[str, Any]] = [] for item in breakdowns: serialized.append( { "stage": item.stage.value, "total": item.total, "by_status": {status.value: count for status, count in item.by_status.items()}, "conversion_to_next": item.conversion_to_next, } ) return serialized def _deserialize_funnel(payload: Any) -> list[StageBreakdown] | None: if not isinstance(payload, list): return None breakdowns: list[StageBreakdown] = [] try: for item in payload: by_status_payload = item["by_status"] by_status = {DealStatus(key): int(value) for key, value in by_status_payload.items()} breakdowns.append( StageBreakdown( stage=DealStage(item["stage"]), total=int(item["total"]), by_status=by_status, conversion_to_next=float(item["conversion_to_next"]) if item["conversion_to_next"] is not None else None, ) ) except (KeyError, TypeError, ValueError): return None return breakdowns async def invalidate_analytics_cache(cache: Redis | None, organization_id: int, backoff_ms: int) -> None: """Remove cached analytics payloads for the organization.""" if cache is None: return summary_pattern = summary_cache_pattern(organization_id) keys: list[str] = [funnel_cache_key(organization_id)] try: async for raw_key in cache.scan_iter(match=summary_pattern): if isinstance(raw_key, bytes): keys.append(raw_key.decode("utf-8")) else: keys.append(str(raw_key)) except RedisError as exc: # pragma: no cover - network errors cache_manager.mark_unavailable() logger.warning( "Failed to enumerate summary cache keys for organization %s: %s", organization_id, exc, ) return await delete_keys(cache, keys, backoff_ms)