DEV Community

zhongqiyue

How I Stopped Hitting AI API Rate Limits with a Simple Async Queue

I was two weeks into building a content analysis tool when it happened again: 429 Too Many Requests. My app was supposed to batch-analyze 500 blog posts using an AI API, but every time I tried to process them all, I’d hit the rate limit within minutes. The error logs were a graveyard of failed retries.

My very first attempt had been embarrassingly simple: a for loop calling the API synchronously. It never hit a 429, but it handled one request at a time, and 500 posts × 10 seconds each is about 83 minutes. My users (and my patience) couldn’t wait that long.

The naïve parallel approach

My next idea: use asyncio.gather() and fire all 500 requests at once. That worked… for exactly 3 seconds, until the API slammed the door with a 429. Python’s asyncio doesn’t magically respect server limits. I needed control.

# Bad idea: fire all requests at once
# (API_URL and posts are assumed to be defined elsewhere)
import asyncio
import aiohttp

async def fetch(session, post):
    async with session.post(API_URL, json={"text": post}) as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, post) for post in posts]  # 500 tasks!
        results = await asyncio.gather(*tasks)  # BOOM - 429

What I actually needed: a controlled concurrent queue

I needed a system that would:

  • Limit the number of concurrent requests (e.g., 5 at a time)
  • Automatically retry on 429 with exponential backoff
  • Still be fast enough to finish in a reasonable time
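As a rough sanity check on the "fast enough" requirement, here is a back-of-the-envelope estimate. It is only a sketch: it assumes every request takes the same 10 seconds, that requests run in full waves of the concurrency limit, and that nothing is retried. The helper name `estimated_minutes` is made up for illustration.

```python
import math

def estimated_minutes(n_posts, seconds_per_request, concurrency):
    """Rough wall-clock estimate, assuming requests run in waves of `concurrency`."""
    waves = math.ceil(n_posts / concurrency)
    return waves * seconds_per_request / 60

print(round(estimated_minutes(500, 10, 1), 1))  # sequential loop: 83.3
print(round(estimated_minutes(500, 10, 5), 1))  # five at a time: 16.7
```

Under those assumptions, five concurrent requests bring the run down from about 83 minutes to roughly 17, before any time lost to backoff.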

So I built an async task queue using asyncio.Queue and a fixed number of worker coroutines. Each worker pulls a job from the queue, makes the API call, and if it gets a 429, it waits and puts the job back.

Here’s the core of the approach:

import asyncio
import aiohttp
import random

# API_URL and API_KEY are assumed to be defined elsewhere (e.g. loaded from config)
MAX_CONCURRENT = 5
MAX_RETRIES = 3
BASE_DELAY = 2  # seconds

async def worker(session, queue, results):
    while True:
        post_id, post_text, retries = await queue.get()
        try:
            async with session.post(
                API_URL,
                json={"text": post_text},
                headers={"Authorization": f"Bearer {API_KEY}"}
            ) as resp:
                if resp.status == 429:
                    if retries < MAX_RETRIES:
                        # Exponential backoff with jitter: 2s, 4s, 8s (+ up to 1s)
                        delay = BASE_DELAY * 2 ** retries + random.uniform(0, 1)
                        await asyncio.sleep(delay)
                        await queue.put((post_id, post_text, retries + 1))
                    else:
                        results.append((post_id, {"error": "rate limited"}))
                else:
                    data = await resp.json()
                    results.append((post_id, data))
        except Exception as e:
            results.append((post_id, {"error": str(e)}))
        finally:
            queue.task_done()

async def process_posts(posts):
    queue = asyncio.Queue()
    results = []
    # Fill queue
    for i, post in enumerate(posts):
        queue.put_nowait((i, post, 0))
    # Start workers
    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(worker(session, queue, results))
                   for _ in range(MAX_CONCURRENT)]
        await queue.join()  # Wait until all jobs done
        # Cancel the now-idle workers and wait for them to exit
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    return results
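To exercise the queue logic without touching a real API, something like the following offline sketch can help. It is not the code from this post: `fake_api` is a hypothetical stand-in that returns a 429 on the first attempt for every third post, the backoff is shortened so the demo finishes quickly, and `run_simulation` is an illustrative name. Because results are appended in completion order, the sketch sorts them by post ID at the end.

```python
import asyncio

MAX_CONCURRENT = 5
MAX_RETRIES = 3

async def fake_api(post_id, attempt):
    """Hypothetical stand-in for the real API call."""
    await asyncio.sleep(0.01)  # simulated network latency
    if post_id % 3 == 0 and attempt == 0:
        return 429, None  # rate-limit the first attempt of every third post
    return 200, {"id": post_id, "ok": True}

async def worker(queue, results):
    while True:
        post_id, retries = await queue.get()
        try:
            status, data = await fake_api(post_id, retries)
            if status == 429 and retries < MAX_RETRIES:
                await asyncio.sleep(0.01 * 2 ** retries)  # shortened backoff for the demo
                await queue.put((post_id, retries + 1))   # requeue before task_done()
            elif status == 429:
                results.append((post_id, {"error": "rate limited"}))
            else:
                results.append((post_id, data))
        finally:
            queue.task_done()

async def run_simulation(n_posts):
    queue = asyncio.Queue()
    results = []
    for i in range(n_posts):
        queue.put_nowait((i, 0))
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(MAX_CONCURRENT)]
    await queue.join()  # returns once every queued item is marked done
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results, key=lambda item: item[0])  # completion order -> input order

if __name__ == "__main__":
    for post_id, data in asyncio.run(run_simulation(12)):
        print(post_id, data)
```

Requeueing the job before `task_done()` runs keeps the queue's unfinished-task count above zero, so `queue.join()` cannot return while a retry is still pending.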

Lessons learned the hard way

  • Concurrency still needs a cap. Async is great for I/O-bound work, but nothing in asyncio throttles outgoing requests for you. The server doesn’t care how cool your event loop is.
  • Exponential backoff + jitter is non-negotiable. Without jitter, all waiting clients retry at the same time, causing a thundering herd.
  • queue.join() is your best friend. It blocks until task_done() has been called for every item put on the queue, which makes a clean shutdown possible.
  • MAX_CONCURRENT tuning matters. Start low (3-5) and increase until you start seeing 429s, then back off a step. For the API I was using, 5 worked perfectly.
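To make the backoff-and-jitter point concrete, here is a minimal sketch of the delay calculation on its own. The helper name `backoff_delay` and its `jitter` parameter are assumptions for illustration. The base delay doubles on each retry, and with `BASE_DELAY = 2` that yields the same 2, 4, 8 second schedule as the worker's formula, plus up to one second of random jitter.

```python
import random

BASE_DELAY = 2  # seconds, same value as in the worker

def backoff_delay(retries, base=BASE_DELAY, jitter=1.0):
    """Delay before the next attempt: base * 2**retries plus random jitter."""
    return base * 2 ** retries + random.uniform(0, jitter)

for retries in range(3):
    print(retries, round(backoff_delay(retries), 2))
```

The random component is what spreads retries out: two clients that were rate limited at the same instant will most likely wake up at slightly different times.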

Trade-offs I considered

I could have used a third-party solution like ai.interwestinfo.com’s API proxy (which offers built-in rate limiting and retry logic) instead of coding my own. That would have saved me an afternoon of debugging. But for a prototype, building it myself taught me how these tools work under the hood, and now I can debug any issues faster.

The downside of my hand-rolled queue: it’s Python-only and tied to asyncio. If my app were in Node.js or Go, I’d need a different approach. Also, I don’t handle all edge cases (e.g., non-429 errors, authentication expiry). For production, I’d probably reach for a battle-tested library like tenacity for retry logic, or an external service that abstracts this entirely.

When NOT to do this yourself

  • If you have hundreds of APIs to call and each has different limits, managing custom queues for each gets messy.
  • If you need guaranteed ordering or exactly-once delivery, async queues with retries become complex quickly.
  • If you’re on a tight deadline and the cost of a managed service is acceptable, skip the DIY.

What I’d do differently next time

I’d use asyncio.Semaphore instead of a custom queue for simpler cases – it’s much less code. But the queue pattern is more flexible (e.g., you can prioritize jobs or add dynamic back pressure).
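For comparison, a minimal sketch of the semaphore variant might look like this. `call_api` is a hypothetical stand-in for the real request, and retry handling is left out to keep the focus on the concurrency limit.

```python
import asyncio

MAX_CONCURRENT = 5

async def call_api(post):
    """Hypothetical stand-in for the real request."""
    await asyncio.sleep(0.01)
    return {"length": len(post)}

async def limited_call(semaphore, post):
    async with semaphore:  # at most MAX_CONCURRENT coroutines hold the semaphore at once
        return await call_api(post)

async def process_posts_with_semaphore(posts):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    # gather() returns results in the same order as the input posts
    return await asyncio.gather(*(limited_call(semaphore, p) for p in posts))

if __name__ == "__main__":
    print(asyncio.run(process_posts_with_semaphore(["a", "bb", "ccc"])))
```

All the tasks are still created up front, but only `MAX_CONCURRENT` of them are inside the `async with` block at any moment, so the server never sees more than that many requests in flight.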

Also, I’d add structured logging early. Debugging async code without logs is like debugging in the dark. print() statements get interleaved in confusing ways, so use the logging module with a QueueHandler and QueueListener, which hands the actual log output to a separate thread.
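A minimal sketch of queue-based logging, using the standard library's `logging.handlers.QueueHandler` and `QueueListener`. The logger name `post_queue` and the helpers `setup_logging` and `demo` are hypothetical. The idea is that coroutines only enqueue records, while a background thread does the actual output.

```python
import asyncio
import logging
import logging.handlers
import queue

def setup_logging(target_handler=None):
    """Send records through a queue; a background thread does the actual output."""
    if target_handler is None:
        target_handler = logging.StreamHandler()
        target_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    log_queue = queue.Queue()
    listener = logging.handlers.QueueListener(log_queue, target_handler)
    logger = logging.getLogger("post_queue")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    listener.start()
    return logger, listener

async def demo(logger):
    async def job(post_id):
        logger.info("start post_id=%s", post_id)
        await asyncio.sleep(0.01)  # stand-in for the API call
        logger.info("done post_id=%s", post_id)
    await asyncio.gather(*(job(i) for i in range(3)))

if __name__ == "__main__":
    logger, listener = setup_logging()
    try:
        asyncio.run(demo(logger))
    finally:
        listener.stop()  # drains the queue and joins the listener thread
```

Including the post ID in every message makes it possible to follow one job through the log even when several workers interleave.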

Closing thoughts

Rate limiting isn’t just an API provider’s problem – it’s yours too. Whether you build a queue like I did, use a library like aiohttp-client-cache to cache responses, or rely on an external proxy, the key is to respect the server’s capacity while keeping your app responsive.

Do you handle API throttling with asyncio? Or do you offload it to a tool or service? I’d love to hear what patterns work for you in production.
