12 - Logging and Observability
Structured logging with Structlog, OpenTelemetry, contextvars, and metrics for distributed systems.
1. From console.log to Structured Logging
// Node.js with Pino
import pino from 'pino';
const logger = pino({
level: 'info',
formatters: {
level: (label) => ({ level: label }),
},
});
logger.info({ userId: 123, action: 'login' }, 'User logged in');
// {"level":"info","userId":123,"action":"login","msg":"User logged in"} # Python con Structlog
import structlog
logger = structlog.get_logger()
logger.info("user_logged_in", user_id=123, action="login")
# {"event": "user_logged_in", "user_id": 123, "action": "login", "timestamp": "..."} Structured logging genera logs en formato JSON, parseables por herramientas como ELK, Datadog o Loki.
2. Structlog Configuration
# core/logging.py
import logging
import sys
import structlog
from structlog.types import Processor
def setup_logging(json_logs: bool = True, log_level: str = "INFO"):
"""Configure structured logging for the application."""
# Shared processors
shared_processors: list[Processor] = [
structlog.contextvars.merge_contextvars, # Merge context variables
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.UnicodeDecoder(),
]
if json_logs:
# Production: JSON output
shared_processors.append(structlog.processors.format_exc_info)
renderer = structlog.processors.JSONRenderer()
else:
# Development: colored, human-readable output
renderer = structlog.dev.ConsoleRenderer(colors=True)
structlog.configure(
processors=shared_processors + [
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Configure standard library logging
formatter = structlog.stdlib.ProcessorFormatter(
foreign_pre_chain=shared_processors,
processors=[
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
renderer,
],
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.handlers = [handler]
root_logger.setLevel(log_level)
# Silence noisy loggers
for logger_name in ["uvicorn.access", "httpx", "httpcore"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
# Call at application startup
setup_logging(
json_logs=settings.ENVIRONMENT == "production",
log_level=settings.LOG_LEVEL,
)
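The snippets in this chapter read a few fields from a settings object (ENVIRONMENT and LOG_LEVEL here, later VERSION, OTLP_ENDPOINT, and SENTRY_DSN). One possible shape, using pydantic-settings, is sketched below purely as an assumption about the project's config module:
# core/config.py - hypothetical; adapt to your actual configuration module
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    ENVIRONMENT: str = "development"
    VERSION: str = "0.1.0"
    LOG_LEVEL: str = "INFO"
    OTLP_ENDPOINT: str | None = None  # e.g. your OTLP collector's gRPC endpoint
    SENTRY_DSN: str | None = None

settings = Settings()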
Basic Usage
import structlog
logger = structlog.get_logger(__name__)
async def create_user(data: UserCreate) -> User:
logger.info("creating_user", email=data.email)
try:
user = await repo.create(data)
logger.info("user_created", user_id=user.id, email=user.email)
return user
except IntegrityError:
logger.warning("duplicate_email", email=data.email)
raise ConflictError("Email already exists")
except Exception as e:
logger.exception("user_creation_failed", email=data.email)
raise
3. Contextvars: Request Context Propagation
contextvars is the Python counterpart of Node.js's AsyncLocalStorage: it propagates context through async calls without passing it around explicitly.
// Node.js: AsyncLocalStorage
import { AsyncLocalStorage } from 'async_hooks';
const requestContext = new AsyncLocalStorage();
app.use((req, res, next) => {
const context = { requestId: uuid() };
requestContext.run(context, () => next());
});
// En cualquier parte del código
const ctx = requestContext.getStore();
console.log(ctx.requestId);
# Python: contextvars
from contextvars import ContextVar
import structlog
request_id_ctx: ContextVar[str] = ContextVar("request_id", default="")
user_id_ctx: ContextVar[str | None] = ContextVar("user_id", default=None)
# Middleware that sets the request context
@app.middleware("http")
async def inject_request_context(request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request_id_ctx.set(request_id)
# Bind to structlog (included in every log entry)
structlog.contextvars.bind_contextvars(request_id=request_id)
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
structlog.contextvars.unbind_contextvars("request_id")
return response
Binding the Authenticated User
async def get_current_user_with_logging(
token: Annotated[str, Depends(oauth2_scheme)],
db: DBSession,
) -> User:
user = await get_current_user(token, db)
# Add user_id to all subsequent log entries
user_id_ctx.set(str(user.id))
structlog.contextvars.bind_contextvars(user_id=user.id)
return user
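Each asyncio task runs with its own copy of the context, so a value set while handling one request never leaks into a concurrent one. A minimal, framework-free sketch:
import asyncio
from contextvars import ContextVar

req_id: ContextVar[str] = ContextVar("req_id", default="-")

async def handle(request_id: str) -> str:
    req_id.set(request_id)      # visible only inside this task
    await asyncio.sleep(0.01)   # simulate awaited I/O
    return req_id.get()         # still this task's own value

async def main() -> None:
    results = await asyncio.gather(handle("req-1"), handle("req-2"))
    print(results)  # ['req-1', 'req-2'] - the values never mix between tasks

asyncio.run(main())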
Resulting Log Output
{
"event": "order_created",
"order_id": 456,
"amount": 99.99,
"request_id": "abc-123",
"user_id": 42,
"timestamp": "2024-02-01T10:30:00Z",
"level": "info",
"logger": "app.services.orders"
}
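In that entry, only order_id and amount were passed explicitly; request_id and user_id come from the bound contextvars, while timestamp, level, and logger name are added by the configured processors. The emitting call might look like this (a hypothetical orders service):
# app/services/orders.py - hypothetical service that emits the entry above
import structlog

logger = structlog.get_logger(__name__)

async def create_order(data: OrderCreate) -> Order:
    order = await order_repo.create(data)
    # request_id and user_id are merged in by merge_contextvars;
    # only the order-specific fields are passed here.
    logger.info("order_created", order_id=order.id, amount=float(order.total))
    return order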
4. OpenTelemetry: Distributed Tracing
OpenTelemetry lets you trace a request as it flows through multiple services.
Installation
uv add opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi opentelemetry-instrumentation-sqlalchemy opentelemetry-instrumentation-httpx opentelemetry-exporter-otlp
Configuration
# core/telemetry.py
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
def setup_telemetry(app: FastAPI, service_name: str):
"""Configure OpenTelemetry tracing."""
# The Resource identifies the service
resource = Resource.create({
"service.name": service_name,
"service.version": settings.VERSION,
"deployment.environment": settings.ENVIRONMENT,
})
# Tracer provider
provider = TracerProvider(resource=resource)
# Exporter (sends spans to Jaeger/Tempo/etc.)
if settings.OTLP_ENDPOINT:
exporter = OTLPSpanExporter(endpoint=settings.OTLP_ENDPOINT)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
# Auto-instrumentation
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
HTTPXClientInstrumentor().instrument()
# In main.py
setup_telemetry(app, "my-api")
Custom Spans
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
async def process_order(order_id: int) -> Order:
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
# Child span for validation
with tracer.start_as_current_span("validate_inventory"):
await validate_inventory(order_id)
# Child span for payment
with tracer.start_as_current_span("process_payment") as payment_span:
try:
result = await payment_service.charge(order_id)
payment_span.set_attribute("payment.status", "success")
except PaymentError as e:
payment_span.set_attribute("payment.status", "failed")
payment_span.record_exception(e)
raise
return await finalize_order(order_id)
Log-Trace Correlation
from opentelemetry import trace
def add_trace_context(logger, method_name, event_dict):
"""Processor que añade trace_id y span_id a los logs."""
span = trace.get_current_span()
if span.is_recording():
ctx = span.get_span_context()
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
return event_dict
# Add to the structlog configuration
structlog.configure(
processors=[
add_trace_context,
# ... other processors
]
)
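Note that calling structlog.configure() a second time replaces the setup from section 2; in practice you would insert add_trace_context into the shared_processors list inside setup_logging(). A minimal standalone sketch of an equivalent pipeline:
import structlog

# Standalone sketch: merge contextvars first, then attach trace ids, then render JSON.
# In setup_logging() from section 2, simply add add_trace_context to shared_processors.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        add_trace_context,  # defined above: injects trace_id / span_id
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)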
5. Metrics with Prometheus
# core/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
import time
from fastapi import Request, Response
# Metrics
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"],
)
REQUEST_LATENCY = Histogram(
"http_request_duration_seconds",
"HTTP request latency",
["method", "endpoint"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
ACTIVE_REQUESTS = Gauge(
"http_requests_active",
"Active HTTP requests",
)
DB_POOL_SIZE = Gauge(
"db_pool_size",
"Database connection pool size",
["state"], # "used", "available"
)
# Metrics middleware
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
ACTIVE_REQUESTS.inc()
start_time = time.perf_counter()
try:
response = await call_next(request)
status = response.status_code
except Exception:
status = 500
raise
finally:
ACTIVE_REQUESTS.dec()
duration = time.perf_counter() - start_time
endpoint = request.url.path
method = request.method
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(duration)
return response
# Metrics endpoint
@app.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST,
)
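One caveat: using request.url.path as the endpoint label produces a distinct label value for every concrete URL (/orders/1, /orders/2, ...), which inflates Prometheus cardinality. A common workaround is to label by the matched route template instead; the sketch below assumes Starlette has stored the matched route in request.scope["route"] by the time call_next returns:
from fastapi import Request

def endpoint_label(request: Request) -> str:
    """Return the route template (e.g. '/orders/{order_id}') if available,
    falling back to the raw path when no route matched (404s, etc.)."""
    route = request.scope.get("route")
    return getattr(route, "path", request.url.path)

# In metrics_middleware, after `response = await call_next(request)`:
#     endpoint = endpoint_label(request)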
Business Metrics
ORDERS_CREATED = Counter(
"orders_created_total",
"Total orders created",
["status", "payment_method"],
)
ORDER_VALUE = Histogram(
"order_value_dollars",
"Order value in dollars",
buckets=[10, 25, 50, 100, 250, 500, 1000],
)
async def create_order(data: OrderCreate) -> Order:
order = await order_repo.create(data)
# Record metrics
ORDERS_CREATED.labels(
status="created",
payment_method=data.payment_method,
).inc()
ORDER_VALUE.observe(float(order.total))
return order
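For durations, prometheus_client histograms also expose a time() helper usable as a context manager or decorator, which avoids manual perf_counter arithmetic. A small sketch with an illustrative metric name and a hypothetical helper:
from prometheus_client import Histogram

ORDER_PROCESSING_SECONDS = Histogram(
    "order_processing_seconds",
    "Time spent processing an order",
)

async def process_order_timed(order_id: int) -> None:
    # Observes the elapsed wall-clock time into the histogram on exit.
    with ORDER_PROCESSING_SECONDS.time():
        await fulfill_order(order_id)  # hypothetical async helper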
6. Health Checks
# api/health.py
import time
from enum import Enum
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from redis.asyncio import Redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(tags=["health"])
class HealthStatus(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
class ComponentHealth(BaseModel):
status: HealthStatus
latency_ms: float | None = None
message: str | None = None
class HealthResponse(BaseModel):
status: HealthStatus
version: str
components: dict[str, ComponentHealth]
async def check_database(db: AsyncSession) -> ComponentHealth:
start = time.perf_counter()
try:
await db.execute(text("SELECT 1"))
latency = (time.perf_counter() - start) * 1000
return ComponentHealth(status=HealthStatus.HEALTHY, latency_ms=latency)
except Exception as e:
return ComponentHealth(status=HealthStatus.UNHEALTHY, message=str(e))
async def check_redis(redis: Redis) -> ComponentHealth:
start = time.perf_counter()
try:
await redis.ping()
latency = (time.perf_counter() - start) * 1000
return ComponentHealth(status=HealthStatus.HEALTHY, latency_ms=latency)
except Exception as e:
return ComponentHealth(status=HealthStatus.UNHEALTHY, message=str(e))
@router.get("/health", response_model=HealthResponse)
async def health_check(
db: Annotated[AsyncSession, Depends(get_db)],
redis: Annotated[Redis, Depends(get_redis)],
) -> HealthResponse:
components = {
"database": await check_database(db),
"redis": await check_redis(redis),
}
# Determine overall status
if all(c.status == HealthStatus.HEALTHY for c in components.values()):
overall = HealthStatus.HEALTHY
elif any(c.status == HealthStatus.UNHEALTHY for c in components.values()):
overall = HealthStatus.UNHEALTHY
else:
overall = HealthStatus.DEGRADED
return HealthResponse(
status=overall,
version=settings.VERSION,
components=components,
)
@router.get("/health/live")
async def liveness():
"""Kubernetes liveness probe - is the process alive?"""
return {"status": "ok"}
@router.get("/health/ready")
async def readiness(db: Annotated[AsyncSession, Depends(get_db)]):
"""Kubernetes readiness probe - can we serve traffic?"""
try:
await db.execute(text("SELECT 1"))
return {"status": "ok"}
except Exception:
raise HTTPException(status_code=503, detail="Not ready")
7. Error Tracking with Sentry
# core/sentry.py
import logging
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
def setup_sentry():
if not settings.SENTRY_DSN:
return
sentry_sdk.init(
dsn=settings.SENTRY_DSN,
environment=settings.ENVIRONMENT,
release=settings.VERSION,
traces_sample_rate=0.1,  # sample 10% of traces
profiles_sample_rate=0.1,  # sample 10% of profiles
integrations=[
FastApiIntegration(transaction_style="endpoint"),
SqlalchemyIntegration(),
LoggingIntegration(
level=logging.INFO,
event_level=logging.ERROR,
),
],
before_send=filter_sensitive_data,
)
def filter_sensitive_data(event, hint):
"""Remove sensitive data before sending to Sentry."""
if "request" in event and "headers" in event["request"]:
headers = event["request"]["headers"]
if "authorization" in headers:
headers["authorization"] = "[FILTERED]"
if "cookie" in headers:
headers["cookie"] = "[FILTERED]"
return event
# Extra context on errors
def add_user_context(user: User):
sentry_sdk.set_user({
"id": str(user.id),
"email": user.email,
})
def add_request_context(request_id: str):
sentry_sdk.set_tag("request_id", request_id)
8. Recommended Dashboard
docker-compose for Local Observability
# docker-compose.observability.yml
services:
# Metrics
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
# Tracing
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # UI
- "4317:4317" # OTLP gRPC
# Logs
loki:
image: grafana/loki:latest
ports:
- "3100:3100"
9. Comparison Table
| Aspecto | Node.js | Python |
|---|---|---|
| Structured Logging | Pino, Winston | Structlog, Loguru |
| Context Propagation | AsyncLocalStorage | contextvars |
| Tracing | OpenTelemetry | OpenTelemetry |
| Metrics | prom-client | prometheus_client |
| Error Tracking | Sentry | Sentry |
Conclusion
Observability in Python follows the same principles as in Node.js:
- Structured logging: JSON logs with Structlog
- Context propagation: contextvars for request_id and user_id
- Distributed tracing: OpenTelemetry across service boundaries
- Metrics: Prometheus for alerting
- Error tracking: Sentry for debugging
Senior Pattern: always include request_id and trace_id in your logs. This lets you correlate logs with traces when debugging production issues.
In the next chapter, we optimize performance with profiling and ASGI configuration.