Module 17 • 18 min read
17 - Task Queues: Celery and Redis
Distributed processing: workers, retries, scheduling, and monitoring with Flower.
#celery
#redis
#task-queues
#distributed
#background-jobs
1. BackgroundTasks vs Celery
JavaScript/TypeScript
// Node.js: BullMQ
import { Queue, Worker } from 'bullmq';

const queue = new Queue('email', { connection: redis });

// Producer
await queue.add('send-welcome', { userId: 123 });

// Worker (separate process)
const worker = new Worker('email', async (job) => {
  await sendEmail(job.data.userId);
}, { connection: redis });
Python
# Python: Celery
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

# Task definition
@app.task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int):
    try:
        send_email(user_id)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# Producer (from FastAPI)
send_welcome_email.delay(user_id=123)
When to Use Each
| Scenario | BackgroundTasks | Celery |
|---|---|---|
| Send a simple email | ✅ | Overkill |
| Process an image | ❌ Blocks the worker | ✅ |
| Generate a large PDF | ❌ | ✅ |
| Task with retries | ❌ | ✅ |
| Scheduling (cron) | ❌ | ✅ (Celery Beat) |
| Multiple workers | ❌ | ✅ |
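For the "simple email" row, FastAPI's built-in BackgroundTasks is usually enough. A minimal sketch, assuming a hypothetical send_email helper:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def send_email(user_id: int) -> None:
    ...  # call the email provider


@app.post("/users", status_code=201)
async def create_user(data: dict, background_tasks: BackgroundTasks):
    user_id = 123  # would come from the DB insert
    # Runs in the same process, right after the response is sent
    background_tasks.add_task(send_email, user_id)
    return {"id": user_id}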
2. Celery Configuration
Project Structure
app/
├── main.py              # FastAPI app
├── worker/
│   ├── __init__.py
│   ├── celery_app.py    # Celery configuration
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── email.py
│   │   ├── reports.py
│   │   └── notifications.py
│   └── config.py
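The celery_app.py below imports settings from app.core.config; a minimal sketch of what that settings module could look like, assuming pydantic-settings (the exact fields and defaults are up to your project):

# app/core/config.py (assumed)
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Read from the environment, e.g. REDIS_URL=redis://localhost:6379/0
    REDIS_URL: str = "redis://localhost:6379/0"


settings = Settings()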
celery_app.py
# app/worker/celery_app.py
from celery import Celery
from celery.schedules import crontab
from kombu import Queue

from app.core.config import settings

celery_app = Celery(
    "worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.worker.tasks.email",
        "app.worker.tasks.reports",
        "app.worker.tasks.notifications",
    ],
)

# Configuration
celery_app.conf.update(
    # Serialization
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # Timezone
    timezone="UTC",
    enable_utc=True,
    # Results
    result_expires=3600,  # 1 hour
    result_backend_transport_options={
        "retry_policy": {"timeout": 5.0}
    },
    # Tasks
    task_acks_late=True,  # Acknowledge only after the task finishes
    task_reject_on_worker_lost=True,
    task_time_limit=600,  # 10 min max per task
    task_soft_time_limit=540,  # Soft limit (9 min)
    # Workers
    worker_prefetch_multiplier=1,  # Fetch one task at a time
    worker_concurrency=4,
    # Queues
    task_default_queue="default",
    task_queues=(
        Queue("default", routing_key="default"),
        Queue("high_priority", routing_key="high"),
        Queue("low_priority", routing_key="low"),
    ),
    # Routing
    task_routes={
        "app.worker.tasks.email.*": {"queue": "high_priority"},
        "app.worker.tasks.reports.*": {"queue": "low_priority"},
    },
)

# Celery Beat (scheduling)
celery_app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "app.worker.tasks.maintenance.cleanup_tokens",
        "schedule": 3600.0,  # Every hour
    },
    "daily-report": {
        "task": "app.worker.tasks.reports.generate_daily_report",
        "schedule": crontab(hour=6, minute=0),  # 6 AM UTC
    },
}
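To try this configuration locally, the worker and the scheduler run as separate processes using the same entry point that the docker-compose services in this module use:

# Worker consuming the configured queues
celery -A app.worker.celery_app worker --loglevel=info

# Scheduler that enqueues the beat_schedule entries
celery -A app.worker.celery_app beat --loglevel=info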
3. Defining Tasks
Basic Task
# app/worker/tasks/email.py
import structlog

from app.services.email import EmailService
from app.worker.celery_app import celery_app

logger = structlog.get_logger(__name__)


@celery_app.task(
    bind=True,
    name="send_welcome_email",
    max_retries=3,
    default_retry_delay=60,
)
def send_welcome_email(self, user_id: int, email: str):
    """Send the welcome email."""
    logger.info("sending_welcome_email", user_id=user_id, task_id=self.request.id)
    try:
        email_service = EmailService()
        email_service.send(
            to=email,
            template="welcome",
            context={"user_id": user_id},
        )
        logger.info("welcome_email_sent", user_id=user_id)
        return {"status": "sent", "user_id": user_id}
    except ConnectionError as exc:
        logger.warning("email_service_unavailable", exc=str(exc))
        # Linear backoff: 60s, 120s, 180s
        raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))
    except Exception:
        logger.exception("send_email_failed", user_id=user_id)
        raise
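Besides .delay(), apply_async gives control over routing and timing. For example, to force this task onto the high_priority queue defined earlier and delay its execution by 10 seconds (the values here are illustrative):

send_welcome_email.apply_async(
    kwargs={"user_id": 123, "email": "user@example.com"},
    queue="high_priority",  # overrides task_routes for this call
    countdown=10,           # run no earlier than 10 seconds from now
)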
Task with Progress
# app/worker/tasks/reports.py
from app.worker.celery_app import celery_app


@celery_app.task(bind=True)
def generate_large_report(self, report_id: str, params: dict):
    """Generate a large report while tracking progress."""
    total_steps = 100
    for step in range(total_steps):
        # Update progress (readable from the API via AsyncResult)
        self.update_state(
            state="PROGRESS",
            meta={"current": step, "total": total_steps},
        )
        # Process this step
        process_step(step, params)
    # Save the result
    report_url = save_report(report_id)
    return {"status": "completed", "url": report_url}
Checking Progress from FastAPI
from celery.result import AsyncResult

from app.worker.celery_app import celery_app


@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    if result.state == "PENDING":
        return {"status": "pending"}
    elif result.state == "PROGRESS":
        return {
            "status": "progress",
            "current": result.info.get("current", 0),
            "total": result.info.get("total", 100),
        }
    elif result.state == "SUCCESS":
        return {"status": "completed", "result": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.info)}
    else:
        return {"status": result.state}
4. Advanced Patterns
Chain (Pipeline)
from celery import chain

# Run tasks in sequence; each task's return value feeds the next
pipeline = chain(
    download_file.s(url),
    process_file.s(),
    upload_result.s(),
    notify_completion.s(user_id),
)
result = pipeline.apply_async()
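In a chain, each task's return value is injected as the first argument of the next signature, so notify_completion above ends up receiving upload_result's return value followed by user_id. The chained tasks would look roughly like this; names and bodies are illustrative:

@celery_app.task
def download_file(url: str) -> str:
    local_path = f"/tmp/{url.rsplit('/', 1)[-1]}"
    # ...download the file to local_path...
    return local_path  # becomes the argument of process_file


@celery_app.task
def process_file(path: str) -> str:
    # ...transform the file in place...
    return path  # becomes the argument of upload_result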
Group (Parallel)
from celery import group

# Run tasks in parallel
parallel_tasks = group(
    process_chunk.s(chunk) for chunk in chunks
)
result = parallel_tasks.apply_async()

# Wait for all results (from the caller, never from inside another task)
results = result.get()  # List of results
Chord (Parallel + Callback)
from celery import chord

# Parallel tasks with a callback once they all finish
workflow = chord(
    [process_item.s(item) for item in items],
    aggregate_results.s(),
)
result = workflow.apply_async()
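The callback receives a list with one entry per header task once they have all succeeded, so the chord does the waiting instead of a blocking .get() inside a task. A sketch of what the callback could look like (the return shape is an assumption):

@celery_app.task
def aggregate_results(results: list) -> dict:
    # `results` holds one entry per process_item task, in submission order
    return {"processed": len(results), "results": results}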
Task with a Concurrency Limit
import redis
from celery import Task
from celery.exceptions import Reject

from app.core.config import settings


class RateLimitedTask(Task):
    """Base task that allows a single concurrent execution per resource via a Redis lock."""

    def __init__(self):
        self.redis = redis.from_url(settings.REDIS_URL)

    def acquire_lock(self, key: str, timeout: int = 60) -> bool:
        # SET NX: succeeds only if the lock does not already exist
        return bool(self.redis.set(f"lock:{key}", "1", nx=True, ex=timeout))

    def release_lock(self, key: str):
        self.redis.delete(f"lock:{key}")


@celery_app.task(base=RateLimitedTask, bind=True)
def process_user_export(self, user_id: int):
    lock_key = f"export:{user_id}"
    if not self.acquire_lock(lock_key, timeout=300):
        # An export is already in progress for this user
        raise Reject("Export already in progress", requeue=False)
    try:
        # Run the export
        return do_export(user_id)
    finally:
        self.release_lock(lock_key)
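Note that the lock above enforces mutual exclusion per resource. If what you need is plain throughput throttling, Celery tasks also accept a built-in rate_limit option, applied per worker instance rather than globally; a hypothetical example:

@celery_app.task(rate_limit="10/m")  # at most 10 executions per minute, per worker
def send_sms(phone: str, body: str):
    ...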
5. Error Handling and Retries
from celery.exceptions import MaxRetriesExceededError, Reject


@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,       # Exponential backoff
    retry_backoff_max=600,    # At most 10 minutes between retries
    retry_jitter=True,        # Adds randomness to avoid thundering herds
    max_retries=5,
)
def resilient_task(self, data: dict):
    try:
        return process(data)
    except ValueError as exc:
        # Bad data: retrying will not help
        logger.error("invalid_data", data=data)
        raise Reject(str(exc), requeue=False)
    except ExternalServiceError as exc:
        # External service is down: retry
        raise self.retry(exc=exc)
Dead Letter Queue
# Configuration for failed tasks
celery_app.conf.update(
    task_acks_on_failure_or_timeout=True,
    task_reject_on_worker_lost=True,
)


# Handler for tasks that exhausted their retries
@celery_app.task(bind=True)
def handle_failed_task(self, task_id: str, error: str):
    """Process tasks that failed permanently."""
    logger.error("task_permanently_failed", task_id=task_id, error=error)
    # Store in the DB for manual review
    FailedTask.create(task_id=task_id, error=error)
    # Notify
    send_alert(f"Task {task_id} failed permanently: {error}")
6. FastAPI Integration
Fire-and-Forget Endpoint
import uuid

from fastapi import FastAPI, BackgroundTasks

from app.worker.tasks.email import send_welcome_email
from app.worker.tasks.reports import generate_large_report


@app.post("/users", status_code=201)
async def create_user(data: UserCreate, db: DBSession):
    user = await user_service.create(db, data)
    # Enqueue the async task (does not block the request)
    send_welcome_email.delay(user_id=user.id, email=user.email)
    return user


@app.post("/reports", status_code=202)
async def create_report(params: ReportParams, current_user: CurrentUser):
    # Enqueue and return the task_id
    task = generate_large_report.delay(
        report_id=uuid.uuid4().hex,
        params=params.model_dump(),
    )
    return {
        "task_id": task.id,
        "status_url": f"/tasks/{task.id}",
    }
Dependency for the Celery App
from typing import Annotated

from celery import Celery
from fastapi import Depends


def get_celery() -> Celery:
    return celery_app


CeleryApp = Annotated[Celery, Depends(get_celery)]


@app.post("/batch-process")
async def batch_process(items: list[str], celery: CeleryApp):
    from celery import group

    job = group(process_item.s(item) for item in items)
    result = job.apply_async()
    return {"group_id": result.id}
7. Docker Compose with Workers
# docker-compose.yml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - db
    environment:
      - REDIS_URL=redis://redis:6379/0

  celery-worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info --concurrency=4
    depends_on:
      - redis
      - db
    environment:
      - REDIS_URL=redis://redis:6379/0
    deploy:
      replicas: 2  # Multiple workers

  celery-worker-high:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info -Q high_priority --concurrency=2
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0

  celery-beat:
    build: .
    command: celery -A app.worker.celery_app beat --loglevel=info
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0

  flower:
    build: .
    command: celery -A app.worker.celery_app flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:
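deploy.replicas is honored by docker compose v2; the worker services can also be scaled ad hoc from the CLI, since none of them publish ports:

# Run 4 default workers and 2 high-priority workers
docker compose up -d --scale celery-worker=4 --scale celery-worker-high=2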
8. Monitoring with Flower
Flower is the monitoring dashboard for Celery:
# Run locally
celery -A app.worker.celery_app flower --port=5555
# Open http://localhost:5555
Available Metrics
- Workers: status, active tasks, CPU/memory
- Tasks: queued, active, completed, failed
- Queues: pending messages per queue
- Graphs: throughput, latency
Alerts via the Flower API
import httpx


async def check_celery_health():
    async with httpx.AsyncClient() as client:
        response = await client.get("http://flower:5555/api/workers")
        workers = response.json()
        active_workers = [w for w in workers.values() if w.get("status")]
        if len(active_workers) < 2:
            await send_alert("Less than 2 Celery workers active!")
9. Testing Tasks
# tests/test_tasks.py
import pytest
from unittest.mock import patch

from app.worker.tasks.email import send_welcome_email


@pytest.fixture
def celery_app():
    from app.worker.celery_app import celery_app

    celery_app.conf.update(task_always_eager=True)  # Run tasks synchronously
    return celery_app


def test_send_welcome_email_success(celery_app):
    with patch("app.worker.tasks.email.EmailService") as mock_service:
        mock_service.return_value.send.return_value = True
        result = send_welcome_email.delay(user_id=1, email="test@test.com")
        assert result.get()["status"] == "sent"
        mock_service.return_value.send.assert_called_once()


def test_send_welcome_email_retry_on_connection_error(celery_app):
    with patch("app.worker.tasks.email.EmailService") as mock_service:
        mock_service.return_value.send.side_effect = ConnectionError("Failed")
        with pytest.raises(ConnectionError):
            send_welcome_email.delay(user_id=1, email="test@test.com").get()


# Test with a real worker
@pytest.fixture
def celery_worker(celery_app):
    """Start an in-process worker for integration tests."""
    from celery.contrib.testing.worker import start_worker

    celery_app.conf.update(task_always_eager=False)  # The worker must consume for real
    with start_worker(celery_app, perform_ping_check=False) as worker:
        yield worker


def test_task_integration(celery_worker):
    result = send_welcome_email.delay(user_id=1, email="test@test.com")
    assert result.get(timeout=10)["status"] == "sent"
10. Comparison: BullMQ vs Celery
| Aspect | BullMQ (Node.js) | Celery (Python) |
|---|---|---|
| Broker | Redis | Redis, RabbitMQ, SQS |
| Result backend | Redis | Redis, DB, S3 |
| Scheduling | Repeatable jobs | Celery Beat |
| Monitoring | Bull Board, Arena | Flower |
| Concurrency | Per worker | Per process |
| Patterns | Flows | Chain, Group, Chord |
| Retries | Built-in | Built-in + backoff |
Conclusion
Celery is the de facto standard for distributed processing in Python:
- Tasks as plain functions decorated with @celery_app.task
- Automatic retries with exponential backoff
- Multiple queues for prioritization
- Flower for real-time monitoring
- Celery Beat for cron-style scheduling
Senior pattern: Use task_acks_late=True so tasks are only acknowledged after they finish executing. This prevents losing tasks if a worker dies mid-execution.
In the next chapter, we will implement advanced caching with Redis.