De' Clerke

FastAPI for Data Engineers: Building, Testing, and Debugging APIs That Don't Lie to You

The JobSense project needed a FastAPI backend that served 604 job embeddings via semantic search, a Pydantic validation layer that stopped bad data before it reached pgvector, and a test suite that could be run without a live Ollama instance. Getting all three right took more time than the pipeline itself.

This article is the guide I wish I had then. It covers FastAPI setup for data engineering use cases, the Pydantic patterns that actually prevent bad data at the boundary, consuming external APIs without silent failures, testing patterns that catch real bugs, debugging the most common FastAPI errors, and the production patterns that most tutorials skip.


What FastAPI Is and Is Not in a Data Stack

Before building anything: FastAPI is a system boundary tool. It is not a scheduler, not a data processor, and not a database.

Use FastAPI for                                   | Use something else for
Ingestion endpoint (receive events, files, JSON)  | Orchestration: use Airflow, Dagster, Prefect
Serving processed data to dashboards              | Heavy transformation: use pandas, DuckDB, Spark
Triggering pipeline runs via HTTP                 | Real-time streaming: use Kafka, Flink
Health and metadata endpoints                     | Batch processing: use a DAG task, not an endpoint
Feature serving (ML embeddings, predictions)      | Message queuing: use SQS, RabbitMQ

The most common mistake I see in portfolio projects is using FastAPI where a dbt model and a BI tool would do the job in a third of the code. FastAPI belongs at the edges of your system where external clients need to push data in or pull data out.


App Setup

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(
    title="JobSense API",
    description="Kenyan jobs semantic search",
    version="1.0.0",
    debug=True,           # detailed error messages in dev — disable in prod
)

# CORS — required when a Streamlit or React frontend calls FastAPI
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8501", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

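# Lifecycle events: connect the DB and warm caches at startup.
# `database` stands for whatever async DB client your app uses (not defined here).
# Note: on_event is deprecated in recent FastAPI releases in favor of the
# `lifespan` argument to FastAPI(); it still works as shown.
@app.on_event("startup")
async def on_startup():
    await database.connect()

@app.on_event("shutdown")
async def on_shutdown():
    await database.disconnect()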

API versioning from day one

If your API will have external consumers, prefix routes with a version. It costs nothing now and avoids breaking changes later.

from fastapi import APIRouter

v1 = APIRouter(prefix="/api/v1", tags=["v1"])
v2 = APIRouter(prefix="/api/v2", tags=["v2"])  # Future — add when needed

@v1.get("/jobs")
def list_jobs_v1():
    return []

app.include_router(v1)

The alternative — changing /api/jobs to a different response shape after clients depend on it — is a breaking change that requires coordination. Versioning upfront avoids this.

Development server

uvicorn main:app --reload               # dev: auto-reload on save
uvicorn main:app --host 0.0.0.0 --port 8000  # expose to network
uvicorn main:app --workers 4            # production: multiple workers

Interactive docs are auto-generated at http://localhost:8000/docs (Swagger UI) and http://localhost:8000/redoc (ReDoc). Disable them in production:

app = FastAPI(docs_url=None, redoc_url=None)

Pydantic: The Data Contract at the Boundary

Pydantic models are the most important part of a FastAPI data engineering setup. They are the point where your pipeline says "this is the shape data must have to enter my system." Everything downstream assumes this contract was enforced.

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional
from datetime import datetime
from enum import Enum

class JobSource(str, Enum):
    brightermonday = "brightermonday"
    linkedin       = "linkedin"
    jobwebkenya    = "jobwebkenya"

class JobSchema(BaseModel):
    title:      str            = Field(..., min_length=2, max_length=200)
    company:    str            = Field(..., min_length=1)
    salary_min: Optional[float] = Field(None, ge=0)
    salary_max: Optional[float] = Field(None, ge=0)
    source:     JobSource
    posted_at:  Optional[datetime] = None

    @field_validator("title")
    @classmethod
    def strip_title(cls, v: str) -> str:
        return v.strip()

    @model_validator(mode="after")
    def salary_order(self):
        if self.salary_min is not None and self.salary_max is not None:
            if self.salary_min > self.salary_max:
                raise ValueError("salary_min must be <= salary_max")
        return self

When validation fails, FastAPI returns a 422 with the exact field and reason. That is more useful than the silent data corruption you get when you skip validation.

Idempotency keys for ingestion endpoints

Production ingestion APIs add an idempotency key requirement. If the client retries a failed POST, you need to recognize the duplicate and return the same result rather than inserting twice.

from fastapi import Header, HTTPException
from typing import Optional

@app.post("/api/v1/events", status_code=201)
def ingest_event(
    event: EventSchema,
    x_idempotency_key: Optional[str] = Header(None),
):
    if x_idempotency_key:
        # Check if we already processed this key
        existing = event_repo.find_by_idempotency_key(x_idempotency_key)
        if existing:
            return existing  # Return previous result, no re-insert

    result = event_repo.create(event, idempotency_key=x_idempotency_key)
    return result

Without this pattern, a client that retries after a network timeout (the insert succeeded, but the response never arrived) creates a duplicate. This is how pipelines end up with double-counted revenue.
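
The retry scenario can be sketched without a database. `InMemoryEventRepo` below is a hypothetical stand-in for `event_repo` (its name and methods are assumptions made for illustration), and `ingest` mirrors the endpoint's control flow. A real implementation would likely also put a unique constraint on the key column so that two concurrent retries cannot both insert.

```python
import itertools
from typing import Optional


class InMemoryEventRepo:
    """Hypothetical stand-in for event_repo, indexed by idempotency key."""

    def __init__(self) -> None:
        self._rows: list[dict] = []
        self._by_key: dict[str, dict] = {}
        self._ids = itertools.count(1)

    def find_by_idempotency_key(self, key: str) -> Optional[dict]:
        return self._by_key.get(key)

    def create(self, event: dict, idempotency_key: Optional[str] = None) -> dict:
        row = {"id": next(self._ids), **event}
        self._rows.append(row)
        if idempotency_key is not None:
            self._by_key[idempotency_key] = row
        return row

    def count(self) -> int:
        return len(self._rows)


def ingest(repo: InMemoryEventRepo, event: dict, idempotency_key: Optional[str] = None) -> dict:
    """Look the key up first; insert only when it has not been seen."""
    if idempotency_key:
        existing = repo.find_by_idempotency_key(idempotency_key)
        if existing:
            return existing
    return repo.create(event, idempotency_key=idempotency_key)


repo = InMemoryEventRepo()
first = ingest(repo, {"amount": 100}, idempotency_key="order-42")
retry = ingest(repo, {"amount": 100}, idempotency_key="order-42")  # simulated client retry
```

The retry returns the row created by the first call, and the repository still holds a single row.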

Common HTTP status codes

200 OK                     - successful GET/PUT
201 Created                - successful POST
204 No Content             - successful DELETE
400 Bad Request            - client sent invalid data (use this for your own validation logic)
401 Unauthorized           - missing or invalid credentials
403 Forbidden              - authenticated but not permitted to access this resource
404 Not Found              - resource does not exist
422 Unprocessable Entity   - Pydantic validation failed (FastAPI default for bad body)
429 Too Many Requests      - rate limited (by you or by an upstream API)
500 Internal Server Error  - unhandled exception in your code
503 Service Unavailable    - your service is up but a dependency (DB) is down

Dependency Injection

Dependency injection lets you share resources (database sessions, auth checks, config) across route handlers without passing them around manually.

from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_job_or_404(job_id: int, db: Session = Depends(get_db)) -> JobModel:
    obj = db.get(JobModel, job_id)
    if not obj:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    return obj

@app.get("/api/v1/jobs/{job_id}", response_model=JobResponse)
def get_job(job: JobModel = Depends(get_job_or_404)):
    return job

API key authentication

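from fastapi.security import APIKeyHeader
import os
import secrets

api_key_header = APIKeyHeader(name="X-API-Key")

def verify_api_key(key: str = Depends(api_key_header)):
    expected = os.getenv("API_KEY")
    # Constant-time comparison; reject every request if API_KEY is unset
    if not expected or not secrets.compare_digest(key.encode(), expected.encode()):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return key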

@app.get("/admin/stats", dependencies=[Depends(verify_api_key)])
def admin_stats():
    return {"total_jobs": 604}

JWT authentication for multi-user APIs

API keys work for service-to-service auth. For user-facing APIs with multiple roles, use JWT.

from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
import os

SECRET_KEY = os.getenv("JWT_SECRET_KEY")
ALGORITHM  = "HS256"

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    payload = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated as of Python 3.12
    payload["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token")
        return user_id
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

@app.get("/api/v1/profile")
def get_profile(user_id: str = Depends(get_current_user)):
    return {"user_id": user_id}

Install python-jose[cryptography] for the JWT library.


Consuming External APIs Without Silent Failures

Every external API call is a failure point. The pattern that works in all my projects: a session with a retry adapter, explicit timeout, structured error handling, and logging that tells you exactly what failed and why.

requests: the complete setup

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
import os

log = logging.getLogger(__name__)

def build_session() -> requests.Session:
    session = requests.Session()
    session.headers.update({
        "User-Agent": "DataPipeline/1.0",
        "Accept": "application/json",
    })
    retry = Retry(
        total=3,
        backoff_factor=2,             # exponential backoff; exact delays depend on the urllib3 version
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],  # retry POST only if the target endpoint is idempotent
    )
    session.mount("https://", HTTPAdapter(max_retries=retry))
    session.mount("http://",  HTTPAdapter(max_retries=retry))
    return session

SESSION = build_session()

Error handling that tells you what actually happened

from requests.exceptions import HTTPError, ConnectionError, Timeout, JSONDecodeError

def fetch_eia_prices(fuel_type: str) -> list[dict]:
    url = "https://api.eia.gov/v2/petroleum/pri/gnd/data/"
    params = {
        "api_key":    os.getenv("EIA_API_KEY"),
        "frequency":  "monthly",
        "data[0]":    "value",
        "facets[product][]": fuel_type,
    }
    try:
        r = SESSION.get(url, params=params, timeout=15)
        r.raise_for_status()
        return r.json()["response"]["data"]
    except HTTPError as e:
        log.error(f"EIA API HTTP {e.response.status_code}: {e.response.text[:200]}")
        raise
    except Timeout:
        # Checked before ConnectionError because ConnectTimeout subclasses both
        log.error("EIA API timeout after 15s")
        raise
    except ConnectionError:
        log.error("EIA API unreachable: check network or service status")
        raise
    except (JSONDecodeError, KeyError) as e:
        log.error(f"EIA API response parse error: {e}")
        raise

Never call r.json() without catching JSONDecodeError. When an API returns a 200 with an HTML error page (maintenance mode, Cloudflare challenge), .json() raises an exception with a confusing message. Catch it explicitly.
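
One way to make that failure readable is to check the Content-Type before parsing. The sketch below uses only the standard library; `parse_json_body` is a hypothetical helper, and with requests you would pass it `r.headers.get("Content-Type", "")` and `r.text`.

```python
import json
from typing import Any


def parse_json_body(content_type: str, body: str) -> Any:
    """Parse a response body as JSON, raising an error that names the real problem."""
    if "json" not in content_type.lower():
        # A 200 with text/html is usually a maintenance page or a bot challenge
        raise ValueError(f"Expected JSON but got {content_type!r}: {body[:80]!r}")
    try:
        return json.loads(body)
    except json.JSONDecodeError as e:
        raise ValueError(
            f"Body is not valid JSON ({e.msg} at char {e.pos}): {body[:80]!r}"
        ) from e
```

Including the first characters of the body in the message usually shows at a glance whether you received an HTML page, an empty string, or truncated JSON.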

Pagination patterns

Offset/limit (most REST APIs):

import time

def fetch_all_pages(base_url: str, params: dict, page_size: int = 100) -> list[dict]:
    all_results = []
    offset = 0
    while True:
        # Build a fresh dict each page so the caller's params are not mutated
        page_params = {**params, "limit": page_size, "offset": offset}
        r = SESSION.get(base_url, params=page_params, timeout=15)
        r.raise_for_status()
        data = r.json()

        # APIs use different response shapes: a bare list, or a dict keyed by "results" or "data"
        if isinstance(data, list):
            items = data
        else:
            items = data.get("results") or data.get("data") or []
        if not items:
            break
        all_results.extend(items)
        offset += len(items)
        if len(items) < page_size:
            break        # reached last page
        time.sleep(0.5)  # polite delay
    log.info(f"Fetched {len(all_results)} total records from {base_url}")
    return all_results

Cursor-based pagination:

def fetch_cursor_pages(base_url: str) -> list[dict]:
    all_results = []
    cursor = None
    while True:
        params = {"cursor": cursor} if cursor else {}
        r = SESSION.get(base_url, params=params, timeout=15)
        r.raise_for_status()  # fail loudly instead of parsing an error body
        data = r.json()
        all_results.extend(data["items"])
        cursor = data.get("next_cursor")
        if not cursor:
            break
    return all_results

Handling 429: rate limit responses

import time

def request_with_backoff(url: str, max_retries: int = 5) -> requests.Response:
    delay = 1
    for attempt in range(max_retries):
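        # Note: if SESSION's Retry adapter also lists 429 in status_forcelist, it retries
        # first and raises RetryError when exhausted, so this branch may never see a 429.
        r = SESSION.get(url, timeout=15)
        if r.status_code == 429:
            # Respect Retry-After when it is a number of seconds; it can also be
            # an HTTP date, in which case fall back to our own delay
            retry_after = r.headers.get("Retry-After", "")
            wait = int(retry_after) if retry_after.isdigit() else delay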
            log.warning(f"Rate limited (attempt {attempt + 1}/{max_retries}). Waiting {wait}s")
            time.sleep(wait)
            delay = min(delay * 2, 60)
            continue
        r.raise_for_status()
        return r
    raise RuntimeError(f"Exceeded {max_retries} retries for {url}")
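
Retry-After can carry either a number of seconds or an HTTP date, and calling int() on the date form raises ValueError. A sketch of a parser that accepts both, using only the standard library; `parse_retry_after` is a hypothetical helper, and the `now` parameter exists only to make it testable.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float,
                      now: Optional[datetime] = None) -> float:
    """Return seconds to wait for a Retry-After header, or `default` if unusable."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)                      # delay-seconds form
    try:
        when = parsedate_to_datetime(value)      # HTTP-date form
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())
```

In the loop above this would be called as `parse_retry_after(r.headers.get("Retry-After"), default=delay)`.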

httpx for async pipelines

Use httpx.AsyncClient inside FastAPI async routes or asyncio-based pipelines. For the Ollama embedding calls in JobSense:

import httpx
import asyncio

async def fetch_embedding(text: str) -> list[float]:
    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.post(
            "http://localhost:11434/api/embeddings",
            json={"model": "nomic-embed-text", "prompt": text},
        )
        r.raise_for_status()
        return r.json()["embedding"]

# Fetch many embeddings concurrently
async def fetch_all_embeddings(texts: list[str]) -> list[list[float]]:
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [
            client.post(
                "http://localhost:11434/api/embeddings",
                json={"model": "nomic-embed-text", "prompt": t},
            )
            for t in texts
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        results = []
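        for r in responses:
            if isinstance(r, Exception):
                log.error(f"Embedding fetch failed: {r}")
                results.append([])   # placeholder keeps results aligned with texts
            elif r.status_code != 200:
                log.error(f"Embedding fetch returned HTTP {r.status_code}")
                results.append([])
            else:
                results.append(r.json()["embedding"])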
        return results
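
asyncio.gather as used here starts every request at once, which may be more than a local embedding server handles comfortably. A sketch of capping concurrency with asyncio.Semaphore; `gather_bounded` and its `worker` argument are hypothetical names, and the default limit of 8 is arbitrary.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def gather_bounded(
    items: Iterable[Any],
    worker: Callable[[Any], Awaitable[Any]],
    limit: int = 8,
) -> list:
    """Run worker(item) for every item, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(item: Any) -> Any:
        async with semaphore:
            return await worker(item)

    # Results come back in input order; failures are returned, not raised
    return await asyncio.gather(*(run(i) for i in items), return_exceptions=True)
```

With the function from this section, a call would look like `await gather_bounded(texts, fetch_embedding, limit=8)`.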

Quick Manual Testing with curl

Before writing a test, reach for curl to verify the endpoint works at all.

# Basic GET
curl http://localhost:8000/api/v1/jobs
curl -s http://localhost:8000/api/v1/jobs | python3 -m json.tool  # pretty print

# GET with query params and auth
curl -H "X-API-Key: abc123" \
     "http://localhost:8000/api/v1/jobs?keyword=data+engineer&limit=10"

# POST with JSON body
curl -X POST http://localhost:8000/api/v1/jobs \
     -H "Content-Type: application/json" \
     -d '{"title": "Data Engineer", "company": "Safaricom", "source": "linkedin"}'

# POST from a file
curl -X POST http://localhost:8000/api/v1/jobs \
     -H "Content-Type: application/json" \
     -d @payload.json

# Verbose: show request and response headers
curl -v http://localhost:8000/api/v1/jobs

# Status code only
curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/api/v1/jobs

# Test all services at once
for port in 8000 8080 8501; do
  echo -n ":$port → "
  curl -s --max-time 3 http://localhost:$port/health || echo "DOWN"
done

Testing FastAPI Endpoints

The conftest.py pattern

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import get_db, Base

TEST_DB_URL = "postgresql+psycopg2://user:pass@localhost:5432/test_jobsense"

@pytest.fixture(scope="session")
def test_engine():
    engine = create_engine(TEST_DB_URL)
    Base.metadata.create_all(bind=engine)
    yield engine
    Base.metadata.drop_all(bind=engine)

@pytest.fixture
def db_session(test_engine):
    Session = sessionmaker(bind=test_engine)
    session = Session()
    yield session
    session.rollback()   # undo the test's uncommitted changes
    session.close()

@pytest.fixture
def client(db_session):
    def override_get_db():
        yield db_session
    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()

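The session.rollback() in the fixture is critical. Without it, data written by one test leaks into the next, causing flaky tests that pass in isolation but fail in sequence. Note that rollback() only undoes uncommitted work: if your route handlers call commit(), run each test inside an outer connection-level transaction and roll that back instead.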

Tests that actually catch bugs

# tests/test_jobs.py
class TestJobsEndpoint:

    def test_list_jobs_200(self, client):
        r = client.get("/api/v1/jobs")
        assert r.status_code == 200
        assert isinstance(r.json(), list)

    def test_create_job_201(self, client):
        payload = {"title": "Data Engineer", "company": "Safaricom", "source": "linkedin"}
        r = client.post("/api/v1/jobs", json=payload)
        assert r.status_code == 201
        assert r.json()["title"] == "Data Engineer"

    def test_missing_required_field_422(self, client):
        # Test that Pydantic validation catches missing company
        r = client.post("/api/v1/jobs", json={"title": "No company"})
        assert r.status_code == 422
        errors = r.json()["detail"]
        assert any("company" in str(e) for e in errors)

    def test_invalid_enum_422(self, client):
        payload = {"title": "DE", "company": "X", "source": "FAKE_SOURCE"}
        r = client.post("/api/v1/jobs", json=payload)
        assert r.status_code == 422

    def test_salary_validation(self, client):
        # salary_min > salary_max should fail
        payload = {
            "title": "DE", "company": "X", "source": "linkedin",
            "salary_min": 200_000, "salary_max": 100_000
        }
        r = client.post("/api/v1/jobs", json=payload)
        assert r.status_code == 422

    def test_job_not_found_404(self, client):
        r = client.get("/api/v1/jobs/99999")
        assert r.status_code == 404

    def test_delete_204(self, client):
        r = client.post("/api/v1/jobs", json={"title": "Temp", "company": "X", "source": "linkedin"})
        job_id = r.json()["id"]
        assert client.delete(f"/api/v1/jobs/{job_id}").status_code == 204
        assert client.get(f"/api/v1/jobs/{job_id}").status_code == 404

    def test_auth_required_401(self, client):
        r = client.get("/admin/stats")
        assert r.status_code in (401, 403)

    def test_auth_with_key(self, client, monkeypatch):
        monkeypatch.setenv("API_KEY", "test-key-123")
        r = client.get("/admin/stats", headers={"X-API-Key": "test-key-123"})
        assert r.status_code == 200

Mocking external API calls

Never call a live external API in tests. They are slow, unreliable, may have rate limits, and make your CI dependent on a third-party service being up.

# Using pytest-mock
from unittest.mock import MagicMock

def test_eia_endpoint(client, mocker):
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "response": {"data": [{"period": "2024-01", "value": "3.45"}]}
    }
    mock_response.status_code = 200
    mock_response.raise_for_status = lambda: None
    mocker.patch("app.services.eia.SESSION.get", return_value=mock_response)

    r = client.get("/api/v1/energy/prices?fuel=gasoline")
    assert r.status_code == 200

# Using the 'responses' library (cleaner for URL-level mocking)
import responses as mock_http

@mock_http.activate
def test_cbk_forex_fetch():
    mock_http.add(
        mock_http.GET,
        "https://www.centralbank.go.ke/api/forex",
        json={"rates": [{"pair": "USD/KES", "rate": 129.5}]},
        status=200,
    )
    from app.services.forex import fetch_rates
    data = fetch_rates()
    assert data[0]["rate"] == 129.5

Async endpoint tests

import pytest
import httpx
from app.main import app

@pytest.mark.asyncio
async def test_semantic_search():
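    # Newer httpx releases removed the app= shortcut; pass an ASGI transport instead
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: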
        r = await client.post(
            "/api/v1/search",
            json={"text": "python data engineer nairobi", "top_k": 5}
        )
        assert r.status_code == 200
        assert len(r.json()) <= 5

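Install pytest-asyncio, then add this to pytest.ini (in pyproject.toml the equivalent section is [tool.pytest.ini_options], with the value quoted as "auto"):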

[pytest]
asyncio_mode = auto

Useful pytest flags

pytest -v                              # verbose output
pytest -x                              # stop on first failure
pytest -s                              # show print/logging output
pytest -k "keyword"                    # run matching tests only
pytest -k "not slow"                   # skip slow tests
pytest --cov=app --cov-report=term-missing  # coverage
pytest -m integration                  # run marked tests

Debugging Common Errors

422 Unprocessable Entity

This is FastAPI's most common error. Pydantic validation failed. The response body tells you exactly what and where:

curl -X POST http://localhost:8000/api/v1/jobs \
     -H "Content-Type: application/json" \
     -d '{"title": "DE"}' | python3 -m json.tool
Response:
{
  "detail": [
    {
      "loc": ["body", "company"],
      "msg": "Field required",
      "type": "missing"
    }
  ]
}

Common causes:

  • Required field missing in the request body
  • Wrong type (sending a string where a number is expected)
  • Enum value not in the allowed list
  • min_length or max_length constraint violated

500 Internal Server Error

Check the uvicorn terminal. The full Python traceback is printed there. For dev, add a global exception handler that returns the trace in the response:

from fastapi import Request
from fastapi.responses import JSONResponse
import traceback

@app.exception_handler(Exception)
async def generic_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
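        content={
            "detail": str(exc),
            # Format from the exception object itself rather than relying on sys.exc_info()
            "trace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        },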
    )

Disable this in production. Exposing tracebacks to external clients leaks implementation details.

Custom exception handlers (better than generic 500)

from datetime import datetime, timezone
from typing import Optional

class DataQualityError(Exception):
    def __init__(self, message: str, field: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.field   = field

@app.exception_handler(DataQualityError)
async def data_quality_handler(request: Request, exc: DataQualityError):
    return JSONResponse(
        status_code=400,
        content={"error": "data_quality_error", "message": exc.message, "field": exc.field},
    )

# In your route (assumes event.timestamp is timezone-aware):
@app.post("/api/v1/events")
def ingest_event(event: EventSchema):
    if event.timestamp > datetime.now(timezone.utc):
        raise DataQualityError("Event timestamp is in the future", field="timestamp")

This pattern gives clients a structured error they can handle programmatically rather than a generic 500.

CORS errors in the browser

CORS errors appear in the browser console, not in the FastAPI terminal. The fix is nearly always the same:

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8501"],  # exact origin, no trailing slash
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Common CORS mistakes:

  • Including a trailing slash: "http://localhost:8501/" does not match "http://localhost:8501"
  • Using allow_origins=["*"] with allow_credentials=True (blocked by browsers for credentialed requests)
  • Registering the middleware too late: it must be added before the app starts serving requests

Debugging what is running on a port

# Linux / WSL
lsof -i :8000
fuser -k 8000/tcp     # kill what is on port 8000

# PowerShell
netstat -ano | findstr ":8000"
# $PID is a read-only automatic variable in PowerShell, so use a different name
$procId = (Get-NetTCPConnection -LocalPort 8000).OwningProcess
Stop-Process -Id $procId -Force

Production Patterns

Health endpoint (add to every API)

from sqlalchemy import text

@app.get("/health")
def health(db: Session = Depends(get_db)):
    try:
        db.execute(text("SELECT 1"))
        return {"status": "ok", "database": "connected"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=str(e))

Airflow's HttpSensor can poll this endpoint before triggering downstream tasks, and Docker Compose health checks can use it too. It is a few lines of code that save real debugging time.

Rate limiting your own API

Protecting your API from abuse or accidental hammering requires a rate limiter. slowapi is a commonly used rate-limiting library for FastAPI:

pip install slowapi
Then wire it into the app:
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/v1/search")
@limiter.limit("10/minute")
def semantic_search(request: Request, q: str):
    # The `request` parameter is required by slowapi
    return search_jobs(q)

Without rate limiting, a single script hitting /search in a tight loop can exhaust your database connections or your embedding model's memory.

Streaming responses for large data exports

When an endpoint returns a large dataset (thousands of rows), do not load the entire result into memory before sending the response. Use StreamingResponse:

import csv
import io
from fastapi.responses import StreamingResponse

@app.get("/api/v1/export/jobs.csv")
def export_jobs_csv(db: Session = Depends(get_db)):
    def generate():
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["id", "title", "company", "source", "posted_at"])
        yield output.getvalue()
        output.seek(0)
        output.truncate(0)

        for job in db.query(JobModel).yield_per(1000):
            writer.writerow([job.id, job.title, job.company, job.source, job.posted_at])
            yield output.getvalue()
            output.seek(0)
            output.truncate(0)

    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=jobs.csv"},
    )

yield_per(1000) on the SQLAlchemy query means only 1,000 rows are held in memory at a time regardless of how large the table is.
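
The generator logic can be separated from the database and tested on its own. The sketch below is one possible variation, not the endpoint's exact code: `iter_csv` is a hypothetical helper that buffers `batch_size` rows per chunk instead of yielding after every row, which reduces the number of tiny chunks sent over the wire.

```python
import csv
import io
from typing import Iterable, Iterator, Sequence


def iter_csv(header: Sequence[str], rows: Iterable[Sequence],
             batch_size: int = 500) -> Iterator[str]:
    """Yield CSV text in chunks of up to batch_size rows (header in the first chunk)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(header)
    for n, row in enumerate(rows, start=1):
        writer.writerow(row)
        if n % batch_size == 0:
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
    if buffer.tell():           # flush whatever is left, including a header-only export
        yield buffer.getvalue()
```

In the endpoint, `rows` could be a generator over `db.query(JobModel).yield_per(1000)` that produces one list per job, and the result would be passed to StreamingResponse as before.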

Response caching for expensive queries

For endpoints that run the same expensive query repeatedly (dashboard metrics, aggregate counts), cache the result in memory:

from datetime import datetime, timedelta, timezone

_cache: dict = {}

def get_cached(key: str, ttl_seconds: int = 300):
    entry = _cache.get(key)
    if entry and datetime.now(timezone.utc) - entry["ts"] < timedelta(seconds=ttl_seconds):
        return entry["value"]
    return None

def set_cached(key: str, value):
    _cache[key] = {"value": value, "ts": datetime.now(timezone.utc)}

@app.get("/api/v1/stats")
def get_stats(db: Session = Depends(get_db)):
    cached = get_cached("stats", ttl_seconds=300)
    if cached is not None:   # an empty-but-valid cached value still counts as a hit
        return cached
    result = {
        "total_jobs":     db.query(JobModel).count(),
        "total_sources":  db.query(JobModel.source).distinct().count(),
    }
    set_cached("stats", result)
    return result

For production with multiple workers, replace the in-memory dict with Redis so all workers share the cache.
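
Within a single worker, a slightly sturdier variant of the in-memory cache might look like the sketch below. `TTLCache` is a hypothetical class, not a library API: it uses a monotonic clock (unaffected by wall-clock adjustments), holds a lock so threaded handlers do not recompute the same value simultaneously, and accepts an injectable clock so expiry can be tested without sleeping.

```python
import threading
import time
from typing import Any, Callable


class TTLCache:
    """Tiny per-process cache with time-based expiry."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._entries: dict[str, tuple[float, Any]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        # The lock is held while computing, so only one caller at a time
        # runs the expensive query (at the cost of blocking other keys too).
        with self._lock:
            now = self._clock()
            entry = self._entries.get(key)
            if entry is not None and now - entry[0] < self._ttl:
                return entry[1]
            value = compute()
            self._entries[key] = (now, value)
            return value
```

A route would call something like `stats_cache.get_or_compute("stats", lambda: build_stats(db))`, where `build_stats` is whatever function runs the real queries.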

Database connection pool configuration

The default SQLAlchemy pool is fine for development. For production with multiple Uvicorn workers:

from sqlalchemy import create_engine

engine = create_engine(
    os.getenv("DATABASE_URL"),
    pool_size=5,          # connections kept open per worker
    max_overflow=10,      # extra connections above pool_size allowed in burst
    pool_pre_ping=True,   # test connection before use (handles DB restarts)
    pool_recycle=3600,    # recycle connections older than 1 hour (avoids stale TCP)
)

pool_pre_ping=True is the one you need most. Without it, workers that have been idle may hold dead connections and throw OperationalError on the first request after a database restart.
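
Each Uvicorn worker is a separate process with its own engine and therefore its own pool, so the worst-case connection count multiplies. A quick back-of-the-envelope check, assuming the four workers and pool settings shown in this article; compare the result with your database's connection limit (PostgreSQL's default max_connections is 100, and yours may differ).

```python
def max_db_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all workers combined."""
    return workers * (pool_size + max_overflow)


total = max_db_connections(workers=4, pool_size=5, max_overflow=10)
print(total)  # 60
```

If other services share the same database, their pools count against the same limit.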

Structured request logging

import time
import logging

log = logging.getLogger("api")

@app.middleware("http")
async def log_requests(request: Request, call_next):
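    start = time.perf_counter()   # monotonic clock, better suited to measuring durations
    response = await call_next(request)
    elapsed_ms = round((time.perf_counter() - start) * 1000, 2)
    client_ip = request.client.host if request.client else "unknown"
    log.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} "
        f"duration={elapsed_ms}ms "
        f"ip={client_ip}"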
    )
    return response

This middleware gives you one log line per request with the information you need to debug production issues: method, path, status code, and how long it took.
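
If your log aggregator expects machine-parseable lines, the same fields can be emitted as JSON instead of key=value text. A minimal sketch using only the standard library; `JsonFormatter`, the `fields` key passed through `extra`, and the `api.json` logger name are assumptions made for illustration.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Anything passed as extra={"fields": {...}} is merged into the output
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
api_log = logging.getLogger("api.json")
api_log.addHandler(handler)
api_log.setLevel(logging.INFO)

api_log.info(
    "request",
    extra={"fields": {"method": "GET", "path": "/api/v1/jobs", "status": 200, "duration_ms": 12.5}},
)
```

Inside the middleware, the f-string would be replaced by a call like the last one, with the values taken from `request` and `response`.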


The Profiling Section Nobody Reads Until They Need It

When an endpoint is slower than expected, do not guess. Measure.

import time
from functools import wraps

def timed(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        t = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = (time.perf_counter() - t) * 1000
        print(f"{func.__name__} took {elapsed:.2f}ms")
        return result
    return wrapper

@timed
def slow_query(db):
    return db.query(JobModel).filter(...).all()
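
A decorator written this way calls the wrapped function synchronously, so on an `async def` it would measure only the creation of the coroutine, not its execution. A sketch of a variant that handles both kinds of callable; it keeps the name `timed` for continuity, and the print call stands in for whatever logger you use.

```python
import asyncio
import functools
import inspect
import time


def timed(func):
    """Report wall-clock duration for sync and async callables alike."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000
                print(f"{func.__name__} took {elapsed_ms:.2f}ms")
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            print(f"{func.__name__} took {elapsed_ms:.2f}ms")
    return sync_wrapper
```

The try/finally means the duration is reported even when the wrapped call raises, which is often exactly the case you are trying to diagnose.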
For a full call-level breakdown, use cProfile:
# Full profiling with cProfile
import cProfile, pstats, io

pr = cProfile.Profile()
pr.enable()
slow_function()
pr.disable()

stream = io.StringIO()
pstats.Stats(pr, stream=stream).sort_stats("cumulative").print_stats(20)
print(stream.getvalue())
To see every SQL statement SQLAlchemy issues, turn on engine logging:
# Log all SQL queries from SQLAlchemy (enable during debugging, disable in prod)
import logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

The SQLAlchemy logging usually reveals the problem immediately: an N+1 query pattern where a route is running one query per row rather than a single JOIN.
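
The difference is easy to reproduce with sqlite3 from the standard library. The schema and rows below are made up for illustration and are not JobSense's real tables; `run` is a small hypothetical wrapper that records each statement so the two approaches can be compared.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE companies (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE jobs (id INTEGER PRIMARY KEY, title TEXT, company_id INTEGER);
    INSERT INTO companies VALUES (1, 'Acme'), (2, 'Globex');
    INSERT INTO jobs VALUES (1, 'Data Engineer', 1), (2, 'Analyst', 1), (3, 'ML Engineer', 2);
""")

queries: list[str] = []


def run(sql: str, params: tuple = ()) -> list[tuple]:
    """Execute a statement and record it, so queries can be counted."""
    queries.append(sql)
    return conn.execute(sql, params).fetchall()


def jobs_n_plus_one() -> list[tuple]:
    # One query for the jobs, then one more per job for its company
    jobs = run("SELECT title, company_id FROM jobs ORDER BY id")
    return [
        (title, run("SELECT name FROM companies WHERE id = ?", (company_id,))[0][0])
        for title, company_id in jobs
    ]


def jobs_single_join() -> list[tuple]:
    return run(
        "SELECT j.title, c.name FROM jobs j "
        "JOIN companies c ON c.id = j.company_id ORDER BY j.id"
    )


slow = jobs_n_plus_one()
n_plus_one_queries = len(queries)   # 1 + number of jobs
queries.clear()
fast = jobs_single_join()
join_queries = len(queries)
```

Both functions return the same rows; the first issues one statement per job plus one, the second issues a single statement regardless of table size.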


JobSense uses FastAPI for semantic job search with pgvector and Ollama. The Kenya Forex API uses FastAPI with DuckDB for sub-10ms query latency. Both are on GitHub.

