# test_task_crm/tests/services/test_contact_service.py
#
# 263 lines
# 7.8 KiB
# Python

"""Unit tests for ContactService."""
from __future__ import annotations
import uuid
from collections.abc import AsyncGenerator
import pytest
import pytest_asyncio
from app.models.base import Base
from app.models.contact import Contact, ContactCreate
from app.models.deal import Deal
from app.models.organization import Organization
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.models.user import User
from app.repositories.contact_repo import ContactRepository
from app.services.contact_service import (
ContactDeletionError,
ContactForbiddenError,
ContactListFilters,
ContactService,
ContactUpdateData,
)
from app.services.organization_service import OrganizationContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool
@pytest_asyncio.fixture()
async def session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` backed by a fresh in-memory SQLite database.

    Every table registered on ``Base.metadata`` is created before the session
    is handed to the test. The engine is disposed on the way out, including
    when schema creation or session teardown raises.
    """
    # NOTE(review): StaticPool is presumably used so that every checkout shares
    # the single connection holding the in-memory database -- confirm.
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        future=True,
        poolclass=StaticPool,
    )
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        factory = async_sessionmaker(engine, expire_on_commit=False)
        # Local is named db_session so it does not shadow this fixture function.
        async with factory() as db_session:
            yield db_session
    finally:
        # Runs even if create_all or the session's exit fails, so the engine
        # is never left undisposed.
        await engine.dispose()
def _make_user(label: str) -> User:
    """Build an active ``User`` (not added to any session) for use in tests.

    ``label`` prefixes the email address, which also embeds a random UUID, and
    its title-cased form becomes the first word of the user's name.
    """
    email_address = f"{label}-{uuid.uuid4()}@example.com"
    display_name = f"{label.title()} User"
    return User(
        email=email_address,
        hashed_password="hashed",
        name=display_name,
        is_active=True,
    )
def _context_for(
    *,
    organization: Organization,
    user: User,
    role: OrganizationRole,
) -> OrganizationContext:
    """Return an ``OrganizationContext`` pairing ``organization`` with a membership for ``user``.

    The ``OrganizationMember`` is built here from the organization id, the
    user id and ``role``; this helper does not add it to a session.
    """
    return OrganizationContext(
        organization=organization,
        membership=OrganizationMember(
            organization_id=organization.id,
            user_id=user.id,
            role=role,
        ),
    )
async def _setup_contact(
    session: AsyncSession,
    *,
    role: OrganizationRole = OrganizationRole.MANAGER,
    owner: User | None = None,
    context_user: User | None = None,
) -> tuple[OrganizationContext, ContactRepository, Contact]:
    """Flush an organization, its user(s) and one "John Doe" contact into ``session``.

    Args:
        session: Session the objects are added to and flushed through.
        role: Role placed on the acting user's membership in the returned context.
        owner: User who owns the contact; a new "owner" user is built when omitted.
        context_user: User the returned context acts as; defaults to the owner.

    Returns:
        A tuple of the organization context for the acting user, a
        ``ContactRepository`` bound to ``session``, and the flushed contact.
    """
    # The name is cut to 8 characters: "Org-" plus the first four UUID characters.
    org = Organization(name=f"Org-{uuid.uuid4()}"[:8])
    contact_owner = owner or _make_user("owner")
    acting_user = context_user or contact_owner

    pending = [org, contact_owner]
    if acting_user is not contact_owner:
        pending.append(acting_user)
    session.add_all(pending)
    # Flush first: the contact below reads org.id and contact_owner.id.
    await session.flush()

    john = Contact(
        organization_id=org.id,
        owner_id=contact_owner.id,
        name="John Doe",
        email="john.doe@example.com",
        phone="+100000000",
    )
    session.add(john)
    await session.flush()

    return (
        _context_for(organization=org, user=acting_user, role=role),
        ContactRepository(session=session),
        john,
    )
@pytest.mark.asyncio
async def test_create_contact_honors_owner_override(session: AsyncSession) -> None:
    """Creating a contact with another user's id as ``owner_id`` keeps that owner."""
    context, repo, _ = await _setup_contact(session)

    assignee = _make_user("other")
    session.add(assignee)
    await session.flush()

    service = ContactService(repository=repo)
    payload = ContactCreate(
        organization_id=context.organization_id,
        owner_id=assignee.id,
        name="Alice",
        email="alice@example.com",
        phone=None,
    )
    created = await service.create_contact(payload, context=context)

    assert created.owner_id == assignee.id
    assert created.name == "Alice"
@pytest.mark.asyncio
async def test_member_cannot_create_foreign_owner(session: AsyncSession) -> None:
    """Expect ``ContactForbiddenError`` when a MEMBER creates a contact owned by someone else."""
    owner, member = _make_user("owner"), _make_user("member")
    context, repo, _ = await _setup_contact(
        session,
        role=OrganizationRole.MEMBER,
        owner=owner,
        context_user=member,
    )
    service = ContactService(repository=repo)

    # The payload names the other user as owner, not the acting member.
    foreign_payload = ContactCreate(
        organization_id=context.organization_id,
        owner_id=owner.id,
        name="Restricted",
        email=None,
        phone=None,
    )
    with pytest.raises(ContactForbiddenError):
        await service.create_contact(foreign_payload, context=context)
@pytest.mark.asyncio
async def test_list_contacts_supports_search(session: AsyncSession) -> None:
    """Listing with ``search="search"`` returns only the contact named "Searchable"."""
    context, repo, seeded = await _setup_contact(session)
    service = ContactService(repository=repo)

    # Second contact alongside the seeded "John Doe"; only this one should match.
    expected = Contact(
        organization_id=context.organization_id,
        owner_id=seeded.owner_id,
        name="Searchable",
        email="findme@example.com",
        phone=None,
    )
    session.add(expected)
    await session.flush()

    search_filters = ContactListFilters(search="search")
    found = await service.list_contacts(filters=search_filters, context=context)

    assert len(found) == 1
    only_hit = found[0]
    assert only_hit.id == expected.id
@pytest.mark.asyncio
async def test_member_owner_filter_forbidden(session: AsyncSession) -> None:
    """Expect ``ContactForbiddenError`` when a MEMBER filters the list by another user's id."""
    owner, member = _make_user("owner"), _make_user("member")
    context, repo, _ = await _setup_contact(
        session,
        role=OrganizationRole.MEMBER,
        owner=owner,
        context_user=member,
    )
    service = ContactService(repository=repo)

    by_other_owner = ContactListFilters(owner_id=owner.id)
    with pytest.raises(ContactForbiddenError):
        await service.list_contacts(filters=by_other_owner, context=context)
@pytest.mark.asyncio
async def test_member_can_view_foreign_contacts(session: AsyncSession) -> None:
    """An unfiltered listing by a MEMBER includes a contact owned by another user."""
    owner, member = _make_user("owner"), _make_user("member")
    context, repo, contact = await _setup_contact(
        session,
        role=OrganizationRole.MEMBER,
        owner=owner,
        context_user=member,
    )
    service = ContactService(repository=repo)

    listed = await service.list_contacts(filters=ContactListFilters(), context=context)

    assert listed
    first = listed[0]
    assert first.id == contact.id
    # The contact belongs to the owner, who is not the user the context acts as.
    assert first.owner_id == owner.id
    assert owner.id != context.user_id
@pytest.mark.asyncio
async def test_member_cannot_update_foreign_contact(session: AsyncSession) -> None:
    """Expect ``ContactForbiddenError`` when a MEMBER updates a contact owned by someone else."""
    owner, member = _make_user("owner"), _make_user("member")
    context, repo, contact = await _setup_contact(
        session,
        role=OrganizationRole.MEMBER,
        owner=owner,
        context_user=member,
    )
    service = ContactService(repository=repo)

    changes = ContactUpdateData(name="Blocked")
    with pytest.raises(ContactForbiddenError):
        await service.update_contact(contact, changes, context=context)
@pytest.mark.asyncio
async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None:
    """Updating with ``email=None`` and ``phone=None`` clears both fields on the contact."""
    context, repo, contact = await _setup_contact(session)
    service = ContactService(repository=repo)

    # The seeded contact has an email and a phone; both are explicitly set to None.
    changes = ContactUpdateData(name="Updated", email=None, phone=None)
    result = await service.update_contact(contact, changes, context=context)

    assert result.name == "Updated"
    assert result.email is None
    assert result.phone is None
@pytest.mark.asyncio
async def test_delete_contact_blocks_when_deals_exist(session: AsyncSession) -> None:
    """Expect ``ContactDeletionError`` when the contact has a deal attached."""
    context, repo, contact = await _setup_contact(session)
    service = ContactService(repository=repo)

    linked_deal = Deal(
        organization_id=context.organization_id,
        contact_id=contact.id,
        owner_id=contact.owner_id,
        title="Pending",
        amount=None,
    )
    session.add(linked_deal)
    await session.flush()

    with pytest.raises(ContactDeletionError):
        await service.delete_contact(contact, context=context)
@pytest.mark.asyncio
async def test_delete_contact_succeeds_without_deals(session: AsyncSession) -> None:
    """After deleting a contact with no deals, selecting it by id returns nothing."""
    context, repo, contact = await _setup_contact(session)
    service = ContactService(repository=repo)

    await service.delete_contact(contact, context=context)

    lookup = select(Contact).where(Contact.id == contact.id)
    remaining = await session.scalar(lookup)
    assert remaining is None