95 lines
2.8 KiB
Python
95 lines
2.8 KiB
Python
"""Analytics-specific data access helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
|
|
from sqlalchemy import Select, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.deal import Deal, DealStage, DealStatus
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class StatusRollup:
|
|
status: DealStatus
|
|
deal_count: int
|
|
amount_sum: Decimal
|
|
amount_count: int
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class StageStatusRollup:
|
|
stage: DealStage
|
|
status: DealStatus
|
|
deal_count: int
|
|
|
|
|
|
class AnalyticsRepository:
    """Provides aggregate queries for analytics endpoints.

    Every query filters on a single ``organization_id`` and runs on the
    ``AsyncSession`` handed to the constructor.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Remember the session on which all queries are executed."""
        self._session = session

    @property
    def session(self) -> AsyncSession:
        """Session this repository was constructed with (read-only)."""
        return self._session

    async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]:
        """Aggregate an organization's deals by status.

        Args:
            organization_id: Organization whose deals are aggregated.

        Returns:
            One ``StatusRollup`` per status present among the matching deals,
            in whatever order the database returns the groups.
        """
        per_status_columns = (
            Deal.status,
            func.count(Deal.id),
            # COALESCE keeps the sum at 0 when SUM() yields NULL.
            func.coalesce(func.sum(Deal.amount), 0),
            # COUNT(column) only counts rows whose amount is not NULL.
            func.count(Deal.amount),
        )
        query: Select[tuple[Any, ...]] = (
            select(*per_status_columns)
            .where(Deal.organization_id == organization_id)
            .group_by(Deal.status)
        )
        outcome = await self._session.execute(query)
        return [
            StatusRollup(
                status=deal_status,
                deal_count=int(total or 0),
                amount_sum=_to_decimal(summed),
                amount_count=int(with_amount or 0),
            )
            for deal_status, total, summed, with_amount in outcome.all()
        ]

    async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int:
        """Count an organization's deals created at or after ``threshold``.

        Args:
            organization_id: Organization whose deals are counted.
            threshold: Inclusive lower bound applied to ``Deal.created_at``.

        Returns:
            The number of matching deals.
        """
        filters = (
            Deal.organization_id == organization_id,
            Deal.created_at >= threshold,
        )
        query = select(func.count(Deal.id)).where(*filters)
        outcome = await self._session.execute(query)
        return int(outcome.scalar_one() or 0)

    async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]:
        """Count an organization's deals per (stage, status) combination.

        Args:
            organization_id: Organization whose deals are counted.

        Returns:
            One ``StageStatusRollup`` per (stage, status) pair present among
            the matching deals, in whatever order the database returns them.
        """
        grouping = (Deal.stage, Deal.status)
        query: Select[tuple[Any, ...]] = (
            select(*grouping, func.count(Deal.id))
            .where(Deal.organization_id == organization_id)
            .group_by(*grouping)
        )
        outcome = await self._session.execute(query)
        return [
            StageStatusRollup(
                stage=deal_stage,
                status=deal_status,
                deal_count=int(total or 0),
            )
            for deal_stage, deal_status, total in outcome.all()
        ]
|
|
|
|
|
|
def _to_decimal(value: Any) -> Decimal:
    """Coerce a database aggregate value to ``Decimal``.

    ``None`` becomes ``Decimal("0")`` and an existing ``Decimal`` is returned
    unchanged. Anything else is converted through its ``str()`` form, so a
    float such as ``0.1`` becomes ``Decimal("0.1")`` rather than its exact
    binary expansion.
    """
    if value is None:
        return Decimal("0")
    return value if isinstance(value, Decimal) else Decimal(str(value))
|