# test_task_crm/app/services/analytics_service.py
"""Analytics-related business logic."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal, InvalidOperation
from typing import Any, Iterable
from redis.asyncio.client import Redis
from redis.exceptions import RedisError
from app.core.cache import cache_manager, delete_keys, read_json, write_json
from app.models.deal import DealStage, DealStatus
from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup
# Pipeline stages in funnel order; conversion rates are measured between
# neighbouring entries of this list (see AnalyticsService.get_deal_funnel).
_STAGE_ORDER: list[DealStage] = [
    DealStage.QUALIFICATION,
    DealStage.PROPOSAL,
    DealStage.NEGOTIATION,
    DealStage.CLOSED,
]
@dataclass(slots=True, frozen=True)
class StatusSummary:
    """Per-status rollup: deal count and summed amount for one status."""

    status: DealStatus  # status this row aggregates
    count: int  # number of deals in this status
    amount_sum: Decimal  # sum of deal amounts in this status
@dataclass(slots=True, frozen=True)
class WonStatistics:
    """Aggregates over deals in the WON status."""

    count: int  # number of won deals
    amount_sum: Decimal  # total amount across won deals
    # Mean amount per won deal; Decimal("0") when the repository reported no
    # amounts (divides by the rollup's amount_count, not deal_count).
    average_amount: Decimal
@dataclass(slots=True, frozen=True)
class NewDealsWindow:
    """Count of deals created within the trailing *days*-day window."""

    days: int  # window length in days
    count: int  # deals created since now - days
@dataclass(slots=True, frozen=True)
class DealSummary:
    """Full summary payload returned by AnalyticsService.get_deal_summary."""

    by_status: list[StatusSummary]  # one entry per DealStatus, zero-filled
    won: WonStatistics  # aggregates for the WON status
    new_deals: NewDealsWindow  # recent-creation window
    total_deals: int  # sum of counts across all statuses
@dataclass(slots=True, frozen=True)
class StageBreakdown:
    """Per-stage funnel row returned by AnalyticsService.get_deal_funnel."""

    stage: DealStage  # stage this row describes
    total: int  # total deals in this stage across all statuses
    by_status: dict[DealStatus, int]  # zero-filled per-status counts
    # Percentage (rounded to 2 decimals) of this stage's total found in the
    # next stage; None for the last stage or when this stage is empty.
    conversion_to_next: float | None
logger = logging.getLogger(__name__)

# Key prefixes for the Redis-backed analytics caches; see the *_cache_key
# helpers below for the full key layout.
_SUMMARY_CACHE_PREFIX = "analytics:summary"
_FUNNEL_CACHE_PREFIX = "analytics:funnel"
class AnalyticsService:
    """Provides aggregated analytics for deals.

    Results are optionally cached in Redis; caching is active only when a
    client AND a positive TTL are supplied.
    """

    def __init__(
        self,
        repository: AnalyticsRepository,
        cache: Redis | None = None,
        *,
        ttl_seconds: int = 0,
        backoff_ms: int = 0,
    ) -> None:
        self._repository = repository
        self._cache = cache
        self._ttl_seconds = ttl_seconds
        self._backoff_ms = backoff_ms

    async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary:
        """Return per-status totals, won statistics and the new-deal count.

        Serves a cached payload when available; otherwise aggregates from the
        repository and writes the result back to the cache.
        """
        hit = await self._fetch_cached_summary(organization_id, days)
        if hit is not None:
            return hit

        rollup = await self._repository.fetch_status_rollup(organization_id)
        rows = {entry.status: entry for entry in rollup}

        # Zero-fill so every DealStatus appears even without matching deals.
        by_status: list[StatusSummary] = []
        for status in DealStatus:
            entry = rows.get(status)
            by_status.append(
                StatusSummary(
                    status=status,
                    count=entry.deal_count if entry else 0,
                    amount_sum=entry.amount_sum if entry else Decimal("0"),
                )
            )
        total_deals = sum(item.count for item in by_status)

        won_row = rows.get(DealStatus.WON)
        if won_row is not None:
            # Average uses amount_count (rows with an amount), not deal_count.
            average = (
                won_row.amount_sum / won_row.amount_count
                if won_row.amount_count > 0
                else Decimal("0")
            )
            won = WonStatistics(
                count=won_row.deal_count,
                amount_sum=won_row.amount_sum,
                average_amount=average,
            )
        else:
            won = WonStatistics(
                count=0, amount_sum=Decimal("0"), average_amount=Decimal("0")
            )

        since = _threshold_from_days(days)
        recent = await self._repository.count_new_deals_since(organization_id, since)

        summary = DealSummary(
            by_status=by_status,
            won=won,
            new_deals=NewDealsWindow(days=days, count=recent),
            total_deals=total_deals,
        )
        await self._store_summary_cache(organization_id, days, summary)
        return summary

    async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]:
        """Return per-stage counts plus conversion to the following stage."""
        hit = await self._fetch_cached_funnel(organization_id)
        if hit is not None:
            return hit

        rollup = await self._repository.fetch_stage_status_rollup(organization_id)
        stage_map = _build_stage_map(rollup)
        totals = {stage: sum(counts.values()) for stage, counts in stage_map.items()}

        last_index = len(_STAGE_ORDER) - 1
        result: list[StageBreakdown] = []
        for position, stage in enumerate(_STAGE_ORDER):
            total = totals.get(stage, 0)
            conversion: float | None = None
            # No conversion for the final stage or when this stage is empty.
            if position < last_index and total > 0:
                downstream = totals.get(_STAGE_ORDER[position + 1], 0)
                conversion = float(round((downstream / total) * 100, 2))
            result.append(
                StageBreakdown(
                    stage=stage,
                    total=total,
                    by_status=stage_map.get(stage, {status: 0 for status in DealStatus}),
                    conversion_to_next=conversion,
                )
            )
        await self._store_funnel_cache(organization_id, result)
        return result

    def _is_cache_enabled(self) -> bool:
        # A zero/negative TTL disables caching even with a client configured.
        return self._ttl_seconds > 0 and self._cache is not None

    async def _fetch_cached_summary(self, organization_id: int, days: int) -> DealSummary | None:
        """Return the cached summary, or None on miss/disabled/corrupt cache."""
        if self._cache is None or not self._is_cache_enabled():
            return None
        raw = await read_json(self._cache, _summary_cache_key(organization_id, days))
        return None if raw is None else _deserialize_summary(raw)

    async def _store_summary_cache(self, organization_id: int, days: int, summary: DealSummary) -> None:
        """Persist a serialized summary under its window-specific key."""
        if self._cache is None or not self._is_cache_enabled():
            return
        await write_json(
            self._cache,
            _summary_cache_key(organization_id, days),
            _serialize_summary(summary),
            self._ttl_seconds,
            self._backoff_ms,
        )

    async def _fetch_cached_funnel(self, organization_id: int) -> list[StageBreakdown] | None:
        """Return the cached funnel, or None on miss/disabled/corrupt cache."""
        if self._cache is None or not self._is_cache_enabled():
            return None
        raw = await read_json(self._cache, _funnel_cache_key(organization_id))
        return None if raw is None else _deserialize_funnel(raw)

    async def _store_funnel_cache(self, organization_id: int, breakdowns: list[StageBreakdown]) -> None:
        """Persist the serialized funnel under the organization's key."""
        if self._cache is None or not self._is_cache_enabled():
            return
        await write_json(
            self._cache,
            _funnel_cache_key(organization_id),
            _serialize_funnel(breakdowns),
            self._ttl_seconds,
            self._backoff_ms,
        )
