Módulo 18 • 16 min de lectura
18 - Caching Strategies
Redis, cache-aside, write-through, invalidation patterns y cache de respuestas HTTP.
#caching
#redis
#performance
#patterns
#optimization
1. Patrones de Caching
Cache-Aside (Lazy Loading)
[Request] → [Check Cache] → Miss → [Query DB] → [Store in Cache] → [Return]
↓
Hit → [Return from Cache]
Write-Through
[Write Request] → [Write to Cache] → [Write to DB] → [Return]
Write-Behind (Write-Back)
[Write Request] → [Write to Cache] → [Return]
↓
[Async Write to DB]
2. Configuración de Redis
# core/redis.py
import redis.asyncio as redis
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from app.core.config import settings
redis_pool: redis.ConnectionPool | None = None
async def init_redis_pool():
    """Initialise the shared Redis connection pool (call once at startup)."""
    global redis_pool
    pool = redis.ConnectionPool.from_url(
        settings.REDIS_URL,
        max_connections=50,      # upper bound on simultaneous connections
        decode_responses=True,   # return str instead of raw bytes
    )
    redis_pool = pool
async def close_redis_pool():
    """Tear down the shared Redis pool (call once at shutdown)."""
    global redis_pool
    if redis_pool is not None:
        await redis_pool.disconnect()
@asynccontextmanager
async def get_redis() -> AsyncGenerator[redis.Redis, None]:
    """Yield a Redis client bound to the shared pool, releasing it afterwards."""
    conn = redis.Redis(connection_pool=redis_pool)
    try:
        yield conn
    finally:
        # Returns the connection to the pool; does not tear the pool down.
        # NOTE(review): redis-py >= 5 prefers `aclose()` — confirm version.
        await conn.close()
# Lifespan para FastAPI
# NOTE(review): FastAPI is not imported in this snippet — the real module
# needs `from fastapi import FastAPI`.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the Redis pool on startup and close it on shutdown."""
    await init_redis_pool()
    yield  # application serves requests while suspended here
    await close_redis_pool()

app = FastAPI(lifespan=lifespan)
Dependency
from typing import Annotated
from fastapi import Depends

async def get_redis_client() -> AsyncGenerator[redis.Redis, None]:
    """FastAPI dependency: yield a pooled Redis client per request."""
    async with get_redis() as client:
        yield client

# Annotated alias so handlers can declare `client: RedisClient`.
RedisClient = Annotated[redis.Redis, Depends(get_redis_client)]
3. Cache-Aside Pattern
# services/cache.py
import json
from typing import TypeVar, Callable, Any
from datetime import timedelta
import redis.asyncio as redis
T = TypeVar("T")
class CacheService:
def __init__(self, redis: redis.Redis, prefix: str = "cache"):
self.redis = redis
self.prefix = prefix
def _key(self, key: str) -> str:
return f"{self.prefix}:{key}"
async def get(self, key: str) -> Any | None:
"""Obtener valor del cache."""
data = await self.redis.get(self._key(key))
if data:
return json.loads(data)
return None
async def set(
self,
key: str,
value: Any,
ttl: int | timedelta = 300,
) -> None:
"""Guardar valor en cache."""
if isinstance(ttl, timedelta):
ttl = int(ttl.total_seconds())
await self.redis.set(
self._key(key),
json.dumps(value, default=str),
ex=ttl,
)
async def delete(self, key: str) -> None:
"""Invalidar cache."""
await self.redis.delete(self._key(key))
async def delete_pattern(self, pattern: str) -> int:
"""Invalidar múltiples keys por patrón."""
keys = []
async for key in self.redis.scan_iter(self._key(pattern)):
keys.append(key)
if keys:
return await self.redis.delete(*keys)
return 0
async def get_or_set(
self,
key: str,
factory: Callable[[], T],
ttl: int = 300,
) -> T:
"""Cache-aside: obtener del cache o ejecutar factory."""
cached = await self.get(key)
if cached is not None:
return cached
# Cache miss: ejecutar factory
value = await factory() if callable(factory) else factory
await self.set(key, value, ttl)
return value
Uso en Servicios
class UserService:
    """User domain service with a cache-aside read path (TTL 5 minutes)."""

    def __init__(self, repo: UserRepository, cache: CacheService):
        self.repo = repo
        self.cache = cache

    async def get_user(self, user_id: int) -> User:
        """Return a user, served from cache when possible."""
        payload = await self.cache.get_or_set(
            f"user:{user_id}",
            lambda: self._fetch_user(user_id),
            ttl=300,  # 5 minutes
        )
        return User.model_validate(payload)

    async def _fetch_user(self, user_id: int) -> dict:
        """Load the user from the repository as a JSON-safe dict."""
        record = await self.repo.get(user_id)
        if not record:
            raise NotFoundError(f"User {user_id} not found")
        return record.model_dump()

    async def update_user(self, user_id: int, data: UserUpdate) -> User:
        """Persist changes, then drop the now-stale cache entry."""
        updated = await self.repo.update(user_id, data.model_dump(exclude_unset=True))
        # Invalidate after the write so the next read repopulates the cache.
        await self.cache.delete(f"user:{user_id}")
        return updated
