229 lines
6.7 KiB
Python
229 lines
6.7 KiB
Python
"""Unit tests for ContactService."""
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncGenerator
|
|
import uuid
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
from app.models.base import Base
|
|
from app.models.contact import Contact, ContactCreate
|
|
from app.models.deal import Deal
|
|
from app.models.organization import Organization
|
|
from app.models.organization_member import OrganizationMember, OrganizationRole
|
|
from app.models.user import User
|
|
from app.repositories.contact_repo import ContactRepository
|
|
from app.services.contact_service import (
|
|
ContactDeletionError,
|
|
ContactForbiddenError,
|
|
ContactListFilters,
|
|
ContactService,
|
|
ContactUpdateData,
|
|
)
|
|
from app.services.organization_service import OrganizationContext
|
|
|
|
|
|
@pytest_asyncio.fixture()
async def session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` backed by a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created before the session
    is handed to the test. The engine uses ``StaticPool`` with the
    ``:memory:`` URL, and the session factory is built with
    ``expire_on_commit=False``.

    Yields:
        The open ``AsyncSession`` for the duration of one test.
    """
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        future=True,
        poolclass=StaticPool,
    )
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        factory = async_sessionmaker(engine, expire_on_commit=False)
        # Named ``db_session`` so the local does not shadow the fixture itself.
        async with factory() as db_session:
            yield db_session
    finally:
        # Dispose unconditionally: schema creation or session teardown may
        # raise, and the engine must not be left open in that case.
        await engine.dispose()
|
|
|
|
|
|
def _make_user(label: str) -> User:
    """Build an active ``User`` whose e-mail address embeds a fresh UUID.

    Args:
        label: Prefix of the e-mail local part; its title-cased form is also
            used as the first word of the display name.

    Returns:
        A new ``User`` instance. Nothing is added to a session here.
    """
    unique_email = f"{label}-{uuid.uuid4()}@example.com"
    display_name = f"{label.title()} User"
    return User(
        email=unique_email,
        hashed_password="hashed",
        name=display_name,
        is_active=True,
    )
|
|
|
|
|
|
def _context_for(
    *,
    organization: Organization,
    user: User,
    role: OrganizationRole,
) -> OrganizationContext:
    """Wrap an organization and a membership for ``user`` in a context.

    The ``OrganizationMember`` is constructed in memory only; this helper does
    not add it to any session.

    Args:
        organization: Organization the context refers to.
        user: User whose id is recorded on the membership.
        role: Role assigned to the membership.

    Returns:
        An ``OrganizationContext`` holding the organization and membership.
    """
    return OrganizationContext(
        organization=organization,
        membership=OrganizationMember(
            organization_id=organization.id,
            user_id=user.id,
            role=role,
        ),
    )
|
|
|
|
|
|
async def _setup_contact(
    session: AsyncSession,
    *,
    role: OrganizationRole = OrganizationRole.MANAGER,
    owner: User | None = None,
    context_user: User | None = None,
) -> tuple[OrganizationContext, ContactRepository, Contact]:
    """Seed an organization, its users and one "John Doe" contact.

    Args:
        session: Session the seeded rows are added to and flushed through.
        role: Role given to the acting user's membership in the context.
        owner: User that owns the contact; a new "owner" user is made when
            this is falsy.
        context_user: User the returned context acts as; falls back to the
            contact owner when falsy.

    Returns:
        A ``(context, repository, contact)`` tuple, with the repository bound
        to ``session``.
    """
    # The slice applies to the whole string, so the name is "Org-" followed
    # by only the first four characters of the UUID.
    org_name = f"Org-{uuid.uuid4()}"[:8]
    organization = Organization(name=org_name)
    owner_user = owner if owner else _make_user("owner")
    acting_user = context_user if context_user else owner_user

    pending = [organization, owner_user]
    if acting_user is not owner_user:
        pending.append(acting_user)
    session.add_all(pending)
    # Flush before the ids of the organization and owner are read below.
    await session.flush()

    contact = Contact(
        organization_id=organization.id,
        owner_id=owner_user.id,
        name="John Doe",
        email="john.doe@example.com",
        phone="+100000000",
    )
    session.add(contact)
    await session.flush()

    return (
        _context_for(organization=organization, user=acting_user, role=role),
        ContactRepository(session=session),
        contact,
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_contact_honors_owner_override(session: AsyncSession) -> None:
    """Creating a contact with an explicit ``owner_id`` keeps that owner.

    The context comes from ``_setup_contact`` with its default role, and the
    requested owner is a second, separately persisted user.
    """
    context, repo, _ = await _setup_contact(session)
    assignee = _make_user("other")
    session.add(assignee)
    await session.flush()

    service = ContactService(repository=repo)
    payload = ContactCreate(
        organization_id=context.organization_id,
        owner_id=assignee.id,
        name="Alice",
        email="alice@example.com",
        phone=None,
    )
    created = await service.create_contact(payload, context=context)

    assert created.owner_id == assignee.id
    assert created.name == "Alice"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_member_cannot_create_foreign_owner(session: AsyncSession) -> None:
    """A MEMBER-role context may not create a contact owned by someone else.

    The context acts as ``member`` while the payload names ``owner`` as the
    contact owner; ``ContactForbiddenError`` is expected.
    """
    contact_owner = _make_user("owner")
    acting_member = _make_user("member")
    context, repo, _ = await _setup_contact(
        session,
        role=OrganizationRole.MEMBER,
        owner=contact_owner,
        context_user=acting_member,
    )
    service = ContactService(repository=repo)

    with pytest.raises(ContactForbiddenError):
        await service.create_contact(
            ContactCreate(
                organization_id=context.organization_id,
                owner_id=contact_owner.id,
                name="Restricted",
                email=None,
                phone=None,
            ),
            context=context,
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_contacts_supports_search(session: AsyncSession) -> None:
    """Listing with a search term returns only the matching contact.

    The lower-case term "search" is expected to match the contact named
    "Searchable" and to exclude the seeded "John Doe" contact.
    """
    context, repo, seeded = await _setup_contact(session)
    service = ContactService(repository=repo)

    match = Contact(
        organization_id=context.organization_id,
        owner_id=seeded.owner_id,
        name="Searchable",
        email="findme@example.com",
        phone=None,
    )
    session.add(match)
    await session.flush()

    search_filters = ContactListFilters(search="search")
    found = await service.list_contacts(filters=search_filters, context=context)

    assert len(found) == 1
    only_hit = found[0]
    assert only_hit.id == match.id
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_member_owner_filter_forbidden(session: AsyncSession) -> None:
    """A MEMBER-role context may not filter the list by another user's id.

    The context acts as ``member`` and the filter names ``owner``;
    ``ContactForbiddenError`` is expected.
    """
    contact_owner = _make_user("owner")
    acting_member = _make_user("member")
    context, repo, _ = await _setup_contact(
        session,
        role=OrganizationRole.MEMBER,
        owner=contact_owner,
        context_user=acting_member,
    )
    service = ContactService(repository=repo)

    with pytest.raises(ContactForbiddenError):
        await service.list_contacts(
            filters=ContactListFilters(owner_id=contact_owner.id),
            context=context,
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None:
    """Updating with ``email=None`` and ``phone=None`` clears both fields.

    The seeded contact starts with an e-mail and a phone number; after the
    update the name is changed and both optional fields are ``None``.
    """
    context, repo, contact = await _setup_contact(session)
    service = ContactService(repository=repo)

    changes = ContactUpdateData(name="Updated", email=None, phone=None)
    updated = await service.update_contact(contact, changes, context=context)

    assert updated.name == "Updated"
    assert updated.email is None
    assert updated.phone is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_contact_blocks_when_deals_exist(session: AsyncSession) -> None:
    """Deleting a contact that a deal references raises an error.

    A ``Deal`` pointing at the seeded contact is flushed first; the delete
    call is then expected to raise ``ContactDeletionError``.
    """
    context, repo, contact = await _setup_contact(session)
    service = ContactService(repository=repo)

    linked_deal = Deal(
        organization_id=context.organization_id,
        contact_id=contact.id,
        owner_id=contact.owner_id,
        title="Pending",
        amount=None,
    )
    session.add(linked_deal)
    await session.flush()

    with pytest.raises(ContactDeletionError):
        await service.delete_contact(contact, context=context)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_contact_succeeds_without_deals(session: AsyncSession) -> None:
    """Deleting a contact with no deals removes it.

    After the delete call, selecting the contact by id through the same
    session yields ``None``.
    """
    context, repo, contact = await _setup_contact(session)
    service = ContactService(repository=repo)

    await service.delete_contact(contact, context=context)

    lookup = select(Contact).where(Contact.id == contact.id)
    remaining = await session.scalar(lookup)
    assert remaining is None
|