test_task_crm/app/services/auth_service.py

44 lines
1.5 KiB
Python

"""Authentication workflows."""
from __future__ import annotations
from datetime import timedelta
from app.core.config import settings
from app.core.security import JWTService, PasswordHasher
from app.models.token import TokenResponse
from app.models.user import User
from app.repositories.user_repo import UserRepository
class InvalidCredentialsError(Exception):
    """Signal that a login attempt could not be authenticated.

    Carries no extra state beyond the standard ``Exception`` message.
    """
class AuthService:
    """Authenticate users by email/password and issue access tokens."""

    def __init__(
        self,
        user_repository: UserRepository,
        password_hasher: PasswordHasher,
        jwt_service: JWTService,
    ) -> None:
        """Store the collaborators this service delegates to.

        Args:
            user_repository: Queried via ``get_by_email`` to look up users.
            password_hasher: Used via ``verify`` to check a password against
                the stored hash.
            jwt_service: Used via ``create_access_token`` to build tokens.
        """
        self._user_repository = user_repository
        self._password_hasher = password_hasher
        self._jwt_service = jwt_service

    async def authenticate(self, email: str, password: str) -> User:
        """Return the user matching ``email`` if ``password`` verifies.

        Args:
            email: Email used to look the user up in the repository.
            password: Candidate password checked against the user's
                ``hashed_password``.

        Returns:
            The user object returned by the repository.

        Raises:
            InvalidCredentialsError: If the repository returns ``None`` for
                the email or the hasher rejects the password. Both cases
                carry the same message.
        """
        account = await self._user_repository.get_by_email(email)
        # NOTE(review): the hasher is not invoked when no user is found, so
        # the two failure paths may differ in response time -- confirm
        # whether that matters for this deployment.
        if account is not None and self._password_hasher.verify(
            password, account.hashed_password
        ):
            return account
        raise InvalidCredentialsError("Invalid email or password")

    def create_access_token(self, user: User) -> TokenResponse:
        """Build a token response for ``user``.

        The token lifetime comes from ``settings.access_token_expire_minutes``.
        The token subject is ``str(user.id)`` and the user's email is passed
        as an additional ``"email"`` claim.

        Args:
            user: The user the token is issued for; ``id`` and ``email`` are
                read from it.

        Returns:
            A ``TokenResponse`` holding the token and its lifetime in whole
            seconds (``expires_in``).
        """
        lifetime = timedelta(minutes=settings.access_token_expire_minutes)
        extra_claims = {"email": user.email}
        encoded = self._jwt_service.create_access_token(
            subject=str(user.id),
            expires_delta=lifetime,
            claims=extra_claims,
        )
        # int() truncates any fractional part of the lifetime in seconds.
        lifetime_seconds = int(lifetime.total_seconds())
        return TokenResponse(access_token=encoded, expires_in=lifetime_seconds)