44 lines
1.5 KiB
Python
44 lines
1.5 KiB
Python
"""Authentication workflows."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
|
|
from app.core.config import settings
|
|
from app.core.security import JWTService, PasswordHasher
|
|
from app.models.token import TokenResponse
|
|
from app.models.user import User
|
|
from app.repositories.user_repo import UserRepository
|
|
|
|
|
|
class InvalidCredentialsError(Exception):
|
|
"""Raised when user authentication fails."""
|
|
|
|
|
|
class AuthService:
|
|
"""Handles authentication flows and token issuance."""
|
|
|
|
def __init__(
|
|
self,
|
|
user_repository: UserRepository,
|
|
password_hasher: PasswordHasher,
|
|
jwt_service: JWTService,
|
|
) -> None:
|
|
self._user_repository = user_repository
|
|
self._password_hasher = password_hasher
|
|
self._jwt_service = jwt_service
|
|
|
|
async def authenticate(self, email: str, password: str) -> User:
|
|
user = await self._user_repository.get_by_email(email)
|
|
if user is None or not self._password_hasher.verify(password, user.hashed_password):
|
|
raise InvalidCredentialsError("Invalid email or password")
|
|
return user
|
|
|
|
def create_access_token(self, user: User) -> TokenResponse:
|
|
expires_delta = timedelta(minutes=settings.access_token_expire_minutes)
|
|
token = self._jwt_service.create_access_token(
|
|
subject=str(user.id),
|
|
expires_delta=expires_delta,
|
|
claims={"email": user.email},
|
|
)
|
|
return TokenResponse(access_token=token, expires_in=int(expires_delta.total_seconds()))
|