DEV Community

Peyton Green

Async Python for AI Applications: Patterns That Don't Break Under Load

The first async AI application most Python developers write looks like this:

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def summarize(text: str) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": f"Summarize: {text}"}]
    )
    return response.content[0].text

async def main(doc1: str, doc2: str, doc3: str) -> list[str]:
    results = await asyncio.gather(
        summarize(doc1),
        summarize(doc2),
        summarize(doc3),
    )
    return results

It works. Then you run it against 500 documents and get a mix of rate limit errors, connection timeouts, and partial results. Some tasks complete, some silently fail, and you have no idea which.

This post covers the async patterns that actually hold under real load: bounded concurrency, retry with backoff, result collection with error isolation, and cancellation.


The problem with unbounded gather

asyncio.gather(*[summarize(doc) for doc in docs]) fires every task simultaneously. With 500 documents, that's 500 concurrent API connections. Three things happen:

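  1. Rate limit errors. Most AI APIs enforce per-minute request and token limits. 500 simultaneous requests will hit them almost immediately.
  2. Connection pool exhaustion. The Anthropic SDK is built on httpx, whose connection pool caps the number of open connections (httpx's own default is 100; the SDK may configure a different limit). Once the pool is full, extra requests wait for a free connection and can fail with a pool timeout.
  3. Error propagation. By default, asyncio.gather raises the first exception to the caller. The other tasks are not cancelled and keep running (and consuming quota), but their results are lost to you. One bad document kills the batch.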
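You can check the gather behavior offline, with no API key. In this sketch, fail_fast and slow_ok are hypothetical stand-ins for two API calls: gather re-raises the ValueError right away, yet slow_ok still runs to completion in the background.

```python
import asyncio


async def fail_fast() -> str:
    # Stand-in for a request that errors quickly (e.g. a bad document)
    await asyncio.sleep(0.01)
    raise ValueError("bad document")


async def slow_ok(finished: list[str]) -> str:
    # Stand-in for a slower request that succeeds
    await asyncio.sleep(0.05)
    finished.append("slow_ok")
    return "summary"


async def demo() -> tuple[str, list[str]]:
    finished: list[str] = []
    caught = ""
    try:
        await asyncio.gather(fail_fast(), slow_ok(finished))
    except ValueError as exc:
        caught = str(exc)  # gather re-raised the first failure...
    await asyncio.sleep(0.1)  # ...but the sibling task was never cancelled
    return caught, finished


caught, finished = asyncio.run(demo())
print(caught, finished)  # bad document ['slow_ok']
```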

The first fix is a semaphore.


Bounded concurrency with asyncio.Semaphore

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()
semaphore = asyncio.Semaphore(10)  # max 10 concurrent API calls

async def summarize_bounded(text: str) -> str:
    async with semaphore:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize: {text}"}]
        )
        return response.content[0].text

async def process_batch(docs: list[str]) -> list[str]:
    tasks = [summarize_bounded(doc) for doc in docs]
    return await asyncio.gather(*tasks)

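The semaphore acts as a gate: every task is created up front, but only 10 at a time can be inside the async with block calling the API, and the rest wait their turn. Rate limit errors drop significantly, and the connection pool stays healthy.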
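To see the cap in action without touching the API, this sketch replaces the SDK call with asyncio.sleep (run_bounded and fake_call are hypothetical names) and records the peak number of coroutines inside the semaphore at once:

```python
import asyncio


async def run_bounded(n_tasks: int, limit: int) -> tuple[int, list[int]]:
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_call(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the network round trip
            in_flight -= 1
            return i

    # All n_tasks coroutines start at once; the semaphore admits `limit` at a time
    results = await asyncio.gather(*(fake_call(i) for i in range(n_tasks)))
    return peak, results


peak, results = asyncio.run(run_bounded(n_tasks=50, limit=10))
print(peak)  # 10
```

gather still returns results in input order; the semaphore only changes when each call is allowed to start.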

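Tuning the semaphore value: Start at 5 for development. For production, remember that a semaphore caps calls in flight, not calls per minute. By Little's law, the concurrency that just saturates your limit is (allowed calls per minute / 60) × average seconds per call, where allowed calls per minute is the smaller of your request limit and your token limit divided by average tokens per call. If your API allows 60,000 tokens/minute and each call uses ~1,000 tokens, you can sustain about 60 calls/minute, or 1 per second: at ~1 second per call that is just 1 concurrent call, while at ~10 seconds per call it is about 10. Use roughly 70-80% of the computed value to leave headroom.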


Retry with exponential backoff

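Rate limit errors still happen even with bounded concurrency: you may share quota with other processes, and the API occasionally returns 529 (overloaded) errors under heavy load. The SDK already retries some failures on its own (twice by default, configurable through the client's max_retries argument); a decorator adds an outer layer with a policy you control: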

import asyncio
import random
import logging
from functools import wraps
from anthropic import RateLimitError, APIStatusError

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError:
                    if attempt == max_retries:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Rate limit hit, retry {attempt + 1}/{max_retries} in {delay:.1f}s")
                    await asyncio.sleep(delay)
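                except APIStatusError as e:
                    # RateLimitError (429) is a subclass and was handled above
                    if e.status_code in (500, 502, 503, 529):
                        if attempt == max_retries:
                            raise
                        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                        logger.warning(f"HTTP {e.status_code}, retry {attempt + 1}/{max_retries} in {delay:.1f}s")
                        await asyncio.sleep(delay)
                    else:
                        raise  # other 4xx (bad request, auth): don't retry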
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=2.0)
async def summarize_with_retry(text: str) -> str:
    async with semaphore:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize: {text}"}]
        )
        return response.content[0].text

Key details:

  • Exponential backoff (base_delay * 2^attempt) with jitter (random.uniform(0, 1)) prevents a thundering herd: retrying tasks don't all fire at the same moment
  • Only retry on rate limits (429) and transient server errors (500, 502, 503, 529)
  • Don't retry other 4xx errors: a malformed or unauthorized request won't succeed on retry, and you'll waste quota
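The same policy can be tested without the API if the retry decision is passed in as a predicate. This is a minimal sketch under that assumption: retry_async, is_transient, FakeStatusError and run_scenario are hypothetical names, the stub error mimics the status_code attribute of anthropic.APIStatusError, and the jitter is scaled to base_delay (a small departure from the decorator's fixed random.uniform(0, 1)) so the test runs fast.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")
RETRYABLE_STATUS = {429, 500, 502, 503, 529}


async def retry_async(
    fn: Callable[[], Awaitable[T]],
    is_retryable: Callable[[BaseException], bool],
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> T:
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception as exc:
            if attempt == max_retries or not is_retryable(exc):
                raise
            # Exponential backoff plus random jitter so retries spread out
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))
    raise AssertionError("unreachable: the loop always returns or raises")


def is_transient(exc: BaseException) -> bool:
    # Retry only status codes from the allow-list; everything else fails fast
    return getattr(exc, "status_code", None) in RETRYABLE_STATUS


class FakeStatusError(Exception):
    """Hypothetical stand-in for an HTTP error raised by the SDK."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def run_scenario(failures: list[int]) -> tuple[str, int]:
    """Fail with each status code in turn, then succeed; report result and call count."""
    calls = 0

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls <= len(failures):
            raise FakeStatusError(failures[calls - 1])
        return "ok"

    try:
        result = asyncio.run(retry_async(flaky, is_transient, base_delay=0.001))
    except FakeStatusError as exc:
        result = f"gave up: {exc}"
    return result, calls


print(run_scenario([429, 503]))  # ('ok', 3)
print(run_scenario([400]))       # ('gave up: HTTP 400', 1)
```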

Error isolation in batch processing

asyncio.gather(*tasks) propagates the first exception. For batch processing where partial success is acceptable, use return_exceptions=True:

import asyncio
from dataclasses import dataclass

@dataclass
class BatchResult:
    index: int
    success: bool
    value: str | None
    error: BaseException | None

async def process_batch_isolated(docs: list[str]) -> list[BatchResult]:
    tasks = [summarize_with_retry(doc) for doc in docs]
    raw_results = await asyncio.gather(*tasks, return_exceptions=True)

    results = []
    for i, result in enumerate(raw_results):
        # BaseException, not Exception: a cancelled task comes back as CancelledError
        if isinstance(result, BaseException):
            logger.error(f"Doc {i} failed: {type(result).__name__}: {result}")
            results.append(BatchResult(index=i, success=False, value=None, error=result))
        else:
            results.append(BatchResult(index=i, success=True, value=result, error=None))

    failed = sum(1 for r in results if not r.success)
    logger.info(f"Batch complete: {len(results) - failed}/{len(results)} succeeded")
    return results

With return_exceptions=True, exceptions are returned as values in the results list rather than raised. You decide what to do with failures: log and continue, re-queue for retry, write to a dead-letter queue, or raise.
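A minimal offline sketch of those semantics (fake_summarize and collect are hypothetical; an empty string stands in for a document the API rejects). Each exception lands at the same position as the input that caused it, so the indices line up with the original list.

```python
import asyncio


async def fake_summarize(text: str) -> str:
    await asyncio.sleep(0)  # yield to the loop like a real network call would
    if not text:
        raise ValueError("empty document")
    return text.upper()


async def collect(docs: list[str]) -> tuple[dict[int, str], dict[int, BaseException]]:
    raw = await asyncio.gather(*(fake_summarize(d) for d in docs), return_exceptions=True)
    ok: dict[int, str] = {}
    failed: dict[int, BaseException] = {}
    for i, item in enumerate(raw):
        # Exceptions come back as values, in input order
        if isinstance(item, BaseException):
            failed[i] = item
        else:
            ok[i] = item
    return ok, failed


ok, failed = asyncio.run(collect(["alpha", "", "gamma"]))
print(ok)            # {0: 'ALPHA', 2: 'GAMMA'}
print(list(failed))  # [1]
```

To re-queue failures, rebuild the input from the failed indices, for example [docs[i] for i in failed].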


Progress tracking for long batches

For batches that take minutes, you want progress updates without blocking:

import asyncio
from tqdm.asyncio import tqdm

async def process_with_progress(docs: list[str]) -> list[BatchResult]:
    # summarize_with_retry already acquires the module-level semaphore,
    # so a second semaphore here would only add another layer of waiting

    async def process_one(i: int, doc: str) -> BatchResult:
        try:
            result = await summarize_with_retry(doc)
            return BatchResult(index=i, success=True, value=result, error=None)
        except Exception as e:
            return BatchResult(index=i, success=False, value=None, error=e)

    tasks = [process_one(i, doc) for i, doc in enumerate(docs)]

    results = []
    # tqdm.as_completed is a regular (synchronous) generator of awaitables
    for next_done in tqdm.as_completed(tasks, desc="Processing docs", total=len(docs)):
        results.append(await next_done)

    # Restore original order
    results.sort(key=lambda r: r.index)
    return results

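tqdm.asyncio.tqdm.as_completed wraps asyncio.as_completed with a progress bar. It is an ordinary (synchronous) iterator that yields awaitables, so loop over it with a plain for and await each item; async for raises a TypeError. Results arrive in completion order, so sort by index at the end if you need the original ordering.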


Timeouts and cancellation

AI API calls can hang. The AsyncAnthropic client applies a default timeout (10 minutes) that you can lower with its timeout argument, but a per-call asyncio timeout gives you stricter control:

async def summarize_with_timeout(text: str, timeout: float = 30.0) -> str:
    # Acquire the semaphore first, so time spent queued for a slot
    # doesn't count against the per-call timeout
    async with semaphore:
        try:
            async with asyncio.timeout(timeout):
                response = await client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=512,
                    messages=[{"role": "user", "content": f"Summarize: {text}"}]
                )
                return response.content[0].text
        except TimeoutError:
            logger.warning(f"Timeout after {timeout}s for doc (len={len(text)})")
            raise

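asyncio.timeout (Python 3.11+) is a context manager, so it can bound a whole block, not just a single awaitable, without wrapping it in a separate task. When the deadline passes, it cancels the current task at whichever await is in progress and raises TimeoutError in its place. Set the timeout from your max_tokens and expected model latency, not a round number.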

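For Python < 3.11, use asyncio.wait_for instead. On those versions it raises asyncio.TimeoutError, which only became an alias of the built-in TimeoutError in 3.11, so catch that name: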

result = await asyncio.wait_for(
    client.messages.create(...),
    timeout=30.0
)

Putting it together: a production batch processor

import asyncio
import logging
import random
from dataclasses import dataclass
from typing import Callable, Awaitable
from anthropic import AsyncAnthropic, RateLimitError, APIStatusError

logger = logging.getLogger(__name__)

# The class Name[T] generic syntax (PEP 695) requires Python 3.12+
@dataclass
class TaskResult[T]:
    index: int
    success: bool
    value: T | None
    error: Exception | None

class BatchProcessor[T]:
    """Bounded-concurrency batch processor with retry and error isolation."""

    def __init__(
        self,
        concurrency: int = 10,
        max_retries: int = 3,
        timeout: float = 60.0,
    ):
        self.semaphore = asyncio.Semaphore(concurrency)
        self.max_retries = max_retries
        self.timeout = timeout

    async def _run_with_retry(
        self, fn: Callable[..., Awaitable[T]], *args, **kwargs
    ) -> T:
        for attempt in range(self.max_retries + 1):
            try:
                # Semaphore outside the timeout: queueing time isn't counted,
                # and the slot is released before any backoff sleep
                async with self.semaphore:
                    async with asyncio.timeout(self.timeout):
                        return await fn(*args, **kwargs)
            except RateLimitError:
                if attempt == self.max_retries:
                    raise
                delay = 2.0 * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)
            except APIStatusError as e:
                if e.status_code >= 500 and attempt < self.max_retries:
                    await asyncio.sleep(2.0 * (2 ** attempt) + random.uniform(0, 1))
                else:
                    raise

    async def process(
        self,
        fn: Callable[..., Awaitable[T]],
        items: list,
    ) -> list[TaskResult[T]]:
        async def run_one(i: int, item) -> TaskResult[T]:
            try:
                value = await self._run_with_retry(fn, item)
                return TaskResult(index=i, success=True, value=value, error=None)
            except Exception as e:
                logger.error(f"Item {i} failed permanently: {e!r}")
                return TaskResult(index=i, success=False, value=None, error=e)

        # run_one never raises, and gather returns results in input order
        tasks = [run_one(i, item) for i, item in enumerate(items)]
        results = await asyncio.gather(*tasks)

        success_count = sum(1 for r in results if r.success)
        logger.info(f"Batch: {success_count}/{len(results)} succeeded")
        return results


# Usage
async def main():
    client = AsyncAnthropic()
    processor = BatchProcessor(concurrency=10, max_retries=3)

    async def summarize(doc: str) -> str:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize: {doc}"}]
        )
        return response.content[0].text

    docs = ["doc text here"] * 100
    results = await processor.process(summarize, docs)

    successes = [r.value for r in results if r.success]
    failures = [r for r in results if not r.success]
    print(f"Processed {len(successes)} docs, {len(failures)} failed")


if __name__ == "__main__":
    asyncio.run(main())

When to use threads instead of async

Async works when the bottleneck is I/O wait — network calls, file reads, database queries. It doesn't help when the bottleneck is CPU.

If you're doing heavy post-processing on AI outputs (parsing, classification, regex over large texts), use asyncio.to_thread to run that work in a thread pool without blocking the event loop:

import asyncio
import re

def extract_entities(text: str) -> list[str]:
    # CPU-bound text processing — runs in thread pool
    pattern = re.compile(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b")
    return pattern.findall(text)

async def analyze_with_extraction(doc: str) -> tuple[str, list[str]]:
    # AI call — async I/O
    summary = await summarize_with_retry(doc)

    # CPU work — offload to thread pool
    entities = await asyncio.to_thread(extract_entities, summary)

    return summary, entities

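asyncio.to_thread runs the sync function in the event loop's default ThreadPoolExecutor and awaits the result, so the event loop stays free to serve other tasks. Threads still share the GIL, though: pure-Python CPU work in a thread competes with the event loop for the interpreter and gains no parallelism. For truly heavy CPU work (model inference, large pure-Python transforms), use a ProcessPoolExecutor via loop.run_in_executor instead; libraries that release the GIL, as many NumPy operations do, can run well in threads.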
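A rough way to see the difference: a heartbeat task ticks every 10 ms while a blocking call runs, either directly on the event loop or through asyncio.to_thread. heartbeat and measure are hypothetical names, and time.sleep stands in for the blocking work; unlike pure-Python CPU work it releases the GIL, so the threaded case here is a best case.

```python
import asyncio
import time


async def heartbeat(ticks: list[int], interval: float) -> None:
    # Counts how often the event loop gets a chance to run this task
    while True:
        await asyncio.sleep(interval)
        ticks[0] += 1


async def measure(offload: bool, work_seconds: float = 0.2) -> int:
    ticks = [0]
    hb = asyncio.create_task(heartbeat(ticks, 0.01))
    if offload:
        await asyncio.to_thread(time.sleep, work_seconds)  # loop stays responsive
    else:
        time.sleep(work_seconds)  # blocks the loop: heartbeat never gets to run
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return ticks[0]


print(asyncio.run(measure(offload=False)))  # 0
print(asyncio.run(measure(offload=True)))
```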


A checklist for async AI applications

  • asyncio.Semaphore on every batch — never unbounded gather against external APIs
  • Exponential backoff with jitter on rate limit and 5xx errors
  • return_exceptions=True on gather for batch processing — let failures be values, not raised exceptions
  • Timeout on every API call — never trust external latency to be bounded
  • asyncio.to_thread for CPU-bound post-processing — keep the event loop clear

The async patterns in the AI Dev Toolkit include prompt templates for generating asyncio batch processors, retry decorators, and progress-tracked pipelines from function signatures — so you're not writing boilerplate from scratch each time you build a new AI workflow.

