- Book: AI Agents Pocket Guide: Patterns for Building Autonomous Systems with LLMs
- Me: xgabriel.com | GitHub
A user types a question into your support agent. The model decides to call search_orders with a customer email and a date range. The total response takes 4.2 seconds to finish. Your UI sits blank for every one of those seconds, then drops one big tool result on the screen. The model knew the email after token 30 and the date range after token 80. You waited for token 410 because the SDK call was non-streaming.
Streaming tool calls close that gap. With streaming enabled, the Anthropic Messages API sends every block of the response over Server-Sent Events. Tool inputs arrive as partial_json chunks you can concatenate and inspect while the model is still typing. React to the email field the moment it closes, before the date range even arrives, and you can shave a noticeable slice off first-action latency on a search-heavy agent.
The event shape first.
What the wire actually looks like
Anthropic's streaming format follows the Server-Sent Events spec. You get a sequence of named events. The ones you care about for tool calls:
    message_start
    content_block_start (index=0, type=tool_use, name=search_orders)
    content_block_delta (index=0, partial_json='{"em')
    content_block_delta (index=0, partial_json='ail":')
    content_block_delta (index=0, partial_json=' "ada@')
    content_block_delta (index=0, partial_json='ex.io"')
    content_block_delta (index=0, partial_json=', "from')
    content_block_delta (index=0, partial_json='":"2026-')
    ...
    content_block_stop (index=0)
    message_delta (stop_reason=tool_use)
    message_stop
Each content_block_delta event carries a nested delta object. For tool inputs the delta type is input_json_delta and the field is partial_json. Concatenate them in arrival order to get the JSON the model is sending. The chunks do not respect JSON boundaries. A delta can end mid-key or mid-string, so buffer first and parse only on content_block_stop.
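To see why parsing mid-stream fails, here is a small offline sketch. The chunk list and the limit field are made up for illustration and only mimic the arbitrary split points described above; no API call is involved.

```python
import json

# Hypothetical chunks, split at arbitrary points (mid-key, mid-string, mid-number).
CHUNKS = ['{"em', 'ail": "a', 'da@ex.io", "li', 'mit": 1', '0}']


def parse_attempts(chunks):
    """Try json.loads after every chunk and record whether each attempt worked."""
    buf = ""
    attempts = []
    for chunk in chunks:
        buf += chunk
        try:
            json.loads(buf)
            attempts.append(True)
        except json.JSONDecodeError:
            attempts.append(False)
    return buf, attempts


final_buf, attempts = parse_attempts(CHUNKS)
print(attempts)               # only the last attempt succeeds
print(json.loads(final_buf))
```

Every attempt before the final chunk raises, which is why the buffer is only handed to json.loads once the block has closed.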
There is a smaller text path too. Plain assistant text uses text_delta with a text field. Same outer envelope, different payload.
The parser, end to end
Here is the smallest correct parser using client.messages.stream. It accumulates per-block buffers, parses each tool input the moment its block closes, and yields events your code can act on.
    import json
    from dataclasses import dataclass, field
    from typing import Iterator

    import anthropic

    client = anthropic.Anthropic()


    @dataclass
    class ToolCall:
        block_index: int
        name: str
        tool_use_id: str
        raw_json: str = ""
        parsed: dict | None = None


    @dataclass
    class StreamState:
        text_buf: dict[int, str] = field(default_factory=dict)
        tool_buf: dict[int, ToolCall] = field(
            default_factory=dict
        )
        stop_reason: str | None = None
The state is two dicts keyed by block index. Anthropic emits one block at a time today, but the index is part of the protocol so handle it as if blocks could interleave.
    def stream_turn(messages, tools) -> Iterator[dict]:
        state = StreamState()
        with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=1024,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                for out in _handle(event, state):
                    yield out
        yield {
            "type": "done",
            "stop_reason": state.stop_reason,
        }
messages.stream is a context manager. The for-loop pulls typed event objects. The handler routes each event by event.type and decides whether to yield anything.
    def _handle(event, state) -> Iterator[dict]:
        t = event.type
        if t == "content_block_start":
            block = event.content_block
            if block.type == "tool_use":
                state.tool_buf[event.index] = ToolCall(
                    block_index=event.index,
                    name=block.name,
                    tool_use_id=block.id,
                )
            elif block.type == "text":
                state.text_buf[event.index] = ""
        elif t == "content_block_delta":
            d = event.delta
            if d.type == "input_json_delta":
                tc = state.tool_buf[event.index]
                tc.raw_json += d.partial_json
                yield {
                    "type": "tool_partial",
                    "name": tc.name,
                    "raw_so_far": tc.raw_json,
                }
            elif d.type == "text_delta":
                state.text_buf[event.index] += d.text
                yield {"type": "text", "chunk": d.text}
        elif t == "content_block_stop":
            if event.index in state.tool_buf:
                tc = state.tool_buf[event.index]
                tc.parsed = json.loads(tc.raw_json or "{}")
                yield {
                    "type": "tool_ready",
                    "name": tc.name,
                    "tool_use_id": tc.tool_use_id,
                    "input": tc.parsed,
                }
        elif t == "message_delta":
            if event.delta.stop_reason:
                state.stop_reason = event.delta.stop_reason
Three event types do the work: content_block_start opens a buffer; deltas append and yield previews; content_block_stop parses the finished JSON and yields a tool_ready event your dispatch loop runs.
Empty input is a real case. A tool with zero arguments still emits content_block_start and content_block_stop, with no deltas (or only empty ones) in between, so the buffer is an empty string when the block closes. The or "{}" fallback handles it.
Reacting before the block closes
The tool_partial events are where streaming earns its keep. You get the raw JSON string as it grows. If you only need one specific field, you can lift it out the moment it closes without parsing the whole object.
A pragmatic shortcut for stable, short string fields:
    import re

    def first_string_field(raw: str, key: str) -> str | None:
        # Matches only once the value's closing quote has arrived.
        pattern = rf'"{re.escape(key)}"\s*:\s*"([^"\\]*)"'
        m = re.search(pattern, raw)
        return m.group(1) if m else None

    warmed = False  # fire the prefetch at most once per turn
    for ev in stream_turn(messages, tools):
        if ev["type"] == "tool_partial":
            email = first_string_field(ev["raw_so_far"], "email")
            if email and not warmed:
                prefetch_customer(email)
                warmed = True
The regex matches a closed JSON string for one key. It will not match while the value is still being typed because the closing quote has not arrived yet. That is the property you want. You fire only on closed fields; a half-typed value never matches. Skip this trick for nested objects or escape-heavy inputs, where a partial parser like ijson earns the dependency.
A 200 ms head start on a database lookup is a real latency saving you can ship. The agent's first action runs while the model is still drafting argument three.
Ending the turn
When the stream finishes, stop_reason lives on message_delta.delta.stop_reason. The values you care about are end_turn (the model said its piece, no tool call needed) and tool_use (one or more tool blocks need execution). Treat max_tokens as an error path: a tool block cut off mid-input leaves JSON that will not parse. After your dispatch runs the tools, append a user message containing the tool_result blocks and call the API again. Same shape as the non-streaming flow.
The SDK's stream.get_final_message() will hand you the structured assistant content for free if you want to skip the bookkeeping. The manual version, for the cases where you want full control:
    tool_ready_events = []
    final = None
    for ev in stream_turn(messages, tools):
        if ev["type"] == "tool_ready":
            tool_ready_events.append(ev)
        elif ev["type"] == "done":
            final = ev

    if final["stop_reason"] == "tool_use":
        results = []
        for ev in tool_ready_events:
            # dispatch maps each tool name to the callable that runs it
            out = dispatch[ev["name"]](**ev["input"])
            results.append({
                "type": "tool_result",
                "tool_use_id": ev["tool_use_id"],
                "content": str(out),
            })
        messages.append(
            {"role": "assistant", "content": assistant_blocks}
        )
        messages.append(
            {"role": "user", "content": results}
        )
assistant_blocks is the structured content you accumulated from the stream: text and tool_use blocks, in the order the model produced them. Build it as you go from content_block_start and content_block_stop events, or call stream.get_final_message().content once the stream has been fully consumed.
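Building that list by hand might look like the sketch below. It assumes you kept one dict of text per block index and one dict of finished tool calls per block index; the function name and the id/name/input dict shape are hypothetical stand-ins for whatever state your parser holds.

```python
def build_assistant_blocks(
    text_by_index: dict[int, str],
    tools_by_index: dict[int, dict],
) -> list[dict]:
    """Merge text and tool_use blocks back into stream order by block index."""
    blocks: dict[int, dict] = {}
    for index, text in text_by_index.items():
        if text:  # skip empty text blocks as a precaution
            blocks[index] = {"type": "text", "text": text}
    for index, tool in tools_by_index.items():
        blocks[index] = {
            "type": "tool_use",
            "id": tool["id"],
            "name": tool["name"],
            "input": tool["input"],
        }
    return [blocks[i] for i in sorted(blocks)]


assistant_blocks = build_assistant_blocks(
    {0: "Looking that up."},
    {1: {"id": "toolu_01", "name": "search_orders",
         "input": {"email": "ada@ex.io"}}},
)
print(assistant_blocks)
```

Sorting by index keeps each tool_use block where the model emitted it, so the tool_use_id values in the follow-up tool_result blocks line up with the assistant turn that precedes them.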
When streaming is worth it
Streaming earns its keep when:
- The user is watching. Token-by-token text or a "Searching orders..." indicator the moment the tool name lands beats four seconds of dead UI every time.
- A tool argument unblocks something expensive. Prefetching a customer record, opening a database connection, running an auth check on a known-stable field. Each of these can run in parallel with the rest of the model's argument list, so the wall-clock for the whole turn drops.
- You want to validate before the model finishes. Reject a forbidden domain or oversized payload during the stream and abort the call instead of paying for the rest of the tokens.
- Your turn has multiple tool blocks and they are independent. Kick off the first dispatch while the second is still streaming.
Skip streaming when the call is short, the user is not waiting, or the tool input is small enough that the SSE plumbing costs more code than it saves time. For background batch jobs, scheduled summarizers, and eval runs, non-streaming is one fewer thing to maintain.
The whole parser comes in under 60 lines once you strip the type hints. That is cheaper than the four-second blank UI you are shipping today, and small enough to drop into any agent loop you already have.
If this was useful
The AI Agents Pocket Guide covers the streaming protocol, the dispatch loop, and the trade-offs around partial parsing and early validation in more depth, alongside the agent patterns that pair with them.