DEV Community

Anna lilith
Anna lilith

Posted on

Python API Design Patterns That Scale to 10 Million Users

Python API Design Patterns That Scale to 10 Million Users

Building an API that handles 100 requests is easy. Building one that handles 10 million requires proper design patterns. Here are the patterns used by production systems at scale.

1. Rate Limiting with Token Buckets

Don't let one bad actor take down your API:

import time
from collections import defaultdict

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()

    def consume(self, tokens: int = 1) -> bool:
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now
Enter fullscreen mode Exit fullscreen mode

Usage: Allow 100 requests/minute per API key, refill 1.67 tokens/second.

2. Response Caching with TTL

Cache expensive queries but invalidate when data changes:

import time
from functools import wraps

_cache = {}

def cached(ttl_seconds: int = 300):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = f"{func.__name__}:{hash((args, tuple(kwargs.items())))}"
            now = time.time()

            if key in _cache:
                result, cached_at = _cache[key]
                if now - cached_at < ttl_seconds:
                    return result

            result = func(*args, **kwargs)
            _cache[key] = (result, now)
            return result
        return wrapper
    return decorator

@cached(ttl_seconds=60)
def get_product(product_id: int) -> dict:
    # Expensive database query
    return db.query(Product).get(product_id)
Enter fullscreen mode Exit fullscreen mode

3. Cursor-Based Pagination

Don't use offset pagination at scale — it's O(n) for large datasets:

from typing import List, Optional
import base64
import json

def encode_cursor(product_id: int) -> str:
    return base64.b64encode(json.dumps({"id": product_id}).encode()).decode()

def decode_cursor(cursor: str) -> dict:
    return json.loads(base64.b64decode(cursor.encode()).decode())

async def list_products(
    cursor: Optional[str] = None,
    limit: int = 20
) -> dict:
    query = select(Product).order_by(Product.id)

    if cursor:
        decoded = decode_cursor(cursor)
        query = query.where(Product.id > decoded["id"])

    products = (await db.execute(query.limit(limit + 1))).scalars().all()

    has_next = len(products) > limit
    items = products[:limit]

    return {
        "items": items,
        "next_cursor": encode_cursor(items[-1].id) if has_next and items else None,
        "has_next": has_next,
    }
Enter fullscreen mode Exit fullscreen mode

4. Circuit Breaker Pattern

Stop hammering a failing service:

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, skip calls
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise
Enter fullscreen mode Exit fullscreen mode

5. Request ID Tracking

Trace requests across microservices:

import uuid
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar('request_id', default='')

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    req_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request_id_var.set(req_id)
    response = await call_next(request)
    response.headers["X-Request-ID"] = req_id
    return response

def get_request_id() -> str:
    return request_id_var.get()
Enter fullscreen mode Exit fullscreen mode

6. Batch Endpoint Pattern

Reduce network overhead for bulk operations:

from pydantic import BaseModel
from typing import List

class BatchRequest(BaseModel):
    operations: List[dict]  # [{"method": "GET", "path": "/users/1"}, ...]

class BatchResponse(BaseModel):
    results: List[dict]  # [{"status": 200, "body": {...}}, ...]

@app.post("/api/batch")
async def batch_endpoint(batch: BatchRequest):
    results = []
    for op in batch.operations:
        # Process each operation
        result = await process_operation(op)
        results.append(result)
    return BatchResponse(results=results)
Enter fullscreen mode Exit fullscreen mode

7. Idempotency Keys

Prevent duplicate processing of the same request:

import hashlib
from redis import Redis

redis = Redis()

async def create_payment(request: Request, idempotency_key: str):
    # Check if we already processed this request
    cached = redis.get(f"idempotency:{idempotency_key}")
    if cached:
        return json.loads(cached)

    # Process the payment
    result = await process_payment(request)

    # Cache the result for 24 hours
    redis.setex(
        f"idempotency:{idempotency_key}",
        86400,
        json.dumps(result)
    )
    return result
Enter fullscreen mode Exit fullscreen mode

8. Structured Error Responses

Consistent error format across all endpoints:

from fastapi.responses import JSONResponse

class APIError(Exception):
    def __init__(self, code: str, message: str, status: int = 400, details: dict = None):
        self.code = code
        self.message = message
        self.status = status
        self.details = details or {}

@app.exception_handler(APIError)
async def api_error_handler(request, exc):
    return JSONResponse(
        status_code=exc.status,
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "details": exc.details,
                "request_id": get_request_id(),
            }
        }
    )
Enter fullscreen mode Exit fullscreen mode

Key Takeaways

  1. Rate limit everything — protect your API from abuse
  2. Cache aggressively — but invalidate correctly
  3. Use cursor pagination — offset doesn't scale
  4. Implement circuit breakers — prevent cascade failures
  5. Track requests — you can't fix what you can't trace
  6. Make it idempotent — networks are unreliable
  7. Standardize errors — consistent UX for API consumers

These patterns are the difference between an API that works in a demo and one that works in production.

Top comments (0)