DEV Community

Cover image for Cómo implementé streaming SSE de extremo a extremo: desde el LLM hasta el navegador pasando por Nginx
Martin Palopoli
Martin Palopoli

Posted on

Cómo implementé streaming SSE de extremo a extremo: desde el LLM hasta el navegador pasando por Nginx

TL;DR

Implementé streaming SSE de extremo a extremo para un motor RAG: un generador async en FastAPI emite eventos custom (token, sources, confidence, done, error, etc.), el frontend los consume con fetch + ReadableStream, un widget embebible en vanilla JS hace lo mismo dentro de un Shadow DOM cerrado, y Nginx necesita 6 directivas específicas para no arruinar todo. Comparto el código real, el protocolo de eventos, y las lecciones aprendidas.


Por qué SSE y no WebSockets

Cuando necesitás streaming en una app RAG, la primera pregunta es: WebSockets o SSE.

Para un chat con LLM, la respuesta es clara: SSE.

Criterio SSE WebSockets
Dirección del flujo Servidor → cliente (unidireccional) Bidireccional
Protocolo HTTP estándar Protocolo propio (ws://)
Reconexión Automática (built-in) Manual
Proxy/CDN compat Excelente (es HTTP) Problemática
Complejidad server Baja (generador async) Alta (state management)
Auth Headers HTTP normales Hack en query params o primer mensaje

Un chat RAG es fundamentalmente unidireccional durante el streaming: el usuario envía un mensaje (un POST normal) y el servidor responde con un stream de tokens. No necesitás un canal bidireccional permanente.

Además, SSE viaja sobre HTTP estándar. Eso significa que funciona con cualquier reverse proxy, CDN, load balancer, y firewall sin configuración especial (más allá de desactivar buffering, que ya veremos).

Con WebSockets tenés que manejar: reconexión manual, autenticación en el handshake, estado de la conexión, heartbeats, y proxies que cierran conexiones idle. Todo eso es complejidad que no aporta nada en este caso de uso.


El stack

┌─────────────┐     POST /messages     ┌─────────────┐
│   Browser    │ ──────────────────────►│   Nginx     │
│  (Vue/Widget)│                        │  (reverse   │
│              │◄──── SSE stream ───────│   proxy)    │
└─────────────┘                        └──────┬──────┘
                                              │
                                       ┌──────▼──────┐
                                       │   FastAPI    │
                                       │  (uvicorn)   │
                                       │              │
                                       │  async gen   │──► Groq/OpenAI/Ollama
                                       │  → SSE       │──► PostgreSQL
                                       │              │──► Redis
                                       └─────────────┘
Enter fullscreen mode Exit fullscreen mode

Backend: el generador async que emite SSE

La base: EventSourceResponse

FastAPI no tiene soporte nativo para SSE, pero sse-starlette lo resuelve con una línea:

from sse_starlette.sse import EventSourceResponse

@router.post("/conversations/{conv_id}/messages")
async def send_message(conv_id: UUID, data: MessageCreate, ...):
    # ... validaciones, búsqueda RAG, etc.

    async def event_generator():
        async for event in _build_stream(conv_id, data, ...):
            yield event

    return EventSourceResponse(event_generator())
Enter fullscreen mode Exit fullscreen mode

EventSourceResponse toma un generador async y lo convierte en un stream SSE con el formato correcto:

event: token
data: {"content": "La "}

event: token
data: {"content": "respuesta "}

event: token
data: {"content": "es..."}

event: done
data: {"message_id": "abc-123"}
Enter fullscreen mode Exit fullscreen mode

Cada yield del generador se convierte en un bloque event: + data: separado por \n\n.

El protocolo de eventos

Diseñé un protocolo con 10 tipos de eventos, cada uno con un payload JSON específico:

# Eventos del protocolo SSE
# ─────────────────────────────────────────
# user_message_id  → ID real del mensaje guardado en DB
# sources          → Chunks recuperados del RAG pipeline
# confidence       → Score de confianza (0-1) del reranking
# token            → Fragmento de texto del LLM
# faq_match        → Respuesta FAQ directa (sin LLM)
# searching        → Búsqueda agentic en progreso (round N)
# suggestions      → Preguntas de seguimiento sugeridas
# error            → Error durante el streaming
# upgrade_required → Límite de plan alcanzado
# content_blocked  → Contenido bloqueado por reglas
# done             → Stream finalizado, con message_id
Enter fullscreen mode Exit fullscreen mode

¿Por qué eventos tipados y no solo data:? Porque el frontend necesita saber qué está recibiendo antes de parsearlo. Un token se renderiza diferente a una fuente, un error se maneja diferente a un evento de confianza. Sin tipos, el frontend tendría que adivinar el contenido del JSON.

El generador completo

Así se ve el generador real (simplificado para legibilidad, pero con la estructura exacta):

async def event_generator():
    # Trackear stream concurrente con Redis
    await increment_concurrent(str(tenant_id))
    try:
        async for event in _event_generator_inner():
            yield event
    finally:
        # SIEMPRE decrementar, incluso si el cliente desconecta
        await decrement_concurrent(str(tenant_id))


async def _event_generator_inner():
    # 1. ID real del mensaje del usuario (para sincronizar con el optimistic UI)
    yield {
        "event": "user_message_id",
        "data": json.dumps({"message_id": str(user_msg.id)})
    }

    # 2. Fuentes recuperadas del pipeline RAG
    yield {
        "event": "sources",
        "data": json.dumps({"sources": sources})
    }

    # 3. Score de confianza
    yield {
        "event": "confidence",
        "data": json.dumps({"score": round(confidence, 3)})
    }

    # 4. Tokens del LLM (el corazón del streaming)
    full_response = ""
    buffer = ""

    stream = stream_chat_response(query, sources, history, provider, ...)

    if inspect.isasyncgen(stream):
        async for token in stream:
            buffer += token
            full_response += token

            # Buffer para agentic search: no emitir si puede venir [SEARCH:]
            if buffer and '[' not in buffer[-20:]:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }
                buffer = ""
        else:
            # Flush del buffer final
            if buffer:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }
    else:
        # Stream sincrónico (Groq, OpenAI)
        for token in stream:
            buffer += token
            full_response += token
            if buffer and '[' not in buffer[-20:]:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }
                buffer = ""
        else:
            if buffer:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }

    # 5. Sugerencias de seguimiento (post-stream)
    suggestion_list = await generate_suggestions(full_response, sources, provider)
    if suggestion_list:
        yield {
            "event": "suggestions",
            "data": json.dumps({"suggestions": suggestion_list})
        }

    # 6. Guardar mensaje y señalizar fin
    msg = await chat_service.save_message(db, conv_id, MessageRole.ASSISTANT, full_response, ...)

    yield {
        "event": "done",
        "data": json.dumps({"message_id": str(msg.id)})
    }
