Compare commits

..

2 Commits

6 changed files with 804 additions and 19 deletions

View File

@ -11,12 +11,14 @@ from app.core.database import get_session
from app.core.security import jwt_service, password_hasher from app.core.security import jwt_service, password_hasher
from app.models.user import User from app.models.user import User
from app.repositories.activity_repo import ActivityRepository from app.repositories.activity_repo import ActivityRepository
from app.repositories.contact_repo import ContactRepository
from app.repositories.deal_repo import DealRepository from app.repositories.deal_repo import DealRepository
from app.repositories.org_repo import OrganizationRepository from app.repositories.org_repo import OrganizationRepository
from app.repositories.task_repo import TaskRepository from app.repositories.task_repo import TaskRepository
from app.repositories.user_repo import UserRepository from app.repositories.user_repo import UserRepository
from app.services.auth_service import AuthService from app.services.auth_service import AuthService
from app.services.activity_service import ActivityService from app.services.activity_service import ActivityService
from app.services.contact_service import ContactService
from app.services.deal_service import DealService from app.services.deal_service import DealService
from app.services.organization_service import ( from app.services.organization_service import (
OrganizationAccessDeniedError, OrganizationAccessDeniedError,
@ -48,6 +50,10 @@ def get_deal_repository(session: AsyncSession = Depends(get_db_session)) -> Deal
return DealRepository(session=session) return DealRepository(session=session)
def get_contact_repository(session: AsyncSession = Depends(get_db_session)) -> ContactRepository:
return ContactRepository(session=session)
def get_task_repository(session: AsyncSession = Depends(get_db_session)) -> TaskRepository: def get_task_repository(session: AsyncSession = Depends(get_db_session)) -> TaskRepository:
return TaskRepository(session=session) return TaskRepository(session=session)
@ -86,6 +92,12 @@ def get_activity_service(
return ActivityService(repository=repo) return ActivityService(repository=repo)
def get_contact_service(
repo: ContactRepository = Depends(get_contact_repository),
) -> ContactService:
return ContactService(repository=repo)
def get_task_service( def get_task_service(
task_repo: TaskRepository = Depends(get_task_repository), task_repo: TaskRepository = Depends(get_task_repository),
activity_repo: ActivityRepository = Depends(get_activity_repository), activity_repo: ActivityRepository = Depends(get_activity_repository),

View File

@ -1,10 +1,20 @@
"""Contact API stubs and schemas.""" """Contact API endpoints."""
from __future__ import annotations from __future__ import annotations
from fastapi import APIRouter, Depends, Query, status from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, EmailStr from pydantic import BaseModel, ConfigDict, EmailStr
from app.api.deps import get_organization_context from app.api.deps import get_contact_service, get_organization_context
from app.models.contact import ContactCreate, ContactRead
from app.services.contact_service import (
ContactDeletionError,
ContactForbiddenError,
ContactListFilters,
ContactNotFoundError,
ContactOrganizationError,
ContactService,
ContactUpdateData,
)
from app.services.organization_service import OrganizationContext from app.services.organization_service import OrganizationContext
@ -12,33 +22,106 @@ class ContactCreatePayload(BaseModel):
name: str name: str
email: EmailStr | None = None email: EmailStr | None = None
phone: str | None = None phone: str | None = None
owner_id: int | None = None
def to_domain(self, *, organization_id: int, fallback_owner: int) -> ContactCreate:
return ContactCreate(
organization_id=organization_id,
owner_id=self.owner_id or fallback_owner,
name=self.name,
email=self.email,
phone=self.phone,
)
class ContactUpdatePayload(BaseModel):
model_config = ConfigDict(extra="forbid")
name: str | None = None
email: EmailStr | None = None
phone: str | None = None
def to_update_data(self) -> ContactUpdateData:
dump = self.model_dump(exclude_unset=True)
return ContactUpdateData(
name=dump.get("name"),
email=dump.get("email"),
phone=dump.get("phone"),
)
router = APIRouter(prefix="/contacts", tags=["contacts"]) router = APIRouter(prefix="/contacts", tags=["contacts"])
def _stub(endpoint: str) -> dict[str, str]: @router.get("/", response_model=list[ContactRead])
return {"detail": f"{endpoint} is not implemented yet"}
@router.get("/", status_code=status.HTTP_501_NOT_IMPLEMENTED)
async def list_contacts( async def list_contacts(
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100), page_size: int = Query(20, ge=1, le=100),
search: str | None = None, search: str | None = Query(default=None, min_length=1),
owner_id: int | None = None, owner_id: int | None = None,
context: OrganizationContext = Depends(get_organization_context), context: OrganizationContext = Depends(get_organization_context),
) -> dict[str, str]: service: ContactService = Depends(get_contact_service),
"""Placeholder list endpoint supporting the required filters.""" ) -> list[ContactRead]:
_ = context filters = ContactListFilters(
return _stub("GET /contacts") page=page,
page_size=page_size,
search=search,
owner_id=owner_id,
)
try:
contacts = await service.list_contacts(filters=filters, context=context)
except ContactForbiddenError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
return [ContactRead.model_validate(contact) for contact in contacts]
@router.post("/", status_code=status.HTTP_501_NOT_IMPLEMENTED) @router.post("/", response_model=ContactRead, status_code=status.HTTP_201_CREATED)
async def create_contact( async def create_contact(
payload: ContactCreatePayload, payload: ContactCreatePayload,
context: OrganizationContext = Depends(get_organization_context), context: OrganizationContext = Depends(get_organization_context),
) -> dict[str, str]: service: ContactService = Depends(get_contact_service),
"""Placeholder for creating a contact within the current organization.""" ) -> ContactRead:
_ = (payload, context) data = payload.to_domain(organization_id=context.organization_id, fallback_owner=context.user_id)
return _stub("POST /contacts") try:
contact = await service.create_contact(data, context=context)
except ContactForbiddenError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
except ContactOrganizationError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
return ContactRead.model_validate(contact)
@router.patch("/{contact_id}", response_model=ContactRead)
async def update_contact(
contact_id: int,
payload: ContactUpdatePayload,
context: OrganizationContext = Depends(get_organization_context),
service: ContactService = Depends(get_contact_service),
) -> ContactRead:
try:
contact = await service.get_contact(contact_id, context=context)
updated = await service.update_contact(contact, payload.to_update_data(), context=context)
except ContactNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ContactForbiddenError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
except ContactOrganizationError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
return ContactRead.model_validate(updated)
@router.delete("/{contact_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_contact(
contact_id: int,
context: OrganizationContext = Depends(get_organization_context),
service: ContactService = Depends(get_contact_service),
) -> None:
try:
contact = await service.get_contact(contact_id, context=context)
await service.delete_contact(contact, context=context)
except ContactNotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except ContactForbiddenError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
except ContactDeletionError as exc:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc

View File

@ -0,0 +1,137 @@
"""Repository helpers for contacts with role-aware access."""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any
from sqlalchemy import Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.contact import Contact, ContactCreate
from app.models.organization_member import OrganizationRole
class ContactAccessError(Exception):
"""Raised when attempting operations without sufficient permissions."""
@dataclass(slots=True)
class ContactQueryParams:
"""Filters accepted by contact list queries."""
organization_id: int
page: int = 1
page_size: int = 20
search: str | None = None
owner_id: int | None = None
class ContactRepository:
"""Provides CRUD helpers for Contact entities."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
@property
def session(self) -> AsyncSession:
return self._session
async def list(
self,
*,
params: ContactQueryParams,
role: OrganizationRole,
user_id: int,
) -> Sequence[Contact]:
stmt: Select[tuple[Contact]] = select(Contact).where(Contact.organization_id == params.organization_id)
stmt = self._apply_filters(stmt, params, role, user_id)
offset = (max(params.page, 1) - 1) * params.page_size
stmt = stmt.order_by(Contact.created_at.desc()).offset(offset).limit(params.page_size)
result = await self._session.scalars(stmt)
return result.all()
async def get(
self,
contact_id: int,
*,
organization_id: int,
role: OrganizationRole,
user_id: int,
) -> Contact | None:
stmt = select(Contact).where(Contact.id == contact_id, Contact.organization_id == organization_id)
stmt = self._apply_role_clause(stmt, role, user_id)
result = await self._session.scalars(stmt)
return result.first()
async def create(
self,
data: ContactCreate,
*,
role: OrganizationRole,
user_id: int,
) -> Contact:
if role == OrganizationRole.MEMBER and data.owner_id != user_id:
raise ContactAccessError("Members can only create contacts they own")
contact = Contact(**data.model_dump())
self._session.add(contact)
await self._session.flush()
return contact
async def update(
self,
contact: Contact,
updates: Mapping[str, Any],
*,
role: OrganizationRole,
user_id: int,
) -> Contact:
if role == OrganizationRole.MEMBER and contact.owner_id != user_id:
raise ContactAccessError("Members can only modify their own contacts")
for field, value in updates.items():
if hasattr(contact, field):
setattr(contact, field, value)
await self._session.flush()
await self._session.refresh(contact)
return contact
async def delete(
self,
contact: Contact,
*,
role: OrganizationRole,
user_id: int,
) -> None:
if role == OrganizationRole.MEMBER and contact.owner_id != user_id:
raise ContactAccessError("Members can only delete their own contacts")
await self._session.delete(contact)
await self._session.flush()
def _apply_filters(
self,
stmt: Select[tuple[Contact]],
params: ContactQueryParams,
role: OrganizationRole,
user_id: int,
) -> Select[tuple[Contact]]:
if params.search:
pattern = f"%{params.search.lower()}%"
stmt = stmt.where(
func.lower(Contact.name).like(pattern)
| func.lower(func.coalesce(Contact.email, "")).like(pattern)
)
if params.owner_id is not None:
if role == OrganizationRole.MEMBER:
raise ContactAccessError("Members cannot filter by owner")
stmt = stmt.where(Contact.owner_id == params.owner_id)
return self._apply_role_clause(stmt, role, user_id)
def _apply_role_clause(
self,
stmt: Select[tuple[Contact]],
role: OrganizationRole,
user_id: int,
) -> Select[tuple[Contact]]:
if role == OrganizationRole.MEMBER:
return stmt.where(Contact.owner_id == user_id)
return stmt

View File

@ -0,0 +1,155 @@
"""Business logic for contact workflows."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from sqlalchemy import select
from app.models.contact import Contact, ContactCreate
from app.models.deal import Deal
from app.repositories.contact_repo import ContactAccessError, ContactQueryParams, ContactRepository
from app.services.organization_service import OrganizationContext
class ContactServiceError(Exception):
"""Base error for contact workflows."""
class ContactNotFoundError(ContactServiceError):
"""Raised when contact cannot be found within organization."""
class ContactForbiddenError(ContactServiceError):
"""Raised when user lacks permissions for the operation."""
class ContactOrganizationError(ContactServiceError):
"""Raised when attempting to operate outside current organization."""
class ContactDeletionError(ContactServiceError):
"""Raised when contact cannot be deleted due to business constraints."""
@dataclass(slots=True)
class ContactListFilters:
"""Filters accepted by contact list endpoint."""
page: int = 1
page_size: int = 20
search: str | None = None
owner_id: int | None = None
class _UnsetType:
__slots__ = ()
UNSET = _UnsetType()
@dataclass(slots=True)
class ContactUpdateData:
"""Subset of fields allowed during contact update."""
name: str | None | _UnsetType = UNSET
email: str | None | _UnsetType = UNSET
phone: str | None | _UnsetType = UNSET
class ContactService:
"""Encapsulates contact-specific business rules."""
def __init__(self, repository: ContactRepository) -> None:
self._repository = repository
async def list_contacts(
self,
*,
filters: ContactListFilters,
context: OrganizationContext,
) -> Sequence[Contact]:
params = ContactQueryParams(
organization_id=context.organization_id,
page=filters.page,
page_size=filters.page_size,
search=filters.search,
owner_id=filters.owner_id,
)
try:
return await self._repository.list(params=params, role=context.role, user_id=context.user_id)
except ContactAccessError as exc:
raise ContactForbiddenError(str(exc)) from exc
async def create_contact(
self,
data: ContactCreate,
*,
context: OrganizationContext,
) -> Contact:
self._ensure_same_organization(data.organization_id, context)
try:
return await self._repository.create(data, role=context.role, user_id=context.user_id)
except ContactAccessError as exc:
raise ContactForbiddenError(str(exc)) from exc
async def get_contact(
self,
contact_id: int,
*,
context: OrganizationContext,
) -> Contact:
contact = await self._repository.get(
contact_id,
organization_id=context.organization_id,
role=context.role,
user_id=context.user_id,
)
if contact is None:
raise ContactNotFoundError("Contact not found")
return contact
async def update_contact(
self,
contact: Contact,
updates: ContactUpdateData,
*,
context: OrganizationContext,
) -> Contact:
self._ensure_same_organization(contact.organization_id, context)
payload = self._build_update_mapping(updates)
if not payload:
return contact
try:
return await self._repository.update(contact, payload, role=context.role, user_id=context.user_id)
except ContactAccessError as exc:
raise ContactForbiddenError(str(exc)) from exc
async def delete_contact(self, contact: Contact, *, context: OrganizationContext) -> None:
self._ensure_same_organization(contact.organization_id, context)
await self._ensure_no_related_deals(contact_id=contact.id)
try:
await self._repository.delete(contact, role=context.role, user_id=context.user_id)
except ContactAccessError as exc:
raise ContactForbiddenError(str(exc)) from exc
def _ensure_same_organization(self, organization_id: int, context: OrganizationContext) -> None:
if organization_id != context.organization_id:
raise ContactOrganizationError("Contact belongs to another organization")
def _build_update_mapping(self, updates: ContactUpdateData) -> dict[str, str | None]:
payload: dict[str, str | None] = {}
if updates.name is not UNSET:
payload["name"] = updates.name
if updates.email is not UNSET:
payload["email"] = updates.email
if updates.phone is not UNSET:
payload["phone"] = updates.phone
return payload
async def _ensure_no_related_deals(self, contact_id: int) -> None:
stmt = select(Deal.id).where(Deal.contact_id == contact_id).limit(1)
result = await self._repository.session.scalar(stmt)
if result is not None:
raise ContactDeletionError("Contact has related deals and cannot be deleted")

View File

@ -0,0 +1,170 @@
"""API tests for contact endpoints."""
from __future__ import annotations
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.models.contact import Contact
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.models.user import User
from tests.api.v1.task_activity_shared import auth_headers, make_token, prepare_scenario
@pytest.mark.asyncio
async def test_list_contacts_supports_search_and_pagination(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
async with session_factory() as session:
session.add_all(
[
Contact(
organization_id=scenario.organization_id,
owner_id=scenario.user_id,
name="Alpha Lead",
email="alpha@example.com",
phone=None,
),
Contact(
organization_id=scenario.organization_id,
owner_id=scenario.user_id,
name="Beta Prospect",
email="beta@example.com",
phone=None,
),
]
)
await session.commit()
response = await client.get(
"/api/v1/contacts/?page=1&page_size=10&search=alpha",
headers=auth_headers(token, scenario),
)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["name"] == "Alpha Lead"
@pytest.mark.asyncio
async def test_create_contact_returns_created_payload(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
response = await client.post(
"/api/v1/contacts/",
json={
"name": "New Contact",
"email": "new@example.com",
"phone": "+123",
"owner_id": scenario.user_id,
},
headers=auth_headers(token, scenario),
)
assert response.status_code == 201
payload = response.json()
assert payload["name"] == "New Contact"
assert payload["email"] == "new@example.com"
@pytest.mark.asyncio
async def test_member_cannot_assign_foreign_owner(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
async with session_factory() as session:
membership = await session.scalar(
select(OrganizationMember).where(
OrganizationMember.organization_id == scenario.organization_id,
OrganizationMember.user_id == scenario.user_id,
)
)
assert membership is not None
membership.role = OrganizationRole.MEMBER
other_user = User(
email="manager@example.com",
hashed_password="hashed",
name="Manager",
is_active=True,
)
session.add(other_user)
await session.flush()
session.add(
OrganizationMember(
organization_id=scenario.organization_id,
user_id=other_user.id,
role=OrganizationRole.ADMIN,
)
)
await session.commit()
response = await client.post(
"/api/v1/contacts/",
json={
"name": "Blocked",
"email": "blocked@example.com",
"owner_id": scenario.user_id + 1,
},
headers=auth_headers(token, scenario),
)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_patch_contact_updates_fields(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
async with session_factory() as session:
contact = Contact(
organization_id=scenario.organization_id,
owner_id=scenario.user_id,
name="Old Name",
email="old@example.com",
phone="+111",
)
session.add(contact)
await session.commit()
contact_id = contact.id
response = await client.patch(
f"/api/v1/contacts/{contact_id}",
json={"name": "Updated", "phone": None},
headers=auth_headers(token, scenario),
)
assert response.status_code == 200
payload = response.json()
assert payload["name"] == "Updated"
assert payload["phone"] is None
@pytest.mark.asyncio
async def test_delete_contact_with_deals_returns_conflict(
session_factory: async_sessionmaker[AsyncSession], client: AsyncClient
) -> None:
scenario = await prepare_scenario(session_factory)
token = make_token(scenario.user_id, scenario.user_email)
response = await client.delete(
f"/api/v1/contacts/{scenario.contact_id}",
headers=auth_headers(token, scenario),
)
assert response.status_code == 409

View File

@ -0,0 +1,228 @@
"""Unit tests for ContactService."""
from __future__ import annotations
from collections.abc import AsyncGenerator
import uuid
import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool
from app.models.base import Base
from app.models.contact import Contact, ContactCreate
from app.models.deal import Deal
from app.models.organization import Organization
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.models.user import User
from app.repositories.contact_repo import ContactRepository
from app.services.contact_service import (
ContactDeletionError,
ContactForbiddenError,
ContactListFilters,
ContactService,
ContactUpdateData,
)
from app.services.organization_service import OrganizationContext
@pytest_asyncio.fixture()
async def session() -> AsyncGenerator[AsyncSession, None]:
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
future=True,
poolclass=StaticPool,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
factory = async_sessionmaker(engine, expire_on_commit=False)
async with factory() as session:
yield session
await engine.dispose()
def _make_user(label: str) -> User:
return User(
email=f"{label}-{uuid.uuid4()}@example.com",
hashed_password="hashed",
name=f"{label.title()} User",
is_active=True,
)
def _context_for(
*,
organization: Organization,
user: User,
role: OrganizationRole,
) -> OrganizationContext:
membership = OrganizationMember(organization_id=organization.id, user_id=user.id, role=role)
return OrganizationContext(organization=organization, membership=membership)
async def _setup_contact(
session: AsyncSession,
*,
role: OrganizationRole = OrganizationRole.MANAGER,
owner: User | None = None,
context_user: User | None = None,
) -> tuple[OrganizationContext, ContactRepository, Contact]:
organization = Organization(name=f"Org-{uuid.uuid4()}"[:8])
owner_user = owner or _make_user("owner")
ctx_user = context_user or owner_user
session.add_all([organization, owner_user])
if ctx_user is not owner_user:
session.add(ctx_user)
await session.flush()
contact = Contact(
organization_id=organization.id,
owner_id=owner_user.id,
name="John Doe",
email="john.doe@example.com",
phone="+100000000",
)
session.add(contact)
await session.flush()
context = _context_for(organization=organization, user=ctx_user, role=role)
repo = ContactRepository(session=session)
return context, repo, contact
@pytest.mark.asyncio
async def test_create_contact_honors_owner_override(session: AsyncSession) -> None:
context, repo, _ = await _setup_contact(session)
other_user = _make_user("other")
session.add(other_user)
await session.flush()
service = ContactService(repository=repo)
contact = await service.create_contact(
ContactCreate(
organization_id=context.organization_id,
owner_id=other_user.id,
name="Alice",
email="alice@example.com",
phone=None,
),
context=context,
)
assert contact.owner_id == other_user.id
assert contact.name == "Alice"
@pytest.mark.asyncio
async def test_member_cannot_create_foreign_owner(session: AsyncSession) -> None:
owner = _make_user("owner")
member = _make_user("member")
context, repo, _ = await _setup_contact(
session,
role=OrganizationRole.MEMBER,
owner=owner,
context_user=member,
)
service = ContactService(repository=repo)
with pytest.raises(ContactForbiddenError):
await service.create_contact(
ContactCreate(
organization_id=context.organization_id,
owner_id=owner.id,
name="Restricted",
email=None,
phone=None,
),
context=context,
)
@pytest.mark.asyncio
async def test_list_contacts_supports_search(session: AsyncSession) -> None:
context, repo, base_contact = await _setup_contact(session)
service = ContactService(repository=repo)
another = Contact(
organization_id=context.organization_id,
owner_id=base_contact.owner_id,
name="Searchable",
email="findme@example.com",
phone=None,
)
session.add(another)
await session.flush()
contacts = await service.list_contacts(
filters=ContactListFilters(search="search"),
context=context,
)
assert len(contacts) == 1
assert contacts[0].id == another.id
@pytest.mark.asyncio
async def test_member_owner_filter_forbidden(session: AsyncSession) -> None:
owner = _make_user("owner")
member = _make_user("member")
context, repo, _ = await _setup_contact(
session,
role=OrganizationRole.MEMBER,
owner=owner,
context_user=member,
)
service = ContactService(repository=repo)
with pytest.raises(ContactForbiddenError):
await service.list_contacts(
filters=ContactListFilters(owner_id=owner.id),
context=context,
)
@pytest.mark.asyncio
async def test_update_contact_allows_nullifying_fields(session: AsyncSession) -> None:
context, repo, contact = await _setup_contact(session)
service = ContactService(repository=repo)
updated = await service.update_contact(
contact,
ContactUpdateData(name="Updated", email=None, phone=None),
context=context,
)
assert updated.name == "Updated"
assert updated.email is None
assert updated.phone is None
@pytest.mark.asyncio
async def test_delete_contact_blocks_when_deals_exist(session: AsyncSession) -> None:
context, repo, contact = await _setup_contact(session)
service = ContactService(repository=repo)
session.add(
Deal(
organization_id=context.organization_id,
contact_id=contact.id,
owner_id=contact.owner_id,
title="Pending",
amount=None,
)
)
await session.flush()
with pytest.raises(ContactDeletionError):
await service.delete_contact(contact, context=context)
@pytest.mark.asyncio
async def test_delete_contact_succeeds_without_deals(session: AsyncSession) -> None:
context, repo, contact = await _setup_contact(session)
service = ContactService(repository=repo)
await service.delete_contact(contact, context=context)
result = await session.scalar(select(Contact).where(Contact.id == contact.id))
assert result is None