test_task_crm/app/services/auth_service.py

88 lines
3.1 KiB
Python

"""Authentication workflows."""
from __future__ import annotations
from datetime import timedelta
from typing import Any
import jwt
from app.core.config import settings
from app.core.security import JWTService, PasswordHasher
from app.models.token import TokenResponse
from app.models.user import User
from app.repositories.user_repo import UserRepository
class InvalidCredentialsError(Exception):
    """Raised when user authentication fails.

    Signals that no user exists for the supplied email or that the supplied
    password did not verify against the stored hash.
    """
class InvalidRefreshTokenError(Exception):
    """Raised when refresh token validation fails.

    Covers every rejection reason: the token cannot be decoded, its ``scope``
    claim is not ``"refresh"``, its ``sub`` claim is missing or is not a usable
    user id, or the referenced user cannot be found.
    """
class AuthService:
    """Handles authentication flows and token issuance.

    The service is assembled from three injected collaborators: a user
    repository (lookups by email and id), a password hasher (credential
    verification) and a JWT service (token creation and decoding).
    """

    def __init__(
        self,
        user_repository: UserRepository,
        password_hasher: PasswordHasher,
        jwt_service: JWTService,
    ) -> None:
        """Store the injected collaborators for later use."""
        self._user_repository = user_repository
        self._password_hasher = password_hasher
        self._jwt_service = jwt_service

    async def authenticate(self, email: str, password: str) -> User:
        """Return the user registered under *email* if *password* verifies.

        Raises:
            InvalidCredentialsError: If no user has that email or the password
                does not match the stored hash. The message is the same in
                both cases, so it does not reveal which check failed.
        """
        user = await self._user_repository.get_by_email(email)
        # NOTE(review): the hash check is skipped when the email is unknown, so
        # the two failure paths may differ in response time - confirm whether
        # that matters for this deployment.
        if user is None or not self._password_hasher.verify(
            password, user.hashed_password
        ):
            raise InvalidCredentialsError("Invalid email or password")
        return user

    def issue_tokens(self, user: User) -> TokenResponse:
        """Create a fresh access/refresh token pair for *user*.

        Lifetimes come from ``settings.access_token_expire_minutes`` and
        ``settings.refresh_token_expire_days``. Both tokens use the stringified
        user id as subject; the ``scope`` claim tells them apart.

        Returns:
            A ``TokenResponse`` holding both tokens and their lifetimes in
            whole seconds.
        """
        access_expires = timedelta(minutes=settings.access_token_expire_minutes)
        refresh_expires = timedelta(days=settings.refresh_token_expire_days)

        access_token = self._jwt_service.create_access_token(
            subject=str(user.id),
            expires_delta=access_expires,
            claims={"email": user.email, "scope": "access"},
        )
        # NOTE(review): the refresh token is minted through
        # ``create_access_token`` as well and is distinguished only by its
        # ``scope`` claim - confirm JWTService adds no access-only claims.
        refresh_token = self._jwt_service.create_access_token(
            subject=str(user.id),
            expires_delta=refresh_expires,
            claims={"scope": "refresh"},
        )

        return TokenResponse(
            access_token=access_token,
            refresh_token=refresh_token,
            expires_in=int(access_expires.total_seconds()),
            refresh_expires_in=int(refresh_expires.total_seconds()),
        )

    async def refresh_tokens(self, refresh_token: str) -> TokenResponse:
        """Exchange a valid refresh token for a new token pair.

        Raises:
            InvalidRefreshTokenError: If the token fails decoding or scope
                validation, its ``sub`` claim is missing or is not a string or
                integer user id, or no user exists for that id.
        """
        payload = self._decode_refresh_token(refresh_token)
        sub = payload.get("sub")

        # Accept only string or integer subjects. ``bool`` is an ``int``
        # subclass (``int(True) == 1``) and floats truncate, so coercing them
        # would silently map a malformed claim onto a real user id.
        if isinstance(sub, bool) or not isinstance(sub, (str, int)):
            raise InvalidRefreshTokenError("Invalid refresh token")

        try:
            user_id = int(sub)
        except (TypeError, ValueError) as exc:  # pragma: no cover - defensive
            raise InvalidRefreshTokenError("Invalid refresh token") from exc

        user = await self._user_repository.get_by_id(user_id)
        if user is None:
            raise InvalidRefreshTokenError("Invalid refresh token")
        return self.issue_tokens(user)

    def _decode_refresh_token(self, token: str) -> dict[str, Any]:
        """Decode *token* and ensure it is a refresh token.

        Returns:
            The decoded payload, unchanged.

        Raises:
            InvalidRefreshTokenError: If decoding raises ``jwt.PyJWTError`` or
                the payload's ``scope`` claim is not ``"refresh"``.
        """
        try:
            payload = self._jwt_service.decode(token)
        except jwt.PyJWTError as exc:
            raise InvalidRefreshTokenError("Invalid refresh token") from exc

        # Tokens with any other scope must not be usable for refreshing.
        if payload.get("scope") != "refresh":
            raise InvalidRefreshTokenError("Invalid refresh token")
        return payload