test_task_crm/app/api/deps.py

36 lines
1.1 KiB
Python

"""Reusable FastAPI dependencies."""
from collections.abc import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.core.security import jwt_service, password_hasher
from app.repositories.user_repo import UserRepository
from app.services.auth_service import AuthService
from app.services.user_service import UserService
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield every database session produced by ``get_session()``.

    This is a thin re-export of ``app.core.database.get_session`` so that
    API-layer code can depend on a name local to this module.
    """
    # NOTE(review): an exception thrown into this generator at the ``yield``
    # is not forwarded into the inner ``get_session()`` generator; confirm
    # that its cleanup does not rely on observing the original exception.
    session_stream = get_session()
    async for db_session in session_stream:
        yield db_session
def get_user_repository(session: AsyncSession = Depends(get_db_session)) -> UserRepository:
    """Build a ``UserRepository`` bound to the injected database session.

    The ``session`` argument defaults to ``Depends(get_db_session)``, so
    FastAPI supplies it when this function is used as a dependency.
    """
    repository = UserRepository(session=session)
    return repository
def get_user_service(repo: UserRepository = Depends(get_user_repository)) -> UserService:
    """Build a ``UserService`` from the injected repository.

    The service also receives the ``password_hasher`` object imported from
    ``app.core.security``.
    """
    service = UserService(
        user_repository=repo,
        password_hasher=password_hasher,
    )
    return service
def get_auth_service(repo: UserRepository = Depends(get_user_repository)) -> AuthService:
    """Build an ``AuthService`` from the injected repository.

    The service is wired with the ``password_hasher`` and ``jwt_service``
    objects imported from ``app.core.security``.
    """
    auth_service = AuthService(
        user_repository=repo,
        password_hasher=password_hasher,
        jwt_service=jwt_service,
    )
    return auth_service