Your FastAPI is going down under load — and you're reading the slowapi docs thinking that'll be enough?
Spoiler: it won't. About as effective as a bike lock against a professional thief.
Let's break it all down — layer by layer — the way senior security engineers do it in production. No fluff, real code, some humor, and a clear understanding of why each decision is made the way it is — not just "I saw someone do it this way once."
Why FastAPI Is Especially Vulnerable
Python applications aren't killed by traffic volume — they're killed by application-layer (L7) attacks — ones that look like perfectly normal HTTP requests. A single request to a heavy endpoint can eat 5 seconds of CPU. 200 of those simultaneously = your server is dead, and you're getting a 3 AM phone call.
Attack types you need to know:
| Type | Layer | What it kills | Where to handle |
|---|---|---|---|
| UDP/ICMP flood | L3 | Network channel | CDN / provider |
| SYN flood | L4 | TCP stack | iptables / Nginx |
| HTTP flood | L7 | Workers | FastAPI + Nginx |
| Slowloris | L7 | Connections | Nginx + Uvicorn |
| Heavy endpoint abuse | L7 | CPU / memory | Rate Limiting |
Defense in Depth
Internet → CDN/DDoS Provider → Nginx → Rate Limit Middleware → FastAPI → Business Logic
Rule: each layer handles its own threat type and reduces load on the next one. Like an onion — except instead of tears, the attacker gets a 403.
⚠️ The #1 beginner mistake — protecting only at the FastAPI level without Nginx in front. Thousands of simultaneous connections will spin up Python processes and exhaust memory before any middleware gets a chance to run. It's like posting a security guard inside the building while a crowd is already breaking through the unlocked front door.
1. Rate Limiting — First Line of Defense
Install
pip install slowapi redis limits
Yes, three packages. But at least it won't hurt later.
Basic Configuration
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

# key_func defines what we're limiting by
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["100/minute"],
    # Redis for distributed counter — required with multiple workers
    storage_uri="redis://localhost:6379",
)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

@app.get("/public")
@limiter.limit("100/minute")
async def public_endpoint(request: Request):
    return {"status": "ok"}

# Heavy endpoint — strict limit
@app.get("/heavy-search")
@limiter.limit("10/minute;3/second")  # multiple limits separated by ;
async def heavy_search(request: Request, query: str):
    return {"results": []}

# Login — brute-force protection (5 attempts, then think)
@app.post("/auth/login")
@limiter.limit("5/minute")
async def login(request: Request):
    return {"token": "..."}
Limit by User, Not by IP
If you only limit by IP — all users behind the same corporate NAT share a single quota. Picture this: 200 office employees, one external IP, everyone hitting "Refresh" at the same time. Half of them get 429. Your CTO calls.
def get_user_or_ip(request: Request) -> str:
    """
    Authenticated → limit by user_id.
    Anonymous → limit by real IP.
    """
    token = request.headers.get("Authorization", "")
    if token.startswith("Bearer "):
        try:
            payload = decode_jwt(token[7:])  # your JWT decoding helper
            return f"user:{payload['sub']}"
        except Exception:  # never use a bare except — it swallows CancelledError too
            pass
    # Respect proxy headers
    return (
        request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
        or request.headers.get("X-Real-IP", "")
        or request.client.host
    )

user_limiter = Limiter(
    key_func=get_user_or_ip,
    storage_uri="redis://localhost:6379"
)
Token Bucket — Smooth Rate Limiting Without Burst Spikes
Standard rate limiting gives you an ugly burst: 100 requests in the first second, then nothing until the minute resets. The user stares at a 429 thinking the site is broken. Token Bucket fixes this — requests flow smoothly, like water, not like rush-hour traffic:
import time
import redis.asyncio as redis

class TokenBucketRateLimiter:
    def __init__(self, redis_client, capacity: int = 10, refill_rate: float = 1.0):
        self.redis = redis_client
        self.capacity = capacity          # max tokens in the bucket
        self.refill_rate = refill_rate    # tokens per second

    async def is_allowed(self, key: str) -> tuple[bool, dict]:
        now = time.time()
        # Lua script for atomic operation — no race conditions
        script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        local elapsed = now - last_refill
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        local allowed = 0
        if tokens >= 1 then
            tokens = tokens - 1
            allowed = 1
        end

        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
        redis.call('EXPIRE', key, 3600)
        return {allowed, math.floor(tokens)}
        """
        result = await self.redis.eval(script, 1, key, self.capacity, self.refill_rate, now)
        allowed, remaining = result[0], result[1]
        return bool(allowed), {
            "X-RateLimit-Limit": str(self.capacity),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(int(now + 60)),
        }
2. Security Middleware
Trusted Hosts, CORS, Headers
CORS with allow_origins=["*"] is like putting a lock on your door but taping the code next to it. We don't do that.
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.cors import CORSMiddleware

# Only requests to our domain — guards against Host header injection
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["myapp.com", "*.myapp.com"]
)

# CORS — NEVER use allow_origins=["*"] in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
    max_age=3600
)

app.add_middleware(GZipMiddleware, minimum_size=1000)

@app.middleware("http")
async def security_headers_middleware(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    response.headers["Server"] = "Server"  # hide server type
    return response
Timeouts: The Right Approach
Wrapping asyncio.wait_for around the entire ASGI application is a dangerous anti-pattern. When a TimeoutError is raised, a CancelledError is injected into the task. If the endpoint is mid-transaction in SQLAlchemy or inside an httpx call and doesn't explicitly catch CancelledError — the connection never returns to the pool. After a few of these incidents, the pool is exhausted and the entire API goes down. That's how Friday-evening incidents are born.
Rule: set timeouts where the actual waiting happens — in clients, not around the application.
import httpx
from sqlalchemy.ext.asyncio import create_async_engine

# Timeout on external HTTP requests
async def call_external_api(url: str):
    timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=2.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.get(url)
        return response.json()

# Timeout on DB queries — in the engine config
engine = create_async_engine(
    DATABASE_URL,
    pool_timeout=10,     # wait time for a free pool connection
    pool_recycle=1800,   # reopen connections every 30 min
    connect_args={"command_timeout": 15},  # SQL query timeout (PostgreSQL / asyncpg)
)
The global timeout at the TCP level lives in Nginx — that's where it makes sense, because Nginx doesn't hold transactions:
proxy_read_timeout 30s; # wait no longer than 30s for FastAPI response
proxy_connect_timeout 5s; # connection establishment timeout
proxy_send_timeout 10s; # request send timeout
3. Slowloris Protection
☠️ Slowloris opens thousands of connections and sends HTTP headers one byte at a time every few seconds. The server waits for the request to complete, connections pile up. A single laptop can take down your server. Named after the slow-moving primate — and yes, it's just as unstoppable without the right protection.
Uvicorn Settings
# run.py
import uvicorn

uvicorn.run(
    "main:app",
    host="0.0.0.0",
    port=8000,
    workers=4,
    timeout_keep_alive=5,     # Slowloris dies in 5 seconds
    limit_max_requests=1000,
    h11_max_incomplete_event_size=16384,  # 16KB header limit
    backlog=100,
)
Gunicorn + Uvicorn Workers (Production)
# gunicorn.conf.py
import multiprocessing
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 30
keepalive = 2
graceful_timeout = 30
max_requests = 1000 # restart worker every N requests — cures memory leaks
max_requests_jitter = 100 # randomize for smoothness (more on jitter below)
bind = "0.0.0.0:8000"
backlog = 100
gunicorn -c gunicorn.conf.py main:app
4. Redis Distributed Rate Limiter
With multiple FastAPI instances behind a load balancer — each process has no idea what limits the others have counted. Like three security guards at the entrance, each counting visitors from zero independently. Together they let in three times too many.
Redis fixes this.
Sliding Window Counter — The Right Algorithm
The internet is full of implementations using Redis ZSET (Sorted Set), where every request is stored as a separate element. That's Sliding Window Log — accurate, but catastrophic under attack: 20,000 req/s from one IP → 20,000 entries in Redis memory for a single key. ZREMRANGEBYSCORE becomes O(log N + M) and creates CPU spikes in single-threaded Redis.
The correct approach — Sliding Window Counter: split time into discrete intervals and store only counters. O(1) memory per IP instead of O(N). The difference is like storing every step a person takes vs just a step counter.
import redis.asyncio as aioredis
import time
from fastapi import Request, HTTPException, Depends

class SlidingWindowCounter:
    """
    Sliding Window Counter via Redis HASH.
    Window is split into 1-second intervals.
    We store only counters — not timestamps.
    Memory is O(window_size) per IP, not O(requests_count).
    """
    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def is_allowed(self, key: str, limit: int, window: int) -> tuple[bool, int]:
        now = int(time.time())
        pipe = self.redis.pipeline()
        # Sum across all second-buckets in the window
        buckets = [f"{key}:{now - i}" for i in range(window)]
        for bucket in buckets:
            pipe.hget("rate_counters", bucket)
        results = await pipe.execute()
        total = sum(int(r or 0) for r in results)

        if total >= limit:
            return False, 0

        # Atomically increment the current bucket
        current_bucket = f"{key}:{now}"
        await self.redis.hincrby("rate_counters", current_bucket, 1)
        # TTL on the whole hash — auto-cleanup (jittered_ttl is defined in section 5)
        await self.redis.expire("rate_counters", jittered_ttl(window + 5))  # <- with jitter!
        return True, limit - total - 1

sliding_counter = SlidingWindowCounter(aioredis.from_url("redis://localhost:6379"))

def rate_limit(limit: int = 100, window: int = 60):
    """Dependency factory — a bare lambda inside Depends won't get the Request injected."""
    async def dependency(request: Request) -> int:
        key = f"{request.client.host}:{request.url.path}"
        allowed, remaining = await sliding_counter.is_allowed(key, limit, window)
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail={"error": "Rate limit exceeded", "retry_after": window},
                headers={"Retry-After": str(window), "X-RateLimit-Remaining": "0"}
            )
        return remaining
    return dependency

@app.get("/api/expensive")
async def expensive_endpoint(
    request: Request,
    _: int = Depends(rate_limit(limit=10, window=60))
):
    return {"data": "..."}
IP Blacklist via Redis
from ipaddress import ip_address, ip_network
from starlette.responses import JSONResponse

WHITELIST_NETWORKS = [
    ip_network("10.0.0.0/8"),     # internal network
    ip_network("172.16.0.0/12"),  # Docker networks
    ip_network("127.0.0.0/8"),    # localhost
]

class IPFilterMiddleware:
    def __init__(self, app, redis_client):
        self.app = app
        self.redis = redis_client

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        client_ip = scope["client"][0]
        ip = ip_address(client_ip)

        # Whitelist — pass through without checks
        for network in WHITELIST_NETWORKS:
            if ip in network:
                await self.app(scope, receive, send)
                return

        # Blacklist check
        is_blocked = await self.redis.get(f"blacklist:{client_ip}")
        if is_blocked:
            response = JSONResponse(status_code=403, content={"detail": "Access denied"})
            await response(scope, receive, send)
            return

        await self.app(scope, receive, send)

@app.post("/admin/blacklist/{ip}")
async def block_ip(ip: str, duration: int = 3600):
    await redis_client.setex(f"blacklist:{ip}", duration, 1)  # redis_client: your shared connection
    return {"blocked": ip, "duration": duration}
5. Thundering Herd — TTL Jitter Against the Stampede Effect
We have 9 Gunicorn workers, Redis, a Circuit Breaker with recovery_timeout = 30. Ask yourself: what happens when that timeout expires on all workers simultaneously?
A real-life scenario: the database goes down at 14:32:00. Circuit Breaker enters OPEN state. Exactly at 14:32:30 all 9 workers synchronously transition to HALF_OPEN and fire a probe request to the DB. If the DB is still unstable — it gets an instant connection spike and goes down again. Circuit Breaker goes back to OPEN. At 14:33:00, the cycle repeats. This is the Thundering Herd — the stampede effect where your protection mechanism is what breaks the thing it's protecting.
Solution: Jitter (random TTL spread). Never use static expiration times for states that expire across multiple workers simultaneously. Desynchronize them — let each worker wake up at its own moment.
import random

def jittered_ttl(base_seconds: int, variance: float = 0.15) -> int:
    """
    Adds ±15% random spread to TTL.
    base=30s → actual TTL between 26s and 34s (whole seconds).
    Workers wake up at different times — no spike.
    """
    delta = int(base_seconds * variance)
    return base_seconds + random.randint(-delta, delta)
Apply it in RedisCircuitBreaker:
class RedisCircuitBreaker:
    # ... (full class in section 6 below)

    async def _on_failure(self, now: float):
        # Before: open_ttl = self.recovery_timeout * 2 — all workers in sync
        # After: each worker gets a slightly different TTL — beautiful
        ttl = jittered_ttl(self.recovery_timeout * 2)
        await self.redis.eval(
            self._RECORD_FAILURE_SCRIPT, 1,
            self.key, self.failure_threshold, ttl, now
        )
And in SlidingWindowCounter — window expirations across workers shouldn't be synchronized:
# Before: await self.redis.expire("rate_counters", window + 5)
await self.redis.expire("rate_counters", jittered_ttl(window + 5))
General rule: any `EXPIRE` in Redis that could fire across multiple consumers simultaneously must have jitter. This is especially critical for the Circuit Breaker `recovery_timeout` and for TTLs on hot cache keys. A small spread — a huge relief for your database.
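Applied to a hot cache key it looks like this (a sketch: `cache_user_profile` and the `profile:` key prefix are my illustrative names, and `jittered_ttl` is repeated here only so the snippet runs standalone):

```python
import random

def jittered_ttl(base_seconds: int, variance: float = 0.15) -> int:
    """±15% spread, same helper as above."""
    delta = int(base_seconds * variance)
    return base_seconds + random.randint(-delta, delta)

async def cache_user_profile(redis, user_id: int, payload: bytes) -> None:
    # Hot key refreshed by every worker: desynchronize its expiration
    # so all workers don't stampede the DB when it expires
    await redis.set(f"profile:{user_id}", payload, ex=jittered_ttl(600))

# base=600s, variance=0.15 → every TTL lands in [510, 690]
samples = [jittered_ttl(600) for _ in range(1000)]
```

A 10-minute cache entry now expires somewhere between 8.5 and 11.5 minutes, and each worker's copy dies at a different moment.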
6. Circuit Breaker — Distributed, via Redis
If the database is down — Circuit Breaker quickly rejects requests instead of waiting for timeouts. But there's a critical catch: state stored in process memory doesn't work with multiple workers.
With gunicorn running cpu_count() * 2 + 1 workers, you have ~9 isolated Python processes. Worker A opens the circuit, workers B–I keep hammering the dead DB, thinking it's still closed. It's like one security guard knowing the building is on fire while the other eight keep letting people in. State must live in Redis.
from enum import Enum
from fastapi import HTTPException
import redis.asyncio as aioredis
import time

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # all requests blocked
    HALF_OPEN = "half_open"  # probe request to check recovery

class RedisCircuitBreaker:
    """
    Circuit Breaker with state stored in Redis.
    All workers see the same state.
    Lua script guarantees atomic state transitions.
    """

    _RECORD_FAILURE_SCRIPT = """
    local key = KEYS[1]
    local threshold = tonumber(ARGV[1])
    local open_ttl = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local failures = redis.call('HINCRBY', key, 'failures', 1)
    redis.call('HSET', key, 'last_failure', now)

    if failures >= threshold then
        redis.call('HSET', key, 'state', 'open', 'opened_at', now)
        redis.call('EXPIRE', key, open_ttl)
    end
    return failures
    """
    -- note: open_ttl arrives already multiplied and jittered; don't double it here

    def __init__(
        self,
        redis: aioredis.Redis,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2,
    ):
        self.redis = redis
        self.key = f"circuit:{name}"
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

    async def _get_state(self) -> dict:
        data = await self.redis.hgetall(self.key)
        return {k.decode(): v.decode() for k, v in data.items()} if data else {}

    async def call(self, func, *args, **kwargs):
        state = await self._get_state()
        circuit_state = state.get("state", "closed")
        now = time.time()

        if circuit_state == "open":
            opened_at = float(state.get("opened_at", 0))
            if now - opened_at >= self.recovery_timeout:
                await self.redis.hset(self.key, mapping={"state": "half_open", "successes": 0})
            else:
                raise HTTPException(503, "Service temporarily unavailable")

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except Exception:
            await self._on_failure(now)
            raise

    async def _on_success(self):
        state = await self._get_state()
        if state.get("state") == "half_open":
            successes = await self.redis.hincrby(self.key, "successes", 1)
            if successes >= self.success_threshold:
                await self.redis.delete(self.key)
        else:
            await self.redis.hset(self.key, "failures", 0)

    async def _on_failure(self, now: float):
        ttl = jittered_ttl(self.recovery_timeout * 2)  # <- jitter here
        await self.redis.eval(
            self._RECORD_FAILURE_SCRIPT, 1,
            self.key, self.failure_threshold, ttl, now
        )

db_circuit = RedisCircuitBreaker(redis_client, name="postgres", failure_threshold=5)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return await db_circuit.call(fetch_user_from_db, user_id)
7. Nginx — Correct Configuration
http {
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=heavy:10m rate=1r/s;
    limit_conn_zone $binary_remote_addr zone=conn:10m;
    limit_conn_status 429;
    limit_req_status 429;

    server {
        listen 443 ssl http2;
        server_name myapp.com;

        # Slowloris timeouts — wait no more than 10 seconds for headers
        client_header_timeout 10s;
        client_body_timeout 10s;
        send_timeout 10s;
        keepalive_timeout 5s;

        client_max_body_size 10m;
        client_header_buffer_size 1k;
        large_client_header_buffers 4 8k;

        # Max 10 connections per IP — prevents Slowloris from sticking
        limit_conn conn 10;

        location /api/ {
            limit_req zone=api burst=20 nodelay;
            proxy_pass http://127.0.0.1:8000;
            proxy_read_timeout 30s;
            proxy_connect_timeout 5s;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }

        location ~* ^/api/(search|export|report) {
            limit_req zone=heavy burst=5;
            proxy_pass http://127.0.0.1:8000;
            proxy_read_timeout 120s;
        }

        # Block popular scanners — looking at you, nikto
        if ($http_user_agent ~* (nikto|sqlmap|masscan|nmap)) {
            return 403;
        }

        # Empty User-Agent — close connection silently
        if ($http_user_agent = "") {
            return 444;
        }
    }
}
💡 Status code `444` in Nginx closes the TCP connection with no HTTP response. Cheaper than sending a 403 — we don't waste bandwidth on aggressive bots. Zero bytes in response = zero resources spent talking to someone who doesn't deserve a reply.
Geo-blocking for Targeted Attacks
# Requires ngx_http_geoip2_module
geoip2 /etc/nginx/GeoLite2-Country.mmdb {
    $geoip2_data_country_code country iso_code;
}

map $geoip2_data_country_code $allowed_country {
    default 0;
    RU 1;
    KZ 1;
    US 1;
}

server {
    if ($allowed_country = 0) {
        return 403;
    }
}
8. Cloudflare — CDN Layer
| Feature | Free | Pro ($20/mo) | Business |
|---|---|---|---|
| L3/L4 DDoS Protection | ✅ | ✅ | ✅ |
| WAF | ❌ | ✅ | ✅ |
| Advanced Rate Limiting | ❌ | ✅ | ✅ |
| Bot Fight Mode | ✅ | ✅ | ✅ |
| Custom Rules | 5 | 20 | 100+ |
Cloudflare Worker — Rate Limiting at the Edge
Requests are blocked before they even reach your server. It's like a bouncer at the club — they turn away troublemakers on the street, not at the bar.
const RATE_LIMIT = 100;
const WINDOW = 60; // seconds

export default {
  async fetch(request, env) {
    const ip = request.headers.get('CF-Connecting-IP');
    const key = `rate:${ip}:${Math.floor(Date.now() / 1000 / WINDOW)}`;

    const current = parseInt(await env.RATE_KV.get(key) || '0');
    if (current >= RATE_LIMIT) {
      return new Response('Too Many Requests', {
        status: 429,
        headers: {
          'Retry-After': String(WINDOW),
          'X-RateLimit-Remaining': '0'
        }
      });
    }

    await env.RATE_KV.put(key, String(current + 1), { expirationTtl: WINDOW });
    return fetch(request);
  }
};
Blocking Direct Server Access — This Is Mandatory
If ports 80/443 are open directly — an attacker can bypass Cloudflare by hitting your server's IP. All your CDN protection becomes meaningless. It's like having an armored door next to a hole in the wall.
# Download current Cloudflare IP ranges
wget -O /tmp/cf-ips.txt https://www.cloudflare.com/ips-v4

iptables -F INPUT
iptables -A INPUT -i lo -j ACCEPT  # loopback — local services keep working
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT  # replies to our own outbound traffic
iptables -A INPUT -p tcp --dport 22 -j ACCEPT  # Keep SSH — otherwise you're locked out

while IFS= read -r ip; do
    iptables -A INPUT -p tcp --dport 80 -s "$ip" -j ACCEPT
    iptables -A INPUT -p tcp --dport 443 -s "$ip" -j ACCEPT
done < /tmp/cf-ips.txt

# Repeat with https://www.cloudflare.com/ips-v6 and ip6tables if the host has IPv6
iptables -A INPUT -j DROP  # everything else — no
9. OS Kernel: eBPF/XDP vs iptables
In the previous section we locked down direct access using iptables. That's the standard — but it has an architectural ceiling that becomes visible under truly serious volumetric attacks.
The iptables problem under volumetric attack. Even if a rule drops a packet, the Linux kernel still allocates an sk_buff buffer for every incoming packet before iptables processes it. At 10–20 Mpps (millions of packets per second) this exhausts kernel memory before the rules even fire. You're dropping packets, but you already paid the memory cost — a paradox of protection.
Solution: XDP (eXpress Data Path) + eBPF. XDP lets you attach an eBPF program directly to the NIC driver. The packet is dropped before the kernel even sees it — before sk_buff allocation, before the IP stack, before iptables. Like a doorman who turns away unwanted guests in the parking garage — not at reception.
Without XDP: NIC → sk_buff alloc → TCP stack → iptables → DROP (memory wasted)
With XDP: NIC → eBPF prog → XDP_DROP (memory untouched)
Minimal XDP program in C (compiled via clang, loaded via ip link):
// xdp_drop_syn.c
#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/in.h>   /* IPPROTO_TCP */
#include <linux/ip.h>
#include <linux/tcp.h>
#include <bpf/bpf_helpers.h>  /* SEC() */

SEC("xdp")
int block_syn_flood(struct xdp_md *ctx) {
    void *data = (void *)(long)ctx->data;
    void *data_end = (void *)(long)ctx->data_end;

    struct ethhdr *eth = data;
    if ((void *)(eth + 1) > data_end) return XDP_PASS;
    if (eth->h_proto != __constant_htons(ETH_P_IP)) return XDP_PASS;

    struct iphdr *ip = (void *)(eth + 1);
    if ((void *)(ip + 1) > data_end) return XDP_PASS;
    if (ip->protocol != IPPROTO_TCP) return XDP_PASS;

    /* assumes no IP options (ihl == 5) — fine for a minimal example */
    struct tcphdr *tcp = (void *)(ip + 1);
    if ((void *)(tcp + 1) > data_end) return XDP_PASS;

    // Drop bare SYN without ACK — classic SYN flood
    if (tcp->syn && !tcp->ack) {
        // In production: check against eBPF map of blocked IPs
        return XDP_DROP;
    }
    return XDP_PASS;
}

char _license[] SEC("license") = "GPL";
# Compile (kernel headers and libbpf headers required)
clang -O2 -g -target bpf -c xdp_drop_syn.c -o xdp_drop_syn.o
# Load program onto the interface (native mode — maximum speed)
ip link set dev eth0 xdp obj xdp_drop_syn.o sec xdp
Layer allocation rule: `iptables` for stateful rules and port-based filtering; eBPF/XDP for high-speed dropping of volumetric L3/L4 attacks at the NIC level. For bare-metal servers under serious attack the difference in effectiveness is an order of magnitude. This isn't a replacement — it's an additional layer in front of everything else.
10. Monitoring and Automatic Bans
The previous version of the anomaly detector stored request history in defaultdict(list) in process memory. That's two bugs in one:
- Memory leak: 100,000 unique IPs make one request each and disappear — their keys stay in memory until Uvicorn restarts. A slow death by OOM.
- Event loop blocking: list comprehension over 5,000 timestamps on every request — a synchronous operation in async context, delaying all other requests. One bad IP slows down all good users.
All state goes to Redis. Counters with short TTLs handle the cleanup automatically:
import redis.asyncio as aioredis
import logging
import time

logger = logging.getLogger(__name__)

class AnomalyDetector:
    """
    All state in Redis — no memory leaks, no event loop blocking.
    Counters with short TTLs replace per-request timestamp lists.
    TTL automatically cleans up inactive IPs.
    """

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def analyze_request(self, request: Request) -> dict:
        ip = request.client.host
        signals = {}

        # 1. High frequency — counter with TTL instead of in-memory list
        freq_key = f"freq:{ip}"
        count = await self.redis.incr(freq_key)
        await self.redis.expire(freq_key, 10)
        signals["high_frequency"] = count > 50

        # 2. Suspicious User-Agent (basic check — extended by JA3 below)
        ua = request.headers.get("user-agent", "")
        signals["suspicious_ua"] = any(
            bot in ua.lower()
            for bot in ["python-requests", "curl", "wget", "scrapy", "go-http"]
        )

        # 3. JA3 TLS fingerprint — real client identification (see section 11)
        KNOWN_BAD_JA3 = {
            "6734f37431670b3ab4292b8f60f29984",  # python-requests
            "b32309a26951912be7dba376398abc3b",  # curl
            "4d7a28d6f2263ed61de88ca66eb011e3",  # Scrapy
        }
        ja3 = request.headers.get("X-JA3-Fingerprint", "")
        signals["bad_tls_fingerprint"] = ja3 in KNOWN_BAD_JA3

        # 4. High 4xx error rate
        error_count = await self.redis.get(f"errors:{ip}")
        signals["high_errors"] = int(error_count or 0) > 20

        # 5. Scanning known vulnerable paths
        scan_paths = ["/admin", "/.env", "/wp-admin", "/phpMyAdmin", "/.git"]
        signals["scanning"] = any(request.url.path.startswith(p) for p in scan_paths)

        threat_score = sum(1 for v in signals.values() if v)
        if threat_score >= 2:
            await self._auto_block(ip, threat_score, signals)

        return signals

    async def record_error(self, ip: str):
        """Call this from middleware on 4xx responses."""
        key = f"errors:{ip}"
        await self.redis.incr(key)
        await self.redis.expire(key, 300)

    async def _auto_block(self, ip: str, score: int, signals: dict):
        duration = 86400 if score >= 3 else 3600
        # SET NX — don't overwrite if already blocked
        blocked = await self.redis.set(f"blacklist:{ip}", score, ex=duration, nx=True)
        if blocked:
            logger.warning(f"AUTO-BLOCKED {ip} for {duration}s | score={score} | {signals}")
            await self._send_telegram_alert(ip, score, signals)

    async def _send_telegram_alert(self, ip, score, signals):
        import httpx
        # BOT_TOKEN and CHAT_ID come from your config/env
        message = f"🚨 DDoS Alert!\nIP: {ip}\nScore: {score}/5\nSignals: {signals}"
        async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
            await client.post(
                f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
                json={"chat_id": CHAT_ID, "text": message}
            )

anomaly_detector = AnomalyDetector(aioredis.from_url("redis://localhost:6379"))

@app.middleware("http")
async def track_errors_middleware(request: Request, call_next):
    response = await call_next(request)
    if 400 <= response.status_code < 500:
        await anomaly_detector.record_error(request.client.host)
    return response
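The blocking policy itself is pure arithmetic, which makes it easy to pin down with a test. The helper names below are mine, mirroring the logic inside analyze_request and _auto_block:

```python
def threat_score(signals: dict[str, bool]) -> int:
    """Each tripped signal adds one point, as in analyze_request."""
    return sum(1 for v in signals.values() if v)

def block_duration(score: int) -> int:
    """Mirrors _auto_block: 24h ban for score >= 3, 1h otherwise."""
    return 86400 if score >= 3 else 3600

signals = {
    "high_frequency": True,       # > 50 requests in 10s
    "suspicious_ua": True,        # e.g. python-requests
    "bad_tls_fingerprint": False,
    "high_errors": False,
    "scanning": False,
}
score = threat_score(signals)     # → 2: crosses the auto-block threshold
duration = block_duration(score)  # → 3600: a one-hour ban
```

Two cheap signals together are enough for an hour-long ban; a third escalates it to a full day.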
Prometheus Metrics
from prometheus_client import Counter, Histogram
from prometheus_fastapi_instrumentator import Instrumentator

# Careful: a per-IP label explodes cardinality under attack —
# consider dropping "ip" or bucketing it in real deployments
RATE_LIMIT_HITS = Counter(
    "rate_limit_hits_total",
    "Total rate limit violations",
    ["endpoint", "ip"]
)

REQUEST_DURATION = Histogram(
    "request_duration_seconds",
    "Request processing time",
    ["method", "endpoint", "status"],
    buckets=[.005, .01, .025, .05, .1, .5, 1, 5]
)

Instrumentator().instrument(app).expose(app, endpoint="/metrics")
11. JA3/JA4 TLS Fingerprinting — Detect Bots by Their Handshake
In AnomalyDetector we check the User-Agent. That's necessary, but naive: any script changes that header in one line. It's like trying to identify a professional burglar by asking their name.
The problem: HTTP headers are controlled by the attacker. The TLS handshake is not.
When establishing an HTTPS connection, the client sends a ClientHello — a list of supported cipher suites, extensions, and elliptic curve groups. Chrome sends one specific set, Python requests a different one, Go net/http yet another. This set is hashed into a JA3 fingerprint (an MD5, 32 hex characters). The fingerprint changes when the library is updated, but within the same version it's stable. Faking it without patching or replacing the TLS library itself is practically impossible.
Chrome 120: ja3 = "cd08e31494f9531f560d64c695473da9" ← legitimate user
Python requests: ja3 = "6734f37431670b3ab4292b8f60f29984" ← instant red flag
curl 8.x: ja3 = "b32309a26951912be7dba376398abc3b" ← a tool, not a browser
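For intuition about what gets hashed: a JA3 string is just five comma-separated fields from the ClientHello, MD5-hashed. A simplified sketch (the field values below are made up for illustration, and real implementations also strip GREASE values before hashing):

```python
import hashlib

def ja3_hash(tls_version: int, ciphers: list[int], extensions: list[int],
             curves: list[int], point_formats: list[int]) -> str:
    """JA3 = MD5 over 'version,ciphers,extensions,curves,point_formats',
    each list dash-joined in the order seen on the wire."""
    fields = [
        str(tls_version),
        "-".join(map(str, ciphers)),
        "-".join(map(str, extensions)),
        "-".join(map(str, curves)),
        "-".join(map(str, point_formats)),
    ]
    return hashlib.md5(",".join(fields).encode()).hexdigest()

# Made-up ClientHello values, purely for illustration
fp = ja3_hash(771, [4865, 4866, 4867], [0, 11, 10, 35], [29, 23, 24], [0])
# Same input → same 32-character fingerprint, every time
```

Change any one cipher or extension and the hash changes completely, which is exactly why a library upgrade shifts the fingerprint while a spoofed User-Agent doesn't.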
Getting JA3 in Nginx
# Requires: openresty or nginx + lua-nginx-module + lua-resty-ja3
http {
    lua_shared_dict ja3_block 10m;

    server {
        access_by_lua_block {
            local ja3 = require("resty.ja3")
            local fingerprint = ja3.fingerprint()

            -- List of known bad fingerprints (botnets, scanners)
            local blocklist = {
                ["6734f37431670b3ab4292b8f60f29984"] = true, -- python-requests
                ["b32309a26951912be7dba376398abc3b"] = true, -- curl
            }

            if blocklist[fingerprint] then
                ngx.header["X-JA3-Blocked"] = fingerprint
                ngx.exit(ngx.HTTP_FORBIDDEN)
            end

            -- Forward fingerprint to FastAPI for logging and analysis
            ngx.req.set_header("X-JA3-Fingerprint", fingerprint)
        }
    }
}
FastAPI receives the fingerprint via the X-JA3-Fingerprint header and AnomalyDetector handles it (see the bad_tls_fingerprint signal above).
The Cloudflare path (no Nginx config needed): on the Pro plan and above, JA3/JA4 are available as `cf.bot_management.ja3_hash` in Custom Rules — without a single line of server-side code. One click in the UI — and all Python bots are gone.

Important caveat: JA3 blocking is not a silver bullet. Sophisticated bots use browser engines (Playwright, Puppeteer) with a real Chrome TLS stack and a real Chrome fingerprint. But it effectively cuts out 80–90% of automated traffic from scripts and custom clients — the ones that didn't bother. And that's the majority.
12. Chaos Engineering — Proving Your Protection Actually Works
We wrote a Circuit Breaker, AnomalyDetector, Sliding Window. Beautiful code. Passed code review. Everyone said "looks good."
But how do you know it actually works in production and not just in unit tests with a mocked Redis? The honest answer: you can't — unless you deliberately broke things in staging first.
The enterprise answer: Chaos Engineering — intentionally injecting failures into staging to verify graceful degradation. Netflix calls this the "Design for failure" principle. We call it "let's break this before it breaks itself at 3 AM."
Toxiproxy — Injecting Failures at the TCP Layer
# Run Toxiproxy next to your staging DB
docker run -d -p 8474:8474 -p 5433:5433 ghcr.io/shopify/toxiproxy
# chaos_tests/test_circuit_breaker.py
import pytest
import httpx
import asyncio
from toxiproxy import Toxiproxy

# Async tests need pytest-asyncio (or anyio) installed
pytestmark = pytest.mark.asyncio

@pytest.fixture
def proxy():
    client = Toxiproxy()
    # Proxy PostgreSQL: localhost:5433 → postgres:5432
    proxy = client.create(name="postgres", listen="0.0.0.0:5433", upstream="postgres:5432")
    yield proxy
    proxy.destroy()

async def test_circuit_opens_under_latency(proxy):
    """
    Inject 3-second latency into PostgreSQL.
    Circuit Breaker should open after 5 timeouts
    and start returning 503 — fast, not hanging.
    """
    proxy.add_toxic("latency", type="latency", attributes={"latency": 3000})

    failed = 0
    circuit_opened = False
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        for i in range(20):
            r = await client.get("/users/1")
            if r.status_code == 503:
                circuit_opened = True
                break
            if r.status_code == 504:
                failed += 1

    assert circuit_opened, f"Circuit didn't open after {failed} timeouts — something's wrong"

async def test_circuit_recovers_after_heal(proxy):
    """
    After the circuit opens, remove the toxic.
    After recovery_timeout the system should self-heal.
    No restart. No 3 AM calls.
    """
    proxy.add_toxic("down", type="limit_data", attributes={"bytes": 0})
    await asyncio.sleep(2)
    proxy.remove_toxic("down")
    await asyncio.sleep(35)  # wait recovery_timeout + buffer

    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        r = await client.get("/users/1")
        assert r.status_code == 200, "System didn't recover — Circuit stuck in OPEN"

async def test_rate_limiter_under_flood(proxy):
    """
    200 requests from one IP at once.
    Should get 429, not 500.
    500 = your code crashed. 429 = your protection is working.
    """
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        tasks = [client.get("/api/expensive") for _ in range(200)]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    status_codes = [r.status_code for r in responses if isinstance(r, httpx.Response)]
    assert 429 in status_codes, "Rate limiter didn't fire under flood"
    assert 500 not in status_codes, "Server returned 500 — this is a crash, not controlled degradation"
# Run in CI/CD pipeline (STAGING ONLY — never do this in production)
pytest chaos_tests/ -v --timeout=120
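The async tests above won't run under plain pytest. Assuming `pytest-asyncio` (for the async test functions) and `pytest-timeout` (for the `--timeout` flag) are installed, a minimal `pytest.ini` might look like this — adjust to your own setup:

```ini
# pytest.ini — chaos test runner config
# assumes the pytest-asyncio and pytest-timeout plugins
[pytest]
asyncio_mode = auto
timeout = 120
```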
What to Test in Chaos Tests
| Scenario | Tool | Expected behavior |
|---|---|---|
| 3s latency on PostgreSQL | Toxiproxy `latency` | Circuit opens, 503 instead of 504 |
| Full DB outage | Toxiproxy `limit_data` | Circuit open, recovery in N seconds |
| Redis unavailable | `docker pause redis` | Graceful degradation, no crash |
| 1000 req/s flood | wrk / locust | 429 at limit, zero 500s |
| Worker OOM | `kill -9 <pid>` | Gunicorn spawns new one, no traffic loss |
The core Chaos Engineering insight: we don't care whether the system falls under load (it will — that's normal). We care whether it degrades gracefully. Correct degradation: 429 instead of 500, 503 instead of hanging, predictable recovery. Incorrect: OOM crash, connection leak, infinite retry storm. You only find out which one you have by breaking things intentionally — or accidentally in production.
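The "429 instead of 500, 503 instead of hanging" rule can be sketched as a plain exception-to-status mapping. This is a hypothetical sketch: `CircuitOpenError`, `RateLimitExceeded`, and `degrade_gracefully` are illustrative names, not part of the code above.

```python
import asyncio

class CircuitOpenError(Exception):
    """Raised by a circuit breaker that is refusing calls (illustrative)."""

class RateLimitExceeded(Exception):
    """Raised by the rate limiter (illustrative)."""

def degrade_gracefully(exc: Exception) -> tuple[int, str]:
    """Translate an internal failure into a controlled response.

    429 -> the client is over its limit; 503 -> the backend is down but
    we answered fast instead of hanging. Anything unmapped surfaces as
    a 500, and a 500 in the chaos tests means a real bug.
    """
    if isinstance(exc, RateLimitExceeded):
        return 429, "Too Many Requests"
    if isinstance(exc, (CircuitOpenError, asyncio.TimeoutError)):
        return 503, "Service Unavailable"
    return 500, "Internal Server Error"  # this one should page you

# Usage:
assert degrade_gracefully(RateLimitExceeded()) == (429, "Too Many Requests")
assert degrade_gracefully(CircuitOpenError()) == (503, "Service Unavailable")
```

In a real app this mapping lives in an exception-handler middleware, so every failure mode exits through one audited code path instead of leaking a traceback.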
13. Production Docker Compose
version: '3.9'

services:
  nginx:
    image: nginx:alpine
    ports: ["443:443", "80:80"]
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/nginx/certs
    depends_on: [fastapi]

  fastapi:
    build: .
    command: gunicorn -c gunicorn.conf.py main:app
    environment:
      REDIS_URL: redis://redis:6379
    depends_on: [redis]
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: '1.0'

  redis:
    image: redis:7-alpine
    # allkeys-lru: on low memory evict old keys, don't crash
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes: ["redis_data:/data"]

  prometheus:
    image: prom/prometheus
    volumes: ["./prometheus.yml:/etc/prometheus/prometheus.yml"]

  grafana:
    image: grafana/grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: strongpassword  # please change this. seriously.
    ports: ["3000:3000"]

volumes:
  redis_data:
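What `allkeys-lru` buys you, illustrated with a toy in-process LRU. This is a sketch of the idea only: Redis's real eviction is approximate and sampling-based, but the contract is the same — at the memory ceiling, the least recently used key is evicted and writes keep succeeding instead of erroring out.

```python
from collections import OrderedDict

class ToyLRU:
    """Toy model of Redis's allkeys-lru policy: at capacity, evict the
    least recently used key instead of rejecting the write."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.data: OrderedDict[str, str] = OrderedDict()

    def get(self, key: str):
        if key not in self.data:
            return None
        self.data.move_to_end(key)  # touch: mark as most recently used
        return self.data[key]

    def set(self, key: str, value: str):
        if key in self.data:
            self.data.move_to_end(key)
        self.data[key] = value
        if len(self.data) > self.capacity:
            self.data.popitem(last=False)  # evict the coldest key — don't crash

cache = ToyLRU(capacity=2)
cache.set("ratelimit:1.2.3.4", "5")
cache.set("ratelimit:5.6.7.8", "2")
cache.get("ratelimit:1.2.3.4")       # touched: now most recent
cache.set("ratelimit:9.9.9.9", "1")  # evicts 5.6.7.8, the coldest key
assert cache.get("ratelimit:5.6.7.8") is None
```

For rate-limit counters this trade-off is exactly right: losing the oldest counter under memory pressure is harmless, while a crashed Redis takes your whole rate limiter down with it.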
Pre-Deploy Checklist
Print it out. Pin it above your monitor. Save it in Notion. Tattoo it on your wrist — your choice.
- [ ] Nginx sits in front of FastAPI with `limit_req_zone` and `limit_conn`
- [ ] `client_max_body_size 10m` in Nginx — not Python middleware (bypassed via `Transfer-Encoding: chunked`)
- [ ] Redis connected for distributed rate limiting
- [ ] Rate limiting uses Sliding Window Counter (`HINCRBY`), not ZSET Log
- [ ] TTL Jitter applied to Circuit Breaker `recovery_timeout` and all caches
- [ ] Timeouts set in `httpx.AsyncClient` and DB engine — not via `asyncio.wait_for` around the app
- [ ] `proxy_read_timeout` in Nginx as global backstop
- [ ] CORS configured strictly — only allowed origins, not `*`
- [ ] Security headers set: HSTS, CSP, X-Frame-Options
- [ ] Direct server access blocked via iptables — only Cloudflare IPs allowed
- [ ] Cloudflare with Bot Fight Mode enabled
- [ ] JA3 fingerprint checked at Nginx or Cloudflare Custom Rules level
- [ ] Prometheus + Grafana monitoring configured
- [ ] Telegram/PagerDuty alerts on anomalies
- [ ] Circuit Breaker with Redis state — not in process memory
- [ ] AnomalyDetector fully on Redis — no `defaultdict` in worker memory
- [ ] Login and critical endpoints have their own strict limits
- [ ] Chaos tests (Toxiproxy) run in staging and passed
- [ ] XDP/eBPF considered for bare-metal under volumetric L4 attacks
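The TTL Jitter item from the checklist, as a minimal sketch. `with_jitter` is an illustrative helper (not from slowapi or any library); the point is that many breakers or cache entries that tripped or were written at the same moment should not all expire at the same instant and stampede the backend together.

```python
import random

def with_jitter(base_seconds: float, spread: float = 0.2) -> float:
    """Return base ± spread, so N circuit breakers that opened together
    don't all probe the recovering backend at the same instant
    (a thundering herd). spread=0.2 means ±20%."""
    return base_seconds * random.uniform(1 - spread, 1 + spread)

# Usage: recovery_timeout of 30s lands somewhere in [24.0, 36.0]
recovery_timeout = with_jitter(30)
assert 24.0 <= recovery_timeout <= 36.0
```

The same one-liner applies to cache TTLs: `redis.set(key, value, ex=int(with_jitter(600)))` spreads expiries so a popular key's regeneration cost doesn't hit every worker at once.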
Summary: 8 Layers of Defense
eBPF/XDP (NIC level — before the kernel)
  └─ CDN (L3/L4 — Cloudflare + JA3 fingerprinting)
     └─ Firewall (IP filtering, iptables)
        └─ Nginx (connection + rate limits, Slowloris timeouts)
           └─ FastAPI Middleware (headers, CORS, security)
              └─ Rate Limiter (Redis Sliding Window + Jitter)
                 └─ Business Logic (Circuit Breaker → Redis)
                    └─ Monitoring (Anomaly Detection + Auto-ban + Chaos Tests)
Each layer is cheaper in CPU than the one below it: the outermost layers do the cheap filtering first, so an attack has to pass through all eight and almost nothing reaches the business logic. And if something does get through, monitoring has already sent an alert and auto-ban is running.
If this was useful — leave a ❤️ and bookmark it. The next part will cover JWT security and credential stuffing protection. There'll be production bugs there too — there always are.
💼 Python Backend & Automation Engineer | Specialized in Scalable Data Systems & API Architecture. Relocating to Saskatoon, SK via SINP Tech Pathway — Open to joining high-growth Canadian engineering teams.