Módulo 4 20 min de lectura

04 - Programación Asíncrona: asyncio Deep Dive

Event Loop interno, corrutinas, TaskGroups y run_in_executor. Domina la concurrencia de Python como Senior.

#asyncio #concurrency #event-loop #coroutines #taskgroup

1. El Mental Model: Promesas vs Corrutinas

La diferencia fundamental entre JavaScript y Python async es cuándo empieza la ejecución:

JavaScript/TypeScript
// JavaScript: Eager Execution
// La Promise empieza a ejecutarse INMEDIATAMENTE
const promise = fetch('/api/users');
// ↑ La request HTTP ya se disparó

// Podemos await después
const response = await promise;
Python
# Python: Lazy Execution
# La corrutina es solo un OBJETO hasta que la awaits
coro = fetch_users()
# ↑ NO se ha ejecutado nada aún

# La ejecución empieza con await
response = await coro

Implicación crítica: En Python, crear una corrutina sin await es un bug silencioso:

async def send_email(to: str):
    print(f"Sending to {to}")
    await smtp.send(to)

# ¡BUG! La función nunca se ejecuta
send_email("user@example.com")  # RuntimeWarning: coroutine never awaited

# Correcto
await send_email("user@example.com")

2. Anatomía del Event Loop de Python

A diferencia del Event Loop de Node.js (libuv en C), el de Python (asyncio) está escrito mayormente en Python con selectores del sistema operativo.

El Loop Interno

# Simplificación conceptual del event loop
class EventLoop:
    def __init__(self):
        self.ready = deque()      # Tareas listas para ejecutar
        self.scheduled = []       # Tareas con timeout (heap)
        self.selector = Selector() # I/O multiplexing (epoll/kqueue)
    
    def run_forever(self):
        while True:
            # 1. Ejecutar todas las tareas ready
            while self.ready:
                callback = self.ready.popleft()
                callback()
            
            # 2. Mover tareas scheduled que ya expiraron a ready
            now = time.time()
            while self.scheduled and self.scheduled[0].when <= now:
                task = heappop(self.scheduled)
                self.ready.append(task.callback)
            
            # 3. Esperar I/O (el loop "duerme" aquí)
            timeout = self._calculate_timeout()
            events = self.selector.select(timeout)
            for key, mask in events:
                self.ready.append(key.data)  # Callback de I/O ready

Comparativa de Event Loops

CaracterísticaNode.js (libuv)Python (asyncio)
ImplementaciónC (performance)Python + selectors
Fases6 fases definidasReady → Scheduled → I/O
TimerssetTimeout/setIntervalasyncio.sleep, call_later
I/ONativo en todoRequiere drivers async
CPU-boundBloquea el loopBloquea el loop

3. Tareas y Concurrencia

Task: La Unidad de Ejecución

import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(1)  # Simula I/O
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # Crear una Task permite ejecución concurrente
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    
    # Ambas tasks se ejecutan concurrentemente
    # mientras hacemos otras cosas
    await asyncio.sleep(0.5)
    
    # Recolectar resultados
    user1 = await task1
    user2 = await task2
    
    print(user1, user2)

asyncio.run(main())  # Total: ~1 segundo (no 2)

El Error Clásico: await Secuencial

# ❌ INCORRECTO: Ejecución secuencial (2 segundos)
async def fetch_all_slow():
    user1 = await fetch_user(1)  # Espera 1 segundo
    user2 = await fetch_user(2)  # Espera otro segundo
    return [user1, user2]

# ✅ CORRECTO: Ejecución concurrente (1 segundo)
async def fetch_all_fast():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_user(2))
    return [task1.result(), task2.result()]

4. TaskGroups (Python 3.11+): El Nuevo Estándar

TaskGroup reemplaza a gather con mejor manejo de errores y cancelación automática.

