DEV Community

Cover image for Streaming Tool Calls with Anthropic's API: The Buffer Pattern Nobody Documents
Gabriel Anhaia
Gabriel Anhaia

Posted on

Streaming Tool Calls with Anthropic's API: The Buffer Pattern Nobody Documents


Streaming tool calls look easy in the docs. You wire up an SSE handler, the example prints text as it arrives, you assume the rest is symmetric.

Then you ship one. The first tool call comes in as a half-baked JSON object. Your parser dies on {"city": "San Fran. The second tool call interleaves with a thinking block and your text-vs-tool dispatch falls over. You patch around it until the patches outweigh the original handler.

The fix is small and the docs don't spell it out. Here's the buffer pattern, the multi-block dispatcher, and the three gotchas that bite every team that tries this without one.

What tool_use deltas look like over SSE

The non-streaming response is the clean version. You get one tool_use block, fully formed, with the input object already parsed:

{
  "type": "tool_use",
  "id": "toolu_01A09q90qw90lq917835lq9",
  "name": "get_weather",
  "input": {"city": "San Francisco", "unit": "celsius"}
}
Enter fullscreen mode Exit fullscreen mode

Streaming doesn't give you that. It gives you a sequence of events that you have to reassemble. The model emits a content_block_start to declare a tool_use block (with name and id, but no input), then a stream of input_json_delta events with string fragments, then a content_block_stop.

A real capture for the same call looks like this (trimmed for clarity):

event: content_block_start
data: {"type":"content_block_start","index":1,"content_block":{
  "type":"tool_use","id":"toolu_01A09q90qw90lq917835lq9",
  "name":"get_weather","input":{}}}

event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{
  "type":"input_json_delta","partial_json":"{\"ci"}}

event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{
  "type":"input_json_delta","partial_json":"ty\": \"San Fran"}}

event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{
  "type":"input_json_delta","partial_json":"cisco\", \"unit\""}}

event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{
  "type":"input_json_delta","partial_json":": \"celsius\"}"}}

event: content_block_stop
data: {"type":"content_block_stop","index":1}
Enter fullscreen mode Exit fullscreen mode

The input field in content_block_start is {}. It's always {}. The real arguments live in the partial_json fragments. They split on arbitrary character boundaries: mid-key, mid-value, mid-escape sequence. Nothing in the protocol guarantees a delta ends on a syntactic boundary.

This is the part the quickstart skips. It's also where every naive parser dies.

The naive parser that explodes on partial JSON

You read the SDK example, you see delta.partial_json, you write the obvious thing:

# naive: broken on every non-trivial call
import json
from anthropic import Anthropic

