I was stitching together a Rust agent that needed to stream Anthropic responses. The official Anthropic Rust SDK was not out yet, so I reached for the HTTP client directly. That part worked fine. Then I had to parse the SSE payload.
My first attempt handled message_start, content_block_delta, and message_stop. Looked complete. Worked on text completions. The moment I added a tool call, the parser silently swallowed the input_json_delta events. The tool args just never showed up. No panic, no error, nothing. The stream drained, the tool was empty.
Fixing it meant re-reading the Anthropic streaming docs, finding the missing variant, patching the match arm, and then discovering I had also missed message_delta with usage fields. Another patch. I realized I was hand-rolling a complete SSE event type system from scratch, and that work belongs in a library, not buried in an agent project.
That library is claude-stream-rs.
The shape of the fix
The crate parses a stream of bytes into a typed Event enum. Every variant the Anthropic streaming API emits is covered.
Add it to your project:
[dependencies]
claude-stream-rs = "0.1"
Then parse events as bytes arrive:
use claude_stream_rs::{Parser, Event};
let mut parser = Parser::new();
// Feed raw SSE bytes from your HTTP client, file, or test fixture
for chunk in response_chunks {
for event in parser.feed(&chunk) {
match event {
Event::MessageStart { message } => {
println!("model: {}", message.model);
}
Event::ContentBlockDelta { index, delta } => {
if let Some(text) = delta.text {
print!("{}", text);
}
if let Some(partial_json) = delta.partial_json {
// Accumulate tool input args
tool_input.push_str(&partial_json);
}
}
Event::MessageDelta { delta, usage } => {
println!("\nstop_reason: {:?}", delta.stop_reason);
println!("output_tokens: {}", usage.output_tokens);
}
Event::MessageStop => {
println!("stream complete");
}
Event::Error { error } => {
eprintln!("stream error: {}", error.message);
}
_ => {}
}
}
}
The Event enum covers all variants:
pub enum Event {
MessageStart { message: MessageStartData },
ContentBlockStart { index: u32, content_block: ContentBlock },
ContentBlockDelta { index: u32, delta: ContentBlockDeltaData },
ContentBlockStop { index: u32 },
MessageDelta { delta: MessageDeltaData, usage: MessageDeltaUsage },
MessageStop,
Error { error: StreamError },
}
ContentBlockDeltaData has both text and partial_json fields to cover text completions and tool call input accumulation. If you only care about text streaming, ignore partial_json. If you are building a tool dispatcher, accumulate partial_json values until ContentBlockStop and then parse the full JSON.
What it does NOT do
- It does not make HTTP requests. Bring your own client: reqwest, hyper, ureq, whatever you already use.
- It does not require tokio or any async runtime. The
Parseris a plain synchronous struct. - It does not accumulate tool args for you. You get delta chunks. Concatenation is one line in your code and belongs to you.
- It does not validate the final assembled JSON for tool inputs. After
ContentBlockStop, parse withserde_jsonyourself.
Inside the lib: HTTP-agnostic bytes in, events out
The core design decision: the parser owns no I/O.
pub struct Parser {
buffer: String,
}
impl Parser {
pub fn new() -> Self { ... }
pub fn feed(&mut self, bytes: &[u8]) -> Vec<Event> { ... }
}
You call feed() with whatever bytes you have. The parser appends them to an internal buffer, scans for complete SSE frames (separated by double newlines), parses each frame, and returns the events it found. Partial frames stay in the buffer until the next feed() call completes them.
This design has two practical benefits.
First, it is trivially testable. You do not need a live Anthropic API key to test streaming logic. Write the expected SSE bytes as a string literal in your test, feed them in, assert on the events you get back. The test runs offline, runs fast, and never touches the network.
#[test]
fn parses_text_delta() {
let mut parser = Parser::new();
let sse = b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n";
let events = parser.feed(sse);
assert_eq!(events.len(), 1);
if let Event::ContentBlockDelta { delta, .. } = &events[0] {
assert_eq!(delta.text.as_deref(), Some("Hello"));
}
}
Second, it composes with any async runtime. If you are on tokio, wrap feed() calls in a loop over your StreamExt chunks. If you are on async-std, same thing. If you are doing a blocking request with ureq, call feed() in a plain loop. The parser does not care.
// tokio + reqwest example
use futures_util::StreamExt;
let mut stream = response.bytes_stream();
let mut parser = Parser::new();
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
for event in parser.feed(&bytes) {
// handle event
}
}
The bytes-in/events-out interface is a deliberate inversion of the more common pattern where you hand the library your HTTP client and it does everything. That pattern hides how streaming works and makes testing harder. This pattern keeps the logic narrow and lets you see every byte.
When this is useful
You are calling the Anthropic API directly without the official SDK, in a Rust project that already has its own HTTP layer. You do not want to pull in a new async runtime just to parse SSE. You want to write tests for streaming behavior without a live API key.
You are building a tool-calling agent and you hit the exact bug I hit: input_json_delta events silently dropped because your hand-rolled match arm was missing the variant. Using a typed enum means the compiler catches missing variants at compile time when you add a new match arm.
You are recording streaming responses to a file for replay in tests, and you need a parser that works equally well on live HTTP chunks and on replayed file bytes.
When this is NOT what you want
If the official Anthropic Rust SDK ships with built-in streaming support, use that. The SDK will have auth, retry, and error handling baked in alongside the event types. This crate is for the gap period and for projects that want to stay SDK-free.
If you need to build streaming logic on top of OpenAI or another provider, the event shape is different. This crate is Anthropic-specific.
Install
[dependencies]
claude-stream-rs = "0.1"
Repo: https://github.com/MukundaKatta/claude-stream-rs
Sibling libraries
| Library | Boundary | Repo |
|---|---|---|
| agenttap | Python wire-level capture, see the bytes before they hit the stream parser | MukundaKatta/agenttap |
| agentsnap-rs | Snapshot what events the stream produced, for regression testing | MukundaKatta/agentsnap-rs |
| claude-cost | Calculate cost from the output_tokens field in MessageDelta usage |
MukundaKatta/claude-cost |
| llm-content-blocks-rs | Build content blocks before the request, parse them back out of the stream | MukundaKatta/llm-content-blocks-rs |
They form a pipeline. Build the request with llm-content-blocks-rs. Send it. Parse the response with claude-stream-rs. Feed the usage numbers into claude-cost. Snapshot the event sequence with agentsnap-rs for regression coverage.
What's next
The thing I want most is a streaming accumulator that builds a complete Message from a sequence of events. Right now you have to concatenate text deltas and tool input deltas yourself. That accumulation is always the same few lines, and they should live in the library.
I also want to add a replay utility that takes a saved sequence of SSE frames from a file and feeds them through the parser at controlled timing. That would let you write slow-network tests locally without a real API.
If you have ever lost time to a missing match arm on a streaming event type, you know why this started as a library instead of staying as inline code.
Top comments (0)