Module 13 · 16 min read

13 - Performance and ASGI

Gunicorn, workers, profiling, benchmarks, and FastAPI optimization for production.

#performance #asgi #gunicorn #uvicorn #profiling

1. The ASGI Stack: How Requests Flow

[Internet] → [Nginx/ALB] → [Gunicorn] → [Uvicorn Workers] → [FastAPI App]

                    Process Manager (master)

              ┌───────────────┼───────────────┐
              ↓               ↓               ↓
         Worker 1        Worker 2        Worker 3
         (Uvicorn)       (Uvicorn)       (Uvicorn)
              ↓               ↓               ↓
         Event Loop      Event Loop      Event Loop

Comparison with Node.js

Layer           | Node.js            | Python
Process Manager | PM2 / Cluster      | Gunicorn
HTTP Server     | Built-in (node:http) | Uvicorn / Hypercorn
Framework       | Express / Fastify  | FastAPI / Starlette
Protocol        | HTTP               | ASGI (Async Server Gateway Interface)
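
To make the ASGI row concrete, here is a minimal sketch of a bare ASGI application: the same async callable contract (scope, receive, send) that Uvicorn drives and that FastAPI/Starlette implement on your behalf.

# A bare ASGI app: a single async callable that Uvicorn invokes per connection
async def app(scope, receive, send):
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"Hello, ASGI"})

# Runs exactly like a FastAPI app: uvicorn module_name:app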

2. Gunicorn + Uvicorn Configuration

gunicorn.conf.py

# gunicorn.conf.py
import multiprocessing
import os

# Bind
bind = os.getenv("GUNICORN_BIND", "0.0.0.0:8000")

# Workers
workers = int(os.getenv("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000

# Timeouts
timeout = 120  # Request timeout
graceful_timeout = 30  # Time to finish requests on restart
keepalive = 5  # Keep-alive connections

# Lifecycle
max_requests = 10000  # Restart worker after N requests (memory leak protection)
max_requests_jitter = 1000  # Random jitter to prevent all workers restarting together

# Logging
accesslog = "-"  # stdout
errorlog = "-"   # stderr
loglevel = os.getenv("LOG_LEVEL", "info")

# Security
limit_request_line = 4096
limit_request_fields = 100
limit_request_field_size = 8190

# Hooks
def on_starting(server):
    print("Gunicorn master starting...")

def worker_int(worker):
    print(f"Worker {worker.pid} received SIGINT")

def worker_abort(worker):
    print(f"Worker {worker.pid} received SIGABRT")

Startup Command

# Development
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# Production
gunicorn app.main:app -c gunicorn.conf.py

# Or directly
gunicorn app.main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --timeout 120

3. Worker Formulas

CPU-Bound APIs

workers = (2 * cpu_count) + 1  # Classic formula

I/O-Bound APIs (most APIs)

workers = cpu_count  # Or even fewer with heavy async I/O

Kubernetes (1 worker per pod)

workers = 1  # K8s scales horizontally with pods
replicas = desired_concurrency  # In the Deployment manifest

Limited Memory

# Each worker consumes roughly 50-150 MB
available_memory = 512  # MB
memory_per_worker = 100  # MB (measure with your own app)
workers = available_memory // memory_per_worker  # 5 workers
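
As a minimal sketch, both constraints can be combined to pick the worker count at deploy time; the AVAILABLE_MEMORY_MB variable and the 100 MB figure are assumptions here, measure them for your own app.

import multiprocessing
import os

def suggested_workers(available_memory_mb: int, memory_per_worker_mb: int = 100) -> int:
    """Take the smaller of the CPU-based and memory-based limits, never below 1."""
    cpu_based = multiprocessing.cpu_count() * 2 + 1
    memory_based = available_memory_mb // memory_per_worker_mb
    return max(1, min(cpu_based, memory_based))

# Example: feed the result into GUNICORN_WORKERS before starting Gunicorn
print(suggested_workers(int(os.getenv("AVAILABLE_MEMORY_MB", "512"))))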

4. Profiling: Finding Bottlenecks

py-spy (No restart required)

# Install
pip install py-spy

# Profile a running process
py-spy top --pid <PID>

# Generate a flamegraph
py-spy record -o profile.svg --pid <PID>

# Profile for N seconds
py-spy record -o profile.svg --pid <PID> --duration 30

cProfile (Built-in)

import cProfile
import pstats
from functools import wraps

def profile_function(func):
    """Decorator for profiling synchronous functions."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = func(*args, **kwargs)
        profiler.disable()
        
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # Top 20 functions
        
        return result
    return wrapper

@profile_function
def expensive_operation():
    # ...
    pass
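
The decorator above only covers synchronous functions. A sketch of an async variant for coroutine endpoints follows; note that cProfile records whatever runs on the thread while this coroutine is awaited, so read the numbers with that caveat.

import cProfile
import pstats
from functools import wraps

def profile_async_function(func):
    """Same idea as profile_function, but awaits the decorated coroutine."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = await func(*args, **kwargs)
        profiler.disable()

        stats = pstats.Stats(profiler)
        stats.sort_stats("cumulative")
        stats.print_stats(20)  # Top 20 functions

        return result
    return wrapper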

Profiling Middleware (Development Only)

import cProfile
import pstats
import io

class ProfilingMiddleware:
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        
        # Only if the special header is present
        headers = dict(scope.get("headers", []))
        if headers.get(b"x-profile") != b"true":
            await self.app(scope, receive, send)
            return
        
        profiler = cProfile.Profile()
        profiler.enable()
        
        await self.app(scope, receive, send)
        
        profiler.disable()
        stream = io.StringIO()
        stats = pstats.Stats(profiler, stream=stream)
        stats.sort_stats('cumulative')
        stats.print_stats(30)
        print(stream.getvalue())

# Development only
if settings.DEBUG:
    app.add_middleware(ProfilingMiddleware)

5. FastAPI Optimizations

1. Fast Response Class

from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

# Global (requires orjson to be installed)
app = FastAPI(default_response_class=ORJSONResponse)

# Per endpoint
@app.get("/fast", response_class=ORJSONResponse)
async def fast_endpoint():
    return {"data": large_payload}

Benchmark: orjson is 3-10x faster than the standard json module.
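
A rough micro-benchmark sketch to check that claim on your own payloads (requires orjson installed; the payload shape below is illustrative):

import json
import timeit

import orjson

payload = {"items": [{"id": i, "name": f"item-{i}", "active": True} for i in range(1_000)]}

std = timeit.timeit(lambda: json.dumps(payload), number=1_000)
fast = timeit.timeit(lambda: orjson.dumps(payload), number=1_000)
print(f"json: {std:.3f}s  orjson: {fast:.3f}s  speedup: {std / fast:.1f}x")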

2. Avoid Pydantic in Hot Paths

# ❌ Slow for large lists
@app.get("/items", response_model=list[ItemResponse])
async def list_items():
    items = await db.fetch_all()
    return items  # Pydantic validates EVERY item

# ✅ Faster
@app.get("/items")
async def list_items() -> list[dict]:
    items = await db.fetch_all()
    # Serialize manually or call model_dump() once per item
    return [item.model_dump() for item in items]

3. Streaming for Large Responses

from fastapi.responses import StreamingResponse

async def generate_large_csv():
    yield "id,name,email\n"
    # fetch_users_in_batches: illustrative async generator yielding user batches
    async for batch in fetch_users_in_batches(batch_size=1000):
        for user in batch:
            yield f"{user.id},{user.name},{user.email}\n"

@app.get("/export/users.csv")
async def export_users():
    return StreamingResponse(
        generate_large_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"},
    )

4. Background Tasks for Slow Operations

from fastapi import BackgroundTasks

async def send_welcome_email(email: str):
    # Slow operation that does not need to block the response
    await email_service.send(email, template="welcome")

@app.post("/users", status_code=201)
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
):
    user = await user_service.create(data)
    
    # Runs AFTER the response has been sent
    background_tasks.add_task(send_welcome_email, user.email)
    
    return user

5. Optimal Connection Pooling

from sqlalchemy.ext.asyncio import create_async_engine

# SQLAlchemy
engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,           # Base connections
    max_overflow=10,       # Extra connections under load
    pool_timeout=30,       # Timeout while waiting for a connection
    pool_recycle=1800,     # Recycle every 30 min
    pool_pre_ping=True,    # Ping connections before use
)

import httpx

# httpx for external APIs
http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30,
    ),
    timeout=httpx.Timeout(10.0, connect=5.0),
)
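
A minimal sketch of tying that pooled client to the worker lifecycle with FastAPI's lifespan handler, so it is opened once per worker and closed cleanly on shutdown (storing it on app.state is one option, not the only one):

from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Open the pooled client when the worker starts...
    app.state.http_client = httpx.AsyncClient(
        limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
        timeout=httpx.Timeout(10.0, connect=5.0),
    )
    yield
    # ...and close it on shutdown
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)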

6. Caching Strategies

In-Memory with TTL

from functools import lru_cache
from cachetools import TTLCache
import asyncio

# Synchronous
@lru_cache(maxsize=1000)
def get_config(key: str) -> str:
    return expensive_lookup(key)

# Async with TTL
cache = TTLCache(maxsize=1000, ttl=300)  # 5 minutes
cache_lock = asyncio.Lock()

async def get_user_cached(user_id: int) -> User:
    cache_key = f"user:{user_id}"
    
    if cache_key in cache:
        return cache[cache_key]
    
    async with cache_lock:
        # Double-check after acquiring the lock
        if cache_key in cache:
            return cache[cache_key]
        
        user = await user_repo.get(user_id)
        cache[cache_key] = user
        return user

Redis Cache

from typing import Awaitable, Callable, TypeVar
import json

from redis.asyncio import Redis

T = TypeVar("T")

class RedisCache:
    def __init__(self, redis: Redis, prefix: str = "cache"):
        self.redis = redis
        self.prefix = prefix
    
    async def get_or_set(
        self,
        key: str,
        factory: Callable[[], Awaitable[T]],
        ttl: int = 300,
    ) -> T:
        full_key = f"{self.prefix}:{key}"
        
        # Try cache
        cached = await self.redis.get(full_key)
        if cached:
            return json.loads(cached)
        
        # Compute and cache
        value = await factory()
        await self.redis.set(full_key, json.dumps(value), ex=ttl)
        
        return value

# Usage
cache = RedisCache(redis_client)

async def get_user_stats(user_id: int) -> dict:
    return await cache.get_or_set(
        f"stats:{user_id}",
        lambda: compute_user_stats(user_id),
        ttl=600,
    )

7. Benchmarking

wrk

# Install
apt-get install wrk

# Basic benchmark
wrk -t12 -c400 -d30s http://localhost:8000/api/health

# With a Lua script for POST requests
wrk -t4 -c100 -d30s -s post.lua http://localhost:8000/api/users

-- post.lua
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/json"
wrk.body = '{"email": "test@test.com", "password": "test123"}'

locust

# locustfile.py
from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        # Login
        response = self.client.post("/auth/token", data={
            "username": "test@test.com",
            "password": "testpass123",
        })
        self.token = response.json()["access_token"]
        self.client.headers["Authorization"] = f"Bearer {self.token}"
    
    @task(3)
    def get_users(self):
        self.client.get("/users")
    
    @task(1)
    def create_order(self):
        self.client.post("/orders", json={"items": [{"id": 1, "qty": 2}]})

# Run
locust -f locustfile.py --host=http://localhost:8000
# Open http://localhost:8089 for the web UI

8. Performance Metrics

Baseline for REST APIs

Metric      | Target   | Alert
P50 Latency | < 50 ms  | > 100 ms
P99 Latency | < 200 ms | > 500 ms
Error Rate  | < 0.1%   | > 1%
Throughput  | Depends  | -20% vs. baseline
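
To make P50/P99 concrete, a small sketch computing percentiles from raw latency samples with the standard library (in production these come from the histogram below, not from in-memory lists):

import statistics

# Latency samples in seconds (illustrative values)
samples = [0.012, 0.018, 0.022, 0.035, 0.041, 0.048, 0.052, 0.087, 0.110, 0.430]

cuts = statistics.quantiles(samples, n=100)  # 99 cut points
p50, p99 = cuts[49], cuts[98]
print(f"P50={p50 * 1000:.1f}ms  P99={p99 * 1000:.1f}ms")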

Instrumentation

import time

from fastapi import Request
from prometheus_client import Histogram

LATENCY = Histogram(
    "http_request_latency_seconds",
    "Request latency",
    ["endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5],
)

@app.middleware("http")
async def latency_middleware(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    
    LATENCY.labels(endpoint=request.url.path).observe(duration)
    
    return response

9. Production Configuration

Optimized Dockerfile

FROM python:3.12-slim AS builder

# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-install-project

COPY . .
RUN uv sync --frozen --no-dev

FROM python:3.12-slim

# Security: non-root user
RUN useradd -m -u 1000 appuser
USER appuser

WORKDIR /app
COPY --from=builder --chown=appuser:appuser /app/.venv ./.venv
COPY --from=builder --chown=appuser:appuser /app/app ./app
COPY --from=builder --chown=appuser:appuser /app/gunicorn.conf.py .

ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

EXPOSE 8000

CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]

Environment Variables

# .env.production
GUNICORN_WORKERS=4
GUNICORN_BIND=0.0.0.0:8000
LOG_LEVEL=warning
DATABASE_POOL_SIZE=5
DATABASE_MAX_OVERFLOW=10
REDIS_MAX_CONNECTIONS=50
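
A minimal sketch of reading these variables from application code, assuming pydantic-settings; the Settings class and its field names are illustrative, not something defined elsewhere in this module:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Field names match the environment variables above (matched case-insensitively)
    log_level: str = "info"
    database_pool_size: int = 5
    database_max_overflow: int = 10
    redis_max_connections: int = 50
    debug: bool = False

settings = Settings()  # values are read from the environment at instantiation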

10. Performance Checklist

  - Gunicorn + Uvicorn workers configured (worker count, timeouts, max_requests)
  - ORJSONResponse set as the default response class
  - Pydantic validation kept out of hot paths with large lists
  - Streaming responses used for large payloads
  - Background tasks used for slow, non-blocking work
  - Database and httpx connection pools tuned
  - Caching in place for frequently read data (TTL cache, Redis)
  - Latency instrumented with Prometheus histograms (P50/P99)
  - Load tested with wrk or locust
  - Profiled with py-spy when a bottleneck shows up

Conclusion

Performance in Python/FastAPI is optimized in layers:

  1. ASGI stack: Gunicorn + Uvicorn workers
  2. Serialization: orjson for fast JSON
  3. Database: connection pooling and loading strategies
  4. Caching: Redis for frequently accessed data
  5. Profiling: py-spy to find bottlenecks

Senior pattern: measure before you optimize. Use Prometheus + Grafana to identify slow endpoints, then profile them with py-spy.

In the next chapter, we will dive deeper into advanced Docker for production.