test_task_crm/app/services/organization_service.py

122 lines
4.4 KiB
Python

"""Organization-related business rules."""
from __future__ import annotations
from dataclasses import dataclass
from app.models.organization import Organization
from app.models.organization_member import OrganizationMember, OrganizationRole
from app.repositories.org_repo import OrganizationRepository
class OrganizationServiceError(Exception):
"""Base class for organization service errors."""
class OrganizationContextMissingError(OrganizationServiceError):
"""Raised when the request lacks organization context."""
class OrganizationAccessDeniedError(OrganizationServiceError):
"""Raised when a user tries to work with a foreign organization."""
class OrganizationForbiddenError(OrganizationServiceError):
"""Raised when a user does not have enough privileges."""
class OrganizationMemberAlreadyExistsError(OrganizationServiceError):
"""Raised when attempting to add a duplicate organization member."""
@dataclass(slots=True, frozen=True)
class OrganizationContext:
"""Resolved organization and membership information for a request."""
organization: Organization
membership: OrganizationMember
@property
def organization_id(self) -> int:
return self.organization.id
@property
def role(self) -> OrganizationRole:
return self.membership.role
@property
def user_id(self) -> int:
return self.membership.user_id
class OrganizationService:
"""Encapsulates organization-specific policies."""
def __init__(self, repository: OrganizationRepository) -> None:
self._repository = repository
async def get_context(
self, *, user_id: int, organization_id: int | None
) -> OrganizationContext:
"""Resolve request context ensuring the user belongs to the given organization."""
if organization_id is None:
raise OrganizationContextMissingError("X-Organization-Id header is required")
membership = await self._repository.get_membership(organization_id, user_id)
if membership is None or membership.organization is None:
raise OrganizationAccessDeniedError("Organization not found")
return OrganizationContext(organization=membership.organization, membership=membership)
def ensure_entity_in_context(
self, *, entity_organization_id: int, context: OrganizationContext
) -> None:
"""Make sure a resource belongs to the current organization."""
if entity_organization_id != context.organization_id:
raise OrganizationAccessDeniedError("Resource belongs to another organization")
def ensure_can_manage_settings(self, context: OrganizationContext) -> None:
"""Allow only owner/admin to change organization-level settings."""
if context.role not in {OrganizationRole.OWNER, OrganizationRole.ADMIN}:
raise OrganizationForbiddenError("Only owner/admin can modify organization settings")
def ensure_can_manage_entity(self, context: OrganizationContext) -> None:
"""Managers/admins/owners may manage entities; members are restricted."""
if context.role == OrganizationRole.MEMBER:
raise OrganizationForbiddenError("Members cannot manage shared entities")
def ensure_member_owns_entity(self, *, context: OrganizationContext, owner_id: int) -> None:
"""Members can only mutate entities they own (contacts/deals/tasks)."""
if context.role == OrganizationRole.MEMBER and owner_id != context.user_id:
raise OrganizationForbiddenError("Members can only modify their own records")
async def add_member(
self,
*,
context: OrganizationContext,
user_id: int,
role: OrganizationRole,
) -> OrganizationMember:
"""Add a user to the current organization enforced by permissions."""
self.ensure_can_manage_settings(context)
existing = await self._repository.get_membership(context.organization_id, user_id)
if existing is not None:
raise OrganizationMemberAlreadyExistsError("User already belongs to this organization")
membership = OrganizationMember(
organization_id=context.organization_id,
user_id=user_id,
role=role,
)
self._repository.session.add(membership)
await self._repository.session.commit()
await self._repository.session.refresh(membership)
return membership