feat: implement AnalyticsRepository with methods for fetching status rollups and counting new deals

This commit is contained in:
k1nq 2025-11-29 09:14:23 +05:00
parent 1c206323a2
commit 65a8307a2e
1 changed files with 93 additions and 0 deletions

View File

@ -0,0 +1,93 @@
"""Analytics-specific data access helpers."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.deal import Deal, DealStage, DealStatus
@dataclass(slots=True, frozen=True)
class StatusRollup:
status: DealStatus
deal_count: int
amount_sum: Decimal
amount_count: int
@dataclass(slots=True, frozen=True)
class StageStatusRollup:
stage: DealStage
status: DealStatus
deal_count: int
class AnalyticsRepository:
"""Provides aggregate queries for analytics endpoints."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
@property
def session(self) -> AsyncSession:
return self._session
async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]:
stmt: Select[tuple[Any, ...]] = (
select(
Deal.status,
func.count(Deal.id),
func.coalesce(func.sum(Deal.amount), 0),
func.count(Deal.amount),
)
.where(Deal.organization_id == organization_id)
.group_by(Deal.status)
)
result = await self._session.execute(stmt)
rows = result.all()
rollup: list[StatusRollup] = []
for status, count, amount_sum, amount_count in rows:
rollup.append(
StatusRollup(
status=status,
deal_count=int(count or 0),
amount_sum=_to_decimal(amount_sum),
amount_count=int(amount_count or 0),
)
)
return rollup
async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int:
stmt = select(func.count(Deal.id)).where(
Deal.organization_id == organization_id,
Deal.created_at >= threshold,
)
result = await self._session.execute(stmt)
value = result.scalar_one()
return int(value or 0)
async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]:
stmt: Select[tuple[Any, ...]] = (
select(Deal.stage, Deal.status, func.count(Deal.id))
.where(Deal.organization_id == organization_id)
.group_by(Deal.stage, Deal.status)
)
result = await self._session.execute(stmt)
rows = result.all()
return [
StageStatusRollup(stage=stage, status=status, deal_count=int(count or 0))
for stage, status, count in rows
]
def _to_decimal(value: Any) -> Decimal:
if isinstance(value, Decimal):
return value
if value is None:
return Decimal("0")
return Decimal(str(value))