Module 17 · 18 min read

17 - Task Queues: Celery and Redis

Distributed processing: workers, retries, scheduling, and monitoring with Flower.

#celery #redis #task-queues #distributed #background-jobs

1. BackgroundTasks vs Celery

JavaScript/TypeScript
// Node.js: BullMQ
import { Queue, Worker } from 'bullmq';

const queue = new Queue('email', { connection: redis });

// Producer
await queue.add('send-welcome', { userId: 123 });

// Worker (separate process)
const worker = new Worker('email', async (job) => {
  await sendEmail(job.data.userId);
}, { connection: redis });
Python
# Python: Celery
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

# Task definition
@app.task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int):
    try:
        send_email(user_id)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# Producer (from FastAPI)
send_welcome_email.delay(user_id=123)

When to use each one

Scenario             | BackgroundTasks      | Celery
Send a simple email  | ✅                   | Overkill
Process an image     | ❌ blocks the worker | ✅
Generate a large PDF | ❌                   | ✅
Task with retries    | ❌                   | ✅
Scheduling (cron)    | ❌                   | ✅ (Celery Beat)
Multiple workers     | ❌                   | ✅
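
As a quick illustration of the first two rows, here is a minimal sketch contrasting both approaches in FastAPI; send_plain_email and process_image are hypothetical placeholders, not part of this module's codebase:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def send_plain_email(to: str, subject: str) -> None:
    ...  # hypothetical helper: a quick SMTP/API call, cheap enough to run in-process

@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
    # Lightweight, fire-and-forget I/O: BackgroundTasks runs it in-process after the response
    background_tasks.add_task(send_plain_email, to=email, subject="Welcome")
    return {"status": "ok"}

@app.post("/avatars")
async def upload_avatar(image_url: str):
    # Heavy, retryable work: hand it off to a Celery worker in another process
    task = process_image.delay(image_url)  # hypothetical Celery task
    return {"task_id": task.id}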

2. Celery Configuration

Project Structure

app/
├── main.py              # FastAPI app
├── worker/
│   ├── __init__.py
│   ├── celery_app.py    # Celery configuration
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── email.py
│   │   ├── reports.py
│   │   └── notifications.py
│   └── config.py

celery_app.py

# app/worker/celery_app.py
from celery import Celery
from celery.schedules import crontab
from kombu import Queue
from app.core.config import settings

celery_app = Celery(
    "worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.worker.tasks.email",
        "app.worker.tasks.reports",
        "app.worker.tasks.notifications",
    ],
)

# Configuration
celery_app.conf.update(
    # Serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    
    # Timezone
    timezone="UTC",
    enable_utc=True,
    
    # Results
    result_expires=3600,  # 1 hour
    result_backend_transport_options={
        "retry_policy": {"timeout": 5.0}
    },
    
    # Tasks
    task_acks_late=True,  # Acknowledge only after the task has executed
    task_reject_on_worker_lost=True,
    task_time_limit=600,  # 10 min max per task
    task_soft_time_limit=540,  # soft limit (9 min)
    
    # Workers
    worker_prefetch_multiplier=1,  # One task at a time
    worker_concurrency=4,
    
    # Queues
    task_default_queue="default",
    task_queues=(
        Queue("default", routing_key="default"),
        Queue("high_priority", routing_key="high"),
        Queue("low_priority", routing_key="low"),
    ),
    
    # Routing
    task_routes={
        "app.worker.tasks.email.*": {"queue": "high_priority"},
        "app.worker.tasks.reports.*": {"queue": "low_priority"},
    },
)

# Celery Beat (scheduling)
celery_app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "app.worker.tasks.maintenance.cleanup_tokens",
        "schedule": 3600.0,  # Cada hora
    },
    "daily-report": {
        "task": "app.worker.tasks.reports.generate_daily_report",
        "schedule": crontab(hour=6, minute=0),  # 6 AM UTC
    },
}

3. Defining Tasks

Basic Task

# app/worker/tasks/email.py
from app.worker.celery_app import celery_app
from app.services.email import EmailService
import structlog

logger = structlog.get_logger(__name__)

@celery_app.task(
    bind=True,
    name="send_welcome_email",
    max_retries=3,
    default_retry_delay=60,
)
def send_welcome_email(self, user_id: int, email: str):
    """Envía email de bienvenida."""
    logger.info("sending_welcome_email", user_id=user_id, task_id=self.request.id)
    
    try:
        email_service = EmailService()
        email_service.send(
            to=email,
            template="welcome",
            context={"user_id": user_id},
        )
        logger.info("welcome_email_sent", user_id=user_id)
        return {"status": "sent", "user_id": user_id}
    
    except ConnectionError as exc:
        logger.warning("email_service_unavailable", exc=str(exc))
        raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))
    
    except Exception as exc:
        logger.exception("send_email_failed", user_id=user_id)
        raise

Task with Progress Tracking

# app/worker/tasks/reports.py
from app.worker.celery_app import celery_app

@celery_app.task(bind=True)
def generate_large_report(self, report_id: int, params: dict):
    """Generate a large report with progress tracking."""
    total_steps = 100
    
    for step in range(total_steps):
        # Update progress
        self.update_state(
            state="PROGRESS",
            meta={"current": step, "total": total_steps}
        )
        
        # Process this step
        process_step(step, params)
    
    # Save the result
    report_url = save_report(report_id)
    
    return {"status": "completed", "url": report_url}

Checking Progress from FastAPI

from celery.result import AsyncResult

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    
    if result.state == "PENDING":
        return {"status": "pending"}
    elif result.state == "PROGRESS":
        return {
            "status": "progress",
            "current": result.info.get("current", 0),
            "total": result.info.get("total", 100),
        }
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.info)}
    else:
        return {"status": result.state}

4. Advanced Patterns

Chain (Pipeline)

from celery import chain

# Run the tasks sequentially
pipeline = chain(
    download_file.s(url),
    process_file.s(),
    upload_result.s(),
    notify_completion.s(user_id),
)
result = pipeline.apply_async()
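
In a chain, each task receives the previous task's return value prepended as its first argument, and any arguments bound with .s() (like user_id above) come after it. A minimal sketch of compatible signatures, with hypothetical helpers:

from app.worker.celery_app import celery_app

@celery_app.task
def download_file(url: str) -> str:
    return download_to_tmp(url)  # hypothetical helper: returns a local path

@celery_app.task
def process_file(path: str) -> dict:
    # `path` is download_file's return value
    return parse_file(path)  # hypothetical helper

@celery_app.task
def upload_result(data: dict) -> str:
    return upload_to_s3(data)  # hypothetical helper: returns a URL

@celery_app.task
def notify_completion(result_url: str, user_id: int) -> None:
    # result_url comes from upload_result; user_id was bound via .s(user_id)
    send_user_notification(user_id, result_url)  # hypothetical helper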

Group (Parallel)

from celery import group

# Run tasks in parallel
parallel_tasks = group(
    process_chunk.s(chunk) for chunk in chunks
)
result = parallel_tasks.apply_async()

# Wait for all results
results = result.get()  # list of results

Chord (Parallel + Callback)

from celery import chord

# Run in parallel, with a callback once all tasks finish
workflow = chord(
    [process_item.s(item) for item in items],
    aggregate_results.s()
)
result = workflow.apply_async()

Task with a Concurrency Limit

from celery import Task
from celery.exceptions import Reject
from app.core.config import settings
import redis

class RateLimitedTask(Task):
    """Base task with a per-resource lock (one run at a time per key)."""
    
    def __init__(self):
        self.redis = redis.from_url(settings.REDIS_URL)
    
    def acquire_lock(self, key: str, timeout: int = 60) -> bool:
        return self.redis.set(f"lock:{key}", "1", nx=True, ex=timeout)
    
    def release_lock(self, key: str):
        self.redis.delete(f"lock:{key}")

@celery_app.task(base=RateLimitedTask, bind=True)
def process_user_export(self, user_id: int):
    lock_key = f"export:{user_id}"
    
    if not self.acquire_lock(lock_key, timeout=300):
        # An export is already in progress for this user
        raise Reject("Export already in progress", requeue=False)
    
    try:
        # Run the export
        return do_export(user_id)
    finally:
        self.release_lock(lock_key)
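
If the goal is throttling rather than mutual exclusion, Celery also ships a built-in rate_limit task option (enforced per worker instance, not globally). A minimal sketch, with a hypothetical external endpoint:

import httpx

@celery_app.task(rate_limit="10/m")  # at most 10 executions per minute, per worker
def call_external_api(payload: dict) -> dict:
    response = httpx.post("https://api.example.com/ingest", json=payload)  # hypothetical URL
    response.raise_for_status()
    return response.json()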

5. Error Handling and Retries

from celery.exceptions import MaxRetriesExceededError, Reject

@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,  # Exponential backoff
    retry_backoff_max=600,  # max 10 minutes
    retry_jitter=True,  # add random jitter
    max_retries=5,
)
def resilient_task(self, data: dict):
    try:
        return process(data)
    except ValueError as exc:
        # Bad data: do not retry
        logger.error("invalid_data", data=data)
        raise Reject(str(exc), requeue=False)
    except ExternalServiceError as exc:
        # External service is down: retry
        raise self.retry(exc=exc)

Dead Letter Queue

# Configuration for failed tasks
celery_app.conf.update(
    task_acks_on_failure_or_timeout=True,
    task_reject_on_worker_lost=True,
)

# Handler for tasks that exhausted their retries
@celery_app.task(bind=True)
def handle_failed_task(self, task_id: str, error: str):
    """Procesa tareas que fallaron definitivamente."""
    logger.error("task_permanently_failed", task_id=task_id, error=error)
    # Guardar en DB para revisión manual
    FailedTask.create(task_id=task_id, error=error)
    # Notificar
    send_alert(f"Task {task_id} failed permanently: {error}")

6. FastAPI Integration

Fire-and-Forget Endpoint

import uuid

from fastapi import FastAPI
from app.worker.tasks.email import send_welcome_email
from app.worker.tasks.reports import generate_large_report

@app.post("/users", status_code=201)
async def create_user(data: UserCreate, db: DBSession):
    user = await user_service.create(db, data)
    
    # Enqueue the task asynchronously (does not block the response)
    send_welcome_email.delay(user_id=user.id, email=user.email)
    
    return user

@app.post("/reports", status_code=202)
async def create_report(params: ReportParams, current_user: CurrentUser):
    # Enqueue the task and return its task_id
    task = generate_large_report.delay(
        report_id=uuid.uuid4().hex,
        params=params.model_dump(),
    )
    
    return {
        "task_id": task.id,
        "status_url": f"/tasks/{task.id}",
    }

Dependency for the Celery App

from celery import Celery
from fastapi import Depends
from typing import Annotated

def get_celery() -> Celery:
    return celery_app

CeleryApp = Annotated[Celery, Depends(get_celery)]

@app.post("/batch-process")
async def batch_process(items: list[str], celery: CeleryApp):
    from celery import group
    
    job = group(process_item.s(item) for item in items)
    result = job.apply_async()
    
    return {"group_id": result.id}

7. Docker Compose with Workers

# docker-compose.yml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - db
    environment:
      - REDIS_URL=redis://redis:6379/0

  celery-worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info --concurrency=4
    depends_on:
      - redis
      - db
    environment:
      - REDIS_URL=redis://redis:6379/0
    deploy:
      replicas: 2  # Multiple worker replicas

  celery-worker-high:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info -Q high_priority --concurrency=2
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0

  celery-beat:
    build: .
    command: celery -A app.worker.celery_app beat --loglevel=info
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0

  flower:
    build: .
    command: celery -A app.worker.celery_app flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:

8. Monitoring with Flower

Flower is the monitoring dashboard for Celery:

# Run locally
celery -A app.worker.celery_app flower --port=5555

# Open http://localhost:5555

Available Metrics

Flower exposes per-worker counts of active, processed, failed, and retried tasks, task runtimes and arguments, and queue lengths, and it lets you inspect, revoke, or rate-limit tasks from the UI.

Alerts via the Flower API

import httpx

async def check_celery_health():
    async with httpx.AsyncClient() as client:
        response = await client.get("http://flower:5555/api/workers")
        workers = response.json()
        
        active_workers = [w for w in workers.values() if w.get("status")]
        
        if len(active_workers) < 2:
            await send_alert("Less than 2 Celery workers active!")

9. Testing Tasks

# tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from app.worker.tasks.email import send_welcome_email

@pytest.fixture
def celery_app():
    from app.worker.celery_app import celery_app
    celery_app.conf.update(task_always_eager=True)  # Run tasks synchronously
    return celery_app

def test_send_welcome_email_success(celery_app):
    with patch("app.worker.tasks.email.EmailService") as mock_service:
        mock_service.return_value.send.return_value = True
        
        result = send_welcome_email.delay(user_id=1, email="test@test.com")
        
        assert result.get()["status"] == "sent"
        mock_service.return_value.send.assert_called_once()

def test_send_welcome_email_retry_on_connection_error(celery_app):
    with patch("app.worker.tasks.email.EmailService") as mock_service:
        mock_service.return_value.send.side_effect = ConnectionError("Failed")
        
        with pytest.raises(ConnectionError):
            send_welcome_email.delay(user_id=1, email="test@test.com").get()

# Test with a real worker
@pytest.fixture
def celery_worker(celery_app):
    """Start an in-process worker for integration tests."""
    from celery.contrib.testing.worker import start_worker

    # Disable eager mode so the task actually goes through the worker
    celery_app.conf.update(task_always_eager=False)
    with start_worker(celery_app, perform_ping_check=False) as worker:
        yield worker

def test_task_integration(celery_worker):
    result = send_welcome_email.delay(user_id=1, email="test@test.com")
    assert result.get(timeout=10)["status"] == "sent"

10. Comparison: BullMQ vs Celery

Aspect          | BullMQ (Node.js)  | Celery (Python)
Broker          | Redis             | Redis, RabbitMQ, SQS
Result backend  | Redis             | Redis, DB, S3
Scheduling      | Repeatable jobs   | Celery Beat
Monitoring      | Bull Board        | Flower
Concurrency     | Per-worker        | Per-process
Patterns        | Flows             | Chain, Group, Chord
Retries         | Built-in          | Built-in + backoff

Conclusion

Celery is the standard for distributed background processing in Python:

  1. Tasks as functions decorated with @celery_app.task
  2. Automatic retries with exponential backoff
  3. Multiple queues for prioritization
  4. Flower for real-time monitoring
  5. Celery Beat for cron-style scheduling

Senior pattern: Use task_acks_late=True so tasks are only acknowledged after they finish executing. This prevents losing tasks when a worker dies.

In the next chapter, we will implement advanced caching with Redis.