JavaScript/TypeScript
// JavaScript: Promise.all / Promise.allSettled
try {
const [user, posts] = await Promise.all([
  fetchUser(1),
  fetchPosts(1),
]);
} catch (error) {
// Si UNA falla, las demás siguen ejecutándose
// (no hay cancelación automática)
}
Python
# Python 3.11+: TaskGroup con cancelación automática
async def fetch_user_data(user_id: int):
  async with asyncio.TaskGroup() as tg:
      user_task = tg.create_task(fetch_user(user_id))
      posts_task = tg.create_task(fetch_posts(user_id))
  
  # Si UNA falla, TODAS las demás se cancelan
  # y se propaga ExceptionGroup
  return user_task.result(), posts_task.result()

Manejo de ExceptionGroup

async def risky_operations():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(might_fail_1())
            tg.create_task(might_fail_2())
            tg.create_task(might_fail_3())
    except* ValueError as eg:
        # except* es la nueva sintaxis para ExceptionGroup
        for exc in eg.exceptions:
            print(f"ValueError: {exc}")
    except* TypeError as eg:
        for exc in eg.exceptions:
            print(f"TypeError: {exc}")

gather vs TaskGroup

Característicaasyncio.gatherTaskGroup
CancelaciónManualAutomática al primer error
Excepcionesreturn_exceptions=TrueExceptionGroup
Context ManagerNo
RecomendaciónLegacyUsar siempre (3.11+)

5. Timeouts y Cancelación

asyncio.timeout (Python 3.11+)

async def fetch_with_timeout():
    try:
        async with asyncio.timeout(5.0):
            # Si tarda más de 5 segundos, se cancela
            result = await slow_external_api()
            return result
    except TimeoutError:
        return {"error": "Request timed out"}

# Para Python < 3.11
async def fetch_with_timeout_legacy():
    try:
        result = await asyncio.wait_for(
            slow_external_api(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}

Cancelación Manual

async def long_running_task():
    try:
        while True:
            await asyncio.sleep(1)
            print("Working...")
    except asyncio.CancelledError:
        # Cleanup antes de terminar
        print("Task cancelled, cleaning up...")
        raise  # Re-raise para propagar la cancelación

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(3)
    task.cancel()  # Inyecta CancelledError en la task
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

6. Semaphores y Rate Limiting

Limitar Concurrencia

import asyncio
from collections.abc import Sequence

async def fetch_url(url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:  # Máximo N requests concurrentes
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return {"url": url, "status": response.status_code}

async def fetch_all(urls: Sequence[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch_url(url, semaphore))
            for url in urls
        ]
    
    return [task.result() for task in tasks]

# Fetch 1000 URLs, máximo 10 concurrentes
results = await fetch_all(urls, max_concurrent=10)

Rate Limiting con Token Bucket

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = asyncio.get_event_loop().time()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        async with self.lock:
            now = asyncio.get_event_loop().time()
            # Refill tokens based on elapsed time
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            
            if self.tokens >= 1:
                self.tokens -= 1
                return
            
            # Wait for next token
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.tokens = 0

# Uso
limiter = RateLimiter(rate=10, burst=5)  # 10 req/s, burst de 5

async def rate_limited_request(url: str):
    await limiter.acquire()
    return await httpx.get(url)

7. run_in_executor: Escapando del GIL

Para operaciones CPU-bound o librerías síncronas (que bloquearían el event loop):

JavaScript/TypeScript
// Node.js: Worker Threads
const { Worker } = require('worker_threads');

function runCPUIntensive(data) {
return new Promise((resolve, reject) => {
  const worker = new Worker('./worker.js', {
    workerData: data
  });
  worker.on('message', resolve);
  worker.on('error', reject);
});
}
Python
# Python: run_in_executor
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Para CPU-bound: ProcessPoolExecutor (evita GIL)
async def cpu_intensive(data: bytes) -> bytes:
  loop = asyncio.get_event_loop()
  with ProcessPoolExecutor() as pool:
      result = await loop.run_in_executor(
          pool,
          heavy_computation,  # Función síncrona
          data
      )
  return result

# Para I/O síncrono: ThreadPoolExecutor
async def sync_io_wrapper(file_path: str) -> str:
  loop = asyncio.get_event_loop()
  with ThreadPoolExecutor() as pool:
      content = await loop.run_in_executor(
          pool,
          Path(file_path).read_text  # Método síncrono
      )
  return content

FastAPI: Funciones Síncronas en Endpoints Async

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

def sync_heavy_computation(data: str) -> str:
    # Operación CPU-bound síncrona
    import hashlib
    for _ in range(1000000):
        data = hashlib.sha256(data.encode()).hexdigest()
    return data

@app.post("/compute")
async def compute(payload: dict):
    # FastAPI automáticamente usa threadpool para funciones sync
    # pero para control explícito:
    result = await run_in_threadpool(
        sync_heavy_computation,
        payload["data"]
    )
    return {"result": result}

8. Async Generators y Streaming

Server-Sent Events Pattern

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def event_generator():
    counter = 0
    while True:
        counter += 1
        yield f"data: Event {counter}\n\n"
        await asyncio.sleep(1)

@app.get("/events")
async def stream_events():
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

Async Context Managers

from contextlib import asynccontextmanager
from typing import AsyncGenerator

@asynccontextmanager
async def get_db_connection() -> AsyncGenerator[Connection, None]:
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        await pool.release(conn)

# Uso
async def fetch_users():
    async with get_db_connection() as conn:
        return await conn.fetch("SELECT * FROM users")

9. Debugging Async Code

Modo Debug de asyncio

import asyncio

# Activar modo debug
asyncio.run(main(), debug=True)

# O via variable de entorno
# PYTHONASYNCIODEBUG=1 python main.py

El modo debug detecta:

Logging de Tasks

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

async def main():
    # Ver todas las tasks activas
    for task in asyncio.all_tasks():
        print(f"Task: {task.get_name()}, Done: {task.done()}")

    # Nombrar tasks para debugging
    task = asyncio.create_task(
        fetch_user(1),
        name="fetch_user_1"  # Nombre descriptivo
    )

10. Patrones Avanzados

Async Queue (Producer-Consumer)

import asyncio
from typing import Any

async def producer(queue: asyncio.Queue[dict], items: list[dict]):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    
    # Señal de fin
    await queue.put(None)

async def consumer(queue: asyncio.Queue[dict], name: str):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        
        print(f"[{name}] Processing: {item}")
        await asyncio.sleep(0.1)  # Simula trabajo
        queue.task_done()

async def main():
    queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=10)
    items = [{"id": i} for i in range(20)]
    
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, items))
        # Múltiples consumidores
        for i in range(3):
            tg.create_task(consumer(queue, f"consumer-{i}"))

