DEV Community

Sangmin Lee

Posted on • Originally published at claudeguide.io

Streaming vs Batch in Claude Agent SDK: When to Use Which

Originally published at claudeguide.io/claude-streaming-batch-agent


Streaming delivers tokens as they are generated, which suits chat UX and long responses. Batch processing handles many requests at once, which suits throughput, and sending them through the Message Batches API cuts cost by 50% for offline workloads that can wait for results. Most production agents need both: streaming for user-facing interactions, batch for background processing. This guide covers the implementation patterns for each and when to use which.


The Core Trade-off

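| | Streaming | Batch |
|---|---|---|
| First-token latency | Immediate | Delayed (queued; Message Batches API results can take up to 24 hours) |
| Perceived UX | Fast | Slow |
| Throughput | One response per stream (run several streams concurrently for more) | Many requests processed in parallel |
| Cost | Standard pricing | 50% discount (Message Batches API only) |
| Best for | Chat, interactive agents | Bulk processing, offline tasks |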
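To make the cost row concrete, here is a small estimator. It is a sketch under stated assumptions: the per-million-token prices are parameters you supply from the current pricing page (the $3 / $15 values in the usage lines are placeholders, not quoted rates), and the batch discount is modeled as a flat 50% on both input and output tokens.

```python
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
    use_batch_api: bool = False,
) -> float:
    """Estimate request cost in dollars.

    Prices are per million tokens and must be supplied by the caller.
    The Message Batches API discount is modeled as a flat 50% on both
    input and output tokens (an assumption of this sketch).
    """
    cost = (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000
    return cost * 0.5 if use_batch_api else cost


# Example: 1,000 documents, ~2,000 input and ~500 output tokens each,
# with placeholder prices of $3 / $15 per million tokens.
docs = 1_000
standard = estimate_cost(docs * 2_000, docs * 500, 3.0, 15.0)
batched = estimate_cost(docs * 2_000, docs * 500, 3.0, 15.0, use_batch_api=True)
print(f"standard: ${standard:.2f}, batch: ${batched:.2f}")  # standard: $13.50, batch: $6.75
```

The absolute numbers depend entirely on the prices you plug in; the ratio between the two lines is what the discount guarantees.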

Streaming: Real-Time Token Delivery

Basic streaming implementation

```python
import anthropic

client = anthropic.Anthropic()


def stream_response(prompt: str):
    """Stream Claude's response and print tokens as they arrive."""
    with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        print()  # New line after completion

        # Get final message with usage stats
        final_message = stream.get_final_message()
        return final_message


stream_response("Explain the concept of closures in JavaScript")
```
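Since the main argument for streaming is first-token latency, it is worth measuring. The sketch below wraps any iterable of text chunks, such as `stream.text_stream` from the example above, and records how long the first chunk took to arrive. `consume_with_ttft` is a hypothetical helper, not part of the SDK, and the usage line feeds it a fake list of chunks so it runs without an API key.

```python
import time
from typing import Callable, Iterable, Optional, Tuple


def consume_with_ttft(
    chunks: Iterable[str],
    on_chunk: Optional[Callable[[str], None]] = None,
    started_at: Optional[float] = None,
) -> Tuple[str, Optional[float]]:
    """Consume text chunks; return (full_text, seconds until the first chunk).

    `chunks` can be any iterable of strings, such as `stream.text_stream`.
    Pass `started_at=time.perf_counter()` captured before opening the
    stream to include connection time; otherwise timing starts here.
    """
    start = time.perf_counter() if started_at is None else started_at
    ttft: Optional[float] = None
    parts = []
    for text in chunks:
        if ttft is None:
            ttft = time.perf_counter() - start  # first chunk arrived
        parts.append(text)
        if on_chunk is not None:
            on_chunk(text)  # e.g. print it or push it to a UI
    return "".join(parts), ttft


# Offline usage with a fake stream (no API key needed)
full_text, ttft = consume_with_ttft(["Clo", "sures ", "capture ", "scope."])
print(full_text)  # Closures capture scope.
```

Keeping the consumer independent of the SDK object also makes the UI-side logic easy to unit test.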

Streaming with Server-Sent Events (for web UIs)

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
# Async client, so streaming inside the async generator doesn't block the event loop
client = anthropic.AsyncAnthropic()


@app.post("/chat/stream")
async def chat_stream(request: dict):
    user_message = request.get("message", "")

    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": user_message}]
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: data: {...}\n\n
                yield f"data: {json.dumps({'text': text})}\n\n"

            # Signal completion, reporting total token usage
            final = await stream.get_final_message()
            total_tokens = final.usage.input_tokens + final.usage.output_tokens
            yield f"data: {json.dumps({'done': True, 'total_tokens': total_tokens})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
```

TypeScript streaming (Next.js API route)

```typescript
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

export async function POST(req: Request) {
  const { message } = await req.json();

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      try {
        const response = await client.messages.create({
          model: "claude-sonnet-4-5",
          max_tokens: 2048,
          messages: [{ role: "user", content: message }],
          stream: true,
        });

        for await (const event of response) {
          // Forward only text deltas to the browser
          if (
            event.type === "content_block_delta" &&
            event.delta.type === "text_delta"
          ) {
            controller.enqueue(
              encoder.encode(`data: ${JSON.stringify({ text: event.delta.text })}\n\n`)
            );
          }
          if (event.type === "message_stop") {
            controller.enqueue(encoder.encode("data: [DONE]\n\n"));
            controller.close();
          }
        }
      } catch (err) {
        // Surface API or network failures instead of leaving the stream open
        controller.error(err);
      }
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache" },
  });
}
```

Batch Processing: High Throughput at Lower Cost

When batch is the right choice

  • Processing 10+ documents
  • Background summarization jobs
  • Nightly content generation
  • Bulk analysis pipelines
  • Any workload where results aren't needed immediately
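The 50% discount in the table comes from the Message Batches API, where many requests are submitted in one call and results are collected asynchronously. Below is a sketch of that flow using the Python SDK's `client.messages.batches` methods (`create`, `retrieve`, `results`). Assumptions: `build_batch_requests` and `run_message_batch` are hypothetical helper names, the polling interval is arbitrary, and every `custom_id` must be unique within the batch, because results are matched back to inputs by `custom_id` rather than by position.

```python
import time
from typing import Dict, List


def build_batch_requests(
    jobs: Dict[str, str],
    model: str = "claude-sonnet-4-5",
    max_tokens: int = 1024,
) -> List[dict]:
    """Turn {custom_id: prompt} into Message Batches request entries."""
    return [
        {
            "custom_id": custom_id,
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "messages": [{"role": "user", "content": prompt}],
            },
        }
        for custom_id, prompt in jobs.items()
    ]


def run_message_batch(client, requests: List[dict], poll_seconds: float = 60.0) -> Dict[str, str]:
    """Submit a batch, wait for it to end, and map custom_id -> output text.

    `client` is an `anthropic.Anthropic()` instance. Entries that errored,
    were canceled, or expired map to an empty string in this sketch;
    production code should log or retry them.
    """
    batch = client.messages.batches.create(requests=requests)
    # Batches finish asynchronously (up to 24 hours), so poll until ended
    while client.messages.batches.retrieve(batch.id).processing_status != "ended":
        time.sleep(poll_seconds)

    outputs: Dict[str, str] = {}
    for entry in client.messages.batches.results(batch.id):
        if entry.result.type == "succeeded":
            outputs[entry.custom_id] = "".join(
                block.text for block in entry.result.message.content if block.type == "text"
            )
        else:
            outputs[entry.custom_id] = ""
    return outputs


requests = build_batch_requests(
    {"doc-1": "Summarize document 1.", "doc-2": "Summarize document 2."}
)
print(len(requests), requests[0]["custom_id"])  # 2 doc-1
```

Only `build_batch_requests` runs offline; `run_message_batch` needs a real client and API key.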

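Parallel batch with asyncio

Sending requests concurrently with asyncio raises throughput, but each call is an ordinary Messages API request billed at standard rates; the 50% discount applies only to requests submitted through the Message Batches API.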


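```python
import asyncio
import anthropic
from dataclasses import dataclass
from typing import Optional


@dataclass
class BatchJob:
    id: str
    prompt: str
    metadata: Optional[dict] = None


@dataclass
class BatchResult:
    job_id: str
    output: str
    input_tokens: int
    output_tokens: int
    error: Optional[str] = None


async def process_single(
    client: anthropic.AsyncAnthropic,
    job: BatchJob,
    semaphore: asyncio.Semaphore,
) -> BatchResult:
    # Minimal sketch of the body: the semaphore caps how many
    # requests are in flight at once.
    async with semaphore:
        try:
            response = await client.messages.create(
                model="claude-sonnet-4-5",
                max_tokens=1024,
                messages=[{"role": "user", "content": job.prompt}],
            )
        except anthropic.APIError as exc:
            # Record the failure instead of aborting the whole run
            return BatchResult(job.id, "", 0, 0, error=str(exc))
    # Join the text blocks of the reply into one string
    output = "".join(b.text for b in response.content if b.type == "text")
    return BatchResult(
        job_id=job.id,
        output=output,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
    )
```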

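A driver for the concurrent pattern can share one semaphore across all jobs and gather the results. This is a hedged sketch: `run_concurrently` is a hypothetical name, and `worker` stands in for any coroutine taking `(job, semaphore)`; a function shaped like `process_single` can be adapted by binding the client first, e.g. `functools.partial(process_single, client)`. The fake worker in the usage lines exists only so the example runs offline.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, TypeVar

J = TypeVar("J")
R = TypeVar("R")


async def run_concurrently(
    jobs: Sequence[J],
    worker: Callable[[J, asyncio.Semaphore], Awaitable[R]],
    max_concurrency: int = 5,
) -> List[R]:
    """Run `worker` over every job with at most `max_concurrency` in flight.

    asyncio.gather returns results in the order of `jobs`, not in
    completion order. The semaphore is created here, inside the running
    event loop, and shared by all workers.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    return list(await asyncio.gather(*(worker(job, semaphore) for job in jobs)))


# Offline usage: a fake worker that records peak concurrency
stats = {"in_flight": 0, "peak": 0}


async def fake_worker(job: str, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)  # stands in for an API call
        stats["in_flight"] -= 1
        return job.upper()


results = asyncio.run(run_concurrently(["a", "b", "c", "d"], fake_worker, max_concurrency=2))
print(results, stats["peak"])  # ['A', 'B', 'C', 'D'] 2
```

Because each worker acquires the semaphore itself, a slow request only holds one slot, and the remaining jobs keep flowing through the others.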
