Módulo 18 · 16 min de lectura

18 - Caching Strategies

Redis, cache-aside, write-through, invalidation patterns y cache de respuestas HTTP.

#caching #redis #performance #patterns #optimization

1. Patrones de Caching

Cache-Aside (Lazy Loading)

[Request] → [Check Cache] → Miss → [Query DB] → [Store in Cache] → [Return]

              Hit → [Return from Cache]

Write-Through

[Write Request] → [Write to Cache] → [Write to DB] → [Return]

Write-Behind (Write-Back)

[Write Request] → [Write to Cache] → [Return]

              [Async Write to DB]

2. Configuración de Redis

# core/redis.py
import redis.asyncio as redis
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from app.core.config import settings

redis_pool: redis.ConnectionPool | None = None

async def init_redis_pool():
    """Build the process-wide Redis connection pool from settings.REDIS_URL."""
    global redis_pool
    # decode_responses=True -> callers receive str instead of raw bytes.
    redis_pool = redis.ConnectionPool.from_url(
        settings.REDIS_URL,
        decode_responses=True,
        max_connections=50,
    )

async def close_redis_pool():
    """Dispose the shared pool; a no-op when it was never initialized."""
    global redis_pool
    if not redis_pool:
        return
    await redis_pool.disconnect()

@asynccontextmanager
async def get_redis() -> AsyncGenerator[redis.Redis, None]:
    """Yield a Redis client bound to the shared pool, closing it afterwards."""
    conn = redis.Redis(connection_pool=redis_pool)
    try:
        yield conn
    finally:
        # Returns the underlying connection to the pool.
        await conn.close()

# FastAPI lifespan: open the Redis pool on startup, dispose it on shutdown.
@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_redis_pool()
    yield  # application serves requests while suspended here
    await close_redis_pool()

app = FastAPI(lifespan=lifespan)

Dependency

from typing import Annotated
from fastapi import Depends

async def get_redis_client() -> AsyncGenerator[redis.Redis, None]:
    """FastAPI dependency: yield a pooled Redis client scoped to one request."""
    async with get_redis() as client:
        yield client

# Annotated alias so handlers can simply declare `client: RedisClient`.
RedisClient = Annotated[redis.Redis, Depends(get_redis_client)]

3. Cache-Aside Pattern

# services/cache.py
import json
from typing import TypeVar, Callable, Any
from datetime import timedelta
import redis.asyncio as redis

T = TypeVar("T")

class CacheService:
    """JSON cache-aside helper over an async Redis client.

    Keys are namespaced under ``prefix``; values are serialized with
    ``json.dumps(default=str)``, so non-JSON types degrade to strings.
    """

    def __init__(self, redis: redis.Redis, prefix: str = "cache"):
        self.redis = redis    # async client (assumes decode_responses=True — confirm at pool setup)
        self.prefix = prefix  # namespace prepended to every key

    def _key(self, key: str) -> str:
        """Namespace a raw key, e.g. "user:1" -> "cache:user:1"."""
        return f"{self.prefix}:{key}"

    async def get(self, key: str) -> Any | None:
        """Return the cached value, or None on a miss.

        NOTE: a stored JSON ``null`` is indistinguishable from a miss.
        """
        data = await self.redis.get(self._key(key))
        if data:
            return json.loads(data)
        return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: int | timedelta = 300,
    ) -> None:
        """Store ``value`` as JSON with a TTL (seconds or timedelta)."""
        if isinstance(ttl, timedelta):
            ttl = int(ttl.total_seconds())

        await self.redis.set(
            self._key(key),
            json.dumps(value, default=str),
            ex=ttl,
        )

    async def delete(self, key: str) -> None:
        """Invalidate a single key."""
        await self.redis.delete(self._key(key))

    async def delete_pattern(self, pattern: str) -> int:
        """Invalidate every key matching a glob pattern; return the count.

        Uses SCAN (non-blocking, unlike KEYS), then a single DELETE call.
        """
        keys = [key async for key in self.redis.scan_iter(self._key(pattern))]
        if keys:
            return await self.redis.delete(*keys)
        return 0

    async def get_or_set(
        self,
        key: str,
        factory: Callable[[], Any] | Any,
        ttl: int = 300,
    ) -> Any:
        """Cache-aside: return the cached value or build, store and return it.

        ``factory`` may be an async callable, a sync callable, or a plain
        value.  (Previously a *sync* callable crashed: its non-awaitable
        result was passed straight to ``await``.)
        """
        cached = await self.get(key)
        if cached is not None:
            return cached

        # Cache miss: produce the value, awaiting only when needed.
        value = factory() if callable(factory) else factory
        if hasattr(value, "__await__"):
            value = await value

        await self.set(key, value, ttl)
        return value

