DEV Community

lazymac


AI Agent Rate Limits: Track, Predict, and Never Hit the Wall Again

AI agents fail silently when they hit rate limits. One moment your agent is calling GPT-4 at 100 RPM; the next, it's getting 429s and retrying indefinitely. Here's how to track, predict, and prevent rate limit failures before they kill your production workflow.

Why Rate Limits Fail Silently

Most rate limit handling looks like this:

# Common but broken pattern
import time

import openai

try:
    response = openai.chat.completions.create(...)
except openai.RateLimitError:
    time.sleep(60)  # Guess and hope
    retry()  # placeholder -- "try again somehow"

The problem: you don't know how close you are to the limit until you're already over it. And sleeping for 60 seconds might not be enough — or it might be 10x too long.
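Even before adding any tracking, the fixed 60-second sleep can be replaced with exponential backoff plus jitter. Here's a minimal sketch; `make_request` and `is_rate_limited` are placeholders you supply, not part of any SDK:

```python
import random
import time

def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 60.0):
    """Yield exponentially growing delays with full jitter (seconds)."""
    for attempt in range(max_retries):
        yield random.uniform(0, min(cap, base * 2 ** attempt))

def call_with_backoff(make_request, is_rate_limited, max_retries: int = 5):
    """Retry make_request() while is_rate_limited(exc) says the error is a 429."""
    for delay in backoff_delays(max_retries):
        try:
            return make_request()
        except Exception as exc:
            if not is_rate_limited(exc):
                raise
            time.sleep(delay)
    return make_request()  # final attempt; let any error propagate
```

Jitter matters: if ten workers all hit the limit at once and all sleep exactly 60 seconds, they all retry at once and hit it again.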

Real-Time Rate Limit Tracking

import requests

class AIRateLimitTracker:
    def __init__(self, api_key: str):
        self.base = "https://api.lazy-mac.com/ai-rate-limit-tracker"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def check_headroom(self, provider: str, model: str) -> dict:
        """Check how much capacity remains before hitting limits."""
        resp = requests.get(f"{self.base}/headroom",
            params={"provider": provider, "model": model},
            headers=self.headers,
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()

    def log_request(self, provider: str, model: str, tokens: int):
        """Track a completed request for pattern analysis."""
        requests.post(f"{self.base}/log",
            json={"provider": provider, "model": model, "tokens": tokens},
            headers=self.headers,
            timeout=10,
        ).raise_for_status()

    def predict_safe_rps(self, provider: str, model: str) -> float:
        """Get predicted safe requests-per-second for the next minute."""
        resp = requests.get(f"{self.base}/predict",
            params={"provider": provider, "model": model, "window": "60s"},
            headers=self.headers,
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()["safe_rps"]

tracker = AIRateLimitTracker("your-key")

# Before any batch job
headroom = tracker.check_headroom("openai", "gpt-4")
print(f"RPM remaining: {headroom['rpm_remaining']}/{headroom['rpm_limit']}")
print(f"TPM remaining: {headroom['tpm_remaining']}/{headroom['tpm_limit']}")

Predictive Throttling

Instead of reactive retry, use predictive throttling:

import asyncio

import openai

async def safe_batch_process(prompts: list, tracker: AIRateLimitTracker):
    client = openai.AsyncOpenAI()
    for prompt in prompts:
        # Predict how long to wait before the next request
        safe_rps = tracker.predict_safe_rps("openai", "gpt-4-turbo")
        if safe_rps > 0:
            await asyncio.sleep(1.0 / safe_rps)
        else:
            await asyncio.sleep(1.0)  # no headroom at all -- back off

        # Make the request (async client, so the event loop isn't blocked)
        response = await client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}]
        )

        # Log for future predictions
        tracker.log_request("openai", "gpt-4-turbo",
                            response.usage.total_tokens)
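Processing prompts one at a time leaves throughput on the table. The same pacing idea extends to concurrent workers that share a single pacer; this is a sketch under my own assumptions (`AsyncPacer` and `paced_gather` are illustrative helpers, not part of any SDK):

```python
import asyncio
import time

class AsyncPacer:
    """Spaces coroutine starts so the aggregate rate stays under max_rps."""

    def __init__(self, max_rps: float):
        self.interval = 1.0 / max_rps
        self._next_start = 0.0
        self._lock = asyncio.Lock()

    async def wait(self):
        # Reserve the next start slot under the lock, then sleep outside it
        async with self._lock:
            now = time.monotonic()
            start = max(now, self._next_start)
            self._next_start = start + self.interval
            delay = start - now
        if delay > 0:
            await asyncio.sleep(delay)

async def paced_gather(worker, items, max_rps: float):
    """Run worker(item) concurrently for every item, capped at max_rps starts."""
    pacer = AsyncPacer(max_rps)

    async def run(item):
        await pacer.wait()
        return await worker(item)

    return await asyncio.gather(*(run(i) for i in items))
```

Requests still overlap in flight; only their *start times* are spaced, which is what RPM limits actually measure.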

Multi-Provider Failover

When one provider is near its limit, automatically route to another:

DEFAULT_MODELS = ("openai/gpt-4", "anthropic/claude-3", "google/gemini-pro")

def get_best_provider(tracker, models=DEFAULT_MODELS):
    """Return the (provider, model) pair with the most remaining headroom."""
    headrooms = []
    for model_str in models:
        provider, model = model_str.split("/")
        h = tracker.check_headroom(provider, model)
        headrooms.append((h["headroom_pct"], provider, model))

    # Use the provider with the most headroom
    _, best_provider, best_model = max(headrooms)
    return best_provider, best_model

