167 lines
5.3 KiB
Python
167 lines
5.3 KiB
Python
"""Business logic for contact workflows."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
from dataclasses import dataclass
|
|
from typing import cast
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.models.contact import Contact, ContactCreate
|
|
from app.models.deal import Deal
|
|
from app.repositories.contact_repo import ContactAccessError, ContactQueryParams, ContactRepository
|
|
from app.services.organization_service import OrganizationContext
|
|
|
|
|
|
class ContactServiceError(Exception):
|
|
"""Base error for contact workflows."""
|
|
|
|
|
|
class ContactNotFoundError(ContactServiceError):
|
|
"""Raised when contact cannot be found within organization."""
|
|
|
|
|
|
class ContactForbiddenError(ContactServiceError):
|
|
"""Raised when user lacks permissions for the operation."""
|
|
|
|
|
|
class ContactOrganizationError(ContactServiceError):
|
|
"""Raised when attempting to operate outside current organization."""
|
|
|
|
|
|
class ContactDeletionError(ContactServiceError):
|
|
"""Raised when contact cannot be deleted due to business constraints."""
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ContactListFilters:
|
|
"""Filters accepted by contact list endpoint."""
|
|
|
|
page: int = 1
|
|
page_size: int = 20
|
|
search: str | None = None
|
|
owner_id: int | None = None
|
|
|
|
|
|
class _UnsetType:
|
|
__slots__ = ()
|
|
|
|
|
|
UNSET = _UnsetType()
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ContactUpdateData:
|
|
"""Subset of fields allowed during contact update."""
|
|
|
|
name: str | None | _UnsetType = UNSET
|
|
email: str | None | _UnsetType = UNSET
|
|
phone: str | None | _UnsetType = UNSET
|
|
|
|
|
|
class ContactService:
|
|
"""Encapsulates contact-specific business rules."""
|
|
|
|
def __init__(self, repository: ContactRepository) -> None:
|
|
self._repository = repository
|
|
|
|
async def list_contacts(
|
|
self,
|
|
*,
|
|
filters: ContactListFilters,
|
|
context: OrganizationContext,
|
|
) -> Sequence[Contact]:
|
|
params = ContactQueryParams(
|
|
organization_id=context.organization_id,
|
|
page=filters.page,
|
|
page_size=filters.page_size,
|
|
search=filters.search,
|
|
owner_id=filters.owner_id,
|
|
)
|
|
try:
|
|
return await self._repository.list(
|
|
params=params,
|
|
role=context.role,
|
|
user_id=context.user_id,
|
|
)
|
|
except ContactAccessError as exc:
|
|
raise ContactForbiddenError(str(exc)) from exc
|
|
|
|
async def create_contact(
|
|
self,
|
|
data: ContactCreate,
|
|
*,
|
|
context: OrganizationContext,
|
|
) -> Contact:
|
|
self._ensure_same_organization(data.organization_id, context)
|
|
try:
|
|
return await self._repository.create(data, role=context.role, user_id=context.user_id)
|
|
except ContactAccessError as exc:
|
|
raise ContactForbiddenError(str(exc)) from exc
|
|
|
|
async def get_contact(
|
|
self,
|
|
contact_id: int,
|
|
*,
|
|
context: OrganizationContext,
|
|
) -> Contact:
|
|
contact = await self._repository.get(
|
|
contact_id,
|
|
organization_id=context.organization_id,
|
|
role=context.role,
|
|
user_id=context.user_id,
|
|
)
|
|
if contact is None:
|
|
raise ContactNotFoundError("Contact not found")
|
|
return contact
|
|
|
|
async def update_contact(
|
|
self,
|
|
contact: Contact,
|
|
updates: ContactUpdateData,
|
|
*,
|
|
context: OrganizationContext,
|
|
) -> Contact:
|
|
self._ensure_same_organization(contact.organization_id, context)
|
|
payload = self._build_update_mapping(updates)
|
|
if not payload:
|
|
return contact
|
|
try:
|
|
return await self._repository.update(
|
|
contact,
|
|
payload,
|
|
role=context.role,
|
|
user_id=context.user_id,
|
|
)
|
|
except ContactAccessError as exc:
|
|
raise ContactForbiddenError(str(exc)) from exc
|
|
|
|
async def delete_contact(self, contact: Contact, *, context: OrganizationContext) -> None:
|
|
self._ensure_same_organization(contact.organization_id, context)
|
|
await self._ensure_no_related_deals(contact_id=contact.id)
|
|
try:
|
|
await self._repository.delete(contact, role=context.role, user_id=context.user_id)
|
|
except ContactAccessError as exc:
|
|
raise ContactForbiddenError(str(exc)) from exc
|
|
|
|
def _ensure_same_organization(self, organization_id: int, context: OrganizationContext) -> None:
|
|
if organization_id != context.organization_id:
|
|
raise ContactOrganizationError("Contact belongs to another organization")
|
|
|
|
def _build_update_mapping(self, updates: ContactUpdateData) -> dict[str, str | None]:
|
|
payload: dict[str, str | None] = {}
|
|
if updates.name is not UNSET:
|
|
payload["name"] = cast(str | None, updates.name)
|
|
if updates.email is not UNSET:
|
|
payload["email"] = cast(str | None, updates.email)
|
|
if updates.phone is not UNSET:
|
|
payload["phone"] = cast(str | None, updates.phone)
|
|
return payload
|
|
|
|
async def _ensure_no_related_deals(self, contact_id: int) -> None:
|
|
stmt = select(Deal.id).where(Deal.contact_id == contact_id).limit(1)
|
|
result = await self._repository.session.scalar(stmt)
|
|
if result is not None:
|
|
raise ContactDeletionError("Contact has related deals and cannot be deleted")
|