DEV Community

Mukunda Rao Katta

Streaming LLM Responses in Python Agents: What Goes Wrong and How to Fix It

Streaming looks simple in the docs. You swap .create() for .stream(), iterate over chunks, and print each piece as it arrives.

Then you add tool use. Or you try to forward streams to external messaging systems. Or the response runs long and the context window fills up mid-stream.

Each of those is a different failure. Let's go through them.


The Core Problem

Streaming and tool use do not compose cleanly. A non-streaming LLM response is a complete JSON object. You parse it, see stop_reason: "tool_use", extract the tool call, run the tool, send the result back.

With streaming, that tool call arrives in pieces. The input field of a tool_use block comes as a string of JSON characters, one chunk at a time. You cannot parse partial JSON. You have to buffer the entire input field before you can execute the tool.

Most streaming tutorials skip this. They show text streaming and stop there. If you add tool calls to a streaming agent without handling this, you'll get either a JSON parse error or a tool call with an empty input dict.
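
To see why the input has to be buffered, here is a minimal sketch using only the standard library. The fragment list is made up for illustration; a real stream splits the JSON wherever it happens to.

```python
import json

# Hypothetical fragments of one tool input, split the way a stream might split them.
fragments = ['{"city": "Par', 'is", "uni', 'ts": "metric"}']

buffer = ""
parse_attempts = []
for fragment in fragments:
    buffer += fragment
    try:
        json.loads(buffer)
        parse_attempts.append("ok")
    except json.JSONDecodeError:
        # The JSON is not complete yet, so parsing fails
        parse_attempts.append("incomplete")

# Only the fully accumulated string is valid JSON
tool_input = json.loads(buffer)
print(parse_attempts)
print(tool_input)
```

Every attempt before the last fragment fails, which is the parse error you hit if you try to run the tool too early.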


Pattern 1: Buffer Tool Calls Until Complete

Here is what correct streaming with tool use looks like using the Anthropic Python SDK:

import anthropic
import json

client = anthropic.Anthropic()

def run_agent(messages, tools):
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        tools=tools,
        messages=messages,
    ) as stream:
        # Accumulate the full response
        response = stream.get_final_message()

    # Check stop reason AFTER stream completes
    if response.stop_reason == "tool_use":
        tool_results = []

        for block in response.content:
            if block.type == "tool_use":
                # Tool input is now fully buffered -- safe to execute
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })

        # Continue the agent loop
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
        return run_agent(messages, tools)

    # Any other stop reason (for example "end_turn"): return all text blocks
    return "".join(
        block.text for block in response.content if block.type == "text"
    )

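The key is stream.get_final_message(). It consumes the stream and blocks until the full response is assembled, so every tool_use block already has a complete, parsed input by the time you inspect it. Note that this version shows nothing to the user until the response is complete: the request is streamed, but the text is not forwarded as it arrives. Pattern 2 covers that case.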
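
The examples in this post call execute_tool without defining it. One possible shape is a small registry-based dispatcher. This is only a sketch: the get_weather tool, the TOOL_REGISTRY name, and the error strings are all made up for illustration.

```python
import json

# Hypothetical tool implementation; replace with your own.
def get_weather(city: str) -> str:
    return json.dumps({"city": city, "forecast": "sunny"})

# Maps the tool name the model sees to the function that implements it.
TOOL_REGISTRY = {"get_weather": get_weather}

def execute_tool(name: str, tool_input: dict) -> str:
    """Run a registered tool and always return a string result."""
    handler = TOOL_REGISTRY.get(name)
    if handler is None:
        return f"Error: unknown tool '{name}'"
    try:
        return handler(**tool_input)
    except Exception as exc:
        # Report the failure back to the model instead of crashing the loop
        return f"Error: {type(exc).__name__}: {exc}"
```

Returning an error string rather than raising keeps the agent loop alive, and gives the model a chance to correct a bad call on its next turn.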


Pattern 2: Stream Text, Buffer Tool Calls

What if you want to actually stream text to the user in real time, not just use streaming as a performance trick on your side?

def run_agent_with_live_text(messages, tools, on_text_chunk):
    current_text = []
    tool_calls_buffer = {}

    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        tools=tools,
        messages=messages,
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                if event.content_block.type == "tool_use":
                    # Start buffering a tool call
                    tool_calls_buffer[event.index] = {
                        "id": event.content_block.id,
                        "name": event.content_block.name,
                        "input_str": "",
                    }

            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    # Stream text immediately to caller
                    chunk = event.delta.text
                    current_text.append(chunk)
                    on_text_chunk(chunk)

                elif event.delta.type == "input_json_delta":
                    # Buffer tool input JSON -- do NOT try to parse yet
                    if event.index in tool_calls_buffer:
                        tool_calls_buffer[event.index]["input_str"] += event.delta.partial_json

        response = stream.get_final_message()

    # Now parse buffered tool inputs
    if response.stop_reason == "tool_use":
        tool_results = []
        for idx, tool_data in tool_calls_buffer.items():
            try:
                # A tool called with no arguments may stream an empty string
                parsed_input = json.loads(tool_data["input_str"] or "{}")
            except json.JSONDecodeError:
                parsed_input = {}  # the model produced malformed JSON

            result = execute_tool(tool_data["name"], parsed_input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool_data["id"],
                "content": result,
            })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
        return run_agent_with_live_text(messages, tools, on_text_chunk)

    return "".join(current_text)

Two separate paths: text chunks stream out immediately via on_text_chunk, while tool input JSON accumulates in tool_calls_buffer. Only after the stream completes do you parse the JSON and execute the tools.
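
The buffering half can be exercised without calling the API. The sketch below feeds hand-built stand-ins for SDK events through the same index-keyed logic. The SimpleNamespace objects, tool names, and ids are invented for the test; only the event and delta type names come from the code above.

```python
import json
from types import SimpleNamespace as NS

def collect_tool_calls(events):
    """Group input_json_delta fragments by content block index, then parse."""
    buffers = {}
    for event in events:
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            buffers[event.index] = {
                "id": event.content_block.id,
                "name": event.content_block.name,
                "input_str": "",
            }
        elif event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            if event.index in buffers:
                buffers[event.index]["input_str"] += event.delta.partial_json

    calls = []
    for index in sorted(buffers):
        data = buffers[index]
        # A tool called with no arguments may stream an empty string
        parsed = json.loads(data["input_str"] or "{}")
        calls.append({"id": data["id"], "name": data["name"], "input": parsed})
    return calls

# Hand-built stand-ins for SDK events (ids and tool names are invented).
fake_events = [
    NS(type="content_block_start", index=1,
       content_block=NS(type="tool_use", id="toolu_a", name="get_weather")),
    NS(type="content_block_delta", index=1,
       delta=NS(type="input_json_delta", partial_json='{"city": ')),
    NS(type="content_block_delta", index=1,
       delta=NS(type="input_json_delta", partial_json='"Paris"}')),
    NS(type="content_block_start", index=2,
       content_block=NS(type="tool_use", id="toolu_b", name="get_time")),
    NS(type="content_block_delta", index=2,
       delta=NS(type="input_json_delta", partial_json="")),
]
calls = collect_tool_calls(fake_events)
```

Keeping this logic in a plain function makes it easy to unit test with recorded or synthetic events before wiring it to a live stream.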


Pattern 3: Never Forward Streaming Replies to External Surfaces

This one is not a code pattern. It is a hard rule.

If your agent posts to Slack, Telegram, WhatsApp, or any external messaging system, do not stream partial replies to those surfaces. Only send the final, complete reply.

There are two reasons for this. First, users see incomplete sentences mid-flight. It reads as a bug. Second, many messaging APIs rate-limit message edits. If you're streaming 100 chunks and updating the Slack message on each chunk, you'll hit rate limits and the stream will stall.

def send_to_slack(channel, messages, tools):
    # Wrong: streaming partial replies to Slack
    # for chunk in stream:
    #     slack_client.chat_update(channel=channel, text=partial_reply)

    # Correct: collect full reply, send once
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        tools=tools,
        messages=messages,
    ) as stream:
        response = stream.get_final_message()

    final_text = next(
        (b.text for b in response.content if hasattr(b, "text")), ""
    )
    slack_client.chat_postMessage(channel=channel, text=final_text)

Use streaming internally for your own UI or for logging. For external messaging surfaces, stream to a buffer, wait for completion, send once.
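
If you already have a chunk callback like on_text_chunk from Pattern 2, one way to follow this rule is a small buffer object that accepts chunks and delivers them in a single call. This is a sketch: BufferedReply and the outbox list are hypothetical names, and the send argument stands in for whatever messaging client you use.

```python
class BufferedReply:
    """Collect streamed chunks and deliver them with a single send call."""

    def __init__(self, send):
        self._send = send      # any callable that takes the final text
        self._chunks = []
        self._sent = False

    def on_text_chunk(self, chunk: str) -> None:
        # Called once per streamed chunk; nothing leaves the process here
        self._chunks.append(chunk)

    def flush(self) -> str:
        """Send the full reply once; later calls do not send again."""
        text = "".join(self._chunks)
        if not self._sent and text:
            self._send(text)
            self._sent = True
        return text

outbox = []  # stand-in for a messaging client
reply = BufferedReply(send=outbox.append)
for chunk in ["Deploy ", "finished ", "successfully."]:
    reply.on_text_chunk(chunk)
reply.flush()
reply.flush()  # second call is a no-op
```

The external surface sees exactly one message, no matter how many chunks arrived or how often flush is called.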


Pattern 4: Context Window Overrun During Long Streams

If your agent runs a long agentic loop, the message history grows. Eventually the messages list pushes the total token count close to the model's context limit. Depending on the model, the request is then rejected with a context-length error, or the reply stops short with a stop reason such as max_tokens.

The fix is to measure before sending:

from prompt_token_counter import estimate_tokens
from tool_output_truncate import truncate

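def shrink_tool_results(msg, max_chars=2000):
    # In the Anthropic format, tool results are tool_result blocks inside
    # user messages; there is no separate "tool" role.
    if msg.get("role") != "user" or not isinstance(msg.get("content"), list):
        return msg
    blocks = [
        {**block, "content": truncate(block["content"], max_chars=max_chars)}
        if isinstance(block, dict)
        and block.get("type") == "tool_result"
        and isinstance(block.get("content"), str)
        else block
        for block in msg["content"]
    ]
    return {**msg, "content": blocks}

def prepare_messages(messages, tools, model="claude-sonnet-4-6", limit=160000):
    # Rough estimate of token cost
    token_count = estimate_tokens(messages)

    # Truncate large tool results first
    if token_count > limit * 0.8:
        messages = [shrink_tool_results(msg) for msg in messages]

    # If still too large, drop the oldest turns
    while estimate_tokens(messages) > limit * 0.9 and len(messages) > 2:
        # Keep the first message (index 0, the original request) and drop the
        # oldest assistant/user pair after it, so a tool_use block and its
        # tool_result are removed together
        messages = [messages[0]] + messages[3:]

    return messages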

prompt-token-counter gives you a fast approximation without calling the API. tool-output-truncate-py shrinks large tool results. You apply both before building the request.
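
When dropping history, the main thing to protect is tool_use/tool_result pairing. The sketch below is one way to prune from the front without leaving an orphaned tool_result. It assumes messages are plain dicts, and rough_tokens is a stand-in heuristic of about 4 characters per token, not the prompt-token-counter API.

```python
import json

def rough_tokens(messages) -> int:
    """Very rough estimate: about 4 characters per token (an assumption)."""
    return len(json.dumps(messages, default=str)) // 4

def has_tool_result(message) -> bool:
    content = message.get("content")
    return isinstance(content, list) and any(
        isinstance(block, dict) and block.get("type") == "tool_result"
        for block in content
    )

def drop_oldest_turns(messages, budget: int):
    """Drop leading messages until under budget, never starting on a tool_result."""
    pruned = list(messages)  # work on a copy; the caller's list is untouched
    while rough_tokens(pruned) > budget and len(pruned) > 1:
        pruned.pop(0)
        # Skip forward until the history starts with a plain user message,
        # so no tool_result is left without its tool_use
        while len(pruned) > 1 and (
            pruned[0]["role"] != "user" or has_tool_result(pruned[0])
        ):
            pruned.pop(0)
    return pruned
```

This variant drops the original request too, which may or may not be what you want. If the first message carries the task description, pin it and prune from index 1 instead.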


What This Does NOT Do

None of these patterns handle multi-modal streaming. If you're streaming images or audio, the buffering logic is different.

The estimate_tokens call is approximate. It will be off by a few percent. Do not use it as a hard guarantee that a request will succeed. Treat it as a safety margin check.

These patterns assume you're working with the Anthropic API. OpenAI and other providers have different streaming event formats. The buffering concept is the same but the event types and field names differ.


Design Notes

The reason streaming is tricky with tool use is that the Anthropic message format requires tool_use and tool_result blocks to be paired correctly. If you try to parse a tool call before its input field is complete, you get garbage. If you send a tool_result with the wrong tool_use_id, the API returns an error.
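
A cheap guard against pairing mistakes is to compare ids before sending the next request. This is a sketch with a hypothetical helper name, and it assumes both content lists hold plain dicts.

```python
def unpaired_tool_ids(assistant_content, user_content):
    """Return (missing_results, unexpected_results) as sorted lists of ids."""
    requested = {
        block["id"] for block in assistant_content
        if block.get("type") == "tool_use"
    }
    answered = {
        block["tool_use_id"] for block in user_content
        if block.get("type") == "tool_result"
    }
    # Ids the model asked for but we never answered, and answers nobody asked for
    return sorted(requested - answered), sorted(answered - requested)
```

If either list is non-empty, fix the history locally rather than letting the API reject the request.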

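Buffering by event.index (the position of the content block in the response) is the reliable way to track which partial JSON belongs to which tool call. A single response can contain several tool calls, and each one arrives as its own content block with its own index, so keying the buffer by index keeps their inputs separate.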

The get_final_message() call is your safety net. Use it even when you're processing events in the streaming loop. It gives you the fully assembled response object to work from.


When This Applies

Use the buffering patterns when:

  • Your agent uses tool calls and you want live text streaming
  • You're building a chat UI that shows replies as they arrive
  • You need to handle errors mid-stream gracefully

The external-messaging rule applies whenever:

  • Your agent posts to Slack, Telegram, WhatsApp, Discord, or any platform with message rate limits

Quick Start

pip install anthropic tool-output-truncate-py prompt-token-counter

The Anthropic SDK handles the streaming protocol. tool-output-truncate-py and prompt-token-counter are helpers for context management.


Related Libraries

| Library | What It Does | Language |
| --- | --- | --- |
| claude-stream-rs | Anthropic SSE stream parser | Rust |
| agent-message-window | Sliding window with tool_use/tool_result pairing | Python |
| tool-output-truncate-py | Truncate large tool results before sending to LLM | Python |
| agentsnap | Capture tool call args and results per run | Python |
| llm-stop-conditions | Composable conditions to end the agent loop | Python |
| prompt-token-counter | Fast approximate token count without API call | Python |

What's Next

If you want to see the full streaming event sequence for debugging, agentsnap captures every tool call and response in a structured log. You can replay a session and see exactly which events arrived in which order.

For long-running agents that accumulate large tool results, tool-output-truncate-py and agent-message-window together give you context budget management without manual history pruning.

The streaming patterns here work with claude-sonnet-4-6. If you're using extended thinking, the streaming event structure changes. Thinking blocks arrive as their own content block type and should be buffered separately from text.
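
As a rough illustration of buffering thinking separately, the sketch below routes deltas by type. It assumes thinking content arrives as thinking_delta events with a thinking field, and it uses hand-built stand-in events instead of a live stream. Check the event shapes against the SDK version you run.

```python
from types import SimpleNamespace as NS

def route_deltas(events):
    """Split streamed deltas into visible text and thinking text."""
    text_parts, thinking_parts = [], []
    for event in events:
        if event.type != "content_block_delta":
            continue
        if event.delta.type == "text_delta":
            text_parts.append(event.delta.text)
        elif event.delta.type == "thinking_delta":
            thinking_parts.append(event.delta.thinking)
        # Other delta types (tool input, signatures) are ignored in this sketch
    return "".join(text_parts), "".join(thinking_parts)

# Hand-built stand-ins for SDK events
fake_events = [
    NS(type="content_block_delta", index=0,
       delta=NS(type="thinking_delta", thinking="Compare both ")),
    NS(type="content_block_delta", index=0,
       delta=NS(type="thinking_delta", thinking="options.")),
    NS(type="content_block_delta", index=1,
       delta=NS(type="text_delta", text="Option B is cheaper.")),
]
visible, thinking = route_deltas(fake_events)
```

Only the visible text should reach on_text_chunk; the thinking buffer is for logging or debugging.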
