04 - Programación Asíncrona: asyncio Deep Dive
Event Loop interno, corrutinas, TaskGroups y run_in_executor. Domina la concurrencia de Python como Senior.
1. El Mental Model: Promesas vs Corrutinas
La diferencia fundamental entre JavaScript y Python async es cuándo empieza la ejecución:
// JavaScript: Eager Execution
// La Promise empieza a ejecutarse INMEDIATAMENTE
const promise = fetch('/api/users');
// ↑ La request HTTP ya se disparó
// Podemos await después
const response = await promise; # Python: Lazy Execution
# La corrutina es solo un OBJETO hasta que la awaits
coro = fetch_users()
# ↑ NO se ha ejecutado nada aún
# La ejecución empieza con await
response = await coro Implicación crítica: En Python, crear una corrutina sin await es un bug silencioso:
async def send_email(to: str):
print(f"Sending to {to}")
await smtp.send(to)
# ¡BUG! La función nunca se ejecuta
send_email("user@example.com") # RuntimeWarning: coroutine never awaited
# Correcto
await send_email("user@example.com")
2. Anatomía del Event Loop de Python
A diferencia del Event Loop de Node.js (libuv en C), el de Python (asyncio) está escrito mayormente en Python con selectores del sistema operativo.
El Loop Interno
# Simplificación conceptual del event loop
class EventLoop:
def __init__(self):
self.ready = deque() # Tareas listas para ejecutar
self.scheduled = [] # Tareas con timeout (heap)
self.selector = Selector() # I/O multiplexing (epoll/kqueue)
def run_forever(self):
while True:
# 1. Ejecutar todas las tareas ready
while self.ready:
callback = self.ready.popleft()
callback()
# 2. Mover tareas scheduled que ya expiraron a ready
now = time.time()
while self.scheduled and self.scheduled[0].when <= now:
task = heappop(self.scheduled)
self.ready.append(task.callback)
# 3. Esperar I/O (el loop "duerme" aquí)
timeout = self._calculate_timeout()
events = self.selector.select(timeout)
for key, mask in events:
self.ready.append(key.data) # Callback de I/O ready
Comparativa de Event Loops
| Característica | Node.js (libuv) | Python (asyncio) |
|---|---|---|
| Implementación | C (performance) | Python + selectors |
| Fases | 6 fases definidas | Ready → Scheduled → I/O |
| Timers | setTimeout/setInterval | asyncio.sleep, call_later |
| I/O | Nativo en todo | Requiere drivers async |
| CPU-bound | Bloquea el loop | Bloquea el loop |
3. Tareas y Concurrencia
Task: La Unidad de Ejecución
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(1) # Simula I/O
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Crear una Task permite ejecución concurrente
task1 = asyncio.create_task(fetch_user(1))
task2 = asyncio.create_task(fetch_user(2))
# Ambas tasks se ejecutan concurrentemente
# mientras hacemos otras cosas
await asyncio.sleep(0.5)
# Recolectar resultados
user1 = await task1
user2 = await task2
print(user1, user2)
asyncio.run(main()) # Total: ~1 segundo (no 2)
El Error Clásico: await Secuencial
# ❌ INCORRECTO: Ejecución secuencial (2 segundos)
async def fetch_all_slow():
user1 = await fetch_user(1) # Espera 1 segundo
user2 = await fetch_user(2) # Espera otro segundo
return [user1, user2]
# ✅ CORRECTO: Ejecución concurrente (1 segundo)
async def fetch_all_fast():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_user(1))
task2 = tg.create_task(fetch_user(2))
return [task1.result(), task2.result()]
4. TaskGroups (Python 3.11+): El Nuevo Estándar
TaskGroup reemplaza a gather con mejor manejo de errores y cancelación automática.
// JavaScript: Promise.all / Promise.allSettled
try {
const [user, posts] = await Promise.all([
fetchUser(1),
fetchPosts(1),
]);
} catch (error) {
// Si UNA falla, las demás siguen ejecutándose
// (no hay cancelación automática)
} # Python 3.11+: TaskGroup con cancelación automática
async def fetch_user_data(user_id: int):
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(user_id))
posts_task = tg.create_task(fetch_posts(user_id))
# Si UNA falla, TODAS las demás se cancelan
# y se propaga ExceptionGroup
return user_task.result(), posts_task.result() Manejo de ExceptionGroup
async def risky_operations():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(might_fail_1())
tg.create_task(might_fail_2())
tg.create_task(might_fail_3())
except* ValueError as eg:
# except* es la nueva sintaxis para ExceptionGroup
for exc in eg.exceptions:
print(f"ValueError: {exc}")
except* TypeError as eg:
for exc in eg.exceptions:
print(f"TypeError: {exc}")
gather vs TaskGroup
| Característica | asyncio.gather | TaskGroup |
|---|---|---|
| Cancelación | Manual | Automática al primer error |
| Excepciones | return_exceptions=True | ExceptionGroup |
| Context Manager | No | Sí |
| Recomendación | Legacy | Usar siempre (3.11+) |
5. Timeouts y Cancelación
asyncio.timeout (Python 3.11+)
async def fetch_with_timeout():
try:
async with asyncio.timeout(5.0):
# Si tarda más de 5 segundos, se cancela
result = await slow_external_api()
return result
except TimeoutError:
return {"error": "Request timed out"}
# Para Python < 3.11
async def fetch_with_timeout_legacy():
try:
result = await asyncio.wait_for(
slow_external_api(),
timeout=5.0
)
return result
except asyncio.TimeoutError:
return {"error": "Request timed out"}
Cancelación Manual
async def long_running_task():
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
# Cleanup antes de terminar
print("Task cancelled, cleaning up...")
raise # Re-raise para propagar la cancelación
async def main():
task = asyncio.create_task(long_running_task())
await asyncio.sleep(3)
task.cancel() # Inyecta CancelledError en la task
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
6. Semaphores y Rate Limiting
Limitar Concurrencia
import asyncio
from collections.abc import Sequence
async def fetch_url(url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore: # Máximo N requests concurrentes
async with httpx.AsyncClient() as client:
response = await client.get(url)
return {"url": url, "status": response.status_code}
async def fetch_all(urls: Sequence[str], max_concurrent: int = 10) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(fetch_url(url, semaphore))
for url in urls
]
return [task.result() for task in tasks]
# Fetch 1000 URLs, máximo 10 concurrentes
results = await fetch_all(urls, max_concurrent=10)
Rate Limiting con Token Bucket
class RateLimiter:
def __init__(self, rate: float, burst: int = 1):
self.rate = rate # requests per second
self.burst = burst
self.tokens = burst
self.last_refill = asyncio.get_event_loop().time()
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = asyncio.get_event_loop().time()
# Refill tokens based on elapsed time
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return
# Wait for next token
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
# Uso
limiter = RateLimiter(rate=10, burst=5) # 10 req/s, burst de 5
async def rate_limited_request(url: str):
await limiter.acquire()
return await httpx.get(url)
7. run_in_executor: Escapando del GIL
Para operaciones CPU-bound o librerías síncronas (que bloquearían el event loop):
// Node.js: Worker Threads
const { Worker } = require('worker_threads');
function runCPUIntensive(data) {
return new Promise((resolve, reject) => {
const worker = new Worker('./worker.js', {
workerData: data
});
worker.on('message', resolve);
worker.on('error', reject);
});
} # Python: run_in_executor
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# Para CPU-bound: ProcessPoolExecutor (evita GIL)
async def cpu_intensive(data: bytes) -> bytes:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool,
heavy_computation, # Función síncrona
data
)
return result
# Para I/O síncrono: ThreadPoolExecutor
async def sync_io_wrapper(file_path: str) -> str:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
content = await loop.run_in_executor(
pool,
Path(file_path).read_text # Método síncrono
)
return content FastAPI: Funciones Síncronas en Endpoints Async
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
app = FastAPI()
def sync_heavy_computation(data: str) -> str:
# Operación CPU-bound síncrona
import hashlib
for _ in range(1000000):
data = hashlib.sha256(data.encode()).hexdigest()
return data
@app.post("/compute")
async def compute(payload: dict):
# FastAPI automáticamente usa threadpool para funciones sync
# pero para control explícito:
result = await run_in_threadpool(
sync_heavy_computation,
payload["data"]
)
return {"result": result}
8. Async Generators y Streaming
Server-Sent Events Pattern
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_generator():
counter = 0
while True:
counter += 1
yield f"data: Event {counter}\n\n"
await asyncio.sleep(1)
@app.get("/events")
async def stream_events():
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
Async Context Managers
from contextlib import asynccontextmanager
from typing import AsyncGenerator
@asynccontextmanager
async def get_db_connection() -> AsyncGenerator[Connection, None]:
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)
# Uso
async def fetch_users():
async with get_db_connection() as conn:
return await conn.fetch("SELECT * FROM users")
9. Debugging Async Code
Modo Debug de asyncio
import asyncio
# Activar modo debug
asyncio.run(main(), debug=True)
# O via variable de entorno
# PYTHONASYNCIODEBUG=1 python main.py
El modo debug detecta:
- Corrutinas no awaited
- Callbacks que tardan demasiado (>100ms)
- Recursos no cerrados
Logging de Tasks
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
async def main():
# Ver todas las tasks activas
for task in asyncio.all_tasks():
print(f"Task: {task.get_name()}, Done: {task.done()}")
# Nombrar tasks para debugging
task = asyncio.create_task(
fetch_user(1),
name="fetch_user_1" # Nombre descriptivo
)
10. Patrones Avanzados
Async Queue (Producer-Consumer)
import asyncio
from typing import Any
async def producer(queue: asyncio.Queue[dict], items: list[dict]):
for item in items:
await queue.put(item)
print(f"Produced: {item}")
# Señal de fin
await queue.put(None)
async def consumer(queue: asyncio.Queue[dict], name: str):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"[{name}] Processing: {item}")
await asyncio.sleep(0.1) # Simula trabajo
queue.task_done()
async def main():
queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=10)
items = [{"id": i} for i in range(20)]
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(queue, items))
# Múltiples consumidores
for i in range(3):
tg.create_task(consumer(queue, f"consumer-{i}"))
Async Retry con Backoff
import asyncio
import random
from functools import wraps
from typing import TypeVar, Callable, Awaitable
T = TypeVar('T')
def async_retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
):
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_attempts - 1:
raise
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
if jitter:
delay *= (0.5 + random.random())
await asyncio.sleep(delay)
raise last_exception # type: ignore
return wrapper
return decorator
@async_retry(max_attempts=5, base_delay=0.5)
async def unreliable_api_call(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
11. Tabla Comparativa: Node.js Async vs Python Async
| Concepto | Node.js | Python |
|---|---|---|
| Primitiva | Promise | Coroutine |
| Ejecución | Eager (inmediata) | Lazy (al await) |
| Concurrent exec | Promise.all() | TaskGroup / gather |
| Race | Promise.race() | asyncio.wait(return_when=FIRST_COMPLETED) |
| Timeout | AbortController | asyncio.timeout() |
| Semaphore | p-limit (npm) | asyncio.Semaphore (stdlib) |
| Queue | p-queue (npm) | asyncio.Queue (stdlib) |
| CPU-bound | Worker Threads | ProcessPoolExecutor |
| Sleep | setTimeout + Promise | asyncio.sleep() |
Conclusión
La asincronía en Python requiere un cambio de mental model respecto a Node.js:
- Las corrutinas son lazy — Siempre
awaitocreate_task() - TaskGroup es el nuevo estándar — Cancelación automática y mejor error handling
- El GIL no afecta I/O —
asynciomaneja miles de conexiones igual que Node run_in_executorpara blocking — CPU-bound va aProcessPoolExecutor
Regla de oro Senior: Si una operación toca la red o el disco, debe ser async. Si toca la CPU intensivamente, debe ir a un proceso separado.
En el siguiente capítulo, aplicaremos estos conceptos al sistema de Inyección de Dependencias de FastAPI.