58 lines
1.7 KiB
Python
58 lines
1.7 KiB
Python
"""Security helpers for hashing passwords and issuing JWT tokens."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Mapping
|
|
|
|
import jwt
|
|
from passlib.context import CryptContext # type: ignore
|
|
|
|
from app.core.config import settings
|
|
|
|
|
|
class PasswordHasher:
|
|
"""Wraps passlib context to hash and verify secrets."""
|
|
|
|
def __init__(self) -> None:
|
|
self._context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
|
|
|
|
def hash(self, password: str) -> str:
|
|
return self._context.hash(password)
|
|
|
|
def verify(self, password: str, hashed_password: str) -> bool:
|
|
return self._context.verify(password, hashed_password)
|
|
|
|
|
|
class JWTService:
|
|
"""Handles encoding and decoding of JWT access tokens."""
|
|
|
|
def __init__(self, secret_key: str, algorithm: str) -> None:
|
|
self._secret_key = secret_key
|
|
self._algorithm = algorithm
|
|
|
|
def create_access_token(
|
|
self,
|
|
subject: str,
|
|
expires_delta: timedelta,
|
|
claims: Mapping[str, Any] | None = None,
|
|
) -> str:
|
|
now = datetime.now(timezone.utc)
|
|
payload: dict[str, Any] = {
|
|
"sub": subject,
|
|
"iat": now,
|
|
"exp": now + expires_delta,
|
|
}
|
|
if claims:
|
|
payload.update(claims)
|
|
return jwt.encode(payload, self._secret_key, algorithm=self._algorithm)
|
|
|
|
def decode(self, token: str) -> dict[str, Any]:
|
|
return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])
|
|
|
|
|
|
password_hasher = PasswordHasher()
|
|
jwt_service = JWTService(
|
|
secret_key=settings.jwt_secret_key.get_secret_value(),
|
|
algorithm=settings.jwt_algorithm,
|
|
)
|