161 lines
5.0 KiB
Python
161 lines
5.0 KiB
Python
"""Redis cache utilities and availability tracking."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any, Awaitable, Callable, Optional
|
|
|
|
import redis.asyncio as redis
|
|
from redis.asyncio.client import Redis
|
|
from redis.exceptions import RedisError
|
|
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RedisCacheManager:
|
|
"""Manages lifecycle and availability of the Redis cache client."""
|
|
|
|
def __init__(self) -> None:
|
|
self._client: Redis | None = None
|
|
self._available: bool = False
|
|
self._lock = asyncio.Lock()
|
|
|
|
@property
|
|
def is_enabled(self) -> bool:
|
|
return settings.redis_enabled
|
|
|
|
@property
|
|
def is_available(self) -> bool:
|
|
return self._available and self._client is not None
|
|
|
|
def get_client(self) -> Redis | None:
|
|
if not self.is_enabled:
|
|
return None
|
|
if self.is_available:
|
|
return self._client
|
|
return None
|
|
|
|
async def startup(self) -> None:
|
|
if not self.is_enabled:
|
|
return
|
|
async with self._lock:
|
|
if self._client is not None:
|
|
return
|
|
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
|
|
await self._refresh_availability()
|
|
|
|
async def shutdown(self) -> None:
|
|
async with self._lock:
|
|
if self._client is not None:
|
|
await self._client.close()
|
|
self._client = None
|
|
self._available = False
|
|
|
|
async def reconnect(self) -> None:
|
|
if not self.is_enabled:
|
|
return
|
|
async with self._lock:
|
|
if self._client is None:
|
|
self._client = redis.from_url(settings.redis_url, encoding="utf-8", decode_responses=False)
|
|
await self._refresh_availability()
|
|
|
|
async def _refresh_availability(self) -> None:
|
|
if self._client is None:
|
|
self._available = False
|
|
return
|
|
try:
|
|
await self._client.ping()
|
|
except RedisError as exc: # pragma: no cover - logging only
|
|
self._available = False
|
|
logger.warning("Redis ping failed: %s", exc)
|
|
else:
|
|
self._available = True
|
|
|
|
def mark_unavailable(self) -> None:
|
|
self._available = False
|
|
|
|
def mark_available(self) -> None:
|
|
if self._client is not None:
|
|
self._available = True
|
|
|
|
|
|
cache_manager = RedisCacheManager()
|
|
|
|
|
|
async def init_cache() -> None:
|
|
"""Initialize Redis cache connection if enabled."""
|
|
await cache_manager.startup()
|
|
|
|
|
|
async def shutdown_cache() -> None:
|
|
"""Close Redis cache connection."""
|
|
await cache_manager.shutdown()
|
|
|
|
|
|
def get_cache_client() -> Optional[Redis]:
|
|
"""Expose the active Redis client for dependency injection."""
|
|
return cache_manager.get_client()
|
|
|
|
|
|
async def read_json(client: Redis, key: str) -> Any | None:
|
|
"""Read and decode JSON payload from Redis."""
|
|
try:
|
|
raw = await client.get(key)
|
|
except RedisError as exc: # pragma: no cover - network errors
|
|
cache_manager.mark_unavailable()
|
|
logger.debug("Redis GET failed for %s: %s", key, exc)
|
|
return None
|
|
if raw is None:
|
|
return None
|
|
cache_manager.mark_available()
|
|
try:
|
|
return json.loads(raw.decode("utf-8"))
|
|
except (UnicodeDecodeError, json.JSONDecodeError) as exc: # pragma: no cover - malformed payloads
|
|
logger.warning("Discarding malformed cache entry %s: %s", key, exc)
|
|
return None
|
|
|
|
|
|
async def write_json(client: Redis, key: str, value: Any, ttl_seconds: int, backoff_ms: int) -> None:
|
|
"""Serialize data to JSON and store it with TTL using retry/backoff."""
|
|
payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
|
|
|
|
async def _operation() -> Any:
|
|
return await client.set(name=key, value=payload, ex=ttl_seconds)
|
|
|
|
await _run_with_retry(_operation, backoff_ms)
|
|
|
|
|
|
async def delete_keys(client: Redis, keys: list[str], backoff_ms: int) -> None:
|
|
"""Delete cache keys with retry/backoff semantics."""
|
|
if not keys:
|
|
return
|
|
|
|
async def _operation() -> Any:
|
|
return await client.delete(*keys)
|
|
|
|
await _run_with_retry(_operation, backoff_ms)
|
|
|
|
|
|
async def _run_with_retry(operation: Callable[[], Awaitable[Any]], max_sleep_ms: int) -> None:
|
|
try:
|
|
await operation()
|
|
cache_manager.mark_available()
|
|
return
|
|
except RedisError as exc: # pragma: no cover - network errors
|
|
cache_manager.mark_unavailable()
|
|
logger.debug("Redis cache operation failed: %s", exc)
|
|
if max_sleep_ms <= 0:
|
|
return
|
|
sleep_seconds = min(max_sleep_ms / 1000, 0.1)
|
|
await asyncio.sleep(sleep_seconds)
|
|
await cache_manager.reconnect()
|
|
try:
|
|
await operation()
|
|
cache_manager.mark_available()
|
|
except RedisError as exc: # pragma: no cover - repeated network errors
|
|
cache_manager.mark_unavailable()
|
|
logger.warning("Redis cache operation failed after retry: %s", exc)
|