Enter fullscreen mode Exit fullscreen mode

El detalle del buffer para agentic search

El sistema soporta búsqueda agentic: el LLM puede emitir [SEARCH: nueva consulta] en medio de su respuesta para pedir más información. El problema: si emitís tokens uno a uno, el frontend podría renderizar [SEARCH: parcial como texto visible.

La solución es un buffer que retiene los últimos 20 caracteres si contienen [:

if buffer and '[' not in buffer[-20:]:
    yield {"event": "token", "data": json.dumps({"content": buffer})}
    buffer = ""
Enter fullscreen mode Exit fullscreen mode

Si se detecta el marcador completo [SEARCH: ...], se interrumpe el stream, se ejecuta una nueva búsqueda, y se continúa con contexto expandido:

match = SEARCH_PATTERN.search(accumulated)
if match and search_round < MAX_SEARCH_ROUNDS:
    search_query = match.group(1).strip()
    search_round += 1

    yield {"event": "searching", "data": json.dumps({
        "query": search_query, "round": search_round
    })}

    # Ejecutar nueva búsqueda con el query del LLM
    new_sources = await search_chunks(db, search_query, kb_ids, rag_config=rag_config)

    # Deduplicar fuentes
    existing_ids = {s["chunk_id"] for s in all_sources}
    unique_new = [s for s in new_sources if s["chunk_id"] not in existing_ids]

    if unique_new:
        yield {"event": "sources", "data": json.dumps({
            "sources": unique_new, "round": search_round
        })}

    # Continuar generación con contexto expandido
    # (máximo 3 rounds de búsqueda)
Enter fullscreen mode Exit fullscreen mode

Short-circuits: FAQ, cache y upgrade

Antes del pipeline RAG completo, hay varios "atajos" que retornan un EventSourceResponse diferente:

# FAQ match → respuesta instantánea sin LLM
if faq_match:
    async def faq_event_generator():
        yield {"event": "user_message_id", "data": json.dumps({"message_id": str(user_msg.id)})}
        yield {"event": "faq_match", "data": json.dumps(faq_match)}
        yield {"event": "done", "data": json.dumps({"message_id": str(msg.id)})}
    return EventSourceResponse(faq_event_generator())

# Cache hit → respuesta cacheada, simular streaming
if cache_hit:
    async def cache_event_generator():
        yield {"event": "sources", "data": json.dumps({"sources": cache_hit["sources"]})}
        yield {"event": "confidence", "data": json.dumps({"score": cache_hit["confidence"]})}
        # Simular streaming palabra por palabra
        for token in cache_hit["response_text"].split(" "):
            yield {"event": "token", "data": json.dumps({"content": token + " "})}
        yield {"event": "done", "data": json.dumps({"message_id": str(msg.id), "cached": True})}
    return EventSourceResponse(cache_event_generator())

# Budget agotado → señal de upgrade
if faq_only_mode:
    async def upgrade_event_generator():
        yield {"event": "upgrade_required", "data": json.dumps({
            "message": "Has alcanzado el límite de consultas de IA de tu plan."
        })}
        yield {"event": "done", "data": json.dumps({"message_id": str(msg.id)})}
    return EventSourceResponse(upgrade_event_generator())
Enter fullscreen mode Exit fullscreen mode

¿Por qué simular streaming en cache hits? Porque el frontend tiene una sola ruta de renderizado. Si el cache devolviera todo el texto de golpe, necesitaría lógica especial. Simular el streaming mantiene el código del frontend simple.


Streaming desde los providers LLM

Cada provider tiene su propio formato. La capa de abstracción unifica todo en generadores que yield strings:

# Groq y OpenAI: generador SINCRÓNICO (SDK compatible)
def _stream_groq(messages, temperature=0.3, max_tokens=4096):
    stream = groq_client.chat.completions.create(
        model=settings.groq_model, messages=messages,
        stream=True, temperature=temperature, max_tokens=max_tokens,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

# Ollama: generador ASYNC (NDJSON sobre HTTP streaming)
async def _stream_ollama(messages, temperature=0.3, max_tokens=4096):
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        async with client.stream("POST", f"{settings.ollama_url}/api/chat",
                                 json={"model": settings.ollama_model,
                                       "messages": messages, "stream": True}) as response:
            async for line in response.aiter_lines():
                data = json.loads(line)
                content = data.get("message", {}).get("content", "")
                if content:
                    yield content
                if data.get("done"):
                    break
Enter fullscreen mode Exit fullscreen mode

El detalle clave: Groq/OpenAI retornan generadores sincrónicos, Ollama retorna async. El generador SSE necesita manejar ambos con inspect.isasyncgen(). Es feo, pero wrappear sync en async agrega complejidad e indirección que dificulta el debugging.


Frontend: consumo con fetch + ReadableStream

Por qué no EventSource

La API EventSource del browser tiene un problema fatal para este caso de uso: solo soporta GET. Nuestro endpoint de chat es un POST con body JSON (contenido del mensaje, IDs de knowledge bases, provider, etc.).

La alternativa: fetch + ReadableStream. Es más código, pero da control total.

El parser SSE

export async function sendMessage(
  convId: string,
  content: string,
  knowledgeBaseIds: string[],
  callbacks: StreamCallbacks,
  provider?: string,
): Promise<void> {
  const token = localStorage.getItem('access_token')

  const response = await fetch(`/api/v1/conversations/${convId}/messages`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      ...(token ? { Authorization: `Bearer ${token}` } : {}),
    },
    body: JSON.stringify({
      content,
      knowledge_base_ids: knowledgeBaseIds,
      provider,
    }),
  })

  if (!response.ok) {
    // Detectar error de límite de plan (403)
    if (response.status === 403) {
      const errorData = await response.json()
      if (errorData.detail?.includes('Limite de')) {
        callbacks.onError(`__PLAN_LIMIT__:${errorData.detail}`)
        return
      }
    }
    callbacks.onError(`Error: ${response.status}`)
    return
  }

  const reader = response.body?.getReader()
  if (!reader) {
    callbacks.onError('No response stream')
    return
  }

  const decoder = new TextDecoder()
  let buffer = ''
  let receivedDone = false

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n')
      buffer = lines.pop() || ''  // Última línea incompleta → al buffer

      let currentEvent = ''
      for (const line of lines) {
        if (line.startsWith('event: ')) {
          currentEvent = line.slice(7).trim()
          continue
        }
        if (line.startsWith('data: ')) {
          const parsed = JSON.parse(line.slice(6))

          // Despachar según tipo de evento
          if (currentEvent === 'token') {
            callbacks.onToken(parsed.content)
          } else if (currentEvent === 'sources') {
            callbacks.onSources(parsed.sources)
          } else if (currentEvent === 'confidence') {
            callbacks.onConfidence?.(parsed.score)
          } else if (currentEvent === 'faq_match') {
            callbacks.onFaqMatch?.(parsed)
          } else if (currentEvent === 'upgrade_required') {
            callbacks.onUpgradeRequired?.(parsed.message)
          } else if (currentEvent === 'content_blocked') {
            callbacks.onContentBlocked?.(parsed.response, parsed.type)
          } else if (currentEvent === 'error') {
            callbacks.onError(parsed.error)
          } else if (currentEvent === 'done' || 'message_id' in parsed) {
            receivedDone = true
            callbacks.onDone(parsed.message_id)
          }
          currentEvent = ''
        }
      }
    }
  } catch (err) {
    callbacks.onError(`Error de conexión: ${err}`)
    return
  }

  // Si el stream terminó sin evento "done", hubo un problema
  if (!receivedDone) {
    callbacks.onError('La conexión se interrumpió antes de completar la respuesta')
  }
}
Enter fullscreen mode Exit fullscreen mode