Uso en Servicios

class UserService:
    """User reads/writes with cache-aside layered over the repository."""

    def __init__(self, repo: UserRepository, cache: CacheService):
        self.repo = repo
        self.cache = cache

    async def get_user(self, user_id: int) -> User:
        """Return a user, served from cache when possible (TTL: 5 minutes)."""
        payload = await self.cache.get_or_set(
            f"user:{user_id}",
            lambda: self._fetch_user(user_id),
            ttl=300,
        )
        return User.model_validate(payload)

    async def _fetch_user(self, user_id: int) -> dict:
        """Load the user from the DB as a plain dict (JSON-cacheable)."""
        user = await self.repo.get(user_id)
        if not user:
            raise NotFoundError(f"User {user_id} not found")
        return user.model_dump()

    async def update_user(self, user_id: int, data: UserUpdate) -> User:
        """Persist the update, then drop the now-stale cache entry."""
        updated = await self.repo.update(user_id, data.model_dump(exclude_unset=True))

        # Invalidate AFTER the write so readers never re-cache stale data.
        await self.cache.delete(f"user:{user_id}")

        return updated

4. Decorator de Cache

from functools import wraps
from typing import Callable, Any
import hashlib
import json

def cached(
    ttl: int = 300,
    key_prefix: str = "",
    key_builder: Callable[..., str] | None = None,
):
    """Decorator applying cache-aside to async methods.

    Looks up a ``cache`` attribute on the bound instance (args[0]); when
    absent, the wrapped coroutine simply runs uncached.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # The instance is expected to expose its CacheService as
            # ``self.cache``; without one, execute the function directly.
            service = getattr(args[0], 'cache', None)
            if not service:
                return await func(*args, **kwargs)

            if key_builder:
                cache_key = key_builder(*args, **kwargs)
            else:
                # Default key: prefix (or function name) + short MD5 of the
                # JSON-serialized arguments, skipping ``self``.
                payload = json.dumps(
                    {"args": args[1:], "kwargs": kwargs},
                    default=str,
                    sort_keys=True,
                )
                digest = hashlib.md5(payload.encode()).hexdigest()[:16]
                cache_key = f"{key_prefix or func.__name__}:{digest}"

            # Delegate the miss/store dance to the service.
            return await service.get_or_set(
                cache_key,
                lambda: func(*args, **kwargs),
                ttl=ttl,
            )

        return wrapper

    return decorator

# Uso
class ProductService:
    """Product reads with decorator-driven caching."""

    def __init__(self, repo: ProductRepository, cache: CacheService):
        self.repo = repo
        self.cache = cache

    @cached(ttl=600, key_prefix="products")
    async def list_products(self, category: str, page: int = 1) -> list[Product]:
        """Listing cached 10 min, keyed by hashed (category, page) args."""
        return await self.repo.list(category=category, page=page)

    @cached(
        ttl=300,
        # Explicit key builder: stable, human-readable cache keys.
        key_builder=lambda self, product_id: f"product:{product_id}",
    )
    async def get_product(self, product_id: int) -> Product:
        """Single product cached 5 min under ``product:<id>``."""
        return await self.repo.get(product_id)

5. HTTP Response Caching

Cache-Control Headers

from fastapi import Response
from fastapi.responses import JSONResponse

@app.get("/products/{product_id}")
async def get_product(
    product_id: int,
    response: Response,
    service: ProductService = Depends(),
):
    """Product endpoint emitting CDN/browser caching hints."""
    product = await service.get_product(product_id)
    
    # Cache headers for CDN/browser: shared caches may store it for 5 min.
    response.headers["Cache-Control"] = "public, max-age=300"
    # Freshness token derived from the last-update timestamp.
    response.headers["ETag"] = f'"{product.updated_at.timestamp()}"'
    
    return product

@app.get("/users/me")
async def get_current_user(current_user: CurrentUser, response: Response):
    # Private data: forbid any cache (browser, proxy, CDN) from storing it.
    response.headers["Cache-Control"] = "private, no-store"
    return current_user

ETag y Conditional Requests

from fastapi import Header, HTTPException

@app.get("/products/{product_id}")
async def get_product(
    product_id: int,
    if_none_match: str | None = Header(None),
    service: ProductService = Depends(),
):
    """Product endpoint supporting ETag-based conditional GET.

    Replies 304 Not Modified — with an empty body, as HTTP requires —
    when the client's ``If-None-Match`` matches the current ETag.
    """
    product = await service.get_product(product_id)
    etag = f'"{product.updated_at.timestamp()}"'
    
    # Client already holds the current version: send 304 with NO body.
    # (The original ``raise HTTPException(status_code=304)`` made FastAPI
    # attach a JSON "detail" body, which is invalid for 304 responses.)
    if if_none_match == etag:
        return Response(status_code=304, headers={"ETag": etag})
    
    return JSONResponse(
        content=product.model_dump(),
        headers={
            "ETag": etag,
            "Cache-Control": "public, max-age=60",
        },
    )

6. Cache Invalidation Patterns

Event-Based Invalidation

from enum import Enum
from typing import Callable

class CacheEvent(str, Enum):
    """Domain events that trigger cache invalidation.

    Inherits ``str`` so members compare equal to their wire values,
    e.g. ``CacheEvent.USER_UPDATED == "user.updated"``.
    """

    USER_UPDATED = "user.updated"
    USER_DELETED = "user.deleted"
    PRODUCT_UPDATED = "product.updated"
    CATEGORY_UPDATED = "category.updated"

class CacheInvalidator:
    """Tiny event hub mapping cache events to async invalidation handlers."""

    def __init__(self, cache: CacheService):
        self.cache = cache
        # event -> handlers, each called as ``await handler(cache, **kwargs)``
        self._handlers: dict[CacheEvent, list[Callable]] = {}

    def on(self, event: CacheEvent):
        """Decorator registering the wrapped coroutine for *event*."""
        def register(handler: Callable):
            self._handlers.setdefault(event, []).append(handler)
            return handler
        return register

    async def emit(self, event: CacheEvent, **kwargs):
        """Invoke every handler registered for *event*, in order."""
        for handler in self._handlers.get(event, []):
            await handler(self.cache, **kwargs)

# Wire up the invalidation handlers
invalidator = CacheInvalidator(cache_service)

@invalidator.on(CacheEvent.USER_UPDATED)
async def invalidate_user_cache(cache: CacheService, user_id: int, **kwargs):
    # Drop the user's primary entry plus any derived keys (user:<id>:*).
    await cache.delete(f"user:{user_id}")
    await cache.delete_pattern(f"user:{user_id}:*")

@invalidator.on(CacheEvent.CATEGORY_UPDATED)
async def invalidate_category_cache(cache: CacheService, category_id: int, **kwargs):
    # Invalidate every cached product listing for the category.
    await cache.delete_pattern(f"products:category:{category_id}:*")

# Usage inside services
class UserService:
    """Write path: persist first, then broadcast the invalidation event."""

    async def update_user(self, user_id: int, data: UserUpdate) -> User:
        updated = await self.repo.update(user_id, data)
        
        # Emit instead of deleting keys inline: the registered handlers
        # own the knowledge of which keys depend on a user.
        await invalidator.emit(CacheEvent.USER_UPDATED, user_id=user_id)
        
        return updated

Tag-Based Invalidation

class TaggedCache:
    """Cache whose entries can be invalidated in bulk via tags.

    Each tag keeps a Redis set (key ``tag:<name>``) of its member keys;
    invalidating a tag deletes all member keys plus the set itself.
    """

    def __init__(self, redis: redis.Redis):
        self.redis = redis

    async def set_with_tags(
        self,
        key: str,
        value: Any,
        tags: list[str],
        ttl: int = 300,
    ):
        """Store *value* under *key* and register the key under every tag."""
        pipe = self.redis.pipeline()

        pipe.set(key, json.dumps(value), ex=ttl)

        for tag in tags:
            # The tag set outlives its entries slightly (+60s) so a late
            # invalidation still finds keys that are about to expire.
            pipe.sadd(f"tag:{tag}", key)
            pipe.expire(f"tag:{tag}", ttl + 60)

        await pipe.execute()

    async def invalidate_by_tag(self, tag: str) -> int:
        """Delete every key registered under *tag*; return how many."""
        tag_key = f"tag:{tag}"
        members = await self.redis.smembers(tag_key)

        if members:
            pipe = self.redis.pipeline()
            for member in members:
                pipe.delete(member)
            pipe.delete(tag_key)  # drop the index set as well
            await pipe.execute()

        return len(members)

# Uso
cache = TaggedCache(redis)

# Cachear con tags
await cache.set_with_tags(
    "product:123",
    product_data,
    tags=["products", "category:electronics", "brand:apple"],
)

# Invalidar todos los productos de una categoría
await cache.invalidate_by_tag("category:electronics")

7. Caching de Queries Complejas

class QueryCache:
    """Caches results of parameterized DB queries under deterministic keys."""

    def __init__(self, redis: redis.Redis, prefix: str = "query"):
        self.redis = redis
        self.prefix = prefix

    def _build_key(self, query_name: str, params: dict) -> str:
        """Deterministic key: prefix + query name + short SHA-256 of params."""
        canonical = json.dumps(params, sort_keys=True, default=str)
        digest = hashlib.sha256(canonical.encode()).hexdigest()[:16]
        return f"{self.prefix}:{query_name}:{digest}"

    async def cached_query(
        self,
        query_name: str,
        params: dict,
        executor: Callable[[], Any],
        ttl: int = 300,
    ) -> Any:
        """Return the cached result for (query, params); run *executor*
        and store its JSON-serialized result on a miss."""
        key = self._build_key(query_name, params)

        hit = await self.redis.get(key)
        if hit:
            return json.loads(hit)

        result = await executor()
        await self.redis.set(key, json.dumps(result, default=str), ex=ttl)
        return result

    async def invalidate_query(self, query_name: str):
        """Delete every cached variant of *query_name*; return the count."""
        removed = 0
        # SCAN is incremental and non-blocking, unlike KEYS.
        async for key in self.redis.scan_iter(f"{self.prefix}:{query_name}:*"):
            await self.redis.delete(key)
            removed += 1
        return removed

# Uso
query_cache = QueryCache(redis)

async def get_sales_report(
    start_date: date,
    end_date: date,
    category: str | None = None,
):
    """Sales report, cached 1 hour per (start, end, category) combination."""
    # Dates are serialized to ISO strings so the key hash is deterministic.
    return await query_cache.cached_query(
        "sales_report",
        {
            "start": start_date.isoformat(),
            "end": end_date.isoformat(),
            "category": category,
        },
        lambda: db.execute_report_query(start_date, end_date, category),
        ttl=3600,  # 1 hour
    )

8. Distributed Locking

import asyncio
from contextlib import asynccontextmanager

class DistributedLock:
    """Distributed Redis lock (SET NX EX) to prevent cache stampedes.

    Single-instance, best-effort (not full Redlock). The lock value is a
    random token so only the owner can release it.
    """

    def __init__(self, redis: redis.Redis):
        self.redis = redis

    @asynccontextmanager
    async def acquire(
        self,
        key: str,
        timeout: int = 10,
        blocking: bool = True,
        blocking_timeout: float = 5.0,
    ):
        """Acquire ``lock:<key>`` or raise LockError.

        timeout: TTL in seconds after which Redis auto-expires the lock
            (protects against crashed holders).
        blocking: poll every 100 ms until acquired instead of failing fast.
        blocking_timeout: max seconds to keep polling before LockError.
        """
        import uuid  # local: the snippet's module imports lacked uuid

        lock_key = f"lock:{key}"
        lock_value = str(uuid.uuid4())  # ownership token

        # get_running_loop() instead of the deprecated get_event_loop();
        # loop.time() is a monotonic clock, safe for measuring elapsed time.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        while True:
            # SET NX EX: atomic "create if absent, with TTL".
            acquired = await self.redis.set(
                lock_key,
                lock_value,
                nx=True,
                ex=timeout,
            )
            if acquired:
                break

            if not blocking:
                raise LockError(f"Could not acquire lock for {key}")

            if loop.time() - start_time >= blocking_timeout:
                raise LockError(f"Timeout acquiring lock for {key}")

            await asyncio.sleep(0.1)

        try:
            yield
        finally:
            # Release only if we still own the lock. Compare-and-delete must
            # be atomic (hence Lua): a plain GET+DEL could delete a lock that
            # expired and was re-acquired by another worker.
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """
            await self.redis.eval(lua_script, 1, lock_key, lock_value)

