test_task_crm/app/api/deps.py

75 lines
2.4 KiB
Python

"""Reusable FastAPI dependencies."""
from collections.abc import AsyncGenerator
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_session
from app.core.security import jwt_service, password_hasher
from app.models.user import User
from app.repositories.deal_repo import DealRepository
from app.repositories.org_repo import OrganizationRepository
from app.repositories.user_repo import UserRepository
from app.services.auth_service import AuthService
from app.services.user_service import UserService
# Bearer-token extractor shared by the authenticated dependencies below; the
# advertised token URL is the configured API v1 prefix plus "/auth/token".
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.api_v1_prefix}/auth/token",
)
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """Provide a scoped database session.

    Re-yields every session produced by ``get_session()`` so that it can be
    injected through ``Depends``.

    Yields:
        Each ``AsyncSession`` produced by ``get_session()``.
    """
    sessions = get_session()
    try:
        async for session in sessions:
            yield session
    finally:
        # If the consumer raises or stops early, ``async for`` leaves the
        # inner iterator suspended at its ``yield`` and its cleanup would
        # otherwise wait for garbage collection. Closing it here makes that
        # cleanup run deterministically; on normal exhaustion this is a
        # no-op. Guarded because a plain async iterator has no ``aclose``.
        aclose = getattr(sessions, "aclose", None)
        if aclose is not None:
            await aclose()
def get_user_repository(
    session: AsyncSession = Depends(get_db_session),
) -> UserRepository:
    """Build a ``UserRepository`` bound to the injected database session."""
    user_repository = UserRepository(session=session)
    return user_repository
def get_organization_repository(
    session: AsyncSession = Depends(get_db_session),
) -> OrganizationRepository:
    """Build an ``OrganizationRepository`` bound to the injected database session."""
    organization_repository = OrganizationRepository(session=session)
    return organization_repository
def get_deal_repository(
    session: AsyncSession = Depends(get_db_session),
) -> DealRepository:
    """Build a ``DealRepository`` bound to the injected database session."""
    deal_repository = DealRepository(session=session)
    return deal_repository
def get_user_service(
    repo: UserRepository = Depends(get_user_repository),
) -> UserService:
    """Build a ``UserService`` from the injected repository and the shared ``password_hasher``."""
    user_service = UserService(
        user_repository=repo,
        password_hasher=password_hasher,
    )
    return user_service
def get_auth_service(repo: UserRepository = Depends(get_user_repository)) -> AuthService:
    """Build an ``AuthService`` from the injected repository and the shared security helpers.

    The service receives the module-level ``password_hasher`` and
    ``jwt_service`` imported from ``app.core.security``.
    """
    auth_service = AuthService(
        user_repository=repo,
        password_hasher=password_hasher,
        jwt_service=jwt_service,
    )
    return auth_service
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    repo: UserRepository = Depends(get_user_repository),
) -> User:
    """Resolve the user identified by the request's bearer token.

    The token is decoded with ``jwt_service``; its ``sub`` claim is read as
    an integer user id and looked up through ``repo``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        repo: Repository used to load the user by id.

    Returns:
        The ``User`` returned by ``repo.get_by_id``.

    Raises:
        HTTPException: 401 with detail "Could not validate credentials" when
            decoding fails, the ``sub`` claim is missing or not convertible
            to ``int``, or no user exists for that id.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        # A 401 response is required to carry an authentication challenge;
        # "Bearer" matches the scheme this dependency authenticates with.
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt_service.decode(token)
        sub = payload.get("sub")
        # int(None) would raise TypeError; the explicit check only makes the
        # missing-claim case obvious to the reader.
        user_id = None if sub is None else int(sub)
    except (jwt.PyJWTError, TypeError, ValueError):
        # Hide the decoding/parsing failure from the client and from the
        # exception chain: every failure looks the same from outside.
        raise credentials_exception from None

    if user_id is None:
        raise credentials_exception

    user = await repo.get_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user