def _threshold_from_days(days: int) -> datetime:
return datetime.now(timezone.utc) - timedelta(days=days)
def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dict[DealStatus, int]]:
    """Pivot rollup rows into stage -> status -> count, zero-filled per stage."""

    def _empty_counts() -> dict[DealStatus, int]:
        return {status: 0 for status in DealStatus}

    # Every pipeline stage gets a row up front so absent stages show zeros.
    stage_map = {stage: _empty_counts() for stage in _STAGE_ORDER}
    for row in rollup:
        # A stage outside _STAGE_ORDER still gets a zero-filled row on first sight.
        bucket = stage_map.setdefault(row.stage, _empty_counts())
        bucket[row.status] = row.deal_count
    return stage_map
def _summary_cache_key(organization_id: int, days: int) -> str:
    """Cache key for one organization's summary over one window length."""
    return ":".join((_SUMMARY_CACHE_PREFIX, str(organization_id), str(days)))
def summary_cache_pattern(organization_id: int) -> str:
    """Glob pattern matching every cached summary window for the organization."""
    return ":".join((_SUMMARY_CACHE_PREFIX, str(organization_id), "*"))
def _funnel_cache_key(organization_id: int) -> str:
    """Cache key for the organization's funnel payload."""
    return ":".join((_FUNNEL_CACHE_PREFIX, str(organization_id)))
