FastAPI is already fast to build with — but are you using it to its full potential?
Introduction
Most FastAPI tutorials stop at CRUD routes, Pydantic models, and uvicorn main:app --reload. That's fine for getting started, but production applications demand more: dependency injection at scale, background tasks, custom middleware, event-driven design, and performance tuning.
This article dives deep into the patterns that separate a toy project from a production-grade FastAPI service.
Prerequisites: Comfortable with FastAPI basics, async Python, and Pydantic v2.
1. Dependency Injection — The Right Way
FastAPI's Depends() system is incredibly powerful, but most people only scratch the surface with database sessions. Let's go deeper.
Layered Dependencies
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
    return credentials.credentials

async def get_current_user(token: str = Depends(get_token)) -> User:
    user = await decode_jwt(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

async def require_admin(user: User = Depends(get_current_user)) -> User:
    if user.role != "admin":
        raise HTTPException(status_code=403, detail="Admins only")
    return user
You now have a clean chain: token → user → admin. Each layer is independently testable and composable.
Class-Based Dependencies
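For stateful or parameterized dependencies (rate limiters, feature flags, DB repos), a class is often a better fit than a function:
class PaginationParams:
    # Class constant rather than an __init__ argument: every __init__ argument
    # becomes a query parameter, so clients could otherwise raise the limit.
    MAX_SIZE = 100

    def __init__(self, page: int = 1, size: int = 20):
        if page < 1:
            raise HTTPException(400, "Page must be 1 or greater")
        if size > self.MAX_SIZE:
            raise HTTPException(400, f"Page size cannot exceed {self.MAX_SIZE}")
        self.page = page
        self.size = size
        self.offset = (page - 1) * size

@router.get("/items")
async def list_items(pagination: PaginationParams = Depends()):
    return await Item.find_all(offset=pagination.offset, limit=pagination.size)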
Dependency Overrides in Tests
This is where FastAPI truly shines for testing:
# In your test file
app.dependency_overrides[get_current_user] = lambda: User(id=1, role="admin")

async def test_admin_endpoint(client: AsyncClient):
    response = await client.get("/admin/dashboard")
    assert response.status_code == 200
No mocking. No patching. Just clean overrides.
2. Lifespan Events — Replacing @app.on_event
The old @app.on_event("startup") and @app.on_event("shutdown") hooks are deprecated. Use the lifespan context manager instead: startup and shutdown live in one function, so every resource created before the yield can be released after it:
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize resources
    app.state.db_pool = await create_db_pool()
    app.state.redis = await create_redis_client()
    app.state.http_client = httpx.AsyncClient()
    print("✅ Application started")
    yield  # Application runs here
    # Shutdown: release resources
    await app.state.db_pool.close()
    await app.state.redis.close()
    await app.state.http_client.aclose()
    print("🛑 Application shut down cleanly")

app = FastAPI(lifespan=lifespan)
Inside a route or dependency, reach these resources through request.app.state, for example request.app.state.redis.
3. Custom Middleware — Beyond CORSMiddleware
Request ID Middleware
Every request in production should have a traceable ID:
import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)
Timing Middleware
import time

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start) * 1000
        response.headers["X-Process-Time-Ms"] = f"{duration_ms:.2f}"
        return response
⚠️ Gotcha: BaseHTTPMiddleware adds overhead. For ultra-high throughput, use pure ASGI middleware or starlette.routing directly.
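For comparison, here is a rough sketch of the request ID logic written as pure ASGI middleware. It depends only on the standard library; the class name `RequestIDASGIMiddleware` is hypothetical, and storing the ID under `scope["state"]` assumes Starlette's behavior of backing `request.state` with that key.

```python
import uuid


class RequestIDASGIMiddleware:
    """Pure ASGI middleware: no Request/Response objects are created."""

    def __init__(self, app, header_name: str = "x-request-id"):
        self.app = app
        # ASGI header names are lowercase bytes.
        self.header_name = header_name.lower().encode("latin-1")

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass lifespan and websocket traffic through untouched.
            await self.app(scope, receive, send)
            return

        incoming = dict(scope.get("headers") or [])
        request_id = incoming.get(self.header_name) or uuid.uuid4().hex.encode("ascii")
        # Assumption: Starlette exposes scope["state"] as request.state.
        scope.setdefault("state", {})["request_id"] = request_id.decode("latin-1")

        async def send_with_request_id(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append((self.header_name, request_id))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_request_id)
```

It is registered the same way as the class-based version, with `app.add_middleware(RequestIDASGIMiddleware)`.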
4. Background Tasks vs. Celery — When to Use What
FastAPI's Built-in Background Tasks
Perfect for fire-and-forget operations that don't need retries or monitoring:
from fastapi import BackgroundTasks

async def send_welcome_email(email: str, name: str):
    await email_client.send(
        to=email,
        subject=f"Welcome, {name}!",
        body="Thanks for signing up."
    )

@router.post("/users", status_code=201)
async def create_user(data: UserCreate, background_tasks: BackgroundTasks):
    user = await User.create(**data.model_dump())
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    return user  # Returns immediately; email sends in background
When to Use Celery (or ARQ/Dramatiq)
Use a proper task queue when you need:
- Retry logic on failure
- Scheduled tasks (cron-like)
- Progress tracking
- Rate limiting per worker
- Distributed processing across multiple machines
# With ARQ (async-native, pairs well with FastAPI)
from arq.connections import ArqRedis

# Worker-side task: ARQ passes a context dict as the first argument
async def process_video(ctx, video_id: int):
    video = await Video.get(video_id)
    await transcode(video.path)
    await video.update(status="processed")

# Enqueue from your route; get_redis is assumed to return an ArqRedis pool
@router.post("/videos/{video_id}/process")
async def trigger_processing(video_id: int, redis: ArqRedis = Depends(get_redis)):
    await redis.enqueue_job("process_video", video_id)
    return {"status": "queued"}
5. Advanced Pydantic v2 Patterns
Computed Fields
from pydantic import BaseModel, computed_field

class Order(BaseModel):
    items: list[OrderItem]
    discount_pct: float = 0.0

    @computed_field
    @property
    def subtotal(self) -> float:
        return sum(item.price * item.qty for item in self.items)

    @computed_field
    @property
    def total(self) -> float:
        return self.subtotal * (1 - self.discount_pct / 100)
Model Validators
from datetime import date

from pydantic import BaseModel, computed_field, model_validator

class DateRange(BaseModel):
    start_date: date
    end_date: date

    # Runs after field validation, so both dates are already parsed
    @model_validator(mode="after")
    def check_dates(self) -> "DateRange":
        if self.end_date <= self.start_date:
            raise ValueError("end_date must be after start_date")
        return self

    @computed_field
    @property
    def duration_days(self) -> int:
        return (self.end_date - self.start_date).days
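The following sketch shows how a validator and a computed field behave together at runtime. `BookingWindow` and `is_valid_window` are hypothetical names that mirror the `DateRange` model above; the point is that a `ValueError` raised inside the validator surfaces as a `ValidationError`, and the computed field shows up in `model_dump()`.

```python
from datetime import date

from pydantic import BaseModel, ValidationError, computed_field, model_validator


class BookingWindow(BaseModel):
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def check_dates(self) -> "BookingWindow":
        if self.end_date <= self.start_date:
            raise ValueError("end_date must be after start_date")
        return self

    @computed_field
    @property
    def duration_days(self) -> int:
        return (self.end_date - self.start_date).days


def is_valid_window(start: date, end: date) -> bool:
    """Return True when the pair passes validation, False otherwise."""
    try:
        BookingWindow(start_date=start, end_date=end)
    except ValidationError:
        return False
    return True


window = BookingWindow(start_date=date(2024, 1, 1), end_date=date(2024, 1, 8))
print(window.model_dump())  # includes the computed duration_days key
```

In a FastAPI route, the same `ValidationError` on a request body is turned into a 422 response automatically.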
Custom Serialization
from datetime import datetime

from pydantic import BaseModel, field_serializer

class Event(BaseModel):
    name: str
    created_at: datetime

    @field_serializer("created_at")
    def serialize_dt(self, dt: datetime) -> str:
        # Assumes created_at is already in UTC; the format only appends the label
        return dt.strftime("%Y-%m-%d %H:%M UTC")
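If timestamps can arrive with other offsets, the serializer can normalize them before formatting. This is a sketch under one loudly stated assumption: naive datetimes are treated as UTC, which may not match your data. `AuditEvent` is a hypothetical model name.

```python
from datetime import datetime, timedelta, timezone

from pydantic import BaseModel, field_serializer


class AuditEvent(BaseModel):
    name: str
    created_at: datetime

    @field_serializer("created_at")
    def serialize_dt(self, dt: datetime) -> str:
        if dt.tzinfo is None:
            # Assumption: naive datetimes are already in UTC.
            dt = dt.replace(tzinfo=timezone.utc)
        # Convert aware datetimes to UTC so the "UTC" label is truthful.
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")


cest = timezone(timedelta(hours=2))
event = AuditEvent(name="deploy", created_at=datetime(2024, 5, 1, 12, 30, tzinfo=cest))
print(event.model_dump()["created_at"])  # 12:30 at +02:00 is rendered as 10:30 UTC
```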
6. Structured Error Handling
Don't let exceptions leak into your responses. Build a global error system:
from fastapi import Request
from fastapi.responses import JSONResponse

class AppError(Exception):
    def __init__(self, message: str, code: str, status: int = 400):
        super().__init__(message)  # Keeps str(exc) and tracebacks meaningful
        self.message = message
        self.code = code
        self.status = status

class NotFoundError(AppError):
    def __init__(self, resource: str, id: int):
        super().__init__(
            message=f"{resource} with id {id} not found",
            code="RESOURCE_NOT_FOUND",
            status=404
        )

# Registered for the base class, so it also handles every AppError subclass
@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    return JSONResponse(
        status_code=exc.status,
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "request_id": getattr(request.state, "request_id", None)
            }
        }
    )

# Usage in routes
@router.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await User.get(user_id)
    if not user:
        raise NotFoundError("User", user_id)
    return user
7. Streaming Responses
Perfect for LLMs, large file downloads, or server-sent events:
from fastapi.responses import StreamingResponse
import asyncio

async def event_stream(topic: str):
    async for event in subscribe_to_events(topic):
        yield f"data: {event.model_dump_json()}\n\n"

@router.get("/events/{topic}")
async def stream_events(topic: str):
    return StreamingResponse(
        event_stream(topic),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )
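The `data: ...\n\n` string above is one server-sent event frame. A small helper can make the framing explicit, including the optional `event` and `id` fields and payloads that span several lines. `format_sse` is a hypothetical helper name, and this is a sketch of the wire format rather than a complete SSE implementation.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Build one SSE frame; a blank line terminates the frame."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload needs its own "data:" prefix.
    for chunk in data.splitlines() or [""]:
        lines.append(f"data: {chunk}")
    return "\n".join(lines) + "\n\n"


print(repr(format_sse("hello")))
```

Inside `event_stream`, the generator could then `yield format_sse(event.model_dump_json())` instead of building the string inline.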
For file streaming:
@router.get("/exports/{report_id}")
async def download_report(report_id: int):
    async def generate():
        async for chunk in generate_csv_chunks(report_id):
            yield chunk

    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=report_{report_id}.csv"}
    )
8. Router Organization at Scale
As your app grows, structure matters:
app/
├── main.py
├── core/
│ ├── config.py # Settings via pydantic-settings
│ ├── security.py # Auth dependencies
│ └── database.py # DB pool setup
├── api/
│ ├── __init__.py
│ ├── v1/
│ │ ├── router.py # Aggregates all v1 routes
│ │ ├── users.py
│ │ ├── orders.py
│ │ └── products.py
│ └── v2/
│ └── router.py
├── models/
├── schemas/
└── services/
# api/v1/router.py
from fastapi import APIRouter
from .users import router as users_router
from .orders import router as orders_router
v1_router = APIRouter(prefix="/v1")
v1_router.include_router(users_router, prefix="/users", tags=["Users"])
v1_router.include_router(orders_router, prefix="/orders", tags=["Orders"])
# main.py
from app.api.v1.router import v1_router

app.include_router(v1_router, prefix="/api")  # Routes resolve to /api/v1/users/..., /api/v1/orders/...
9. Performance: Async Done Right
The #1 Async Mistake
Calling blocking code inside an async route stalls the event loop, so every other request on that worker waits. Offload it with loop.run_in_executor or asyncio.to_thread:
import asyncio
from io import BytesIO

from fastapi import UploadFile
from PIL import Image  # Blocking library

@router.post("/images/resize")
async def resize_image(file: UploadFile):
    contents = await file.read()
    # ✅ Offload the blocking call to a worker thread so the event loop stays free
    result = await asyncio.to_thread(process_image_sync, contents)
    return {"size": len(result)}

def process_image_sync(data: bytes) -> bytes:
    img = Image.open(BytesIO(data))
    img = img.resize((800, 600))
    buf = BytesIO()
    img.save(buf, format="JPEG")
    return buf.getvalue()
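The effect is easy to observe without any web framework. In this standard-library sketch, `time.sleep` stands in for a blocking library call, and a heartbeat coroutine records whether the event loop kept running in the meantime. All function names are hypothetical.

```python
import asyncio
import time


def blocking_work(seconds: float) -> float:
    # Stand-in for a blocking library call; returns the time it finished.
    time.sleep(seconds)
    return time.perf_counter()


async def heartbeat(tick_times: list, interval: float, count: int) -> None:
    # Records a timestamp every `interval` seconds while the loop is responsive.
    for _ in range(count):
        await asyncio.sleep(interval)
        tick_times.append(time.perf_counter())


async def ticks_during_blocking_call() -> int:
    tick_times: list = []
    finished_at, _ = await asyncio.gather(
        asyncio.to_thread(blocking_work, 0.3),
        heartbeat(tick_times, 0.02, 5),
    )
    # Count heartbeats that fired before the blocking call returned.
    return sum(1 for t in tick_times if t < finished_at)


print(asyncio.run(ticks_during_blocking_call()))
```

Calling `blocking_work(0.3)` directly inside the coroutine instead would freeze the heartbeat until the sleep finished. Threads help most when the blocking call waits on I/O or releases the GIL; for heavy pure-Python CPU work, a process pool is usually the better tool.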
Response Caching with Redis
import json
from functools import wraps

from fastapi.encoders import jsonable_encoder

def cache(key_prefix: str, ttl: int = 60):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, redis=None, **kwargs):
            # redis is captured here; the key is built from the remaining kwargs
            cache_key = f"{key_prefix}:{':'.join(str(v) for v in kwargs.values())}"
            cached = await redis.get(cache_key)
            if cached is not None:  # A falsy cached value is still a hit
                return json.loads(cached)
            result = await func(*args, **kwargs)
            # jsonable_encoder turns models, dates, etc. into JSON-safe types
            await redis.setex(cache_key, ttl, json.dumps(jsonable_encoder(result)))
            return result
        return wrapper
    return decorator

@router.get("/products/{product_id}")
@cache("product", ttl=300)
async def get_product(product_id: int, redis=Depends(get_redis)):
    return await Product.get(product_id)
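The key format above depends on the order of the keyword arguments and can collide when a value itself contains a colon. One possible alternative, sketched here with the hypothetical helper `build_cache_key`, is to hash a canonical JSON form of the parameters.

```python
import hashlib
import json


def build_cache_key(prefix: str, params: dict) -> str:
    """Build a key that does not depend on argument order."""
    # sort_keys gives a canonical form; default=str covers dates, UUIDs, etc.
    canonical = json.dumps(params, sort_keys=True, default=str, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{digest}"


key_a = build_cache_key("product", {"product_id": 42, "locale": "en"})
key_b = build_cache_key("product", {"locale": "en", "product_id": 42})
print(key_a == key_b)  # same parameters in a different order give the same key
```

The trade-off is that hashed keys are harder to read when inspecting Redis by hand, so keeping a readable prefix is worthwhile.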
10. Testing Async FastAPI Properly
# Assumes pytest-asyncio with asyncio_mode = "auto" (or an equivalent plugin),
# so that async fixtures and async test functions are collected and awaited.
import pytest
from httpx import AsyncClient, ASGITransport

from app.main import app
from app.core.database import get_db

@pytest.fixture
async def client():
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as ac:
        yield ac

@pytest.fixture(autouse=True)
async def override_db(test_db_session):
    app.dependency_overrides[get_db] = lambda: test_db_session
    yield
    app.dependency_overrides.clear()  # Reset so overrides never leak between tests

async def test_create_user(client: AsyncClient):
    response = await client.post("/api/v1/users", json={
        "email": "test@example.com",
        "name": "Test User"
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "id" in data
Wrapping Up
Here's what we covered:
- Dependency injection at scale with layered and class-based dependencies
- Lifespan events for proper resource management
- Custom middleware for observability
- Background tasks and when to reach for a proper queue
- Pydantic v2 advanced features: computed fields, validators, serializers
- Structured error handling with custom exception hierarchies
- Streaming responses for real-time and large data
- Router organization for growing codebases
- Performance patterns: async threading and response caching
- Async testing with httpx.AsyncClient
FastAPI gives you incredible primitives. The difference between a fragile prototype and a resilient service is how deliberately you compose them.
What advanced FastAPI patterns have you found indispensable? Drop them in the comments!