"""Unit tests for ContactService.""" from __future__ import annotations import uuid from collections.abc import AsyncGenerator import pytest import pytest_asyncio from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.pool import StaticPool from app.models.base import Base from app.models.contact import Contact, ContactCreate from app.models.deal import Deal from app.models.organization import Organization from app.models.organization_member import OrganizationMember, OrganizationRole from app.models.user import User from app.repositories.contact_repo import ContactRepository from app.services.contact_service import ( ContactDeletionError, ContactForbiddenError, ContactListFilters, ContactService, ContactUpdateData, ) from app.services.organization_service import OrganizationContext @pytest_asyncio.fixture() async def session() -> AsyncGenerator[AsyncSession, None]: engine = create_async_engine( "sqlite+aiosqlite:///:memory:", future=True, poolclass=StaticPool, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) factory = async_sessionmaker(engine, expire_on_commit=False) async with factory() as session: yield session await engine.dispose() def _make_user(label: str) -> User: return User( email=f"{label}-{uuid.uuid4()}@example.com", hashed_password="hashed", name=f"{label.title()} User", is_active=True, ) def _context_for( *, organization: Organization, user: User, role: OrganizationRole, ) -> OrganizationContext: membership = OrganizationMember(organization_id=organization.id, user_id=user.id, role=role) return OrganizationContext(organization=organization, membership=membership) async def _setup_contact( session: AsyncSession, *, role: OrganizationRole = OrganizationRole.MANAGER, owner: User | None = None, context_user: User | None = None, ) -> tuple[OrganizationContext, ContactRepository, Contact]: organization = Organization(name=f"Org-{uuid.uuid4()}"[:8]) owner_user = owner or _make_user("owner") ctx_user = context_user or owner_user session.add_all([organization, owner_user]) if ctx_user is not owner_user: session.add(ctx_user) await session.flush() contact = Contact( organization_id=organization.id, owner_id=owner_user.id, name="John Doe", email="john.doe@example.com", phone="+100000000", ) session.add(contact) await session.flush() context = _context_for(organization=organization, user=ctx_user, role=role) repo = ContactRepository(session=session) return context, repo, contact @pytest.mark.asyncio async def test_create_contact_honors_owner_override(session: AsyncSession) -> None: context, repo, _ = await _setup_contact(session) other_user = _make_user("other") session.add(other_user) await session.flush() service = ContactService(repository=repo) contact = await service.create_contact( ContactCreate( organization_id=context.organization_id, owner_id=other_user.id, name="Alice", email="alice@example.com", phone=None, ), context=context, ) assert contact.owner_id == other_user.id assert contact.name == "Alice" @pytest.mark.asyncio async def test_member_cannot_create_foreign_owner(session: AsyncSession) -> None: owner = _make_user("owner") member = _make_user("member") context, repo, _ = await _setup_contact( session, role=OrganizationRole.MEMBER, owner=owner, context_user=member, ) service = ContactService(repository=repo) with pytest.raises(ContactForbiddenError): await service.create_contact( ContactCreate( organization_id=context.organization_id, owner_id=owner.id, name="Restricted", email=None, phone=None, ), context=context, ) @pytest.mark.asyncio async def test_list_contacts_supports_search(session: AsyncSession) -> None: context, repo, base_contact = await _setup_contact(session) service = ContactService(repository=repo) another = Contact( organization_id=context.organization_id, owner_id=base_contact.owner_id, name="Searchable", email="findme@example.com", phone=None, ) session.add(another) await session.flush() contacts = await service.list_contacts( filters=ContactListFilters(search="search"), context=context, ) assert len(contacts) == 1 assert contacts[0].id == another.id @pytest.mark.asyncio async def test_member_owner_filter_forbidden(session: AsyncSession) -> None: owner = _make_user("owner") member = _make_user("member") context, repo, _ = await _setup_contact( session, role=OrganizationRole.MEMBER, owner=owner, context_user=member, ) service = ContactService(repository=repo) with pytest.raises(ContactForbiddenError): await service.list_contacts( filters=ContactListFilters(owner_id=owner.id), context=context, ) @pytest.mark.asyncio async def test_member_can_view_foreign_contacts(session: AsyncSession) -> None: owner = _make_user("owner") member = _make_user("member") context, repo, contact = await _setup_contact( session, role=OrganizationRole.MEMBER, owner=owner, context_user=member, ) service = ContactService(repository=repo) contacts = await service.list_contacts(filters=ContactListFilters(), context=context) assert contacts and contacts[0].id == contact.id assert contacts[0].owner_id == owner.id != context.user_id @pytest.mark.asyncio async def test_member_cannot_update_foreign_contact(session: AsyncSession) -> None: owner = _make_user("owner") member = _make_user("member") context, repo, contact = await _setup_contact( session, role=OrganizationRole.MEMBER, owner=owner, context_user=member, ) service = ContactService(repository=repo) with pytest.raises(ContactForbiddenError): await service.update_contact(contact, ContactUpdateData(name="Blocked"), context=context) @pytest.mark.asyncio async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None: context, repo, contact = await _setup_contact(session) service = ContactService(repository=repo) updated = await service.update_contact( contact, ContactUpdateData(name="Updated", email=None, phone=None), context=context, ) assert updated.name == "Updated" assert updated.email is None assert updated.phone is None @pytest.mark.asyncio async def test_delete_contact_blocks_when_deals_exist(session: AsyncSession) -> None: context, repo, contact = await _setup_contact(session) service = ContactService(repository=repo) session.add( Deal( organization_id=context.organization_id, contact_id=contact.id, owner_id=contact.owner_id, title="Pending", amount=None, ), ) await session.flush() with pytest.raises(ContactDeletionError): await service.delete_contact(contact, context=context) @pytest.mark.asyncio async def test_delete_contact_succeeds_without_deals(session: AsyncSession) -> None: context, repo, contact = await _setup_contact(session) service = ContactService(repository=repo) await service.delete_contact(contact, context=context) result = await session.scalar(select(Contact).where(Contact.id == contact.id)) assert result is None