AI agents fail silently when they hit rate limits. One moment your agent is calling GPT-4 at 100 RPM; the next, it's getting 429s and retrying infinitely. Here's how to track, predict, and prevent rate limit failures before they kill your production workflow.
Why Rate Limits Fail Silently
Most rate limit handling looks like this:
# Common but broken pattern
import time
import openai

while True:
    try:
        response = openai.chat.completions.create(...)
        break
    except openai.RateLimitError:
        time.sleep(60)  # Guess and hope, then retry
The problem: you don't know how close you are to the limit until you're already over it. And sleeping for 60 seconds might not be enough — or it might be 10x too long.
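Even without a tracker, you can do better than a fixed sleep. Here's a minimal sketch of exponential backoff with jitter; complete_with_backoff is a hypothetical helper, and the delay schedule is an illustrative starting point, not a tuned value:

import random
import time
import openai

def complete_with_backoff(messages, max_retries=5):
    for attempt in range(max_retries):
        try:
            return openai.chat.completions.create(model="gpt-4", messages=messages)
        except openai.RateLimitError:
            # Exponential backoff with jitter: ~1s, ~2s, ~4s, ...
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    raise RuntimeError("Still rate limited after all retries")

The jitter matters: without it, every worker that got a 429 at the same moment retries at the same moment, and you stampede the limit again. But this is still reactive. The rest of this post is about not hitting the limit in the first place.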
Real-Time Rate Limit Tracking
import requests

class AIRateLimitTracker:
    def __init__(self, api_key: str):
        self.base = "https://api.lazy-mac.com/ai-rate-limit-tracker"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def check_headroom(self, provider: str, model: str) -> dict:
        """Check how much capacity remains before hitting limits."""
        resp = requests.get(
            f"{self.base}/headroom",
            params={"provider": provider, "model": model},
            headers=self.headers,
        )
        return resp.json()

    def log_request(self, provider: str, model: str, tokens: int):
        """Track a completed request for pattern analysis."""
        requests.post(
            f"{self.base}/log",
            json={"provider": provider, "model": model, "tokens": tokens},
            headers=self.headers,
        )

    def predict_safe_rps(self, provider: str, model: str) -> float:
        """Get predicted safe requests-per-second for the next minute."""
        resp = requests.get(
            f"{self.base}/predict",
            params={"provider": provider, "model": model, "window": "60s"},
            headers=self.headers,
        )
        return resp.json()["safe_rps"]
tracker = AIRateLimitTracker("your-key")

# Before any batch job
headroom = tracker.check_headroom("openai", "gpt-4")
print(f"RPM remaining: {headroom['rpm_remaining']}/{headroom['rpm_limit']}")
print(f"TPM remaining: {headroom['tpm_remaining']}/{headroom['tpm_limit']}")
Predictive Throttling
Instead of reactive retry, use predictive throttling:
import asyncio
import openai

async def safe_batch_process(prompts: list, tracker: AIRateLimitTracker):
    results = []
    for prompt in prompts:
        # Predict the safe request rate and pace ourselves to match it
        safe_rps = tracker.predict_safe_rps("openai", "gpt-4-turbo")
        if safe_rps > 0:
            await asyncio.sleep(1.0 / safe_rps)

        # Make the request (a sync call is fine for a sequential batch)
        response = openai.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}],
        )
        results.append(response)

        # Log for future predictions
        tracker.log_request("openai", "gpt-4-turbo",
                            response.usage.total_tokens)
    return results
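Calling the prediction endpoint on every iteration adds latency of its own. An alternative is to refresh safe_rps periodically and enforce it locally. Here's a minimal token-bucket sketch; TokenBucket is a hypothetical helper, not part of the tracker API, and it assumes rate > 0:

import time

class TokenBucket:
    """Client-side pacing at a given requests-per-second rate."""
    def __init__(self, rate: float, burst: float = 1.0):
        self.rate = rate      # tokens refilled per second
        self.burst = burst    # maximum balance (burst size)
        self.tokens = burst   # current balance; may go negative (debt)
        self.last = time.monotonic()

    def acquire(self) -> float:
        """Spend one token; return seconds the caller should sleep."""
        now = time.monotonic()
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= 1.0
        return max(0.0, -self.tokens / self.rate)

bucket = TokenBucket(rate=tracker.predict_safe_rps("openai", "gpt-4-turbo"))

Each loop iteration then calls await asyncio.sleep(bucket.acquire()) instead of hitting the /predict endpoint.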
Multi-Provider Failover
When one provider is near its limit, automatically route to another:
def get_best_provider(tracker,
                      models=("openai/gpt-4", "anthropic/claude-3", "google/gemini-pro")):
    headrooms = []
    for model_str in models:
        provider, model = model_str.split("/")
        h = tracker.check_headroom(provider, model)
        headrooms.append((h["headroom_pct"], provider, model))
    # Use the provider with the most headroom
    _, best_provider, best_model = max(headrooms, key=lambda entry: entry[0])
    return best_provider, best_model
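To actually route the request, map the chosen provider to its client. A sketch under a few assumptions: each SDK is installed and configured with its own key, and the short model names above stand in for the full model IDs each provider actually accepts:

import openai
import anthropic

prompt = "Summarize today's error logs."
provider, model = get_best_provider(tracker)

if provider == "openai":
    response = openai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
elif provider == "anthropic":
    response = anthropic.Anthropic().messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
else:
    raise ValueError(f"No client configured for {provider}")

One caveat: the providers' responses have different shapes, so normalize them behind a common interface before the rest of your agent touches them.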