La interfaz de callbacks

export interface StreamCallbacks {
  onSources: (sources: Source[]) => void
  onToken: (token: string) => void
  onCodeResult: (result: CodeResult) => void
  onSearching: (query: string, round: number) => void
  onExecuting: (count: number) => void
  onSuggestions: (suggestions: string[]) => void
  onUserMessageId: (messageId: string) => void
  onFaqMatch?: (match: FAQMatch) => void
  onConfidence?: (score: number) => void
  onUpgradeRequired?: (message: string) => void
  onContentBlocked?: (response: string, blockType: string) => void
  onDone: (messageId: string) => void
  onError: (error: string) => void
}
Enter fullscreen mode Exit fullscreen mode

El composable useChat conecta estos callbacks con el estado reactivo de Vue:

await api.sendMessage(conversationId, content, kbIds, {
  onToken: (token) => {
    streamingContent.value += token
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.content = streamingContent.value
    }
  },
  onSources: (s) => {
    sources.value = [...sources.value, ...s]
  },
  onConfidence: (score) => {
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.confidenceScore = score
    }
  },
  onDone: (messageId) => {
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.id = messageId
      last.sources = sources.value
    }
    streaming.value = false
  },
  onError: (err) => {
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.content = `Error: ${err}`
    }
    streaming.value = false
  },
})
Enter fullscreen mode Exit fullscreen mode

