"""Unit tests for ActivityService.""" from __future__ import annotations import uuid from collections.abc import AsyncGenerator import pytest import pytest_asyncio from app.models.activity import Activity, ActivityType from app.models.base import Base from app.models.contact import Contact from app.models.deal import Deal from app.models.organization import Organization from app.models.organization_member import OrganizationMember, OrganizationRole from app.models.user import User from app.repositories.activity_repo import ActivityRepository from app.services.activity_service import ( ActivityForbiddenError, ActivityListFilters, ActivityService, ActivityValidationError, ) from app.services.organization_service import OrganizationContext from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.pool import StaticPool @pytest_asyncio.fixture() async def session() -> AsyncGenerator[AsyncSession, None]: engine = create_async_engine( "sqlite+aiosqlite:///:memory:", future=True, poolclass=StaticPool, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) session_factory = async_sessionmaker(engine, expire_on_commit=False) async with session_factory() as session: yield session await engine.dispose() def _make_user(suffix: str) -> User: return User( email=f"user-{suffix}@example.com", hashed_password="hashed", name="Test", is_active=True, ) async def _prepare_deal( session: AsyncSession, *, role: OrganizationRole = OrganizationRole.MANAGER, ) -> tuple[OrganizationContext, ActivityRepository, int, Organization]: org = Organization(name=f"Org-{uuid.uuid4()}"[:8]) user = _make_user("owner") session.add_all([org, user]) await session.flush() contact = Contact( organization_id=org.id, owner_id=user.id, name="Alice", email="alice@example.com", ) session.add(contact) await session.flush() deal = Deal( organization_id=org.id, contact_id=contact.id, owner_id=user.id, title="Activity", amount=None, ) session.add(deal) await session.flush() membership = OrganizationMember(organization_id=org.id, user_id=user.id, role=role) context = OrganizationContext(organization=org, membership=membership) return context, ActivityRepository(session=session), deal.id, org @pytest.mark.asyncio async def test_list_activities_returns_only_current_deal(session: AsyncSession) -> None: context, repo, deal_id, _ = await _prepare_deal(session) service = ActivityService(repository=repo) session.add_all( [ Activity( deal_id=deal_id, author_id=context.user_id, type=ActivityType.COMMENT, payload={"text": "hi"}, ), Activity( deal_id=deal_id + 1, author_id=context.user_id, type=ActivityType.SYSTEM, payload={}, ), ], ) await session.flush() activities = await service.list_activities( filters=ActivityListFilters(deal_id=deal_id, limit=10, offset=0), context=context, ) assert len(activities) == 1 assert activities[0].deal_id == deal_id @pytest.mark.asyncio async def test_add_comment_rejects_empty_text(session: AsyncSession) -> None: context, repo, deal_id, _ = await _prepare_deal(session) service = ActivityService(repository=repo) with pytest.raises(ActivityValidationError): await service.add_comment( deal_id=deal_id, author_id=context.user_id, text=" ", context=context, ) @pytest.mark.asyncio async def test_record_activity_blocks_foreign_deal(session: AsyncSession) -> None: context, repo, _deal_id, _ = await _prepare_deal(session) service = ActivityService(repository=repo) # Create a second deal in another organization other_org = Organization(name="External") other_user = _make_user("external") session.add_all([other_org, other_user]) await session.flush() other_contact = Contact( organization_id=other_org.id, owner_id=other_user.id, name="Bob", email="bob@example.com", ) session.add(other_contact) await session.flush() other_deal = Deal( organization_id=other_org.id, contact_id=other_contact.id, owner_id=other_user.id, title="Foreign", amount=None, ) session.add(other_deal) await session.flush() with pytest.raises(ActivityForbiddenError): await service.list_activities( filters=ActivityListFilters(deal_id=other_deal.id), context=context, ) @pytest.mark.asyncio async def test_add_comment_trims_payload_text(session: AsyncSession) -> None: context, repo, deal_id, _ = await _prepare_deal(session) service = ActivityService(repository=repo) activity = await service.add_comment( deal_id=deal_id, author_id=context.user_id, text=" trimmed text ", context=context, ) assert activity.payload["text"] == "trimmed text"