Module 12 · 16 min read

12 - Logging and Observability

Structured logging with Structlog, OpenTelemetry, contextvars, and metrics for distributed systems.

#logging #observability #opentelemetry #structlog #monitoring

1. From console.log to Structured Logging

JavaScript/TypeScript
// Node.js with Pino
import pino from 'pino';

const logger = pino({
  level: 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
});

logger.info({ userId: 123, action: 'login' }, 'User logged in');
// {"level":"info","userId":123,"action":"login","msg":"User logged in"}
Python
# Python with Structlog
import structlog

logger = structlog.get_logger()

logger.info("user_logged_in", user_id=123, action="login")
# {"event": "user_logged_in", "user_id": 123, "action": "login", "timestamp": "..."}

Structured logging emits logs as JSON, which tools like ELK, Datadog, or Loki can parse directly.
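Because each line is a self-contained JSON object, a log pipeline (or a quick script) can filter and aggregate entries without regex gymnastics. A minimal stdlib-only sketch over two fabricated log lines:

```python
import json

# Hypothetical log lines as they would appear on stdout
raw_lines = [
    '{"event": "user_logged_in", "user_id": 123, "level": "info"}',
    '{"event": "payment_failed", "order_id": 9, "level": "error"}',
]

# Parse each line, then filter by any structured field
entries = [json.loads(line) for line in raw_lines]
errors = [e for e in entries if e["level"] == "error"]
print(errors[0]["event"])  # payment_failed
```

This is exactly what log backends do at scale: index every field so queries like `level:error AND order_id:9` work out of the box.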


2. Structlog Configuration

# core/logging.py
import logging
import sys
import structlog
from structlog.types import Processor

def setup_logging(json_logs: bool = True, log_level: str = "INFO"):
    """Configure structured logging for the application."""
    
    # Shared processors
    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,  # Merge context variables
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.UnicodeDecoder(),
    ]
    
    if json_logs:
        # Production: JSON
        shared_processors.append(structlog.processors.format_exc_info)
        renderer = structlog.processors.JSONRenderer()
    else:
        # Development: colors and human-readable format
        renderer = structlog.dev.ConsoleRenderer(colors=True)
    
    structlog.configure(
        processors=shared_processors + [
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    
    # Configure the standard library logging pipeline
    formatter = structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=shared_processors,
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            renderer,
        ],
    )
    
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    
    root_logger = logging.getLogger()
    root_logger.handlers = [handler]
    root_logger.setLevel(log_level)
    
    # Silence noisy loggers
    for logger_name in ["uvicorn.access", "httpx", "httpcore"]:
        logging.getLogger(logger_name).setLevel(logging.WARNING)

# Call at application startup
setup_logging(
    json_logs=settings.ENVIRONMENT == "production",
    log_level=settings.LOG_LEVEL,
)

Basic Usage

import structlog

logger = structlog.get_logger(__name__)

async def create_user(data: UserCreate) -> User:
    logger.info("creating_user", email=data.email)
    
    try:
        user = await repo.create(data)
        logger.info("user_created", user_id=user.id, email=user.email)
        return user
    except IntegrityError:
        logger.warning("duplicate_email", email=data.email)
        raise ConflictError("Email already exists")
    except Exception:
        logger.exception("user_creation_failed", email=data.email)
        raise

3. Contextvars: Request Context Propagation

contextvars is the Python equivalent of Node.js's AsyncLocalStorage: it propagates context through async calls without passing it explicitly.

JavaScript/TypeScript
// Node.js: AsyncLocalStorage
import { AsyncLocalStorage } from 'async_hooks';

const requestContext = new AsyncLocalStorage();

app.use((req, res, next) => {
  const context = { requestId: uuid() };
  requestContext.run(context, () => next());
});

// Anywhere in the codebase
const ctx = requestContext.getStore();
console.log(ctx.requestId);
Python
# Python: contextvars
import uuid
from contextvars import ContextVar

import structlog

request_id_ctx: ContextVar[str] = ContextVar("request_id", default="")
user_id_ctx: ContextVar[str | None] = ContextVar("user_id", default=None)

# Middleware that sets the context
@app.middleware("http")
async def inject_request_context(request: Request, call_next):
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
    request_id_ctx.set(request_id)

    # Bind to structlog (included in every log entry for this request)
    structlog.contextvars.bind_contextvars(request_id=request_id)

    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id

    structlog.contextvars.unbind_contextvars("request_id")
    return response
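The middleware above relies on a key property of contextvars: each asyncio task gets its own copy of the context, so concurrent requests never overwrite each other's request_id. A standalone sketch demonstrating that isolation:

```python
import asyncio
from contextvars import ContextVar

request_id_ctx: ContextVar[str] = ContextVar("request_id", default="")

async def handle(request_id: str) -> str:
    request_id_ctx.set(request_id)
    await asyncio.sleep(0)  # yield control to the other task
    # Each task still sees the value it set, not the other task's
    return request_id_ctx.get()

async def main() -> list[str]:
    # Two "requests" running concurrently in separate tasks
    return await asyncio.gather(handle("req-a"), handle("req-b"))

results = asyncio.run(main())
print(results)  # ['req-a', 'req-b']
```

If you used a plain module-level variable instead, the second task would clobber the first and both handlers would log the same id.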

Binding the Authenticated User

async def get_current_user_with_logging(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    user = await get_current_user(token, db)
    
    # Attach user_id to all subsequent logs
    user_id_ctx.set(str(user.id))
    structlog.contextvars.bind_contextvars(user_id=user.id)
    
    return user

Resulting Log Output

{
  "event": "order_created",
  "order_id": 456,
  "amount": 99.99,
  "request_id": "abc-123",
  "user_id": 42,
  "timestamp": "2024-02-01T10:30:00Z",
  "level": "info",
  "logger": "app.services.orders"
}

4. OpenTelemetry: Distributed Tracing

OpenTelemetry lets you trace a request as it crosses multiple services.

Installation

uv add opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi opentelemetry-instrumentation-sqlalchemy opentelemetry-instrumentation-httpx opentelemetry-exporter-otlp

Configuration

# core/telemetry.py
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
def setup_telemetry(app: FastAPI, service_name: str):
    """Configure OpenTelemetry tracing."""
    
    # The Resource identifies this service
    resource = Resource.create({
        "service.name": service_name,
        "service.version": settings.VERSION,
        "deployment.environment": settings.ENVIRONMENT,
    })
    
    # Tracer provider
    provider = TracerProvider(resource=resource)
    
    # Exporter (sends spans to Jaeger/Tempo/etc.)
    if settings.OTLP_ENDPOINT:
        exporter = OTLPSpanExporter(endpoint=settings.OTLP_ENDPOINT)
        provider.add_span_processor(BatchSpanProcessor(exporter))
    
    trace.set_tracer_provider(provider)
    
    # Auto-instrumentation
    FastAPIInstrumentor.instrument_app(app)
    SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
    HTTPXClientInstrumentor().instrument()

# In main.py
setup_telemetry(app, "my-api")

Custom Spans

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def process_order(order_id: int) -> Order:
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        
        # Child span for validation
        with tracer.start_as_current_span("validate_inventory"):
            await validate_inventory(order_id)
        
        # Child span for payment
        with tracer.start_as_current_span("process_payment") as payment_span:
            try:
                result = await payment_service.charge(order_id)
                payment_span.set_attribute("payment.status", "success")
            except PaymentError as e:
                payment_span.set_attribute("payment.status", "failed")
                payment_span.record_exception(e)
                raise
        
        return await finalize_order(order_id)

Log-Trace Correlation

from opentelemetry import trace

def add_trace_context(logger, method_name, event_dict):
    """Processor que añade trace_id y span_id a los logs."""
    span = trace.get_current_span()
    if span.is_recording():
        ctx = span.get_span_context()
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

# Add to the structlog processors list
structlog.configure(
    processors=[
        add_trace_context,
        # ... other processors
    ]
)
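OpenTelemetry stores trace and span IDs as integers (128-bit and 64-bit respectively); the format(..., "032x") and format(..., "016x") calls render them as the zero-padded lowercase hex strings that tracing backends expect. A quick illustration with arbitrary example values:

```python
# Trace IDs are 128-bit ints, span IDs are 64-bit ints (values here are arbitrary)
trace_id = 0x5B8AA5A2D2C872E8321CF37308D69DF2
span_id = 0x51E2FBBA

# "032x" / "016x" zero-pad to the full hex width, so short IDs stay fixed-length
print(format(trace_id, "032x"))  # 5b8aa5a2d2c872e8321cf37308d69df2
print(format(span_id, "016x"))   # 0000000051e2fbba
```

The fixed width matters: Jaeger, Tempo, and the W3C traceparent header all expect exactly 32 and 16 hex characters, so a non-padded id would fail to correlate.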

5. Metrics with Prometheus

# core/metrics.py
import time

from fastapi import Request, Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# Metrics
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

ACTIVE_REQUESTS = Gauge(
    "http_requests_active",
    "Active HTTP requests",
)

DB_POOL_SIZE = Gauge(
    "db_pool_size",
    "Database connection pool size",
    ["state"],  # "used", "available"
)

# Metrics middleware
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    ACTIVE_REQUESTS.inc()
    start_time = time.perf_counter()
    
    try:
        response = await call_next(request)
        status = response.status_code
    except Exception:
        status = 500
        raise
    finally:
        ACTIVE_REQUESTS.dec()
        duration = time.perf_counter() - start_time
        
        endpoint = request.url.path  # beware: raw paths with IDs can explode label cardinality
        method = request.method
        
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
        REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(duration)
    
    return response

# Metrics endpoint
@app.get("/metrics")
async def metrics():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST,
    )

Business Metrics

ORDERS_CREATED = Counter(
    "orders_created_total",
    "Total orders created",
    ["status", "payment_method"],
)

ORDER_VALUE = Histogram(
    "order_value_dollars",
    "Order value in dollars",
    buckets=[10, 25, 50, 100, 250, 500, 1000],
)

async def create_order(data: OrderCreate) -> Order:
    order = await order_repo.create(data)
    
    # Record metrics
    ORDERS_CREATED.labels(
        status="created",
        payment_method=data.payment_method,
    ).inc()
    ORDER_VALUE.observe(float(order.total))
    
    return order
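Prometheus histograms are cumulative: an observation increments every bucket whose upper bound is greater than or equal to the value, plus an implicit +Inf bucket that counts everything. A stdlib sketch of that bucketing rule (illustrating the semantics, not the prometheus_client internals):

```python
import math

# Same buckets as ORDER_VALUE above, plus the implicit +Inf bucket
BUCKETS = [10, 25, 50, 100, 250, 500, 1000, math.inf]

def bucket_counts(observations: list[float]) -> dict[float, int]:
    """Cumulative count per bucket, as Prometheus exposes them (le="...")."""
    return {le: sum(1 for v in observations if v <= le) for le in BUCKETS}

counts = bucket_counts([12.5, 99.99, 120.0, 800.0])
print(counts[100])       # 2 -> the 12.5 and 99.99 orders
print(counts[math.inf])  # 4 -> every observation
```

This cumulative shape is why PromQL uses histogram_quantile over the le-labeled series: percentiles can be estimated from bucket counts alone, without storing raw values.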

6. Health Checks

# api/health.py
import time
from enum import Enum
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from redis.asyncio import Redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(tags=["health"])

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

class ComponentHealth(BaseModel):
    status: HealthStatus
    latency_ms: float | None = None
    message: str | None = None

class HealthResponse(BaseModel):
    status: HealthStatus
    version: str
    components: dict[str, ComponentHealth]

async def check_database(db: AsyncSession) -> ComponentHealth:
    start = time.perf_counter()
    try:
        await db.execute(text("SELECT 1"))
        latency = (time.perf_counter() - start) * 1000
        return ComponentHealth(status=HealthStatus.HEALTHY, latency_ms=latency)
    except Exception as e:
        return ComponentHealth(status=HealthStatus.UNHEALTHY, message=str(e))

async def check_redis(redis: Redis) -> ComponentHealth:
    start = time.perf_counter()
    try:
        await redis.ping()
        latency = (time.perf_counter() - start) * 1000
        return ComponentHealth(status=HealthStatus.HEALTHY, latency_ms=latency)
    except Exception as e:
        return ComponentHealth(status=HealthStatus.UNHEALTHY, message=str(e))

@router.get("/health", response_model=HealthResponse)
async def health_check(
    db: Annotated[AsyncSession, Depends(get_db)],
    redis: Annotated[Redis, Depends(get_redis)],
) -> HealthResponse:
    components = {
        "database": await check_database(db),
        "redis": await check_redis(redis),
    }
    
    # Determine the overall status
    if all(c.status == HealthStatus.HEALTHY for c in components.values()):
        overall = HealthStatus.HEALTHY
    elif any(c.status == HealthStatus.UNHEALTHY for c in components.values()):
        overall = HealthStatus.UNHEALTHY
    else:
        overall = HealthStatus.DEGRADED
    
    return HealthResponse(
        status=overall,
        version=settings.VERSION,
        components=components,
    )

@router.get("/health/live")
async def liveness():
    """Kubernetes liveness probe - is the process alive?"""
    return {"status": "ok"}

@router.get("/health/ready")
async def readiness(db: Annotated[AsyncSession, Depends(get_db)]):
    """Kubernetes readiness probe - can we serve traffic?"""
    try:
        await db.execute(text("SELECT 1"))
        return {"status": "ok"}
    except Exception:
        raise HTTPException(status_code=503, detail="Not ready")
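The aggregation rule inside health_check is worth isolating: healthy only if every component is healthy, unhealthy if any component is down, degraded otherwise. A pure-function sketch of that rule, easy to unit-test on its own:

```python
from enum import Enum

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

def overall_status(statuses: list[HealthStatus]) -> HealthStatus:
    """Healthy only if all are healthy; unhealthy if any is down; degraded otherwise."""
    if all(s == HealthStatus.HEALTHY for s in statuses):
        return HealthStatus.HEALTHY
    if any(s == HealthStatus.UNHEALTHY for s in statuses):
        return HealthStatus.UNHEALTHY
    return HealthStatus.DEGRADED

print(overall_status([HealthStatus.HEALTHY, HealthStatus.HEALTHY]).value)    # healthy
print(overall_status([HealthStatus.HEALTHY, HealthStatus.UNHEALTHY]).value)  # unhealthy
```

Note the ordering: the "any unhealthy" check must run before the degraded fallback, or a response with one degraded and one unhealthy component would be misreported.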

7. Error Tracking with Sentry

# core/sentry.py
import logging

import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.logging import LoggingIntegration

def setup_sentry():
    if not settings.SENTRY_DSN:
        return
    
    sentry_sdk.init(
        dsn=settings.SENTRY_DSN,
        environment=settings.ENVIRONMENT,
        release=settings.VERSION,
        traces_sample_rate=0.1,  # sample 10% of traces
        profiles_sample_rate=0.1,  # sample 10% of profiles
        integrations=[
            FastApiIntegration(transaction_style="endpoint"),
            SqlalchemyIntegration(),
            LoggingIntegration(
                level=logging.INFO,
                event_level=logging.ERROR,
            ),
        ],
        before_send=filter_sensitive_data,
    )

def filter_sensitive_data(event, hint):
    """Remove sensitive data before sending to Sentry."""
    if "request" in event and "headers" in event["request"]:
        headers = event["request"]["headers"]
        if "authorization" in headers:
            headers["authorization"] = "[FILTERED]"
        if "cookie" in headers:
            headers["cookie"] = "[FILTERED]"
    return event

# Extra context attached to errors
def add_user_context(user: User):
    sentry_sdk.set_user({
        "id": str(user.id),
        "email": user.email,
    })

def add_request_context(request_id: str):
    sentry_sdk.set_tag("request_id", request_id)
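Since before_send hooks are pure functions over the event dict, the redaction logic is easy to verify locally without sending anything to Sentry. A standalone re-implementation of the same filtering logic, exercised on a fabricated event:

```python
def filter_sensitive_data(event: dict, hint: object = None) -> dict:
    """Redact auth headers before the event leaves the process."""
    headers = event.get("request", {}).get("headers", {})
    for key in ("authorization", "cookie"):
        if key in headers:
            headers[key] = "[FILTERED]"
    return event

# Fabricated event shaped like Sentry's request payload
event = {"request": {"headers": {"authorization": "Bearer abc", "accept": "*/*"}}}
filtered = filter_sensitive_data(event)
print(filtered["request"]["headers"]["authorization"])  # [FILTERED]
print(filtered["request"]["headers"]["accept"])         # */*
```

Testing this path matters: a bug in before_send can either leak credentials to a third party or (if the hook raises) silently drop every error event.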

8. Recommended Dashboard

docker-compose for Local Observability

# docker-compose.observability.yml
services:
  # Metrics
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
  
  # Tracing
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # UI
      - "4317:4317"    # OTLP gRPC
  
  # Logs
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"

9. Comparison Table

| Aspect              | Node.js           | Python            |
|---------------------|-------------------|-------------------|
| Structured Logging  | Pino, Winston     | Structlog, Loguru |
| Context Propagation | AsyncLocalStorage | contextvars       |
| Tracing             | OpenTelemetry     | OpenTelemetry     |
| Metrics             | prom-client       | prometheus_client |
| Error Tracking      | Sentry            | Sentry            |

Conclusion

Observability in Python follows the same principles as in Node.js:

  1. Structured logging — JSON logs with Structlog
  2. Context propagation — contextvars for request_id and user_id
  3. Distributed tracing — OpenTelemetry for cross-service traces
  4. Metrics — Prometheus for alerting
  5. Error tracking — Sentry for debugging

Senior pattern: Always include request_id and trace_id in your logs. This lets you correlate logs with traces when debugging production issues.

In the next chapter, we'll optimize performance with profiling and ASGI configuration.