DEV Community

Mukunda Rao Katta
Mukunda Rao Katta

Posted on

Multi-Provider Streaming Chunk Reassembly: Collect an LLM Stream Into a Complete Response

You switched from a non-streaming to a streaming API call for better perceived latency. Now your agent loop needs to handle chunks. Anthropic sends content_block_delta events. OpenAI sends choices[0].delta.content. Google sends candidates[0].content.parts. They all mean the same thing but in different formats.

You write a chunk collector for Anthropic. You switch to OpenAI for a test. You write another collector. You add Google. Three collectors for the same concept.

llm-stream-collector reassembles streaming chunks from any provider into a complete response object.


The Shape of the Fix

from llm_stream_collector import StreamCollector

collector = StreamCollector()

# Anthropic streaming
with anthropic_client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=messages,
) as stream:
    for chunk in stream:
        collector.feed_anthropic(chunk)

result = collector.result()
print(result.text)           # Complete assembled text
print(result.input_tokens)   # Total input tokens
print(result.output_tokens)  # Total output tokens
print(result.stop_reason)    # "end_turn", "tool_use", etc.
print(result.tool_calls)     # List of assembled tool calls
Enter fullscreen mode Exit fullscreen mode

Feed chunks as they arrive. Call result() when the stream ends. The result is provider-normalized: the same fields regardless of which provider you used.


What It Does NOT Do

llm-stream-collector does not stream the output to the user. It collects the full response before you can call result(). For streaming output to the user in real time (printing chunks as they arrive), pipe chunks directly to your output layer and also feed them to the collector. The collector is for the part that needs the complete response — routing decisions, tool call execution, logging.

It does not handle concurrent streams. One StreamCollector instance, one stream. Create a new instance per request.

It does not retry on connection drops. If the stream connection drops mid-stream, the collector has a partial response. You detect this from result().complete == False and retry the full request. Partial reassembly is not retried automatically.


Inside the Library

The collector accumulates text and tool call blocks separately:

class StreamCollector:
    def __init__(self):
        self._text_chunks: list[str] = []
        self._tool_calls: dict[str, dict] = {}  # id -> accumulated call
        self._usage: dict[str, int] = {}
        self._stop_reason: str | None = None
        self._complete = False

    def feed_anthropic(self, event) -> None:
        event_type = getattr(event, "type", None)

        if event_type == "content_block_delta":
            delta = event.delta
            if delta.type == "text_delta":
                self._text_chunks.append(delta.text)
            elif delta.type == "input_json_delta":
                tool_id = event.index  # block index
                if tool_id not in self._tool_calls:
                    self._tool_calls[tool_id] = {"json_chunks": []}
                self._tool_calls[tool_id]["json_chunks"].append(delta.partial_json)

        elif event_type == "content_block_start":
            if event.content_block.type == "tool_use":
                self._tool_calls[event.index] = {
                    "id": event.content_block.id,
                    "name": event.content_block.name,
                    "json_chunks": [],
                }

        elif event_type == "message_delta":
            if hasattr(event.delta, "stop_reason"):
                self._stop_reason = event.delta.stop_reason
            if hasattr(event, "usage"):
                self._usage["output_tokens"] = event.usage.output_tokens

        elif event_type == "message_start":
            if hasattr(event.message, "usage"):
                self._usage["input_tokens"] = event.message.usage.input_tokens

        elif event_type == "message_stop":
            self._complete = True

    def feed_openai(self, chunk) -> None:
        for choice in chunk.choices:
            delta = choice.delta
            if delta.content:
                self._text_chunks.append(delta.content)
            if delta.tool_calls:
                for tc in delta.tool_calls:
                    idx = tc.index
                    if idx not in self._tool_calls:
                        self._tool_calls[idx] = {"id": tc.id or "", "name": "", "json_chunks": []}
                    if tc.function.name:
                        self._tool_calls[idx]["name"] = tc.function.name
                    if tc.function.arguments:
                        self._tool_calls[idx]["json_chunks"].append(tc.function.arguments)
            if choice.finish_reason:
                self._stop_reason = choice.finish_reason
                self._complete = True

    def result(self) -> StreamResult:
        tool_calls = []
        for _, tc in sorted(self._tool_calls.items()):
            tool_calls.append({
                "id": tc.get("id", ""),
                "name": tc.get("name", ""),
                "input": json.loads("".join(tc["json_chunks"])) if tc["json_chunks"] else {},
            })

        return StreamResult(
            text="".join(self._text_chunks),
            tool_calls=tool_calls,
            input_tokens=self._usage.get("input_tokens", 0),
            output_tokens=self._usage.get("output_tokens", 0),
            stop_reason=self._stop_reason,
            complete=self._complete,
        )
Enter fullscreen mode Exit fullscreen mode

When to Use It

Use it when you switch from non-streaming to streaming and your agent loop needs both: stream output to the user for perceived latency, and collect the complete response for routing decisions (tool calls, stop reason detection, cost logging).

Use it when you are building a multi-provider agent that streams from different providers. The normalized StreamResult has the same fields regardless of provider so your agent loop does not need provider-specific branches.

Use it for streaming log capture. Feed chunks to the collector and also write them to your step log. When the stream ends, the collector gives you the assembled response and token counts for the full log record.

Skip it for non-streaming calls. If you are using the standard non-streaming API, the response is already assembled. The collector adds no value.


Install

pip install git+https://github.com/MukundaKatta/llm-stream-collector

# Or from PyPI
pip install llm-stream-collector
Enter fullscreen mode Exit fullscreen mode
from llm_stream_collector import StreamCollector

async def stream_to_user_and_collect(messages: list[dict]) -> StreamResult:
    collector = StreamCollector()

    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=messages,
        tools=tool_schemas,
    ) as stream:
        async for text in stream.text_stream:
            # Send to user in real time
            await websocket.send_text(text)

        # Feed all events to collector
        async for event in stream:
            collector.feed_anthropic(event)

    result = collector.result()

    # Use assembled result for agent loop routing
    if result.stop_reason == "tool_use":
        for tool_call in result.tool_calls:
            tool_result = await execute_tool(tool_call["name"], tool_call["input"])
            # Continue loop...

    return result
Enter fullscreen mode Exit fullscreen mode

Sibling Libraries

Library What it solves
agenttap Wire-level capture including raw streaming events
agent-step-log Log assembled results from the collector
llm-pretty-error Normalize streaming errors across providers
llm-content-blocks Build content blocks for the next turn from stream results
llm-fallback-chain Fall back to another provider when a stream fails

The streaming stack: llm-stream-collector for reassembly, agenttap for raw event capture, llm-fallback-chain for provider failover when streams fail.


What's Next

Async native: AsyncStreamCollector with async for event in async_generator: await collector.async_feed(event). The current sync implementation works for sync streams; async-native would be cleaner for async agent loops.

Token streaming callback: collector.on_token(callback) that fires for each text chunk before the stream ends. Useful for real-time token counting and budget enforcement while the stream is still in progress.

Partial result access: collector.partial_result() that returns the assembled-so-far state before the stream completes. Useful for implementing streaming timeouts: if the partial result looks wrong, abort the stream.


Built as part of the agent-stack family: composable Python primitives for production LLM agents.

Top comments (0)