# Uso: evitar cache stampede
lock = DistributedLock(redis)

async def get_expensive_data(key: str) -> dict:
    """Compute-once cache read guarded by a distributed lock.

    Prevents a cache stampede: on a miss, only one worker computes while
    the rest wait on the lock and then hit the freshly-filled cache.
    """
    # Fast path. Compare against None explicitly: ``if cached:`` would treat
    # legitimately falsy cached values ({}, 0, "") as misses and recompute
    # them on every request, defeating the cache entirely.
    cached = await cache.get(key)
    if cached is not None:
        return cached
    
    # Slow path: serialize the expensive computation across workers.
    async with lock.acquire(f"compute:{key}"):
        # Double-check: another worker may have filled the cache while we
        # were waiting for the lock.
        cached = await cache.get(key)
        if cached is not None:
            return cached
        
        # Compute and cache for the rest of the fleet.
        data = await compute_expensive_data(key)
        await cache.set(key, data, ttl=300)
        return data

9. Métricas de Cache

from prometheus_client import Counter, Histogram

# Prometheus metrics: hit/miss counters plus operation latency, all labeled
# by logical cache name so multiple caches can share one process.
CACHE_HITS = Counter(
    "cache_hits_total",
    "Total cache hits",
    ["cache_name"],
)

CACHE_MISSES = Counter(
    "cache_misses_total",
    "Total cache misses",
    ["cache_name"],
)

