"""Repository helpers for contacts with role-aware access.""" from __future__ import annotations from collections.abc import Mapping, Sequence from dataclasses import dataclass from typing import Any from sqlalchemy import Select, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.contact import Contact, ContactCreate from app.models.organization_member import OrganizationRole class ContactAccessError(Exception): """Raised when attempting operations without sufficient permissions.""" @dataclass(slots=True) class ContactQueryParams: """Filters accepted by contact list queries.""" organization_id: int page: int = 1 page_size: int = 20 search: str | None = None owner_id: int | None = None class ContactRepository: """Provides CRUD helpers for Contact entities.""" def __init__(self, session: AsyncSession) -> None: self._session = session @property def session(self) -> AsyncSession: return self._session async def list( self, *, params: ContactQueryParams, role: OrganizationRole, user_id: int, ) -> Sequence[Contact]: stmt: Select[tuple[Contact]] = select(Contact).where(Contact.organization_id == params.organization_id) stmt = self._apply_filters(stmt, params, role, user_id) offset = (max(params.page, 1) - 1) * params.page_size stmt = stmt.order_by(Contact.created_at.desc()).offset(offset).limit(params.page_size) result = await self._session.scalars(stmt) return result.all() async def get( self, contact_id: int, *, organization_id: int, role: OrganizationRole, user_id: int, ) -> Contact | None: stmt = select(Contact).where(Contact.id == contact_id, Contact.organization_id == organization_id) result = await self._session.scalars(stmt) return result.first() async def create( self, data: ContactCreate, *, role: OrganizationRole, user_id: int, ) -> Contact: if role == OrganizationRole.MEMBER and data.owner_id != user_id: raise ContactAccessError("Members can only create contacts they own") contact = Contact(**data.model_dump()) self._session.add(contact) await self._session.flush() return contact async def update( self, contact: Contact, updates: Mapping[str, Any], *, role: OrganizationRole, user_id: int, ) -> Contact: if role == OrganizationRole.MEMBER and contact.owner_id != user_id: raise ContactAccessError("Members can only modify their own contacts") for field, value in updates.items(): if hasattr(contact, field): setattr(contact, field, value) await self._session.flush() await self._session.refresh(contact) return contact async def delete( self, contact: Contact, *, role: OrganizationRole, user_id: int, ) -> None: if role == OrganizationRole.MEMBER and contact.owner_id != user_id: raise ContactAccessError("Members can only delete their own contacts") await self._session.delete(contact) await self._session.flush() def _apply_filters( self, stmt: Select[tuple[Contact]], params: ContactQueryParams, role: OrganizationRole, user_id: int, ) -> Select[tuple[Contact]]: if params.search: pattern = f"%{params.search.lower()}%" stmt = stmt.where( func.lower(Contact.name).like(pattern) | func.lower(func.coalesce(Contact.email, "")).like(pattern) ) if params.owner_id is not None: if role == OrganizationRole.MEMBER: raise ContactAccessError("Members cannot filter by owner") stmt = stmt.where(Contact.owner_id == params.owner_id) return stmt