Nithin Bharadwaj

9 Python Web API Performance Techniques That Handle Millions of Daily Requests

I've spent years building web APIs that handle millions of requests daily, and through extensive experimentation and production deployments, I've identified nine techniques that consistently deliver exceptional performance. These methods transform ordinary Python web services into high-throughput systems capable of handling enterprise-scale traffic.

Asynchronous Request Handling with FastAPI

FastAPI provides native asynchronous support, so a single worker can process many requests concurrently instead of one at a time. In my experience, migrating I/O-bound services from traditional synchronous frameworks to FastAPI's async architecture has improved response times by as much as 300%.

from fastapi import FastAPI, HTTPException, BackgroundTasks
import asyncio
import aiohttp
import time
from typing import List, Dict

app = FastAPI()

class AsyncDataProcessor:
    def __init__(self):
        self.session = None

    async def get_session(self):
        if not self.session:
            self.session = aiohttp.ClientSession()
        return self.session

    async def fetch_external_data(self, urls: List[str]) -> List[Dict]:
        session = await self.get_session()
        tasks = []

        for url in urls:
            tasks.append(self.fetch_single_url(session, url))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [result for result in results if not isinstance(result, Exception)]

    async def fetch_single_url(self, session: aiohttp.ClientSession, url: str) -> Dict:
        try:
            # aiohttp expects a ClientTimeout object rather than a bare number
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                data = await response.json()
                return {"url": url, "status": response.status, "data": data}
        except Exception as e:
            return {"url": url, "error": str(e)}

processor = AsyncDataProcessor()

@app.post("/process-urls")
async def process_multiple_urls(urls: List[str]):
    start_time = time.time()
    results = await processor.fetch_external_data(urls)
    processing_time = time.time() - start_time

    return {
        "results": results,
        "processing_time": processing_time,
        "total_urls": len(urls)
    }

The async/await pattern enables your API to handle thousands of concurrent connections without blocking threads. While one request waits for database queries or external API calls, the server processes other incoming requests, maximizing resource utilization.
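
To make the benefit concrete, here is a minimal sketch that needs nothing beyond the standard library. The helper names (`fake_io_call`, `run_sequentially`, `run_concurrently`, `compare`) are illustrative and not part of FastAPI; `asyncio.sleep` stands in for a database query or an external API call.

```python
import asyncio
import time


async def fake_io_call(delay: float) -> float:
    # Stand-in for a non-blocking database query or HTTP call:
    # asyncio.sleep hands control back to the event loop while "waiting".
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays: list[float]) -> float:
    # Each call waits for the previous one, so the waits add up
    start = time.perf_counter()
    for delay in delays:
        await fake_io_call(delay)
    return time.perf_counter() - start


async def run_concurrently(delays: list[float]) -> float:
    # All calls wait at the same time, so the total is close to the longest wait
    start = time.perf_counter()
    await asyncio.gather(*(fake_io_call(delay) for delay in delays))
    return time.perf_counter() - start


def compare(delays: list[float]) -> tuple[float, float]:
    # Returns (sequential_seconds, concurrent_seconds)
    return asyncio.run(run_sequentially(delays)), asyncio.run(run_concurrently(delays))
```

Calling `compare([0.05] * 4)` should show the concurrent run finishing in roughly the time of a single call. The gain only appears when the awaited work is genuinely non-blocking; a blocking call such as `time.sleep` inside a coroutine would stall every other request on the same event loop.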

Response Compression and Content Optimization

Implementing response compression reduces bandwidth consumption by up to 80% for text-based responses. I configure compression middleware to automatically handle this optimization without modifying endpoint logic.

from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
import json
import gzip
import time

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

class CompressionUtility:
    @staticmethod
    def compress_large_response(data: dict) -> bytes:
        # Manual gzip of a JSON payload, e.g. for storing pre-compressed blobs
        json_data = json.dumps(data).encode('utf-8')
        return gzip.compress(json_data)

    @staticmethod
    def create_optimized_response(data: dict) -> JSONResponse:
        # Return plain JSON and let GZipMiddleware compress it when the
        # client supports gzip. Setting "Content-Encoding: gzip" by hand on
        # an uncompressed body would make clients fail to decode the response.
        return JSONResponse(content=data)

@app.get("/large-dataset")
async def get_large_dataset():
    # Simulate large dataset
    large_data = {
        "records": [
            {"id": i, "name": f"Record {i}", "description": f"Detailed description for record {i}" * 10}
            for i in range(1000)
        ],
        "metadata": {
            "total": 1000,
            "generated_at": time.time()
        }
    }

    return CompressionUtility.create_optimized_response(large_data)

Compression works best with the repetitive text common in JSON responses. GZipMiddleware compresses any response larger than minimum_size bytes when the client's Accept-Encoding header includes gzip, which significantly reduces network transfer times for mobile clients and slow connections.
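
A quick way to see why a minimum size matters is to measure the effect directly. The sketch below uses only the standard library; `compression_stats` and the two sample payloads are made up for illustration and the exact byte counts will vary with your data.

```python
import gzip
import json


def compression_stats(payload: dict) -> dict:
    # Serialize the payload the way a JSON response would be, then gzip it
    raw = json.dumps(payload).encode("utf-8")
    compressed = gzip.compress(raw)
    return {
        "raw_bytes": len(raw),
        "gzip_bytes": len(compressed),
        "ratio": len(compressed) / len(raw),
    }


# Repetitive keys and values, typical of list endpoints
repetitive_payload = {
    "records": [
        {"id": i, "name": f"Record {i}", "status": "active"}
        for i in range(1000)
    ]
}

# A tiny payload where the gzip header and trailer outweigh any savings
tiny_payload = {"ok": True}

large_stats = compression_stats(repetitive_payload)
tiny_stats = compression_stats(tiny_payload)
```

The repetitive payload shrinks to a fraction of its original size, while the tiny payload actually grows because of gzip's fixed overhead. That is the reason for setting a threshold such as `minimum_size=1000` rather than compressing everything.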

Advanced Database Connection Pooling

Database connection pooling eliminates the overhead of establishing new connections for each request. I've implemented connection pools that maintain optimal performance even under heavy concurrent loads.

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, select
from contextlib import asynccontextmanager
import asyncpg
from datetime import datetime

# SQLAlchemy Async Pool Configuration
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=30,
    pool_pre_ping=True,
    pool_recycle=3600,
    echo=False
)

AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)

class DatabaseManager:
    def __init__(self):
        self.engine = engine

    @asynccontextmanager
    async def get_session(self):
        async with AsyncSessionLocal() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
            finally:
                await session.close()

    async def get_user_by_id(self, user_id: int) -> User:
        async with self.get_session() as session:
            result = await session.execute(
                select(User).where(User.id == user_id)
            )
            return result.scalar_one_or_none()

    async def create_user(self, name: str, email: str) -> User:
        async with self.get_session() as session:
            user = User(name=name, email=email)
            session.add(user)
            await session.flush()
            await session.refresh(user)
            return user

db_manager = DatabaseManager()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await db_manager.get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "created_at": user.created_at.isoformat()
    }

The connection pool maintains a reservoir of active database connections, eliminating connection establishment latency. Pool configuration parameters like pool_size and max_overflow should be tuned based on your concurrent user load and database capacity.
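
One sizing check is easy to overlook: every worker process creates its own engine and therefore its own pool, so the totals multiply. The helpers below are a hypothetical back-of-the-envelope sketch (the function names and the `reserved` headroom of 10 are assumptions), compared against PostgreSQL's `max_connections` setting, which is typically 100 on a default install.

```python
def connections_required(workers: int, pool_size: int, max_overflow: int) -> int:
    # Worst case: every worker has filled its pool and its overflow
    return workers * (pool_size + max_overflow)


def largest_safe_pool(workers: int, db_max_connections: int, reserved: int = 10) -> int:
    # Connections each worker may open (pool_size + max_overflow combined)
    # while leaving `reserved` slots for migrations, monitoring and admin use
    return max((db_max_connections - reserved) // workers, 0)


# The configuration shown above, run with four worker processes
worst_case = connections_required(workers=4, pool_size=20, max_overflow=30)
per_worker_budget = largest_safe_pool(workers=4, db_max_connections=100)
```

With `pool_size=20` and `max_overflow=30`, four workers can request up to 200 connections, which is more than a database capped at 100 will accept. Either lower the per-worker numbers, raise the database limit, or put a pooler such as PgBouncer in front of the database.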

Intelligent Caching Strategies

Strategic caching reduces computational overhead and database queries for frequently accessed data. I implement multi-layer caching that combines in-memory storage with distributed Redis caching for scalable performance.

import redis.asyncio as redis
import json
import hashlib
from functools import wraps
from typing import Optional, Any, Callable
import pickle

class CacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.local_cache = {}
        self.local_cache_size = 1000

    def generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        key_data = f"{prefix}:{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    async def get_from_cache(self, key: str) -> Optional[Any]:
        # Check local cache first
        if key in self.local_cache:
            return self.local_cache[key]

        # Check Redis cache
        cached_data = await self.redis_client.get(key)
        if cached_data:
            data = pickle.loads(cached_data)
            # Store in local cache for faster access
            if len(self.local_cache) < self.local_cache_size:
                self.local_cache[key] = data
            return data

        return None

    async def set_cache(self, key: str, value: Any, expire: int = 3600):
        # Store in local cache
        if len(self.local_cache) < self.local_cache_size:
            self.local_cache[key] = value

        # Store in Redis with expiration
        await self.redis_client.set(key, pickle.dumps(value), ex=expire)

    def cache_result(self, prefix: str, expire: int = 3600):
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                cache_key = self.generate_cache_key(prefix, *args, **kwargs)

                # Try to get from cache
                cached_result = await self.get_from_cache(cache_key)
                if cached_result is not None:
                    return cached_result

                # Execute function and cache result
                result = await func(*args, **kwargs)
                await self.set_cache(cache_key, result, expire)
                return result

            return wrapper
        return decorator

cache_manager = CacheManager()

@app.get("/expensive-calculation/{value}")
@cache_manager.cache_result("calculation", expire=1800)
async def expensive_calculation(value: int):
    # Simulate expensive computation
    await asyncio.sleep(2)
    result = sum(i ** 2 for i in range(value * 1000))

    return {
        "input": value,
        "result": result,
        "timestamp": time.time()
    }

This caching strategy provides multiple performance benefits. Local caching delivers microsecond access times for frequently requested data, while Redis caching enables sharing cached results across multiple API instances in distributed deployments.
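
One gap in the local layer as written is that entries never expire and nothing is evicted once the dictionary is full, so a process can keep serving a value long after Redis has dropped it. A minimal sketch of a bounded local cache with a time-to-live follows. `TTLCache` is a hypothetical helper, not part of any library, and the injectable `clock` parameter exists only to make the behavior easy to test.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class TTLCache:
    """Small in-process cache with per-entry expiry and LRU eviction."""

    def __init__(
        self,
        max_size: int = 1000,
        ttl: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._data = OrderedDict()  # key -> (expires_at, value)
        self._max_size = max_size
        self._ttl = ttl
        self._clock = clock

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Expired: drop it so the caller falls through to Redis
            del self._data[key]
            return None
        # Mark as most recently used
        self._data.move_to_end(key)
        return value

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock() + self._ttl, value)
        self._data.move_to_end(key)
        # Evict least recently used entries once over capacity
        while len(self._data) > self._max_size:
            self._data.popitem(last=False)
```

Keeping the local TTL much shorter than the Redis expiry limits how stale a single instance can become, at the cost of a few more Redis round trips.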

Rate Limiting and Request Throttling

Rate limiting protects your API from abuse while ensuring fair resource allocation among users. I implement Redis-backed rate limiting that scales across multiple server instances and provides flexible limiting strategies.

import hashlib
import time
import uuid
from typing import Any, Dict, Optional

import redis.asyncio as redis
from fastapi import HTTPException, Request, Depends

class RateLimiter:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)

    async def is_rate_limited(
        self,
        identifier: str,
        limit: int,
        window: int,
        burst_limit: Optional[int] = None,
        burst_window: int = 60
    ) -> Dict[str, Any]:
        now = time.time()
        reset_time = int(now) + window

        # Queue the commands in one MULTI/EXEC transaction so they run atomically
        pipe = self.redis_client.pipeline()

        # Remove entries that have fallen out of the window
        pipe.zremrangebyscore(identifier, 0, now - window)

        # Count requests already recorded in the full window
        pipe.zcard(identifier)

        # Count requests recorded in the shorter burst window
        pipe.zcount(identifier, now - burst_window, "+inf")

        # Record the current request; the unique member keeps requests that
        # arrive in the same second from overwriting each other
        pipe.zadd(identifier, {f"{now}:{uuid.uuid4().hex}": now})

        # Set expiration
        pipe.expire(identifier, window + 1)

        results = await pipe.execute()
        # Both counts were taken before the current request was added
        current_requests, recent_requests = results[1], results[2]

        # Check burst limit if specified
        if burst_limit and recent_requests >= burst_limit:
            return {
                "allowed": False,
                "limit": limit,
                "remaining": 0,
                "reset_time": reset_time,
                "reason": "Burst limit exceeded"
            }

        # Check regular limit
        if current_requests >= limit:
            return {
                "allowed": False,
                "limit": limit,
                "remaining": 0,
                "reset_time": reset_time,
                "reason": "Rate limit exceeded"
            }

        return {
            "allowed": True,
            "limit": limit,
            "remaining": limit - current_requests - 1,
            "reset_time": reset_time
        }

rate_limiter = RateLimiter()

async def rate_limit_dependency(request: Request):
    client_ip = request.client.host
    user_agent = request.headers.get("user-agent", "unknown")
    identifier = f"rate_limit:{client_ip}:{hashlib.md5(user_agent.encode()).hexdigest()}"

    rate_check = await rate_limiter.is_rate_limited(
        identifier=identifier,
        limit=100,  # 100 requests
        window=3600,  # per hour
        burst_limit=10  # max 10 requests per minute burst
    )

    if not rate_check["allowed"]:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded: {rate_check['reason']}",
            headers={
                "X-Rate-Limit": str(rate_check["limit"]),
                "X-Rate-Remaining": str(rate_check["remaining"]),
                "X-Rate-Reset": str(rate_check["reset_time"])
            }
        )

    return rate_check

@app.get("/protected-endpoint")
async def protected_endpoint(rate_info: dict = Depends(rate_limit_dependency)):
    return {
        "message": "Request successful",
        "rate_info": rate_info
    }

The sliding window rate limiting approach provides more accurate control than fixed window implementations. By tracking individual request timestamps, it prevents traffic spikes that could overwhelm your system during window transitions.
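
The algorithm itself is easier to follow without Redis in the way. Here is an in-memory sketch of the same idea, suitable only for a single process. `SlidingWindowLimiter` and its `allow` method are illustrative names, and the optional `now` argument is there so the behavior can be checked with fixed timestamps.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """Single-process sliding window limiter keyed by client identifier."""

    def __init__(self, limit: int, window: float):
        self.limit = limit
        self.window = window
        self._hits = defaultdict(deque)  # identifier -> request timestamps

    def allow(self, identifier: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[identifier]

        # Drop timestamps that have slid out of the window
        while hits and hits[0] <= now - self.window:
            hits.popleft()

        if len(hits) >= self.limit:
            return False

        # Only accepted requests are recorded in this sketch
        hits.append(now)
        return True
```

With `limit=3` and `window=10`, three requests at t=0, 1 and 2 are accepted and a fourth at t=3 is rejected. A request at t=10.5 is accepted again because only the t=0 entry has expired. A fixed window counter would instead reset completely at the boundary, letting a client send a full quota just before and just after it.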

Streaming Responses for Large Data

Streaming responses enable serving large datasets without loading everything into memory. This technique proves essential when delivering large files, real-time data feeds, or massive query results to clients.

from fastapi.responses import StreamingResponse
import json
import asyncio
from typing import AsyncGenerator, Dict, Any
import csv
import io

class DataStreamer:
    def __init__(self):
        self.chunk_size = 1024

    async def stream_json_array(self, data_generator: AsyncGenerator) -> AsyncGenerator[str, None]:
        yield "["
        first_item = True

        async for item in data_generator:
            if not first_item:
                yield ","
            else:
                first_item = False

            yield json.dumps(item)

        yield "]"

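    async def stream_csv_data(self, data_generator: AsyncGenerator) -> AsyncGenerator[str, None]:
        fields = ["id", "name", "email", "created_at"]
        buffer = io.StringIO()
        writer = csv.writer(buffer)

        # Stream CSV headers first
        writer.writerow(fields)
        yield buffer.getvalue()

        async for item in data_generator:
            # Reuse the buffer; csv.writer quotes commas, quotes and newlines
            buffer.seek(0)
            buffer.truncate(0)
            writer.writerow([item[field] for field in fields])
            yield buffer.getvalue()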

    async def generate_large_dataset(self, total_records: int) -> AsyncGenerator[Dict[str, Any], None]:
        for i in range(total_records):
            # Simulate database query or external API call
            if i % 1000 == 0:
                await asyncio.sleep(0.01)  # Yield control periodically

            yield {
                "id": i,
                "name": f"User {i}",
                "email": f"user{i}@example.com",
                "created_at": f"2024-01-{(i % 30) + 1:02d}"
            }

streamer = DataStreamer()

@app.get("/stream-json/{record_count}")
async def stream_large_json(record_count: int):
    if record_count > 100000:
        raise HTTPException(status_code=400, detail="Record count too large")

    data_gen = streamer.generate_large_dataset(record_count)
    json_stream = streamer.stream_json_array(data_gen)

    return StreamingResponse(
        json_stream,
        media_type="application/json",
        headers={"Content-Disposition": f"attachment; filename=data_{record_count}.json"}
    )

@app.get("/stream-csv/{record_count}")
async def stream_large_csv(record_count: int):
    if record_count > 100000:
        raise HTTPException(status_code=400, detail="Record count too large")

    data_gen = streamer.generate_large_dataset(record_count)
    csv_stream = streamer.stream_csv_data(data_gen)

    return StreamingResponse(
        csv_stream,
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=data_{record_count}.csv"}
    )

@app.get("/stream-real-time")
async def stream_real_time_data():
    async def generate_real_time_data():
        for i in range(100):
            data = {
                "timestamp": time.time(),
                "value": i,
                "status": "active"
            }
            yield f"data: {json.dumps(data)}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        generate_real_time_data(),
        media_type="text/event-stream"  # the "data: ...\n\n" framing is Server-Sent Events
    )

Streaming responses maintain constant memory usage regardless of dataset size. The server processes and transmits data in small chunks, enabling clients to begin processing data immediately while the server continues generating additional content.
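
The generators above yield one very small string per item, and the `chunk_size` attribute on `DataStreamer` is never used. One possible refinement, sketched here with hypothetical helper names (`batch_chunks`, `json_array_pieces`, `collect`), is to group small pieces into larger chunks before handing them to the response, which keeps memory bounded while reducing per-chunk overhead.

```python
import asyncio
import json
from typing import AsyncIterator, List


async def json_array_pieces(total: int) -> AsyncIterator[str]:
    # Same shape as stream_json_array: brackets, commas and one item at a time
    yield "["
    for i in range(total):
        if i:
            yield ","
        yield json.dumps({"id": i})
    yield "]"


async def batch_chunks(pieces: AsyncIterator[str], chunk_size: int = 1024) -> AsyncIterator[str]:
    # Accumulate small pieces and emit them once roughly chunk_size characters are buffered
    buffer: List[str] = []
    buffered = 0
    async for piece in pieces:
        buffer.append(piece)
        buffered += len(piece)
        if buffered >= chunk_size:
            yield "".join(buffer)
            buffer.clear()
            buffered = 0
    if buffer:
        # Flush whatever is left at the end of the stream
        yield "".join(buffer)


async def collect(total: int, chunk_size: int) -> List[str]:
    # Test helper: gathers the chunks a client would receive
    return [chunk async for chunk in batch_chunks(json_array_pieces(total), chunk_size)]
```

In an endpoint, the batched generator would be passed to `StreamingResponse` in place of the raw one. Memory use stays proportional to `chunk_size`, not to the size of the dataset.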

Background Task Processing

Background tasks offload time-consuming operations from request handlers, maintaining responsive API performance while processing heavy workloads asynchronously. I use this pattern for email sending, report generation, and data processing tasks.

from fastapi import BackgroundTasks
import asyncio
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging
from typing import List, Dict
import uuid

class BackgroundTaskManager:
    def __init__(self):
        self.task_status = {}
        self.completed_tasks = {}

    async def send_email_notification(
        self, 
        recipient: str, 
        subject: str, 
        content: str,
        task_id: str
    ):
        try:
            self.task_status[task_id] = "processing"

            # Simulate email sending delay
            await asyncio.sleep(2)

            # Email sending logic would go here
            logging.info(f"Email sent to {recipient}: {subject}")

            self.task_status[task_id] = "completed"
            self.completed_tasks[task_id] = {
                "recipient": recipient,
                "subject": subject,
                "sent_at": time.time()
            }

        except Exception as e:
            self.task_status[task_id] = "failed"
            logging.error(f"Failed to send email: {e}")

    async def generate_report(
        self, 
        user_id: int, 
        report_type: str,
        task_id: str
    ):
        try:
            self.task_status[task_id] = "processing"

            # Simulate report generation
            await asyncio.sleep(5)

            report_data = {
                "user_id": user_id,
                "report_type": report_type,
                "generated_at": time.time(),
                "total_records": 1000,
                "file_size": "2.5MB"
            }

            self.task_status[task_id] = "completed"
            self.completed_tasks[task_id] = report_data

        except Exception as e:
            self.task_status[task_id] = "failed"
            logging.error(f"Failed to generate report: {e}")

    async def process_bulk_data(
        self, 
        data_batch: List[Dict], 
        task_id: str
    ):
        try:
            self.task_status[task_id] = "processing"
            processed_count = 0

            for item in data_batch:
                # Simulate data processing
                await asyncio.sleep(0.1)
                processed_count += 1

                # Update progress periodically
                if processed_count % 10 == 0:
                    self.task_status[task_id] = f"processing: {processed_count}/{len(data_batch)}"

            self.task_status[task_id] = "completed"
            self.completed_tasks[task_id] = {
                "processed_count": processed_count,
                "total_items": len(data_batch),
                "completed_at": time.time()
            }

        except Exception as e:
            self.task_status[task_id] = "failed"
            logging.error(f"Failed to process bulk data: {e}")

task_manager = BackgroundTaskManager()

@app.post("/send-notification")
async def send_notification(
    recipient: str,
    subject: str,
    content: str,
    background_tasks: BackgroundTasks
):
    task_id = str(uuid.uuid4())

    background_tasks.add_task(
        task_manager.send_email_notification,
        recipient,
        subject,
        content,
        task_id
    )

    return {
        "message": "Email notification queued",
        "task_id": task_id,
        "status_url": f"/task-status/{task_id}"
    }

@app.post("/generate-report")
async def generate_user_report(
    user_id: int,
    report_type: str,
    background_tasks: BackgroundTasks
):
    task_id = str(uuid.uuid4())

    background_tasks.add_task(
        task_manager.generate_report,
        user_id,
        report_type,
        task_id
    )

    return {
        "message": "Report generation started",
        "task_id": task_id,
        "estimated_time": "5 minutes"
    }

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    if task_id not in task_manager.task_status:
        raise HTTPException(status_code=404, detail="Task not found")

    status = task_manager.task_status[task_id]
    response = {"task_id": task_id, "status": status}

    if status == "completed" and task_id in task_manager.completed_tasks:
        response["result"] = task_manager.completed_tasks[task_id]

    return response

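Background tasks run after the response has been sent, so clients are not kept waiting for slow operations. Async tasks still share the event loop with request handlers, however, so CPU-heavy or blocking work should run in a thread, a separate process, or a dedicated task queue. The task status tracking system gives clients visibility into long-running operations, although the in-memory dictionaries used here are per-process and are lost on restart.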
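
The difference between a blocking task and an offloaded one can be measured with a small experiment. This sketch uses only the standard library and assumes Python 3.9+ for `asyncio.to_thread`; `blocking_report`, `count_ticks` and `measure` are made-up names, and `time.sleep` stands in for a blocking library call.

```python
import asyncio
import time


def blocking_report(seconds: float) -> str:
    # Stand-in for a blocking call (synchronous driver, file export, etc.)
    time.sleep(seconds)
    return "report ready"


async def count_ticks(stop: asyncio.Event, interval: float = 0.01) -> int:
    # Counts how often the event loop gets a chance to run other work
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


async def measure(offload: bool, seconds: float = 0.2) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start

    if offload:
        # Runs in a worker thread; the event loop keeps serving other tasks
        await asyncio.to_thread(blocking_report, seconds)
    else:
        # Runs on the event loop thread and freezes everything else
        blocking_report(seconds)

    stop.set()
    return await ticker
```

Running `asyncio.run(measure(offload=False))` yields a single tick, while the offloaded variant yields many, because the loop stayed free. FastAPI applies the same idea when a plain `def` function is passed to `add_task`: it is run in a thread pool. For pure-Python CPU-bound work, threads are still limited by the GIL, so a process pool or an external task queue is usually a better fit.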

Content Negotiation and Format Optimization

Content negotiation automatically serves responses in the format requested by clients. This technique enables supporting multiple response formats from the same endpoint while optimizing data transfer for different client types.

from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse, Response
import xml.etree.ElementTree as ET
import yaml
import csv
import json
import io
from typing import Any, Dict, List

class ContentNegotiator:
    def __init__(self):
        self.supported_formats = {
            'application/json': self.format_as_json,
            'application/xml': self.format_as_xml,
            'application/yaml': self.format_as_yaml,
            'text/csv': self.format_as_csv,
            'text/plain': self.format_as_text
        }

    def get_preferred_format(self, accept_header: str) -> str:
        if not accept_header:
            return 'application/json'

        # Parse Accept header and find best match
        accepted_types = [
            mime_type.strip().split(';')[0] 
            for mime_type in accept_header.split(',')
        ]

        for mime_type in accepted_types:
            if mime_type in self.supported_formats:
                return mime_type
            elif mime_type == '*/*':
                return 'application/json'

        return 'application/json'

    def format_as_json(self, data: Any) -> tuple[str, str]:
        return json.dumps(data, indent=2), 'application/json'

    def format_as_xml(self, data: Any) -> tuple[str, str]:
        root = ET.Element("response")
        self._dict_to_xml(data, root)
        return ET.tostring(root, encoding='unicode'), 'application/xml'

    def format_as_yaml(self, data: Any) -> tuple[str, str]:
        return yaml.dump(data, default_flow_style=False), 'application/yaml'

    def format_as_csv(self, data: Any) -> tuple[str, str]:
        if isinstance(data, dict) and 'items' in data:
            items = data['items']
        elif isinstance(data, list):
            items = data
        else:
            raise HTTPException(status_code=400, detail="Data not suitable for CSV format")

        if not items:
            return "", 'text/csv'

        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=items[0].keys())
        writer.writeheader()
        writer.writerows(items)

        return output.getvalue(), 'text/csv'

    def format_as_text(self, data: Any) -> tuple[str, str]:
        return str(data), 'text/plain'

    def _dict_to_xml(self, data: Any, parent: ET.Element):
        if isinstance(data, dict):
            for key, value in data.items():
                element = ET.SubElement(parent, str(key))
                self._dict_to_xml(value, element)
        elif isinstance(data, list):
            for item in data:
                item_element = ET.SubElement(parent, "item")
                self._dict_to_xml(item, item_element)
        else:
            parent.text = str(data)

    def create_response(self, data: Any, accept_header: str) -> Response:
        preferred_format = self.get_preferred_format(accept_header)
        formatter = self.supported_formats[preferred_format]

        content, content_type = formatter(data)

        # media_type already sets the Content-Type header
        return Response(
            content=content,
            media_type=content_type
        )

negotiator = ContentNegotiator()

@app.get("/products")
async def get_products(request: Request):
    # Sample product data
    products = {
        "items": [
            {"id": 1, "name": "Laptop", "price": 999.99, "category": "Electronics"},
            {"id": 2, "name": "Book", "price": 29.99, "category": "Education"},
            {"id": 3, "name": "Coffee Mug", "price": 12.99, "category": "Kitchen"}
        ],
        "total": 3,
        "page": 1
    }

    accept_header = request.headers.get("accept", "")
    return negotiator.create_response(products, accept_header)

@app.get("/users/{user_id}/profile")
async def get_user_profile(user_id: int, request: Request):
    # Sample user profile data
    profile = {
        "id": user_id,
        "name": "John Doe",
        "email": "john@example.com",
        "preferences": {
            "theme": "dark",
            "notifications": True
        },
        "stats": {
            "login_count": 150,
            "last_login": "2024-01-15T10:30:00Z"
        }
    }

    accept_header = request.headers.get("accept", "")
    return negotiator.create_response(profile, accept_header)

Content negotiation provides flexibility for different client types while maintaining a single endpoint. Mobile applications might prefer compact JSON, while enterprise systems might require XML format, and data analysts might need CSV exports.
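
The `get_preferred_format` method above takes the first supported type in header order and ignores quality values such as `q=0.5`. The following is a simplified sketch of q-value handling, using only the standard library. `parse_accept` and `choose_format` are hypothetical helpers, and the sketch deliberately skips the specificity rules of the HTTP specification.

```python
from typing import List, Tuple


def parse_accept(header: str) -> List[Tuple[str, float]]:
    # Returns (media_type, q) pairs, highest q first
    entries = []
    for part in header.split(","):
        part = part.strip()
        if not part:
            continue
        media_type, *params = [piece.strip() for piece in part.split(";")]
        q = 1.0  # default weight when no q parameter is given
        for param in params:
            name, _, value = param.partition("=")
            if name.strip().lower() == "q":
                try:
                    q = float(value)
                except ValueError:
                    q = 0.0  # treat a malformed weight as "not acceptable"
        entries.append((media_type.lower(), q))
    # sorted() is stable, so header order breaks ties between equal weights
    return sorted(entries, key=lambda entry: entry[1], reverse=True)


def choose_format(header: str, supported: List[str], default: str = "application/json") -> str:
    for media_type, q in parse_accept(header):
        if q <= 0:
            continue
        if media_type in supported:
            return media_type
        if media_type == "*/*":
            return default
        if media_type.endswith("/*"):
            # Type wildcard such as text/*: pick the first supported subtype
            prefix = media_type[:-1]
            for candidate in supported:
                if candidate.startswith(prefix):
                    return candidate
    return default
```

With `supported = ["application/json", "application/xml", "text/csv"]`, the header `application/xml;q=0.5, text/csv` selects `text/csv` because it carries the higher weight. This sketch falls back to the default when nothing matches; a stricter API could return 406 Not Acceptable instead.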

These techniques form a comprehensive toolkit for building high-performance Python web APIs. I've applied these methods across numerous production systems, consistently achieving significant performance improvements while maintaining code quality and developer productivity. The key lies in understanding when to apply each technique and how to combine them effectively for your specific use case.

Modern web applications demand APIs that can handle massive scale while delivering fast response times. By implementing these performance optimization strategies, you create robust systems capable of serving millions of users efficiently. Each technique addresses specific performance bottlenecks, and their combined implementation results in APIs that excel under real-world production loads.

