Propfirmkey

API Rate Limiting: Patterns Every Developer Should Know

Whether you're building a trading platform, a data pipeline, or any other API consumer, proper rate limiting is essential. The patterns below cover both sides: limiting traffic to your own service and staying within the limits of the APIs you call.

Token Bucket Algorithm

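A common server-side pattern. Tokens refill at a fixed rate up to a maximum capacity, so short bursts are allowed while the long-run rate stays bounded: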

import time
import threading

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate          # tokens per second
        self.capacity = capacity  # max tokens
        self.tokens = capacity
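        self.last_refill = time.monotonic()  # monotonic: immune to wall-clock jumps
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """Take `tokens` from the bucket; return False if not enough are left."""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        # Add the tokens earned since the last refill, capped at capacity
        now = time.monotonic()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now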

# 10 requests per second, burst of 20
limiter = TokenBucket(rate=10, capacity=20)
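To see how burst capacity and refill rate interact, here is a minimal single-threaded sketch of the same idea. It is not the class above: `FakeClock` and `SimpleBucket` are hypothetical names introduced only for this illustration, and the clock is injected so the numbers are deterministic.

```python
class FakeClock:
    """Hypothetical manual clock so the example does not depend on real time."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


class SimpleBucket:
    """Single-threaded token bucket with an injectable clock (illustrative only)."""

    def __init__(self, rate, capacity, clock):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum burst size
        self.tokens = float(capacity)
        self.clock = clock
        self.last_refill = clock()

    def consume(self, tokens=1):
        # Refill based on elapsed time, then try to take the tokens
        now = self.clock()
        earned = (now - self.last_refill) * self.rate
        self.tokens = min(self.capacity, self.tokens + earned)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


clock = FakeClock()
bucket = SimpleBucket(rate=10, capacity=20, clock=clock)

# 25 back-to-back requests: only the 20-token burst gets through
burst = sum(bucket.consume() for _ in range(25))

# Half a second later the bucket has earned 0.5 * 10 = 5 tokens
clock.now += 0.5
after_wait = sum(bucket.consume() for _ in range(25))

print(burst, after_wait)  # 20 5
```

The burst size is set by `capacity`, while the sustained throughput is set by `rate`, so the two can be tuned independently.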

Client-Side: Adaptive Rate Limiting

When consuming external APIs, adapt to their limits:

class AdaptiveRateLimiter:
    def __init__(self, initial_rate=1.0):
        self.rate = initial_rate
        self.min_delay = 0.1
        self.max_delay = 60
        self.last_request = 0
        self.consecutive_429s = 0

    def wait(self):
        delay = max(self.min_delay, 1.0 / self.rate)
        elapsed = time.time() - self.last_request
        if elapsed < delay:
            time.sleep(delay - elapsed)
        self.last_request = time.time()

    def on_success(self):
        self.consecutive_429s = 0
        # Gradually increase rate; wait() still enforces min_delay,
        # so the effective ceiling is 1 / min_delay requests per second
        self.rate = min(self.rate * 1.1, 100)

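    def on_rate_limit(self, retry_after=None):
        self.consecutive_429s += 1
        # Exponential backoff, used when the server gives no usable hint
        backoff = min(self.max_delay, 2 ** self.consecutive_429s)
        if retry_after:
            try:
                # Retry-After given as a number of seconds
                backoff = float(retry_after)
            except ValueError:
                pass  # HTTP-date form: fall back to exponential backoff
        time.sleep(backoff)
        # Halve the request rate, but never drop below 0.1 requests/second
        self.rate = max(0.1, self.rate * 0.5)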
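One detail worth handling on the client side: the Retry-After header can carry either a number of seconds or an HTTP date. The helper below is a minimal sketch of parsing both forms with the standard library. The name `parse_retry_after` and its `now` parameter are assumptions made for this example, not part of any particular HTTP client.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, now=None):
    """Return the number of seconds to wait, or None if the header is unusable."""
    if value is None:
        return None
    value = value.strip()

    # Form 1: delay in seconds, e.g. "120"
    try:
        return max(0.0, float(value))
    except ValueError:
        pass

    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        # Assumption for this sketch: treat dates without an offset as UTC
        when = when.replace(tzinfo=timezone.utc)

    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


print(parse_retry_after("120"))         # 120.0
print(parse_retry_after("not a date"))  # None
```

A caller could pass the result to `on_rate_limit` and let it fall back to exponential backoff when the helper returns `None`.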

Sliding Window Counter

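This approach approximates a rolling window with just two counters per client, which keeps the state small enough to hold in a shared store in a distributed setup. The version below is a single-process sketch of the counting logic: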

import time

class SlidingWindowCounter:
    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self.current_count = 0
        self.previous_count = 0
        self.current_start = time.time()

    def allow_request(self):
        now = time.time()
        elapsed = now - self.current_start

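        if elapsed >= self.window:
            # Roll over to a new window. If more than one full window has
            # passed, the window just before this one saw no requests.
            self.previous_count = self.current_count if elapsed < 2 * self.window else 0
            self.current_count = 0
            self.current_start = now
            elapsed = 0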

        # Weighted count
        weight = 1 - (elapsed / self.window)
        estimated = self.previous_count * weight + self.current_count

        if estimated < self.limit:
            self.current_count += 1
            return True
        return False
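The weighted count is easier to follow with concrete numbers. This small sketch pulls the estimate out into a standalone function (`estimated_count` is a name chosen for this example) and evaluates it at two points in a 60-second window:

```python
def estimated_count(previous_count, current_count, elapsed, window):
    """Approximate the number of requests in the last `window` seconds."""
    # The previous window counts less the further we are into the current one
    weight = 1 - (elapsed / window)
    return previous_count * weight + current_count


# 80 requests in the previous minute, 30 so far in the current one
early = estimated_count(80, 30, elapsed=15, window=60)  # 80 * 0.75 + 30
late = estimated_count(80, 30, elapsed=45, window=60)   # 80 * 0.25 + 30

print(early, late)  # 90.0 50.0
```

With a limit of 100, both requests would be allowed, but the early one is much closer to the limit because most of the previous window still counts.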

Retry with Backoff Decorator

import functools
import random
import time

import requests


class RateLimitError(Exception):
    """Raised on HTTP 429; carries the Retry-After header value, if any."""

    def __init__(self, retry_after=None):
        super().__init__("rate limited")
        self.retry_after = retry_after


def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except RateLimitError:
                    if attempt == max_retries:
                        raise  # out of retries: surface the error
                    # Exponential backoff, capped at max_delay
                    delay = min(max_delay, base_delay * (2 ** attempt))
                    # Up to 10% jitter so clients don't retry in lockstep
                    jitter = random.uniform(0, delay * 0.1)
                    time.sleep(delay + jitter)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def fetch_data(url):
    response = requests.get(url)
    if response.status_code == 429:
        raise RateLimitError(response.headers.get('Retry-After'))
    return response.json()
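To get a feel for how long a caller might block, it helps to list the delays the decorator would sleep for. The sketch below reproduces only the backoff formula, without jitter; `backoff_delays` is a helper name made up for this illustration.

```python
def backoff_delays(max_retries, base_delay=1, max_delay=60):
    """Delays (in seconds, before jitter) slept between attempts."""
    # One sleep per retry: attempts 0 .. max_retries - 1 are followed by a wait
    return [min(max_delay, base_delay * (2 ** attempt))
            for attempt in range(max_retries)]


short = backoff_delays(3)
long = backoff_delays(8)

print(short)       # [1, 2, 4]
print(long)        # [1, 2, 4, 8, 16, 32, 60, 60]
print(sum(short))  # 7
```

With the defaults, three retries add at most about 7 seconds plus jitter, while the cap only starts to matter from the seventh retry onward.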

Queue-Based Rate Limiting

For high-throughput applications:

import asyncio
import time
from collections import deque

class RequestQueue:
    def __init__(self, rate_limit, period=1.0):
        self.rate_limit = rate_limit
        self.period = period
        self.timestamps = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.time()
            # Remove old timestamps
            while self.timestamps and self.timestamps[0] < now - self.period:
                self.timestamps.popleft()

            if len(self.timestamps) >= self.rate_limit:
                # Window is full: wait until the oldest request leaves it
                wait_time = self.timestamps[0] + self.period - now
                await asyncio.sleep(max(0, wait_time))
                self.timestamps.popleft()

            self.timestamps.append(time.time())
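Here is a rough sketch of how such a queue might be shared by concurrent tasks. To keep the example runnable on its own, it includes a condensed variant of the limiter (it uses `time.monotonic()` and drops the expired timestamp after sleeping). The `worker` and `main` functions and the chosen limits are assumptions for the demo.

```python
import asyncio
import time
from collections import deque


class RequestQueue:
    """Condensed sliding-log limiter so this example is self-contained."""

    def __init__(self, rate_limit, period=1.0):
        self.rate_limit = rate_limit
        self.period = period
        self.timestamps = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            # Forget requests that have left the window
            while self.timestamps and self.timestamps[0] <= now - self.period:
                self.timestamps.popleft()
            if len(self.timestamps) >= self.rate_limit:
                # Wait for the oldest request to leave the window
                await asyncio.sleep(self.timestamps[0] + self.period - now)
                self.timestamps.popleft()
            self.timestamps.append(time.monotonic())


async def worker(queue, i, results):
    await queue.acquire()  # blocks until a slot is free
    results.append((i, time.monotonic()))


async def main():
    # At most 5 requests per 0.2 seconds, shared by 12 concurrent tasks
    queue = RequestQueue(rate_limit=5, period=0.2)
    results = []
    start = time.monotonic()
    await asyncio.gather(*(worker(queue, i, results) for i in range(12)))
    return results, time.monotonic() - start


results, elapsed = asyncio.run(main())
print(f"{len(results)} requests in {elapsed:.2f}s")
```

Because waiting happens while the lock is held, tasks are released in the order they asked for a slot, which keeps the behavior predictable at the cost of serializing the waiters.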

Monitoring Your Rate Usage

class RateMonitor:
    def __init__(self):
        self.requests = deque()
        self.errors = deque()

    def record(self, status_code):
        now = time.time()
        self.requests.append(now)
        if status_code == 429:
            self.errors.append(now)

        # Clean old entries
        cutoff = now - 3600
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
        while self.errors and self.errors[0] < cutoff:
            self.errors.popleft()

    def stats(self):
        return {
            'requests_per_hour': len(self.requests),
            'rate_limits_per_hour': len(self.errors),
            'error_rate': len(self.errors) / max(1, len(self.requests))
        }

These patterns apply everywhere, from trading APIs to social media integrations. I've used many of them when building data aggregation tools for propfirmkey.com, where reliable API communication is critical.


What rate limiting strategy do you use in your projects?