Module 19 · 16 min read

19 - WebSockets and Real-time

Bidirectional communication with WebSockets, Server-Sent Events, and pub/sub patterns with Redis.

#websockets #realtime #sse #pubsub #redis

1. WebSockets vs SSE vs Polling

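| Feature | WebSocket | SSE | Long Polling |
|---|---|---|---|
| Direction | Bidirectional | Server → Client | Bidirectional (emulated over HTTP requests) |
| Protocol | ws:// | HTTP | HTTP |
| Reconnection | Manual | Automatic | Manual |
| Binary | ✅ | ❌ (text only) | ✅ |
| Firewall | ⚠️ May be blocked | ✅ Plain HTTP | ✅ Plain HTTP |
| Typical use | Chat, gaming | Notifications, feeds | Legacy |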

2. WebSockets in FastAPI

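JavaScript/TypeScript
// Node.js: Socket.io
import { createServer } from 'node:http';
import { Server } from 'socket.io';

// Socket.io attaches to a regular HTTP server
const server = createServer();
const io = new Server(server);

io.on('connection', (socket) => {
  console.log('Client connected');

  socket.on('message', (data) => {
    io.emit('message', data);  // Broadcast to every connected client
  });

  socket.on('disconnect', () => {
    console.log('Client disconnected');
  });
});

server.listen(3000);  // port chosen for illustration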
Python
# FastAPI: native WebSocket support
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            # Echo every text frame back to the sender
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

3. Connection Manager

# services/websocket.py
import asyncio

from fastapi import WebSocket


class ConnectionManager:
    """Manages active WebSocket connections."""

    def __init__(self):
        # Connections per user
        self.active_connections: dict[str, set[WebSocket]] = {}
        # Connections per room/channel
        self.rooms: dict[str, set[WebSocket]] = {}
        self._lock = asyncio.Lock()
    
    async def connect(self, websocket: WebSocket, user_id: str):
        """Accepts a new connection and registers it under its user."""
        await websocket.accept()

        async with self._lock:
            if user_id not in self.active_connections:
                self.active_connections[user_id] = set()
            self.active_connections[user_id].add(websocket)

    async def disconnect(self, websocket: WebSocket, user_id: str):
        """Removes a connection from its user and from every room."""
        async with self._lock:
            if user_id in self.active_connections:
                self.active_connections[user_id].discard(websocket)
                if not self.active_connections[user_id]:
                    del self.active_connections[user_id]

            # Remove from all rooms, dropping rooms that become empty
            for room in list(self.rooms.keys()):
                self.rooms[room].discard(websocket)
                if not self.rooms[room]:
                    del self.rooms[room]
    
    async def join_room(self, websocket: WebSocket, room: str):
        """Adds a connection to a room."""
        async with self._lock:
            if room not in self.rooms:
                self.rooms[room] = set()
            self.rooms[room].add(websocket)

    async def leave_room(self, websocket: WebSocket, room: str):
        """Removes a connection from a room."""
        async with self._lock:
            if room in self.rooms:
                self.rooms[room].discard(websocket)
                # Drop the room once it is empty, as disconnect() does
                if not self.rooms[room]:
                    del self.rooms[room]
    
    async def send_personal(self, user_id: str, message: dict):
        """Sends a message to one specific user."""
        # Iterate over a snapshot: the set can change while we await a send
        connections = list(self.active_connections.get(user_id, set()))
        for conn in connections:
            try:
                await conn.send_json(message)
            except Exception:
                pass  # Dead connection; it is cleaned up in disconnect()

    async def broadcast_room(self, room: str, message: dict, exclude: WebSocket | None = None):
        """Broadcasts to everyone in a room, optionally skipping one connection."""
        connections = list(self.rooms.get(room, set()))
        for conn in connections:
            if conn != exclude:
                try:
                    await conn.send_json(message)
                except Exception:
                    pass

    async def broadcast_all(self, message: dict):
        """Broadcasts to every active connection."""
        for connections in list(self.active_connections.values()):
            for conn in list(connections):
                try:
                    await conn.send_json(message)
                except Exception:
                    pass

manager = ConnectionManager()
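
The send methods above swallow errors and rely on a later disconnect to clean up. A minimal sketch of a stricter variant, which iterates over a snapshot of the set and prunes connections whose send fails, might look like the following. `FakeSocket` and `broadcast_and_prune` are hypothetical names used only for illustration; they are not part of FastAPI or of the manager above.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; only send_json is modelled."""

    def __init__(self, name: str, alive: bool = True) -> None:
        self.name = name
        self.alive = alive
        self.sent: list = []

    async def send_json(self, message: dict) -> None:
        if not self.alive:
            raise ConnectionError(f"{self.name} is gone")
        self.sent.append(message)


async def broadcast_and_prune(connections: set, message: dict) -> list:
    """Send to every connection; remove and return the ones that failed."""
    dead = []
    # Iterate over a snapshot: another task may add or remove
    # connections while this coroutine is suspended inside send_json.
    for conn in list(connections):
        try:
            await conn.send_json(message)
        except Exception:
            dead.append(conn)
    for conn in dead:
        connections.discard(conn)
    return dead


async def demo():
    a, b, c = FakeSocket("a"), FakeSocket("b", alive=False), FakeSocket("c")
    room = {a, b, c}
    dead = await broadcast_and_prune(room, {"type": "ping"})
    # Names of the sockets still in the room, and of the pruned ones
    return sorted(s.name for s in room), [s.name for s in dead]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Iterating over a live set while awaiting inside the loop can raise `RuntimeError: Set changed size during iteration` if another coroutine connects or disconnects in the meantime, which is why the snapshot matters.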

4. WebSocket with Authentication

from fastapi import Depends, Query, WebSocket, WebSocketDisconnect, WebSocketException
from app.core.security import decode_token
from app.services.websocket import manager

async def get_websocket_user(
    websocket: WebSocket,
    token: str = Query(...),
) -> str:
    """Dependency that authenticates a WebSocket connection."""
    try:
        payload = decode_token(token)
        return payload.sub
    except Exception:
        # Raised before accept(), so FastAPI rejects the handshake itself;
        # closing the socket by hand here is unnecessary.
        raise WebSocketException(code=4001, reason="Invalid token")

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    user_id: str = Depends(get_websocket_user),
):
    await manager.connect(websocket, user_id)
    
    try:
        # Announce that the user is online
        await manager.broadcast_all({
            "type": "user_online",
            "user_id": user_id,
        })

        while True:
            data = await websocket.receive_json()
            await handle_message(websocket, user_id, data)

    except WebSocketDisconnect:
        pass
    finally:
        # Runs on disconnect and on handler errors, so the connection never leaks
        await manager.disconnect(websocket, user_id)
        await manager.broadcast_all({
            "type": "user_offline",
            "user_id": user_id,
        })

async def handle_message(websocket: WebSocket, user_id: str, data: dict):
    """Processes incoming messages."""
    msg_type = data.get("type")
    
    match msg_type:
        case "join_room":
            room = data["room"]
            await manager.join_room(websocket, room)
            await manager.broadcast_room(room, {
                "type": "user_joined",
                "user_id": user_id,
                "room": room,
            })
        
        case "leave_room":
            room = data["room"]
            await manager.leave_room(websocket, room)
            await manager.broadcast_room(room, {
                "type": "user_left",
                "user_id": user_id,
                "room": room,
            })
        
        case "message":
            room = data.get("room")
            if room:
                await manager.broadcast_room(room, {
                    "type": "message",
                    "user_id": user_id,
                    "content": data["content"],
                    "room": room,
                }, exclude=websocket)
        
        case "ping":
            await websocket.send_json({"type": "pong"})
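
The handler above indexes `data["room"]` and `data["content"]` directly, so a malformed payload raises `KeyError` inside the receive loop. One way to guard against that is to validate the payload before dispatching. The sketch below uses only the standard library; `ClientMessage`, `parse_client_message`, and the length limit are assumptions made for illustration, not part of the original code.

```python
from dataclasses import dataclass
from typing import Optional

ALLOWED_TYPES = {"join_room", "leave_room", "message", "ping"}
MAX_CONTENT_LENGTH = 2000  # arbitrary illustrative limit


@dataclass(frozen=True)
class ClientMessage:
    type: str
    room: Optional[str] = None
    content: Optional[str] = None


def parse_client_message(data: object) -> ClientMessage:
    """Validate a decoded JSON payload; raise ValueError when it is malformed."""
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")

    msg_type = data.get("type")
    # Check the Python type first: an unhashable value would break the set lookup
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        raise ValueError(f"unknown message type: {msg_type!r}")

    if msg_type == "ping":
        return ClientMessage(type=msg_type)

    # Every remaining type targets a room
    room = data.get("room")
    if not isinstance(room, str) or not room:
        raise ValueError("'room' must be a non-empty string")
    if msg_type != "message":
        return ClientMessage(type=msg_type, room=room)

    content = data.get("content")
    if not isinstance(content, str) or not content:
        raise ValueError("'content' must be a non-empty string")
    if len(content) > MAX_CONTENT_LENGTH:
        raise ValueError("'content' is too long")
    return ClientMessage(type=msg_type, room=room, content=content)
```

A handler could call `parse_client_message(data)` first and answer with an `{"type": "error"}` frame on `ValueError`. Note one deliberate difference: this sketch rejects a `message` without a room, whereas the handler above silently ignores it.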

5. Scaling with Redis Pub/Sub

When the API runs as multiple instances, each instance only knows its own connections, so you need Redis as a message broker between them:

# services/pubsub.py
import asyncio
import json
from typing import Callable, Dict

import redis.asyncio as redis

from app.core.config import settings  # assumed location of the settings object

class RedisPubSub:
    """Distributed pub/sub backed by Redis."""
    
    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis: redis.Redis | None = None
        self.pubsub: redis.client.PubSub | None = None
        self.handlers: Dict[str, Callable] = {}
        self._listener_task: asyncio.Task | None = None
    
    async def connect(self):
        self.redis = redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()
    
    async def disconnect(self):
        if self._listener_task:
            self._listener_task.cancel()
        if self.pubsub:
            await self.pubsub.close()
        if self.redis:
            await self.redis.close()
    
    async def subscribe(self, channel: str, handler: Callable):
        """Subscribes to a channel and starts the listener on first use."""
        self.handlers[channel] = handler
        await self.pubsub.subscribe(channel)
        
        if not self._listener_task:
            self._listener_task = asyncio.create_task(self._listen())
    
    async def publish(self, channel: str, message: dict):
        """Publishes a JSON-encoded message to a channel."""
        await self.redis.publish(channel, json.dumps(message))
    
    async def _listen(self):
        """Listens for Redis messages and dispatches them to the channel handler."""
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                channel = message["channel"].decode()
                data = json.loads(message["data"])
                
                handler = self.handlers.get(channel)
                if handler:
                    await handler(data)

pubsub = RedisPubSub(settings.REDIS_URL)

Integration with the WebSocket Manager

# services/websocket_distributed.py
from fastapi import WebSocket

from app.services.pubsub import pubsub
from app.services.websocket import manager

async def setup_pubsub():
    """Sets up pub/sub for distributed WebSockets."""
    await pubsub.connect()

    # Handler for broadcast messages arriving from Redis
    async def handle_broadcast(data: dict):
        msg_type = data.get("type")
        
        if msg_type == "room_message":
            await manager.broadcast_room(
                data["room"],
                data["message"],
            )
        elif msg_type == "user_message":
            await manager.send_personal(
                data["user_id"],
                data["message"],
            )
        elif msg_type == "global_message":
            await manager.broadcast_all(data["message"])
    
    await pubsub.subscribe("websocket:broadcast", handle_broadcast)

# Modified message handler: publish to Redis instead of broadcasting locally
async def handle_message(websocket: WebSocket, user_id: str, data: dict):
    msg_type = data.get("type")
    
    if msg_type == "message":
        room = data.get("room")
        message = {
            "type": "message",
            "user_id": user_id,
            "content": data["content"],
            "room": room,
        }
        
        # Publish to Redis (every instance, including this one, receives it)
        await pubsub.publish("websocket:broadcast", {
            "type": "room_message",
            "room": room,
            "message": message,
        })
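
To see why publishing through a shared channel reaches clients on every instance, the flow can be simulated without Redis. This is only a sketch: `InMemoryBus` and `Instance` are hypothetical stand-ins for Redis and for one API process, and client connections are modelled as plain lists acting as inboxes.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]
CHANNEL = "websocket:broadcast"


class InMemoryBus:
    """Hypothetical stand-in for Redis: each publish reaches all subscribers."""

    def __init__(self) -> None:
        self._subscribers: dict = defaultdict(list)

    def subscribe(self, channel: str, handler: Handler) -> None:
        self._subscribers[channel].append(handler)

    async def publish(self, channel: str, message: dict) -> None:
        for handler in list(self._subscribers[channel]):
            await handler(message)


class Instance:
    """One API process: it only knows the clients connected to it."""

    def __init__(self, name: str, bus: InMemoryBus) -> None:
        self.name = name
        self.bus = bus
        self.local_rooms: dict = defaultdict(list)  # room -> list of client inboxes
        bus.subscribe(CHANNEL, self._on_broadcast)

    def connect(self, room: str) -> list:
        inbox: list = []
        self.local_rooms[room].append(inbox)
        return inbox

    async def send(self, room: str, content: str) -> None:
        # Publish instead of delivering locally; delivery happens in _on_broadcast
        await self.bus.publish(CHANNEL, {"room": room, "content": content})

    async def _on_broadcast(self, data: dict) -> None:
        # Each instance delivers only to its own local connections
        for inbox in self.local_rooms.get(data["room"], []):
            inbox.append(f"{data['content']} (via {self.name})")


async def demo():
    bus = InMemoryBus()
    pod1, pod2 = Instance("pod1", bus), Instance("pod2", bus)
    alice = pod1.connect("chat")
    bob = pod2.connect("chat")
    carol = pod2.connect("other")
    await pod1.send("chat", "hello")
    return alice, bob, carol


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sender's own instance also receives the published message, which is why the modified handler publishes and does not additionally broadcast locally; doing both would deliver the message twice to local clients.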

6. Server-Sent Events (SSE)

For one-way notifications (server → client):

import asyncio
import json

from fastapi import Request
from sse_starlette.sse import EventSourceResponse

# One queue per connected user; a new connection from the same user replaces the old one
user_queues: dict[str, asyncio.Queue] = {}

@app.get("/events")
async def event_stream(request: Request, current_user: CurrentUser):
    """SSE event stream. CurrentUser is the application's auth dependency, defined elsewhere."""

    async def generate():
        queue: asyncio.Queue = asyncio.Queue()

        # Register to receive events
        user_queues[current_user.id] = queue

        try:
            while True:
                # Stop if the client has disconnected
                if await request.is_disconnected():
                    break

                try:
                    # Wait for an event, with a timeout
                    event = await asyncio.wait_for(queue.get(), timeout=30)
                    yield {
                        "event": event["type"],
                        "data": json.dumps(event["data"]),
                    }
                except asyncio.TimeoutError:
                    # Keepalive
                    yield {"event": "ping", "data": ""}

        finally:
            # Only remove the entry if it still points at this connection's queue
            if user_queues.get(current_user.id) is queue:
                del user_queues[current_user.id]

    return EventSourceResponse(generate())

# Send an event to one user
async def send_notification(user_id: str, notification: dict):
    queue = user_queues.get(user_id)
    if queue:
        await queue.put({
            "type": "notification",
            "data": notification,
        })
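
For reference, each dict yielded by the generator ends up on the wire as a small block of `field: value` lines terminated by a blank line, which is the `text/event-stream` format that `EventSource` parses. The helper below is a hand-written sketch of that framing, not the code sse-starlette runs internally; `format_sse` is a hypothetical name, and the library's exact line endings and field order may differ.

```python
import json
from typing import Optional


def format_sse(data: object, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Frame one event in text/event-stream format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    payload = data if isinstance(data, str) else json.dumps(data)
    # A payload containing newlines needs one "data:" field per line
    for line in payload.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse({"id": 1}, event="notification"), end="")
```

The `event` field is what `addEventListener('notification', ...)` matches on in the browser; events without it are delivered to `onmessage`.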

JavaScript client

// SSE is simpler than WebSocket for notifications
const eventSource = new EventSource('/events?token=' + token);

eventSource.addEventListener('notification', (event) => {
  const data = JSON.parse(event.data);
  showNotification(data);
});

eventSource.addEventListener('ping', () => {
  // Keepalive, ignore
});

eventSource.onerror = () => {
  // EventSource reconnects automatically
  console.log('SSE reconnecting...');
};

7. Heartbeat and Reconnection

import asyncio
import time

from fastapi import WebSocket


class HeartbeatManager:
    """Detects dead connections with a heartbeat."""

    def __init__(self, interval: int = 30, timeout: int = 10):
        self.interval = interval
        self.timeout = timeout
        self.last_pong: dict[WebSocket, float] = {}

    async def start_heartbeat(self, websocket: WebSocket):
        """Runs the heartbeat loop for one connection."""
        self.last_pong[websocket] = time.monotonic()

        while True:
            await asyncio.sleep(self.interval)

            # Check when the last pong arrived
            last = self.last_pong.get(websocket, 0)
            now = time.monotonic()

            if now - last > self.interval + self.timeout:
                # No pong within interval + timeout: treat the connection as dead
                await websocket.close(code=1000)
                break

            try:
                await websocket.send_json({"type": "ping"})
            except Exception:
                break

    def record_pong(self, websocket: WebSocket):
        """Records that a pong was received."""
        self.last_pong[websocket] = time.monotonic()

    def cleanup(self, websocket: WebSocket):
        """Drops the tracking entry for a connection."""
        self.last_pong.pop(websocket, None)

heartbeat = HeartbeatManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, user_id: str = Depends(get_websocket_user)):
    await manager.connect(websocket, user_id)
    
    # Start the heartbeat in the background
    heartbeat_task = asyncio.create_task(heartbeat.start_heartbeat(websocket))
    
    try:
        while True:
            data = await websocket.receive_json()
            
            if data.get("type") == "pong":
                heartbeat.record_pong(websocket)
            else:
                await handle_message(websocket, user_id, data)
    
    except WebSocketDisconnect:
        pass
    finally:
        heartbeat_task.cancel()
        heartbeat.cleanup(websocket)
        await manager.disconnect(websocket, user_id)
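
The server side above covers the heartbeat; reconnection is the client's job with raw WebSockets. A common approach is exponential backoff with a cap, optionally with jitter so that many clients do not reconnect at the same instant. The sketch below uses only the standard library; `backoff_delays`, `run_with_reconnect`, and the `connect` callable are hypothetical names, and the default values are illustrative.

```python
import asyncio
import random
from typing import Awaitable, Callable, Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    cap: float = 30.0,
    factor: float = 2.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield reconnect delays that grow exponentially up to a cap.

    With an rng, each delay is drawn uniformly from [0, ceiling] (full jitter).
    """
    ceiling = min(cap, base)
    while True:
        yield rng.uniform(0, ceiling) if rng else ceiling
        ceiling = min(cap, ceiling * factor)


async def run_with_reconnect(
    connect: Callable[[], Awaitable[object]],
    max_attempts: int = 5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> object:
    """Call connect() until it succeeds, sleeping between failed attempts."""
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            await sleep(next(delays))
```

Creating a fresh `backoff_delays()` generator per call means the delay resets to `base` after each successful connection. The `sleep` parameter is injectable only so the loop can be tested without waiting.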

8. Rate Limiting in WebSockets

import time
from collections import defaultdict


class WebSocketRateLimiter:
    """Sliding-window limiter: at most max_messages per window_seconds per user."""

    def __init__(self, max_messages: int = 10, window_seconds: int = 1):
        self.max_messages = max_messages
        self.window = window_seconds
        self.message_counts: dict[str, list[float]] = defaultdict(list)

    def is_allowed(self, user_id: str) -> bool:
        # Monotonic clock: unaffected by system clock adjustments
        now = time.monotonic()
        window_start = now - self.window

        # Drop timestamps that have fallen out of the window
        self.message_counts[user_id] = [
            t for t in self.message_counts[user_id]
            if t > window_start
        ]

        if len(self.message_counts[user_id]) >= self.max_messages:
            return False

        self.message_counts[user_id].append(now)
        return True

rate_limiter = WebSocketRateLimiter(max_messages=10, window_seconds=1)

async def handle_message(websocket: WebSocket, user_id: str, data: dict):
    if not rate_limiter.is_allowed(user_id):
        await websocket.send_json({
            "type": "error",
            "message": "Rate limit exceeded",
        })
        return
    
    # Process the message...
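
The limiter above keeps one timestamp per message inside the window. A token bucket is a different technique that gives a similar effect with constant memory per user and allows short bursts. The sketch below is an alternative, not a drop-in replacement for the class above; `TokenBucket` and the injectable `clock` parameter are assumptions made for illustration.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start with a full bucket
        self._updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self._clock()
        # Refill according to elapsed time, never above capacity
        elapsed = now - self._updated
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._updated = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

In a WebSocket server one bucket per user would typically live in a dict keyed by `user_id`, for example `buckets.setdefault(user_id, TokenBucket(rate=10, capacity=10))`. In a multi-instance deployment either limiter only counts messages seen by its own process.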

9. Testing WebSockets

import pytest
from httpx import AsyncClient
from httpx_ws import aconnect_ws

@pytest.fixture
async def websocket_client(client: AsyncClient, auth_token: str):
    async with aconnect_ws(
        f"/ws?token={auth_token}",
        client,
    ) as ws:
        yield ws

async def test_websocket_connection(websocket_client):
    # Send a message
    await websocket_client.send_json({"type": "ping"})

    # Receive the reply
    response = await websocket_client.receive_json()
    assert response["type"] == "pong"

async def test_websocket_room_join(websocket_client):
    # Join a room
    await websocket_client.send_json({
        "type": "join_room",
        "room": "test-room",
    })

    # Check the confirmation
    response = await websocket_client.receive_json()
    assert response["type"] == "user_joined"
    assert response["room"] == "test-room"

async def test_websocket_broadcast(websocket_client, another_websocket_client):
    # First client joins and consumes its own join notification
    await websocket_client.send_json({"type": "join_room", "room": "chat"})
    await websocket_client.receive_json()

    # Second client joins; both clients are notified of that join
    await another_websocket_client.send_json({"type": "join_room", "room": "chat"})
    await another_websocket_client.receive_json()
    await websocket_client.receive_json()

    # Send a message
    await websocket_client.send_json({
        "type": "message",
        "room": "chat",
        "content": "Hello!",
    })

    # The other client must receive it (the sender is excluded)
    response = await another_websocket_client.receive_json()
    assert response["type"] == "message"
    assert response["content"] == "Hello!"

10. Comparison: Socket.io vs FastAPI WebSocket

| Aspect | Socket.io | FastAPI WebSocket |
|---|---|---|
| Protocol | Custom protocol on top of WS | Plain WebSocket |
| Rooms | Built-in | Manual (ConnectionManager) |
| Reconnection | Automatic | Manual |
| Fallback | Automatic polling | None |
| Namespaces | Built-in | Manual (paths) |
| Scaling | Redis adapter | Manual Redis Pub/Sub |
| Binary | Supported | Supported |

11. Scalable Real-time Architecture

                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
                           │ WebSocket
                    ┌──────▼──────┐
                    │Load Balancer│
                    │  (Sticky)   │
                    └──────┬──────┘
           ┌───────────────┼───────────────┐
           │               │               │
    ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
    │  API Pod 1  │ │  API Pod 2  │ │  API Pod 3  │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
           │               │               │
           └───────────────┼───────────────┘
                           │ Pub/Sub
                    ┌──────▼──────┐
                    │    Redis    │
                    └─────────────┘

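Sticky sessions: an established WebSocket is a single long-lived TCP connection, so it stays on the pod that accepted the handshake. Stickiness becomes necessary when one session spans several HTTP requests, for example Socket.io's long-polling fallback.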


Conclusion

WebSockets in FastAPI require more manual setup than Socket.io:

  1. ConnectionManager to manage connections
  2. Redis Pub/Sub for multiple instances
  3. Heartbeat to detect dead connections
  4. SSE as a simpler alternative for notifications

Senior pattern: use SSE for one-way notifications (simpler, automatic reconnection). Reserve WebSockets for truly bidirectional communication (chat, collaboration).

In the next chapter, we will consolidate everything with Enterprise Architecture patterns.