diff --git a/app/repositories/analytics_repo.py b/app/repositories/analytics_repo.py new file mode 100644 index 0000000..c7b51d2 --- /dev/null +++ b/app/repositories/analytics_repo.py @@ -0,0 +1,93 @@ +"""Analytics-specific data access helpers.""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from decimal import Decimal +from typing import Any + +from sqlalchemy import Select, func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.deal import Deal, DealStage, DealStatus + + +@dataclass(slots=True, frozen=True) +class StatusRollup: + status: DealStatus + deal_count: int + amount_sum: Decimal + amount_count: int + + +@dataclass(slots=True, frozen=True) +class StageStatusRollup: + stage: DealStage + status: DealStatus + deal_count: int + + +class AnalyticsRepository: + """Provides aggregate queries for analytics endpoints.""" + + def __init__(self, session: AsyncSession) -> None: + self._session = session + + @property + def session(self) -> AsyncSession: + return self._session + + async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]: + stmt: Select[tuple[Any, ...]] = ( + select( + Deal.status, + func.count(Deal.id), + func.coalesce(func.sum(Deal.amount), 0), + func.count(Deal.amount), + ) + .where(Deal.organization_id == organization_id) + .group_by(Deal.status) + ) + result = await self._session.execute(stmt) + rows = result.all() + rollup: list[StatusRollup] = [] + for status, count, amount_sum, amount_count in rows: + rollup.append( + StatusRollup( + status=status, + deal_count=int(count or 0), + amount_sum=_to_decimal(amount_sum), + amount_count=int(amount_count or 0), + ) + ) + return rollup + + async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int: + stmt = select(func.count(Deal.id)).where( + Deal.organization_id == organization_id, + Deal.created_at >= threshold, + ) + result = await self._session.execute(stmt) + value = result.scalar_one() + return int(value or 0) + + async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]: + stmt: Select[tuple[Any, ...]] = ( + select(Deal.stage, Deal.status, func.count(Deal.id)) + .where(Deal.organization_id == organization_id) + .group_by(Deal.stage, Deal.status) + ) + result = await self._session.execute(stmt) + rows = result.all() + return [ + StageStatusRollup(stage=stage, status=status, deal_count=int(count or 0)) + for stage, status, count in rows + ] + + +def _to_decimal(value: Any) -> Decimal: + if isinstance(value, Decimal): + return value + if value is None: + return Decimal("0") + return Decimal(str(value))