DEV Community

Bernard Uriza

Live chain-of-thought in a chatbot: how to actually stream the tool calls (not just the text)

Most "streaming" LLM chatbots stream just the text. The model says "I'll search for that…" and then you wait 6 seconds while the tokens dribble in. The actual search? Hidden. The 3 scrapes it did to fact-check? Hidden. You're staring at a typing indicator that doesn't tell you anything about what's actually taking time.

I just built a chatbot where every tool call surfaces as a step in real time (🔍 search_engine, 📄 scrape_as_markdown, 📄 scrape_as_markdown), and then the response streams in token by token. The user sees the agent's chain-of-thought as it happens, not as a postmortem.

The trick is that you have to stream three different things, and each layer needs to know what to do with each kind of event. Here's the architecture.

The shape of the stream

The agent runner (in my case, fi-runner wrapping the Claude Agent SDK) emits events of three types as they happen:

async for event in runner.run_stream(user_message, session_id=sid):
    # event["type"] is one of:
    #   "tool_call"  → event["tool"] is a ToolCall(name, server, is_error, ...)
    #   "text"       → event["text"] is a delta (a few tokens of the response)
    #   "result"     → event["result"] is the final TurnResult (post-guards)
    ...  # dispatch on event["type"]

Three types because they mean three different things visually:

  • tool_call lands BEFORE the text. The model decides to use a tool, and you want to show which tool, which server, immediately. That's the "thinking step".
  • text lands while the response is generating. Token deltas, typewriter effect.
  • result lands LAST and is the authoritative final text — it can differ from the concatenated text deltas because post-turn guards (anti-drift, PHI redaction) may have rewritten the response.

That last point is a footgun the spec doesn't yell at you about. We'll come back to it.

Layer 1: the FastAPI endpoint

Server-Sent Events (SSE) is the right transport here: unidirectional, text-based, and plain HTTP, so it passes through most proxies (with one buffering caveat), and the browser's EventSource reconnects automatically. FastAPI serves it with StreamingResponse:

import asyncio
import json
from fastapi.responses import StreamingResponse

# app, ChatRequest and chat_stream are defined elsewhere in the app;
# tool_call_to_wire / result_to_wire live in wire.py (Layer 2).
# asyncio.timeout needs Python 3.11+.

def _sse(event: str, data: dict) -> str:
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

@app.post("/chat/stream")
async def chat_stream_endpoint(req: ChatRequest) -> StreamingResponse:
    async def gen():
        yield _sse("open", {"session_id": req.session_id})
        try:
            async with asyncio.timeout(180):
                async for event in chat_stream(req.message, session_id=req.session_id):
                    t = event.get("type")
                    if t == "tool_call":
                        yield _sse("tool_call", tool_call_to_wire(event["tool"]))
                    elif t == "text":
                        yield _sse("text", {"delta": event["text"]})
                    elif t == "result":
                        yield _sse("result", result_to_wire(event["result"]))
        except asyncio.CancelledError:
            raise            # client closed tab: propagate so the LLM call cancels
        except TimeoutError:
            yield _sse("error", {"kind": "TimeoutError", "message": "turn exceeded 180s"})
        except Exception as exc:
            yield _sse("error", {"kind": type(exc).__name__, "message": str(exc)})
        # Not in `finally`: a yield there would also run on cancellation (writing to a
        # closed socket) and raises RuntimeError if the generator is closed via aclose().
        yield _sse("done", {})

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # nginx: don't buffer
            "Connection": "keep-alive",
        },
    )

Three things in here are non-obvious:

  1. The exception ladder. When the user closes the tab, CancelledError is raised inside the generator. If you swallow it as a "normal error" and yield an error frame, you (a) write to a socket that's already closed and, more importantly, (b) the upstream LLM call may never be cancelled. It keeps running in the background, burning tokens. Since Python 3.8, CancelledError subclasses BaseException, so except Exception alone won't catch it; the explicit except CancelledError: raise, placed first, keeps it that way even if someone later widens the handler to a bare except: or except BaseException:.

  2. asyncio.timeout(180) (Python 3.11+). If your upstream tool (Bright Data MCP in my case) hangs, the SSE socket stays open forever and the user sees a typing indicator that never resolves. A hard per-turn ceiling turns that hang into a clean error event.

  3. X-Accel-Buffering: no. nginx by default buffers responses. SSE through nginx without this header means the user gets nothing until the generator finishes — defeating the entire point. Cloudflare has its own knobs.

Layer 2: the wire contract (PHI safety)

The naïve approach is to dict() the ToolCall and send it. Don't. The input field on a tool call carries whatever the LLM passed in — for a search tool, that's the query verbatim; for Bright Data, URLs with auth tokens in query strings; for an internal medical tool, possibly PHI. None of that should leave the process over the SSE wire.

I keep the wire shape in its own module:

# wire.py — the SINGLE source of truth for what leaves over SSE
from typing import TypedDict, Any

class ToolCallWire(TypedDict):
    name: str | None
    server: str | None
    id: str | None
    is_error: bool | None
    # NO `input` field. Intentionally narrower than the in-process ToolCall.

def tool_call_to_wire(tc: Any) -> ToolCallWire:
    return {
        "name": getattr(tc, "name", None),
        "server": getattr(tc, "server", None),
        "id": getattr(tc, "id", None),
        "is_error": getattr(tc, "is_error", None),
    }

Two things to notice:

  • The wire type is deliberately narrower than the in-process type, so a static type checker (mypy, pyright) enforces what gets serialized. If you ever do dict(tool_call), you have to actively bypass the type to leak input. That's how you make PHI safety the path of least resistance.
  • tool_call_to_wire uses getattr with None defaults because it sees partial objects mid-stream — a ToolUseBlock arrives BEFORE its matching ToolResultBlock, so is_error is still None. Defensive getattr here is correct. The result_to_wire counterpart, where the object is always complete, uses direct attribute access — you WANT it to raise if the upstream library renames a field, so you find out before shipping silent empty results.

Layer 3: the React side

EventSource is the obvious choice for SSE… except it's GET-only, no request body. My chat endpoint is POST. So I drop EventSource and use fetch streaming:

const res = await fetch(`${API_URL}/chat/stream`, {
  method: "POST",
  headers: { "Content-Type": "application/json", Accept: "text/event-stream" },
  body: JSON.stringify({ session_id, message }),
  signal: abortController.signal,  // ← user can cancel mid-stream
});
if (!res.ok || !res.body) throw new Error(`stream failed: HTTP ${res.status}`);

const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

// patchAssistant (app-specific) merges the fields returned by the updater
// into the current assistant message; prev is that message's current state.
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });   // {stream:true} handles UTF-8 split between chunks
  const frames = buffer.split("\n\n");
  buffer = frames.pop() ?? "";                          // last frame may be partial; keep it for the next read
  for (const frame of frames) {
    const { event, data } = parseFrame(frame);          // helper: reads "event:" and JSON-parses "data:"
    if (event === "tool_call") {
      patchAssistant((prev) => ({ steps: [...prev.steps, data], status: "streaming" }));
    } else if (event === "text") {
      patchAssistant((prev) => ({ content: prev.content + data.delta }));
    } else if (event === "result") {
      // REPLACE, don't append: post-guard text may differ
      patchAssistant(() => ({ content: data.text, steps: data.tool_calls, status: "done" }));
    }
  }
}

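The {stream: true} flag on TextDecoder is what makes this work for UTF-8: without it, a multi-byte character split between chunks is decoded as U+FFFD replacement characters. The buffer-and-split-on-blank-line step is just SSE framing: each frame ends with a blank line, and the last piece of a read may be incomplete.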
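The same two ideas, incremental UTF-8 decoding and buffer-then-split framing, can be checked in Python, where `codecs.getincrementaldecoder` plays the role of `TextDecoder` with `{stream: true}`. A minimal sketch: `SSEFrameReader` and `parse_frame` are hypothetical helpers (the post doesn't show its `parseFrame`), and the parser only handles the single `event:`/`data:` lines this server emits, not the full SSE grammar (comments, `id:`, multi-line `data:`). With `json.dumps`' default `ensure_ascii=True` the server's own frames are pure ASCII, so the split-character case matters once anything emits raw non-ASCII.

```python
import codecs
import json
from typing import Any


def parse_frame(frame: str) -> tuple[str, Any]:
    # Handles the subset this server emits: one "event:" line, one "data:" line.
    event, data = "message", None  # "message" is the SSE default event type
    for line in frame.split("\n"):
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data = json.loads(line[len("data:"):].strip())
    return event, data


class SSEFrameReader:
    def __init__(self) -> None:
        # Holds the bytes of a split multi-byte character until the rest arrives.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> list[tuple[str, Any]]:
        self._buffer += self._decoder.decode(chunk)
        *complete, self._buffer = self._buffer.split("\n\n")  # last piece may be partial
        return [parse_frame(f) for f in complete if f]
```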

The replace-not-append on the result event is the footgun I promised. The streamed text deltas are the LLM's raw output as it generates. The result.text is what the post-turn guards left after running. If your anti-drift guard rewrites the response (mine does — it strips report-voice markdown headers), the streamed deltas and the final text don't match. If you append the result to the streamed content, you double-render. If you replace, you get a smooth "preview → settled" transition. The spec calls for replace.
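The settle-on-result rule can be pinned down as a pure reducer over parsed frames. A minimal sketch in Python (the real client is TypeScript); `AssistantMessage` and `apply_event` are hypothetical names mirroring the `content`, `steps` and `status` fields patched above.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class AssistantMessage:
    content: str = ""
    steps: tuple[dict[str, Any], ...] = ()
    status: str = "pending"


def apply_event(msg: AssistantMessage, event: str, data: dict[str, Any]) -> AssistantMessage:
    if event == "tool_call":
        return replace(msg, steps=msg.steps + (data,), status="streaming")
    if event == "text":
        return replace(msg, content=msg.content + data["delta"])  # preview only
    if event == "result":
        # REPLACE, don't append: the guarded text is authoritative.
        return replace(msg, content=data["text"], steps=tuple(data["tool_calls"]), status="done")
    return msg  # ignore events this reducer doesn't handle
```

Appending on `result` instead would leave `# Report\nLooks solid.Looks solid.` on screen in the scenario below.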

The scroll problem

Naïve useEffect(() => scrollIntoView(), [messages]) runs on every text delta. Result: ~30 scroll animations per second fighting each other, AND if the user scrolled up to re-read an earlier response, you yank them back to the tail mid-read. Both unusable.

The fix is the "sticky-bottom" pattern that ChatGPT and Claude.ai use:

// Assumes tailRef = useRef<HTMLDivElement>(null) on an empty div after the last message,
// and lastCountRef = useRef(0) holding the message count from the previous render.
useEffect(() => {
  const doc = document.documentElement;
  const distanceFromBottom = doc.scrollHeight - (window.innerHeight + window.scrollY);
  const nearBottom = distanceFromBottom < 200;
  const newMessage = messages.length > lastCountRef.current;
  lastCountRef.current = messages.length;

  if (newMessage || nearBottom) {
    tailRef.current?.scrollIntoView({ behavior: "smooth", block: "end" });
  }
}, [messages]);

Scroll on new message (always — turn boundary, the user wants to see the answer). Scroll on delta only if the user is already near the bottom. The 200px threshold is the sweet spot — strict enough that you respect intent to read, lax enough that a small scroll bump doesn't lose autoscroll.
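The scroll decision is easiest to reason about as a pure function of the measurements the effect reads. A sketch in Python for testability (in the component it lives inside the `useEffect`); `should_autoscroll` and its parameters are hypothetical names mirroring `scrollHeight`, `innerHeight`, `scrollY` and the message counts.

```python
def should_autoscroll(
    scroll_height: float,
    viewport_height: float,
    scroll_y: float,
    message_count: int,
    last_count: int,
    threshold: float = 200.0,
) -> bool:
    # Distance between the bottom of the viewport and the bottom of the page.
    distance_from_bottom = scroll_height - (viewport_height + scroll_y)
    new_message = message_count > last_count  # turn boundary: always follow
    near_bottom = distance_from_bottom < threshold  # user is already following the tail
    return new_message or near_bottom
```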

What you actually see

When this all hangs together right, the user types acme.com and immediately sees:

🤖 pensando… (thinking…)
  🔍 search_engine
  📄 scrape_as_markdown
  📄 scrape_as_markdown
  ⚙️ search_documents

…stepping in over ~4 seconds, with the roast text starting to type after. That sequence used to be a black box. Now it's receipts.

What's next

Two gaps I hit in the setup above:

  1. No duration_ms per tool call — when one of those scrape steps takes 8 seconds, you can't show it. The Mermaid turn-flow can't colour slow steps. Just shipped this in fi-runner 0.14 — ToolCall.duration_ms paired by tool_use_id.

  2. No preflight on the MCP servers. If Bright Data MCP fails to spawn at boot (bad token, missing npx), I only find out when the model tries the first tool: a generic is_error=true, mid-roast, in production. Also shipped in 0.14: Runner.preflight() does a JSON-RPC handshake (initialize, then tools/list) against each MCP server at startup and returns {name: alive, tools, error}. Wire it into your FastAPI lifespan handler and a misconfigured server fails at boot instead of during the first demo.

If you're building anything with agents that use external tools, the message of this post is: don't hide the tools. The chain-of-thought IS the product. Showing it turns "the AI is doing magic" into "the AI is making 4 specific API calls and here they are", which is the difference between users trusting it and not.
