From 65a8307a2eb8792da36f5cf6b692724b968ae083 Mon Sep 17 00:00:00 2001 From: k1nq Date: Sat, 29 Nov 2025 09:14:23 +0500 Subject: [PATCH 1/4] feat: implement AnalyticsRepository with methods for fetching status rollups and counting new deals --- app/repositories/analytics_repo.py | 93 ++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 app/repositories/analytics_repo.py diff --git a/app/repositories/analytics_repo.py b/app/repositories/analytics_repo.py new file mode 100644 index 0000000..c7b51d2 --- /dev/null +++ b/app/repositories/analytics_repo.py @@ -0,0 +1,93 @@ +"""Analytics-specific data access helpers.""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from decimal import Decimal +from typing import Any + +from sqlalchemy import Select, func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.deal import Deal, DealStage, DealStatus + + +@dataclass(slots=True, frozen=True) +class StatusRollup: + status: DealStatus + deal_count: int + amount_sum: Decimal + amount_count: int + + +@dataclass(slots=True, frozen=True) +class StageStatusRollup: + stage: DealStage + status: DealStatus + deal_count: int + + +class AnalyticsRepository: + """Provides aggregate queries for analytics endpoints.""" + + def __init__(self, session: AsyncSession) -> None: + self._session = session + + @property + def session(self) -> AsyncSession: + return self._session + + async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]: + stmt: Select[tuple[Any, ...]] = ( + select( + Deal.status, + func.count(Deal.id), + func.coalesce(func.sum(Deal.amount), 0), + func.count(Deal.amount), + ) + .where(Deal.organization_id == organization_id) + .group_by(Deal.status) + ) + result = await self._session.execute(stmt) + rows = result.all() + rollup: list[StatusRollup] = [] + for status, count, amount_sum, amount_count in rows: + rollup.append( + StatusRollup( + status=status, + deal_count=int(count or 0), + amount_sum=_to_decimal(amount_sum), + amount_count=int(amount_count or 0), + ) + ) + return rollup + + async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int: + stmt = select(func.count(Deal.id)).where( + Deal.organization_id == organization_id, + Deal.created_at >= threshold, + ) + result = await self._session.execute(stmt) + value = result.scalar_one() + return int(value or 0) + + async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]: + stmt: Select[tuple[Any, ...]] = ( + select(Deal.stage, Deal.status, func.count(Deal.id)) + .where(Deal.organization_id == organization_id) + .group_by(Deal.stage, Deal.status) + ) + result = await self._session.execute(stmt) + rows = result.all() + return [ + StageStatusRollup(stage=stage, status=status, deal_count=int(count or 0)) + for stage, status, count in rows + ] + + +def _to_decimal(value: Any) -> Decimal: + if isinstance(value, Decimal): + return value + if value is None: + return Decimal("0") + return Decimal(str(value)) -- 2.39.5 From 22442bfd2e9fa03c249f7eaa99c9602652de9ec4 Mon Sep 17 00:00:00 2001 From: k1nq Date: Sat, 29 Nov 2025 09:14:29 +0500 Subject: [PATCH 2/4] feat: add AnalyticsService and repository dependencies for deal analytics --- app/api/deps.py | 12 +++ app/services/analytics_service.py | 139 ++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 app/services/analytics_service.py diff --git a/app/api/deps.py b/app/api/deps.py index b0f573a..d55d4a0 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -11,11 +11,13 @@ from app.core.database import get_session from app.core.security import jwt_service, password_hasher from app.models.user import User from app.repositories.activity_repo import ActivityRepository +from app.repositories.analytics_repo import AnalyticsRepository from app.repositories.contact_repo import ContactRepository from app.repositories.deal_repo import DealRepository from app.repositories.org_repo import OrganizationRepository from app.repositories.task_repo import TaskRepository from app.repositories.user_repo import UserRepository +from app.services.analytics_service import AnalyticsService from app.services.auth_service import AuthService from app.services.activity_service import ActivityService from app.services.contact_service import ContactService @@ -61,6 +63,10 @@ def get_activity_repository(session: AsyncSession = Depends(get_db_session)) -> return ActivityRepository(session=session) +def get_analytics_repository(session: AsyncSession = Depends(get_db_session)) -> AnalyticsRepository: + return AnalyticsRepository(session=session) + + def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService: return DealService(repository=repo) @@ -87,6 +93,12 @@ def get_activity_service( return ActivityService(repository=repo) +def get_analytics_service( + repo: AnalyticsRepository = Depends(get_analytics_repository), +) -> AnalyticsService: + return AnalyticsService(repository=repo) + + def get_contact_service( repo: ContactRepository = Depends(get_contact_repository), ) -> ContactService: diff --git a/app/services/analytics_service.py b/app/services/analytics_service.py new file mode 100644 index 0000000..8fc9e46 --- /dev/null +++ b/app/services/analytics_service.py @@ -0,0 +1,139 @@ +"""Analytics-related business logic.""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from typing import Iterable + +from app.models.deal import DealStage, DealStatus +from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup + +_STAGE_ORDER: list[DealStage] = [ + DealStage.QUALIFICATION, + DealStage.PROPOSAL, + DealStage.NEGOTIATION, + DealStage.CLOSED, +] + + +@dataclass(slots=True, frozen=True) +class StatusSummary: + status: DealStatus + count: int + amount_sum: Decimal + + +@dataclass(slots=True, frozen=True) +class WonStatistics: + count: int + amount_sum: Decimal + average_amount: Decimal + + +@dataclass(slots=True, frozen=True) +class NewDealsWindow: + days: int + count: int + + +@dataclass(slots=True, frozen=True) +class DealSummary: + by_status: list[StatusSummary] + won: WonStatistics + new_deals: NewDealsWindow + total_deals: int + + +@dataclass(slots=True, frozen=True) +class StageBreakdown: + stage: DealStage + total: int + by_status: dict[DealStatus, int] + conversion_to_next: float | None + + +class AnalyticsService: + """Provides aggregated analytics for deals.""" + + def __init__(self, repository: AnalyticsRepository) -> None: + self._repository = repository + + async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary: + status_rollup = await self._repository.fetch_status_rollup(organization_id) + status_map = {item.status: item for item in status_rollup} + + summaries: list[StatusSummary] = [] + total_deals = 0 + won_amount_sum = Decimal("0") + won_amount_count = 0 + won_count = 0 + + for status in DealStatus: + row = status_map.get(status) + count = row.deal_count if row else 0 + amount_sum = row.amount_sum if row else Decimal("0") + summaries.append(StatusSummary(status=status, count=count, amount_sum=amount_sum)) + total_deals += count + if status is DealStatus.WON and row: + won_amount_sum = row.amount_sum + won_amount_count = row.amount_count + won_count = row.deal_count + + won_average = ( + (won_amount_sum / won_amount_count) if won_amount_count > 0 else Decimal("0") + ) + + window_threshold = _threshold_from_days(days) + new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold) + + return DealSummary( + by_status=summaries, + won=WonStatistics( + count=won_count, + amount_sum=won_amount_sum, + average_amount=won_average, + ), + new_deals=NewDealsWindow(days=days, count=new_deals), + total_deals=total_deals, + ) + + async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]: + rollup = await self._repository.fetch_stage_status_rollup(organization_id) + stage_map = _build_stage_map(rollup) + + breakdowns: list[StageBreakdown] = [] + totals = {stage: sum(by_status.values()) for stage, by_status in stage_map.items()} + for index, stage in enumerate(_STAGE_ORDER): + by_status = stage_map.get(stage, {status: 0 for status in DealStatus}) + total = totals.get(stage, 0) + conversion = None + if index < len(_STAGE_ORDER) - 1: + next_stage = _STAGE_ORDER[index + 1] + next_total = totals.get(next_stage, 0) + if total > 0: + conversion = float(round((next_total / total) * 100, 2)) + breakdowns.append( + StageBreakdown( + stage=stage, + total=total, + by_status=by_status, + conversion_to_next=conversion, + ) + ) + return breakdowns + + +def _threshold_from_days(days: int) -> datetime: + return datetime.now(timezone.utc) - timedelta(days=days) + + +def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dict[DealStatus, int]]: + stage_map: dict[DealStage, dict[DealStatus, int]] = { + stage: {status: 0 for status in DealStatus} + for stage in _STAGE_ORDER + } + for item in rollup: + stage_map.setdefault(item.stage, {status: 0 for status in DealStatus}) + stage_map[item.stage][item.status] = item.deal_count + return stage_map -- 2.39.5 From 92bd3b6c00661cbd700ca0009586329ffb071c3c Mon Sep 17 00:00:00 2001 From: k1nq Date: Sat, 29 Nov 2025 09:14:35 +0500 Subject: [PATCH 3/4] feat: implement deal summary and funnel endpoints with response models --- app/api/v1/analytics.py | 93 ++++++++++++++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/app/api/v1/analytics.py b/app/api/v1/analytics.py index 08d5383..26c9cd2 100644 --- a/app/api/v1/analytics.py +++ b/app/api/v1/analytics.py @@ -1,32 +1,95 @@ -"""Analytics API stubs (deal summary and funnel).""" +"""Analytics API endpoints for summaries and funnels.""" from __future__ import annotations -from fastapi import APIRouter, Depends, Query, status +from decimal import Decimal -from app.api.deps import get_organization_context +from fastapi import APIRouter, Depends, Query +from pydantic import BaseModel, ConfigDict, field_serializer + +from app.api.deps import get_analytics_service, get_organization_context +from app.models.deal import DealStage, DealStatus +from app.services.analytics_service import AnalyticsService, DealSummary, StageBreakdown from app.services.organization_service import OrganizationContext + +def _decimal_to_str(value: Decimal) -> str: + normalized = value.normalize() + return format(normalized, "f") + router = APIRouter(prefix="/analytics", tags=["analytics"]) -def _stub(endpoint: str) -> dict[str, str]: - return {"detail": f"{endpoint} is not implemented yet"} +class StatusSummaryModel(BaseModel): + model_config = ConfigDict(from_attributes=True) + + status: DealStatus + count: int + amount_sum: Decimal + + @field_serializer("amount_sum") + def serialize_amount_sum(self, value: Decimal) -> str: + return _decimal_to_str(value) -@router.get("/deals/summary", status_code=status.HTTP_501_NOT_IMPLEMENTED) +class WonStatisticsModel(BaseModel): + model_config = ConfigDict(from_attributes=True) + + count: int + amount_sum: Decimal + average_amount: Decimal + + @field_serializer("amount_sum", "average_amount") + def serialize_decimal_fields(self, value: Decimal) -> str: + return _decimal_to_str(value) + + +class NewDealsWindowModel(BaseModel): + model_config = ConfigDict(from_attributes=True) + + days: int + count: int + + +class DealSummaryResponse(BaseModel): + model_config = ConfigDict(from_attributes=True) + + by_status: list[StatusSummaryModel] + won: WonStatisticsModel + new_deals: NewDealsWindowModel + total_deals: int + + +class StageBreakdownModel(BaseModel): + model_config = ConfigDict(from_attributes=True) + + stage: DealStage + total: int + by_status: dict[DealStatus, int] + conversion_to_next: float | None + + +class DealFunnelResponse(BaseModel): + stages: list[StageBreakdownModel] + + +@router.get("/deals/summary", response_model=DealSummaryResponse) async def deals_summary( days: int = Query(30, ge=1, le=180), context: OrganizationContext = Depends(get_organization_context), -) -> dict[str, str]: - """Placeholder for aggregated deal statistics.""" - _ = (days, context) - return _stub("GET /analytics/deals/summary") + service: AnalyticsService = Depends(get_analytics_service), +) -> DealSummaryResponse: + """Return aggregated deal statistics for the current organization.""" + + summary: DealSummary = await service.get_deal_summary(context.organization_id, days=days) + return DealSummaryResponse.model_validate(summary) -@router.get("/deals/funnel", status_code=status.HTTP_501_NOT_IMPLEMENTED) +@router.get("/deals/funnel", response_model=DealFunnelResponse) async def deals_funnel( context: OrganizationContext = Depends(get_organization_context), -) -> dict[str, str]: - """Placeholder for funnel analytics.""" - _ = context - return _stub("GET /analytics/deals/funnel") + service: AnalyticsService = Depends(get_analytics_service), +) -> DealFunnelResponse: + """Return funnel breakdown by stages and statuses.""" + + breakdowns: list[StageBreakdown] = await service.get_deal_funnel(context.organization_id) + return DealFunnelResponse(stages=[StageBreakdownModel.model_validate(item) for item in breakdowns]) -- 2.39.5 From d9ef4b3a2be6a7d1ca4e00d86269fb851f8f0896 Mon Sep 17 00:00:00 2001 From: k1nq Date: Sat, 29 Nov 2025 09:14:38 +0500 Subject: [PATCH 4/4] feat: add API and unit tests for analytics endpoints and services --- tests/api/v1/test_analytics.py | 166 +++++++++++++++++++++++ tests/services/test_analytics_service.py | 152 +++++++++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 tests/api/v1/test_analytics.py create mode 100644 tests/services/test_analytics_service.py diff --git a/tests/api/v1/test_analytics.py b/tests/api/v1/test_analytics.py new file mode 100644 index 0000000..6656a7c --- /dev/null +++ b/tests/api/v1/test_analytics.py @@ -0,0 +1,166 @@ +"""API tests for analytics endpoints.""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from app.core.security import jwt_service +from app.models.contact import Contact +from app.models.deal import Deal, DealStage, DealStatus +from app.models.organization import Organization +from app.models.organization_member import OrganizationMember, OrganizationRole +from app.models.user import User + + +@dataclass(slots=True) +class AnalyticsScenario: + organization_id: int + user_id: int + user_email: str + token: str + + +async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSession]) -> AnalyticsScenario: + async with session_factory() as session: + org = Organization(name="Analytics Org") + user = User(email="analytics@example.com", hashed_password="hashed", name="Analyst", is_active=True) + session.add_all([org, user]) + await session.flush() + + membership = OrganizationMember( + organization_id=org.id, + user_id=user.id, + role=OrganizationRole.OWNER, + ) + contact = Contact( + organization_id=org.id, + owner_id=user.id, + name="Client", + email="client@example.com", + ) + session.add_all([membership, contact]) + await session.flush() + + now = datetime.now(timezone.utc) + deals = [ + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Qual 1", + amount=Decimal("100"), + status=DealStatus.NEW, + stage=DealStage.QUALIFICATION, + created_at=now - timedelta(days=5), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Proposal", + amount=Decimal("200"), + status=DealStatus.IN_PROGRESS, + stage=DealStage.PROPOSAL, + created_at=now - timedelta(days=15), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Negotiation Won", + amount=Decimal("500"), + status=DealStatus.WON, + stage=DealStage.NEGOTIATION, + created_at=now - timedelta(days=2), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Closed Lost", + amount=Decimal("300"), + status=DealStatus.LOST, + stage=DealStage.CLOSED, + created_at=now - timedelta(days=40), + ), + ] + session.add_all(deals) + await session.commit() + + token = jwt_service.create_access_token( + subject=str(user.id), + expires_delta=timedelta(minutes=30), + claims={"email": user.email}, + ) + return AnalyticsScenario( + organization_id=org.id, + user_id=user.id, + user_email=user.email, + token=token, + ) + + +def _headers(token: str, organization_id: int) -> dict[str, str]: + return {"Authorization": f"Bearer {token}", "X-Organization-Id": str(organization_id)} + + +@pytest.mark.asyncio +async def test_deals_summary_endpoint_returns_metrics( + session_factory: async_sessionmaker[AsyncSession], client: AsyncClient +) -> None: + scenario = await prepare_analytics_scenario(session_factory) + + response = await client.get( + "/api/v1/analytics/deals/summary?days=30", + headers=_headers(scenario.token, scenario.organization_id), + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["total_deals"] == 4 + by_status = {entry["status"]: entry for entry in payload["by_status"]} + assert by_status[DealStatus.NEW.value]["count"] == 1 + assert by_status[DealStatus.WON.value]["amount_sum"] == "500" + assert payload["won"]["average_amount"] == "500" + assert payload["new_deals"]["count"] == 3 + + +@pytest.mark.asyncio +async def test_deals_summary_respects_days_filter( + session_factory: async_sessionmaker[AsyncSession], client: AsyncClient +) -> None: + scenario = await prepare_analytics_scenario(session_factory) + + response = await client.get( + "/api/v1/analytics/deals/summary?days=3", + headers=_headers(scenario.token, scenario.organization_id), + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["new_deals"]["count"] == 1 # только сделки моложе трёх дней + + +@pytest.mark.asyncio +async def test_deals_funnel_returns_breakdown( + session_factory: async_sessionmaker[AsyncSession], client: AsyncClient +) -> None: + scenario = await prepare_analytics_scenario(session_factory) + + response = await client.get( + "/api/v1/analytics/deals/funnel", + headers=_headers(scenario.token, scenario.organization_id), + ) + + assert response.status_code == 200 + payload = response.json() + assert len(payload["stages"]) == 4 + qualification = next(item for item in payload["stages"] if item["stage"] == DealStage.QUALIFICATION.value) + assert qualification["total"] == 1 + proposal = next(item for item in payload["stages"] if item["stage"] == DealStage.PROPOSAL.value) + assert proposal["conversion_to_next"] == 100.0 \ No newline at end of file diff --git a/tests/services/test_analytics_service.py b/tests/services/test_analytics_service.py new file mode 100644 index 0000000..d8514b9 --- /dev/null +++ b/tests/services/test_analytics_service.py @@ -0,0 +1,152 @@ +"""Unit tests for AnalyticsService.""" +from __future__ import annotations + +from collections.abc import AsyncGenerator +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.pool import StaticPool + +from app.models import Base +from app.models.contact import Contact +from app.models.deal import Deal, DealStage, DealStatus +from app.models.organization import Organization +from app.models.organization_member import OrganizationMember, OrganizationRole +from app.models.user import User +from app.repositories.analytics_repo import AnalyticsRepository +from app.services.analytics_service import AnalyticsService + + +@pytest_asyncio.fixture() +async def session() -> AsyncGenerator[AsyncSession, None]: + engine = create_async_engine( + "sqlite+aiosqlite:///:memory:", future=True, poolclass=StaticPool + ) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + Session = async_sessionmaker(engine, expire_on_commit=False) + async with Session() as session: + yield session + await engine.dispose() + + +async def _seed_data(session: AsyncSession) -> tuple[int, int, int]: + org = Organization(name="Analytics Org") + user = User(email="analytics@example.com", hashed_password="hashed", name="Analyst", is_active=True) + session.add_all([org, user]) + await session.flush() + + member = OrganizationMember(organization_id=org.id, user_id=user.id, role=OrganizationRole.OWNER) + contact = Contact(organization_id=org.id, owner_id=user.id, name="Client", email="client@example.com") + session.add_all([member, contact]) + await session.flush() + + now = datetime.now(timezone.utc) + deals = [ + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Qual 1", + amount=Decimal("100"), + status=DealStatus.NEW, + stage=DealStage.QUALIFICATION, + created_at=now - timedelta(days=5), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Qual 2", + amount=Decimal("150"), + status=DealStatus.NEW, + stage=DealStage.QUALIFICATION, + created_at=now - timedelta(days=3), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Proposal", + amount=Decimal("200"), + status=DealStatus.IN_PROGRESS, + stage=DealStage.PROPOSAL, + created_at=now - timedelta(days=15), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Negotiation Won", + amount=Decimal("500"), + status=DealStatus.WON, + stage=DealStage.NEGOTIATION, + created_at=now - timedelta(days=2), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Negotiation Won No Amount", + amount=None, + status=DealStatus.WON, + stage=DealStage.NEGOTIATION, + created_at=now - timedelta(days=1), + ), + Deal( + organization_id=org.id, + contact_id=contact.id, + owner_id=user.id, + title="Closed Lost", + amount=Decimal("300"), + status=DealStatus.LOST, + stage=DealStage.CLOSED, + created_at=now - timedelta(days=40), + ), + ] + session.add_all(deals) + await session.commit() + return org.id, user.id, contact.id + + +@pytest.mark.asyncio +async def test_deal_summary_returns_expected_metrics(session: AsyncSession) -> None: + org_id, _, _ = await _seed_data(session) + service = AnalyticsService(repository=AnalyticsRepository(session)) + + summary = await service.get_deal_summary(org_id, days=30) + + assert summary.total_deals == 6 + status_map = {item.status: item for item in summary.by_status} + assert status_map[DealStatus.NEW].count == 2 + assert Decimal(status_map[DealStatus.NEW].amount_sum) == Decimal("250") + assert status_map[DealStatus.WON].count == 2 + assert Decimal(summary.won.amount_sum) == Decimal("500") + assert Decimal(summary.won.average_amount) == Decimal("500") + assert summary.new_deals.count == 5 # все кроме старой закрытой сделки + assert summary.new_deals.days == 30 + + +@pytest.mark.asyncio +async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession) -> None: + org_id, _, _ = await _seed_data(session) + service = AnalyticsService(repository=AnalyticsRepository(session)) + + funnel = await service.get_deal_funnel(org_id) + + assert len(funnel) == 4 + qual = next(item for item in funnel if item.stage == DealStage.QUALIFICATION) + assert qual.total == 2 + assert qual.by_status[DealStatus.NEW] == 2 + assert qual.conversion_to_next == 50.0 + + proposal = next(item for item in funnel if item.stage == DealStage.PROPOSAL) + assert proposal.total == 1 + assert proposal.by_status[DealStatus.IN_PROGRESS] == 1 + assert proposal.conversion_to_next == 200.0 + + last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED) + assert last_stage.conversion_to_next is None \ No newline at end of file -- 2.39.5