Whether you're building a trading platform, a data pipeline, or any API consumer, proper rate limiting is essential. Here are the patterns that work.
Token Bucket Algorithm
The most common server-side pattern:
import time
import threading
class TokenBucket:
    """Thread-safe token bucket: sustained `rate` tokens/sec, bursts up to `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate              # refill speed, tokens per second
        self.capacity = capacity      # bucket ceiling (maximum burst)
        self.tokens = capacity        # start full so an initial burst is allowed
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """Atomically take `tokens` from the bucket; True if granted, False otherwise."""
        with self.lock:
            self._refill()
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    def _refill(self):
        """Credit tokens earned since the last refill, clamped to capacity."""
        now = time.time()
        earned = (now - self.last_refill) * self.rate
        self.tokens = min(self.capacity, self.tokens + earned)
        self.last_refill = now
# Server-wide limiter: sustained 10 requests/second, with bursts of up to 20
limiter = TokenBucket(rate=10, capacity=20)
Client-Side: Adaptive Rate Limiting
When consuming external APIs, adapt to their limits:
class AdaptiveRateLimiter:
    """Client-side limiter that speeds up on success and backs off on HTTP 429."""

    def __init__(self, initial_rate=1.0):
        self.rate = initial_rate   # current requests per second
        self.min_delay = 0.1       # floor between requests (seconds)
        self.max_delay = 60        # backoff ceiling (seconds)
        self.last_request = 0
        self.consecutive_429s = 0

    def wait(self):
        """Sleep just long enough before the next request to honor the current rate."""
        gap = max(self.min_delay, 1.0 / self.rate)
        remaining = gap - (time.time() - self.last_request)
        if remaining > 0:
            time.sleep(remaining)
        self.last_request = time.time()

    def on_success(self):
        """Reset the 429 streak and nudge the rate upward (capped at 100 req/s)."""
        self.consecutive_429s = 0
        self.rate = min(self.rate * 1.1, 100)

    def on_rate_limit(self, retry_after=None):
        """Honor a Retry-After hint when given, else back off exponentially; halve the rate."""
        self.consecutive_429s += 1
        if retry_after:
            time.sleep(float(retry_after))
        else:
            time.sleep(min(self.max_delay, 2 ** self.consecutive_429s))
        self.rate = max(0.1, self.rate * 0.5)
Sliding Window Counter
Better for distributed systems:
import math
class SlidingWindowCounter:
    """Sliding-window rate counter: estimates the rolling request count by
    weighting the previous fixed window against the current one.

    Fix over the naive version: if MORE than one full window has passed since
    the last rotation, the "previous" window actually had zero requests, so
    carrying the stale count forward would wrongly throttle traffic after an
    idle gap.
    """

    def __init__(self, limit, window_seconds):
        self.limit = limit                 # max requests per window
        self.window = window_seconds       # window length in seconds
        self.current_count = 0
        self.previous_count = 0
        self.current_start = time.time()

    def allow_request(self):
        """Return True (and count the request) if the weighted estimate is under the limit."""
        now = time.time()
        elapsed = now - self.current_start
        if elapsed >= self.window:
            # Rotate windows. If the gap spans >= 2 windows, the immediately
            # preceding window saw no requests at all.
            self.previous_count = 0 if elapsed >= 2 * self.window else self.current_count
            self.current_count = 0
            self.current_start = now
            elapsed = 0
        # Weight the previous window by how much of it still overlaps the
        # rolling window ending now.
        weight = 1 - (elapsed / self.window)
        estimated = self.previous_count * weight + self.current_count
        if estimated < self.limit:
            self.current_count += 1
            return True
        return False
Retry with Backoff Decorator
import functools
import random
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Decorator factory: retry the wrapped call on RateLimitError.

    Sleeps base_delay * 2**attempt (capped at max_delay) plus up to 10%
    random jitter between attempts; the final failure is re-raised to the
    caller with its original traceback.

    Fixes over the naive version: the trailing `raise Exception("Max retries
    exceeded")` was unreachable (every loop iteration either returns or
    re-raises), and the exception was bound to an unused name.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except RateLimitError:
                    if attempt == max_retries:
                        raise  # out of retries: propagate the original error
                    delay = min(max_delay, base_delay * (2 ** attempt))
                    # Jitter avoids synchronized retry stampedes across clients.
                    jitter = random.uniform(0, delay * 0.1)
                    time.sleep(delay + jitter)
        return wrapper
    return decorator
@retry_with_backoff(max_retries=3)
def fetch_data(url):
    """GET *url* and return its parsed JSON body.

    Raises RateLimitError (carrying the Retry-After header, if any) on
    HTTP 429 so the retry decorator can back off and try again.
    """
    resp = requests.get(url)
    if resp.status_code != 429:
        return resp.json()
    raise RateLimitError(resp.headers.get('Retry-After'))
Queue-Based Rate Limiting
For high-throughput applications:
import asyncio
from collections import deque
class RequestQueue:
    """Async sliding-log rate limiter: at most `rate_limit` acquisitions per
    rolling `period` seconds.

    The lock intentionally serializes waiters so acquisitions are granted
    roughly in arrival order.

    Fix over the naive version: after sleeping, the expired timestamp was
    never pruned before appending, so the log momentarily held more than
    `rate_limit` entries and the window invariant broke.
    """

    def __init__(self, rate_limit, period=1.0):
        self.rate_limit = rate_limit     # max acquisitions per period
        self.period = period             # rolling window length in seconds
        self.timestamps = deque()        # times of recent acquisitions, oldest first
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Block until a slot is free in the rolling window, then claim it."""
        async with self.lock:
            self._prune(time.time())
            if len(self.timestamps) >= self.rate_limit:
                # Sleep until the oldest entry ages out of the window.
                wait_time = self.timestamps[0] + self.period - time.time()
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                # Drop entries that expired while we slept, restoring the
                # invariant len(timestamps) <= rate_limit before appending.
                self._prune(time.time())
            self.timestamps.append(time.time())

    def _prune(self, now):
        """Discard timestamps older than one period before `now`."""
        cutoff = now - self.period
        while self.timestamps and self.timestamps[0] < cutoff:
            self.timestamps.popleft()
Monitoring Your Rate Usage
class RateMonitor:
    """Rolling one-hour monitor of total requests and HTTP 429 responses."""

    def __init__(self):
        self.requests = deque()   # timestamps of all requests, oldest first
        self.errors = deque()     # timestamps of 429 responses, oldest first

    def record(self, status_code):
        """Log one response and evict entries older than an hour."""
        now = time.time()
        self.requests.append(now)
        if status_code == 429:
            self.errors.append(now)
        cutoff = now - 3600
        for series in (self.requests, self.errors):
            while series and series[0] < cutoff:
                series.popleft()

    def stats(self):
        """Return hourly request volume, 429 count, and the 429 ratio."""
        total = len(self.requests)
        limited = len(self.errors)
        return {
            'requests_per_hour': total,
            'rate_limits_per_hour': limited,
            'error_rate': limited / max(1, total)
        }
These patterns apply everywhere — from trading APIs to social media integrations. I've used many of them when building data aggregation tools for propfirmkey.com, where reliable API communication is critical.
What rate limiting strategy do you use in your projects?
Top comments (0)