def funnel_cache_key(organization_id: int) -> str:
    """Public alias for the funnel cache key (used by cache invalidation)."""
    key = _funnel_cache_key(organization_id)
    return key
def _serialize_summary(summary: DealSummary) -> dict[str, Any]:
return {
"by_status": [
{
"status": item.status.value,
"count": item.count,
"amount_sum": str(item.amount_sum),
}
for item in summary.by_status
],
"won": {
"count": summary.won.count,
"amount_sum": str(summary.won.amount_sum),
"average_amount": str(summary.won.average_amount),
},
"new_deals": {
"days": summary.new_deals.days,
"count": summary.new_deals.count,
},
"total_deals": summary.total_deals,
}
def _deserialize_summary(payload: Any) -> DealSummary | None:
try:
by_status_payload = payload["by_status"]
won_payload = payload["won"]
new_deals_payload = payload["new_deals"]
total_deals = int(payload["total_deals"])
except (KeyError, TypeError, ValueError):
return None
summaries: list[StatusSummary] = []
try:
for item in by_status_payload:
summaries.append(
StatusSummary(
status=DealStatus(item["status"]),
count=int(item["count"]),
amount_sum=Decimal(item["amount_sum"]),
)
)
won = WonStatistics(
count=int(won_payload["count"]),
amount_sum=Decimal(won_payload["amount_sum"]),
average_amount=Decimal(won_payload["average_amount"]),
)
new_deals = NewDealsWindow(
days=int(new_deals_payload["days"]),
count=int(new_deals_payload["count"]),
)
except (KeyError, TypeError, ValueError, InvalidOperation):
return None
return DealSummary(by_status=summaries, won=won, new_deals=new_deals, total_deals=total_deals)
def _serialize_funnel(breakdowns: list[StageBreakdown]) -> list[dict[str, Any]]:
serialized: list[dict[str, Any]] = []
for item in breakdowns:
serialized.append(
{
"stage": item.stage.value,
"total": item.total,
"by_status": {status.value: count for status, count in item.by_status.items()},
"conversion_to_next": item.conversion_to_next,
}
)
return serialized
def _deserialize_funnel(payload: Any) -> list[StageBreakdown] | None:
if not isinstance(payload, list):
return None
breakdowns: list[StageBreakdown] = []
try:
for item in payload:
by_status_payload = item["by_status"]
by_status = {DealStatus(key): int(value) for key, value in by_status_payload.items()}
breakdowns.append(
StageBreakdown(
stage=DealStage(item["stage"]),
total=int(item["total"]),
by_status=by_status,
conversion_to_next=float(item["conversion_to_next"]) if item["conversion_to_next"] is not None else None,
)
)
except (KeyError, TypeError, ValueError):
return None
return breakdowns
async def invalidate_analytics_cache(cache: Redis | None, organization_id: int, backoff_ms: int) -> None:
    """Remove cached analytics payloads for the organization.

    Deletes the funnel key plus every summary-window key found by SCAN.
    On a Redis failure during enumeration the cache is marked unavailable
    and nothing is deleted (best effort).
    """
    if cache is None:
        return
    doomed: list[str] = [funnel_cache_key(organization_id)]
    pattern = summary_cache_pattern(organization_id)
    try:
        async for found in cache.scan_iter(match=pattern):
            # scan_iter may yield bytes or str depending on client config.
            decoded = found.decode("utf-8") if isinstance(found, bytes) else str(found)
            doomed.append(decoded)
    except RedisError as exc:  # pragma: no cover - network errors
        cache_manager.mark_unavailable()
        logger.warning(
            "Failed to enumerate summary cache keys for organization %s: %s",
            organization_id,
            exc,
        )
        return
    await delete_keys(cache, doomed, backoff_ms)