client = Anthropic()
tool_input = None

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    tools=[get_weather_tool],
    messages=[{"role": "user", "content": "Weather in SF?"}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                # parse as it arrives: this is the bug
                tool_input = json.loads(event.delta.partial_json)
Enter fullscreen mode Exit fullscreen mode

First delta is {"ci. json.loads raises json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 2 (char 1). The handler crashes before the second event arrives.

You "fix" it by wrapping in a try/except and silently dropping failed parses. Now you parse nothing until the final delta closes the object, which means you ignored every fragment up to that point. On most calls, the final delta isn't a complete object either, it's just the closing brace. So your tool_input stays None and the agent has no arguments to pass to the tool.

The lesson is straightforward and worth saying once: the partial_json string is not JSON. It's a fragment of a JSON-shaped string that becomes JSON when concatenated with all the other fragments for the same block. You don't parse fragments. You buffer them.

The 40-line buffer pattern: accumulate, parse at stop

The pattern is one buffer per content block, keyed by the index field on the event. You append every input_json_delta.partial_json to the buffer. You parse exactly once, at content_block_stop. Nothing in between.

import json
from collections import defaultdict
from anthropic import Anthropic

client = Anthropic()

# index -> accumulating partial_json string
tool_buffers: dict[int, str] = defaultdict(str)
# index -> {"id": ..., "name": ...} captured at content_block_start
tool_meta: dict[int, dict] = {}
# completed tool calls, in arrival order
completed_tools: list[dict] = []

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=2048,
    tools=tools,
    messages=messages,
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            block = event.content_block
            if block.type == "tool_use":
                tool_meta[event.index] = {
                    "id": block.id,
                    "name": block.name,
                }

        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                # accumulate, never parse here
                tool_buffers[event.index] += event.delta.partial_json

        elif event.type == "content_block_stop":
            if event.index in tool_meta:
                raw = tool_buffers[event.index]
                # empty input is "" not "{}", handle both
                parsed = json.loads(raw) if raw.strip() else {}
                completed_tools.append({
                    **tool_meta[event.index],
                    "input": parsed,
                })
                del tool_buffers[event.index]
                del tool_meta[event.index]
Enter fullscreen mode Exit fullscreen mode

That's the whole thing. Forty lines including the imports and comments.

Two things to notice. The buffer is keyed by event.index, not by tool id, because the id only shows up on content_block_start and the deltas don't carry it. index is on every event, so it's the join key. The metadata dict captures id and name at the start so you can pair them with the parsed input at the stop.

The second thing: json.loads("") raises. A tool that takes no arguments will produce zero input_json_delta events, leaving an empty buffer. The if raw.strip() guard returns {} instead of crashing. Skip this and your no-argument tool calls die at stop time.

Routing streamed text vs tool_use vs thinking blocks

Real responses interleave content blocks. You'll get a text block while the model "thinks aloud", then a tool_use block, then more text after the tool returns. With extended thinking turned on you also get thinking blocks. They all share the same delta channel, and only the delta.type distinguishes them.

A single dispatcher handles the lot. The shape is: switch on the block type at content_block_start, route deltas by delta.type, finalize at content_block_stop.

from typing import Callable

text_buffers: dict[int, str] = defaultdict(str)
thinking_buffers: dict[int, str] = defaultdict(str)
block_kind: dict[int, str] = {}  # "text" | "tool_use" | "thinking"

def on_text_chunk(text: str) -> None:
    # stream to the UI as it arrives: text IS safe to use mid-stream
    print(text, end="", flush=True)

def on_tool_call(tool: dict) -> None:
    # invoke the tool, append result to next request
    ...

for event in stream:
    if event.type == "content_block_start":
        block_kind[event.index] = event.content_block.type
        if event.content_block.type == "tool_use":
            tool_meta[event.index] = {
                "id": event.content_block.id,
                "name": event.content_block.name,
            }

    elif event.type == "content_block_delta":
        kind = block_kind.get(event.index)
        d = event.delta
        if kind == "text" and d.type == "text_delta":
            text_buffers[event.index] += d.text
            on_text_chunk(d.text)
        elif kind == "thinking" and d.type == "thinking_delta":
            thinking_buffers[event.index] += d.thinking
        elif kind == "tool_use" and d.type == "input_json_delta":
            tool_buffers[event.index] += d.partial_json

    elif event.type == "content_block_stop":
        kind = block_kind.pop(event.index, None)
        if kind == "tool_use":
            raw = tool_buffers.pop(event.index, "")
            parsed = json.loads(raw) if raw.strip() else {}
            on_tool_call({**tool_meta.pop(event.index), "input": parsed})
        # text and thinking blocks stream live; nothing to finalize
Enter fullscreen mode Exit fullscreen mode

Text is the one block type you can safely emit live. The delta is already a valid UTF-8 fragment, so concat to the running buffer and send to the UI. Thinking blocks you usually log but don't show users. Tool blocks you buffer and dispatch at stop.

The block_kind map exists because the delta.type alone isn't enough to decide what to do. A text_delta arriving on an index that started life as a tool_use would be a protocol violation, but defensive routing means you don't get a confusing crash if the SDK changes shape between versions.

Backpressure: when the model emits faster than your tool can run

If on_tool_call runs a database query or hits another API, it can take seconds. Meanwhile the model is still streaming the next block. If you call your tool synchronously inside the event loop, the SSE connection blocks and the server can hit its read timeout.

Two reasonable shapes. Either drop tool dispatch into a queue and consume it after the stream finishes (simplest, works for most cases), or run the tool concurrently and reconcile results into the next request (faster, but you need cancellation handling for when the user aborts):

import asyncio

pending_calls: list[dict] = []

async def consume_stream():
    async with client.messages.stream(...) as stream:
        async for event in stream:
            # same dispatcher logic; collect, don't execute
            ...
    return pending_calls

async def execute_tools(calls):
    return await asyncio.gather(*[run_tool(c) for c in calls])

calls = await consume_stream()
results = await execute_tools(calls)
Enter fullscreen mode Exit fullscreen mode

The queue shape is the default. The concurrent shape is for agent loops where time-to-first-tool-output matters more than implementation simplicity.

The gotcha: empty deltas are normal, don't reset on them

This one eats hours. The Anthropic stream occasionally sends an input_json_delta with partial_json: "". It's not a signal. It's not an error. It's just an empty chunk, possibly a heartbeat-shaped artifact of the underlying SSE framing, possibly a model-side tokenization edge case.

The defensive instinct is to treat an empty delta as a reset, or to log a warning, or to skip the buffer append. All three are wrong. Append the empty string (it's a no-op), keep going, parse at stop. The buffer state is correct.

The same applies to whitespace-only deltas. partial_json: " " is legal; it might be the space between two key-value pairs. Append, don't filter.

What to take with you

  • Never call json.loads on a partial_json fragment. Buffer until content_block_stop.
  • Key your buffer by event.index, capture id/name at content_block_start.
  • A single dispatcher handles text, tool_use, and thinking blocks by switching on the block kind captured at start.
  • Text is safe to stream live. Tool input is not.
  • Empty input_json_delta events are normal. Don't filter them, don't reset.
  • Run tools off the stream loop, either queued or concurrent. Never synchronously inside the event handler.

Forty lines, three gotchas, one dispatcher. That's the whole pattern.

What's the weirdest partial-JSON delta you've had to debug, and how did you catch it?


If this was useful

The streaming protocol is one small piece of the agent loop, and this kind of pattern lives in AI Agents Pocket Guide. The book walks through tool-use plumbing, loop budgets, error recovery, and the failure modes that show up only after you ship: the same ground covered here, but for the full agent lifecycle rather than a single event channel.

AI Agents Pocket Guide: Patterns for Building Autonomous Systems with LLMs

Top comments (0)