"""Unit tests for AnalyticsService.""" from __future__ import annotations from collections.abc import AsyncGenerator from datetime import datetime, timedelta, timezone from decimal import Decimal import pytest import pytest_asyncio from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.pool import StaticPool from app.models import Base from app.models.contact import Contact from app.models.deal import Deal, DealStage, DealStatus from app.models.organization import Organization from app.models.organization_member import OrganizationMember, OrganizationRole from app.models.user import User from app.repositories.analytics_repo import AnalyticsRepository from app.services.analytics_service import AnalyticsService, invalidate_analytics_cache from tests.utils.fake_redis import InMemoryRedis @pytest_asyncio.fixture() async def session() -> AsyncGenerator[AsyncSession, None]: engine = create_async_engine( "sqlite+aiosqlite:///:memory:", future=True, poolclass=StaticPool, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) session_factory = async_sessionmaker(engine, expire_on_commit=False) async with session_factory() as session: yield session await engine.dispose() async def _seed_data(session: AsyncSession) -> tuple[int, int, int]: org = Organization(name="Analytics Org") user = User( email="analytics@example.com", hashed_password="hashed", name="Analyst", is_active=True, ) session.add_all([org, user]) await session.flush() member = OrganizationMember( organization_id=org.id, user_id=user.id, role=OrganizationRole.OWNER, ) contact = Contact( organization_id=org.id, owner_id=user.id, name="Client", email="client@example.com", ) session.add_all([member, contact]) await session.flush() now = datetime.now(timezone.utc) deals = [ Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Qual 1", amount=Decimal("100"), status=DealStatus.NEW, stage=DealStage.QUALIFICATION, created_at=now - timedelta(days=5), ), Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Qual 2", amount=Decimal("150"), status=DealStatus.NEW, stage=DealStage.QUALIFICATION, created_at=now - timedelta(days=3), ), Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Proposal", amount=Decimal("200"), status=DealStatus.IN_PROGRESS, stage=DealStage.PROPOSAL, created_at=now - timedelta(days=15), ), Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Negotiation Won", amount=Decimal("500"), status=DealStatus.WON, stage=DealStage.NEGOTIATION, created_at=now - timedelta(days=2), ), Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Negotiation Won No Amount", amount=None, status=DealStatus.WON, stage=DealStage.NEGOTIATION, created_at=now - timedelta(days=1), ), Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Closed Lost", amount=Decimal("300"), status=DealStatus.LOST, stage=DealStage.CLOSED, created_at=now - timedelta(days=40), ), ] session.add_all(deals) await session.commit() return org.id, user.id, contact.id @pytest.mark.asyncio async def test_deal_summary_returns_expected_metrics(session: AsyncSession) -> None: org_id, _, _ = await _seed_data(session) service = AnalyticsService(repository=AnalyticsRepository(session)) summary = await service.get_deal_summary(org_id, days=30) assert summary.total_deals == 6 status_map = {item.status: item for item in summary.by_status} assert status_map[DealStatus.NEW].count == 2 assert Decimal(status_map[DealStatus.NEW].amount_sum) == Decimal("250") assert status_map[DealStatus.WON].count == 2 assert Decimal(summary.won.amount_sum) == Decimal("500") assert Decimal(summary.won.average_amount) == Decimal("500") assert summary.new_deals.count == 5 # все кроме старой закрытой сделки assert summary.new_deals.days == 30 @pytest.mark.asyncio async def test_funnel_breakdown_contains_stage_conversions(session: AsyncSession) -> None: org_id, _, _ = await _seed_data(session) service = AnalyticsService(repository=AnalyticsRepository(session)) funnel = await service.get_deal_funnel(org_id) assert len(funnel) == 4 qual = next(item for item in funnel if item.stage == DealStage.QUALIFICATION) assert qual.total == 2 assert qual.by_status[DealStatus.NEW] == 2 assert qual.conversion_to_next == 50.0 proposal = next(item for item in funnel if item.stage == DealStage.PROPOSAL) assert proposal.total == 1 assert proposal.by_status[DealStatus.IN_PROGRESS] == 1 assert proposal.conversion_to_next == 200.0 last_stage = next(item for item in funnel if item.stage == DealStage.CLOSED) assert last_stage.conversion_to_next is None class _ExplodingRepository(AnalyticsRepository): async def fetch_status_rollup(self, organization_id: int): # type: ignore[override] raise AssertionError("cache not used for status rollup") async def count_new_deals_since(self, organization_id: int, threshold): # type: ignore[override] raise AssertionError("cache not used for new deal count") async def fetch_stage_status_rollup(self, organization_id: int): # type: ignore[override] raise AssertionError("cache not used for funnel rollup") @pytest.mark.asyncio async def test_summary_reads_from_cache_when_available(session: AsyncSession) -> None: org_id, _, _ = await _seed_data(session) cache = InMemoryRedis() service = AnalyticsService( repository=AnalyticsRepository(session), cache=cache, ttl_seconds=60, backoff_ms=0, ) await service.get_deal_summary(org_id, days=30) service._repository = _ExplodingRepository(session) cached = await service.get_deal_summary(org_id, days=30) assert cached.total_deals == 6 @pytest.mark.asyncio async def test_invalidation_refreshes_cached_summary(session: AsyncSession) -> None: org_id, _, contact_id = await _seed_data(session) cache = InMemoryRedis() service = AnalyticsService( repository=AnalyticsRepository(session), cache=cache, ttl_seconds=60, backoff_ms=0, ) await service.get_deal_summary(org_id, days=30) deal = Deal( organization_id=org_id, contact_id=contact_id, owner_id=1, title="New", amount=Decimal("50"), status=DealStatus.NEW, stage=DealStage.QUALIFICATION, created_at=datetime.now(timezone.utc), ) session.add(deal) await session.commit() cached = await service.get_deal_summary(org_id, days=30) assert cached.total_deals == 6 await invalidate_analytics_cache(cache, org_id, backoff_ms=0) refreshed = await service.get_deal_summary(org_id, days=30) assert refreshed.total_deals == 7 @pytest.mark.asyncio async def test_funnel_reads_from_cache_when_available(session: AsyncSession) -> None: org_id, _, _ = await _seed_data(session) cache = InMemoryRedis() service = AnalyticsService( repository=AnalyticsRepository(session), cache=cache, ttl_seconds=60, backoff_ms=0, ) await service.get_deal_funnel(org_id) service._repository = _ExplodingRepository(session) cached = await service.get_deal_funnel(org_id) assert len(cached) == 4