DEV Community

Cover image for FastAPI Patterns You Should Be Using in Production
Shayan Holakouee
Shayan Holakouee

Posted on

FastAPI Patterns You Should Be Using in Production

FastAPI is already fast to build with — but are you using it to its full potential?


Introduction

Most FastAPI tutorials stop at CRUD routes, Pydantic models, and uvicorn main:app --reload. That's fine for getting started, but production applications demand more: dependency injection at scale, background tasks, custom middleware, event-driven design, and performance tuning.

This article dives deep into the patterns that separate a toy project from a production-grade FastAPI service.

Prerequisites: Comfortable with FastAPI basics, async Python, and Pydantic v2.


1. Dependency Injection — The Right Way

FastAPI's Depends() system is incredibly powerful, but most people only scratch the surface with database sessions. Let's go deeper.

Layered Dependencies

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    return credentials.credentials

async def get_current_user(token: str = Depends(get_token)) -> User:
    user = await decode_jwt(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

async def require_admin(user: User = Depends(get_current_user)) -> User:
    if user.role != "admin":
        raise HTTPException(status_code=403, detail="Admins only")
    return user
Enter fullscreen mode Exit fullscreen mode

You now have a clean chain: token → user → admin. Each layer is independently testable and composable.

Class-Based Dependencies

For stateful dependencies (rate limiters, feature flags, DB repos), classes are superior:

class PaginationParams:
    def __init__(self, page: int = 1, size: int = 20, max_size: int = 100):
        if size > max_size:
            raise HTTPException(400, f"Page size cannot exceed {max_size}")
        self.page = page
        self.size = size
        self.offset = (page - 1) * size

@router.get("/items")
async def list_items(pagination: PaginationParams = Depends()):
    return await Item.find_all(offset=pagination.offset, limit=pagination.size)
Enter fullscreen mode Exit fullscreen mode

Dependency Overrides in Tests

This is where FastAPI truly shines for testing:

# In your test file
app.dependency_overrides[get_current_user] = lambda: User(id=1, role="admin")

async def test_admin_endpoint(client: AsyncClient):
    response = await client.get("/admin/dashboard")
    assert response.status_code == 200
Enter fullscreen mode Exit fullscreen mode

No mocking. No patching. Just clean overrides.


2. Lifespan Events — Replacing @app.on_event

The old @app.on_event("startup") is deprecated. Use the lifespan context manager instead — it's cleaner and handles errors properly:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize resources
    app.state.db_pool = await create_db_pool()
    app.state.redis = await create_redis_client()
    app.state.http_client = httpx.AsyncClient()
    print("✅ Application started")

    yield  # Application runs here

    # Shutdown: release resources
    await app.state.db_pool.close()
    await app.state.redis.close()
    await app.state.http_client.aclose()
    print("🛑 Application shut down cleanly")

app = FastAPI(lifespan=lifespan)
Enter fullscreen mode Exit fullscreen mode

Access these anywhere via request.app.state.redis.


3. Custom Middleware — Beyond CORSMiddleware

Request ID Middleware

Every request in production should have a traceable ID:

import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)
Enter fullscreen mode Exit fullscreen mode

Timing Middleware

import time

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start) * 1000
        response.headers["X-Process-Time-Ms"] = f"{duration_ms:.2f}"
        return response
Enter fullscreen mode Exit fullscreen mode

⚠️ Gotcha: BaseHTTPMiddleware adds overhead. For ultra-high throughput, use pure ASGI middleware or starlette.routing directly.


4. Background Tasks vs. Celery — When to Use What

FastAPI's Built-in Background Tasks

Perfect for fire-and-forget operations that don't need retries or monitoring:

from fastapi import BackgroundTasks

async def send_welcome_email(email: str, name: str):
    await email_client.send(
        to=email,
        subject=f"Welcome, {name}!",
        body="Thanks for signing up."
    )

@router.post("/users", status_code=201)
async def create_user(data: UserCreate, background_tasks: BackgroundTasks):
    user = await User.create(**data.model_dump())
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    return user  # Returns immediately; email sends in background
Enter fullscreen mode Exit fullscreen mode

When to Use Celery (or ARQ/Dramatiq)

Use a proper task queue when you need:

  • Retry logic on failure
  • Scheduled tasks (cron-like)
  • Progress tracking
  • Rate limiting per worker
  • Distributed processing across multiple machines
# With ARQ (async-native, pairs beautifully with FastAPI)
async def process_video(ctx, video_id: int):
    video = await Video.get(video_id)
    await transcode(video.path)
    await video.update(status="processed")

# Enqueue from your route
@router.post("/videos/{video_id}/process")
async def trigger_processing(video_id: int, redis: Redis = Depends(get_redis)):
    await redis.enqueue_job("process_video", video_id)
    return {"status": "queued"}
Enter fullscreen mode Exit fullscreen mode

5. Advanced Pydantic v2 Patterns

Computed Fields

from pydantic import BaseModel, computed_field

class Order(BaseModel):
    items: list[OrderItem]
    discount_pct: float = 0.0

    @computed_field
    @property
    def subtotal(self) -> float:
        return sum(item.price * item.qty for item in self.items)

    @computed_field
    @property
    def total(self) -> float:
        return self.subtotal * (1 - self.discount_pct / 100)
Enter fullscreen mode Exit fullscreen mode

Model Validators

from pydantic import model_validator

class DateRange(BaseModel):
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def check_dates(self) -> "DateRange":
        if self.end_date <= self.start_date:
            raise ValueError("end_date must be after start_date")
        return self

    @computed_field
    @property
    def duration_days(self) -> int:
        return (self.end_date - self.start_date).days
