Priyank Agrawal


Your Multi-Agent SSE Stream Works in Dev. Here's What Kills It in Production.

Methodology note: Intent classification numbers (25% → 8% misclassification) were measured over ~1,400 messages across two weeks using a 12-intent test set with human-labelled ground truth. Abandonment rates (68% → 31%) are session-level. Small dataset — treat as directional signals, not benchmarks.


The 90-second version

Before the deep dives — here are the six mistakes and fixes. If you only read this far, you still leave with something useful.

| #  | Mistake | Fix |
|----|---------|-----|
| 01 | Flat text streams break the moment structured data appears | Use typed SSE events. Design the schema before writing a single agent. |
| 02 | Loose dict state in LangGraph crashes two agents downstream, mid-stream | Pydantic with extra='forbid' at every agent boundary |
| 03 | run_in_executor introduces race conditions with shared mutable state | Use ainvoke instead. If you can't, use per-request graph instances. |
| 04 | Catching only two exception types leaves zombie connections open | Bare except Exception + finally block. Not lazy — required. |
| 05 | Free-text conversation summaries hallucinate user constraints | Structured Pydantic output for summarisation — explicit fields, not prose |
| 06 | Mobile browser kills EventSource silently on app switch | ReconnectingEventSource with Last-Event-ID resume, not just retry |

1. Typed SSE events — design the contract before the code

Streaming a text response is trivial. Streaming structured data, UI signals, and text simultaneously over one connection is not.

When a multi-agent system runs, you have free-text chunks, typed job objects, quick-action buttons, and agent metadata all in flight at once. Streaming everything as flat text fails the moment structured data appears — you can't reliably parse a JSON object out of a partial token stream.

The mistake: Designing the SSE event schema after the agents are built. Every agent output has to be rewired to match the new contract.

The fix: Define the typed event schema first. It becomes the contract every agent writes to and every frontend listener reads from.
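For illustration, "schema first" can mean a small set of Pydantic event models that every agent imports and every emitter serialises through. The model names and `to_sse` helper below are hypothetical, a sketch of the contract rather than the original codebase:

```python
import json
from typing import List, Literal
from pydantic import BaseModel

class ChunkEvent(BaseModel):
    type: Literal["chunk"] = "chunk"
    content: str
    agent: str

class JobsDataEvent(BaseModel):
    type: Literal["jobs_data"] = "jobs_data"
    jobs: List[dict]

class CompleteEvent(BaseModel):
    type: Literal["complete"] = "complete"
    conversation_id: str

def to_sse(event: BaseModel) -> str:
    # Serialise any typed event into SSE wire format; the event type
    # becomes the SSE event name, everything else becomes the data payload
    payload = event.model_dump(exclude={"type"})
    return f"event: {event.type}\ndata: {json.dumps(payload)}\n\n"
```

With this in place, agents construct typed events and the transport layer owns the wire format, so neither can drift independently.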

# backend/app/api/v1/chat.py
import json

async def stream_agent_response(orchestrator_result):
    async for event in orchestrator_result:
        event_type = event.get("type")

        if event_type == "chunk":        # streaming text token
            yield f"event: chunk\ndata: {json.dumps({'content': event['content'], 'agent': event['agent']})}\n\n"
        elif event_type == "jobs_data":  # structured JSON — not text
            yield f"event: jobs_data\ndata: {json.dumps({'jobs': event['jobs']})}\n\n"
        elif event_type == "complete":
            yield f"event: complete\ndata: {json.dumps({'conversation_id': event['conversation_id']})}\n\n"
// frontend — subscribe by type, not raw text
eventSource.addEventListener('chunk', (e) => appendTextChunk(JSON.parse(e.data).content));
eventSource.addEventListener('jobs_data', (e) => setJobResults(JSON.parse(e.data).jobs));
eventSource.addEventListener('complete', () => eventSource.close());

⚠️ What this code is missing: No backpressure — a slow client + fast generator = unbounded memory buffering. No SSE heartbeat — load balancers silently kill idle connections after 30–60s. Add ": heartbeat\n\n" every 15 seconds alongside the main stream.
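Backpressure isn't shown anywhere in this article's code. One common mitigation, sketched below with hypothetical names, is a bounded queue between the generator and the response writer, so a fast producer blocks once the buffer fills instead of accumulating events for a slow client without limit:

```python
import asyncio

async def bounded_stream(generator, max_buffered: int = 100):
    # Producer blocks on queue.put() once max_buffered events are pending,
    # instead of buffering unboundedly for a slow client
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffered)
    DONE = object()  # sentinel marking the end of the stream

    async def produce():
        async for event in generator:
            await queue.put(event)  # blocks while the queue is full
        await queue.put(DONE)

    task = asyncio.create_task(produce())
    try:
        while (event := await queue.get()) is not DONE:
            yield event
    finally:
        task.cancel()  # stop the producer if the client disconnects mid-stream
```

The choice of `max_buffered` trades memory against tolerance for client jitter; a small number is usually fine because SSE events are tiny.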

# Production addition — heartbeat so proxies don't kill idle connections
async def stream_with_heartbeat(generator, interval: float = 15.0):
    gen = generator.__aiter__()
    while True:
        # Wait for the next event, but never longer than the heartbeat
        # interval, so heartbeats fire even when the generator is silent
        try:
            event = await asyncio.wait_for(gen.__anext__(), timeout=interval)
        except asyncio.TimeoutError:
            yield ": heartbeat\n\n"  # SSE comment line, ignored by clients
        except StopAsyncIteration:
            break
        else:
            yield event

2. Pydantic state in LangGraph — errors at write time, not read time

Picture a pipeline where a Job Search Agent writes jobs_found and a Ranking Agent reads it. There's no contract saying what fields jobs_found contains. If the Job Search Agent wrote malformed data, the crash happens at the Ranking Agent — mid-stream, with a live user waiting, and no easy way to trace it back to the source.

The mistake: Loose dict and list types everywhere in your LangGraph state. Crashes surface two agents downstream with no clear origin.

The fix: Pydantic models at every agent boundary. Add extra='forbid' — unexpected fields become hard errors at the producing agent, not silent pass-throughs.

# state.py
from pydantic import BaseModel, ConfigDict
from typing import Optional, List

class JobResult(BaseModel):
    model_config = ConfigDict(extra='forbid')  # unexpected fields = hard error
    job_id: str
    title: str
    match_score: float
    matched_skills: List[str]

class AgentState(BaseModel):
    model_config = ConfigDict(extra='forbid')
    user_message: str
    intent: Optional[IntentData] = None
    jobs_found: List[JobResult] = []
    error: Optional[str] = None
    is_complete: bool = False

Rule: If data crosses an agent boundary, it gets a type. No exceptions.
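To see why extra='forbid' moves the failure to the producing agent: a writer that emits an unexpected field fails immediately at construction time, under its own stack trace, instead of poisoning state for an agent downstream. A quick self-contained sketch (the salary_band field is an invented example of contract drift):

```python
from typing import List
from pydantic import BaseModel, ConfigDict, ValidationError

class JobResult(BaseModel):
    model_config = ConfigDict(extra='forbid')
    job_id: str
    title: str
    match_score: float
    matched_skills: List[str]

# The producing agent tries to emit a field the contract doesn't know about
try:
    JobResult(job_id="j1", title="Backend Engineer", match_score=0.92,
              matched_skills=["Python"], salary_band="L5")
except ValidationError as exc:
    # Fails here, at the producer, not two agents later mid-stream
    print(f"caught at write time: {exc.error_count()} error(s)")
```

Without extra='forbid', the unknown field would be silently dropped (or silently carried, depending on config), and the mismatch would only surface wherever someone finally reads the data.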


3. run_in_executor has a hidden race condition

FastAPI is async. LangGraph's LLM calls are, in some configurations, synchronous. Calling synchronous code inside an async endpoint blocks the entire event loop for the duration of that LLM call. The symptom: SSE connections open, the thinking event fires, then nothing for 10–15 seconds.

Before the fix: P95 latency was 3× P50.

After: 1.4× P50.

But here's what most guides skip:

⚠️ Critical caveat: run_in_executor moves code to a thread pool. If your LangGraph setup has any shared mutable state — a shared cache, a shared agent instance, anything not thread-safe — you've introduced race conditions that are invisible at low traffic and catastrophic at scale.

Prefer this:

# Use the async method if your setup supports it
result = await langgraph_chain.ainvoke(state)

Safe fallback when ainvoke is unavailable:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

_executor = ThreadPoolExecutor(max_workers=10)

async def stream_chat(request):
    # Build a FRESH graph per request — no shared state across threads
    graph = build_agent_graph()
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(_executor, partial(graph.invoke, state))

The one-liner fix is only safe if you're certain there is no shared mutable state anywhere in the graph.


4. LLM down mid-stream — catch everything, not just two exceptions

The SSE connection is open. Intent classified. Then the LLM returns a 503. Without the right handling, the connection hangs indefinitely. After a few of these, zombie connections accumulate — open SSE connections that will never close.

The original mistake: Catching only httpx.HTTPStatusError and groq.APIStatusError. A Pydantic validation error, a plain Python bug, any other library's timeout — all bubble up uncaught, leaving the connection open forever.

async def safe_agent_stream(agent_fn, state, fallback: str):
    try:
        async for event in agent_fn(state):
            yield event
    except (httpx.HTTPStatusError, groq.APIStatusError):
        yield {"type": "error", "content": fallback, "retry": True}
    except asyncio.TimeoutError:
        yield {"type": "error", "content": "Response timed out.", "retry": True}
    except Exception:
        # Not lazy — this is the safety net for everything unexpected
        logger.exception("Unexpected agent error")
        yield {"type": "error", "content": "Something went wrong.", "retry": True}
    finally:
        # Runs regardless — frontend can always close the connection
        yield {"type": "complete", "conversation_id": state.conversation_id}

The finally block is not optional. It guarantees the frontend receives a complete event even when everything above it fails, so it can close the connection cleanly.


5. Free-text conversation summaries hallucinate — use structured output

By turn 20, injecting the full conversation history into every call costs too much in tokens and latency. The fix is to summarise old messages and keep recent ones verbatim.

But free-text summarisation introduces a subtle failure: the summary model says "user prefers small companies". The user said "no startups under 50 people". These are opposite. The downstream agent makes recommendations based on hallucinated preferences with no way to debug it from the recent context alone.

# Wrong — free text is hard to audit and easy to distort
summary = "User is looking for Python roles at mid-sized companies..."

# Right — structured output forces explicit, auditable extraction
class ConversationSummary(BaseModel):
    job_preferences: List[str]      # ["Python", "remote", "Series B+"]
    hard_constraints: List[str]     # ["no startups under 50 people"]
    mentioned_companies: List[str]  # ["Stripe", "Notion"]
    current_goal: str

summary = summarize_to_struct(older_messages, ConversationSummary)

summarize_to_struct is a separate LLM call using a fast, cheap model (haiku or gpt-3.5-turbo). The result is cached in your session store — don't re-summarise every turn. Structured output makes it auditable: you can log and diff summaries across turns to catch distortions before they affect downstream agents.
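The article doesn't show summarize_to_struct itself. One way to sketch it, with a slightly different signature that takes a call_llm callable returning raw text (hypothetical; swap in your provider's native structured-output API if it has one):

```python
import json
from typing import Callable, List
from pydantic import BaseModel

class ConversationSummary(BaseModel):
    job_preferences: List[str]
    hard_constraints: List[str]
    mentioned_companies: List[str]
    current_goal: str

def summarize_to_struct(messages: List[str],
                        call_llm: Callable[[str], str]) -> ConversationSummary:
    prompt = (
        "Summarise this conversation as JSON with keys job_preferences, "
        "hard_constraints, mentioned_companies, current_goal. Quote "
        "constraints verbatim; do not paraphrase them.\n\n"
        + "\n".join(messages)
    )
    raw = call_llm(prompt)
    # Validation is the safety net: a malformed response fails loudly here
    # instead of flowing downstream as unauditable prose
    return ConversationSummary.model_validate_json(raw)
```

The "quote constraints verbatim" instruction matters: it steers the model away from the "no startups under 50 people" → "prefers small companies" inversion described above.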


6. Mobile SSE dies silently — you need resume, not just retry

On desktop, SSE is reliable. On mobile, background/foreground app switching kills EventSource silently — no error event, no warning. The user returns to a chat with no response.

A naive retry loop reconnects from scratch and replays events the user already saw. What you actually need is resume from the last received event.

class ReconnectingEventSource {
    constructor(url, options = {}) {
        this.url = url;
        this.lastEventId = null;
        this.retryDelay = 1000;
        this.maxDelay = 30000;
        this.maxAttempts = options.maxAttempts ?? 5;
        this.attempts = 0;
        this.closed = false;
        this.handlers = {};
        this.connect();
    }

    connect() {
        // Pass Last-Event-ID so server resumes from the right point
        const sep = this.url.includes('?') ? '&' : '?';  // don't assume an existing query string
        const url = this.lastEventId
            ? `${this.url}${sep}lastEventId=${this.lastEventId}`
            : this.url;

        this.es = new EventSource(url);
        this.es.onopen = () => { this.attempts = 0; };  // successful connect resets the backoff counter

        this.es.addEventListener('complete', () => {
            this.closed = true;
            this.es.close();
        });

        this.es.onerror = () => {
            if (!this.closed) { this.es.close(); this.scheduleReconnect(); }
        };

        Object.entries(this.handlers).forEach(([type, fn]) => {
            this.es.addEventListener(type, (e) => {
                if (e.lastEventId) this.lastEventId = e.lastEventId; // track position
                fn(e);
            });
        });
    }

    scheduleReconnect() {
        if (this.attempts >= this.maxAttempts) {
            this.handlers['give_up']?.();
            return;
        }
        const delay = Math.min(this.retryDelay * 2 ** this.attempts++, this.maxDelay);
        setTimeout(() => this.connect(), delay);
    }

    addEventListener(type, fn) {
        this.handlers[type] = fn;
        this.es?.addEventListener(type, fn);
    }
}

Server side: Emit an id: field with every SSE event. On reconnect with lastEventId, replay only the missed events from a short-lived Redis buffer (60s TTL). If resume complexity isn't worth it, the honest fallback is a "connection dropped — tap to retry" button with the last message pre-filled.
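A sketch of that replay buffer, shown in-memory here for brevity where the article uses Redis (a Redis list per conversation with RPUSH, EXPIRE, and LRANGE would be the direct translation). All names below are illustrative:

```python
import time
from collections import defaultdict, deque

BUFFER_TTL = 60.0  # seconds, matching the short-lived Redis buffer

# conversation_id -> deque of (timestamp, event_id, payload)
_buffers: dict = defaultdict(deque)

def buffer_event(conversation_id: str, event_id: int, payload: str) -> None:
    buf = _buffers[conversation_id]
    buf.append((time.monotonic(), event_id, payload))
    # Expire old entries on write, mimicking the Redis TTL
    while buf and time.monotonic() - buf[0][0] > BUFFER_TTL:
        buf.popleft()

def replay_missed(conversation_id: str, last_event_id: int) -> list:
    # Return only the events the client hasn't seen yet
    return [p for _, eid, p in _buffers[conversation_id] if eid > last_event_id]
```

On reconnect, the handler calls replay_missed with the client's lastEventId before resuming the live stream; if the buffer has expired, fall back to the "tap to retry" path.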


7. Intent classification — the hard cases and fixes

"Find me Python jobs" → job search. "Write a cold email" → email generator. The hard cases:

  • "I applied to Google last week, any updates?" → followup or general?
  • "Help me with the Stripe interview" → interview_prep or general?
  • "Yo" → ???

Initial misclassification rate on ambiguous inputs: ~25%. Three changes reduced it to under 8%:

1. Explicit "when to use" descriptions in the classifier prompt:

INTENT_DESCRIPTIONS = {
    "followup": "User asking about an application they ALREADY submitted. Keywords: 'I applied', 'heard back', 'status'.",
    "job_search": "User wants to DISCOVER new jobs not yet applied to. Keywords: 'find', 'show me', 'any jobs'.",
    "skill_gap": "User wants to know what's MISSING. Keywords: 'missing', 'improve', 'gap'.",
}

2. Confidence scores with fallback:

class IntentClassification(BaseModel):
    intent: str
    confidence: float   # 0.0 to 1.0
    reasoning: str      # forces the model to show its work

if classification.confidence < 0.70:
    return route_to_general(message, context)

3. Previous intent as context:

context_hint = f"Previous intents: {', '.join(recent_intents[-3:])}"
# If last intent was "followup" and user replies "any updates?" —
# prior context makes correct classification almost automatic
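The three fixes compose into a single classifier prompt. A sketch of the assembly, where the prompt wording and helper name are illustrative rather than from the original:

```python
from typing import Dict, List

def build_classifier_prompt(message: str,
                            intent_descriptions: Dict[str, str],
                            recent_intents: List[str]) -> str:
    # Fix 1: explicit "when to use" descriptions per intent
    intents = "\n".join(f"- {name}: {desc}"
                        for name, desc in intent_descriptions.items())
    # Fix 3: previous intents as context for ambiguous follow-ups
    context_hint = f"Previous intents: {', '.join(recent_intents[-3:])}"
    # Fix 2 lives in the requested output schema: confidence and reasoning
    return (
        f"Classify the user's intent.\n\nIntents:\n{intents}\n\n"
        f"{context_hint}\n\nUser message: {message}\n\n"
        'Respond as JSON: {"intent": ..., "confidence": 0.0-1.0, '
        '"reasoning": ...}'
    )
```

The response would then be parsed into the IntentClassification model above, with the confidence threshold deciding whether to route or fall back.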

8. Low confidence + open question = 68% abandonment

When intent confidence falls below 0.70 and the system asks "what are you looking for?" — 68% of sessions end there. Replacing the open question with three specific quick-reply buttons dropped abandonment to 31%.

if classification.confidence < 0.70:
    return {
        "text": "I can help with a few things — pick one:",
        "suggestions": [
            "Find me jobs matching my profile",
            "Write a cold email for a role",
            "What skills should I add to my resume?"
        ]
        # Personalise from user_profile in production
        # Don't show "add skills" to a user whose resume is already uploaded
    }

On the numbers: ~1,400 sessions, two weeks, single product. Directional, not benchmarks. What generalises: open question → blank form feeling → abandonment. Specific options → momentum → engagement.


9. Build these on day one — retrofitting is painful

Six decisions that are cheap upfront and expensive to add later:

  • SSE event schema first. Changing the contract mid-build means rewiring every agent output and every frontend listener simultaneously.
  • Pydantic + extra='forbid' everywhere. Retrofitting types into a working dict-based state means rewriting agents that already have subtle state-sharing bugs you haven't found yet.
  • Bare except + finally from day one. Zombie connections are hard to detect (they look like slow users) and hard to purge once they accumulate.
  • Log every intent classification with confidence score. After two weeks of logs, the classifier's weak spots become obvious. You can't improve what you didn't measure.
  • Structured summarisation, not free text. Switching mid-product means auditing every cached summary in your session store for distortions.
  • ReconnectingEventSource before mobile launch. Retrofitting reconnection touches every event handler in the frontend. It's never a small change once you have production listeners.


Quick poll for the comments — which of these hit you in production?

  • Zombie SSE connections that never closed
  • Loose dict state crashing mid-stream
  • The run_in_executor race condition
  • Mobile EventSource dying silently
  • Free-text summaries hallucinating user constraints
  • Something not on this list ⬇️

Found a failure mode I missed? Drop it in the comments — I'll add it to the article with credit.
