152 lines
5.4 KiB
Python
152 lines
5.4 KiB
Python
"""Unit tests for AnalyticsService."""
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncGenerator
|
|
from datetime import datetime, timedelta, timezone
|
|
from decimal import Decimal
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
from app.models import Base
|
|
from app.models.contact import Contact
|
|
from app.models.deal import Deal, DealStage, DealStatus
|
|
from app.models.organization import Organization
|
|
from app.models.organization_member import OrganizationMember, OrganizationRole
|
|
from app.models.user import User
|
|
from app.repositories.analytics_repo import AnalyticsRepository
|
|
from app.services.analytics_service import AnalyticsService
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` backed by a fresh in-memory SQLite database.

    Every table registered on ``Base.metadata`` is created before the session
    is handed to the test, and the engine is disposed once the test is done
    with it.
    """
    # NOTE(review): StaticPool presumably keeps one shared connection so the
    # in-memory database created below is the same one the session sees.
    db_engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        future=True,
        poolclass=StaticPool,
    )

    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
    async with session_factory() as db_session:
        yield db_session

    await db_engine.dispose()
|
|
|
|
|
|
async def _seed_data(session: AsyncSession) -> tuple[int, int, int]:
    """Insert one organization, user, membership, contact and six deals.

    The deals cover several status/stage combinations and creation dates
    ranging from 1 to 40 days before "now"; one WON deal has no amount.

    Args:
        session: Session used to add, flush and finally commit the rows.

    Returns:
        ``(organization_id, user_id, contact_id)`` of the created rows.
    """
    organization = Organization(name="Analytics Org")
    analyst = User(
        email="analytics@example.com",
        hashed_password="hashed",
        name="Analyst",
        is_active=True,
    )
    session.add_all([organization, analyst])
    # Flush before reading ``.id`` on the freshly added rows below.
    await session.flush()

    membership = OrganizationMember(
        organization_id=organization.id,
        user_id=analyst.id,
        role=OrganizationRole.OWNER,
    )
    client = Contact(
        organization_id=organization.id,
        owner_id=analyst.id,
        name="Client",
        email="client@example.com",
    )
    session.add_all([membership, client])
    await session.flush()

    seeded_at = datetime.now(timezone.utc)

    # (title, amount, status, stage, age in days) for each deal, in insert order.
    deal_specs = (
        ("Qual 1", Decimal("100"), DealStatus.NEW, DealStage.QUALIFICATION, 5),
        ("Qual 2", Decimal("150"), DealStatus.NEW, DealStage.QUALIFICATION, 3),
        ("Proposal", Decimal("200"), DealStatus.IN_PROGRESS, DealStage.PROPOSAL, 15),
        ("Negotiation Won", Decimal("500"), DealStatus.WON, DealStage.NEGOTIATION, 2),
        ("Negotiation Won No Amount", None, DealStatus.WON, DealStage.NEGOTIATION, 1),
        ("Closed Lost", Decimal("300"), DealStatus.LOST, DealStage.CLOSED, 40),
    )
    session.add_all(
        [
            Deal(
                organization_id=organization.id,
                contact_id=client.id,
                owner_id=analyst.id,
                title=title,
                amount=amount,
                status=status,
                stage=stage,
                created_at=seeded_at - timedelta(days=age_days),
            )
            for title, amount, status, stage, age_days in deal_specs
        ]
    )
    await session.commit()

    return organization.id, analyst.id, client.id
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deal_summary_returns_expected_metrics(session: AsyncSession) -> None:
    """Check totals, per-status aggregates, won metrics and the new-deals window.

    Uses the six deals created by ``_seed_data`` and a 30-day window.
    """
    organization_id, _, _ = await _seed_data(session)
    analytics = AnalyticsService(repository=AnalyticsRepository(session))

    summary = await analytics.get_deal_summary(organization_id, days=30)

    assert summary.total_deals == 6

    by_status = {entry.status: entry for entry in summary.by_status}

    new_bucket = by_status[DealStatus.NEW]
    assert new_bucket.count == 2
    assert Decimal(new_bucket.amount_sum) == Decimal("250")

    won_bucket = by_status[DealStatus.WON]
    assert won_bucket.count == 2

    assert Decimal(summary.won.amount_sum) == Decimal("500")
    assert Decimal(summary.won.average_amount) == Decimal("500")

    # Everything except the old closed deal (created 40 days ago).
    assert summary.new_deals.count == 5
    assert summary.new_deals.days == 30
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession) -> None:
    """Check per-stage totals, status counts and stage-to-stage conversion.

    Uses the six deals created by ``_seed_data``; the funnel is expected to
    contain four stage entries, the last of which has no conversion value.
    """
    organization_id, _, _ = await _seed_data(session)
    analytics = AnalyticsService(repository=AnalyticsRepository(session))

    funnel = await analytics.get_deal_funnel(organization_id)

    def entry_for(stage):
        # First funnel entry for the given stage.
        return next(entry for entry in funnel if entry.stage == stage)

    assert len(funnel) == 4

    qualification = entry_for(DealStage.QUALIFICATION)
    assert qualification.total == 2
    assert qualification.by_status[DealStatus.NEW] == 2
    assert qualification.conversion_to_next == 50.0

    proposal_entry = entry_for(DealStage.PROPOSAL)
    assert proposal_entry.total == 1
    assert proposal_entry.by_status[DealStatus.IN_PROGRESS] == 1
    assert proposal_entry.conversion_to_next == 200.0

    closed_entry = entry_for(DealStage.CLOSED)
    assert closed_entry.conversion_to_next is None