Module 13 • 16 min read
13 - Performance and ASGI
Gunicorn, workers, profiling, benchmarks, and FastAPI tuning for production.
#performance
#asgi
#gunicorn
#uvicorn
#profiling
1. The ASGI Stack: How Requests Flow
[Internet] → [Nginx/ALB] → [Gunicorn] → [Uvicorn Workers] → [FastAPI App]
↓
Process Manager (master)
↓
┌───────────────┼───────────────┐
↓ ↓ ↓
Worker 1 Worker 2 Worker 3
(Uvicorn) (Uvicorn) (Uvicorn)
↓ ↓ ↓
Event Loop Event Loop Event Loop
Comparison with Node.js
| Layer | Node.js | Python |
|---|---|---|
| Process Manager | PM2 / Cluster | Gunicorn |
| HTTP Server | Built-in (node:http) | Uvicorn / Hypercorn |
| Framework | Express / Fastify | FastAPI / Starlette |
| Protocol | HTTP | ASGI (Asynchronous Server Gateway Interface) |
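To make the ASGI layer concrete, here is a minimal raw ASGI application (an illustrative sketch, not part of the project code): this callable is the interface Uvicorn speaks, and the one FastAPI/Starlette implement for you.
# minimal_asgi.py (run with: uvicorn minimal_asgi:app)
async def app(scope, receive, send):
    # Uvicorn calls this coroutine for each incoming HTTP request
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"Hello from raw ASGI"})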
2. Gunicorn + Uvicorn Configuration
gunicorn.conf.py
# gunicorn.conf.py
import multiprocessing
import os
# Bind
bind = os.getenv("GUNICORN_BIND", "0.0.0.0:8000")
# Workers
workers = int(os.getenv("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
# Timeouts
timeout = 120 # Request timeout
graceful_timeout = 30 # Time to finish requests on restart
keepalive = 5 # Keep-alive connections
# Lifecycle
max_requests = 10000 # Restart worker after N requests (memory leak protection)
max_requests_jitter = 1000 # Random jitter to prevent all workers restarting together
# Logging
accesslog = "-" # stdout
errorlog = "-" # stderr
loglevel = os.getenv("LOG_LEVEL", "info")
# Security
limit_request_line = 4096
limit_request_fields = 100
limit_request_field_size = 8190
# Hooks
def on_starting(server):
print("Gunicorn master starting...")
def worker_int(worker):
print(f"Worker {worker.pid} received SIGINT")
def worker_abort(worker):
print(f"Worker {worker.pid} received SIGABRT")
Startup Commands
# Development
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# Production
gunicorn app.main:app -c gunicorn.conf.py
# Or directly
gunicorn app.main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 120
3. Worker Formulas
CPU-Bound APIs
workers = (2 * cpu_count) + 1 # Classic formula
I/O-Bound APIs (most APIs)
workers = cpu_count # Or even fewer if the workload is mostly async I/O
Kubernetes (1 worker per pod)
workers = 1 # K8s scales horizontally with pods
replicas = desired_concurrency # In the Deployment
Limited Memory
# Each worker typically consumes ~50-150 MB
available_memory = 512 # MB
memory_per_worker = 100 # MB (measure with your app)
workers = available_memory // memory_per_worker # 5 workers
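Putting the formulas together, a gunicorn.conf.py can pick the most conservative value. This is a sketch: the AVAILABLE_MEMORY_MB variable and the 100 MB-per-worker figure are assumptions you should replace with your own measurements.
import multiprocessing
import os
cpu_count = multiprocessing.cpu_count()
available_memory_mb = int(os.getenv("AVAILABLE_MEMORY_MB", "512"))  # assumed env var
memory_per_worker_mb = 100  # measure with py-spy / container stats
cpu_based = (2 * cpu_count) + 1
memory_based = max(1, available_memory_mb // memory_per_worker_mb)
# Take the stricter of the two limits
workers = min(cpu_based, memory_based)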
4. Profiling: Finding Bottlenecks
py-spy (No Restart Required)
# Install
pip install py-spy
# Profile a running process
py-spy top --pid <PID>
# Generate a flamegraph
py-spy record -o profile.svg --pid <PID>
# Profile for N seconds
py-spy record -o profile.svg --pid <PID> --duration 30
cProfile (Built-in)
import cProfile
import pstats
def profile_function(func):
    """Decorator that profiles a synchronous function."""
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = func(*args, **kwargs)
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # Top 20 functions
        return result
    return wrapper
@profile_function
def expensive_operation():
    # ...
    pass
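FastAPI endpoints are often coroutines, which the decorator above cannot wrap. A hedged async variant is sketched below; note that while the profiler is enabled it also captures any other tasks running on the same event loop, so the output can be noisy.
import cProfile
import functools
import pstats
def profile_async(func):
    """Async-aware variant of profile_function (illustrative sketch)."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            return await func(*args, **kwargs)
        finally:
            profiler.disable()
            pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)
    return wrapper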
Profiling Middleware (Development Only)
import cProfile
import pstats
import io
class ProfilingMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
        # Only profile when the special header is present
headers = dict(scope.get("headers", []))
if headers.get(b"x-profile") != b"true":
await self.app(scope, receive, send)
return
profiler = cProfile.Profile()
profiler.enable()
await self.app(scope, receive, send)
profiler.disable()
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(30)
print(stream.getvalue())
# Development only
if settings.DEBUG:
app.add_middleware(ProfilingMiddleware)
5. FastAPI Optimizations
1. Fast Response Class
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
# Global default
app = FastAPI(default_response_class=ORJSONResponse)
# Per endpoint
@app.get("/fast", response_class=ORJSONResponse)
async def fast_endpoint():
return {"data": large_payload}
Benchmark: orjson is roughly 3-10x faster than the standard library json module at serialization.
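A quick way to check that claim against your own payloads (a sketch; it assumes the orjson package is installed and the payload shape is made up for illustration):
import json
import timeit
import orjson
payload = {"items": [{"id": i, "name": f"item-{i}"} for i in range(1_000)]}
std = timeit.timeit(lambda: json.dumps(payload), number=1_000)
fast = timeit.timeit(lambda: orjson.dumps(payload), number=1_000)
print(f"json: {std:.3f}s  orjson: {fast:.3f}s  ratio: {std / fast:.1f}x")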
2. Avoid Pydantic in Hot Paths
# ❌ Slow for large lists
@app.get("/items", response_model=list[ItemResponse])
async def list_items():
items = await db.fetch_all()
    return items  # Pydantic validates EVERY item
# ✅ Faster
@app.get("/items")
async def list_items() -> list[dict]:
items = await db.fetch_all()
    # Serialize manually or call model_dump() once per item
return [item.model_dump() for item in items]
3. Streaming for Large Responses
from fastapi.responses import StreamingResponse
async def generate_large_csv():
yield "id,name,email\n"
async for batch in fetch_users_in_batches(batch_size=1000):
for user in batch:
yield f"{user.id},{user.name},{user.email}\n"
@app.get("/export/users.csv")
async def export_users():
return StreamingResponse(
generate_large_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"},
)
4. Background Tasks for Slow Operations
from fastapi import BackgroundTasks
async def send_welcome_email(email: str):
    # A slow operation that doesn't need to block the response
await email_service.send(email, template="welcome")
@app.post("/users", status_code=201)
async def create_user(
data: UserCreate,
background_tasks: BackgroundTasks,
):
user = await user_service.create(data)
    # Runs AFTER the response has been sent
background_tasks.add_task(send_welcome_email, user.email)
return user
5. Optimal Connection Pooling
# SQLAlchemy
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,          # Base connections
    max_overflow=10,      # Extra connections under load
    pool_timeout=30,      # Timeout while waiting for a connection
    pool_recycle=1800,    # Recycle connections every 30 min
    pool_pre_ping=True,   # Health-check connections before use
)
# httpx for external APIs
import httpx
http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30,
    ),
    timeout=httpx.Timeout(10.0, connect=5.0),
)
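To reuse one pooled client per worker process, a common pattern is to create and close it in the application lifespan. The sketch below assumes this; the /external endpoint and the api.example.com URL are made up for illustration.
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request
@asynccontextmanager
async def lifespan(app: FastAPI):
    # One pooled client per worker process, closed cleanly on shutdown
    app.state.http_client = httpx.AsyncClient(
        limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
        timeout=httpx.Timeout(10.0, connect=5.0),
    )
    yield
    await app.state.http_client.aclose()
app = FastAPI(lifespan=lifespan)
@app.get("/external")
async def call_external(request: Request):
    client: httpx.AsyncClient = request.app.state.http_client
    response = await client.get("https://api.example.com/data")
    return response.json()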
6. Caching Strategies
In-Memory with TTL
from functools import lru_cache
from cachetools import TTLCache
import asyncio
# Synchronous
@lru_cache(maxsize=1000)
def get_config(key: str) -> str:
return expensive_lookup(key)
# Async with TTL
cache = TTLCache(maxsize=1000, ttl=300)  # 5 minutes
cache_lock = asyncio.Lock()
async def get_user_cached(user_id: int) -> User:
cache_key = f"user:{user_id}"
if cache_key in cache:
return cache[cache_key]
async with cache_lock:
        # Double-check after acquiring the lock
if cache_key in cache:
return cache[cache_key]
user = await user_repo.get(user_id)
cache[cache_key] = user
return user
Redis Cache
from typing import Awaitable, Callable, TypeVar
from redis.asyncio import Redis
import json
T = TypeVar("T")
class RedisCache:
def __init__(self, redis: Redis, prefix: str = "cache"):
self.redis = redis
self.prefix = prefix
async def get_or_set(
self,
key: str,
        factory: Callable[[], Awaitable[T]],
ttl: int = 300,
) -> T:
full_key = f"{self.prefix}:{key}"
# Try cache
cached = await self.redis.get(full_key)
if cached:
return json.loads(cached)
# Compute and cache
value = await factory()
await self.redis.set(full_key, json.dumps(value), ex=ttl)
return value
# Usage
cache = RedisCache(redis_client)
async def get_user_stats(user_id: int) -> dict:
return await cache.get_or_set(
f"stats:{user_id}",
lambda: compute_user_stats(user_id),
ttl=600,
)
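Cached values go stale when the underlying data changes. Below is a hedged sketch of explicit invalidation built on the RedisCache class above; update_user and user_repo are hypothetical names following the earlier examples.
class InvalidatingRedisCache(RedisCache):
    async def invalidate(self, key: str) -> None:
        # Drop the cached entry so the next read recomputes it
        await self.redis.delete(f"{self.prefix}:{key}")
cache = InvalidatingRedisCache(redis_client)
async def update_user(user_id: int, data: dict) -> None:
    await user_repo.update(user_id, data)
    await cache.invalidate(f"stats:{user_id}")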
7. Benchmarking
wrk
# Install
apt-get install wrk
# Basic benchmark
wrk -t12 -c400 -d30s http://localhost:8000/api/health
# With a Lua script for POST requests
wrk -t4 -c100 -d30s -s post.lua http://localhost:8000/api/users
-- post.lua
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/json"
wrk.body = '{"email": "test@test.com", "password": "test123"}'
locust
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
# Login
response = self.client.post("/auth/token", data={
"username": "test@test.com",
"password": "testpass123",
})
self.token = response.json()["access_token"]
self.client.headers["Authorization"] = f"Bearer {self.token}"
@task(3)
def get_users(self):
self.client.get("/users")
@task(1)
def create_order(self):
self.client.post("/orders", json={"items": [{"id": 1, "qty": 2}]})
# Run
locust -f locustfile.py --host=http://localhost:8000
# Open http://localhost:8089 for the web UI
8. Performance Metrics
Baseline for REST APIs
| Metric | Target | Alert |
|---|---|---|
| P50 Latency | < 50ms | > 100ms |
| P99 Latency | < 200ms | > 500ms |
| Error Rate | < 0.1% | > 1% |
| Throughput | Workload-dependent | -20% vs. baseline |
Instrumentation
import time
from fastapi import Request
from prometheus_client import Histogram
LATENCY = Histogram(
"http_request_latency_seconds",
"Request latency",
["endpoint"],
buckets=[0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5],
)
@app.middleware("http")
async def latency_middleware(request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
LATENCY.labels(endpoint=request.url.path).observe(duration)
return response
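Latency alone does not cover the error-rate and throughput rows in the table above. A hedged sketch of a request counter that lets you derive both (it reuses the app and Request from the snippet above):
from prometheus_client import Counter
REQUESTS = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["endpoint", "method", "status"],
)
@app.middleware("http")
async def request_counter_middleware(request: Request, call_next):
    response = await call_next(request)
    REQUESTS.labels(
        endpoint=request.url.path,
        method=request.method,
        status=response.status_code,
    ).inc()
    return response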
9. Production Configuration
Optimized Dockerfile
FROM python:3.12-slim AS builder
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-install-project
COPY . .
RUN uv sync --frozen --no-dev
FROM python:3.12-slim
# Security: non-root user
RUN useradd -m -u 1000 appuser
USER appuser
WORKDIR /app
COPY --from=builder --chown=appuser:appuser /app/.venv ./.venv
COPY --from=builder --chown=appuser:appuser /app/app ./app
COPY --from=builder --chown=appuser:appuser /app/gunicorn.conf.py .
ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
EXPOSE 8000
CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]
Environment Variables
# .env.production
GUNICORN_WORKERS=4
GUNICORN_BIND=0.0.0.0:8000
LOG_LEVEL=warning
DATABASE_POOL_SIZE=5
DATABASE_MAX_OVERFLOW=10
REDIS_MAX_CONNECTIONS=50
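One way to load these variables in the application with type validation is pydantic-settings. This is a sketch under the assumption that pydantic-settings is a project dependency; field names and defaults are illustrative.
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env.production")
    gunicorn_workers: int = 4
    gunicorn_bind: str = "0.0.0.0:8000"
    log_level: str = "info"
    database_pool_size: int = 5
    database_max_overflow: int = 10
    redis_max_connections: int = 50
settings = Settings()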
10. Performance Checklist
- orjson as the default response class
- Connection pooling configured for the database and Redis
- Timeouts on every external call
- Streaming for responses > 1MB
- Background tasks for non-critical operations
- Indexes on frequently filtered columns
- N+1 queries resolved with selectinload
- Profiling run in staging
- Latency and throughput metrics in place
- max_requests set in Gunicorn to contain memory leaks
Conclusion
Performance in Python/FastAPI is optimized in layers:
- ASGI Stack: Gunicorn + Uvicorn workers
- Serialization: orjson for fast JSON
- Database: connection pooling and loading strategies
- Caching: Redis for frequently read data
- Profiling: py-spy to find bottlenecks
Senior pattern: measure before you optimize. Use Prometheus + Grafana to identify slow endpoints, then profile them with py-spy.
In the next chapter, we dive deeper into advanced Docker for production.