Módulo 19 • 16 min de lectura
19 - WebSockets y Real-time
Comunicación bidireccional con WebSockets, Server-Sent Events y patrones pub/sub con Redis.
#websockets
#realtime
#sse
#pubsub
#redis
1. WebSockets vs SSE vs Polling
| Característica | WebSocket | SSE | Long Polling |
|---|---|---|---|
| Dirección | Bidireccional | Server → Client | Bidireccional |
| Protocolo | ws:// | HTTP | HTTP |
| Reconexión | Manual | Automática | Manual |
| Binary | ✅ | ❌ (solo texto) | ✅ |
| Firewall | ⚠️ Puede bloquear | ✅ HTTP normal | ✅ |
| Uso típico | Chat, gaming | Notificaciones, feeds | Legacy |
2. WebSockets en FastAPI
JavaScript/TypeScript
// Node.js: Socket.io
import { Server } from 'socket.io';
const io = new Server(server);
io.on('connection', (socket) => {
console.log('Client connected');
socket.on('message', (data) => {
io.emit('message', data); // Broadcast
});
socket.on('disconnect', () => {
console.log('Client disconnected');
});
}); Python
# FastAPI: WebSocket nativo
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print("Client disconnected") 3. Connection Manager
# services/websocket.py
from fastapi import WebSocket
from typing import Dict, Set
import json
import asyncio
class ConnectionManager:
"""Gestiona conexiones WebSocket activas."""
def __init__(self):
# Conexiones por usuario
self.active_connections: Dict[str, Set[WebSocket]] = {}
# Conexiones por room/channel
self.rooms: Dict[str, Set[WebSocket]] = {}
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, user_id: str):
"""Acepta nueva conexión."""
await websocket.accept()
async with self._lock:
if user_id not in self.active_connections:
self.active_connections[user_id] = set()
self.active_connections[user_id].add(websocket)
async def disconnect(self, websocket: WebSocket, user_id: str):
"""Elimina conexión."""
async with self._lock:
if user_id in self.active_connections:
self.active_connections[user_id].discard(websocket)
if not self.active_connections[user_id]:
del self.active_connections[user_id]
# Eliminar de todas las rooms
for room in list(self.rooms.keys()):
self.rooms[room].discard(websocket)
if not self.rooms[room]:
del self.rooms[room]
async def join_room(self, websocket: WebSocket, room: str):
"""Une conexión a una room."""
async with self._lock:
if room not in self.rooms:
self.rooms[room] = set()
self.rooms[room].add(websocket)
async def leave_room(self, websocket: WebSocket, room: str):
"""Saca conexión de una room."""
async with self._lock:
if room in self.rooms:
self.rooms[room].discard(websocket)
async def send_personal(self, user_id: str, message: dict):
"""Envía mensaje a un usuario específico."""
connections = self.active_connections.get(user_id, set())
for conn in connections:
try:
await conn.send_json(message)
except Exception:
pass # Conexión muerta, se limpiará en disconnect
async def broadcast_room(self, room: str, message: dict, exclude: WebSocket | None = None):
"""Broadcast a todos en una room."""
connections = self.rooms.get(room, set())
for conn in connections:
if conn != exclude:
try:
await conn.send_json(message)
except Exception:
pass
async def broadcast_all(self, message: dict):
"""Broadcast a todas las conexiones."""
for connections in self.active_connections.values():
for conn in connections:
try:
await conn.send_json(message)
except Exception:
pass
manager = ConnectionManager()
4. WebSocket con Autenticación
from fastapi import WebSocket, WebSocketDisconnect, Query, Depends
from app.core.security import decode_token
from app.services.websocket import manager
async def get_websocket_user(
websocket: WebSocket,
token: str = Query(...),
) -> str:
"""Dependency para autenticar WebSocket."""
try:
payload = decode_token(token)
return payload.sub
except Exception:
await websocket.close(code=4001, reason="Invalid token")
raise WebSocketDisconnect(code=4001)
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
user_id: str = Depends(get_websocket_user),
):
await manager.connect(websocket, user_id)
try:
# Notificar que el usuario está online
await manager.broadcast_all({
"type": "user_online",
"user_id": user_id,
})
while True:
data = await websocket.receive_json()
await handle_message(websocket, user_id, data)
except WebSocketDisconnect:
await manager.disconnect(websocket, user_id)
await manager.broadcast_all({
"type": "user_offline",
"user_id": user_id,
})
async def handle_message(websocket: WebSocket, user_id: str, data: dict):
"""Procesa mensajes entrantes."""
msg_type = data.get("type")
match msg_type:
case "join_room":
room = data["room"]
await manager.join_room(websocket, room)
await manager.broadcast_room(room, {
"type": "user_joined",
"user_id": user_id,
"room": room,
})
case "leave_room":
room = data["room"]
await manager.leave_room(websocket, room)
await manager.broadcast_room(room, {
"type": "user_left",
"user_id": user_id,
"room": room,
})
case "message":
room = data.get("room")
if room:
await manager.broadcast_room(room, {
"type": "message",
"user_id": user_id,
"content": data["content"],
"room": room,
}, exclude=websocket)
case "ping":
await websocket.send_json({"type": "pong"})
5. Scaling con Redis Pub/Sub
Para múltiples instancias de la API, necesitas Redis como message broker:
# services/pubsub.py
import redis.asyncio as redis
import json
import asyncio
from typing import Callable, Dict
class RedisPubSub:
"""Pub/Sub distribuido con Redis."""
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.redis: redis.Redis | None = None
self.pubsub: redis.client.PubSub | None = None
self.handlers: Dict[str, Callable] = {}
self._listener_task: asyncio.Task | None = None
async def connect(self):
self.redis = redis.from_url(self.redis_url)
self.pubsub = self.redis.pubsub()
async def disconnect(self):
if self._listener_task:
self._listener_task.cancel()
if self.pubsub:
await self.pubsub.close()
if self.redis:
await self.redis.close()
async def subscribe(self, channel: str, handler: Callable):
"""Suscribirse a un canal."""
self.handlers[channel] = handler
await self.pubsub.subscribe(channel)
if not self._listener_task:
self._listener_task = asyncio.create_task(self._listen())
async def publish(self, channel: str, message: dict):
"""Publicar mensaje a un canal."""
await self.redis.publish(channel, json.dumps(message))
async def _listen(self):
"""Escucha mensajes de Redis."""
async for message in self.pubsub.listen():
if message["type"] == "message":
channel = message["channel"].decode()
data = json.loads(message["data"])
handler = self.handlers.get(channel)
if handler:
await handler(data)
pubsub = RedisPubSub(settings.REDIS_URL)
Integración con WebSocket Manager
# services/websocket_distributed.py
from app.services.pubsub import pubsub
from app.services.websocket import manager
async def setup_pubsub():
"""Configura pub/sub para WebSockets distribuidos."""
await pubsub.connect()
# Handler para mensajes de broadcast
async def handle_broadcast(data: dict):
msg_type = data.get("type")
if msg_type == "room_message":
await manager.broadcast_room(
data["room"],
data["message"],
)
elif msg_type == "user_message":
await manager.send_personal(
data["user_id"],
data["message"],
)
elif msg_type == "global_message":
await manager.broadcast_all(data["message"])
await pubsub.subscribe("websocket:broadcast", handle_broadcast)
# Modificar el handler de mensajes
async def handle_message(websocket: WebSocket, user_id: str, data: dict):
msg_type = data.get("type")
if msg_type == "message":
room = data.get("room")
message = {
"type": "message",
"user_id": user_id,
"content": data["content"],
"room": room,
}
# Publicar a Redis (todas las instancias recibirán)
await pubsub.publish("websocket:broadcast", {
"type": "room_message",
"room": room,
"message": message,
})
6. Server-Sent Events (SSE)
Para notificaciones unidireccionales (server → client):
from fastapi import Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio
@app.get("/events")
async def event_stream(request: Request, current_user: CurrentUser):
"""Stream de eventos SSE."""
async def generate():
queue: asyncio.Queue = asyncio.Queue()
# Registrar para recibir eventos
user_queues[current_user.id] = queue
try:
while True:
# Verificar si el cliente desconectó
if await request.is_disconnected():
break
try:
# Esperar evento con timeout
event = await asyncio.wait_for(queue.get(), timeout=30)
yield {
"event": event["type"],
"data": json.dumps(event["data"]),
}
except asyncio.TimeoutError:
# Keepalive
yield {"event": "ping", "data": ""}
finally:
del user_queues[current_user.id]
return EventSourceResponse(generate())
# Enviar evento a un usuario
async def send_notification(user_id: str, notification: dict):
queue = user_queues.get(user_id)
if queue:
await queue.put({
"type": "notification",
"data": notification,
})
Cliente JavaScript
// SSE es más simple que WebSocket para notificaciones
const eventSource = new EventSource('/events?token=' + token);
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
showNotification(data);
});
eventSource.addEventListener('ping', () => {
// Keepalive, ignorar
});
eventSource.onerror = () => {
// SSE reconecta automáticamente
console.log('SSE reconnecting...');
};
7. Heartbeat y Reconnection
import asyncio
class HeartbeatManager:
"""Detecta conexiones muertas con heartbeat."""
def __init__(self, interval: int = 30, timeout: int = 10):
self.interval = interval
self.timeout = timeout
self.last_pong: Dict[WebSocket, float] = {}
async def start_heartbeat(self, websocket: WebSocket):
"""Inicia heartbeat para una conexión."""
self.last_pong[websocket] = asyncio.get_event_loop().time()
while True:
await asyncio.sleep(self.interval)
# Verificar último pong
last = self.last_pong.get(websocket, 0)
now = asyncio.get_event_loop().time()
if now - last > self.interval + self.timeout:
# Conexión muerta
await websocket.close(code=1000)
break
try:
await websocket.send_json({"type": "ping"})
except Exception:
break
def record_pong(self, websocket: WebSocket):
"""Registra recepción de pong."""
self.last_pong[websocket] = asyncio.get_event_loop().time()
def cleanup(self, websocket: WebSocket):
"""Limpia tracking de conexión."""
self.last_pong.pop(websocket, None)
heartbeat = HeartbeatManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, user_id: str = Depends(get_websocket_user)):
await manager.connect(websocket, user_id)
# Iniciar heartbeat en background
heartbeat_task = asyncio.create_task(heartbeat.start_heartbeat(websocket))
try:
while True:
data = await websocket.receive_json()
if data.get("type") == "pong":
heartbeat.record_pong(websocket)
else:
await handle_message(websocket, user_id, data)
except WebSocketDisconnect:
pass
finally:
heartbeat_task.cancel()
heartbeat.cleanup(websocket)
await manager.disconnect(websocket, user_id)
8. Rate Limiting en WebSockets
from collections import defaultdict
import time
class WebSocketRateLimiter:
def __init__(self, max_messages: int = 10, window_seconds: int = 1):
self.max_messages = max_messages
self.window = window_seconds
self.message_counts: Dict[str, list] = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
now = time.time()
window_start = now - self.window
# Limpiar mensajes viejos
self.message_counts[user_id] = [
t for t in self.message_counts[user_id]
if t > window_start
]
if len(self.message_counts[user_id]) >= self.max_messages:
return False
self.message_counts[user_id].append(now)
return True
rate_limiter = WebSocketRateLimiter(max_messages=10, window_seconds=1)
async def handle_message(websocket: WebSocket, user_id: str, data: dict):
if not rate_limiter.is_allowed(user_id):
await websocket.send_json({
"type": "error",
"message": "Rate limit exceeded",
})
return
# Procesar mensaje...
9. Testing de WebSockets
import pytest
from httpx import AsyncClient
from httpx_ws import aconnect_ws
@pytest.fixture
async def websocket_client(client: AsyncClient, auth_token: str):
async with aconnect_ws(
f"/ws?token={auth_token}",
client,
) as ws:
yield ws
async def test_websocket_connection(websocket_client):
# Enviar mensaje
await websocket_client.send_json({"type": "ping"})
# Recibir respuesta
response = await websocket_client.receive_json()
assert response["type"] == "pong"
async def test_websocket_room_join(websocket_client):
# Unirse a room
await websocket_client.send_json({
"type": "join_room",
"room": "test-room",
})
# Verificar confirmación
response = await websocket_client.receive_json()
assert response["type"] == "user_joined"
assert response["room"] == "test-room"
async def test_websocket_broadcast(websocket_client, another_websocket_client):
# Ambos se unen a la misma room
await websocket_client.send_json({"type": "join_room", "room": "chat"})
await another_websocket_client.send_json({"type": "join_room", "room": "chat"})
# Consumir mensajes de join
await websocket_client.receive_json()
await another_websocket_client.receive_json()
await another_websocket_client.receive_json()
# Enviar mensaje
await websocket_client.send_json({
"type": "message",
"room": "chat",
"content": "Hello!",
})
# El otro cliente debe recibirlo
response = await another_websocket_client.receive_json()
assert response["type"] == "message"
assert response["content"] == "Hello!"
10. Comparativa: Socket.io vs FastAPI WebSocket
| Aspecto | Socket.io | FastAPI WebSocket |
|---|---|---|
| Protocolo | Custom sobre WS | WebSocket puro |
| Rooms | Built-in | Manual (ConnectionManager) |
| Reconnection | Automática | Manual |
| Fallback | Polling automático | No |
| Namespaces | Built-in | Manual (paths) |
| Scaling | Redis adapter | Redis Pub/Sub manual |
| Binary | Soportado | Soportado |
11. Arquitectura Real-time Escalable
┌─────────────┐
│ Client │
└──────┬──────┘
│ WebSocket
┌──────▼──────┐
│ Load Balancer│
│ (Sticky) │
└──────┬──────┘
┌────────────┼────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ API Pod 1 │ │ API Pod 2 │ │ API Pod 3 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└────────────┼────────────┘
│ Pub/Sub
┌──────▼──────┐
│ Redis │
└─────────────┘
Sticky sessions: Necesarias para WebSocket (la conexión debe mantenerse con el mismo pod).
Conclusión
WebSockets en FastAPI requieren más setup manual que Socket.io:
- ConnectionManager para gestionar conexiones
- Redis Pub/Sub para múltiples instancias
- Heartbeat para detectar conexiones muertas
- SSE como alternativa más simple para notificaciones
Pattern Senior: Usa SSE para notificaciones unidireccionales (más simple, reconexión automática). Reserva WebSockets para comunicación bidireccional real (chat, colaboración).
En el siguiente capítulo, consolidaremos todo con patrones de Arquitectura Enterprise.