4. Decorator de Cache
from functools import wraps
from typing import Callable, Any
import hashlib
import json
def cached(
ttl: int = 300,
key_prefix: str = "",
key_builder: Callable[..., str] | None = None,
):
"""Decorator para cachear resultados de funciones async."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
# Obtener cache service del primer argumento (self)
cache = getattr(args[0], 'cache', None)
if not cache:
return await func(*args, **kwargs)
# Construir cache key
if key_builder:
cache_key = key_builder(*args, **kwargs)
else:
# Default: hash de argumentos
key_data = json.dumps(
{"args": args[1:], "kwargs": kwargs},
default=str,
sort_keys=True,
)
key_hash = hashlib.md5(key_data.encode()).hexdigest()[:16]
cache_key = f"{key_prefix or func.__name__}:{key_hash}"
# Cache-aside
return await cache.get_or_set(
cache_key,
lambda: func(*args, **kwargs),
ttl=ttl,
)
return wrapper
return decorator
# Uso
class ProductService:
    """Product reads cached via the @cached decorator (reads self.cache)."""

    def __init__(self, repo: ProductRepository, cache: CacheService):
        self.repo = repo
        self.cache = cache

    # Key: "products:<md5(args)>" — one cache entry per (category, page) pair.
    @cached(ttl=600, key_prefix="products")
    async def list_products(self, category: str, page: int = 1) -> list[Product]:
        return await self.repo.list(category=category, page=page)

    # Human-readable key via key_builder instead of an argument hash.
    @cached(
        ttl=300,
        key_builder=lambda self, product_id: f"product:{product_id}",
    )
    async def get_product(self, product_id: int) -> Product:
        return await self.repo.get(product_id)
5. HTTP Response Caching
Cache-Control Headers
from fastapi import Response
from fastapi.responses import JSONResponse
@app.get("/products/{product_id}")
async def get_product(
    product_id: int,
    response: Response,
    service: ProductService = Depends(),
):
    """Public product endpoint emitting CDN/browser cache headers."""
    product = await service.get_product(product_id)
    # 5-minute shared cache; ETag derived from the last-modified timestamp.
    response.headers["Cache-Control"] = "public, max-age=300"
    response.headers["ETag"] = f'"{product.updated_at.timestamp()}"'
    return product
@app.get("/users/me")
async def get_current_user(current_user: CurrentUser, response: Response):
    # Datos privados: no cachear
    # "private, no-store": neither shared caches nor the browser may persist it.
    response.headers["Cache-Control"] = "private, no-store"
    return current_user
ETag y Conditional Requests
from fastapi import Header, HTTPException
@app.get("/products/{product_id}")
async def get_product(
    product_id: int,
    if_none_match: str | None = Header(None),
    service: ProductService = Depends(),
):
    """Conditional GET: answer 304 Not Modified when the client's ETag is current.

    NOTE(review): the comparison is exact-match only — weak validators
    (``W/"..."``) and comma-separated If-None-Match lists are not handled.
    """
    product = await service.get_product(product_id)
    etag = f'"{product.updated_at.timestamp()}"'
    # Client already holds the current version
    if if_none_match == etag:
        # Per RFC 9110 a 304 should carry the same ETag/Cache-Control a 200
        # would; the original raised a bare 304 with no headers.
        raise HTTPException(
            status_code=304,  # Not Modified
            headers={"ETag": etag, "Cache-Control": "public, max-age=60"},
        )
    return JSONResponse(
        # assumes model_dump() is JSON-serialisable — TODO confirm
        # (datetimes may need model_dump(mode="json")).
        content=product.model_dump(),
        headers={
            "ETag": etag,
            "Cache-Control": "public, max-age=60",
        },
    )
6. Cache Invalidation Patterns
Event-Based Invalidation
from enum import Enum
from typing import Callable
class CacheEvent(str, Enum):
    """Invalidation event names (str-valued so they serialise/log cleanly)."""
    USER_UPDATED = "user.updated"
    USER_DELETED = "user.deleted"
    PRODUCT_UPDATED = "product.updated"
    CATEGORY_UPDATED = "category.updated"
class CacheInvalidator:
    """Registry of async invalidation handlers keyed by CacheEvent."""

    def __init__(self, cache: CacheService):
        self.cache = cache
        # event -> list of async callables invoked as handler(cache, **payload)
        self._handlers: dict[CacheEvent, list[Callable]] = {}

    def on(self, event: CacheEvent):
        """Decorator that registers the function as a handler for ``event``."""
        def decorator(func: Callable):
            self._handlers.setdefault(event, []).append(func)
            return func
        return decorator

    async def emit(self, event: CacheEvent, **kwargs):
        """Run every handler registered for ``event``, sequentially."""
        for handler in self._handlers.get(event, []):
            await handler(self.cache, **kwargs)
# Configurar handlers
# NOTE(review): module-level singleton — cache_service must exist at import time.
invalidator = CacheInvalidator(cache_service)

@invalidator.on(CacheEvent.USER_UPDATED)
async def invalidate_user_cache(cache: CacheService, user_id: int, **kwargs):
    """Drop the user entry plus any derived keys under its namespace."""
    await cache.delete(f"user:{user_id}")
    await cache.delete_pattern(f"user:{user_id}:*")

@invalidator.on(CacheEvent.CATEGORY_UPDATED)
async def invalidate_category_cache(cache: CacheService, category_id: int, **kwargs):
    # Invalidate every cached product listing for the category
    await cache.delete_pattern(f"products:category:{category_id}:*")
# Uso en servicios
class UserService:
    # NOTE(review): partial snippet — __init__ / self.repo are defined elsewhere.
    async def update_user(self, user_id: int, data: UserUpdate) -> User:
        """Write path: persist the change, then emit the invalidation event."""
        user = await self.repo.update(user_id, data)
        # Registered handlers clear the stale cache keys for this user.
        await invalidator.emit(CacheEvent.USER_UPDATED, user_id=user_id)
        return user
Tag-Based Invalidation
class TaggedCache:
    """Cache whose entries carry tags, enabling group invalidation.

    Each tag maps to a Redis SET of member keys; invalidating a tag deletes
    every member key plus the tag set itself.
    """

    def __init__(self, redis: "redis.Redis"):
        self.redis = redis

    async def set_with_tags(
        self,
        key: str,
        value: Any,
        tags: list[str],
        ttl: int = 300,
    ):
        """Store ``value`` as JSON and register ``key`` under each tag."""
        pipe = self.redis.pipeline()
        # default=str for parity with CacheService.set — without it,
        # datetimes/UUIDs inside ``value`` raise TypeError.
        pipe.set(key, json.dumps(value, default=str), ex=ttl)
        for tag in tags:
            pipe.sadd(f"tag:{tag}", key)
            # Tag set outlives the value slightly so members expire first;
            # stale members may linger in the set until then.
            pipe.expire(f"tag:{tag}", ttl + 60)
        await pipe.execute()

    async def invalidate_by_tag(self, tag: str) -> int:
        """Delete every key registered under ``tag``; return the member count."""
        tag_key = f"tag:{tag}"
        keys = await self.redis.smembers(tag_key)
        if keys:
            pipe = self.redis.pipeline()
            pipe.delete(*keys)  # one DEL for all members instead of N
            pipe.delete(tag_key)
            await pipe.execute()
        return len(keys)
# Uso
# NOTE(review): illustrative snippet — top-level ``await`` only works inside
# an async context (or a REPL with top-level await enabled).
cache = TaggedCache(redis)
# Cache an entry under several tags at once
await cache.set_with_tags(
    "product:123",
    product_data,
    tags=["products", "category:electronics", "brand:apple"],
)
# Invalidate every product in a category with a single call
await cache.invalidate_by_tag("category:electronics")
7. Caching de Queries Complejas
class QueryCache:
    """Parameterised DB-query cache: key = query name + hash of sorted params."""

    def __init__(self, redis: "redis.Redis", prefix: str = "query"):
        self.redis = redis
        self.prefix = prefix

    def _build_key(self, query_name: str, params: dict) -> str:
        """Deterministic key: the same params in any order produce the same key."""
        param_str = json.dumps(params, sort_keys=True, default=str)
        param_hash = hashlib.sha256(param_str.encode()).hexdigest()[:16]
        return f"{self.prefix}:{query_name}:{param_hash}"

    async def cached_query(
        self,
        query_name: str,
        params: dict,
        executor: Callable[[], Any],
        ttl: int = 300,
    ) -> Any:
        """Return the cached result, or await ``executor`` and cache it as JSON."""
        key = self._build_key(query_name, params)
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        result = await executor()
        # default=str stringifies dates/decimals; the round-trip yields str.
        await self.redis.set(key, json.dumps(result, default=str), ex=ttl)
        return result

    async def invalidate_query(self, query_name: str) -> int:
        """Delete every cached variant of ``query_name``; return the count."""
        pattern = f"{self.prefix}:{query_name}:*"
        # Collect first, then issue a single DEL instead of one round-trip
        # per key (the original deleted keys one by one).
        keys = [key async for key in self.redis.scan_iter(pattern)]
        if keys:
            await self.redis.delete(*keys)
        return len(keys)
# Uso
query_cache = QueryCache(redis)

async def get_sales_report(
    start_date: date,
    end_date: date,
    category: str | None = None,
):
    """Sales report cached 1h per (start, end, category) combination."""
    # ISO-format the dates so the cache key is stable and readable.
    params = {
        "start": start_date.isoformat(),
        "end": end_date.isoformat(),
        "category": category,
    }
    return await query_cache.cached_query(
        "sales_report",
        params,
        # NOTE(review): assumes execute_report_query is async — confirm.
        lambda: db.execute_report_query(start_date, end_date, category),
        ttl=3600,  # 1 hour
    )
8. Distributed Locking
import asyncio
from contextlib import asynccontextmanager
class DistributedLock:
    """Redis-based distributed lock (e.g. to prevent cache stampede).

    The lock value is a per-acquisition UUID and release runs a
    compare-and-delete Lua script, so one holder can never free another
    holder's lock. The lock auto-expires after ``timeout`` seconds.

    NOTE(review): ``uuid`` and ``LockError`` are referenced but not defined
    in this snippet — they must be imported/defined in the real module.
    """

    def __init__(self, redis: "redis.Redis"):
        self.redis = redis

    @asynccontextmanager
    async def acquire(
        self,
        key: str,
        timeout: int = 10,
        blocking: bool = True,
        blocking_timeout: float = 5.0,
    ):
        """Async context manager holding ``lock:<key>`` while inside.

        Raises LockError when the lock cannot be acquired: immediately if
        ``blocking`` is False, otherwise after ``blocking_timeout`` seconds.
        """
        lock_key = f"lock:{key}"
        lock_value = str(uuid.uuid4())  # ownership token
        acquired = False
        # get_running_loop(): get_event_loop() is deprecated for this use
        # since Python 3.10; loop.time() is a monotonic clock.
        start_time = asyncio.get_running_loop().time()
        while True:
            # SET NX EX: atomic "create if absent, with expiry".
            acquired = await self.redis.set(
                lock_key,
                lock_value,
                nx=True,
                ex=timeout,
            )
            if acquired:
                break
            if not blocking:
                raise LockError(f"Could not acquire lock for {key}")
            elapsed = asyncio.get_running_loop().time() - start_time
            if elapsed >= blocking_timeout:
                raise LockError(f"Timeout acquiring lock for {key}")
            await asyncio.sleep(0.1)  # poll interval
        try:
            yield
        finally:
            # Release only if we still own the lock (compare-and-delete).
            if acquired:
                lua_script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("del", KEYS[1])
                else
                    return 0
                end
                """
                await self.redis.eval(lua_script, 1, lock_key, lock_value)
