I implemented end-to-end SSE streaming for a RAG engine: an async generator in FastAPI emits custom events (token, sources, confidence, done, error, etc.), the frontend consumes them with fetch + ReadableStream, an embeddable widget in vanilla JS does the same inside a closed Shadow DOM, and Nginx needs 6 specific directives to not ruin everything. Here's the real code, the event protocol, and the lessons learned.
Why SSE and Not WebSockets
When you need streaming in a RAG app, the first question is: WebSockets or SSE.
For an LLM chat, the answer is clear: SSE.
| Criterion | SSE | WebSockets |
|---|---|---|
| Flow direction | Server → client (unidirectional) | Bidirectional |
| Protocol | Standard HTTP | Custom protocol (ws://) |
| Reconnection | Automatic (built-in) | Manual |
| Proxy/CDN compat | Excellent (it's HTTP) | Problematic |
| Server complexity | Low (async generator) | High (state management) |
| Auth | Normal HTTP headers | Hack in query params or first message |
A RAG chat is fundamentally unidirectional during streaming: the user sends a message (a normal POST) and the server responds with a stream of tokens. You don't need a permanent bidirectional channel.
Also, SSE travels over standard HTTP. That means it works with any reverse proxy, CDN, load balancer, and firewall without special configuration (beyond disabling buffering, which we'll cover).
With WebSockets you'd need to handle: manual reconnection, handshake authentication, connection state, heartbeats, and proxies that close idle connections. All of that is complexity that adds nothing in this use case.
The Stack
┌─────────────┐    POST /messages      ┌─────────────┐
│   Browser   │ ──────────────────────►│    Nginx    │
│ (Vue/Widget)│                        │  (reverse   │
│             │◄──── SSE stream ───────│   proxy)    │
└─────────────┘                        └──────┬──────┘
                                              │
                                       ┌──────▼──────┐
                                       │   FastAPI   │
                                       │  (uvicorn)  │
                                       │             │
                                       │  async gen  │──► Groq/OpenAI/Ollama
                                       │   → SSE     │──► PostgreSQL
                                       │             │──► Redis
                                       └─────────────┘
Backend: The Async Generator That Emits SSE
The Base: EventSourceResponse
FastAPI doesn't have native SSE support, but sse-starlette solves it in one line:
from sse_starlette.sse import EventSourceResponse

@router.post("/conversations/{conv_id}/messages")
async def send_message(conv_id: UUID, data: MessageCreate, ...):
    # ... validations, RAG search, etc.
    async def event_generator():
        async for event in _build_stream(conv_id, data, ...):
            yield event

    return EventSourceResponse(event_generator())
EventSourceResponse takes an async generator and converts it to an SSE stream with the correct format:
event: token
data: {"content": "The "}
event: token
data: {"content": "answer "}
event: token
data: {"content": "is..."}
event: done
data: {"message_id": "abc-123"}
Each yield from the generator becomes an event: + data: block separated by \n\n.
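What the library does per yielded dict can be sketched in a few lines (a simplification — the real implementation also handles event ids, retry hints, and ping comments):

```python
import json

def format_sse(event: str, payload: dict) -> str:
    # One yielded dict becomes one "event:" + "data:" block,
    # terminated by a blank line (\n\n) per the SSE format.
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

frame = format_sse("token", {"content": "The "})
# frame == 'event: token\ndata: {"content": "The "}\n\n'
```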
The Event Protocol
I designed a protocol with 11 event types, each with a specific JSON payload:
# SSE Protocol Events
# ─────────────────────────────────────────
# user_message_id → Real ID of message saved in DB
# sources → Chunks retrieved from RAG pipeline
# confidence → Confidence score (0-1) from reranking
# token → Text fragment from LLM
# faq_match → Direct FAQ answer (no LLM)
# searching → Agentic search in progress (round N)
# suggestions → Suggested follow-up questions
# error → Error during streaming
# upgrade_required → Plan limit reached
# content_blocked → Content blocked by rules
# done → Stream finished, with message_id
Why typed events and not just data:? Because the frontend needs to know what it's receiving before parsing it. A token renders differently than a source, an error is handled differently than a confidence event. Without types, the frontend would have to guess the JSON content.
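Pinned down as types, the protocol fits in one Literal (a sketch — the backend in this article uses plain dicts, but declaring the set of event names keeps both ends honest):

```python
from typing import Literal, TypedDict

# The 11 event names from the protocol above.
EventType = Literal[
    "user_message_id", "sources", "confidence", "token",
    "faq_match", "searching", "suggestions", "error",
    "upgrade_required", "content_blocked", "done",
]

class SSEEvent(TypedDict):
    event: EventType
    data: str  # JSON-encoded payload; its schema depends on the event type

def make_event(event: EventType, data: str) -> SSEEvent:
    # A type checker rejects make_event("tokenn", ...) at this boundary.
    return {"event": event, "data": data}
```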
The Full Generator
Here's the real generator (simplified for readability, but with the exact structure):
async def event_generator():
    # Track concurrent stream with Redis
    await increment_concurrent(str(tenant_id))
    try:
        async for event in _event_generator_inner():
            yield event
    finally:
        # ALWAYS decrement, even if client disconnects
        await decrement_concurrent(str(tenant_id))

async def _event_generator_inner():
    # 1. Real user message ID (to sync with optimistic UI)
    yield {
        "event": "user_message_id",
        "data": json.dumps({"message_id": str(user_msg.id)})
    }

    # 2. Sources retrieved from RAG pipeline
    yield {
        "event": "sources",
        "data": json.dumps({"sources": sources})
    }

    # 3. Confidence score
    yield {
        "event": "confidence",
        "data": json.dumps({"score": round(confidence, 3)})
    }

    # 4. LLM tokens (the heart of streaming)
    full_response = ""
    buffer = ""
    stream = stream_chat_response(query, sources, history, provider, ...)

    if inspect.isasyncgen(stream):
        async for token in stream:
            buffer += token
            full_response += token
            # Buffer for agentic search: don't emit if [SEARCH:] might be coming
            if buffer and '[' not in buffer[-20:]:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }
                buffer = ""
        else:
            # Flush final buffer (for-else: runs when the loop completes)
            if buffer:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }
    else:
        # Synchronous stream (Groq, OpenAI)
        for token in stream:
            buffer += token
            full_response += token
            if buffer and '[' not in buffer[-20:]:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }
                buffer = ""
        else:
            if buffer:
                yield {
                    "event": "token",
                    "data": json.dumps({"content": buffer})
                }

    # 5. Follow-up suggestions (post-stream)
    suggestion_list = await generate_suggestions(full_response, sources, provider)
    if suggestion_list:
        yield {
            "event": "suggestions",
            "data": json.dumps({"suggestions": suggestion_list})
        }

    # 6. Save message and signal completion
    msg = await chat_service.save_message(db, conv_id, MessageRole.ASSISTANT, full_response, ...)
    yield {
        "event": "done",
        "data": json.dumps({"message_id": str(msg.id)})
    }
The Buffer Detail for Agentic Search
The system supports agentic search: the LLM can emit [SEARCH: new query] mid-response to request more information. The problem: if you emit tokens one by one, the frontend could render a partial [SEARCH: as visible text.
The solution is a buffer that holds the last 20 characters if they contain [:
if buffer and '[' not in buffer[-20:]:
    yield {"event": "token", "data": json.dumps({"content": buffer})}
    buffer = ""
If the full marker [SEARCH: ...] is detected, the stream is interrupted, a new search is executed, and generation continues with expanded context:
match = SEARCH_PATTERN.search(accumulated)
if match and search_round < MAX_SEARCH_ROUNDS:
    search_query = match.group(1).strip()
    search_round += 1
    yield {"event": "searching", "data": json.dumps({
        "query": search_query, "round": search_round
    })}

    # Execute new search with the LLM's query
    new_sources = await search_chunks(db, search_query, kb_ids, rag_config=rag_config)

    # Deduplicate sources
    existing_ids = {s["chunk_id"] for s in all_sources}
    unique_new = [s for s in new_sources if s["chunk_id"] not in existing_ids]
    if unique_new:
        yield {"event": "sources", "data": json.dumps({
            "sources": unique_new, "round": search_round
        })}

    # Continue generation with expanded context
    # (max 3 search rounds)
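The excerpt references SEARCH_PATTERN without defining it; a plausible definition, together with the hold-back check, looks like this (both names here are illustrative, not taken from the codebase):

```python
import re

# Hypothetical pattern — matches the full inline marker [SEARCH: query]
SEARCH_PATTERN = re.compile(r"\[SEARCH:\s*([^\]]+)\]")

def should_flush(buffer: str) -> bool:
    # Hold the buffer back while a partial marker like "[SEAR" may
    # still be forming in the last 20 characters.
    return bool(buffer) and "[" not in buffer[-20:]

# should_flush("The answer is [SEAR")   -> False (hold, marker may follow)
# should_flush("plain text, no marker") -> True  (safe to emit)
# SEARCH_PATTERN extracts "refund policy" from "[SEARCH: refund policy]"
```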
Short-Circuits: FAQ, Cache and Upgrade
Before the full RAG pipeline, there are several "shortcuts" that return a different EventSourceResponse:
# FAQ match → instant response without LLM
if faq_match:
    async def faq_event_generator():
        yield {"event": "user_message_id", "data": json.dumps({"message_id": str(user_msg.id)})}
        yield {"event": "faq_match", "data": json.dumps(faq_match)}
        yield {"event": "done", "data": json.dumps({"message_id": str(msg.id)})}
    return EventSourceResponse(faq_event_generator())

# Cache hit → cached response, simulate streaming
if cache_hit:
    async def cache_event_generator():
        yield {"event": "sources", "data": json.dumps({"sources": cache_hit["sources"]})}
        yield {"event": "confidence", "data": json.dumps({"score": cache_hit["confidence"]})}
        # Simulate streaming word by word
        for token in cache_hit["response_text"].split(" "):
            yield {"event": "token", "data": json.dumps({"content": token + " "})}
        yield {"event": "done", "data": json.dumps({"message_id": str(msg.id), "cached": True})}
    return EventSourceResponse(cache_event_generator())

# Budget exhausted → upgrade signal
if faq_only_mode:
    async def upgrade_event_generator():
        yield {"event": "upgrade_required", "data": json.dumps({
            "message": "You've reached your AI query limit for this plan."
        })}
        yield {"event": "done", "data": json.dumps({"message_id": str(msg.id)})}
    return EventSourceResponse(upgrade_event_generator())
Why simulate streaming for cache hits? Because the frontend has a single rendering path. If cache returned all the text at once, it would need special logic. Simulating streaming keeps the frontend code simple.
Streaming from LLM Providers
Each provider has its own format. The abstraction layer unifies everything into generators that yield strings:
# Groq and OpenAI: SYNCHRONOUS generator (SDK compatible)
def _stream_groq(messages, temperature=0.3, max_tokens=4096):
    stream = groq_client.chat.completions.create(
        model=settings.groq_model, messages=messages,
        stream=True, temperature=temperature, max_tokens=max_tokens,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

# Ollama: ASYNC generator (NDJSON over HTTP streaming)
async def _stream_ollama(messages, temperature=0.3, max_tokens=4096):
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        async with client.stream("POST", f"{settings.ollama_url}/api/chat",
                                 json={"model": settings.ollama_model,
                                       "messages": messages, "stream": True}) as response:
            async for line in response.aiter_lines():
                if not line.strip():
                    continue  # skip blank lines so json.loads doesn't raise
                data = json.loads(line)
                content = data.get("message", {}).get("content", "")
                if content:
                    yield content
                if data.get("done"):
                    break
The key detail: Groq/OpenAI return synchronous generators, Ollama returns async. The SSE generator needs to handle both with inspect.isasyncgen(). It's ugly, but wrapping sync in async adds complexity and indirection that makes debugging harder.
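For reference, the adapter I chose not to use — normalizing both kinds into a single async generator — is about ten lines (a sketch; note the honest caveat in the comments, which is part of why I avoided it):

```python
import asyncio
import inspect

async def as_async_stream(stream):
    # Normalize: the caller consumes either kind with one `async for`.
    # Caveat: a sync generator's blocking network reads still block the
    # event loop; sleep(0) only yields control *between* chunks.
    if inspect.isasyncgen(stream):
        async for token in stream:
            yield token
    else:
        for token in stream:
            yield token
            await asyncio.sleep(0)

def sync_tokens():
    yield from ["a", "b", "c"]

async def main():
    return [t async for t in as_async_stream(sync_tokens())]

print(asyncio.run(main()))  # ['a', 'b', 'c']
```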
Frontend: Consuming with fetch + ReadableStream
Why Not EventSource
The browser's EventSource API has a fatal flaw for this use case: it only supports GET. Our chat endpoint is a POST with JSON body (message content, knowledge base IDs, provider, etc.).
The alternative: fetch + ReadableStream. More code, but total control.
The SSE Parser
export async function sendMessage(
  convId: string,
  content: string,
  knowledgeBaseIds: string[],
  callbacks: StreamCallbacks,
  provider?: string,
): Promise<void> {
  const token = localStorage.getItem('access_token')
  const response = await fetch(`/api/v1/conversations/${convId}/messages`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      ...(token ? { Authorization: `Bearer ${token}` } : {}),
    },
    body: JSON.stringify({
      content,
      knowledge_base_ids: knowledgeBaseIds,
      provider,
    }),
  })

  if (!response.ok) {
    // Detect plan limit error (403)
    if (response.status === 403) {
      const errorData = await response.json()
      if (errorData.detail?.includes('limit')) {
        callbacks.onError(`__PLAN_LIMIT__:${errorData.detail}`)
        return
      }
    }
    callbacks.onError(`Error: ${response.status}`)
    return
  }

  const reader = response.body?.getReader()
  if (!reader) {
    callbacks.onError('No response stream')
    return
  }

  const decoder = new TextDecoder()
  let buffer = ''
  let receivedDone = false

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      buffer += decoder.decode(value, { stream: true })

      const lines = buffer.split('\n')
      buffer = lines.pop() || '' // Last incomplete line → back to buffer

      let currentEvent = ''
      for (const line of lines) {
        if (line.startsWith('event: ')) {
          currentEvent = line.slice(7).trim()
          continue
        }
        if (line.startsWith('data: ')) {
          const parsed = JSON.parse(line.slice(6))
          // Dispatch by event type
          if (currentEvent === 'token') {
            callbacks.onToken(parsed.content)
          } else if (currentEvent === 'sources') {
            callbacks.onSources(parsed.sources)
          } else if (currentEvent === 'confidence') {
            callbacks.onConfidence?.(parsed.score)
          } else if (currentEvent === 'user_message_id') {
            callbacks.onUserMessageId(parsed.message_id)
          } else if (currentEvent === 'searching') {
            callbacks.onSearching(parsed.query, parsed.round)
          } else if (currentEvent === 'suggestions') {
            callbacks.onSuggestions(parsed.suggestions)
          } else if (currentEvent === 'faq_match') {
            callbacks.onFaqMatch?.(parsed)
          } else if (currentEvent === 'upgrade_required') {
            callbacks.onUpgradeRequired?.(parsed.message)
          } else if (currentEvent === 'content_blocked') {
            callbacks.onContentBlocked?.(parsed.response, parsed.type)
          } else if (currentEvent === 'error') {
            callbacks.onError(parsed.error)
          } else if (currentEvent === 'done') {
            receivedDone = true
            callbacks.onDone(parsed.message_id)
          }
          currentEvent = ''
        }
      }
    }
  } catch (err) {
    callbacks.onError(`Connection error: ${err}`)
    return
  }

  // If stream ended without "done" event, something went wrong
  if (!receivedDone) {
    callbacks.onError('Connection was interrupted before completing the response')
  }
}
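The same framing logic is useful server-side for integration tests. A Python mirror of the parser (a sketch that assumes complete frames and JSON payloads — it skips the TextDecoder/partial-line buffering the frontend needs):

```python
import json

def parse_sse(raw: str) -> list[tuple[str, dict]]:
    # Mirror of the frontend loop: remember the current "event:" line,
    # pair it with the next "data:" line, reset after dispatch.
    events = []
    current = ""
    for line in raw.split("\n"):
        if line.startswith("event: "):
            current = line[7:].strip()
        elif line.startswith("data: "):
            events.append((current, json.loads(line[6:])))
            current = ""
    return events

raw = 'event: token\ndata: {"content": "Hi"}\n\nevent: done\ndata: {"message_id": "abc"}\n\n'
print(parse_sse(raw))  # [('token', {'content': 'Hi'}), ('done', {'message_id': 'abc'})]
```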
The Callbacks Interface
export interface StreamCallbacks {
  onSources: (sources: Source[]) => void
  onToken: (token: string) => void
  onCodeResult: (result: CodeResult) => void
  onSearching: (query: string, round: number) => void
  onExecuting: (count: number) => void
  onSuggestions: (suggestions: string[]) => void
  onUserMessageId: (messageId: string) => void
  onFaqMatch?: (match: FAQMatch) => void
  onConfidence?: (score: number) => void
  onUpgradeRequired?: (message: string) => void
  onContentBlocked?: (response: string, blockType: string) => void
  onDone: (messageId: string) => void
  onError: (error: string) => void
}
The useChat composable connects these callbacks with Vue's reactive state:
await api.sendMessage(conversationId, content, kbIds, {
  onToken: (token) => {
    streamingContent.value += token
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.content = streamingContent.value
    }
  },
  onSources: (s) => {
    sources.value = [...sources.value, ...s]
  },
  onConfidence: (score) => {
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.confidenceScore = score
    }
  },
  onDone: (messageId) => {
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.id = messageId
      last.sources = sources.value
    }
    streaming.value = false
  },
  onError: (err) => {
    const last = messages.value[messages.value.length - 1]
    if (last.role === 'assistant') {
      last.content = `Error: ${err}`
    }
    streaming.value = false
  },
})
Optimistic UI with Reconciliation
A subtle detail: when the user sends a message, we add it immediately to the UI with a temporary ID (crypto.randomUUID()). But the real ID is generated by the database. The user_message_id event syncs both:
const userMsg: Message = {
  id: crypto.randomUUID(), // Optimistic temporary ID
  role: 'user',
  content,
  ...
}
messages.value.push(userMsg)

// When the real ID arrives from the server:
onUserMessageId: (realId) => {
  userMsg.id = realId // Replace temporary ID with DB ID
},
This matters for feedback: when the user gives a thumbs-up to a message, it needs the real ID so the PATCH reaches the correct message.
Embeddable Widget: SSE in Vanilla JS
The widget is ~970 lines of vanilla JS running inside a closed Shadow DOM (mode: 'closed'). The SSE parsing is identical to the TypeScript frontend, but with var instead of const and no types:
var reader = res.body.getReader();
var decoder = new TextDecoder();
var buffer = '';
var assistantContent = '';

while (true) {
  var readResult = await reader.read();
  if (readResult.done) break;
  buffer += decoder.decode(readResult.value, { stream: true });

  var lines = buffer.split('\n');
  buffer = lines.pop() || '';

  var event = null;
  for (var i = 0; i < lines.length; i++) {
    var line = lines[i].trim();
    if (line.startsWith('event:')) {
      event = line.slice(6).trim();
    } else if (line.startsWith('data:')) {
      var data = JSON.parse(line.slice(5).trim());
      if (event === 'session') {
        sessionToken = data.session_token;
        localStorage.setItem(STORAGE_KEY, sessionToken);
      } else if (event === 'token') {
        assistantContent += (data.content || '');
        messages[assistantMsgIndex].content = assistantContent;
        renderMessages(); // Re-render on each token
      } else if (event === 'faq_match') {
        messages.push({ role: 'assistant', content: data.answer, faqMatch: data });
        renderMessages();
      }
    }
  }
}
Key Differences from the Main App
- Auth: The widget uses X-API-Key (SHA-256 hashed) instead of JWT. No login.
- First event: Instead of user_message_id, the widget receives session with a token persisted in localStorage to maintain history across visits.
- Closed Shadow DOM: host.attachShadow({ mode: 'closed' }) completely isolates styles and DOM from the host page. Not even document.querySelector can access the widget from outside.
Nginx: The 6 Directives That Make It Work
This is the section that caused me the most headaches. Without the correct Nginx configuration, SSE streaming doesn't work. Tokens accumulate in Nginx's buffer and arrive all at once at the end.
location /api/ {
    proxy_pass http://127.0.0.1:8000;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;

    # === The 6 SSE directives ===
    proxy_buffering off;             # 1. Don't buffer the response
    proxy_cache off;                 # 2. Don't cache
    proxy_read_timeout 300s;         # 3. Long timeout (streams last minutes)
    proxy_set_header Connection '';  # 4. Clear the Connection header
    proxy_http_version 1.1;          # 5. HTTP/1.1 (required for chunked)
    chunked_transfer_encoding off;   # 6. Disable Nginx's chunked encoding
}
What Each One Does
1. proxy_buffering off — The most important one. By default, Nginx buffers the complete upstream response before sending it to the client. With SSE, each event must reach the client immediately.
2. proxy_cache off — Nginx can cache upstream responses. A cached SSE stream is... a very bad idea.
3. proxy_read_timeout 300s — The default is 60 seconds. An LLM response with agentic search (3 rounds) can take 2-3 minutes. Without this timeout, Nginx closes the connection mid-response.
4. proxy_set_header Connection '' — Clears the Connection header so Nginx doesn't forward a Connection: close to the upstream. Together with proxy_http_version 1.1, this keeps the upstream connection open for the full duration of the stream instead of tearing it down mid-response.
5. proxy_http_version 1.1 — HTTP/1.1 is required for chunked transfer-encoding. Nginx's default for upstream is HTTP/1.0.
6. chunked_transfer_encoding off — Seems contradictory to the previous point, but this disables chunked encoding from Nginx, not from upstream. Without this, Nginx can re-chunk the response and alter event timing.
The Symptom You'll See Without These Directives
If you're missing any of them, you'll see this behavior:
- The user sends a message
- The "loading" spinner stays for 5-30 seconds
- Suddenly, the entire response appears at once
- Streaming "works" on localhost (without Nginx) but not in production
If this happens to you, it's Nginx buffering. Add the 6 directives and it resolves immediately.
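There's also a per-response escape hatch if you can't touch the Nginx config: Nginx honors an X-Accel-Buffering: no response header, which disables proxy buffering for that one response. A sketch of the headers worth sending alongside the stream (sse-starlette already sets some of these for you):

```python
# Headers that keep intermediaries from buffering or caching the stream.
SSE_HEADERS = {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    "X-Accel-Buffering": "no",  # per-response opt-out of Nginx proxy_buffering
}

# Usage sketch, e.g.:
#   return EventSourceResponse(event_generator(), headers=SSE_HEADERS)
```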
Cloudflare: The Other Invisible Proxy
If you use Cloudflare as proxy (orange cloud), there's a detail: Cloudflare can also buffer responses. But in practice, Cloudflare detects SSE automatically by the Content-Type: text/event-stream and disables buffering. No special configuration needed.
Redis: Stream Concurrency
A user could open 10 tabs and launch 10 simultaneous streams. Each stream consumes LLM resources, DB connections, and memory. Concurrency control uses Redis as an atomic counter:
_CONCURRENT_TTL = 300  # 5-minute TTL as safety net

async def increment_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0  # Fail-open: without Redis, allow
    redis_key = f"concurrent:{tenant_id}"
    pipe = client.pipeline()
    pipe.incr(redis_key)
    pipe.expire(redis_key, _CONCURRENT_TTL)
    results = await pipe.execute()
    return results[0]

async def decrement_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0
    redis_key = f"concurrent:{tenant_id}"
    count = await client.decr(redis_key)
    if count <= 0:
        await client.delete(redis_key)  # Clean dead keys
        return 0
    await client.expire(redis_key, _CONCURRENT_TTL)
    return count
The try/finally Pattern
Increment and decrement are in a try/finally to guarantee the counter decrements always, even if the client disconnects mid-stream:
async def event_generator():
    await increment_concurrent(str(tenant_id))
    try:
        async for event in _event_generator_inner():
            yield event
    finally:
        await decrement_concurrent(str(tenant_id))
TTL as Safety Net
What happens if the server crashes mid-stream? The decrement never executes and the counter stays inflated. The 5-minute TTL solves this: if it's not renewed with expire, the key self-deletes. The system self-heals.
Fail-Open and Rate Limiting
Two important principles:
Fail-open: If Redis is down, the rate limiter allows everything. I prefer serving requests without rate limiting to rejecting everything because Redis went down. Redis is a guard, not a gate.
Sliding window per API key: Besides per-tenant concurrency, each widget API key has its own rate limit using Redis sorted sets. Timestamps as scores, zremrangebyscore to clean expired entries, zcard to count — all in an atomic pipeline.
Error Handling: The 3 Layers
Backend: Always emit done after error. Without this, the frontend gets stuck in "streaming" state indefinitely:
except Exception as e:
    yield {"event": "error", "data": json.dumps({"error": f"LLM error: {e}"})}
    yield {"event": "done", "data": json.dumps({"message_id": "error"})}
    return
Frontend: Detect incomplete streams. If the reader returns done: true but we never received a done event, something went wrong (Nginx timeout, backend crash):
if (!receivedDone) {
  callbacks.onError('Connection was interrupted before completing the response')
}
Pre-stream: 403 errors (plan limit) arrive as normal HTTP before the SSE stream. The frontend uses a __PLAN_LIMIT__: prefix as internal convention so the composable distinguishes plan errors from generic errors and shows different UI (upgrade card vs. error message).
Real Numbers
| Metric | Value |
|---|---|
| First token latency (Groq) | ~200-400ms |
| First token latency (Ollama local) | ~1-2s |
| Typical total response time | ~3-8s |
| Total time with agentic search (3 rounds) | ~15-30s |
| FAQ match (no LLM) | ~15ms |
| Cache hit (simulated) | ~50ms |
| Nginx SSE overhead | <1ms per event |
| Concurrent streams per tenant (Free plan) | 2 |
| API key rate limit (default) | 30 req/min |
Lessons Learned
1. SSE > WebSockets for LLM Streaming
Don't try to shove WebSockets where SSE is sufficient. For a unidirectional flow (server → client), SSE is simpler, more proxy-compatible, and easier to debug. EventSource's automatic reconnection is a bonus, though you give it up when consuming the stream with fetch, as we do here.
2. Always Emit "done" at the End
Without an explicit termination event, the frontend can't distinguish between "the stream ended" and "the stream was cut off". Emitting done always — even after errors — is what makes the protocol robust.
3. Nginx Buffering Is the Silent Enemy
In development without Nginx everything works perfectly. In production, tokens arrive all at once. The first time it happened, I spent 2 hours thinking the problem was in the backend. It was 6 lines of Nginx.
4. Buffer for Inline Markers
If your LLM can emit special markers ([SEARCH:], executable code, etc.), you need a buffer that holds text until you're sure there's no partial marker. Without this, the user sees artifacts like [SEAR in the chat.
5. Try/Finally with Redis Counters
Every counter that's incremented before a stream must be decremented in a finally. Not in the happy path, not in the error handler: in finally. Clients disconnect without warning, servers crash, and generators get interrupted.
6. Fail-Open for Auxiliary Services
Redis for rate limiting should be fail-open. If Redis goes down, it's better to serve without rate limiting than to reject all requests. The same applies to logging, analytics, and any non-critical service.
7. fetch + ReadableStream > EventSource
EventSource is elegant but limited (GET only, minimal headers). fetch + ReadableStream requires more code but supports POST, custom headers (JWT), and fine-grained error handling. For a real-world case, the flexibility justifies the ~30 extra lines.
What's Next
- Optional WebSocket upgrade: For future bidirectional features (typing indicators, presence), evaluate a selective upgrade without replacing SSE
- Backpressure: If the frontend can't render as fast as the LLM generates, implement backpressure signaling
- Compression: Evaluate SSE over HTTP/2 with frame compression to reduce bandwidth for high-traffic widgets
Conclusion
SSE for LLM streaming isn't hard to implement — but it's easy to implement poorly. The pitfalls are in the details: Nginx that buffers, generators that don't clean up counters, frontends that don't detect incomplete streams, and widgets that don't handle all event types.
The custom event protocol (token, sources, confidence, done, error, etc.) is what transforms a text stream into a usable system: the frontend knows what it's receiving and can render it correctly, show confidence indicators, accumulate sources from different search rounds, and handle edge cases like plan limits or blocked content.
Most importantly: always test through Nginx. What works on localhost without a proxy is not representative of production. Those 6 Nginx directives are the difference between "real token-by-token streaming" and "all text at once after 10 seconds".