The JobSense project needed a FastAPI backend that served 604 job embeddings via semantic search, a Pydantic validation layer that stopped bad data before it reached pgvector, and a test suite that could be run without a live Ollama instance. Getting all three right took more time than the pipeline itself.
This article is the guide I wish I had then. It covers FastAPI setup for data engineering use cases, the Pydantic patterns that actually prevent bad data at the boundary, consuming external APIs without silent failures, testing patterns that catch real bugs, debugging the most common FastAPI errors, and the production patterns that most tutorials skip.
What FastAPI Is and Is Not in a Data Stack
Before building anything: FastAPI is a system boundary tool. It is not a scheduler, not a data processor, and not a database.
| Use FastAPI for | Use something else for |
|---|---|
| Ingestion endpoint (receive events, files, JSON) | Orchestration: use Airflow, Dagster, Prefect |
| Serving processed data to dashboards | Heavy transformation: use pandas, DuckDB, Spark |
| Triggering pipeline runs via HTTP | Real-time streaming: use Kafka, Flink |
| Health and metadata endpoints | Batch processing: use a DAG task, not an endpoint |
| Feature serving (ML embeddings, predictions) | Message queuing: use SQS, RabbitMQ |
The most common mistake I see in portfolio projects is using FastAPI where a dbt model and a BI tool would do the job in a third of the code. FastAPI belongs at the edges of your system where external clients need to push data in or pull data out.
App Setup
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
title="JobSense API",
description="Kenyan jobs semantic search",
version="1.0.0",
debug=True, # detailed error messages in dev — disable in prod
)
# CORS — required when a Streamlit or React frontend calls FastAPI
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:8501", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Lifecycle events — connect DB and warm caches at startup
@app.on_event("startup")
async def on_startup():
await database.connect()
@app.on_event("shutdown")
async def on_shutdown():
await database.disconnect()
API versioning from day one
If your API will have external consumers, prefix routes with a version. It costs nothing now and avoids breaking changes later.
from fastapi import APIRouter
v1 = APIRouter(prefix="/api/v1", tags=["v1"])
v2 = APIRouter(prefix="/api/v2", tags=["v2"]) # Future — add when needed
@v1.get("/jobs")
def list_jobs_v1():
return []
app.include_router(v1)
The alternative — changing /api/jobs to a different response shape after clients depend on it — is a breaking change that requires coordination. Versioning upfront avoids this.
Development server
uvicorn main:app --reload # dev: auto-reload on save
uvicorn main:app --host 0.0.0.0 --port 8000 # expose to network
uvicorn main:app --workers 4 # production: multiple workers
Interactive docs auto-generated at http://localhost:8000/docs (Swagger) and /redoc (ReDoc). Disable them in production:
app = FastAPI(docs_url=None, redoc_url=None)
Pydantic: The Data Contract at the Boundary
Pydantic models are the most important part of a FastAPI data engineering setup. They are the point where your pipeline says "this is the shape data must have to enter my system." Everything downstream assumes this contract was enforced.
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional
from datetime import datetime
from enum import Enum
class JobSource(str, Enum):
brightermonday = "brightermonday"
linkedin = "linkedin"
jobwebkenya = "jobwebkenya"
class JobSchema(BaseModel):
title: str = Field(..., min_length=2, max_length=200)
company: str = Field(..., min_length=1)
salary_min: Optional[float] = Field(None, ge=0)
salary_max: Optional[float] = Field(None, ge=0)
source: JobSource
posted_at: Optional[datetime] = None
@field_validator("title")
@classmethod
def strip_title(cls, v: str) -> str:
return v.strip()
@model_validator(mode="after")
def salary_order(self):
if self.salary_min and self.salary_max:
if self.salary_min > self.salary_max:
raise ValueError("salary_min must be <= salary_max")
return self
When validation fails, FastAPI returns a 422 with the exact field and reason. That is more useful than the silent data corruption you get when you skip validation.
Idempotency keys for ingestion endpoints
Production ingestion APIs add an idempotency key requirement. If the client retries a failed POST, you need to recognize the duplicate and return the same result rather than inserting twice.
import hashlib
from fastapi import Header, HTTPException
from typing import Optional
@app.post("/api/v1/events", status_code=201)
def ingest_event(
event: EventSchema,
x_idempotency_key: Optional[str] = Header(None),
):
if x_idempotency_key:
# Check if we already processed this key
existing = event_repo.find_by_idempotency_key(x_idempotency_key)
if existing:
return existing # Return previous result, no re-insert
result = event_repo.create(event, idempotency_key=x_idempotency_key)
return result
Without this pattern, a client that retries after a network timeout (which received no response but the insert succeeded) creates a duplicate. This is how pipelines end up with double-counted revenue.
Common HTTP status codes
200 OK — successful GET/PUT
201 Created — successful POST
204 No Content — successful DELETE
400 Bad Request — client sent invalid data (use this for your own validation logic)
401 Unauthorized — missing or invalid credentials
403 Forbidden — authenticated but not permitted to access this resource
404 Not Found — resource does not exist
422 Unprocessable — Pydantic validation failed (FastAPI default for bad body)
429 Too Many — rate limited (from you or from upstream)
500 Server Error — unhandled exception in your code
503 Unavailable — your service is up but a dependency (DB) is down
Dependency Injection
Dependency injection lets you share resources (database sessions, auth checks, config) across route handlers without passing them around manually.
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_job_or_404(job_id: int, db: Session = Depends(get_db)) -> JobModel:
obj = db.get(JobModel, job_id)
if not obj:
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
return obj
@app.get("/api/v1/jobs/{job_id}", response_model=JobResponse)
def get_job(job: JobModel = Depends(get_job_or_404)):
return job
API key authentication
from fastapi.security import APIKeyHeader
import os
api_key_header = APIKeyHeader(name="X-API-Key")
def verify_api_key(key: str = Depends(api_key_header)):
if key != os.getenv("API_KEY"):
raise HTTPException(status_code=401, detail="Invalid API key")
return key
@app.get("/admin/stats", dependencies=[Depends(verify_api_key)])
def admin_stats():
return {"total_jobs": 604}
JWT authentication for multi-user APIs
API keys work for service-to-service auth. For user-facing APIs with multiple roles, use JWT.
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
SECRET_KEY = os.getenv("JWT_SECRET_KEY")
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
payload = data.copy()
payload["exp"] = datetime.utcnow() + expires_delta
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
return user_id
except JWTError:
raise HTTPException(status_code=401, detail="Invalid or expired token")
@app.get("/api/v1/profile")
def get_profile(user_id: str = Depends(get_current_user)):
return {"user_id": user_id}
Install python-jose[cryptography] for the JWT library.
Consuming External APIs Without Silent Failures
Every external API call is a failure point. The pattern that works in all my projects: a session with a retry adapter, explicit timeout, structured error handling, and logging that tells you exactly what failed and why.
requests: the complete setup
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
import os
log = logging.getLogger(__name__)
def build_session() -> requests.Session:
session = requests.Session()
session.headers.update({
"User-Agent": "DataPipeline/1.0",
"Accept": "application/json",
})
retry = Retry(
total=3,
backoff_factor=2, # wait 2s, 4s, 8s between retries
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"],
)
session.mount("https://", HTTPAdapter(max_retries=retry))
session.mount("http://", HTTPAdapter(max_retries=retry))
return session
SESSION = build_session()
Error handling that tells you what actually happened
from requests.exceptions import HTTPError, ConnectionError, Timeout, JSONDecodeError
def fetch_eia_prices(fuel_type: str) -> list[dict]:
url = "https://api.eia.gov/v2/petroleum/pri/gnd/data/"
params = {
"api_key": os.getenv("EIA_API_KEY"),
"frequency": "monthly",
"data[0]": "value",
"facets[product][]": fuel_type,
}
try:
r = SESSION.get(url, params=params, timeout=15)
r.raise_for_status()
return r.json()["response"]["data"]
except HTTPError as e:
log.error(f"EIA API HTTP {e.response.status_code}: {e.response.text[:200]}")
raise
except ConnectionError:
log.error("EIA API unreachable — check network or service status")
raise
except Timeout:
log.error("EIA API timeout after 15s")
raise
except (JSONDecodeError, KeyError) as e:
log.error(f"EIA API response parse error: {e}")
raise
Never call r.json() without catching JSONDecodeError. When an API returns a 200 with an HTML error page (maintenance mode, Cloudflare challenge), .json() raises an exception with a confusing message. Catch it explicitly.
Pagination patterns
Offset/limit (most REST APIs):
def fetch_all_pages(base_url: str, params: dict, page_size: int = 100) -> list[dict]:
all_results = []
offset = 0
while True:
params.update({"limit": page_size, "offset": offset})
r = SESSION.get(base_url, params=params, timeout=15)
r.raise_for_status()
data = r.json()
# APIs use different response shapes — handle both
items = data.get("results") or data.get("data") or (data if isinstance(data, list) else [])
if not items:
break
all_results.extend(items)
offset += len(items)
if len(items) < page_size:
break # reached last page
time.sleep(0.5) # polite delay
log.info(f"Fetched {len(all_results)} total records from {base_url}")
return all_results
Cursor-based pagination:
def fetch_cursor_pages(base_url: str) -> list[dict]:
all_results = []
cursor = None
while True:
params = {"cursor": cursor} if cursor else {}
data = SESSION.get(base_url, params=params, timeout=15).json()
all_results.extend(data["items"])
cursor = data.get("next_cursor")
if not cursor:
break
return all_results
Handling 429: rate limit responses
import time
def request_with_backoff(url: str, max_retries: int = 5) -> requests.Response:
delay = 1
for attempt in range(max_retries):
r = SESSION.get(url, timeout=15)
if r.status_code == 429:
# Respect the Retry-After header if the API sends one
wait = int(r.headers.get("Retry-After", delay))
log.warning(f"Rate limited (attempt {attempt + 1}/{max_retries}). Waiting {wait}s")
time.sleep(wait)
delay = min(delay * 2, 60)
continue
r.raise_for_status()
return r
raise RuntimeError(f"Exceeded {max_retries} retries for {url}")
httpx for async pipelines
Use httpx.AsyncClient inside FastAPI async routes or asyncio-based pipelines. For the Ollama embedding calls in JobSense:
import httpx
import asyncio
async def fetch_embedding(text: str) -> list[float]:
async with httpx.AsyncClient(timeout=30) as client:
r = await client.post(
"http://localhost:11434/api/embeddings",
json={"model": "nomic-embed-text", "prompt": text},
)
r.raise_for_status()
return r.json()["embedding"]
# Fetch many embeddings concurrently
async def fetch_all_embeddings(texts: list[str]) -> list[list[float]]:
async with httpx.AsyncClient(timeout=30) as client:
tasks = [
client.post(
"http://localhost:11434/api/embeddings",
json={"model": "nomic-embed-text", "prompt": t},
)
for t in texts
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for r in responses:
if isinstance(r, Exception):
log.error(f"Embedding fetch failed: {r}")
results.append([])
else:
results.append(r.json()["embedding"])
return results
Quick Manual Testing with curl
Before writing a test, reach for curl to verify the endpoint works at all.
# Basic GET
curl http://localhost:8000/api/v1/jobs
curl -s http://localhost:8000/api/v1/jobs | python3 -m json.tool # pretty print
# GET with query params and auth
curl -H "X-API-Key: abc123" \
"http://localhost:8000/api/v1/jobs?keyword=data+engineer&limit=10"
# POST with JSON body
curl -X POST http://localhost:8000/api/v1/jobs \
-H "Content-Type: application/json" \
-d '{"title": "Data Engineer", "company": "Safaricom", "source": "linkedin"}'
# POST from a file
curl -X POST http://localhost:8000/api/v1/jobs \
-H "Content-Type: application/json" \
-d @payload.json
# Verbose: show request and response headers
curl -v http://localhost:8000/api/v1/jobs
# Status code only
curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/api/v1/jobs
# Test all services at once
for port in 8000 8080 8501; do
echo -n ":$port → "
curl -s --max-time 3 http://localhost:$port/health || echo "DOWN"
done
Testing FastAPI Endpoints
The conftest.py pattern
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import get_db, Base
TEST_DB_URL = "postgresql+psycopg2://user:pass@localhost:5432/test_jobsense"
@pytest.fixture(scope="session")
def test_engine():
engine = create_engine(TEST_DB_URL)
Base.metadata.create_all(bind=engine)
yield engine
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def db_session(test_engine):
Session = sessionmaker(bind=test_engine)
session = Session()
yield session
session.rollback() # undo every test's changes
session.close()
@pytest.fixture
def client(db_session):
def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as c:
yield c
app.dependency_overrides.clear()
The session.rollback() in the fixture is critical. Without it, data written by one test leaks into the next, causing flaky tests that pass in isolation but fail in sequence.
Tests that actually catch bugs
# tests/test_jobs.py
class TestJobsEndpoint:
def test_list_jobs_200(self, client):
r = client.get("/api/v1/jobs")
assert r.status_code == 200
assert isinstance(r.json(), list)
def test_create_job_201(self, client):
payload = {"title": "Data Engineer", "company": "Safaricom", "source": "linkedin"}
r = client.post("/api/v1/jobs", json=payload)
assert r.status_code == 201
assert r.json()["title"] == "Data Engineer"
def test_missing_required_field_422(self, client):
# Test that Pydantic validation catches missing company
r = client.post("/api/v1/jobs", json={"title": "No company"})
assert r.status_code == 422
errors = r.json()["detail"]
assert any("company" in str(e) for e in errors)
def test_invalid_enum_422(self, client):
payload = {"title": "DE", "company": "X", "source": "FAKE_SOURCE"}
r = client.post("/api/v1/jobs", json=payload)
assert r.status_code == 422
def test_salary_validation(self, client):
# salary_min > salary_max should fail
payload = {
"title": "DE", "company": "X", "source": "linkedin",
"salary_min": 200_000, "salary_max": 100_000
}
r = client.post("/api/v1/jobs", json=payload)
assert r.status_code == 422
def test_job_not_found_404(self, client):
r = client.get("/api/v1/jobs/99999")
assert r.status_code == 404
def test_delete_204(self, client):
r = client.post("/api/v1/jobs", json={"title": "Temp", "company": "X", "source": "linkedin"})
job_id = r.json()["id"]
assert client.delete(f"/api/v1/jobs/{job_id}").status_code == 204
assert client.get(f"/api/v1/jobs/{job_id}").status_code == 404
def test_auth_required_401(self, client):
r = client.get("/admin/stats")
assert r.status_code in (401, 403)
def test_auth_with_key(self, client, monkeypatch):
monkeypatch.setenv("API_KEY", "test-key-123")
r = client.get("/admin/stats", headers={"X-API-Key": "test-key-123"})
assert r.status_code == 200
Mocking external API calls
Never call a live external API in tests. They are slow, unreliable, may have rate limits, and make your CI dependent on a third-party service being up.
# Using pytest-mock
def test_eia_endpoint(client, mocker):
mock_response = MagicMock()
mock_response.json.return_value = {
"response": {"data": [{"period": "2024-01", "value": "3.45"}]}
}
mock_response.status_code = 200
mock_response.raise_for_status = lambda: None
mocker.patch("app.services.eia.SESSION.get", return_value=mock_response)
r = client.get("/api/v1/energy/prices?fuel=gasoline")
assert r.status_code == 200
# Using the 'responses' library (cleaner for URL-level mocking)
import responses as mock_http
@mock_http.activate
def test_cbk_forex_fetch():
mock_http.add(
mock_http.GET,
"https://www.centralbank.go.ke/api/forex",
json={"rates": [{"pair": "USD/KES", "rate": 129.5}]},
status=200,
)
from app.services.forex import fetch_rates
data = fetch_rates()
assert data[0]["rate"] == 129.5
Async endpoint tests
import pytest
import httpx
from app.main import app
@pytest.mark.asyncio
async def test_semantic_search():
async with httpx.AsyncClient(app=app, base_url="http://test") as client:
r = await client.post(
"/api/v1/search",
json={"text": "python data engineer nairobi", "top_k": 5}
)
assert r.status_code == 200
assert len(r.json()) <= 5
Add to pytest.ini or pyproject.toml:
[pytest]
asyncio_mode = auto
Useful pytest flags
pytest -v # verbose output
pytest -x # stop on first failure
pytest -s # show print/logging output
pytest -k "keyword" # run matching tests only
pytest -k "not slow" # skip slow tests
pytest --cov=app --cov-report=term-missing # coverage
pytest -m integration # run marked tests
Debugging Common Errors
422 Unprocessable Entity
This is FastAPI's most common error. Pydantic validation failed. The response body tells you exactly what and where:
curl -X POST http://localhost:8000/api/v1/jobs \
-H "Content-Type: application/json" \
-d '{"title": "DE"}' | python3 -m json.tool
{
"detail": [
{
"loc": ["body", "company"],
"msg": "Field required",
"type": "missing"
}
]
}
Common causes:
- Required field missing in the request body
- Wrong type (sending a string where a number is expected)
- Enum value not in the allowed list
-
min_lengthormax_lengthconstraint violated
500 Internal Server Error
Check the uvicorn terminal. The full Python traceback is printed there. For dev, add a global exception handler that returns the trace in the response:
from fastapi import Request
from fastapi.responses import JSONResponse
import traceback
@app.exception_handler(Exception)
async def generic_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=500,
content={"detail": str(exc), "trace": traceback.format_exc()},
)
Disable this in production. Exposing tracebacks to external clients leaks implementation details.
Custom exception handlers (better than generic 500)
class DataQualityError(Exception):
def __init__(self, message: str, field: str = None):
self.message = message
self.field = field
@app.exception_handler(DataQualityError)
async def data_quality_handler(request: Request, exc: DataQualityError):
return JSONResponse(
status_code=400,
content={"error": "data_quality_error", "message": exc.message, "field": exc.field},
)
# In your route:
@app.post("/api/v1/events")
def ingest_event(event: EventSchema):
if event.timestamp > datetime.utcnow():
raise DataQualityError("Event timestamp is in the future", field="timestamp")
This pattern gives clients a structured error they can handle programmatically rather than a generic 500.
CORS errors in the browser
CORS errors appear in the browser console, not in the FastAPI terminal. The fix is nearly always the same:
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:8501"], # exact origin, no trailing slash
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Common CORS mistakes:
- Including a trailing slash:
"http://localhost:8501/"does not match"http://localhost:8501" - Using
allow_origins=["*"]withallow_credentials=True(blocked by browsers for credentialed requests) - Forgetting to add the middleware before route definitions
Debugging what is running on a port
# Linux / WSL
lsof -i :8000
fuser -k 8000/tcp # kill what is on port 8000
# PowerShell
netstat -ano | findstr ":8000"
$pid = (Get-NetTCPConnection -LocalPort 8000).OwningProcess
Stop-Process -Id $pid -Force
Production Patterns
Health endpoint (add to every API)
from sqlalchemy import text
@app.get("/health")
def health(db: Session = Depends(get_db)):
try:
db.execute(text("SELECT 1"))
return {"status": "ok", "database": "connected"}
except Exception as e:
raise HTTPException(status_code=503, detail=str(e))
Airflow's HttpSensor can poll this endpoint before triggering downstream tasks. Docker Compose health checks use it. It is one line of code that saves real debugging time.
Rate limiting your own API
Protecting your API from abuse or accidental hammering requires a rate limiter. slowapi is the standard library for FastAPI:
pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/api/v1/search")
@limiter.limit("10/minute")
def semantic_search(request: Request, q: str):
# The `request` parameter is required by slowapi
return search_jobs(q)
Without rate limiting, a single script hitting /search in a tight loop can exhaust your database connections or your embedding model's memory.
Streaming responses for large data exports
When an endpoint returns a large dataset (thousands of rows), do not load the entire result into memory before sending the response. Use StreamingResponse:
import csv
import io
from fastapi.responses import StreamingResponse
@app.get("/api/v1/export/jobs.csv")
def export_jobs_csv(db: Session = Depends(get_db)):
def generate():
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["id", "title", "company", "source", "posted_at"])
yield output.getvalue()
output.seek(0)
output.truncate(0)
for job in db.query(JobModel).yield_per(1000):
writer.writerow([job.id, job.title, job.company, job.source, job.posted_at])
yield output.getvalue()
output.seek(0)
output.truncate(0)
return StreamingResponse(
generate(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=jobs.csv"},
)
yield_per(1000) on the SQLAlchemy query means only 1,000 rows are held in memory at a time regardless of how large the table is.
Response caching for expensive queries
For endpoints that run the same expensive query repeatedly (dashboard metrics, aggregate counts), cache the result in memory:
from functools import lru_cache
from datetime import datetime, timedelta
_cache: dict = {}
def get_cached(key: str, ttl_seconds: int = 300):
entry = _cache.get(key)
if entry and datetime.utcnow() - entry["ts"] < timedelta(seconds=ttl_seconds):
return entry["value"]
return None
def set_cached(key: str, value):
_cache[key] = {"value": value, "ts": datetime.utcnow()}
@app.get("/api/v1/stats")
def get_stats(db: Session = Depends(get_db)):
cached = get_cached("stats", ttl_seconds=300)
if cached:
return cached
result = {
"total_jobs": db.query(JobModel).count(),
"total_sources": db.query(JobModel.source).distinct().count(),
}
set_cached("stats", result)
return result
For production with multiple workers, replace the in-memory dict with Redis so all workers share the cache.
Database connection pool configuration
The default SQLAlchemy pool is fine for development. For production with multiple Uvicorn workers:
from sqlalchemy import create_engine
engine = create_engine(
os.getenv("DATABASE_URL"),
pool_size=5, # connections kept open per worker
max_overflow=10, # extra connections above pool_size allowed in burst
pool_pre_ping=True, # test connection before use (handles DB restarts)
pool_recycle=3600, # recycle connections older than 1 hour (avoids stale TCP)
)
pool_pre_ping=True is the one you need most. Without it, workers that have been idle may hold dead connections and throw OperationalError on the first request after a database restart.
Structured request logging
import time
import logging
log = logging.getLogger("api")
@app.middleware("http")
async def log_requests(request: Request, call_next):
start = time.time()
response = await call_next(request)
elapsed_ms = round((time.time() - start) * 1000, 2)
log.info(
f"{request.method} {request.url.path} "
f"status={response.status_code} "
f"duration={elapsed_ms}ms "
f"ip={request.client.host}"
)
return response
This middleware gives you one log line per request with the information you need to debug production issues: method, path, status code, and how long it took.
The Profiling Section Nobody Reads Until They Need It
When an endpoint is slower than expected, do not guess. Measure.
import time
from functools import wraps
def timed(func):
@wraps(func)
def wrapper(*args, **kwargs):
t = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - t) * 1000
print(f"{func.__name__} took {elapsed:.2f}ms")
return result
return wrapper
@timed
def slow_query(db):
return db.query(JobModel).filter(...).all()
# Full profiling with cProfile
import cProfile, pstats, io
pr = cProfile.Profile()
pr.enable()
slow_function()
pr.disable()
stream = io.StringIO()
pstats.Stats(pr, stream=stream).sort_stats("cumulative").print_stats(20)
print(stream.getvalue())
# Log all SQL queries from SQLAlchemy (enable during debugging, disable in prod)
import logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
The SQLAlchemy logging usually reveals the problem immediately: an N+1 query pattern where a route is running one query per row rather than a single JOIN.
JobSense uses FastAPI for semantic job search with pgvector and Ollama. The Kenya Forex API uses FastAPI with DuckDB for sub-10ms query latency. Both are on GitHub.
Follow me on dev.to for more on data engineering, APIs, and pipelines.
Top comments (0)