Enter fullscreen mode Exit fullscreen mode

Custom Serialization

from pydantic import field_serializer
from datetime import datetime

class Event(BaseModel):
    name: str
    created_at: datetime

    @field_serializer("created_at")
    def serialize_dt(self, dt: datetime) -> str:
        return dt.strftime("%Y-%m-%d %H:%M UTC")
Enter fullscreen mode Exit fullscreen mode

6. Structured Error Handling

Don't let exceptions leak into your responses. Build a global error system:

from fastapi import Request
from fastapi.responses import JSONResponse

class AppError(Exception):
    def __init__(self, message: str, code: str, status: int = 400):
        self.message = message
        self.code = code
        self.status = status

class NotFoundError(AppError):
    def __init__(self, resource: str, id: int):
        super().__init__(
            message=f"{resource} with id {id} not found",
            code="RESOURCE_NOT_FOUND",
            status=404
        )

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    return JSONResponse(
        status_code=exc.status,
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "request_id": getattr(request.state, "request_id", None)
            }
        }
    )

# Usage in routes
@router.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await User.get(user_id)
    if not user:
        raise NotFoundError("User", user_id)
    return user
Enter fullscreen mode Exit fullscreen mode

7. Streaming Responses

Perfect for LLMs, large file downloads, or server-sent events:

from fastapi.responses import StreamingResponse
import asyncio

async def event_stream(topic: str):
    async for event in subscribe_to_events(topic):
        yield f"data: {event.model_dump_json()}\n\n"

@router.get("/events/{topic}")
async def stream_events(topic: str):
    return StreamingResponse(
        event_stream(topic),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )
Enter fullscreen mode Exit fullscreen mode

For file streaming:

@router.get("/exports/{report_id}")
async def download_report(report_id: int):
    async def generate():
        async for chunk in generate_csv_chunks(report_id):
            yield chunk

    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=report_{report_id}.csv"}
    )
Enter fullscreen mode Exit fullscreen mode

8. Router Organization at Scale

As your app grows, structure matters:

app/
├── main.py
├── core/
│   ├── config.py       # Settings via pydantic-settings
│   ├── security.py     # Auth dependencies
│   └── database.py     # DB pool setup
├── api/
│   ├── __init__.py
│   ├── v1/
│   │   ├── router.py   # Aggregates all v1 routes
│   │   ├── users.py
│   │   ├── orders.py
│   │   └── products.py
│   └── v2/
│       └── router.py
├── models/
├── schemas/
└── services/
Enter fullscreen mode Exit fullscreen mode
# api/v1/router.py
from fastapi import APIRouter
from .users import router as users_router
from .orders import router as orders_router

v1_router = APIRouter(prefix="/v1")
v1_router.include_router(users_router, prefix="/users", tags=["Users"])
v1_router.include_router(orders_router, prefix="/orders", tags=["Orders"])

# main.py
app.include_router(v1_router, prefix="/api")
Enter fullscreen mode Exit fullscreen mode

9. Performance: Async Done Right

The #1 Async Mistake

Running blocking I/O in async routes kills your throughput. Use run_in_executor or asyncio.to_thread:

import asyncio
from PIL import Image  # Blocking library

@router.post("/images/resize")
async def resize_image(file: UploadFile):
    contents = await file.read()

    # ✅ Offload blocking CPU work to thread pool
    result = await asyncio.to_thread(process_image_sync, contents)
    return {"size": len(result)}

def process_image_sync(data: bytes) -> bytes:
    img = Image.open(BytesIO(data))
    img = img.resize((800, 600))
    buf = BytesIO()
    img.save(buf, format="JPEG")
    return buf.getvalue()
Enter fullscreen mode Exit fullscreen mode

Response Caching with Redis

import json
from functools import wraps

def cache(key_prefix: str, ttl: int = 60):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, redis=None, **kwargs):
            cache_key = f"{key_prefix}:{':'.join(str(v) for v in kwargs.values())}"
            cached = await redis.get(cache_key)
            if cached:
                return json.loads(cached)
            result = await func(*args, **kwargs)
            await redis.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

@router.get("/products/{product_id}")
@cache("product", ttl=300)
async def get_product(product_id: int, redis=Depends(get_redis)):
    return await Product.get(product_id)
Enter fullscreen mode Exit fullscreen mode

10. Testing Async FastAPI Properly

import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
from app.core.database import get_db

@pytest.fixture
async def client():
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as ac:
        yield ac

@pytest.fixture(autouse=True)
async def override_db(test_db_session):
    app.dependency_overrides[get_db] = lambda: test_db_session
    yield
    app.dependency_overrides.clear()

async def test_create_user(client: AsyncClient):
    response = await client.post("/api/v1/users", json={
        "email": "test@example.com",
        "name": "Test User"
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "id" in data
Enter fullscreen mode Exit fullscreen mode

Wrapping Up

Here's what we covered:

  • Dependency injection at scale with layered and class-based dependencies
  • Lifespan events for proper resource management
  • Custom middleware for observability
  • Background tasks and when to reach for a proper queue
  • Pydantic v2 advanced features: computed fields, validators, serializers
  • Structured error handling with custom exception hierarchies
  • Streaming responses for real-time and large data
  • Router organization for growing codebases
  • Performance patterns: async threading and response caching
  • Async testing with httpx.AsyncClient

FastAPI gives you incredible primitives. The difference between a fragile prototype and a resilient service is how deliberately you compose them.


What advanced FastAPI patterns have you found indispensable? Drop them in the comments!

Top comments (0)