CACHE_LATENCY = Histogram(
    "cache_operation_seconds",
    "Cache operation latency",
    ["cache_name", "operation"],
    # 1 ms – 100 ms: Redis round-trips are expected to be fast.
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1],
)

class InstrumentedCache(CacheService):
    """CacheService wrapper exporting hit/miss/latency Prometheus metrics."""

    def __init__(self, redis: redis.Redis, name: str = "default"):
        super().__init__(redis)
        self.name = name  # label value applied to every metric

    async def get(self, key: str) -> Any | None:
        """Timed lookup that also bumps the hit or miss counter."""
        with CACHE_LATENCY.labels(self.name, "get").time():
            value = await super().get(key)

        # A cached JSON null reads back as None and therefore counts as a miss.
        counter = CACHE_HITS if value is not None else CACHE_MISSES
        counter.labels(self.name).inc()

        return value

10. Comparativa: ioredis vs redis-py

| Aspecto | ioredis (Node.js) | redis-py (Python) |
|---|---|---|
| Async | Nativo | `redis.asyncio` |
| Connection Pool | Automático | Explícito |
| Cluster | Soportado | Soportado |
| Pub/Sub | Soportado | Soportado |
| Lua Scripts | `defineCommand` | `eval()` |
| Pipelining | Automático | Manual |

11. Checklist de Caching


Conclusión

El caching efectivo requiere estrategia:

  1. Cache-aside para la mayoría de casos
  2. Invalidación explícita en writes
  3. Tags para invalidación en grupo
  4. Distributed locks para evitar stampede
  5. Métricas para optimizar TTLs

Pattern Senior: Implementa cache en capas: HTTP cache (CDN/browser) → Application cache (Redis) → Query cache (DB). Cada capa reduce carga en la siguiente.

En el siguiente capítulo, implementaremos WebSockets y comunicación real-time.