Optimistic UI con reconciliación

Un detalle sutil: cuando el usuario envía un mensaje, lo agregamos inmediatamente a la UI con un ID temporal (crypto.randomUUID()). Pero el ID real lo genera la base de datos. El evento user_message_id sincroniza ambos:

const userMsg: Message = {
  id: crypto.randomUUID(),  // ID temporal optimista
  role: 'user',
  content,
  ...
}
messages.value.push(userMsg)

// Cuando llega el ID real del server:
onUserMessageId: (realId) => {
  userMsg.id = realId  // Reemplazar ID temporal con ID de DB
},
Enter fullscreen mode Exit fullscreen mode

Esto es importante para el feedback: cuando el usuario le da thumbs-up a un mensaje, necesita el ID real para que el PATCH llegue al mensaje correcto.


Widget embebible: SSE en vanilla JS

El widget es ~970 líneas de vanilla JS corriendo dentro de un Shadow DOM cerrado (mode: 'closed'). El parsing SSE es idéntico al del frontend TypeScript, pero con var en lugar de const y sin types:

var reader = res.body.getReader();
var decoder = new TextDecoder();
var buffer = '';
var assistantContent = '';

while (true) {
  var readResult = await reader.read();
  if (readResult.done) break;
  buffer += decoder.decode(readResult.value, { stream: true });

  var lines = buffer.split('\n');
  buffer = lines.pop() || '';

  var event = null;
  for (var i = 0; i < lines.length; i++) {
    var line = lines[i].trim();
    if (line.startsWith('event:')) {
      event = line.slice(6).trim();
    } else if (line.startsWith('data:')) {
      var data = JSON.parse(line.slice(5).trim());

      if (event === 'session') {
        sessionToken = data.session_token;
        localStorage.setItem(STORAGE_KEY, sessionToken);
      } else if (event === 'token') {
        assistantContent += (data.content || '');
        messages[assistantMsgIndex].content = assistantContent;
        renderMessages();  // Re-render en cada token
      } else if (event === 'faq_match') {
        messages.push({ role: 'assistant', content: data.answer, faqMatch: data });
        renderMessages();
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Diferencias clave con la app principal

  1. Auth: El widget usa X-API-Key (SHA-256 hasheada) en vez de JWT. No hay login.
  2. Primer evento: En vez de user_message_id, el widget recibe session con un token que se persiste en localStorage para mantener historial entre visitas.
  3. Shadow DOM cerrado: host.attachShadow({ mode: 'closed' }) aísla estilos y DOM completamente de la página host. Ni document.querySelector puede acceder al widget desde afuera.

Nginx: las 6 directivas que lo hacen funcionar

Esta es la sección que más dolores de cabeza me dio. Sin la configuración correcta de Nginx, el streaming SSE no funciona. Los tokens se acumulan en el buffer de Nginx y llegan todos juntos al final.

location /api/ {
    proxy_pass http://127.0.0.1:8000;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;

    # === Las 6 directivas SSE ===
    proxy_buffering off;             # 1. No buffear la respuesta
    proxy_cache off;                 # 2. No cachear
    proxy_read_timeout 300s;         # 3. Timeout largo (streams duran minutos)
    proxy_set_header Connection '';  # 4. Deshabilitar keep-alive del upstream
    proxy_http_version 1.1;          # 5. HTTP/1.1 (requerido para chunked)
    chunked_transfer_encoding off;   # 6. Evitar chunked encoding de Nginx
}
Enter fullscreen mode Exit fullscreen mode

Qué hace cada una

1. proxy_buffering off — La más importante. Por default, Nginx buferea la respuesta completa del upstream antes de enviarla al cliente. Con SSE, cada evento debe llegar al cliente inmediatamente.

2. proxy_cache off — Nginx puede cachear respuestas de upstream. Un stream SSE cacheado es... una muy mala idea.

3. proxy_read_timeout 300s — El default es 60 segundos. Una respuesta de LLM con búsqueda agentic (3 rounds) puede tardar 2-3 minutos. Sin este timeout, Nginx cierra la conexión a la mitad de la respuesta.

4. proxy_set_header Connection '' — Limpia el header Connection para evitar que Nginx mantenga la conexión upstream con keep-alive, lo que puede causar que los datos se retengan.

5. proxy_http_version 1.1 — HTTP/1.1 es requerido para transfer-encoding chunked. El default de Nginx para upstream es HTTP/1.0.

6. chunked_transfer_encoding off — Parece contradictorio con el punto anterior, pero esto desactiva el chunked encoding de Nginx, no del upstream. Sin esto, Nginx puede re-chunked la respuesta y alterar el timing de los eventos.

El síntoma que vas a ver sin estas directivas

Si te falta alguna, vas a ver este comportamiento:

  1. El usuario envía un mensaje
  2. El spinner de "cargando" se queda 5-30 segundos
  3. De repente, toda la respuesta aparece de golpe
  4. El streaming "funciona" en localhost (sin Nginx) pero no en producción

Si te pasa esto, es Nginx buffereando. Agregá las 6 directivas y se resuelve inmediatamente.

Cloudflare: el otro proxy invisible

Si usás Cloudflare como proxy (nube naranja), hay un detalle: Cloudflare también puede buffear responses. Pero en la práctica, Cloudflare detecta SSE automáticamente por el Content-Type: text/event-stream y desactiva el buffering. No necesité configuración especial.


Redis: concurrencia de streams

Un usuario podría abrir 10 tabs y lanzar 10 streams simultáneos. Cada stream consume recursos del LLM, conexiones de DB, y memoria. El control de concurrencia usa Redis como contador atómico:

_CONCURRENT_TTL = 300  # 5 minutos de TTL como red de seguridad

async def increment_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0  # Fail-open: sin Redis, permitir

    redis_key = f"concurrent:{tenant_id}"
    pipe = client.pipeline()
    pipe.incr(redis_key)
    pipe.expire(redis_key, _CONCURRENT_TTL)
    results = await pipe.execute()
    return results[0]

async def decrement_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0

    redis_key = f"concurrent:{tenant_id}"
    count = await client.decr(redis_key)
    if count <= 0:
        await client.delete(redis_key)  # Limpiar keys muertos
        return 0
    await client.expire(redis_key, _CONCURRENT_TTL)
    return count
Enter fullscreen mode Exit fullscreen mode

El patrón try/finally

El incremento y decremento están en un try/finally para garantizar que el contador se decrementa siempre, incluso si el cliente desconecta a mitad del stream:

async def event_generator():
    await increment_concurrent(str(tenant_id))
    try:
        async for event in _event_generator_inner():
            yield event
    finally:
        await decrement_concurrent(str(tenant_id))
Enter fullscreen mode Exit fullscreen mode

TTL como red de seguridad

¿Qué pasa si el servidor crashea a mitad de un stream? El decrement nunca se ejecuta y el contador queda inflado. El TTL de 5 minutos resuelve esto: si no se renueva con expire, la key se auto-elimina. El sistema se auto-sana.

Fail-open y rate limiting

Dos principios importantes:

  1. Fail-open: Si Redis está caído, el rate limiter permite todo. Prefiero servir requests sin rate limiting a rechazar todo porque Redis se cayó. Redis es un guardia, no una puerta.

  2. Sliding window por API key: Además de la concurrencia por tenant, cada API key del widget tiene su propio rate limit usando sorted sets de Redis. Timestamps como scores, zremrangebyscore para limpiar expirados, zcard para contar — todo en un pipeline atómico.


Error handling: las 3 capas

Backend: Siempre emitir done después de error. Sin esto, el frontend se queda en estado "streaming" indefinidamente:

except Exception as e:
    yield {"event": "error", "data": json.dumps({"error": f"Error del LLM: {e}"})}
    yield {"event": "done", "data": json.dumps({"message_id": "error"})}
    return
Enter fullscreen mode Exit fullscreen mode

Frontend: Detectar streams incompletos. Si el reader retorna done: true pero nunca recibimos un evento done, algo salió mal (timeout de Nginx, crash del backend):

if (!receivedDone) {
  callbacks.onError('La conexión se interrumpió antes de completar la respuesta')
}
Enter fullscreen mode Exit fullscreen mode

Pre-stream: Los errores 403 (límite de plan) llegan como HTTP normal antes del stream SSE. El frontend usa un prefijo __PLAN_LIMIT__: como convención interna para que el composable distinga errores de plan de errores genéricos y muestre UI diferente (card de upgrade vs. mensaje de error).


Números reales

Métrica Valor
Latencia primer token (Groq) ~200-400ms
Latencia primer token (Ollama local) ~1-2s
Tiempo total respuesta típica ~3-8s
Tiempo total con agentic search (3 rounds) ~15-30s
FAQ match (sin LLM) ~15ms
Cache hit (simulado) ~50ms
Overhead de Nginx SSE <1ms por evento
Concurrent streams por tenant (Free plan) 2
Rate limit API key (default) 30 req/min

Lecciones aprendidas

1. SSE > WebSockets para LLM streaming

No intentés meter WebSockets donde SSE alcanza. Para un flujo unidireccional (servidor → cliente), SSE es más simple, más compatible con proxies, y más fácil de debuggear. La reconexión automática del browser es un bonus.

2. Siempre emitir "done" al final

Sin un evento de terminación explícito, el frontend no puede distinguir entre "el stream terminó" y "el stream se cortó". Emitir done siempre — incluso después de errores — es lo que hace al protocolo robusto.

3. Nginx buffering es el enemigo silencioso

En desarrollo sin Nginx todo funciona perfecto. En producción, los tokens llegan todos juntos. La primera vez que me pasó, perdí 2 horas pensando que el problema era en el backend. Eran 6 líneas de Nginx.

4. Buffer para marcadores inline

Si tu LLM puede emitir marcadores especiales ([SEARCH:], código ejecutable, etc.), necesitás un buffer que retenga texto hasta estar seguro de que no hay un marcador parcial. Sin esto, el usuario ve artefactos como [SEAR en el chat.

5. Try/finally con contadores Redis

Todo counter que se incrementa antes de un stream debe decrementarse en un finally. No en el happy path, no en el error handler: en finally. Los clientes desconectan sin aviso, los servidores crashean, y los generadores se interrumpen.

6. Fail-open en servicios auxiliares

Redis para rate limiting debe ser fail-open. Si Redis se cae, es mejor servir sin rate limiting que rechazar todos los requests. Lo mismo aplica para logging, analytics, y cualquier servicio que no sea crítico para la respuesta.

7. fetch + ReadableStream > EventSource

EventSource es elegante pero limitado (solo GET, headers mínimos). fetch + ReadableStream requiere más código pero soporta POST, headers custom (JWT), y manejo fino de errores. Para un caso real, la flexibilidad justifica las ~30 líneas extra.


Lo que sigue

  • WebSocket upgrade opcional: Para features bidireccionales futuras (typing indicators, presencia), evaluar un upgrade selectivo sin reemplazar SSE
  • Backpressure: Si el frontend no puede renderizar tan rápido como el LLM genera, implementar señalización de backpressure
  • Compression: Evaluar SSE sobre HTTP/2 con compresión de frames para reducir bandwidth en widgets con alto tráfico

Conclusión

SSE para streaming de LLM no es difícil de implementar — pero sí es fácil de implementar mal. Las trampas están en los detalles: Nginx que buferea, generadores que no limpian contadores, frontends que no detectan streams incompletos, y widgets que no manejan todos los tipos de evento.

El protocolo de eventos custom (token, sources, confidence, done, error, etc.) es lo que transforma un stream de texto en un sistema usable: el frontend sabe qué está recibiendo y puede renderizarlo correctamente, mostrar indicadores de confianza, acumular fuentes de diferentes rounds de búsqueda, y manejar edge cases como límites de plan o contenido bloqueado.

Lo más importante: testear siempre a través de Nginx. Lo que funciona en localhost sin proxy no es representativo de producción. Esas 6 directivas de Nginx son la diferencia entre "streaming real token por token" y "todo el texto de golpe después de 10 segundos".

Top comments (0)