"""Authentication workflows.""" from __future__ import annotations from datetime import timedelta from typing import Any import jwt from app.core.config import settings from app.core.security import JWTService, PasswordHasher from app.models.token import TokenResponse from app.models.user import User from app.repositories.user_repo import UserRepository class InvalidCredentialsError(Exception): """Raised when user authentication fails.""" class InvalidRefreshTokenError(Exception): """Raised when refresh token validation fails.""" class AuthService: """Handles authentication flows and token issuance.""" def __init__( self, user_repository: UserRepository, password_hasher: PasswordHasher, jwt_service: JWTService, ) -> None: self._user_repository = user_repository self._password_hasher = password_hasher self._jwt_service = jwt_service async def authenticate(self, email: str, password: str) -> User: user = await self._user_repository.get_by_email(email) if user is None or not self._password_hasher.verify(password, user.hashed_password): raise InvalidCredentialsError("Invalid email or password") return user def issue_tokens(self, user: User) -> TokenResponse: access_expires = timedelta(minutes=settings.access_token_expire_minutes) refresh_expires = timedelta(days=settings.refresh_token_expire_days) access_token = self._jwt_service.create_access_token( subject=str(user.id), expires_delta=access_expires, claims={"email": user.email, "scope": "access"}, ) refresh_token = self._jwt_service.create_access_token( subject=str(user.id), expires_delta=refresh_expires, claims={"scope": "refresh"}, ) return TokenResponse( access_token=access_token, refresh_token=refresh_token, expires_in=int(access_expires.total_seconds()), refresh_expires_in=int(refresh_expires.total_seconds()), ) async def refresh_tokens(self, refresh_token: str) -> TokenResponse: payload = self._decode_refresh_token(refresh_token) sub = payload.get("sub") if sub is None: raise InvalidRefreshTokenError("Invalid refresh token") try: user_id = int(sub) except (TypeError, ValueError) as exc: # pragma: no cover - defensive raise InvalidRefreshTokenError("Invalid refresh token") from exc user = await self._user_repository.get_by_id(user_id) if user is None: raise InvalidRefreshTokenError("Invalid refresh token") return self.issue_tokens(user) def _decode_refresh_token(self, token: str) -> dict[str, Any]: try: payload = self._jwt_service.decode(token) except jwt.PyJWTError as exc: raise InvalidRefreshTokenError("Invalid refresh token") from exc if payload.get("scope") != "refresh": raise InvalidRefreshTokenError("Invalid refresh token") return payload