Module 6 · 16 min read

06 - Middleware and Error Handling

Request/response interceptors, custom exception handlers, and the domain-error pattern.

#middleware #errors #exceptions #fastapi #starlette

1. From Express Middleware to Starlette Middleware

In Express, middleware is a chain of functions linked by next(). In FastAPI (which builds on Starlette), a middleware is an async wrapper around the rest of the application: it decides whether, and when, the request continues by awaiting call_next.

JavaScript/TypeScript
// Express: a chain of functions
app.use((req, res, next) => {
const start = Date.now();

res.on('finish', () => {
  const duration = Date.now() - start;
  console.log(`${req.method} ${req.url} - ${duration}ms`);
});

next();  // Hand off to the next function in the chain
});
Python
# FastAPI: an async wrapper around the rest of the app
import time

from fastapi import FastAPI
from starlette.requests import Request

app = FastAPI()

@app.middleware("http")
async def add_process_time(request: Request, call_next):
  start = time.perf_counter()
  
  response = await call_next(request)  # Runs the remaining middleware and the endpoint
  
  duration = time.perf_counter() - start
  response.headers["X-Process-Time"] = f"{duration:.4f}"
  
  return response

Anatomy of a Middleware

from typing import Callable

from starlette.requests import Request
from starlette.responses import Response

async def middleware(request: Request, call_next: Callable) -> Response:
    # 1. PRE-PROCESSING (before the endpoint)
    # - Validate headers
    # - Set values on request.state
    # - Short-circuit by returning a Response early
    
    response = await call_next(request)  # 2. RUN THE ENDPOINT
    
    # 3. POST-PROCESSING (after the endpoint)
    # - Modify response headers
    # - Logging
    # - Metrics
    
    return response

2. Types of Middleware in FastAPI

Decorator-based (Simple)

@app.middleware("http")
async def cors_middleware(request: Request, call_next):
    response = await call_next(request)
    response.headers["Access-Control-Allow-Origin"] = "*"
    return response

Class-based (Complex)

import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

class RequestIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        
        # Store it on request.state so endpoints can read it
        request.state.request_id = request_id
        
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        
        return response

# Register it
app.add_middleware(RequestIdMiddleware)

Pure ASGI Middleware (Maximum Control)

from starlette.types import ASGIApp, Receive, Send, Scope

class RawASGIMiddleware:
    def __init__(self, app: ASGIApp):
        self.app = app
    
    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        
        # Wrap send so outgoing response messages can be modified
        async def custom_send(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append((b"x-custom-header", b"value"))
                message["headers"] = headers
            await send(message)
        
        await self.app(scope, receive, custom_send)

app.add_middleware(RawASGIMiddleware)
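
Because an ASGI application is just an async callable, the send wrapper above can be exercised by hand without a server. The sketch below uses only the standard library. The names hello_app, AddHeaderMiddleware and run_once are hypothetical, and the scope dict is a minimal stand-in for what a real ASGI server would pass.

```python
import asyncio


async def hello_app(scope, receive, send):
    """Minimal ASGI app: replies 200 with a plain-text body."""
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"hello"})


class AddHeaderMiddleware:
    """Same shape as RawASGIMiddleware: wraps send to append a header."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        async def custom_send(message):
            if message["type"] == "http.response.start":
                # Build a new message instead of mutating the app's own dict
                headers = list(message.get("headers", []))
                headers.append((b"x-custom-header", b"value"))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, custom_send)


async def run_once(app):
    """Call the ASGI app once and collect every message it sends."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": "/", "headers": []}
    await app(scope, receive, send)
    return sent


messages = asyncio.run(run_once(AddHeaderMiddleware(hello_app)))
print(messages[0]["headers"])
```

The first message carries the status and headers, so that is the only one the wrapper needs to touch. Body messages pass through unchanged.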

3. Middleware Execution Order

Order matters: on the way in, middleware runs in the reverse of registration order (the last one registered is the first to see the request). On the way out, the order flips, so the last one registered is also the last to finish.

app = FastAPI()

@app.middleware("http")
async def middleware_1(request: Request, call_next):
    print("M1: before")
    response = await call_next(request)
    print("M1: after")
    return response

@app.middleware("http")
async def middleware_2(request: Request, call_next):
    print("M2: before")
    response = await call_next(request)
    print("M2: after")
    return response

# Output for one request:
# M2: before  ← registered last, runs first
# M1: before
# [endpoint]
# M1: after
# M2: after   ← registered last, finishes last
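
The reversed order follows from how the stack is built: each newly registered middleware wraps everything registered before it. The sketch below is a simplified, stdlib-only model of that wrapping, not Starlette's actual implementation. make_middleware, wrap and build_stack are hypothetical helpers.

```python
import asyncio


def make_middleware(name, log):
    """Build a middleware that records when it runs, before and after."""
    async def middleware(request, call_next):
        log.append(f"{name}: before")
        response = await call_next(request)
        log.append(f"{name}: after")
        return response
    return middleware


def wrap(middleware, inner):
    """Return a handler that runs `middleware` around `inner`."""
    async def handler(request):
        return await middleware(request, inner)
    return handler


def build_stack(endpoint, registered):
    """Wrap in registration order, so the LAST registered ends up outermost."""
    handler = endpoint
    for middleware in registered:
        handler = wrap(middleware, handler)
    return handler


async def main():
    log = []

    async def endpoint(request):
        log.append("[endpoint]")
        return "ok"

    m1 = make_middleware("M1", log)
    m2 = make_middleware("M2", log)
    app = build_stack(endpoint, [m1, m2])  # M1 registered first, M2 last
    await app("request")
    return log


print(asyncio.run(main()))
# ['M2: before', 'M1: before', '[endpoint]', 'M1: after', 'M2: after']
```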

Recommended Registration Order

# Each add_middleware call wraps the existing stack, so register from INNERMOST to OUTERMOST:
app.add_middleware(AuthenticationMiddleware)  # 1. Auth (closest to the endpoint)
app.add_middleware(RequestIdMiddleware)       # 2. Request ID
app.add_middleware(CORSMiddleware, ...)       # 3. CORS
app.add_middleware(GZipMiddleware)            # 4. Compression (outermost, runs first)

4. Exception Handling: The Domain Pattern

The Problem: HTTPException in Business Logic

# ❌ BAD: your service knows HTTP details
class UserService:
    async def get_user(self, user_id: int) -> User:
        user = await self.repo.get(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")  # ❌
        return user

The Solution: Domain Exceptions

# exceptions/domain.py
class DomainError(Exception):
    """Base class for all domain exceptions."""
    def __init__(self, message: str, code: str | None = None):
        self.message = message
        self.code = code or self.__class__.__name__
        super().__init__(message)

class NotFoundError(DomainError):
    """Resource not found."""
    pass

class ValidationError(DomainError):
    """Business-rule validation error."""
    pass

class ConflictError(DomainError):
    """State conflict (e.g. the email already exists)."""
    pass

class UnauthorizedError(DomainError):
    """Not authenticated."""
    pass

class ForbiddenError(DomainError):
    """Authenticated but lacking permission."""
    pass

# services/user.py - PURE logic, no HTTP imports
class UserService:
    async def get_user(self, user_id: int) -> User:
        user = await self.repo.get(user_id)
        if not user:
            raise NotFoundError(f"User {user_id} not found")  # ✅ Domain error
        return user
    
    async def create_user(self, data: UserCreate) -> User:
        if await self.repo.exists_by_email(data.email):
            raise ConflictError(f"Email {data.email} already registered")
        return await self.repo.create(data)
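
One practical payoff of this separation is that the service can be tested with no web framework in the picture. The sketch below assumes a hypothetical InMemoryUserRepo in place of the real repository and plain dicts in place of the User model. DomainError and NotFoundError are redefined so the example runs on its own.

```python
import asyncio


class DomainError(Exception):
    def __init__(self, message, code=None):
        self.message = message
        self.code = code or self.__class__.__name__
        super().__init__(message)


class NotFoundError(DomainError):
    pass


class InMemoryUserRepo:
    """Hypothetical stand-in for the real repository."""

    def __init__(self, users):
        self._users = users

    async def get(self, user_id):
        return self._users.get(user_id)


class UserService:
    def __init__(self, repo):
        self.repo = repo

    async def get_user(self, user_id):
        user = await self.repo.get(user_id)
        if not user:
            raise NotFoundError(f"User {user_id} not found")
        return user


async def demo():
    service = UserService(InMemoryUserRepo({1: {"id": 1, "name": "Ada"}}))
    found = await service.get_user(1)
    try:
        await service.get_user(2)
    except NotFoundError as exc:
        # No status codes or responses involved: only the domain error
        return found, exc.code, exc.message


print(asyncio.run(demo()))
# ({'id': 1, 'name': 'Ada'}, 'NotFoundError', 'User 2 not found')
```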

5. Exception Handlers: Translating Domain Errors to HTTP

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

# Map domain errors to HTTP status codes
ERROR_STATUS_MAP = {
    NotFoundError: 404,
    ValidationError: 422,
    ConflictError: 409,
    UnauthorizedError: 401,
    ForbiddenError: 403,
}

@app.exception_handler(DomainError)
async def domain_error_handler(request: Request, exc: DomainError) -> JSONResponse:
    status_code = ERROR_STATUS_MAP.get(type(exc), 500)
    
    return JSONResponse(
        status_code=status_code,
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "request_id": getattr(request.state, "request_id", None),
            }
        },
    )
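
One caveat with looking up type(exc) directly: only exact classes match, so a subclass of a mapped error would fall through to 500. A possible refinement, sketched here under the assumption that subclasses should inherit their parent's status, is to walk the exception's method resolution order. status_for and UserNotFoundError are hypothetical names, and the classes are trimmed-down copies so the example runs on its own.

```python
class DomainError(Exception):
    pass


class NotFoundError(DomainError):
    pass


class ConflictError(DomainError):
    pass


class UserNotFoundError(NotFoundError):
    """Hypothetical, more specific subclass that is not in the map."""


ERROR_STATUS_MAP = {
    NotFoundError: 404,
    ConflictError: 409,
}


def status_for(exc: Exception, default: int = 500) -> int:
    """Return the status of the closest mapped ancestor of exc's class."""
    for cls in type(exc).__mro__:
        if cls in ERROR_STATUS_MAP:
            return ERROR_STATUS_MAP[cls]
    return default


print(status_for(UserNotFoundError("no such user")))  # 404
print(status_for(DomainError("unmapped")))            # 500
```

Inside the handler, status_for(exc) would replace the ERROR_STATUS_MAP.get(type(exc), 500) lookup.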

Handler for Pydantic Validation Errors

from fastapi.exceptions import RequestValidationError

@app.exception_handler(RequestValidationError)
async def validation_error_handler(
    request: Request, 
    exc: RequestValidationError
) -> JSONResponse:
    """Return validation errors in the shared error envelope (this is not the RFC 7807 format)."""
    errors = []
    for error in exc.errors():
        errors.append({
            "field": ".".join(str(loc) for loc in error["loc"]),
            "message": error["msg"],
            "type": error["type"],
        })
    
    return JSONResponse(
        status_code=422,
        content={
            "error": {
                "code": "VALIDATION_ERROR",
                "message": "Request validation failed",
                "details": errors,
                "request_id": getattr(request.state, "request_id", None),
            }
        },
    )
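
To make the field flattening concrete, the loop can be pulled out into a pure function and fed hand-written data. The sample entries below only mimic the shape of the dicts returned by exc.errors() (the loc, msg and type keys). The actual messages and type strings depend on the Pydantic version, and format_validation_errors is a hypothetical helper.

```python
def format_validation_errors(errors):
    """Flatten each error's location tuple into a dotted field path."""
    return [
        {
            "field": ".".join(str(part) for part in err["loc"]),
            "message": err["msg"],
            "type": err["type"],
        }
        for err in errors
    ]


# Hand-written sample data, not real Pydantic output
sample_errors = [
    {"loc": ("body", "email"), "msg": "field required", "type": "missing"},
    {"loc": ("body", "items", 0, "price"), "msg": "must be positive", "type": "value_error"},
]

for item in format_validation_errors(sample_errors):
    print(item["field"], "->", item["message"])
# body.email -> field required
# body.items.0.price -> must be positive
```

Integer entries in loc are list indexes, which is why each part goes through str() before joining.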

Global Handler for Uncaught Exceptions

import logging

logger = logging.getLogger(__name__)

@app.exception_handler(Exception)
async def unhandled_exception_handler(
    request: Request, 
    exc: Exception
) -> JSONResponse:
    """Catch any exception that no other handler claimed."""
    # Full log, including traceback, for debugging
    logger.exception(
        "Unhandled exception",
        exc_info=exc,
        extra={
            "request_id": getattr(request.state, "request_id", None),
            "path": request.url.path,
            "method": request.method,
        }
    )
    
    # Generic response (do not expose internal details)
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": "INTERNAL_ERROR",
                "message": "An unexpected error occurred",
                "request_id": getattr(request.state, "request_id", None),
            }
        },
    )

6. Middleware vs Exception Handlers

Feature | Middleware | Exception Handler
When it runs | On every request | Only when an exception is raised
Can modify the request | ✅ | ❌
Can modify the response | ✅ | ✅ (it builds the response)
Access to the exception | Only if you catch it | ✅ (it is the handler's parameter)
Typical use | Logging, auth, headers | Formatting errors

Combination: Error Logging in Middleware

@app.middleware("http")
async def error_logging_middleware(request: Request, call_next):
    try:
        response = await call_next(request)
        return response
    except Exception as exc:
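        # Only exceptions without a dedicated handler reach this point. Errors such as
        # DomainError are converted to responses inside the stack, so this middleware
        # sees them as ordinary responses, not exceptions.
        logger.error(
            f"Request failed: {request.method} {request.url.path}",
            exc_info=exc,
            extra={"request_id": getattr(request.state, "request_id", None)}
        )
        raise  # Re-raise so the catch-all Exception handler can build the 500 response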

7. Request State: Sharing Data

request.state is a per-request namespace for sharing data between middleware and endpoints:

import time
import uuid

from starlette.requests import Request

@app.middleware("http")
async def enrich_request(request: Request, call_next):
    # Attach data to the state
    request.state.request_id = str(uuid.uuid4())
    request.state.start_time = time.perf_counter()
    request.state.client_ip = request.client.host if request.client else None
    
    response = await call_next(request)
    
    # Read the state back after the endpoint has run
    duration = time.perf_counter() - request.state.start_time
    response.headers["X-Request-Duration"] = str(duration)
    
    return response

@app.get("/debug")
async def debug_endpoint(request: Request):
    return {
        "request_id": request.state.request_id,
        "client_ip": request.state.client_ip,
    }

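8. Authentication Middleware

import jwt  # PyJWT
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp

# `settings` is assumed to be the application's own configuration object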

class AuthMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp, public_paths: set[str] | None = None):
        super().__init__(app)
        self.public_paths = public_paths or {"/health", "/docs", "/openapi.json"}
    
    async def dispatch(self, request: Request, call_next) -> Response:
        # Skip authentication for public paths
        if request.url.path in self.public_paths:
            return await call_next(request)
        
        # Extract the token
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return JSONResponse(
                status_code=401,
                content={"error": {"code": "UNAUTHORIZED", "message": "Missing token"}}
            )
        
        token = auth_header.split(" ")[1]
        
        try:
            # Validate the token and extract the user
            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
            request.state.user_id = payload["sub"]
            request.state.user_roles = payload.get("roles", [])
        except jwt.ExpiredSignatureError:
            return JSONResponse(
                status_code=401,
                content={"error": {"code": "TOKEN_EXPIRED", "message": "Token expired"}}
            )
        except jwt.InvalidTokenError:
            return JSONResponse(
                status_code=401,
                content={"error": {"code": "INVALID_TOKEN", "message": "Invalid token"}}
            )
        
        return await call_next(request)

# Register it
app.add_middleware(AuthMiddleware, public_paths={"/health", "/login", "/register"})
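
The header parsing step is easier to test when it is isolated in a pure function. The sketch below is one possible version, with extract_bearer_token as a hypothetical helper. It accepts the scheme in any letter case, since HTTP authentication schemes are case-insensitive, and it treats an empty token as missing.

```python
from typing import Optional


def extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not auth_header:
        return None
    # Split on the first space only: the scheme on the left, the rest is the token
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwdw=="))  # None
print(extract_bearer_token("Bearer "))             # None
```

In dispatch, a None result would map to the 401 "Missing token" response, and any other value would go on to jwt.decode.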

9. Rate Limiting Middleware

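import asyncio
import time
from collections import defaultdict

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp

# Note: counters live in this process's memory, so limits are per worker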

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(
        self, 
        app: ASGIApp, 
        requests_per_minute: int = 60,
        by_ip: bool = True,
    ):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.by_ip = by_ip
        self.requests: dict[str, list[float]] = defaultdict(list)
        self.lock = asyncio.Lock()
    
    def _get_key(self, request: Request) -> str:
        if self.by_ip:
            return request.client.host if request.client else "unknown"
        return request.headers.get("X-API-Key", "anonymous")
    
    async def dispatch(self, request: Request, call_next) -> Response:
        key = self._get_key(request)
        now = time.time()
        window_start = now - 60
        
        async with self.lock:
            # Drop timestamps that fell out of the window
            self.requests[key] = [
                ts for ts in self.requests[key] if ts > window_start
            ]
            
            if len(self.requests[key]) >= self.requests_per_minute:
                retry_after = 60 - (now - self.requests[key][0])
                return JSONResponse(
                    status_code=429,
                    content={
                        "error": {
                            "code": "RATE_LIMIT_EXCEEDED",
                            "message": f"Rate limit exceeded. Retry after {int(retry_after)}s",
                        }
                    },
                    headers={"Retry-After": str(int(retry_after))},
                )
            
            self.requests[key].append(now)
        
        response = await call_next(request)
        
        # Add rate-limit headers
        remaining = self.requests_per_minute - len(self.requests[key])
        response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        
        return response

app.add_middleware(RateLimitMiddleware, requests_per_minute=100)
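
The sliding-window logic is independent of HTTP, so it can be written as a small class and checked with a fake clock. This is a sketch under stated assumptions rather than a drop-in replacement: SlidingWindowLimiter and its injectable clock parameter are hypothetical, and it uses a deque so that expired timestamps are popped from the front instead of rebuilding the list on every request.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Optional


class SlidingWindowLimiter:
    def __init__(
        self,
        limit: int,
        window: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so tests can control time
        self.hits = defaultdict(deque)

    def check(self, key: str) -> Optional[float]:
        """Record a hit. Return None if allowed, else seconds until retry."""
        now = self.clock()
        timestamps = self.hits[key]
        # Evict timestamps that are no longer inside the window
        while timestamps and timestamps[0] <= now - self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return self.window - (now - timestamps[0])
        timestamps.append(now)
        return None


fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window=60.0, clock=lambda: fake_now[0])

print(limiter.check("1.2.3.4"))  # None (allowed)
print(limiter.check("1.2.3.4"))  # None (allowed)
print(limiter.check("1.2.3.4"))  # 60.0 (blocked, retry in 60 seconds)
fake_now[0] = 61.0
print(limiter.check("1.2.3.4"))  # None (window has passed)
```

A middleware could call check() inside its lock and return the 429 response whenever the result is not None.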

10. CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://app.example.com",
        "https://admin.example.com",
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    expose_headers=["X-Request-ID", "X-RateLimit-Remaining"],
    max_age=600,  # Cache preflight responses for 10 minutes
)

11. Comparison Table: Express vs FastAPI

Concept | Express | FastAPI/Starlette
Basic middleware | app.use(fn) | @app.middleware("http")
Class-based middleware | N/A | BaseHTTPMiddleware
Error handler | app.use((err, req, res, next) => ...) | @app.exception_handler(SomeError)
Request data | req.body, req.query | await request.json(), request.query_params
Response helper | res.json() | JSONResponse()
Shared state | res.locals | request.state
Chain control | next() | await call_next(request)

Conclusion

Error handling and middleware in FastAPI follow a more structured pattern than in Express:

  1. Domain exceptions: keep business logic separate from the HTTP protocol
  2. Exception handlers: translate domain errors into HTTP responses
  3. Async middleware: full control over the request/response cycle
  4. request.state: share data without globals

Senior pattern: define a DomainError hierarchy, register one generic handler that translates it to HTTP, and keep your services free of any knowledge of FastAPI.

In the next chapter, we will go deeper into advanced validation and serialization with Pydantic.