DEV Community

binky

From Zero to Production: Claude API Integration Patterns That Scale

Three weeks after shipping our Claude-powered summarization feature, our p99 latency hit 45 seconds and we were dropping 12% of requests. The code worked perfectly in staging. Here is everything I learned rebuilding it the right way.

The Naive Implementation (and Why It Breaks)

Most tutorials show you something like this:

import anthropic

client = anthropic.Anthropic(api_key="sk-...")

def summarize(text: str) -> str:
    message = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": text}]
    )
    return message.content[0].text

This works fine for a demo. In production, it collapses under three real pressures: no retry policy tuned to your traffic (only the SDK's defaults), no concurrency control, and no explicit timeouts. When you hit Claude's rate limits (and you will), queued requests fail as soon as the SDK's default retries run out.

Pattern 1: The Production Client Wrapper

Build a thin wrapper that handles the things you will always need:

import anthropic
import time
import logging
from typing import Optional
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

logger = logging.getLogger(__name__)

class ClaudeClient:
    def __init__(
        self,
        api_key: str,
        model: str = "claude-opus-4-5",
        max_tokens: int = 1024,
        timeout: float = 30.0,
    ):
        self.model = model
        self.max_tokens = max_tokens
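        # One shared client, so the underlying HTTP connection pool is reused
        self._client = anthropic.Anthropic(
            api_key=api_key,
            # tenacity owns retries below; disable the SDK's built-in
            # retries so the two layers do not multiply attempts
            max_retries=0,
            timeout=anthropic.Timeout(
                connect=5.0,
                read=timeout,
                write=10.0,
                pool=5.0,
            ),
        )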

    @retry(
        retry=retry_if_exception_type(
            (anthropic.RateLimitError, anthropic.APIStatusError)
        ),
        wait=wait_exponential(multiplier=1, min=2, max=60),
        stop=stop_after_attempt(4),
        reraise=True,
    )
    def complete(
        self,
        prompt: str,
        system: Optional[str] = None,
        max_tokens: Optional[int] = None,
    ) -> str:
        start = time.monotonic()
        messages = [{"role": "user", "content": prompt}]
        kwargs = {
            "model": self.model,
            "max_tokens": max_tokens or self.max_tokens,
            "messages": messages,
        }
        if system:
            kwargs["system"] = system

        try:
            response = self._client.messages.create(**kwargs)
            elapsed = time.monotonic() - start
            logger.info(
                "claude_request",
                extra={
                    "elapsed_ms": round(elapsed * 1000),
                    "input_tokens": response.usage.input_tokens,
                    "output_tokens": response.usage.output_tokens,
                    "model": self.model,
                },
            )
            return response.content[0].text
        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # overloaded
                logger.warning("Claude API overloaded, retrying...")
            raise

The key decisions here: tenacity handles retries with exponential backoff, the Timeout object lets you tune each phase of the connection separately (connect timeout vs read timeout are very different problems), and the structured log gives you the token usage you need to understand your bill.
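
To make the backoff policy concrete, here is a rough sketch of the waits a clamped exponential schedule produces with the parameters above. It assumes the wait before retry n is multiplier * 2 ** (n - 1), clamped to the min and max; tenacity's exact formula may differ between versions, and backoff_schedule and worst_case_seconds are hypothetical helpers, not part of tenacity.

```python
def backoff_schedule(
    attempts: int,
    multiplier: float = 1.0,
    min_wait: float = 2.0,
    max_wait: float = 60.0,
) -> list[float]:
    """Waits between attempts for a clamped exponential policy.

    Assumption: wait before retry n is multiplier * 2 ** (n - 1),
    clamped to [min_wait, max_wait]. N attempts means N - 1 waits.
    """
    waits = []
    for retry_number in range(1, attempts):
        raw = multiplier * 2 ** (retry_number - 1)
        waits.append(max(min_wait, min(raw, max_wait)))
    return waits


def worst_case_seconds(attempts: int, per_attempt_timeout: float) -> float:
    """Upper bound on wall-clock time if every attempt runs to its timeout."""
    return attempts * per_attempt_timeout + sum(backoff_schedule(attempts))


print(backoff_schedule(4))          # [2.0, 2.0, 4.0]
print(worst_case_seconds(4, 30.0))  # 128.0
```

Under these assumptions the backoff itself is cheap; the per-attempt timeout dominates the worst case, which is worth knowing before you pick a read timeout.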

Pattern 2: Async with Concurrency Control

If you are processing batches — documents, user requests, anything in a loop — you need async with a semaphore. Without the semaphore, you fire every request simultaneously and saturate the rate limit immediately.

import asyncio
import anthropic
from typing import Sequence

class AsyncClaudeClient:
    def __init__(
        self,
        api_key: str,
        model: str = "claude-opus-4-5",
        max_concurrent: int = 10,  # tune per your tier
    ):
        self.model = model
        self._client = anthropic.AsyncAnthropic(api_key=api_key)
        self._sem = asyncio.Semaphore(max_concurrent)

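    async def complete(self, prompt: str, system: str = "") -> str:
        kwargs = {
            "model": self.model,
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
        }
        if system:
            # Omit the field rather than sending an empty system prompt
            kwargs["system"] = system
        async with self._sem:
            response = await self._client.messages.create(**kwargs)
            return response.content[0].text

    async def batch_complete(
        self,
        prompts: Sequence[str],
        system: str = "",
    ) -> "list[str | BaseException]":
        # return_exceptions=True: a failed prompt yields its exception in
        # place of a string, so one failure does not cancel the batch
        tasks = [self.complete(p, system) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)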

# Usage
async def process_documents(docs: list[str]) -> list[str]:
    client = AsyncClaudeClient(api_key="sk-...", max_concurrent=8)
    system = "Summarize the following document in 3 bullet points."
    results = await client.batch_complete(docs, system=system)

    # gather returns exceptions as values, handle them
    return [
        r if isinstance(r, str) else f"ERROR: {r}"
        for r in results
    ]
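
To see the semaphore's bound in action without calling the API, here is a small sketch that swaps the request for a stub coroutine and records the peak number of tasks in flight. run_bounded and fake_request are hypothetical names, and the 10 ms sleep is only a stand-in for the network round trip.

```python
import asyncio


async def run_bounded(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks stub requests and return the peak number in flight."""
    sem = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the API round trip
            in_flight -= 1

    # All tasks are created at once; the semaphore decides how many proceed
    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_bounded(n_tasks=50, max_concurrent=8))
print(f"peak in-flight requests: {peak}")
```

All 50 coroutines are scheduled immediately, but the peak never exceeds the semaphore's limit.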

max_concurrent=8 is not arbitrary. Start with your rate limit in requests-per-minute divided by 60, then multiply by your average response time in seconds. For a 60 RPM limit with 3-second average responses, that is about 3 concurrent requests. Buffer up from there once you have real metrics.
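
That sizing rule is Little's law (requests in flight = arrival rate x time in system), and it fits in a few lines. The helper name suggested_concurrency is hypothetical, and the result is a starting point to refine with real metrics, not a guarantee that you stay under your limit.

```python
import math


def suggested_concurrency(rpm_limit: int, avg_latency_s: float) -> int:
    """Starting value for max_concurrent, from RPM and average latency."""
    requests_per_second = rpm_limit / 60
    # Round up, and never go below one in-flight request
    return max(1, math.ceil(requests_per_second * avg_latency_s))


print(suggested_concurrency(60, 3.0))   # 3, the example from the text
print(suggested_concurrency(600, 2.5))  # 25
```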

The Debugging Story: When Retries Made Things Worse

After deploying the retry wrapper, our error rate dropped but our average latency nearly doubled. The logs showed requests succeeding on the third or fourth attempt constantly, which looked like a win — but the wall-clock time for users was now 20+ seconds on bad luck runs.

I assumed the retries were working correctly and started looking at the wrong things: network topology, DNS resolution, even our load balancer config. Two days of wrong assumptions.

The actual problem: our wait_exponential(min=2, max=60) was fine, but we had forgotten that anthropic.APIStatusError covers all 4xx and 5xx errors. We were retrying 400 Bad Request errors (malformed prompts) and sitting through every backoff interval on requests that would never succeed.

# Pulled this from our structured logs to diagnose
$ grep '"status_code": 400' app.log | wc -l
847

$ grep '"attempt": 4' app.log | wc -l  
203

203 requests had burned through all 4 retry attempts. Almost all of them were 400s from a prompt template bug, not transient errors at all.

The fix was straightforward — be specific about which errors warrant a retry:

def _is_retryable(exception: BaseException) -> bool:
    if isinstance(exception, anthropic.RateLimitError):
        return True
    if isinstance(exception, anthropic.APIStatusError):
        # Only retry server errors and overload, not client errors
        return exception.status_code in {429, 500, 502, 503, 529}
    if isinstance(exception, anthropic.APIConnectionError):
        return True
    return False

# In your @retry decorator (retry_if_exception is imported from tenacity):
retry=retry_if_exception(_is_retryable),

Latency dropped back to normal within an hour of the deploy.
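
If you want to check the fail-fast behavior without the SDK or tenacity installed, a dependency-free sketch of the same idea looks roughly like this. FakeStatusError, call_with_retry, and count_attempts are hypothetical stand-ins for illustration; only the set of status codes comes from the fix above.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

RETRYABLE_STATUS = {429, 500, 502, 503, 529}


class FakeStatusError(Exception):
    """Hypothetical stand-in for an SDK error that carries an HTTP status."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def is_retryable(exc: BaseException) -> bool:
    return isinstance(exc, FakeStatusError) and exc.status_code in RETRYABLE_STATUS


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 4,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Client errors and the final attempt are re-raised immediately
            if attempt == max_attempts or not is_retryable(exc):
                raise
            sleep(min(60.0, 2.0 ** attempt))
    raise ValueError("max_attempts must be at least 1")


def count_attempts(status_code: int) -> int:
    """Run a call that always fails with status_code; return attempts made."""
    attempts = 0

    def always_fails() -> None:
        nonlocal attempts
        attempts += 1
        raise FakeStatusError(status_code)

    try:
        call_with_retry(always_fails, sleep=lambda seconds: None)
    except FakeStatusError:
        pass
    return attempts


print(count_attempts(400))  # 1: fails fast, no backoff
print(count_attempts(529))  # 4: retried until attempts run out
```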

Pattern 3: Streaming for Long Responses

For any output over a few sentences, streaming is the difference between a good UX and users assuming the page is broken. The token-level streaming from Claude maps cleanly to server-sent events.

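from fastapi import Body, FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
# With no api_key argument, the SDK reads ANTHROPIC_API_KEY from the environment
client = anthropic.Anthropic()

@app.post("/stream")
async def stream_response(prompt: str = Body(..., embed=True)):
    # embed=True makes FastAPI expect a JSON body of the form {"prompt": "..."}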
    def generate():
        with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            for text in stream.text_stream:
                # SSE format
                yield f"data: {json.dumps({'text': text})}\n\n"

            # Send final usage stats for client-side logging
            final = stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # critical for nginx
        },
    )

The X-Accel-Buffering: no header is the one that actually trips people up. Without it, nginx buffers the response before sending it downstream, and your streaming UI shows nothing until the request completes. This bit us in production but not in staging or local dev, where we had no nginx in front of the app.
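
On the consuming side, the events emitted by the endpoint above are plain "data: <json>" lines separated by blank lines, so a client can reassemble the text with very little code. The sketch below is one way to do it, assuming the event shapes shown in this post ({"text": ...} chunks followed by a final {"done": true, "usage": ...}); collect_sse is a hypothetical helper, not part of any SDK.

```python
import json
from typing import Iterable


def collect_sse(lines: Iterable[str]) -> tuple[str, dict]:
    """Join streamed text chunks and return them with the final usage dict."""
    chunks: list[str] = []
    usage: dict = {}
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank separators and any non-data fields
        event = json.loads(line[len("data: "):])
        if event.get("done"):
            usage = event.get("usage", {})
            break
        chunks.append(event.get("text", ""))
    return "".join(chunks), usage


sample = [
    'data: {"text": "Async"}', "",
    'data: {"text": "/await"}', "",
    'data: {"text": " in Python"}', "",
    'data: {"done": true, "usage": {"input": 18, "output": 47}}', "",
]
text, usage = collect_sse(sample)
print(text)   # Async/await in Python
print(usage)  # {'input': 18, 'output': 47}
```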

Prompt Versioning and the Config Layer

Hard-coding prompts in your application code is fine until it isn't. The first time a prompt change requires a full deploy cycle, you will want a config layer.

import os
from functools import lru_cache
from pathlib import Path
import yaml

PROMPT_DIR = Path(__file__).parent / "prompts"

@lru_cache(maxsize=None)
def load_prompt(name: str, version: str = "latest") -> dict:
    """Load a versioned prompt template from a YAML file on disk."""
    prompt_path = PROMPT_DIR / f"{name}.yaml"
    with open(prompt_path) as f:
        config = yaml.safe_load(f)

    versions = config["versions"]
    if version == "latest":
        # Compare numerically: a plain max() on strings ranks "v9" above "v10"
        version = max(versions, key=lambda v: int(v.lstrip("v")))

    return versions[version]

# prompts/summarize.yaml
# versions:
#   v1:
#     system: "You are a concise summarizer."
#     user_template: "Summarize this: {text}"
#   v2:
#     system: "You are a precise technical writer."
#     user_template: "Provide a 3-bullet summary of: {text}"

# Build the client once so its connection pool is reused across calls
_client = ClaudeClient(api_key=os.environ["ANTHROPIC_API_KEY"])

def summarize_document(text: str, prompt_version: str = "latest") -> str:
    prompt_config = load_prompt("summarize", prompt_version)
    return _client.complete(
        prompt=prompt_config["user_template"].format(text=text),
        system=prompt_config["system"],
    )

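This gives you A/B testing capability and rollback without a code deploy. lru_cache keeps it from hitting disk on every request, but it also means an edited YAML file is not picked up until you call load_prompt.cache_clear() or restart the process.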
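
For the A/B side, one common approach is to hash a stable identifier into a bucket so each user keeps seeing the same prompt version. The sketch below is one possible way to do that; pick_prompt_version, the user_id argument, and the weights mapping are all assumptions for illustration and are not part of the config layer above.

```python
import hashlib


def pick_prompt_version(user_id: str, weights: dict[str, int]) -> str:
    """Deterministically map a user to a version in proportion to weights."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("weights must contain at least one positive entry")

    # Hash the id so assignment is stable across processes and restarts
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total

    # Walk the versions in a fixed order until the bucket falls in range
    for version, weight in sorted(weights.items()):
        if bucket < weight:
            return version
        bucket -= weight
    raise AssertionError("unreachable: bucket is always below total")


# Roughly 90% of users stay on v1, 10% try v2
version = pick_prompt_version("user-1234", {"v1": 90, "v2": 10})
print(version)
```

The returned string can be passed straight to summarize_document as prompt_version, and rolling back an experiment is a matter of setting the new version's weight to 0.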

Running It All

# Install dependencies
pip install anthropic tenacity fastapi uvicorn pyyaml

# Set your key
export ANTHROPIC_API_KEY=sk-ant-...

# Run the streaming endpoint
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

# Quick smoke test
curl -X POST "http://localhost:8000/stream" \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Explain async/await in Python in 3 sentences"}' \
  --no-buffer

# Expected output (streaming):
# data: {"text": "Async"}
# data: {"text": "/await"}
# data: {"text": " in Python"}
# ...
# data: {"done": true, "usage": {"input": 18, "output": 47}}

Key Takeaways

  • Retry only retryable errors — 400s burn your budget and your latency if you retry them blindly
  • Use asyncio.Semaphore for batch jobs; without it you will saturate rate limits immediately on any non-trivial workload
  • Set X-Accel-Buffering: no on streaming endpoints behind nginx or you will debug ghost latency for hours
  • Log token counts on every request from day one — your cost model will thank you when traffic spikes
  • Version your prompts outside application code; the first time you need an emergency prompt rollback you will understand why

Follow for more practical AI and productivity content.
