The first async AI application most Python developers write looks like this:
```python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def summarize(text: str) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": f"Summarize: {text}"}]
    )
    return response.content[0].text

async def main():
    results = await asyncio.gather(
        summarize(doc1),
        summarize(doc2),
        summarize(doc3),
    )
```
It works. Then you run it against 500 documents and get a mix of rate limit errors, connection timeouts, and partial results. Some tasks complete, some silently fail, and you have no idea which.
This post covers the async patterns that actually hold under real load: bounded concurrency, retry with backoff, result collection with error isolation, and cancellation.
The problem with unbounded gather
asyncio.gather(*[summarize(doc) for doc in docs]) fires every task simultaneously. With 500 documents, that's 500 concurrent API connections. Three things happen:
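- Rate limit errors. Most AI APIs enforce per-minute request and token limits, and 500 simultaneous requests will hit them almost immediately.
- Connection pool exhaustion. The Anthropic SDK sends requests through an httpx connection pool with a fixed connection limit (httpx's own default is 100; check the limits your SDK version configures). Once every connection is busy, further requests wait for a free one and can fail with pool timeouts.
- Error propagation. By default, asyncio.gather raises the first exception to the caller. The other tasks are not cancelled and keep running, but you never see their results, so one bad document costs you the output of the whole batch.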
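The third point surprises people, so here is a small stdlib-only sketch of it. The work() coroutine is a hypothetical stand-in for an API call (no network involved): item 1 fails quickly while items 0 and 2 finish later.

```python
import asyncio

async def work(i: int, log: list[str]) -> int:
    # Hypothetical stand-in for an API call: item 1 fails fast, the others succeed later.
    if i == 1:
        await asyncio.sleep(0.01)
        raise ValueError(f"bad document {i}")
    await asyncio.sleep(0.05)
    log.append(f"finished {i}")
    return i

async def demo() -> list[str]:
    log: list[str] = []
    tasks = [asyncio.create_task(work(i, log)) for i in range(3)]
    try:
        await asyncio.gather(*tasks)
    except ValueError as e:
        log.append(f"gather raised: {e}")
    # The sibling tasks were not cancelled: give them time to finish on their own.
    await asyncio.sleep(0.1)
    log.append(f"cancelled: {sum(t.cancelled() for t in tasks)}")
    return log

print(asyncio.run(demo()))
```

The exception reaches the caller first, yet both siblings still run to completion; their return values are simply unreachable through the gather call.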
The fix is a semaphore.
Bounded concurrency with asyncio.Semaphore
```python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()
semaphore = asyncio.Semaphore(10)  # max 10 concurrent API calls

async def summarize_bounded(text: str) -> str:
    async with semaphore:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize: {text}"}]
        )
        return response.content[0].text

async def process_batch(docs: list[str]) -> list[str]:
    tasks = [summarize_bounded(doc) for doc in docs]
    return await asyncio.gather(*tasks)
```
The semaphore caps the number of calls in flight: 10 tasks hold a slot and call the API, and the rest wait until a slot frees up. That sharply reduces rate limit errors and keeps the connection pool well below its limit.
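To see the cap in action without an API key, this sketch replaces the API call with a hypothetical fake_call() that sleeps briefly, and records the peak number of calls inside the semaphore at once.

```python
import asyncio

async def run_bounded(n_tasks: int, limit: int) -> int:
    """Run n_tasks stub calls under one semaphore; return the peak number in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_call(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for network latency
            in_flight -= 1
        return i

    results = await asyncio.gather(*(fake_call(i) for i in range(n_tasks)))
    assert results == list(range(n_tasks))  # gather returns results in input order
    return peak

print(f"peak in-flight calls: {asyncio.run(run_bounded(n_tasks=100, limit=10))}")
```

All 100 tasks are created up front, but only 10 ever get past the async with line at the same time.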
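Tuning the semaphore value: Start at 5 for development. For production, use Little's law: concurrency ≈ floor(calls_per_minute × avg_seconds_per_call / 60), where calls_per_minute is the smaller of your request limit and your token limit divided by average tokens per call. If your API allows 60,000 tokens/minute and each call uses ~1,000 tokens, you can sustain 60 calls/minute. At ~1 second per call, that's only 60 × 1 / 60 = 1 call in flight; at ~10 seconds per call, it's 10. Set the semaphore somewhat below that ceiling to leave headroom.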
Retry with exponential backoff
Rate limit errors still happen even with bounded concurrency — you share quota with other processes, quotas vary by time of day, and the API occasionally returns 529s. A retry decorator:
```python
import asyncio
import random
import logging
from functools import wraps
from anthropic import AsyncAnthropic, RateLimitError, APIStatusError

logger = logging.getLogger(__name__)
client = AsyncAnthropic()
semaphore = asyncio.Semaphore(10)  # max 10 concurrent API calls

def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError:  # 429; must come before APIStatusError, its base class
                    if attempt == max_retries:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Rate limit hit, retry {attempt + 1}/{max_retries} in {delay:.1f}s")
                    await asyncio.sleep(delay)
                except APIStatusError as e:
                    if e.status_code in (500, 502, 503, 529):
                        if attempt == max_retries:
                            raise
                        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                        logger.warning(f"Server error {e.status_code}, retry {attempt + 1}/{max_retries} in {delay:.1f}s")
                        await asyncio.sleep(delay)
                    else:
                        raise  # other 4xx (bad request, auth): don't retry
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=2.0)
async def summarize_with_retry(text: str) -> str:
    # The semaphore is held only during the call, not during backoff sleeps.
    async with semaphore:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize: {text}"}]
        )
        return response.content[0].text
```
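Key details:
- Exponential backoff (base_delay * 2^attempt) with jitter (random.uniform(0, 1)) spreads retries out, so tasks that failed together don't all retry at the same moment (the thundering herd problem).
- Only retry on rate limits (429) and transient server errors (500, 502, 503, 529).
- Don't retry other 400-range errors: bad requests won't succeed on retry and you'll waste quota.
- The SDK client also retries on its own (2 retries by default), so these attempts stack on top of its internal ones. Pass max_retries=0 to AsyncAnthropic if you want this decorator to be the only retry layer.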
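To get a feel for what max_retries=3, base_delay=2.0 means in wall-clock time, this sketch reproduces the decorator's delay formula. The helper names are hypothetical, and the seeded random.Random is only there to make the printout reproducible.

```python
import random

def backoff_schedule(max_retries: int, base_delay: float, rng: random.Random) -> list[float]:
    # Same formula as the decorator: base_delay * 2**attempt plus 0-1 s of jitter.
    # One sleep per retry, so max_retries sleeps in total.
    return [base_delay * (2 ** attempt) + rng.uniform(0, 1) for attempt in range(max_retries)]

def worst_case_wait(max_retries: int, base_delay: float, max_jitter: float = 1.0) -> float:
    # Upper bound on total sleep before the final failure is raised
    # (excludes time spent in the calls themselves and any SDK-level retries).
    return sum(base_delay * (2 ** attempt) + max_jitter for attempt in range(max_retries))

rng = random.Random(0)
print([round(d, 2) for d in backoff_schedule(3, 2.0, rng)])
print(worst_case_wait(3, 2.0))  # 2 + 4 + 8 seconds, plus up to 1 s of jitter each
```

A permanently failing item can therefore occupy a task for roughly 17 seconds of sleeping alone, which is worth knowing when you size timeouts for the whole batch.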
Error isolation in batch processing
asyncio.gather(*tasks) propagates the first exception. For batch processing where partial success is acceptable, use return_exceptions=True:
```python
import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class BatchResult:
    index: int
    success: bool
    value: str | None
    error: BaseException | None

async def process_batch_isolated(docs: list[str]) -> list[BatchResult]:
    # summarize_with_retry is defined in the retry section above.
    tasks = [summarize_with_retry(doc) for doc in docs]
    raw_results = await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    for i, result in enumerate(raw_results):
        # BaseException, not Exception: a cancelled task comes back as
        # CancelledError, which does not subclass Exception.
        if isinstance(result, BaseException):
            logger.error(f"Doc {i} failed: {type(result).__name__}: {result}")
            results.append(BatchResult(index=i, success=False, value=None, error=result))
        else:
            results.append(BatchResult(index=i, success=True, value=result, error=None))
    failed = sum(1 for r in results if not r.success)
    logger.info(f"Batch complete: {len(results) - failed}/{len(results)} succeeded")
    return results
```
With return_exceptions=True, exceptions are returned as values in the results list rather than raised. You decide what to do with failures: log and continue, re-queue for retry, write to a dead-letter queue, or raise.
Progress tracking for long batches
For batches that take minutes, you want progress updates without blocking:
```python
import asyncio
from tqdm.asyncio import tqdm

async def process_with_progress(docs: list[str]) -> list[BatchResult]:
    # BatchResult and summarize_with_retry come from the sections above.
    # summarize_with_retry already acquires the shared semaphore, so no second one here.
    async def process_one(i: int, doc: str) -> BatchResult:
        try:
            result = await summarize_with_retry(doc)
            return BatchResult(index=i, success=True, value=result, error=None)
        except Exception as e:
            return BatchResult(index=i, success=False, value=None, error=e)

    tasks = [process_one(i, doc) for i, doc in enumerate(docs)]
    results = []
    # tqdm.as_completed is a regular generator of awaitables: use `for`, not `async for`.
    for next_done in tqdm.as_completed(tasks, desc="Processing docs", total=len(docs)):
        results.append(await next_done)
    # Restore original order
    results.sort(key=lambda r: r.index)
    return results
```
tqdm.asyncio.tqdm.as_completed wraps asyncio.as_completed with a progress bar. Results arrive in completion order, so sort by index at the end if you need original ordering.
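If you'd rather not add tqdm as a dependency, a stdlib-only alternative is to log progress yourself while iterating asyncio.as_completed. In this sketch the function name, the log_every parameter and the fake_work stub are all hypothetical; it also writes each result into a preallocated list by index, which removes the need to sort afterwards.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger(__name__)

async def process_with_logged_progress(
    docs: list[str],
    work: Callable[[str], Awaitable[Any]],
    log_every: int = 10,
) -> list[Any]:
    results: list[Any] = [None] * len(docs)

    async def run_one(i: int, doc: str) -> tuple[int, Any]:
        # Carry the index along so results can be placed without sorting.
        # work() should handle its own errors (e.g. return a BatchResult);
        # an exception here would propagate out of the loop below.
        return i, await work(doc)

    done = 0
    for next_done in asyncio.as_completed([run_one(i, d) for i, d in enumerate(docs)]):
        i, value = await next_done
        results[i] = value
        done += 1
        if done % log_every == 0 or done == len(docs):
            logger.info("progress: %d/%d", done, len(docs))
    return results

async def fake_work(doc: str) -> str:
    # Hypothetical stand-in for an API call with varying latency.
    await asyncio.sleep(0.01 * (len(doc) % 3))
    return doc[::-1]

logging.basicConfig(level=logging.INFO)
print(asyncio.run(process_with_logged_progress(["abc", "de", "f", "ghij"], fake_work, log_every=2)))
```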
Timeouts and cancellation
AI API calls can hang. The AsyncAnthropic client's default timeout is 10 minutes, far longer than most batch jobs should wait on a single call, so you may want stricter control:
```python
async def summarize_with_timeout(text: str, timeout: float = 30.0) -> str:
    # Acquire the semaphore first, so time spent waiting for a slot
    # doesn't count against the per-call timeout.
    async with semaphore:
        try:
            async with asyncio.timeout(timeout):
                response = await client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=512,
                    messages=[{"role": "user", "content": f"Summarize: {text}"}]
                )
                return response.content[0].text
        except TimeoutError:  # asyncio.TimeoutError is an alias of this since 3.11
            logger.warning(f"Timeout after {timeout}s for doc (len={len(text)})")
            raise
```
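asyncio.timeout (Python 3.11+) is a context manager, so a single deadline can cover several awaits without wrapping them in one awaitable, as asyncio.wait_for requires. When the deadline passes, it cancels the work in progress and raises TimeoutError. Set the timeout based on your max_tokens and expected model latency, not a round number.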
For Python < 3.11, use asyncio.wait_for:
```python
result = await asyncio.wait_for(
    client.messages.create(...),
    timeout=30.0
)
```
Putting it together: a production batch processor
```python
import asyncio
import logging
import random
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any
from anthropic import AsyncAnthropic, RateLimitError, APIStatusError

logger = logging.getLogger(__name__)

# The generic class syntax (TaskResult[T]) requires Python 3.12+.
@dataclass
class TaskResult[T]:
    index: int
    success: bool
    value: T | None
    error: Exception | None

class BatchProcessor[T]:
    """Bounded-concurrency batch processor with retry and error isolation."""

    def __init__(
        self,
        concurrency: int = 10,
        max_retries: int = 3,
        timeout: float = 60.0,
    ):
        self.semaphore = asyncio.Semaphore(concurrency)
        self.max_retries = max_retries
        self.timeout = timeout

    async def _run_with_retry(
        self, fn: Callable[..., Awaitable[T]], *args, **kwargs
    ) -> T:
        for attempt in range(self.max_retries + 1):
            try:
                # Timeout starts after a slot is acquired; backoff sleeps happen outside the slot.
                async with self.semaphore:
                    async with asyncio.timeout(self.timeout):
                        return await fn(*args, **kwargs)
            except RateLimitError:
                if attempt == self.max_retries:
                    raise
                await asyncio.sleep(2.0 * (2 ** attempt) + random.uniform(0, 1))
            except APIStatusError as e:
                if e.status_code >= 500 and attempt < self.max_retries:
                    await asyncio.sleep(2.0 * (2 ** attempt) + random.uniform(0, 1))
                else:
                    raise
        raise AssertionError("unreachable: the last attempt always returns or raises")

    async def process(
        self,
        fn: Callable[..., Awaitable[T]],
        items: list[Any],
    ) -> list[TaskResult[T]]:
        async def run_one(i: int, item: Any) -> TaskResult[T]:
            try:
                value = await self._run_with_retry(fn, item)
                return TaskResult(index=i, success=True, value=value, error=None)
            except Exception as e:
                logger.error(f"Item {i} failed permanently: {e}")
                return TaskResult(index=i, success=False, value=None, error=e)

        tasks = [run_one(i, item) for i, item in enumerate(items)]
        # run_one catches failures itself, and gather preserves input order.
        results = await asyncio.gather(*tasks)
        success_count = sum(1 for r in results if r.success)
        logger.info(f"Batch: {success_count}/{len(results)} succeeded")
        return results

# Usage
async def main():
    client = AsyncAnthropic()
    processor = BatchProcessor(concurrency=10, max_retries=3)

    async def summarize(doc: str) -> str:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize: {doc}"}]
        )
        return response.content[0].text

    docs = ["doc text here"] * 100
    results = await processor.process(summarize, docs)
    successes = [r.value for r in results if r.success]
    failures = [r for r in results if not r.success]
    print(f"Processed {len(successes)} docs, {len(failures)} failed")

if __name__ == "__main__":
    asyncio.run(main())
```
When to use threads instead of async
Async works when the bottleneck is I/O wait — network calls, file reads, database queries. It doesn't help when the bottleneck is CPU.
If you're doing heavy post-processing on AI outputs (parsing, classification, regex over large texts), use asyncio.to_thread to run that work in a thread pool without blocking the event loop:
```python
import asyncio
import re

def extract_entities(text: str) -> list[str]:
    # CPU-bound text processing: runs in the thread pool
    pattern = re.compile(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b")
    return pattern.findall(text)

async def analyze_with_extraction(doc: str) -> tuple[str, list[str]]:
    # AI call: async I/O
    summary = await summarize_with_retry(doc)
    # CPU work: offload to the thread pool
    entities = await asyncio.to_thread(extract_entities, summary)
    return summary, entities
```
asyncio.to_thread runs the sync function in the default ThreadPoolExecutor and awaits the result, so the event loop stays free to run other tasks. In standard CPython builds, threads still share the GIL, so pure-Python CPU work gains no parallelism this way. For truly heavy CPU work, use a ProcessPoolExecutor (via loop.run_in_executor) instead.
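A quick way to check that the event loop stays unblocked: this sketch runs a heartbeat coroutine while a blocking function executes, once called directly and once through asyncio.to_thread, and counts how many heartbeats got through. time.sleep stands in for blocking post-processing (it releases the GIL, so it is a best case for threads), and the function names are hypothetical.

```python
import asyncio
import time

def blocking_work(seconds: float) -> str:
    # Stand-in for synchronous post-processing; blocks the calling thread.
    time.sleep(seconds)
    return "done"

async def heartbeat(ticks: list[int], stop: asyncio.Event) -> None:
    # Counts how often the event loop gets to run this coroutine.
    while not stop.is_set():
        ticks[0] += 1
        await asyncio.sleep(0.01)

async def measure(offload: bool) -> int:
    ticks = [0]
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat take its first tick
    start = ticks[0]
    if offload:
        await asyncio.to_thread(blocking_work, 0.2)
    else:
        blocking_work(0.2)  # runs on the event loop thread and freezes it
    stop.set()
    await hb
    return ticks[0] - start

print("direct:", asyncio.run(measure(offload=False)))
print("to_thread:", asyncio.run(measure(offload=True)))
```

Called directly, the heartbeat gets no turns at all during the 0.2 s of work; offloaded, it keeps ticking roughly every 10 ms.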
A checklist for async AI applications
- asyncio.Semaphore on every batch: never run an unbounded gather against external APIs
- Exponential backoff with jitter on rate limit and 5xx errors
- return_exceptions=True on gather for batch processing: let failures be values, not raised exceptions
- A timeout on every API call: never trust external latency to be bounded
- asyncio.to_thread for CPU-bound post-processing: keep the event loop clear
The async patterns in the AI Dev Toolkit include prompt templates for generating asyncio batch processors, retry decorators, and progress-tracked pipelines from function signatures — so you're not writing boilerplate from scratch each time you build a new AI workflow.
Further reading
- asyncio docs — synchronization primitives
- Anthropic Python SDK — async usage
- Python type hints for AI workflows — earlier post in this series
- Structured LLM outputs with Pydantic v2 — previous post in this series