DEV Community

zhongqiyue


Taming AI API Rate Limits with Asyncio Queues

Last month, I spent three days banging my head against the wall because of API rate limits.

I was building a small web app that needed to process hundreds of text inputs through an AI API — think sentiment analysis, summarization, the kind of thing that makes a prototype look magical. The problem? Every time I scaled up the test data, my app would start throwing 429 errors like confetti at a parade.

Sound familiar?

I'm going to walk you through exactly what I tried, what failed, and the approach that finally worked. No fluff, no "just use this one weird trick" — just honest code and trade-offs.


The Real Problem

I had a simple pipeline:

  1. Read a list of texts from a CSV
  2. For each text, call an AI API endpoint
  3. Store the result in a database

The API was fast — ~500ms per request. But it had a rate limit: 10 requests per second. I naively assumed I could just fire off requests in parallel with asyncio.gather() and stay under the limit. Spoiler: I couldn't.

When I ran the script, I got a burst of 429s after the first 10 requests. My retry logic was primitive: just time.sleep(1) on failure, which blocks the entire event loop and made everything crawl. On top of that, a fixed one-second wait isn't real backoff, so retry timing was erratic and unpredictable.
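To see why asyncio.gather() blows through the limit, here's a minimal simulation. fake_call_api is a hypothetical stand-in that sleeps for 500ms and records when each request started; no real API is involved.

```python
import asyncio
import time

start_times: list[float] = []

async def fake_call_api(text: str) -> str:
    # Hypothetical stand-in for a real API call: record the start time, then "work" for 500ms
    start_times.append(time.monotonic())
    await asyncio.sleep(0.5)
    return text.upper()

async def main(n: int = 50) -> list[str]:
    t0 = time.monotonic()
    # gather() schedules every coroutine at once; nothing spaces the requests out
    results = await asyncio.gather(*(fake_call_api(f"text {i}") for i in range(n)))
    in_first_second = sum(1 for t in start_times if t - t0 < 1.0)
    print(f"{in_first_second} of {n} requests started in the first second")
    return results

results = asyncio.run(main())
```

All 50 requests start within a few milliseconds of each other, five times a 10-per-second limit.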


What I Tried (and Why It Failed)

Attempt 1: Fixed Delay Between Requests

import asyncio

# Bad idea: one request at a time, plus a fixed delay
async def process(texts):
    for text in texts:
        await call_api(text)      # call_api: your async API wrapper
        await asyncio.sleep(0.1)  # hope 100ms is enough?

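This only stays under the limit by accident: the delay ignores how long each call takes, so the actual request rate drifts with API latency, and it falls apart the moment you run two instances. It's also wasteful: requests go out one at a time, so at ~500ms per call you manage fewer than two requests per second while the API would accept ten.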
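If you want to keep the fixed-delay idea, one way around both problems is to pace request *starts* instead of waiting for each call to finish: launch every call as its own task, one every 1/rate seconds, measured from a fixed start time. This is a minimal sketch; fake_call_api is a hypothetical stand-in, and like the original loop it still can't coordinate across two instances.

```python
import asyncio
import time

async def fake_call_api(text: str) -> str:
    # Hypothetical stand-in: pretend each request takes 500ms
    await asyncio.sleep(0.5)
    return text.upper()

async def paced(texts: list[str], rate_per_sec: float = 10.0) -> list[str]:
    interval = 1.0 / rate_per_sec
    loop = asyncio.get_running_loop()
    start = loop.time()
    tasks = []
    for i, text in enumerate(texts):
        # Sleep until this request's slot, measured from the start,
        # so slow or fast responses don't shift the schedule
        await asyncio.sleep(max(0.0, start + i * interval - loop.time()))
        tasks.append(asyncio.create_task(fake_call_api(text)))
    # Requests overlap: we never wait for one to finish before starting the next
    return await asyncio.gather(*tasks)

t0 = time.monotonic()
results = asyncio.run(paced([f"text {i}" for i in range(20)]))
elapsed = time.monotonic() - t0
```

With 20 texts this takes about 2.4 seconds (the last request starts at 1.9s and runs for 0.5s), versus roughly 12 seconds for the sequential loop.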

Attempt 2: asyncio.Semaphore

import asyncio

# Better, but still fragile
sem = asyncio.Semaphore(5)  # at most 5 requests in flight

async def safe_call(text):
    async with sem:
        return await call_api(text)

This limits concurrency but doesn't respect a per-second rate limit. If each request takes 200ms, you could easily burst 25 requests in a second with 5 concurrent workers. Still hitting rate limits.
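The arithmetic behind that: with a fixed number of concurrent workers, throughput is roughly concurrency divided by latency, so a semaphore controls the rate only indirectly, through however fast the API happens to respond.

```latex
\text{throughput} \approx \frac{\text{concurrency}}{\text{latency}}
  = \frac{5}{0.2\ \text{s}} = 25\ \text{requests/s}
```

At the ~500ms latency from earlier, the same formula gives 5 / 0.5 s = 10 requests/s, right at the limit, so any speedup in responses tips you over.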

Attempt 3: Simple Exponential Backoff with Retries

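import asyncio

# HTTPError stands in for your client's exception type (assumed to expose .status)
async def call_with_retry(text, max_retries=5):
    for attempt in range(max_retries):
        try:
            return await call_api(text)
        except HTTPError as e:
            if e.status != 429:
                raise
            if attempt < max_retries - 1:
                # Back off 1s, 2s, 4s, 8s... between attempts
                await asyncio.sleep(2 ** attempt)
    raise RuntimeError("Max retries exceeded")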

This works for occasional 429s, but when you're hitting the limit on every request, you spend most of your time sleeping. Not good.
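To put a number on it: when every attempt hits a 429, the waits double each time, so n consecutive sleeps add up to

```latex
\sum_{k=0}^{n-1} 2^k = 2^n - 1 \ \text{seconds}
```

Four back-to-back sleeps already cost 1 + 2 + 4 + 8 = 15 seconds for a single text, and every other text that keeps hitting the limit pays the same penalty.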


What Finally Worked: Async Queue with Token Bucket

I needed two things:

  1. A rate limiter that enforces a maximum number of requests per second.
  2. A retry queue that re-queues failed requests with exponential backoff, without blocking the main pipeline.

The solution: a custom TokenBucket rate limiter based on the token bucket algorithm, combined with asyncio.Queue for retries.

Here's the core code (simplified for clarity):

import asyncio
import time


class TokenBucket:
    """Simple token bucket rate limiter"""
    def __init__(self, rate_per_sec: float):
        self.rate = rate_per_sec        # tokens added per second
        self.capacity = rate_per_sec    # max burst size (one second's worth)
        self.tokens = 0.0               # start empty so there's no burst at startup
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self) -> None:
        # Holding the lock while sleeping serves waiters one at a time, in order
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            # Not enough tokens: wait until exactly one has accumulated
            wait_time = (1.0 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            # Consume the token that accrued during the sleep
            self.tokens = 0.0
            self.last_refill = time.monotonic()


class AIPipeline:
    def __init__(self, api_endpoint: str, rate_limit: float = 10.0):
        self.bucket = TokenBucket(rate_limit)
        self.retry_queue: asyncio.Queue = asyncio.Queue()
        self.api_endpoint = api_endpoint
        self.results: dict = {}   # input index -> result (or exception)

    async def worker(self, index: int, text: str, attempt: int = 0) -> None:
        """Process a single text with rate limiting; send 429s to the retry queue"""
        await self.bucket.acquire()
        try:
            # call_api and HTTPError stand in for your actual HTTP client code
            self.results[index] = await call_api(self.api_endpoint, text)
        except HTTPError as e:
            if e.status != 429:
                raise
            # Per-request exponential backoff: 1s, 2s, 4s, 8s, then capped at 16s
            delay = 2 ** min(attempt, 4)
            await self.retry_queue.put((index, text, attempt + 1, delay))

    async def retry_worker(self) -> None:
        """Process retries from the queue in the background"""
        while True:
            index, text, attempt, delay = await self.retry_queue.get()
            try:
                await asyncio.sleep(delay)
                await self.worker(index, text, attempt)
            except Exception as exc:
                # Record non-429 failures instead of killing the retry loop
                self.results[index] = exc
            finally:
                self.retry_queue.task_done()

    async def run(self, texts: list[str]) -> list:
        # Start one retry worker; keep a reference so it can be cancelled later
        retry_task = asyncio.create_task(self.retry_worker())
        # Cap in-flight requests; the token bucket, not the semaphore, sets the rate
        sem = asyncio.Semaphore(20)

        async def bound_worker(i: int, t: str) -> None:
            async with sem:
                await self.worker(i, t)

        try:
            await asyncio.gather(*(bound_worker(i, t) for i, t in enumerate(texts)))
            # Wait until every re-queued request has been retried to completion
            await self.retry_queue.join()
        finally:
            retry_task.cancel()
        # Results in input order
        return [self.results.get(i) for i in range(len(texts))]

Key ideas:

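  • Token bucket keeps this process at or below the configured rate, even with many concurrent workers. The acquire() method waits until a token is available. Because the bucket holds up to one second's worth of tokens, it can still burst after an idle stretch; if the API enforces a strict sliding window, a smaller capacity is safer.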
  • Retry queue separates retry logic from the main flow. Failed requests are put back into a queue with a computed delay (exponential backoff).
  • Semaphore caps how many requests are in flight at once (20 here); the token bucket, not the semaphore, sets the rate.
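One caveat on the semaphore: asyncio.gather() still creates one task per text up front, and the semaphore only limits how many of them are active. For very large inputs, a fixed pool of workers pulling from an asyncio.Queue keeps the task count constant. A minimal sketch, assuming a hypothetical process_one coroutine in place of the real rate-limited call:

```python
import asyncio

async def process_one(text: str) -> str:
    # Hypothetical stand-in for a rate-limited API call
    await asyncio.sleep(0.01)
    return text[::-1]

async def run_pool(texts: list[str], num_workers: int = 20) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for item in enumerate(texts):
        queue.put_nowait(item)
    results: list = [None] * len(texts)

    async def worker() -> None:
        # Each worker loops until the queue is drained, so only num_workers tasks exist
        while True:
            try:
                index, text = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await process_one(text)

    await asyncio.gather(*(worker() for _ in range(num_workers)))
    return results

results = asyncio.run(run_pool([f"item {i}" for i in range(100)]))
```

Results land in input order because each worker writes to the index it pulled.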

Lessons Learned and Trade-offs

What I loved:

  • The token bucket is simple and predictable. No more mysterious 429s.
  • The retry queue doesn't block the main flow — it runs as a background task.
  • The code is easy to test: you can mock the token bucket and verify retry behavior.

What I'd change next time:

  • The token bucket implementation uses a lock — for very high concurrency, consider a lock-free approach with asyncio.Event.
  • The retry worker runs forever. In practice, you'd want to add a maximum retry count and a dead letter queue.
  • No jitter in the exponential backoff — adding random jitter prevents the "thundering herd" problem when multiple requests fail at the same time.
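On the lock: asyncio runs on a single thread, so code between two await points can't be interrupted, which means a limiter can reserve its slot without a lock and sleep outside any critical section. This is a different technique from the asyncio.Event idea: a minimal sketch of a reservation-based limiter (the class name ReservationLimiter is hypothetical) that spaces requests evenly at `rate` per second with no bursting.

```python
import asyncio
import time

class ReservationLimiter:
    """Spaces calls evenly at `rate` per second; no lock needed on a single event loop."""

    def __init__(self, rate: float):
        self.interval = 1.0 / rate
        self.next_slot = time.monotonic()

    async def acquire(self) -> None:
        now = time.monotonic()
        # Reserve the next free slot. There is no await between reading and
        # updating next_slot, so no other coroutine can interleave here.
        slot = max(self.next_slot, now)
        self.next_slot = slot + self.interval
        # Sleep outside the reservation; other callers can book later slots meanwhile
        await asyncio.sleep(slot - now)

async def demo(n: int = 20, rate: float = 10.0) -> list[float]:
    limiter = ReservationLimiter(rate)
    stamps: list[float] = []

    async def one() -> None:
        await limiter.acquire()
        stamps.append(time.monotonic())

    await asyncio.gather(*(one() for _ in range(n)))
    return sorted(stamps)

stamps = asyncio.run(demo())
```

Unlike the token bucket, this allows no bursts at all, which is the stricter choice when an API counts requests in a sliding window.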
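The retry-count, dead-letter, and jitter items fit naturally into one retry policy. This sketch assumes a hypothetical RateLimited exception that your API wrapper raises on a 429, and uses "full jitter" (a random wait between zero and the exponential cap) so requests that fail together don't retry together. After max_retries, the item goes to a dead-letter list instead of retrying forever.

```python
import asyncio
import random

class RateLimited(Exception):
    """Hypothetical exception your API wrapper raises on HTTP 429."""

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    # Full jitter: uniform in [0, min(cap, base * 2**attempt)]
    return random.uniform(0.0, min(cap, base * 2 ** attempt))

async def call_with_policy(call, text: str, dead_letters: list,
                           max_retries: int = 5, base: float = 1.0):
    # One initial attempt plus up to max_retries retries
    for attempt in range(max_retries + 1):
        try:
            return await call(text)
        except RateLimited:
            if attempt == max_retries:
                break
            await asyncio.sleep(backoff_delay(attempt, base))
    # Out of retries: park the item for later inspection instead of looping forever
    dead_letters.append(text)
    return None

async def main():
    attempts = 0

    async def flaky_api(text: str) -> str:
        # Hypothetical API stand-in: returns 429 twice, then succeeds
        nonlocal attempts
        attempts += 1
        if attempts <= 2:
            raise RateLimited()
        return text.upper()

    async def always_429(text: str) -> str:
        raise RateLimited()

    dead: list = []
    ok = await call_with_policy(flaky_api, "hello", dead, base=0.01)
    failed = await call_with_policy(always_429, "nope", dead, max_retries=2, base=0.01)
    return ok, failed, dead, attempts

ok, failed, dead, attempts = asyncio.run(main())
```

The small base in the demo just keeps it fast; with the default base=1.0 the waits are capped at 1s, 2s, 4s, and so on, up to 30s.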

When NOT to use this approach:

  • If you only make occasional API calls, the complexity isn't worth it. A simple sleep-and-retry after a 429 is fine (use asyncio.sleep rather than time.sleep inside async code, or you'll block the event loop).
  • If the API provides a Retry-After header, use that instead of calculating your own delay.
  • If you're using a high-level client library (like openai), it already handles retries — but you still need rate limiting for concurrent usage.
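Retry-After can carry either a number of seconds or an HTTP date (RFC 9110). A small parser, sketched here with the standard library's email.utils.parsedate_to_datetime, turns the header value into seconds to wait, or None so you can fall back to your own backoff. The function name is hypothetical, and reading the header itself depends on your HTTP client, so that part is left out.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def retry_after_seconds(value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Parse a Retry-After header value into seconds to wait, or None if absent/invalid."""
    if not value:
        return None
    value = value.strip()
    # Form 1: delay-seconds, e.g. "120"
    if value.isdigit():
        return float(value)
    # Form 2: HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Older Pythons raise TypeError on unparseable dates, newer ones ValueError
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means "retry now"
    return max(0.0, (when - now).total_seconds())
```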

The Takeaway

Rate limiting is a fact of life when working with third-party APIs, especially AI services that charge per token. Instead of fighting the limit, embrace it with a proper queue and backoff strategy. The code I shared isn't production-ready (no logging, no error handling beyond 429s), but it's a solid starting point.

What's your favorite strategy for handling API rate limits? Have you used token buckets or leaky buckets in your projects? I'd love to hear what's worked for you.
