"""Analytics-specific data access helpers.""" from __future__ import annotations from dataclasses import dataclass from datetime import datetime from decimal import Decimal from typing import Any from sqlalchemy import Select, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.deal import Deal, DealStage, DealStatus @dataclass(slots=True, frozen=True) class StatusRollup: status: DealStatus deal_count: int amount_sum: Decimal amount_count: int @dataclass(slots=True, frozen=True) class StageStatusRollup: stage: DealStage status: DealStatus deal_count: int class AnalyticsRepository: """Provides aggregate queries for analytics endpoints.""" def __init__(self, session: AsyncSession) -> None: self._session = session @property def session(self) -> AsyncSession: return self._session async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]: stmt: Select[tuple[Any, ...]] = ( select( Deal.status, func.count(Deal.id), func.coalesce(func.sum(Deal.amount), 0), func.count(Deal.amount), ) .where(Deal.organization_id == organization_id) .group_by(Deal.status) ) result = await self._session.execute(stmt) rows = result.all() rollup: list[StatusRollup] = [] for status, count, amount_sum, amount_count in rows: rollup.append( StatusRollup( status=status, deal_count=int(count or 0), amount_sum=_to_decimal(amount_sum), amount_count=int(amount_count or 0), ), ) return rollup async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int: stmt = select(func.count(Deal.id)).where( Deal.organization_id == organization_id, Deal.created_at >= threshold, ) result = await self._session.execute(stmt) value = result.scalar_one() return int(value or 0) async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]: stmt: Select[tuple[Any, ...]] = ( select(Deal.stage, Deal.status, func.count(Deal.id)) .where(Deal.organization_id == organization_id) .group_by(Deal.stage, Deal.status) ) result = await self._session.execute(stmt) rows = result.all() return [ StageStatusRollup(stage=stage, status=status, deal_count=int(count or 0)) for stage, status, count in rows ] def _to_decimal(value: Any) -> Decimal: if isinstance(value, Decimal): return value if value is None: return Decimal("0") return Decimal(str(value))