Python API Design Patterns That Scale to 10 Million Users
Building an API that handles 100 requests is easy. Building one that handles 10 million requires proper design patterns. Here are the patterns used by production systems at scale.
1. Rate Limiting with Token Buckets
Don't let one bad actor take down your API:
import time
from collections import defaultdict
class TokenBucket:
    """Token-bucket rate limiter.

    The bucket starts full with ``capacity`` tokens and gains
    ``refill_rate`` tokens per second of elapsed time, never holding more
    than ``capacity``. No locking is performed, so callers sharing one
    bucket across threads must synchronise access themselves.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second of elapsed time.
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        # Monotonic clock: wall-clock time can jump backwards (NTP, manual
        # changes), which would make the elapsed time negative and drain
        # the bucket instead of refilling it.
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        """Try to withdraw ``tokens`` from the bucket.

        Args:
            tokens: Number of tokens to withdraw; must not be negative.

        Returns:
            True if enough tokens were available (they are deducted),
            False otherwise (the bucket is left unchanged apart from the
            refill).

        Raises:
            ValueError: If ``tokens`` is negative, since that would add
                tokens to the bucket rather than spend them.
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def _refill(self) -> None:
        """Add the tokens earned since the last refill, capped at capacity."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate,
        )
        self.last_refill = now
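Usage: to allow 100 requests/minute per API key, keep one bucket per key (for example in a `defaultdict`), each created as `TokenBucket(capacity=100, refill_rate=100 / 60)`, i.e. about 1.67 tokens/second.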
2. Response Caching with TTL
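Cache expensive queries for a fixed TTL. Note that this example only expires entries by age; it does not invalidate them when the underlying data changes, so choose a TTL short enough that stale results are acceptable: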
import time
from functools import wraps
# Process-wide store shared by every function decorated with ``cached``.
# Maps a call key to ``(result, cached_at)``. Entries are overwritten when
# they expire but are never evicted, so the dict grows with the number of
# distinct calls.
_cache = {}


def cached(ttl_seconds: int = 300):
    """Cache a function's results in ``_cache`` for ``ttl_seconds`` seconds.

    Calls are keyed on the function's module and qualified name plus its
    positional and keyword arguments, so every argument must be hashable
    (an unhashable argument raises ``TypeError``). Keyword arguments are
    order-insensitive: ``f(a=1, b=2)`` and ``f(b=2, a=1)`` share an entry.

    Args:
        ttl_seconds: How long a stored result stays valid.

    Returns:
        A decorator that wraps the target function with the cache lookup.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Key on the values themselves rather than on ``hash(...)``:
            # different arguments can share a hash (in CPython
            # ``hash(-1) == hash(-2)``), which would return one call's
            # result for another. Keyword names are unique, so sorting
            # the items never has to compare the values.
            key = (
                func.__module__,
                func.__qualname__,
                args,
                tuple(sorted(kwargs.items())),
            )
            # Monotonic time so a wall-clock adjustment cannot extend or
            # cut short an entry's lifetime.
            now = time.monotonic()
            entry = _cache.get(key)
            if entry is not None:
                result, cached_at = entry
                if now - cached_at < ttl_seconds:
                    return result
            result = func(*args, **kwargs)
            _cache[key] = (result, now)
            return result

        return wrapper

    return decorator
@cached(ttl_seconds=60)
def get_product(product_id: int) -> dict:
    """Fetch the product for ``product_id``; results are cached for 60 seconds.

    NOTE(review): the value returned is whatever ``db.query(Product).get``
    yields; confirm it matches the annotated ``dict`` return type.
    """
    # The database round-trip is the expensive step the cache avoids.
    product_query = db.query(Product)
    return product_query.get(product_id)
3. Cursor-Based Pagination
Don't use offset pagination at scale — the database still has to read and discard every skipped row, so the cost of fetching a page grows linearly with its offset:
from typing import List, Optional
import base64
import json
def encode_cursor(product_id: int) -> str:
    """Pack a product id into a cursor string: base64 of the JSON ``{"id": ...}``."""
    payload = json.dumps({"id": product_id})
    encoded = base64.b64encode(payload.encode())
    return encoded.decode()
def decode_cursor(cursor: str) -> dict:
    """Decode a cursor string back into its ``{"id": <int>}`` payload.

    The cursor is supplied by the client, so it is validated rather than
    trusted.

    Args:
        cursor: Base64 text wrapping a JSON object with an integer ``id``.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the cursor is not valid base64, does not contain
            UTF-8 JSON, or the JSON is not an object with an integer
            ``id``.
    """
    try:
        # validate=True rejects characters outside the base64 alphabet
        # instead of silently discarding them.
        raw = base64.b64decode(cursor.encode(), validate=True)
        decoded = json.loads(raw.decode())
    except ValueError as exc:
        # binascii.Error, UnicodeError and json.JSONDecodeError are all
        # ValueError subclasses; surface them under one message.
        raise ValueError("invalid pagination cursor") from exc

    if not isinstance(decoded, dict):
        raise ValueError("invalid pagination cursor")
    cursor_id = decoded.get("id")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(cursor_id, int) or isinstance(cursor_id, bool):
        raise ValueError("invalid pagination cursor")
    return decoded
async def list_products(
    cursor: Optional[str] = None,
    limit: int = 20
) -> dict:
    """Return one page of products ordered by id, using keyset pagination.

    Args:
        cursor: Cursor from a previous page's ``next_cursor``; when empty
            or ``None`` the listing starts from the beginning.
        limit: Maximum number of items in the page; must be at least 1.

    Returns:
        A dict with ``items`` (the page), ``next_cursor`` (cursor for the
        following page, or ``None`` on the last page) and ``has_next``.

    Raises:
        ValueError: If ``limit`` is less than 1, or the decoded cursor
            has no ``id`` entry.
    """
    # A zero limit used to report has_next=True with no cursor to follow,
    # and a negative one mis-sliced the rows and sent a non-positive LIMIT
    # to the database; reject both up front.
    if limit < 1:
        raise ValueError("limit must be at least 1")

    query = select(Product).order_by(Product.id)
    if cursor:
        decoded = decode_cursor(cursor)
        try:
            last_seen_id = decoded["id"]
        except (KeyError, TypeError) as exc:
            raise ValueError("invalid pagination cursor") from exc
        query = query.where(Product.id > last_seen_id)

    # Fetch one row beyond the page to learn whether another page exists
    # without issuing a separate count query.
    rows = (await db.execute(query.limit(limit + 1))).scalars().all()
    has_next = len(rows) > limit
    items = rows[:limit]
    return {
        "items": items,
        # has_next implies more than ``limit`` (>= 1) rows were fetched,
        # so ``items`` is non-empty here.
        "next_cursor": encode_cursor(items[-1].id) if has_next else None,
        "has_next": has_next,
    }
4. Circuit Breaker Pattern
Stop hammering a failing service:
import time
from enum import Enum
class CircuitState(Enum):
    """The three states a circuit breaker moves between."""

    # Normal operation
    CLOSED = "closed"
    # Failing, skip calls
    OPEN = "open"
    # Testing if recovered
    HALF_OPEN = "half_open"
class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when the circuit is open."""


class CircuitBreaker:
    """Stop calling a dependency after repeated consecutive failures.

    The breaker starts CLOSED. Once ``failure_threshold`` calls in a row
    have raised, it becomes OPEN and rejects calls without invoking the
    function. After ``recovery_timeout`` seconds the next call is let
    through as a trial (HALF_OPEN): success closes the circuit, failure
    reopens it. No locking is performed.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds to stay open before a trial call.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        # time.monotonic() reading of the most recent failure; only
        # consulted while the circuit is OPEN.
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the
                recovery timeout has not yet elapsed.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock so wall-clock adjustments cannot shorten or
            # extend the open period.
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            # The count is not cleared on entering HALF_OPEN, so a failed
            # trial call reopens the circuit immediately.
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise

        # Any success clears the failure streak. Without this, isolated
        # failures spread over a long time would add up and eventually
        # open a healthy circuit.
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        return result
5. Request ID Tracking
Trace requests across microservices:
import uuid
from contextvars import ContextVar
# Context-local holder for the current request's ID. Reading it in a
# context where nothing has been set returns the default, ''.
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Attach a request ID to the request context and to the response.

    Reuses the caller's ``X-Request-ID`` header when it is present and
    non-empty, otherwise generates a fresh UUID4. The ID is stored in
    ``request_id_var`` before the request is handled and echoed back in
    the response's ``X-Request-ID`` header.
    """
    incoming_id = request.headers.get("X-Request-ID")
    req_id = incoming_id if incoming_id else str(uuid.uuid4())
    request_id_var.set(req_id)

    response = await call_next(request)
    response.headers["X-Request-ID"] = req_id
    return response
def get_request_id() -> str:
    """Return the request ID held in ``request_id_var`` for the current context."""
    current_id = request_id_var.get()
    return current_id
6. Batch Endpoint Pattern
Reduce network overhead for bulk operations:
from pydantic import BaseModel
from typing import List
class BatchRequest(BaseModel):
    """Request body for the batch endpoint: the operations to run."""

    # Example: [{"method": "GET", "path": "/users/1"}, ...]
    operations: List[dict]
class BatchResponse(BaseModel):
    """Response body for the batch endpoint: one result per operation."""

    # Example: [{"status": 200, "body": {...}}, ...]
    results: List[dict]
@app.post("/api/batch")
async def batch_endpoint(batch: BatchRequest):
    """Run every operation in the batch and return their results in order.

    Operations are awaited one at a time, in the order they appear in
    ``batch.operations``; an exception from any of them propagates.
    """
    outcomes = [
        await process_operation(operation)
        for operation in batch.operations
    ]
    return BatchResponse(results=outcomes)
7. Idempotency Keys
Prevent duplicate processing of the same request:
import hashlib
from redis import Redis
# Module-level Redis client, constructed with no arguments (library defaults).
redis = Redis()
async def create_payment(request: Request, idempotency_key: str):
    """Process a payment at most once per idempotency key.

    The key is claimed atomically before any work is done, so two
    concurrent requests carrying the same key cannot both reach
    ``process_payment``. The finished result is stored for 24 hours and
    replayed to later requests with the same key.

    Args:
        request: The incoming payment request, passed to
            ``process_payment``.
        idempotency_key: Client-supplied key identifying this logical
            payment attempt.

    Returns:
        The payment result, either freshly computed or replayed from the
        stored copy.

    Raises:
        APIError: With status 409 when another request holding the same
            key is still being processed.
    """
    # NOTE(review): ``redis`` is the synchronous client, so these calls
    # block the event loop while they wait on the network.
    key = f"idempotency:{idempotency_key}"
    ttl_seconds = 86400  # 24 hours
    in_progress = {"idempotency_status": "in_progress"}

    # SET ... NX succeeds only for the first request with this key. A
    # separate get-then-set would let two concurrent requests both see
    # "no entry" and both charge the customer.
    claimed = redis.set(key, json.dumps(in_progress), nx=True, ex=ttl_seconds)
    if not claimed:
        cached = redis.get(key)
        if cached:
            stored = json.loads(cached)
            if stored != in_progress:
                return stored
        # Either the first request is still running, or its entry vanished
        # between the two commands; the client should retry.
        raise APIError(
            "idempotency_conflict",
            "A request with this idempotency key is already being processed",
            status=409,
        )

    try:
        result = await process_payment(request)
    except BaseException:
        # Release the claim (also on task cancellation) so a retry with
        # the same key is processed again, as it was before the claim
        # existed.
        redis.delete(key)
        raise

    # Replace the in-progress marker with the real result for 24 hours.
    redis.setex(key, ttl_seconds, json.dumps(result))
    return result
8. Structured Error Responses
Consistent error format across all endpoints:
from fastapi.responses import JSONResponse
class APIError(Exception):
    """Application error carrying the fields of the structured error response.

    Attributes are plain instance fields: ``code`` (machine-readable
    identifier), ``message`` (human-readable text), ``status`` (HTTP
    status code) and ``details`` (extra context, ``{}`` when omitted).
    """

    def __init__(
        self,
        code: str,
        message: str,
        status: int = 400,
        details: Optional[dict] = None,
    ):
        """Build the error.

        Args:
            code: Machine-readable error identifier.
            message: Human-readable description.
            status: HTTP status code to respond with.
            details: Optional extra context; defaults to an empty dict.
        """
        # Initialise the base Exception so str(exc), exc.args, log lines
        # and tracebacks show the message instead of an empty string.
        super().__init__(message)
        self.code = code
        self.message = message
        self.status = status
        self.details = details or {}
@app.exception_handler(APIError)
async def api_error_handler(request, exc):
    """Render an ``APIError`` as the structured JSON error response.

    The body is ``{"error": {...}}`` with the exception's code, message
    and details plus the current request ID; the HTTP status comes from
    ``exc.status``.
    """
    error_body = {
        "code": exc.code,
        "message": exc.message,
        "details": exc.details,
        "request_id": get_request_id(),
    }
    return JSONResponse(status_code=exc.status, content={"error": error_body})
Key Takeaways
- Rate limit everything — protect your API from abuse
- Cache aggressively — but invalidate correctly
- Use cursor pagination — offset doesn't scale
- Implement circuit breakers — prevent cascade failures
- Track requests — you can't fix what you can't trace
- Make it idempotent — networks are unreliable
- Standardize errors — consistent UX for API consumers
These patterns are the difference between an API that works in a demo and one that works in production.
Top comments (0)