¿Alguna vez has hecho clic en un botón, la página se queda "pensando" y no sabes si algo está pasando? Para evitar esto en procesos largos —generar un reporte, procesar imágenes, ejecutar un análisis— necesitas comunicación en tiempo real. Pero más importante: necesitas elegir la herramienta correcta.
¿Cuándo usar SSE y cuándo no?
Antes de escribir una línea de código, la decisión de arquitectura más importante es elegir el protocolo correcto.
| Criterio | Polling | WebSockets | SSE |
|---|---|---|---|
| Dirección | Client → Server | Bidireccional | Server → Client |
| Complejidad | Baja | Alta | Baja/Media |
| Reconexión automática | Manual | Manual | Nativa |
| Soporte HTTP/2 | Sí | No | Sí |
| Ideal para | Datos muy esporádicos | Chat, juegos | Progreso, notificaciones |
Regla práctica: Si el cliente nunca necesita enviar datos por el mismo canal abierto, SSE gana. Es HTTP estándar, tiene reconexión automática incorporada en el navegador, y es significativamente más simple de escalar que WebSockets.
La Arquitectura: Por qué 4 piezas y no 2
La tentación inicial es hacer todo en un solo proceso FastAPI. El problema: FastAPI (ASGI) está optimizado para I/O, no para cómputo pesado. Si pones una tarea de 30 segundos directamente en un endpoint, bloqueas el event loop y degradas toda la API.
✅ Arquitectura correcta:
Browser → FastAPI → Redis (cola) → Celery Worker → Redis (estado)
↑ |
└──────── SSE stream ←─────────────────┘
Cada pieza tiene una responsabilidad única:
- FastAPI: Recibe peticiones HTTP y mantiene las conexiones SSE abiertas.
- Celery Worker: Ejecuta las tareas pesadas en un proceso separado.
- Redis: Actúa como Broker (cola) y como State Store (donde vive el progreso).
- React: Consume el stream SSE y actualiza la UI.
Paso 1: El Backend (FastAPI)
El endpoint de encolar
Usamos celery_app.send_task para enviar la tarea al worker sin esperar a que termine.
@app.post("/task")
async def create_task():
task_id = str(uuid.uuid4())
# Encolar la tarea y guardar estado inicial
celery_app.send_task("tasks.run_simulation", args=[task_id], task_id=task_id)
redis_client.set(f"task:{task_id}", json.dumps({"status": "pending", "progress": 0}))
return {"task_id": task_id}
El stream SSE: Máxima estabilidad con StreamingResponse
Para evitar errores de encoding y asegurar compatibilidad total, usamos StreamingResponse formateando manualmente el protocolo SSE (data: <json>\n\n).
from fastapi.responses import StreamingResponse
@app.get("/task/{task_id}/stream")
async def task_stream(task_id: str, request: Request):
async def event_generator():
while True:
# 1. Detectar si el usuario cerró la pestaña
if await request.is_disconnected():
break
# 2. Leer progreso de Redis
raw = redis_client.get(f"task:{task_id}")
if raw:
data = json.loads(raw)
data["id"] = task_id
# 3. Formato obligatorio SSE: "data: ...\n\n"
yield f"data: {json.dumps(data)}\n\n"
if data.get("status") == "done":
break
await asyncio.sleep(0.5)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
)
Paso 2: El Worker (Celery)
El worker es el encargado de actualizar Redis en cada fase de la simulación.
@celery_app.task(name="tasks.run_simulation", bind=True)
def run_simulation(self, task_id):
# Fase 1: Simulación de inicio
time.sleep(2)
redis_client.set(f"task:{task_id}", json.dumps({"status": "processing", "progress": 33}))
# Fase 2: Trabajo pesado
time.sleep(random.uniform(3, 6))
redis_client.set(f"task:{task_id}", json.dumps({"status": "processing", "progress": 66}))
# Fase 3: Finalización
result = {
"status": "done",
"progress": 100,
"result": {"metric": random.uniform(0, 100), "processed_at": "..."}
}
redis_client.set(f"task:{task_id}", json.dumps(result))
return result
Paso 3: El Frontend (React)
Usamos la API nativa EventSource. Un detalle clave: cerramos la conexión manualmente cuando el estado es done para liberar recursos del servidor.
function TaskCard({ task, onUpdate }: TaskCardProps) {
useEffect(() => {
if (task.status === "done") return;
const eventSource = new EventSource(`http://localhost:8000/task/${task.id}/stream`);
eventSource.onmessage = (event) => {
const updated = JSON.parse(event.data);
onUpdate(updated);
if (updated.status === "done") eventSource.close();
};
return () => eventSource.close(); // Cleanup
}, [task.id, task.status]);
// ... Render barrita de progreso y badges
}
Paso 4: Dockerización
Orquestamos los 4 servicios para que la red interna permita la comunicación entre ellos usando sus nombres de servicio (redis, api, etc.).
services:
redis:
image: redis:7-alpine
api:
build: ./api
environment:
- REDIS_URL=redis://redis:6379/0
ports:
- "8000:8000"
worker:
build: ./worker
environment:
- REDIS_URL=redis://redis:6379/0
frontend:
build: ./frontend
ports:
- "5173:5173"
Casos de Producción: Lo que no te dicen
-
Nginx Buffering: Si usas Nginx como proxy, debes desactivar el buffering (
proxy_buffering off), de lo contrario, los eventos SSE se acumularán y llegarán todos de golpe al final. - Límite de conexiones: Los navegadores limitan a 6 las conexiones SSE por dominio en HTTP/1.1. Si tu app escala, considera usar HTTP/2.
-
Manejo de desconexiones: El check
request.is_disconnected()en FastAPI es vital para no dejar bucles infinitos en el backend.
Conclusión
Este stack (FastAPI + Celery + Redis + SSE) es una solución de grado industrial. Es robusta, fácil de debuguear y escala mucho mejor que intentar manejar sockets bidireccionales cuando solo necesitas enviar progreso al usuario.
¿Has implementado sistemas en tiempo real antes? ¿Qué retos encontraste con los streams de datos? ¡Te leo en los comentarios! 👇
Top comments (0)