Compare commits

..

No commits in common. "31d6a0552182af7d3bb48c0d9c96fe9920b6fe00" and "1c206323a22a00bc266b38091d047193273596bb" have entirely different histories.

6 changed files with 15 additions and 640 deletions

View File

@ -11,13 +11,11 @@ from app.core.database import get_session
from app.core.security import jwt_service, password_hasher from app.core.security import jwt_service, password_hasher
from app.models.user import User from app.models.user import User
from app.repositories.activity_repo import ActivityRepository from app.repositories.activity_repo import ActivityRepository
from app.repositories.analytics_repo import AnalyticsRepository
from app.repositories.contact_repo import ContactRepository from app.repositories.contact_repo import ContactRepository
from app.repositories.deal_repo import DealRepository from app.repositories.deal_repo import DealRepository
from app.repositories.org_repo import OrganizationRepository from app.repositories.org_repo import OrganizationRepository
from app.repositories.task_repo import TaskRepository from app.repositories.task_repo import TaskRepository
from app.repositories.user_repo import UserRepository from app.repositories.user_repo import UserRepository
from app.services.analytics_service import AnalyticsService
from app.services.auth_service import AuthService from app.services.auth_service import AuthService
from app.services.activity_service import ActivityService from app.services.activity_service import ActivityService
from app.services.contact_service import ContactService from app.services.contact_service import ContactService
@ -63,10 +61,6 @@ def get_activity_repository(session: AsyncSession = Depends(get_db_session)) ->
return ActivityRepository(session=session) return ActivityRepository(session=session)
def get_analytics_repository(session: AsyncSession = Depends(get_db_session)) -> AnalyticsRepository:
return AnalyticsRepository(session=session)
def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService: def get_deal_service(repo: DealRepository = Depends(get_deal_repository)) -> DealService:
return DealService(repository=repo) return DealService(repository=repo)
@ -93,12 +87,6 @@ def get_activity_service(
return ActivityService(repository=repo) return ActivityService(repository=repo)
def get_analytics_service(
repo: AnalyticsRepository = Depends(get_analytics_repository),
) -> AnalyticsService:
return AnalyticsService(repository=repo)
def get_contact_service( def get_contact_service(
repo: ContactRepository = Depends(get_contact_repository), repo: ContactRepository = Depends(get_contact_repository),
) -> ContactService: ) -> ContactService:

View File

@ -1,95 +1,32 @@
"""Analytics API endpoints for summaries and funnels.""" """Analytics API stubs (deal summary and funnel)."""
from __future__ import annotations from __future__ import annotations
from decimal import Decimal from fastapi import APIRouter, Depends, Query, status
from fastapi import APIRouter, Depends, Query from app.api.deps import get_organization_context
from pydantic import BaseModel, ConfigDict, field_serializer
from app.api.deps import get_analytics_service, get_organization_context
from app.models.deal import DealStage, DealStatus
from app.services.analytics_service import AnalyticsService, DealSummary, StageBreakdown
from app.services.organization_service import OrganizationContext from app.services.organization_service import OrganizationContext
def _decimal_to_str(value: Decimal) -> str:
normalized = value.normalize()
return format(normalized, "f")
router = APIRouter(prefix="/analytics", tags=["analytics"]) router = APIRouter(prefix="/analytics", tags=["analytics"])
class StatusSummaryModel(BaseModel): def _stub(endpoint: str) -> dict[str, str]:
model_config = ConfigDict(from_attributes=True) return {"detail": f"{endpoint} is not implemented yet"}
status: DealStatus
count: int
amount_sum: Decimal
@field_serializer("amount_sum")
def serialize_amount_sum(self, value: Decimal) -> str:
return _decimal_to_str(value)
class WonStatisticsModel(BaseModel): @router.get("/deals/summary", status_code=status.HTTP_501_NOT_IMPLEMENTED)
model_config = ConfigDict(from_attributes=True)
count: int
amount_sum: Decimal
average_amount: Decimal
@field_serializer("amount_sum", "average_amount")
def serialize_decimal_fields(self, value: Decimal) -> str:
return _decimal_to_str(value)
class NewDealsWindowModel(BaseModel):
model_config = ConfigDict(from_attributes=True)
days: int
count: int
class DealSummaryResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
by_status: list[StatusSummaryModel]
won: WonStatisticsModel
new_deals: NewDealsWindowModel
total_deals: int
class StageBreakdownModel(BaseModel):
model_config = ConfigDict(from_attributes=True)
stage: DealStage
total: int
by_status: dict[DealStatus, int]
conversion_to_next: float | None
class DealFunnelResponse(BaseModel):
stages: list[StageBreakdownModel]
@router.get("/deals/summary", response_model=DealSummaryResponse)
async def deals_summary( async def deals_summary(
days: int = Query(30, ge=1, le=180), days: int = Query(30, ge=1, le=180),
context: OrganizationContext = Depends(get_organization_context), context: OrganizationContext = Depends(get_organization_context),
service: AnalyticsService = Depends(get_analytics_service), ) -> dict[str, str]:
) -> DealSummaryResponse: """Placeholder for aggregated deal statistics."""
"""Return aggregated deal statistics for the current organization.""" _ = (days, context)
return _stub("GET /analytics/deals/summary")
summary: DealSummary = await service.get_deal_summary(context.organization_id, days=days)
return DealSummaryResponse.model_validate(summary)
@router.get("/deals/funnel", response_model=DealFunnelResponse) @router.get("/deals/funnel", status_code=status.HTTP_501_NOT_IMPLEMENTED)
async def deals_funnel( async def deals_funnel(
context: OrganizationContext = Depends(get_organization_context), context: OrganizationContext = Depends(get_organization_context),
service: AnalyticsService = Depends(get_analytics_service), ) -> dict[str, str]:
) -> DealFunnelResponse: """Placeholder for funnel analytics."""
"""Return funnel breakdown by stages and statuses.""" _ = context
return _stub("GET /analytics/deals/funnel")
breakdowns: list[StageBreakdown] = await service.get_deal_funnel(context.organization_id)
return DealFunnelResponse(stages=[StageBreakdownModel.model_validate(item) for item in breakdowns])

View File

@ -1,93 +0,0 @@
"""Analytics-specific data access helpers."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.deal import Deal, DealStage, DealStatus
@dataclass(slots=True, frozen=True)
class StatusRollup:
status: DealStatus
deal_count: int
amount_sum: Decimal
amount_count: int
@dataclass(slots=True, frozen=True)
class StageStatusRollup:
stage: DealStage
status: DealStatus
deal_count: int
class AnalyticsRepository:
"""Provides aggregate queries for analytics endpoints."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
@property
def session(self) -> AsyncSession:
return self._session
async def fetch_status_rollup(self, organization_id: int) -> list[StatusRollup]:
stmt: Select[tuple[Any, ...]] = (
select(
Deal.status,
func.count(Deal.id),
func.coalesce(func.sum(Deal.amount), 0),
func.count(Deal.amount),
)
.where(Deal.organization_id == organization_id)
.group_by(Deal.status)
)
result = await self._session.execute(stmt)
rows = result.all()
rollup: list[StatusRollup] = []
for status, count, amount_sum, amount_count in rows:
rollup.append(
StatusRollup(
status=status,
deal_count=int(count or 0),
amount_sum=_to_decimal(amount_sum),
amount_count=int(amount_count or 0),
)
)
return rollup
async def count_new_deals_since(self, organization_id: int, threshold: datetime) -> int:
stmt = select(func.count(Deal.id)).where(
Deal.organization_id == organization_id,
Deal.created_at >= threshold,
)
result = await self._session.execute(stmt)
value = result.scalar_one()
return int(value or 0)
async def fetch_stage_status_rollup(self, organization_id: int) -> list[StageStatusRollup]:
stmt: Select[tuple[Any, ...]] = (
select(Deal.stage, Deal.status, func.count(Deal.id))
.where(Deal.organization_id == organization_id)
.group_by(Deal.stage, Deal.status)
)
result = await self._session.execute(stmt)
rows = result.all()
return [
StageStatusRollup(stage=stage, status=status, deal_count=int(count or 0))
for stage, status, count in rows
]
def _to_decimal(value: Any) -> Decimal:
if isinstance(value, Decimal):
return value
if value is None:
return Decimal("0")
return Decimal(str(value))

View File

@ -1,139 +0,0 @@
"""Analytics-related business logic."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Iterable
from app.models.deal import DealStage, DealStatus
from app.repositories.analytics_repo import AnalyticsRepository, StageStatusRollup
_STAGE_ORDER: list[DealStage] = [
DealStage.QUALIFICATION,
DealStage.PROPOSAL,
DealStage.NEGOTIATION,
DealStage.CLOSED,
]
@dataclass(slots=True, frozen=True)
class StatusSummary:
status: DealStatus
count: int
amount_sum: Decimal
@dataclass(slots=True, frozen=True)
class WonStatistics:
count: int
amount_sum: Decimal
average_amount: Decimal
@dataclass(slots=True, frozen=True)
class NewDealsWindow:
days: int
count: int
@dataclass(slots=True, frozen=True)
class DealSummary:
by_status: list[StatusSummary]
won: WonStatistics
new_deals: NewDealsWindow
total_deals: int
@dataclass(slots=True, frozen=True)
class StageBreakdown:
stage: DealStage
total: int
by_status: dict[DealStatus, int]
conversion_to_next: float | None
class AnalyticsService:
"""Provides aggregated analytics for deals."""
def __init__(self, repository: AnalyticsRepository) -> None:
self._repository = repository
async def get_deal_summary(self, organization_id: int, *, days: int) -> DealSummary:
status_rollup = await self._repository.fetch_status_rollup(organization_id)
status_map = {item.status: item for item in status_rollup}
summaries: list[StatusSummary] = []
total_deals = 0
won_amount_sum = Decimal("0")
won_amount_count = 0
won_count = 0
for status in DealStatus:
row = status_map.get(status)
count = row.deal_count if row else 0
amount_sum = row.amount_sum if row else Decimal("0")
summaries.append(StatusSummary(status=status, count=count, amount_sum=amount_sum))
total_deals += count
if status is DealStatus.WON and row:
won_amount_sum = row.amount_sum
won_amount_count = row.amount_count
won_count = row.deal_count
won_average = (
(won_amount_sum / won_amount_count) if won_amount_count > 0 else Decimal("0")
)
window_threshold = _threshold_from_days(days)
new_deals = await self._repository.count_new_deals_since(organization_id, window_threshold)
return DealSummary(
by_status=summaries,
won=WonStatistics(
count=won_count,
amount_sum=won_amount_sum,
average_amount=won_average,
),
new_deals=NewDealsWindow(days=days, count=new_deals),
total_deals=total_deals,
)
async def get_deal_funnel(self, organization_id: int) -> list[StageBreakdown]:
rollup = await self._repository.fetch_stage_status_rollup(organization_id)
stage_map = _build_stage_map(rollup)
breakdowns: list[StageBreakdown] = []
totals = {stage: sum(by_status.values()) for stage, by_status in stage_map.items()}
for index, stage in enumerate(_STAGE_ORDER):
by_status = stage_map.get(stage, {status: 0 for status in DealStatus})
total = totals.get(stage, 0)
conversion = None
if index < len(_STAGE_ORDER) - 1:
next_stage = _STAGE_ORDER[index + 1]
next_total = totals.get(next_stage, 0)
if total > 0:
conversion = float(round((next_total / total) * 100, 2))
breakdowns.append(
StageBreakdown(
stage=stage,
total=total,
by_status=by_status,
conversion_to_next=conversion,
)
)
return breakdowns
def _threshold_from_days(days: int) -> datetime:
return datetime.now(timezone.utc) - timedelta(days=days)
def _build_stage_map(rollup: Iterable[StageStatusRollup]) -> dict[DealStage, dict[DealStatus, int]]:
stage_map: dict[DealStage, dict[DealStatus, int]] = {
stage: {status: 0 for status in DealStatus}
for stage in _STAGE_ORDER
}
for item in rollup:
stage_map.setdefault(item.stage, {status: 0 for status in DealStatus})
stage_map[item.stage][item.status] = item.deal_count
return stage_map

View File

@ -1,166 +0,0 @@
"""API tests for analytics endpoints."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.core.security import jwt_service
from app.models.contact import Contact
from app.models.deal import Deal, DealStage, DealStatus
from app.models.organization import Organization
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.models.user import User
@dataclass(slots=True)
class AnalyticsScenario:
organization_id: int
user_id: int
user_email: str
token: str
async def prepare_analytics_scenario(session_factory: async_sessionmaker[AsyncSession]) -> AnalyticsScenario:
async with session_factory() as session:
org = Organization(name="Analytics Org")
user = User(email="analytics@example.com", hashed_password="hashed", name="Analyst", is_active=True)
session.add_all([org, user])
await session.flush()
membership = OrganizationMember(
organization_id=org.id,
user_id=user.id,
role=OrganizationRole.OWNER,
)
contact = Contact(
organization_id=org.id,
owner_id=user.id,
name="Client",
email="client@example.com",
)
session.add_all([membership, contact])
await session.flush()
now = datetime.now(timezone.utc)
deals = [
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Qual 1",
amount=Decimal("100"),
status=DealStatus.NEW,
stage=DealStage.QUALIFICATION,
created_at=now - timedelta(days=5),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Proposal",
amount=Decimal("200"),
status=DealStatus.IN_PROGRESS,
stage=DealStage.PROPOSAL,
created_at=now - timedelta(days=15),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Negotiation Won",
amount=Decimal("500"),
status=DealStatus.WON,
stage=DealStage.NEGOTIATION,
created_at=now - timedelta(days=2),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Closed Lost",
amount=Decimal("300"),
status=DealStatus.LOST,
stage=DealStage.CLOSED,
created_at=now - timedelta(days=40),
),
]
session.add_all(deals)
await session.commit()
token = jwt_service.create_access_token(
subject=str(user.id),
expires_delta=timedelta(minutes=30),
claims={"email": user.email},
)
return AnalyticsScenario(
organization_id=org.id,
user_id=user.id,
user_email=user.email,
token=token,
)
def _headers(token: str, organization_id: int) -> dict[str, str]:
return {"Authorization": f"Bearer {token}", "X-Organization-Id": str(organization_id)}
@pytest.mark.asyncio
async def test_deals_summary_endpoint_returns_metrics(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_analytics_scenario(session_factory)
response = await client.get(
"/api/v1/analytics/deals/summary?days=30",
headers=_headers(scenario.token, scenario.organization_id),
)
assert response.status_code == 200
payload = response.json()
assert payload["total_deals"] == 4
by_status = {entry["status"]: entry for entry in payload["by_status"]}
assert by_status[DealStatus.NEW.value]["count"] == 1
assert by_status[DealStatus.WON.value]["amount_sum"] == "500"
assert payload["won"]["average_amount"] == "500"
assert payload["new_deals"]["count"] == 3
@pytest.mark.asyncio
async def test_deals_summary_respects_days_filter(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_analytics_scenario(session_factory)
response = await client.get(
"/api/v1/analytics/deals/summary?days=3",
headers=_headers(scenario.token, scenario.organization_id),
)
assert response.status_code == 200
payload = response.json()
assert payload["new_deals"]["count"] == 1 # только сделки моложе трёх дней
@pytest.mark.asyncio
async def test_deals_funnel_returns_breakdown(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_analytics_scenario(session_factory)
response = await client.get(
"/api/v1/analytics/deals/funnel",
headers=_headers(scenario.token, scenario.organization_id),
)
assert response.status_code == 200
payload = response.json()
assert len(payload["stages"]) == 4
qualification = next(item for item in payload["stages"] if item["stage"] == DealStage.QUALIFICATION.value)
assert qualification["total"] == 1
proposal = next(item for item in payload["stages"] if item["stage"] == DealStage.PROPOSAL.value)
assert proposal["conversion_to_next"] == 100.0

View File

@ -1,152 +0,0 @@
"""Unit tests for AnalyticsService."""
from __future__ import annotations
from collections.abc import AsyncGenerator
from datetime import datetime, timedelta, timezone
from decimal import Decimal
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool
from app.models import Base
from app.models.contact import Contact
from app.models.deal import Deal, DealStage, DealStatus
from app.models.organization import Organization
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.models.user import User
from app.repositories.analytics_repo import AnalyticsRepository
from app.services.analytics_service import AnalyticsService
@pytest_asyncio.fixture()
async def session() -> AsyncGenerator[AsyncSession, None]:
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:", future=True, poolclass=StaticPool
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Session = async_sessionmaker(engine, expire_on_commit=False)
async with Session() as session:
yield session
await engine.dispose()
async def _seed_data(session: AsyncSession) -> tuple[int, int, int]:
org = Organization(name="Analytics Org")
user = User(email="analytics@example.com", hashed_password="hashed", name="Analyst", is_active=True)
session.add_all([org, user])
await session.flush()
member = OrganizationMember(organization_id=org.id, user_id=user.id, role=OrganizationRole.OWNER)
contact = Contact(organization_id=org.id, owner_id=user.id, name="Client", email="client@example.com")
session.add_all([member, contact])
await session.flush()
now = datetime.now(timezone.utc)
deals = [
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Qual 1",
amount=Decimal("100"),
status=DealStatus.NEW,
stage=DealStage.QUALIFICATION,
created_at=now - timedelta(days=5),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Qual 2",
amount=Decimal("150"),
status=DealStatus.NEW,
stage=DealStage.QUALIFICATION,
created_at=now - timedelta(days=3),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Proposal",
amount=Decimal("200"),
status=DealStatus.IN_PROGRESS,
stage=DealStage.PROPOSAL,
created_at=now - timedelta(days=15),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Negotiation Won",
amount=Decimal("500"),
status=DealStatus.WON,
stage=DealStage.NEGOTIATION,
created_at=now - timedelta(days=2),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Negotiation Won No Amount",
amount=None,
status=DealStatus.WON,
stage=DealStage.NEGOTIATION,
created_at=now - timedelta(days=1),
),
Deal(
organization_id=org.id,
contact_id=contact.id,
owner_id=user.id,
title="Closed Lost",
amount=Decimal("300"),
status=DealStatus.LOST,
stage=DealStage.CLOSED,
created_at=now - timedelta(days=40),
),
]
session.add_all(deals)
await session.commit()
return org.id, user.id, contact.id
@pytest.mark.asyncio
async def test_deal_summary_returns_expected_metrics(session: AsyncSession) -> None:
org_id, _, _ = await _seed_data(session)
service = AnalyticsService(repository=AnalyticsRepository(session))
summary = await service.get_deal_summary(org_id, days=30)
assert summary.total_deals == 6
status_map = {item.status: item for item in summary.by_status}
assert status_map[DealStatus.NEW].count == 2
assert Decimal(status_map[DealStatus.NEW].amount_sum) == Decimal("250")
assert status_map[DealStatus.WON].count == 2
assert Decimal(summary.won.amount_sum) == Decimal("500")
assert Decimal(summary.won.average_amount) == Decimal("500")
assert summary.new_deals.count == 5 # все кроме старой закрытой сделки
assert summary.new_deals.days == 30
@pytest.mark.asyncio
async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession) -> None:
org_id, _, _ = await _seed_data(session)
service = AnalyticsService(repository=AnalyticsRepository(session))
funnel = await service.get_deal_funnel(org_id)
assert len(funnel) == 4
qual = next(item for item in funnel if item.stage == DealStage.QUALIFICATION)
assert qual.total == 2
assert qual.by_status[DealStatus.NEW] == 2
assert qual.conversion_to_next == 50.0
proposal = next(item for item in funnel if item.stage == DealStage.PROPOSAL)
assert proposal.total == 1
assert proposal.by_status[DealStatus.IN_PROGRESS] == 1
assert proposal.conversion_to_next == 200.0
last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED)
assert last_stage.conversion_to_next is None