# Uso: evitar cache stampede
lock = DistributedLock(redis)

async def get_expensive_data(key: str) -> dict:
    """Compute-once pattern: cache lookup → lock → double-check → compute.

    The lock ensures a single worker runs the expensive computation while
    concurrent requests wait and then hit the freshly filled cache.
    """
    # Fast path: serve from cache without locking.
    # `is not None` — the original truthiness check made falsy-but-valid
    # cached values ({}, [], 0) recompute on every request.
    cached = await cache.get(key)
    if cached is not None:
        return cached
    # Slow path: serialise the expensive computation across workers.
    async with lock.acquire(f"compute:{key}"):
        # Double-check: another worker may have filled the cache while we
        # waited for the lock.
        cached = await cache.get(key)
        if cached is not None:
            return cached
        # Compute and cache
        data = await compute_expensive_data(key)
        await cache.set(key, data, ttl=300)
        return data
9. Métricas de Cache
from prometheus_client import Counter, Histogram

# Hit/miss counters per logical cache; hit rate = hits / (hits + misses).
CACHE_HITS = Counter(
    "cache_hits_total",
    "Total cache hits",
    ["cache_name"],
)
CACHE_MISSES = Counter(
    "cache_misses_total",
    "Total cache misses",
    ["cache_name"],
)
# Latency histogram with sub-100ms buckets (cache ops should be fast).
CACHE_LATENCY = Histogram(
    "cache_operation_seconds",
    "Cache operation latency",
    ["cache_name", "operation"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1],
)
class InstrumentedCache(CacheService):
    """CacheService subclass that records Prometheus hit/miss/latency metrics."""

    def __init__(self, redis: redis.Redis, name: str = "default"):
        super().__init__(redis)
        self.name = name  # label value used for every metric

    async def get(self, key: str) -> Any | None:
        """Timed lookup that increments the hit or miss counter."""
        with CACHE_LATENCY.labels(self.name, "get").time():
            value = await super().get(key)
        if value is None:
            CACHE_MISSES.labels(self.name).inc()
        else:
            CACHE_HITS.labels(self.name).inc()
        return value
10. Comparativa: ioredis vs redis-py
| Aspecto | ioredis (Node.js) | redis-py (Python) |
|---|---|---|
| Async | Nativo | redis.asyncio |
| Connection Pool | Automático | Explícito |
| Cluster | Soportado | Soportado |
| Pub/Sub | Soportado | Soportado |
| Lua Scripts | defineCommand | eval() |
| Pipelining | Automático | Manual |
11. Checklist de Caching
- TTL apropiado según frecuencia de cambios
- Invalidación en writes (no solo TTL)
- Serialización con orjson para velocidad
- Lock distribuido para cache stampede
- Métricas de hit rate
- Fallback si Redis no disponible
- Tags para invalidación en grupo
- Compression para valores grandes
Conclusión
El caching efectivo requiere estrategia:
- Cache-aside para la mayoría de casos
- Invalidación explícita en writes
- Tags para invalidación en grupo
- Distributed locks para evitar stampede
- Métricas para optimizar TTLs
Pattern Senior: Implementa cache en capas: HTTP cache (CDN/browser) → Application cache (Redis) → Query cache (DB). Cada capa reduce carga en la siguiente.
En el siguiente capítulo, implementaremos WebSockets y comunicación real-time.