DEV Community

Cover image for 🚀 Notificaciones en Tiempo Real con SSE, FastAPI y Celery
Enrique Lazo Bello
Enrique Lazo Bello

Posted on

🚀 Notificaciones en Tiempo Real con SSE, FastAPI y Celery

¿Alguna vez has hecho clic en un botón, la página se queda "pensando" y no sabes si algo está pasando? Para evitar esto en procesos largos —generar un reporte, procesar imágenes, ejecutar un análisis— necesitas comunicación en tiempo real. Pero más importante: necesitas elegir la herramienta correcta.

¿Cuándo usar SSE y cuándo no?

Antes de escribir una línea de código, la decisión de arquitectura más importante es elegir el protocolo correcto.

Criterio Polling WebSockets SSE
Dirección Client → Server Bidireccional Server → Client
Complejidad Baja Alta Baja/Media
Reconexión automática Manual Manual Nativa
Soporte HTTP/2 No
Ideal para Datos muy esporádicos Chat, juegos Progreso, notificaciones

Regla práctica: Si el cliente nunca necesita enviar datos por el mismo canal abierto, SSE gana. Es HTTP estándar, tiene reconexión automática incorporada en el navegador, y es significativamente más simple de escalar que WebSockets.

La Arquitectura: Por qué 4 piezas y no 2

La tentación inicial es hacer todo en un solo proceso FastAPI. El problema: FastAPI (ASGI) está optimizado para I/O, no para cómputo pesado. Si pones una tarea de 30 segundos directamente en un endpoint, bloqueas el event loop y degradas toda la API.

✅ Arquitectura correcta:
Browser → FastAPI → Redis (cola) → Celery Worker → Redis (estado)
                 ↑                                      |
                 └──────── SSE stream ←─────────────────┘
Enter fullscreen mode Exit fullscreen mode

Cada pieza tiene una responsabilidad única:

  • FastAPI: Recibe peticiones HTTP y mantiene las conexiones SSE abiertas.
  • Celery Worker: Ejecuta las tareas pesadas en un proceso separado.
  • Redis: Actúa como Broker (cola) y como State Store (donde vive el progreso).
  • React: Consume el stream SSE y actualiza la UI.

Paso 1: El Backend (FastAPI)

El endpoint de encolar

Usamos celery_app.send_task para enviar la tarea al worker sin esperar a que termine.

@app.post("/task")
async def create_task():
    task_id = str(uuid.uuid4())
    # Encolar la tarea y guardar estado inicial
    celery_app.send_task("tasks.run_simulation", args=[task_id], task_id=task_id)
    redis_client.set(f"task:{task_id}", json.dumps({"status": "pending", "progress": 0}))
    return {"task_id": task_id}
Enter fullscreen mode Exit fullscreen mode

El stream SSE: Máxima estabilidad con StreamingResponse

Para evitar errores de encoding y asegurar compatibilidad total, usamos StreamingResponse formateando manualmente el protocolo SSE (data: <json>\n\n).

from fastapi.responses import StreamingResponse

@app.get("/task/{task_id}/stream")
async def task_stream(task_id: str, request: Request):
    async def event_generator():
        while True:
            # 1. Detectar si el usuario cerró la pestaña
            if await request.is_disconnected():
                break

            # 2. Leer progreso de Redis
            raw = redis_client.get(f"task:{task_id}")
            if raw:
                data = json.loads(raw)
                data["id"] = task_id
                # 3. Formato obligatorio SSE: "data: ...\n\n"
                yield f"data: {json.dumps(data)}\n\n"

                if data.get("status") == "done":
                    break

            await asyncio.sleep(0.5)

    return StreamingResponse(
        event_generator(), 
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
Enter fullscreen mode Exit fullscreen mode

Paso 2: El Worker (Celery)

El worker es el encargado de actualizar Redis en cada fase de la simulación.

@celery_app.task(name="tasks.run_simulation", bind=True)
def run_simulation(self, task_id):
    # Fase 1: Simulación de inicio
    time.sleep(2)
    redis_client.set(f"task:{task_id}", json.dumps({"status": "processing", "progress": 33}))

    # Fase 2: Trabajo pesado
    time.sleep(random.uniform(3, 6))
    redis_client.set(f"task:{task_id}", json.dumps({"status": "processing", "progress": 66}))

    # Fase 3: Finalización
    result = {
        "status": "done", 
        "progress": 100, 
        "result": {"metric": random.uniform(0, 100), "processed_at": "..."}
    }
    redis_client.set(f"task:{task_id}", json.dumps(result))
    return result
Enter fullscreen mode Exit fullscreen mode

Paso 3: El Frontend (React)

Usamos la API nativa EventSource. Un detalle clave: cerramos la conexión manualmente cuando el estado es done para liberar recursos del servidor.

function TaskCard({ task, onUpdate }: TaskCardProps) {
  useEffect(() => {
    if (task.status === "done") return;

    const eventSource = new EventSource(`http://localhost:8000/task/${task.id}/stream`);

    eventSource.onmessage = (event) => {
      const updated = JSON.parse(event.data);
      onUpdate(updated);
      if (updated.status === "done") eventSource.close();
    };

    return () => eventSource.close(); // Cleanup
  }, [task.id, task.status]);

  // ... Render barrita de progreso y badges
}
Enter fullscreen mode Exit fullscreen mode

Paso 4: Dockerización

Orquestamos los 4 servicios para que la red interna permita la comunicación entre ellos usando sus nombres de servicio (redis, api, etc.).

services:
  redis:
    image: redis:7-alpine
  api:
    build: ./api
    environment:
      - REDIS_URL=redis://redis:6379/0
    ports:
      - "8000:8000"
  worker:
    build: ./worker
    environment:
      - REDIS_URL=redis://redis:6379/0
  frontend:
    build: ./frontend
    ports:
      - "5173:5173"
Enter fullscreen mode Exit fullscreen mode

Casos de Producción: Lo que no te dicen

  1. Nginx Buffering: Si usas Nginx como proxy, debes desactivar el buffering (proxy_buffering off), de lo contrario, los eventos SSE se acumularán y llegarán todos de golpe al final.
  2. Límite de conexiones: Los navegadores limitan a 6 las conexiones SSE por dominio en HTTP/1.1. Si tu app escala, considera usar HTTP/2.
  3. Manejo de desconexiones: El check request.is_disconnected() en FastAPI es vital para no dejar bucles infinitos en el backend.

Conclusión

Este stack (FastAPI + Celery + Redis + SSE) es una solución de grado industrial. Es robusta, fácil de debuguear y escala mucho mejor que intentar manejar sockets bidireccionales cuando solo necesitas enviar progreso al usuario.

¿Has implementado sistemas en tiempo real antes? ¿Qué retos encontraste con los streams de datos? ¡Te leo en los comentarios! 👇

Top comments (0)