DEV Community

Manfred Macx

Your Agent Streams Text But Breaks on Tool Calls. Here's the Fix.

Streaming tokens from an LLM is easy. You get a callback per token, you push it to the client, done.

Then you add tool calls.

The LLM starts streaming a tool input JSON character by character. You need to execute the tool (blocking, could take 3 seconds). Then you resume streaming. Meanwhile, the client is sitting there wondering if the connection dropped.

Then you add multi-agent pipelines. Agent A streams into Agent B streams into Agent C. Which events does the UI show? All of them? Just the final output?

Then a user's browser tab goes to sleep and they miss 40% of the stream. They refresh. Do they start over or resume?

These are the failure modes that hit production streaming agents. Here's how to handle all of them.


Start With the Event Envelope

Don't pipe raw LLM tokens to your client. Normalize everything to a typed event:

from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class EventType(str, Enum):
    TEXT_DELTA = "text_delta"
    TEXT_DONE = "text_done"
    TOOL_CALL_START = "tool_call_start"
    TOOL_CALL_INPUT = "tool_call_input"
    TOOL_CALL_DONE = "tool_call_done"
    TOOL_RESULT = "tool_result"
    AGENT_DONE = "agent_done"
    ERROR = "error"
    FATAL = "fatal"


@dataclass
class StreamEvent:
    type: EventType
    data: Any
    agent_id: str
    turn_id: str
    sequence: int          # Monotonic, so clients can detect dropped events
    timestamp_ms: int
    tool_call_id: Optional[str] = None

The sequence field is critical. It lets clients detect gaps and request replay (more on that later).
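As a rough illustration of gap detection, here is a minimal sketch. It assumes sequence numbers start at 0 and increase by 1 within a turn; the find_gaps helper is hypothetical and not part of the original code.

```python
def find_gaps(sequences: list[int], start: int = 0) -> list[tuple[int, int]]:
    """Return inclusive (first_missing, last_missing) ranges in a received stream."""
    gaps = []
    expected = start
    for seq in sequences:
        if seq > expected:
            gaps.append((expected, seq - 1))
        # Duplicates or late arrivals (seq < expected) never move the cursor back.
        expected = max(expected, seq + 1)
    return gaps
```

A client could run this over the sequence numbers it has received and request a replay for each missing range.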

The LLM provider's event format changes. Anthropic changed their streaming API format twice in 2024. If your frontend depends on it directly, you're rewriting the frontend every time. Normalize at the boundary.
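One way to picture "normalize at the boundary" is a small adapter that is the only code aware of the provider's format. The sketch below is an assumption-heavy illustration: the provider event shape ({"kind": ..., "payload": ...}) and the KIND_TO_TYPE mapping are invented for the example, and a real adapter would map the vendor's actual events instead.

```python
import itertools
import time
from typing import Any, Iterable, Iterator

# Hypothetical provider event kinds mapped to the envelope's event types.
KIND_TO_TYPE = {
    "token": "text_delta",
    "tool_json": "tool_call_input",
    "end": "agent_done",
}


def normalize(
    raw_events: Iterable[dict[str, Any]], agent_id: str, turn_id: str
) -> Iterator[dict[str, Any]]:
    """Wrap provider events in a stable envelope with a monotonic sequence."""
    counter = itertools.count()
    for raw in raw_events:
        event_type = KIND_TO_TYPE.get(raw["kind"])
        if event_type is None:
            continue  # Unknown provider events never reach the client
        yield {
            "type": event_type,
            "data": raw.get("payload"),
            "agent_id": agent_id,
            "turn_id": turn_id,
            "sequence": next(counter),
            "timestamp_ms": int(time.time() * 1000),
        }
```

When the provider changes its format, only the mapping and the adapter change; the envelope the frontend consumes stays the same.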


The Tool Call State Machine

This is the hard part. When the LLM decides to call a tool mid-stream, the text stream pauses. The LLM streams the tool input JSON incrementally; you accumulate it, parse it once it is complete, execute the tool, and then resume streaming.

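import json


class ToolCallAccumulator:
    """Accumulates the streamed JSON input fragments for one tool call."""

    def __init__(self, tool_call_id: str, tool_name: str):
        self.tool_call_id = tool_call_id
        self.tool_name = tool_name
        self._input_buffer = ""

    def append(self, delta: str) -> None:
        self._input_buffer += delta

    def finalize(self) -> dict:
        try:
            return json.loads(self._input_buffer)
        except json.JSONDecodeError:
            return self._repair_json(self._input_buffer)

    def _repair_json(self, partial: str) -> dict:
        # Walk the text once, tracking string state and open containers,
        # so quotes and braces inside string values are not miscounted.
        closers, in_string, escaped = [], False, False
        for ch in partial:
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch in "{[":
                closers.append("}" if ch == "{" else "]")
            elif ch in "}]" and closers:
                closers.pop()
        # Close an open string, then close containers innermost first
        if in_string:
            partial += '"'
        partial += "".join(reversed(closers))
        try:
            return json.loads(partial)
        except json.JSONDecodeError:
            return {"_parse_error": True, "raw": partial}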

The JSON repair is not theoretical. LLMs occasionally get cut off mid-JSON if max_tokens is too low or if there's a network hiccup. Better to get partial input than to crash.
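Repaired input can parse cleanly and still be missing fields, so it is probably worth checking before the tool runs. A minimal sketch, assuming each tool declares a list of required keys; the usable_tool_input helper is hypothetical:

```python
def usable_tool_input(parsed: dict, required: list[str]) -> tuple[bool, str]:
    """Decide whether a finalized (possibly repaired) tool input is safe to run."""
    if parsed.get("_parse_error"):
        return False, "tool input was not valid JSON"
    missing = [key for key in required if key not in parsed]
    if missing:
        return False, f"missing required fields: {', '.join(missing)}"
    return True, ""
```

When the check fails, the reason string can be sent back to the LLM as an error tool result instead of executing the tool with half an input.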

The full streaming loop with tool calls runs as an async generator:

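# Outline, assuming the Anthropic Python SDK's async streaming client; "..." marks elided arguments.
async def stream_with_tool_calls(messages, tools, tool_executor, ...) -> AsyncGenerator[StreamEvent, None]:
    while True:  # Agentic loop: one iteration per LLM turn
        async with client.messages.stream(model="...", messages=messages, tools=tools) as stream:
            async for event in stream:
                if event.type != "content_block_delta":
                    continue
                if event.delta.type == "input_json_delta":
                    yield StreamEvent(type=EventType.TOOL_CALL_INPUT, data=event.delta.partial_json, ...)
                elif event.delta.type == "text_delta":
                    yield StreamEvent(type=EventType.TEXT_DELTA, data=event.delta.text, ...)
            final_message = await stream.get_final_message()

        if final_message.stop_reason != "tool_use":
            yield StreamEvent(type=EventType.AGENT_DONE, ...)
            break

        # Execute all tool calls concurrently, yield results
        tool_calls = [block for block in final_message.content if block.type == "tool_use"]
        tool_results = await asyncio.gather(*[execute_one(tc) for tc in tool_calls])
        for result in tool_results:
            yield StreamEvent(type=EventType.TOOL_RESULT, ...)

        # Continue loop with tool results appended to messages:
        # assistant_message echoes final_message.content as the assistant turn,
        # tool_results_message is a user turn holding one tool_result block per call.
        messages = messages + [assistant_message, tool_results_message]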

Note the asyncio.gather for concurrent tool execution — if the LLM calls three tools in parallel, execute them in parallel.
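A sketch of what concurrent execution could look like with a per-tool timeout and error capture, so one slow or failing tool does not take down the rest. The run_tools helper and the result dict shape are assumptions for illustration, not the original execute_one:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_tools(
    calls: list[tuple[str, Callable[[], Awaitable[Any]]]],
    timeout_s: float = 30.0,
) -> list[dict[str, Any]]:
    """Run tool calls concurrently; results come back in call order."""

    async def run_one(call_id: str, tool: Callable[[], Awaitable[Any]]) -> dict[str, Any]:
        try:
            output = await asyncio.wait_for(tool(), timeout=timeout_s)
            return {"tool_call_id": call_id, "is_error": False, "output": output}
        except asyncio.TimeoutError:
            return {"tool_call_id": call_id, "is_error": True,
                    "output": f"timed out after {timeout_s}s"}
        except Exception as exc:  # One failing tool must not cancel the others
            return {"tool_call_id": call_id, "is_error": True, "output": str(exc)}

    return await asyncio.gather(*(run_one(cid, tool) for cid, tool in calls))
```

Because asyncio.gather preserves argument order, each result lines up with the tool call that produced it, which makes it straightforward to emit one TOOL_RESULT event per call.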


SSE vs WebSocket

Use SSE when:

  • Agent output is one-directional (agent → user)
  • You're building a stateless API
  • You want simplicity (SSE is just HTTP)
  • HTTP/2 multiplexing handles concurrent streams

Use WebSocket when:

  • Users need to interrupt mid-stream ("stop, that's wrong")
  • Multi-turn conversations need low-latency input
  • You need bidirectional control messages

SSE setup:

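import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
# Assumed registry: turn_id -> asyncio.Queue that the agent task fills
queues: dict[str, asyncio.Queue] = {}

@app.get("/agent/stream/{turn_id}")
async def stream_events(turn_id: str):
    queue = queues[turn_id]

    async def event_generator():
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=30.0)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"  # SSE comment line; prevents proxy timeout
                continue

            if event is None:  # Sentinel: the agent has finished
                yield "data: {\"type\": \"done\"}\n\n"
                break

            yield event.to_sse()  # "data: {...}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # CRITICAL: disable nginx buffering
        }
    )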

The X-Accel-Buffering: no header is frequently forgotten. Without it, nginx buffers your SSE response and the client gets everything at once at the end, defeating the purpose.


Backpressure: When the Client Is Slower Than the LLM

At ~80 tokens/second, an LLM can produce faster than a mobile client can render or a slow network can deliver. Without backpressure, you get either memory exhaustion (unbounded queue) or event drops (silent data loss).

import asyncio


class BackpressureStream:
    def __init__(self, max_buffer: int = 100, high_watermark: float = 0.8):
        self.high_watermark = int(max_buffer * high_watermark)
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)
        self._slow_down = asyncio.Event()
        self._dropped = 0  # Count of events dropped because the buffer was full

    async def produce(self, event: StreamEvent) -> bool:
        if self._queue.qsize() >= self.high_watermark:
            self._slow_down.set()
        if self._queue.full():
            self._dropped += 1
            return False  # Drop the event
        await self._queue.put(event)
        return True

    async def consume(self) -> StreamEvent:
        event = await self._queue.get()
        if self._queue.qsize() < self.high_watermark:
            self._slow_down.clear()  # Client caught up; stop throttling
        return event

    def should_slow_down(self) -> bool:
        return self._slow_down.is_set()

# Producer respects backpressure:
async for event in stream_with_tool_calls(...):
    success = await stream.produce(event)
    if stream.should_slow_down():
        await asyncio.sleep(0.05)  # 50ms throttle

For text deltas specifically, you can also coalesce: buffer tokens and flush every 50ms instead of every token. Reduces SSE overhead at the cost of slightly less "live" feel.
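Coalescing can be sketched as an async generator that sits between the LLM stream and the SSE writer. This is a simplified illustration: the coalesce name is hypothetical, and it only flushes when a new delta arrives, so a stalled stream holds its buffered text until the next delta or the end of the stream. A timer-driven flush would avoid that at the cost of more code.

```python
import asyncio
from typing import AsyncIterator


async def coalesce(deltas: AsyncIterator[str], interval_s: float = 0.05) -> AsyncIterator[str]:
    """Merge text deltas, flushing at most once per interval."""
    loop = asyncio.get_running_loop()
    buffer: list[str] = []
    last_flush = loop.time()
    async for delta in deltas:
        buffer.append(delta)
        if loop.time() - last_flush >= interval_s:
            yield "".join(buffer)
            buffer.clear()
            last_flush = loop.time()
    if buffer:  # Flush whatever is left when the stream ends
        yield "".join(buffer)
```

Only text deltas should go through this path; tool call and error events are better sent immediately so the UI can react to them without delay.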


Stream Replay: Handling Disconnects

Browser tabs go to sleep. Mobile connections drop. Users scroll away and come back.

SSE has a built-in mechanism: Last-Event-ID. When an EventSource reconnects, the browser automatically sends the last event ID it received as a header. Your server can replay from there.

But this only works if you've persisted the events:

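import json
from typing import Optional

from fastapi import Header


class StreamReplayBuffer:
    def __init__(self, redis):
        # Assumes an async Redis client created with decode_responses=True,
        # so lrange returns str rather than bytes.
        self.redis = redis

    async def save_event(self, event: StreamEvent):
        key = f"stream:{event.turn_id}:events"
        await self.redis.rpush(key, event.to_sse())
        await self.redis.expire(key, 3600)  # 1 hour TTL

    @staticmethod
    def _sequence_of(raw: str) -> int:
        # Read the JSON from the data: line only, so frames that also carry
        # an id: line still parse. "seq" must match the key to_sse() writes.
        for line in raw.splitlines():
            if line.startswith("data: "):
                return json.loads(line[len("data: "):]).get("seq", -1)
        return -1

    async def replay_from(self, turn_id: str, last_seen_sequence: int) -> list[str]:
        key = f"stream:{turn_id}:events"
        all_events_raw = await self.redis.lrange(key, 0, -1)
        return [raw for raw in all_events_raw if self._sequence_of(raw) > last_seen_sequence]

@app.get("/agent/stream/{turn_id}")
async def stream_with_replay(turn_id: str, last_event_id: Optional[str] = Header(None), ...):
    # Header(None) makes FastAPI read the Last-Event-ID request header;
    # a bare default would be treated as a query parameter.
    last_seq = int(last_event_id) if last_event_id else -1

    async def event_generator():
        # Replay missed events first
        for raw_event in await replay_buffer.replay_from(turn_id, last_seq):
            yield raw_event

        # Then continue live stream
        ...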

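The JavaScript EventSource sends Last-Event-ID automatically when it reconnects, but only if the server sets the id: field on each SSE event. Use the sequence number as the event ID. Note that a full page refresh creates a new EventSource with no last event ID, so to resume after a refresh the client has to store the last sequence itself and send it explicitly, for example as a query parameter.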
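A minimal sketch of an SSE frame that carries the sequence number as its ID, plus defensive parsing of the header on the way back in. Both helper names are hypothetical, and the "seq" key is an assumption chosen to match the replay filter above:

```python
import json
from typing import Optional


def format_sse(sequence: int, payload: dict) -> str:
    """Serialize one event as an SSE frame whose id is the sequence number."""
    body = json.dumps({**payload, "seq": sequence}, separators=(",", ":"))
    return f"id: {sequence}\ndata: {body}\n\n"


def parse_last_event_id(header_value: Optional[str]) -> int:
    """Map a Last-Event-ID header to the last seen sequence (-1 if absent or invalid)."""
    try:
        return int(header_value) if header_value else -1
    except ValueError:
        return -1
```

Parsing defensively matters because the header is client-controlled; a malformed value should fall back to a full replay rather than raise inside the request handler.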


Streaming UI: The Incremental Renderer

DOM manipulation at 80 tokens/second is expensive. Don't update the DOM on every event.

class IncrementalRenderer {
    constructor(container) {
        this.container = container;
        this.activeParagraph = null;
        this.buffer = '';
        this.flushInterval = setInterval(() => this._flush(), 16); // ~60fps
    }

    onTextDelta(text) {
        this.buffer += text; // Just buffer; don't touch the DOM
    }

    _flush() {
        if (!this.buffer) return;

        if (!this.activeParagraph) {
            this.activeParagraph = document.createElement('p');
            this.container.appendChild(this.activeParagraph);
        }

        this.activeParagraph.textContent += this.buffer;
        this.buffer = '';

        // Auto-scroll only if user is already at bottom
        const { scrollTop, scrollHeight, clientHeight } = this.container;
        if (scrollHeight - scrollTop - clientHeight < 100) {
            this.container.scrollTop = scrollHeight;
        }
    }

    destroy() {
        this._flush();                     // Render anything still buffered
        clearInterval(this.flushInterval); // Stop the timer when the stream ends
    }
}

For tool calls, show a status card immediately (don't wait for the result):

onToolCallStart({ tool_name, id }) {
    const card = document.createElement('div');
    card.className = 'tool-call-card pending';
    card.id = `tool-${id}`;
    // Build spans with textContent so a tool name is never parsed as HTML
    const name = document.createElement('span');
    name.textContent = tool_name;
    const status = document.createElement('span');
    status.textContent = '⏳ running...';
    card.append(name, ' ', status);
    this.container.appendChild(card);
}

onToolResult({ tool_call_id, is_error }) {
    const card = document.getElementById(`tool-${tool_call_id}`);
    if (card) {
        card.className = `tool-call-card ${is_error ? 'error' : 'done'}`;
        card.querySelector('span:last-child').textContent = is_error ? '❌ failed' : '✅ done';
    }
}

The Production Checklist (Short Version)

Infrastructure:

  • proxy_read_timeout 3600s in nginx (default 60s kills long streams)
  • X-Accel-Buffering: no on SSE responses
  • CDN bypass for streaming endpoints (CloudFront buffers by default)

Backend:

  • Bounded asyncio.Queue (prevents memory exhaustion)
  • Heartbeat / keepalive every 15-30s (prevents proxy timeout)
  • Tool execution has timeout (asyncio.wait_for(tool(), timeout=30))
  • Sequence numbers on every event
  • Error events emitted before the exception propagates (so the client knows what happened)

Frontend:

  • EventSource reconnect + Last-Event-ID support
  • Text rendered at 60fps max
  • Input disabled during active stream
  • Cancel/interrupt button visible

The Full Pattern Library

These implementations are in MAC-020 of the Machina Market production agent pattern library: https://machinamarket.surge.sh

The pack includes the full 9-module source, the backpressure implementation, WebSocket bidirectional streaming with interrupt support, multi-agent pipeline router, Redis replay buffer, and the complete 40-point production checklist.

The series now covers 20 packs — from RAG and tool use to cost optimization to observability to workflow planning to real-time streaming. All Python, all production-tested.

What streaming edge cases have you hit in production? Happy to dig into specifics in the comments.
