# Source: test_task_crm/app/repositories/org_repo.py (63 lines, 2.2 KiB, Python)

"""Organization repository for database operations."""
from __future__ import annotations
from collections.abc import Sequence
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.organization import Organization, OrganizationCreate
from app.models.organization_member import OrganizationMember
class OrganizationRepository:
    """Data-access helpers for ``Organization`` and its memberships.

    Every method runs against the ``AsyncSession`` supplied at construction
    time. Nothing in this class commits or rolls back; transaction control
    is left to whoever owns the session.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session that all queries will be executed on.

        Args:
            session: Async SQLAlchemy session used by every method.
        """
        self._session = session

    @property
    def session(self) -> AsyncSession:
        """Return the session this repository was constructed with."""
        return self._session

    # NOTE: this method name shadows the builtin ``list`` inside the class
    # body. It is kept because callers depend on it; do not use ``list[...]``
    # in annotations or expressions placed below it in this class.
    async def list(self) -> Sequence[Organization]:
        """Return all organizations ordered by id.

        Returns:
            Every ``Organization`` row, in ascending ``Organization.id``
            order (the same ordering ``list_for_user`` applies).
        """
        # Without an ORDER BY the database may return rows in any order.
        stmt = select(Organization).order_by(Organization.id)
        result = await self._session.scalars(stmt)
        return result.all()

    async def get_by_id(self, organization_id: int) -> Organization | None:
        """Look up an organization by primary key.

        Args:
            organization_id: Primary-key value to look up.

        Returns:
            The matching ``Organization``, or ``None`` if there is none.
        """
        return await self._session.get(Organization, organization_id)

    async def get_by_name(self, name: str) -> Organization | None:
        """Look up an organization by exact name.

        Args:
            name: Value compared for equality against ``Organization.name``.

        Returns:
            The first matching ``Organization``, or ``None`` if no row
            matches.
        """
        # first() consumes a single row and adds no LIMIT on its own, so
        # bound the query explicitly.
        stmt = select(Organization).where(Organization.name == name).limit(1)
        result = await self._session.scalars(stmt)
        return result.first()

    async def list_for_user(self, user_id: int) -> Sequence[Organization]:
        """Return the organizations a user is a member of, ordered by id.

        Args:
            user_id: Value matched against ``OrganizationMember.user_id``.

        Returns:
            Distinct ``Organization`` objects joined through
            ``OrganizationMember``, in ascending ``Organization.id`` order.
        """
        stmt = (
            select(Organization)
            .join(
                OrganizationMember,
                OrganizationMember.organization_id == Organization.id,
            )
            .where(OrganizationMember.user_id == user_id)
            .order_by(Organization.id)
        )
        result = await self._session.scalars(stmt)
        # unique() collapses repeated objects should the join yield the same
        # organization more than once.
        return result.unique().all()

    async def get_membership(
        self, organization_id: int, user_id: int
    ) -> OrganizationMember | None:
        """Return the membership linking a user to an organization.

        The related ``organization`` is loaded eagerly via ``selectinload``.

        Args:
            organization_id: Value matched against
                ``OrganizationMember.organization_id``.
            user_id: Value matched against ``OrganizationMember.user_id``.

        Returns:
            The first matching ``OrganizationMember``, or ``None`` if no row
            matches.
        """
        stmt = (
            select(OrganizationMember)
            .where(
                OrganizationMember.organization_id == organization_id,
                OrganizationMember.user_id == user_id,
            )
            .options(selectinload(OrganizationMember.organization))
            # Only one row is read by first(); do not ask for more.
            .limit(1)
        )
        result = await self._session.scalars(stmt)
        return result.first()

    async def create(self, data: OrganizationCreate) -> Organization:
        """Add a new organization to the session and flush it.

        Only ``data.name`` is copied onto the new object. The session is
        flushed but not committed.

        Args:
            data: Creation payload; its ``name`` attribute is used.

        Returns:
            The newly added ``Organization`` instance.
        """
        organization = Organization(name=data.name)
        self._session.add(organization)
        # Flush sends the pending INSERT now; presumably this is so
        # database-generated fields are populated before returning.
        await self._session.flush()
        return organization