test_task_crm/app/repositories/analytics_repo.py

94 lines
2.8 KiB
Python

"""Analytics-specific data access helpers."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.deal import Deal, DealStage, DealStatus
@dataclass(slots=True, frozen=True)
class StatusRollup:
status: DealStatus
deal_count: int
amount_sum: Decimal
amount_count: int
@dataclass(slots=True, frozen=True)
class StageStatusRollup:
stage: DealStage
status: DealStatus
deal_count: int
class AnalyticsRepository:
"""Provides aggregate queries for analytics endpoints."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
@property
def session(self) -> AsyncSession:
return self._session
async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]:
stmt: Select[tuple[Any, ...]] = (
select(
Deal.status,
func.count(Deal.id),
func.coalesce(func.sum(Deal.amount), 0),
func.count(Deal.amount),
)
.where(Deal.organization_id == organization_id)
.group_by(Deal.status)
)
result = await self._session.execute(stmt)
rows = result.all()
rollup: list[StatusRollup] = []
for status, count, amount_sum, amount_count in rows:
rollup.append(
StatusRollup(
status=status,
deal_count=int(count or 0),
amount_sum=_to_decimal(amount_sum),
amount_count=int(amount_count or 0),
)
)
return rollup
async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int:
stmt = select(func.count(Deal.id)).where(
Deal.organization_id == organization_id,
Deal.created_at >= threshold,
)
result = await self._session.execute(stmt)
value = result.scalar_one()
return int(value or 0)
async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]:
stmt: Select[tuple[Any, ...]] = (
select(Deal.stage, Deal.status, func.count(Deal.id))
.where(Deal.organization_id == organization_id)
.group_by(Deal.stage, Deal.status)
)
result = await self._session.execute(stmt)
rows = result.all()
return [
StageStatusRollup(stage=stage, status=status, deal_count=int(count or 0))
for stage, status, count in rows
]
def _to_decimal(value: Any) -> Decimal:
if isinstance(value, Decimal):
return value
if value is None:
return Decimal("0")
return Decimal(str(value))