Three weeks after shipping our Claude-powered summarization feature, our p99 latency hit 45 seconds and we were dropping 12% of requests. The code worked perfectly in staging. Here is everything I learned rebuilding it the right way.
The Naive Implementation (and Why It Breaks)
Most tutorials show you something like this:
import anthropic

client = anthropic.Anthropic(api_key="sk-...")

def summarize(text: str) -> str:
    message = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": text}]
    )
    return message.content[0].text
This works fine for a demo. In production, it collapses under three real pressures: no retry policy of your own, no concurrency control, and no timeout tuned to your workload. When you hit Claude's rate limits (and you will), every queued request just fails.
Pattern 1: The Production Client Wrapper
Build a thin wrapper that handles the things you will always need:
import anthropic
import time
import logging
from typing import Optional
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

logger = logging.getLogger(__name__)

class ClaudeClient:
    def __init__(
        self,
        api_key: str,
        model: str = "claude-opus-4-5",
        max_tokens: int = 1024,
        timeout: float = 30.0,
    ):
        self.model = model
        self.max_tokens = max_tokens
        # Reuse the underlying HTTP connection pool
        self._client = anthropic.Anthropic(
            api_key=api_key,
            timeout=anthropic.Timeout(
                connect=5.0,
                read=timeout,
                write=10.0,
                pool=5.0,
            ),
        )

    @retry(
        retry=retry_if_exception_type(
            (anthropic.RateLimitError, anthropic.APIStatusError)
        ),
        wait=wait_exponential(multiplier=1, min=2, max=60),
        stop=stop_after_attempt(4),
        reraise=True,
    )
    def complete(
        self,
        prompt: str,
        system: Optional[str] = None,
        max_tokens: Optional[int] = None,
    ) -> str:
        start = time.monotonic()
        messages = [{"role": "user", "content": prompt}]
        kwargs = {
            "model": self.model,
            "max_tokens": max_tokens or self.max_tokens,
            "messages": messages,
        }
        if system:
            kwargs["system"] = system
        try:
            response = self._client.messages.create(**kwargs)
            elapsed = time.monotonic() - start
            logger.info(
                "claude_request",
                extra={
                    "elapsed_ms": round(elapsed * 1000),
                    "input_tokens": response.usage.input_tokens,
                    "output_tokens": response.usage.output_tokens,
                    "model": self.model,
                },
            )
            return response.content[0].text
        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # overloaded
                logger.warning("Claude API overloaded, retrying...")
            # Re-raise so tenacity can decide whether to retry
            raise
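The key decisions here: tenacity handles retries with exponential backoff, the Timeout object lets you tune each phase of the connection separately (connect timeout vs read timeout are very different problems), and the structured log gives you the token usage you need to understand your bill. One caveat: the SDK also retries some failures on its own by default, so pass max_retries=0 to the client if you want tenacity to be the only retry layer.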
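The token counts in that log line are enough to approximate spend per request. Here is a minimal sketch, assuming per-million-token pricing. The two PRICE_* constants are placeholders invented for illustration, not real rates, and Usage is a hypothetical stand-in for the usage object on the response.

```python
from dataclasses import dataclass

# Placeholder prices in USD per million tokens. These are NOT real rates;
# substitute the current pricing for the model you actually call.
PRICE_INPUT_PER_MTOK = 1.00
PRICE_OUTPUT_PER_MTOK = 2.00


@dataclass(frozen=True)
class Usage:
    """Hypothetical stand-in for the token counts logged per request."""
    input_tokens: int
    output_tokens: int


def estimate_cost_usd(
    usage: Usage,
    input_price: float = PRICE_INPUT_PER_MTOK,
    output_price: float = PRICE_OUTPUT_PER_MTOK,
) -> float:
    """Return the approximate cost of one request in USD."""
    total = usage.input_tokens * input_price + usage.output_tokens * output_price
    return total / 1_000_000


print(estimate_cost_usd(Usage(input_tokens=1_000_000, output_tokens=500_000)))  # 2.0
```

Summing this over the claude_request log lines gives a rough daily figure to compare against the invoice.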
Pattern 2: Async with Concurrency Control
If you are processing batches — documents, user requests, anything in a loop — you need async with a semaphore. Without the semaphore, you fire every request simultaneously and saturate the rate limit immediately.
import asyncio
import anthropic
from typing import Sequence, Union

class AsyncClaudeClient:
    def __init__(
        self,
        api_key: str,
        model: str = "claude-opus-4-5",
        max_concurrent: int = 10,  # tune per your tier
    ):
        self.model = model
        self._client = anthropic.AsyncAnthropic(api_key=api_key)
        self._sem = asyncio.Semaphore(max_concurrent)

    async def complete(self, prompt: str, system: str = "") -> str:
        # At most max_concurrent requests are in flight at once
        async with self._sem:
            response = await self._client.messages.create(
                model=self.model,
                max_tokens=1024,
                system=system,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text

    async def batch_complete(
        self,
        prompts: Sequence[str],
        system: str = "",
    ) -> list[Union[str, BaseException]]:
        tasks = [self.complete(p, system) for p in prompts]
        # return_exceptions=True: failures come back as values, in input order
        return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
async def process_documents(docs: list[str]) -> list[str]:
    client = AsyncClaudeClient(api_key="sk-...", max_concurrent=8)
    system = "Summarize the following document in 3 bullet points."
    results = await client.batch_complete(docs, system=system)
    # gather returns exceptions as values, handle them
    return [
        r if isinstance(r, str) else f"ERROR: {r}"
        for r in results
    ]
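The effect of the semaphore is easy to check without touching the API. The sketch below swaps the real request for asyncio.sleep and records the peak number of calls in flight. The names run_with_limit and fake_request are made up for this illustration.

```python
import asyncio


async def run_with_limit(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake requests under a semaphore; return the peak number in flight."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the API round trip
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    # 20 tasks are scheduled at once, but never more than 4 run concurrently
    print(asyncio.run(run_with_limit(n_tasks=20, limit=4)))
```

All 20 coroutines are created up front, yet the peak never exceeds the limit, which is the property that keeps a batch job under the rate limit.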
max_concurrent=8 is not arbitrary. Start with your rate limit in requests-per-minute divided by 60, then multiply by your average response time in seconds. For a 60 RPM limit with 3-second average responses, that is about 3 concurrent requests. Buffer up from there once you have real metrics.
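That rule of thumb is Little's law (requests in flight = arrival rate x time in system). A small sketch of the arithmetic follows. The function name is hypothetical, and it rounds down so the starting value stays at or under the limit.

```python
import math


def estimate_concurrency(requests_per_minute: float, avg_response_seconds: float) -> int:
    """Starting point for max_concurrent: rate (req/s) times average response time (s)."""
    per_second = requests_per_minute / 60
    # Round down to stay under the limit, but never go below one worker
    return max(1, math.floor(per_second * avg_response_seconds))


print(estimate_concurrency(60, 3.0))  # 3
```

Treat the result as a floor to start from and raise it only when your own latency metrics show headroom.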
The Debugging Story: When Retries Made Things Worse
After deploying the retry wrapper, our error rate dropped but our average latency nearly doubled. The logs showed requests succeeding on the third or fourth attempt constantly, which looked like a win — but the wall-clock time for users was now 20+ seconds on bad luck runs.
I assumed the retries were working correctly and started looking at the wrong things: network topology, DNS resolution, even our load balancer config. Two days of wrong assumptions.
The actual problem: our wait_exponential(min=2, max=60) was fine, but we had forgotten that anthropic.APIStatusError covers all 4xx and 5xx errors. We were retrying 400 Bad Request errors — malformed prompts — and waiting up to 60 seconds on requests that would never succeed.
# Pulled this from our structured logs to diagnose
$ grep '"status_code": 400' app.log | wc -l
847
$ grep '"attempt": 4' app.log | wc -l
203
203 requests had burned through all 4 retry attempts. Almost all of them were 400s from a prompt template bug, not transient errors at all.
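To put a number on what each doomed request cost, it helps to write out the backoff schedule. This sketch models a doubling wait clamped to the min and max from the decorator. The exact exponent offset is an assumption here, so check it against the tenacity version you have installed.

```python
def backoff_waits(
    attempts: int,
    multiplier: float = 1.0,
    min_wait: float = 2.0,
    max_wait: float = 60.0,
) -> list[float]:
    """Waits between attempts for a doubling backoff clamped to [min_wait, max_wait].

    Assumes the k-th wait is multiplier * 2**(k - 1) before clamping.
    """
    waits = []
    for k in range(1, attempts):  # no wait after the final attempt
        raw = multiplier * 2 ** (k - 1)
        waits.append(max(min_wait, min(raw, max_wait)))
    return waits


waits = backoff_waits(attempts=4)
print(waits, sum(waits))  # [2.0, 2.0, 4.0] 8.0
```

Under that model, a request that exhausts all four attempts spends about 8 seconds sleeping on top of four full round trips, which is consistent with the 20+ second runs users were seeing.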
The fix was straightforward — be specific about which errors warrant a retry:
from tenacity import retry_if_exception

def _is_retryable(exception: BaseException) -> bool:
    if isinstance(exception, anthropic.RateLimitError):
        return True
    if isinstance(exception, anthropic.APIStatusError):
        # Only retry server errors and overload, not client errors
        return exception.status_code in {429, 500, 502, 503, 529}
    if isinstance(exception, anthropic.APIConnectionError):
        return True
    return False

# In your @retry decorator (note: retry_if_exception takes a predicate,
# unlike retry_if_exception_type, which takes exception classes):
retry=retry_if_exception(_is_retryable),
Latency dropped back to normal within an hour of the deploy.
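A cheap guard against this regressing is a unit test over status codes. The sketch below uses a stand-in exception class (FakeStatusError, hypothetical) so it runs without the SDK or network access. In a real suite you would run the same checks against the actual predicate.

```python
RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 529})


class FakeStatusError(Exception):
    """Hypothetical stand-in for an SDK status error; carries only status_code."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def is_retryable_status(exc: BaseException) -> bool:
    """True only for status codes that can plausibly succeed on a later attempt."""
    return getattr(exc, "status_code", None) in RETRYABLE_STATUS


# Client errors must fail fast
for code in (400, 401, 404, 422):
    assert not is_retryable_status(FakeStatusError(code))

# Rate limits, server errors, and overload are worth another attempt
for code in (429, 500, 502, 503, 529):
    assert is_retryable_status(FakeStatusError(code))
```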
Pattern 3: Streaming for Long Responses
For any output over a few sentences, streaming is the difference between a good UX and users assuming the page is broken. The token-level streaming from Claude maps cleanly to server-sent events.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json

app = FastAPI()
client = anthropic.Anthropic(api_key="sk-...")

class StreamRequest(BaseModel):
    # Matches a JSON body like {"prompt": "..."}; a bare `prompt: str`
    # parameter would be read from the query string instead
    prompt: str

@app.post("/stream")
async def stream_response(body: StreamRequest):
    def generate():
        with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": body.prompt}],
        ) as stream:
            for text in stream.text_stream:
                # SSE format
                yield f"data: {json.dumps({'text': text})}\n\n"
            # Send final usage stats for client-side logging
            final = stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # critical for nginx
        },
    )
The X-Accel-Buffering: no header is the one that actually trips people up. Without it, nginx buffers the proxied response before passing it downstream, and your streaming UI shows nothing until the request completes. This bit us in staging, which sat behind nginx, but not in local dev, which did not.
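On the consuming side, each event is a single data: line followed by a blank line, so a client can be very small. This is a simplified sketch that assumes one data: line per event, as the endpoint above emits. The helper name parse_sse is made up, and it does not handle multi-line data fields or the other fields in the SSE format.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each `data:` line; skip blank separator lines."""
    for line in lines:
        line = line.rstrip("\r\n")
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


raw = (
    'data: {"text": "Async"}\n\n'
    'data: {"text": "/await"}\n\n'
    'data: {"done": true, "usage": {"input": 18, "output": 47}}\n\n'
)
events = list(parse_sse(raw.splitlines()))
text = "".join(e["text"] for e in events if "text" in e)
print(text)  # Async/await
```

The final event carries the done flag and usage counts, so the client can stop reading and log tokens without a second request.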
Prompt Versioning and the Config Layer
Hard-coding prompts in your application code is fine until it isn't. The first time a prompt change requires a full deploy cycle, you will want a config layer.
import os
from functools import lru_cache
from pathlib import Path
import yaml

PROMPT_DIR = Path(__file__).parent / "prompts"

@lru_cache(maxsize=None)
def load_prompt(name: str, version: str = "latest") -> dict:
    """Load a versioned prompt template from disk or a config store."""
    prompt_path = PROMPT_DIR / f"{name}.yaml"
    with open(prompt_path) as f:
        config = yaml.safe_load(f)
    versions = config["versions"]
    if version == "latest":
        # Compare numerically: a plain max() on strings ranks "v9" above "v10"
        version = max(versions, key=lambda v: int(v.lstrip("v")))
    return versions[version]

# prompts/summarize.yaml
# versions:
#   v1:
#     system: "You are a concise summarizer."
#     user_template: "Summarize this: {text}"
#   v2:
#     system: "You are a precise technical writer."
#     user_template: "Provide a 3-bullet summary of: {text}"

@lru_cache(maxsize=1)
def _get_client() -> ClaudeClient:
    # Build the client once so its connection pool is reused across calls
    return ClaudeClient(api_key=os.environ["ANTHROPIC_API_KEY"])

def summarize_document(text: str, prompt_version: str = "latest") -> str:
    prompt_config = load_prompt("summarize", prompt_version)
    return _get_client().complete(
        prompt=prompt_config["user_template"].format(text=text),
        system=prompt_config["system"],
    )
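This gives you A/B testing capability and rollback by switching the version you request rather than editing application code. lru_cache keeps it from hammering disk on every request, with one trade-off: an edited YAML file is only picked up after a process restart or a call to load_prompt.cache_clear().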
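For the A/B side, one common approach is to hash a stable identifier into a bucket so each user always sees the same version. The sketch below is an assumption about how you might wire that up, not part of the setup described here. The function name, the user_id argument, and the integer weights are all hypothetical.

```python
import hashlib


def pick_prompt_version(user_id: str, weights: dict[str, int]) -> str:
    """Deterministically map a user to a version in proportion to integer weights."""
    total = sum(weights.values())
    if total <= 0 or any(w < 0 for w in weights.values()):
        raise ValueError("weights must be non-negative and sum to a positive number")
    # A stable hash, so the assignment survives restarts and multiple workers
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total
    for version, weight in sorted(weights.items()):
        if bucket < weight:
            return version
        bucket -= weight
    raise AssertionError("unreachable when weights are valid")


# Roughly 10% of users get v2; the same user always gets the same answer
version = pick_prompt_version("user-123", {"v1": 90, "v2": 10})
print(version in {"v1", "v2"})  # True
```

The returned string can be passed straight through as the version argument when loading a prompt, and setting a weight to 0 takes a variant out of rotation.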
Running It All
# Install dependencies
pip install anthropic tenacity fastapi uvicorn pyyaml
# Set your key
export ANTHROPIC_API_KEY=sk-ant-...
# Run the streaming endpoint
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
# Quick smoke test
curl -X POST "http://localhost:8000/stream" \
-H "Content-Type: application/json" \
-d '{"prompt": "Explain async/await in Python in 3 sentences"}' \
--no-buffer
# Expected output (streaming):
# data: {"text": "Async"}
# data: {"text": "/await"}
# data: {"text": " in Python"}
# ...
# data: {"done": true, "usage": {"input": 18, "output": 47}}
Key Takeaways
- Retry only retryable errors — 400s burn your budget and your latency if you retry them blindly
- Use asyncio.Semaphore for batch jobs; without it you will saturate rate limits immediately on any non-trivial workload
- Set X-Accel-Buffering: no on streaming endpoints behind nginx or you will debug ghost latency for hours
- Log token counts on every request from day one; your cost model will thank you when traffic spikes
- Version your prompts outside application code; the first time you need an emergency prompt rollback you will understand why