Async Retry con Backoff

import asyncio
import random
from functools import wraps
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

def async_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
):
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            last_exception = None
            
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    
                    if attempt == max_attempts - 1:
                        raise
                    
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )
                    
                    if jitter:
                        delay *= (0.5 + random.random())
                    
                    await asyncio.sleep(delay)
            
            raise last_exception  # type: ignore
        
        return wrapper
    return decorator

@async_retry(max_attempts=5, base_delay=0.5)
async def unreliable_api_call(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

11. Tabla Comparativa: Node.js Async vs Python Async

ConceptoNode.jsPython
PrimitivaPromiseCoroutine
EjecuciónEager (inmediata)Lazy (al await)
Concurrent execPromise.all()TaskGroup / gather
RacePromise.race()asyncio.wait(return_when=FIRST_COMPLETED)
TimeoutAbortControllerasyncio.timeout()
Semaphorep-limit (npm)asyncio.Semaphore (stdlib)
Queuep-queue (npm)asyncio.Queue (stdlib)
CPU-boundWorker ThreadsProcessPoolExecutor
SleepsetTimeout + Promiseasyncio.sleep()

Conclusión

La asincronía en Python requiere un cambio de mental model respecto a Node.js:

  1. Las corrutinas son lazy — Siempre await o create_task()
  2. TaskGroup es el nuevo estándar — Cancelación automática y mejor error handling
  3. El GIL no afecta I/Oasyncio maneja miles de conexiones igual que Node
  4. run_in_executor para blocking — CPU-bound va a ProcessPoolExecutor

Regla de oro Senior: Si una operación toca la red o el disco, debe ser async. Si toca la CPU intensivamente, debe ir a un proceso separado.

En el siguiente capítulo, aplicaremos estos conceptos al sistema de Inyección de Dependencias de FastAPI.