I filed 33 issues in one week across four production services. Not because the code was terrible — production codebases accumulate subtle bugs that only surface under specific conditions. This tutorial shows the 5-layer audit methodology I use, with code you can drop into your own FastAPI/Python services.
## The 5-Layer Audit Approach
Audit in layers. Each layer has a different failure mode. Skipping one leaves bugs in production.
| Layer | Question | Failure Mode |
|---|---|---|
| 1. API Surface | Does every endpoint validate input? | OOM, injection, bad data |
| 2. Auth & Ownership | Can user A access user B's resources? | Data leaks, privilege escalation |
| 3. Data Integrity | Can the same input create duplicates? | Duplicate records, race conditions |
| 4. Error Handling | What happens when dependencies fail? | Cascading failures, unbounded loads |
| 5. Security | SSRF, path traversal, CORS? | External attacks, metadata exposure |
Let's go through each layer with concrete checks and fixes.
## Layer 1: API Surface Validation
**The bug:** `limit=-1` was accepted. Result: an unbounded query returning the entire dataset. OOM risk.
### Pydantic validators for query params

```python
from pydantic import BaseModel, field_validator


class ListTasksParams(BaseModel):
    limit: int = 20
    offset: int = 0

    @field_validator("limit")
    @classmethod
    def limit_bounds(cls, v: int) -> int:
        if v < 1 or v > 100:
            raise ValueError("limit must be between 1 and 100")
        return v

    @field_validator("offset")
    @classmethod
    def offset_non_negative(cls, v: int) -> int:
        if v < 0:
            raise ValueError("offset must be >= 0")
        return v
```
### FastAPI dependency for consistent validation

```python
from fastapi import Depends, FastAPI, Query

app = FastAPI()

def paginated(limit: int = Query(20, ge=1, le=100),
              offset: int = Query(0, ge=0)):
    return {"limit": limit, "offset": offset}

@app.get("/tasks")
async def list_tasks(pagination: dict = Depends(paginated)):
    # `db` is your data-access layer
    return await db.get_tasks(
        limit=pagination["limit"],
        offset=pagination["offset"],
    )
```
### Audit script: find unvalidated limits

```bash
# Grep for limit/offset without validation
rg "limit.*=.*Query|limit.*int" --type py -A 1
# If you see `limit: int` without ge/le, you have a bug
```
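The grep can be turned into a small standalone checker. This is a sketch, not part of the original audit tooling: the regex is deliberately simplistic and will miss aliased imports or multi-line signatures, but it catches the common case.

```python
import re

# Flags `limit: int = Query(...)` declarations whose arguments
# set no ge=/le= bounds.
PARAM = re.compile(r"limit\s*:\s*int\s*=\s*Query\(([^)]*)\)")

def unbounded_limits(source: str) -> list[str]:
    """Return the offending Query(...) argument lists found in `source`."""
    hits = []
    for match in PARAM.finditer(source):
        args = match.group(1)
        if "ge=" not in args or "le=" not in args:
            hits.append(args.strip())
    return hits
```

Run it over each handler module in CI and fail the build on any hit.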
## Layer 2: Auth & Ownership
**The bug:** Any authenticated user could read, update, or cancel any task. We checked "is logged in" but not "owns this resource."
### Ownership check pattern

```python
from fastapi import Depends, HTTPException

# Task, User, db, and require_auth come from your models/auth layer

async def get_task_or_403(task_id: str, user: User) -> Task:
    task = await db.get(Task, id=task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    if task.user_id != user.id:
        raise HTTPException(403, "Forbidden")
    return task

@app.get("/tasks/{task_id}")
async def get_task(task_id: str, user: User = Depends(require_auth)):
    return await get_task_or_403(task_id, user)
```
### Audit script: find missing ownership checks

```
# For each endpoint that takes an ID, verify:
# 1. It fetches the resource
# 2. It compares resource.owner_id (or similar) to request.user.id
# 3. It returns 403 on mismatch
# Red flag: endpoints with {id} in the path but no ownership comparison
```
### Test: can user A access user B's task?

```python
def test_ownership_enforced():
    user_a = create_user("a")
    user_b = create_user("b")
    task_b = create_task(user_id=user_b.id)

    # User A tries to access User B's task
    response = client.get(
        f"/tasks/{task_b.id}",
        headers=auth_headers(user_a),
    )
    assert response.status_code == 403
```
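The same check can be packaged once and reused. Here is a framework-agnostic sketch of the `@require_ownership` idea mentioned in the quick wins below; `fetch`, `Forbidden`, and the attribute names are assumptions for illustration, not the original service's API.

```python
import functools

class Forbidden(Exception):
    """Stand-in for HTTPException(403) in this sketch."""

def require_ownership(fetch):
    """Wrap a handler so the resource is fetched and ownership-checked first.

    `fetch(resource_id)` must return an object with a `user_id` attribute,
    or None if the resource does not exist.
    """
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(resource_id, user):
            resource = fetch(resource_id)
            if resource is None:
                raise LookupError("not found")   # maps to 404
            if resource.user_id != user.id:
                raise Forbidden("not the owner")  # maps to 403
            return handler(resource, user)
        return wrapper
    return decorator
```

The payoff: a handler decorated this way can never forget the ownership comparison, because it only ever receives an already-verified resource.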
## Layer 3: Data Integrity
**The bug:** The same PR URL could create unlimited concurrent tasks. No uniqueness constraint, no deduplication.
### Add unique constraints

```sql
-- One active task per (pr_url, evaluation_id): the partial
-- unique index ignores completed/failed rows
CREATE UNIQUE INDEX idx_tasks_pr_unique
    ON tasks (pr_url, evaluation_id)
    WHERE status IN ('pending', 'running');
```
### Idempotent creation with upsert

```python
import hashlib

from sqlalchemy import text

async def create_or_get_task(pr_url: str, eval_id: str, user_id: str) -> str:
    # Deterministic ID: the same (pr_url, eval_id) always hashes to the same task
    task_id = hashlib.sha256(f"{pr_url}:{eval_id}".encode()).hexdigest()[:32]
    result = await db.execute(text("""
        INSERT INTO tasks (id, pr_url, evaluation_id, user_id, status)
        VALUES (:id, :pr_url, :eval_id, :user_id, 'pending')
        ON CONFLICT (id) DO NOTHING
        RETURNING id
    """), {"id": task_id, "pr_url": pr_url, "eval_id": eval_id, "user_id": user_id})
    row = result.fetchone()
    if row:
        return row[0]  # Newly inserted task
    return task_id     # Conflict: task already exists, return the existing ID
```
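The deterministic-ID half of the pattern is easy to unit-test in isolation. A minimal sketch (the helper name and URLs are mine, not the service's):

```python
import hashlib

def task_id_for(pr_url: str, eval_id: str) -> str:
    """Derive a stable 32-hex-char task ID from the natural key."""
    return hashlib.sha256(f"{pr_url}:{eval_id}".encode()).hexdigest()[:32]

# The same input always maps to the same ID, so a retried or double-clicked
# request targets the same row and ON CONFLICT deduplicates it.
a = task_id_for("https://github.com/org/repo/pull/1", "eval-1")
b = task_id_for("https://github.com/org/repo/pull/1", "eval-1")
c = task_id_for("https://github.com/org/repo/pull/2", "eval-1")
```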
### Audit: find endpoints that create without idempotency

```bash
# Look for INSERT without ON CONFLICT in handlers that accept user input
rg "INSERT INTO" --type py -B 5
```
## Layer 4: Error Handling
**The bug:** When the analytics service failed, the fallback query against the primary database had no `LIMIT` and loaded unbounded result sets.
### Bounded fallbacks

```python
# BAD: unbounded fallback
try:
    data = await analytics.query(...)
except AnalyticsError:
    data = await db.query("SELECT * FROM events")  # OOM!

# GOOD: bounded fallback
try:
    data = await analytics.query(...)
except AnalyticsError:
    data = await db.query(
        "SELECT * FROM events ORDER BY created_at DESC LIMIT 1000"
    )
```
### Timeouts on all external calls

```python
import httpx

async def call_external_api(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(url)
        return response.json()
```
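Timeouts cap a single call; retries need a cap of their own, or a failing dependency turns into an infinite loop. A minimal sketch of a capped-retry helper (not code from the audited services; the attempt count and backoff base are placeholders):

```python
import time

def with_retries(fn, attempts=3, base_delay=0.5):
    """Call fn(), retrying on any exception up to `attempts` times total.

    Sleeps base_delay * 2**n between tries (exponential backoff) and
    re-raises the last error once the cap is hit, so failures stay
    bounded instead of looping forever.
    """
    last_err = None
    for n in range(attempts):
        try:
            return fn()
        except Exception as err:
            last_err = err
            if n < attempts - 1:
                time.sleep(base_delay * 2 ** n)
    raise last_err
```

In production you would likely narrow the caught exception type and add jitter, but the key property is the hard upper bound on attempts.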
### Audit: grep for bare except or missing limits

```bash
rg "except:" --type py     # Bare except swallows everything
rg "SELECT \*" --type py   # Unbounded queries
```
## Layer 5: Security
**The bug:** A `file_path` parameter allowed `../../etc/passwd`-style path traversal, and webhook secrets were not enforced in dev.
### Path traversal prevention

```python
from pathlib import Path

def safe_path(base_dir: str, user_path: str) -> Path:
    base = Path(base_dir).resolve()
    resolved = (base / user_path).resolve()
    # A naive startswith() check lets "/srv/app-evil" slip past "/srv/app";
    # is_relative_to() (Python 3.9+) compares whole path components.
    if not resolved.is_relative_to(base):
        raise ValueError("Path traversal attempt")
    return resolved
```
### SSRF prevention (block internal IPs)

A hand-written IP blocklist misses whole private ranges (172.16.0.0/12, 192.168.0.0/16); the stdlib `ipaddress` classifications cover them:

```python
import ipaddress
import socket
from urllib.parse import urlparse

def validate_url(url: str) -> bool:
    """Reject URLs resolving to loopback, link-local, or private addresses.

    169.254.169.254 (the cloud metadata endpoint) falls in the
    link-local range.
    """
    parsed = urlparse(url)
    if not parsed.hostname:
        return False
    try:
        ip = ipaddress.ip_address(socket.gethostbyname(parsed.hostname))
    except (socket.gaierror, ValueError):
        return False
    return not (ip.is_private or ip.is_loopback or ip.is_link_local)
```

Note this resolves the hostname only once; a complete defense also pins the resolved IP for the actual request, or DNS rebinding can bypass the check.
### Webhook secret enforcement (even in dev)

```python
import hmac
import os

from fastapi import Request

def verify_webhook(request: Request, body: bytes) -> bool:
    secret = os.getenv("WEBHOOK_SECRET")
    if not secret:
        raise SystemExit("WEBHOOK_SECRET required in all environments")
    signature = request.headers.get("X-Webhook-Signature", "")
    expected = hmac.new(secret.encode(), body, "sha256").hexdigest()
    return hmac.compare_digest(signature, expected)
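For the sending side, or for tests, the matching signature is produced with the same primitives. A self-contained sketch (the secret and header name here are made up for illustration):

```python
import hmac

def sign_body(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw request body, sent as X-Webhook-Signature."""
    return hmac.new(secret.encode(), body, "sha256").hexdigest()

def verify(secret: str, body: bytes, signature: str) -> bool:
    """Constant-time comparison; never use ==, which leaks timing info."""
    return hmac.compare_digest(signature, sign_body(secret, body))
```

Sign the raw bytes, not the parsed JSON: re-serialization can reorder keys and break the signature.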
### Audit: deprecated datetime.utcnow()

```python
# BAD: timezone bugs
from datetime import datetime
now = datetime.utcnow()  # Deprecated, no timezone info

# GOOD
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
```

```bash
rg "datetime.utcnow" --type py  # Every match is a potential bug
```
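Why this matters in practice: `utcnow()` returns a naive datetime, and naive and aware values cannot even be compared, so the bug surfaces as a runtime `TypeError` far from where the timestamp was created. A tiny demonstration:

```python
from datetime import datetime, timezone

aware = datetime.now(timezone.utc)
naive = datetime.utcnow()  # deprecated since Python 3.12

# tzinfo is the difference: only the aware value carries UTC explicitly
assert aware.tzinfo is not None
assert naive.tzinfo is None

# Comparing the two raises TypeError
try:
    _ = aware < naive
    raise AssertionError("expected TypeError")
except TypeError:
    pass
```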
## Putting It Together: Audit Checklist
Run this checklist on every service before production:
### Layer 1 — API Surface
- [ ] All list endpoints have limit/offset with bounds (1-100)
- [ ] All IDs in path validated (UUID format, exists)
- [ ] Request body size limits set

### Layer 2 — Auth & Ownership
- [ ] Every `{id}` endpoint checks `resource.owner_id == user.id`
- [ ] Test: user A cannot access user B's resources

### Layer 3 — Data Integrity
- [ ] Creation endpoints idempotent (deterministic ID + upsert)
- [ ] Unique constraints on natural keys (`pr_url`, etc.)
- [ ] No duplicate creation under concurrent requests

### Layer 4 — Error Handling
- [ ] All external calls have timeouts
- [ ] Fallback paths have bounds (LIMIT, max retries)
- [ ] No bare `except:`

### Layer 5 — Security
- [ ] Path parameters validated for traversal
- [ ] URLs validated for SSRF (block metadata IPs)
- [ ] Webhook/docs secrets enforced in dev
- [ ] `datetime.utcnow()` replaced with timezone-aware calls
## Quick Wins We Implemented
Of the 33 issues we found, 11 were closed within 3 days. The fastest fixes:
- **Pydantic validators** — add `ge=1, le=100` to limit params. 5 minutes.
- **Ownership decorator** — `@require_ownership("task_id")` on 12 endpoints. 2 hours.
- **Path safety** — a wrapper for all file operations. 1 hour.
- **Webhook secret** — fail startup if unset. 15 minutes.
The methodology forces you to ask the right questions. Production bugs hide in the gaps between "it works" and "it's correct."
Read the full article on my blog. I write about production patterns and security — find me at humzakt.github.io.