test_task_crm/app/core/cache.py

161 lines
5.0 KiB
Python

"""Redis cache utilities and availability tracking."""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Optional
import redis.asyncio as redis
from redis.asyncio.client import Redis
from redis.exceptions import RedisError
from app.core.config import settings
logger = logging.getLogger(__name__)
class RedisCacheManager:
"""Manages lifecycle and availability of the Redis cache client."""
def __init__(self) -> None:
self._client: Redis | None = None
self._available: bool = False
self._lock = asyncio.Lock()
@property
def is_enabled(self) -> bool:
return settings.redis_enabled
@property
def is_available(self) -> bool:
return self._available and self._client is not None
def get_client(self) -> Redis | None:
if not self.is_enabled:
return None
if self.is_available:
return self._client
return None
async def startup(self) -> None:
if not self.is_enabled:
return
async with self._lock:
if self._client is not None:
return
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
await self._refresh_availability()
async def shutdown(self) -> None:
async with self._lock:
if self._client is not None:
await self._client.close()
self._client = None
self._available = False
async def reconnect(self) -> None:
if not self.is_enabled:
return
async with self._lock:
if self._client is None:
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
await self._refresh_availability()
async def _refresh_availability(self) -> None:
if self._client is None:
self._available = False
return
try:
await self._client.ping()
except RedisError as exc: # pragma: no cover - logging only
self._available = False
logger.warning("Redis ping failed: %s", exc)
else:
self._available = True
def mark_unavailable(self) -> None:
self._available = False
def mark_available(self) -> None:
if self._client is not None:
self._available = True
# Module-level manager instance used by the helper functions below.
cache_manager = RedisCacheManager()
async def init_cache() -> None:
    """Start the module-level cache manager.

    Delegates to ``cache_manager.startup()``, which returns immediately
    when Redis is disabled in settings.
    """
    await cache_manager.startup()
async def shutdown_cache() -> None:
    """Shut down the module-level cache manager.

    Delegates to ``cache_manager.shutdown()``, which closes the client
    if one exists.
    """
    await cache_manager.shutdown()
def get_cache_client() -> Optional[Redis]:
    """Return the manager's Redis client for dependency injection.

    The result is ``None`` whenever the manager declines to hand out a
    client (caching disabled, no client, or client marked unavailable).
    """
    active_client = cache_manager.get_client()
    return active_client
async def read_json(client: Redis, key: str) -> Any | None:
    """Read and decode a JSON payload from Redis.

    Args:
        client: Redis client used to issue the ``GET``.
        key: Cache key to read.

    Returns:
        The decoded JSON value, or ``None`` on a cache miss, a Redis error,
        or a payload that is not valid UTF-8 JSON.

    Side effects:
        Marks the cache unavailable when the ``GET`` raises ``RedisError``
        and available whenever the ``GET`` completes, hit or miss.
    """
    try:
        raw = await client.get(key)
    except RedisError as exc:  # pragma: no cover - network errors
        cache_manager.mark_unavailable()
        logger.debug("Redis GET failed for %s: %s", key, exc)
        return None

    # A completed round-trip proves Redis is reachable even on a miss, so
    # record it before the early return (consistent with _run_with_retry).
    cache_manager.mark_available()
    if raw is None:
        return None

    try:
        # Clients created with decode_responses=True already return str.
        text = raw.decode("utf-8") if isinstance(raw, (bytes, bytearray)) else raw
        return json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:  # pragma: no cover - malformed payloads
        logger.warning("Discarding malformed cache entry %s: %s", key, exc)
        return None
async def write_json(client: Redis, key: str, value: Any, ttl_seconds: int, backoff_ms: int) -> None:
    """Store ``value`` under ``key`` as compact JSON with an expiry.

    The value is serialized once up front; the ``SET`` itself goes through
    ``_run_with_retry`` with ``backoff_ms`` as its sleep budget.
    """
    encoded = json.dumps(value, ensure_ascii=True, separators=(",", ":"))
    payload = encoded.encode("utf-8")

    async def _store() -> Any:
        result = await client.set(name=key, value=payload, ex=ttl_seconds)
        return result

    await _run_with_retry(_store, backoff_ms)
async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None:
    """Remove the given cache keys through ``_run_with_retry``.

    An empty ``keys`` list is a no-op and issues no Redis command.
    """
    if keys:
        # The lambda returns the DEL awaitable; _run_with_retry awaits it.
        await _run_with_retry(lambda: client.delete(*keys), backoff_ms)
async def _run_with_retry(operation: Callable[[], Awaitable[Any]], max_sleep_ms: int) -> None:
    """Run a cache operation, retrying once after a short pause on failure.

    Args:
        operation: Zero-argument callable returning the awaitable to run.
        max_sleep_ms: Pause before the retry in milliseconds (capped at
            0.1 s); a value of zero or less disables the retry entirely.

    ``RedisError`` is never propagated: each failure is logged and flips
    the manager to unavailable, each success flips it back to available.
    """
    try:
        await operation()
    except RedisError as first_error:  # pragma: no cover - network errors
        cache_manager.mark_unavailable()
        logger.debug("Redis cache operation failed: %s", first_error)
    else:
        cache_manager.mark_available()
        return

    if max_sleep_ms > 0:
        pause = min(max_sleep_ms / 1000, 0.1)
        await asyncio.sleep(pause)
        # Give the manager a chance to re-probe the connection before retrying.
        await cache_manager.reconnect()
        try:
            await operation()
        except RedisError as second_error:  # pragma: no cover - repeated network errors
            cache_manager.mark_unavailable()
            logger.warning("Redis cache operation failed after retry: %s", second_error)
        else:
            cache_manager.mark_available()