test_task_crm/app/services/user_service.py

49 lines
1.6 KiB
Python

"""User-related business logic."""
from __future__ import annotations
from collections.abc import Sequence
from app.core.security import PasswordHasher
from app.models.user import User, UserCreate
from app.repositories.user_repo import UserRepository
class UserServiceError(Exception):
"""Base class for user service errors."""
class UserAlreadyExistsError(UserServiceError):
"""Raised when attempting to create a user with duplicate email."""
class UserNotFoundError(UserServiceError):
"""Raised when user record cannot be located."""
class UserService:
"""Encapsulates user-related workflows."""
def __init__(self, user_repository: UserRepository, password_hasher: PasswordHasher) -> None:
self._repository = user_repository
self._password_hasher = password_hasher
async def list_users(self) -> Sequence[User]:
return await self._repository.list()
async def get_user(self, user_id: int) -> User:
user = await self._repository.get_by_id(user_id)
if user is None:
raise UserNotFoundError(f"User {user_id} not found")
return user
async def create_user(self, data: UserCreate) -> User:
existing = await self._repository.get_by_email(data.email)
if existing is not None:
raise UserAlreadyExistsError(f"User {data.email} already exists")
hashed_password = self._password_hasher.hash(data.password)
user = await self._repository.create(data=data, hashed_password=hashed_password)
await self._repository.session.commit()
await self._repository.session.refresh(user)
return user