Adilet Akmatov
FastAPI Security 100 LVL: Production-Grade DDoS Protection

Your FastAPI app is going down under load — and you're reading the slowapi docs thinking that'll be enough?

Spoiler: it won't. About as effective as a bike lock against a professional thief.

Let's break it all down — layer by layer — the way senior security engineers do it in production. No fluff, real code, some humor, and a clear understanding of why each decision is made the way it is — not just "I saw someone do it this way once."

Why FastAPI Is Especially Vulnerable

Python applications aren't killed by traffic volume — they're killed by application-layer (L7) attacks — ones that look like perfectly normal HTTP requests. A single request to a heavy endpoint can eat 5 seconds of CPU. 200 of those simultaneously = your server is dead, and you're getting a 3 AM phone call.

Attack types you need to know:

Type | Layer | What it kills | Where to handle
UDP/ICMP flood | L3 | Network channel | CDN / provider
SYN flood | L4 | TCP stack | iptables / Nginx
HTTP flood | L7 | Workers | FastAPI + Nginx
Slowloris | L7 | Connections | Nginx + Uvicorn
Heavy endpoint abuse | L7 | CPU / memory | Rate limiting

Defense in Depth

Internet → CDN/DDoS Provider → Nginx → Rate Limit Middleware → FastAPI → Business Logic

Rule: each layer handles its own threat type and reduces load on the next one. Like an onion — except instead of tears, the attacker gets a 403.

⚠️ The #1 beginner mistake — protecting only at the FastAPI level without Nginx in front. Thousands of simultaneous connections will spin up Python processes and exhaust memory before any middleware gets a chance to run. It's like posting a security guard inside the building while a crowd is already breaking through the unlocked front door.


1. Rate Limiting — First Line of Defense

Install

pip install slowapi redis limits

Yes, three packages. But at least it won't hurt later.

Basic Configuration

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

# key_func defines what we're limiting by
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["100/minute"],
    # Redis for distributed counter — required with multiple workers
    storage_uri="redis://localhost:6379"
)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)


@app.get("/public")
@limiter.limit("100/minute")
async def public_endpoint(request: Request):
    return {"status": "ok"}


# Heavy endpoint — strict limit
@app.get("/heavy-search")
@limiter.limit("10/minute;3/second")  # multiple limits separated by ;
async def heavy_search(request: Request, query: str):
    return {"results": []}


# Login — brute-force protection (5 attempts, then think)
@app.post("/auth/login")
@limiter.limit("5/minute")
async def login(request: Request):
    return {"token": "..."}

Limit by User, Not by IP

If you only limit by IP — all users behind the same corporate NAT share a single quota. Picture this: 200 office employees, one external IP, everyone hitting "Refresh" at the same time. Half of them get 429. Your CTO calls.

def get_user_or_ip(request: Request) -> str:
    """
    Authenticated → limit by user_id.
    Anonymous → limit by real IP.
    """
    token = request.headers.get("Authorization", "")
    if token.startswith("Bearer "):
        try:
            payload = decode_jwt(token[7:])  # decode_jwt — your JWT helper
            return f"user:{payload['sub']}"
        except Exception:
            pass  # invalid/expired token — fall back to IP

    # Proxy headers are only trustworthy when Nginx / your LB sets them —
    # a direct client can forge X-Forwarded-For
    return (
        request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
        or request.headers.get("X-Real-IP", "")
        or request.client.host
    )


user_limiter = Limiter(
    key_func=get_user_or_ip,
    storage_uri="redis://localhost:6379"
)

Token Bucket — Smooth Rate Limiting Without Burst Spikes

Standard rate limiting gives you an ugly burst: 100 requests in the first second, then nothing until the minute resets. The user stares at a 429 thinking the site is broken. Token Bucket fixes this — requests flow smoothly, like water, not like rush-hour traffic:

import time
import redis.asyncio as redis


class TokenBucketRateLimiter:
    def __init__(self, redis_client, capacity: int = 10, refill_rate: float = 1.0):
        self.redis = redis_client
        self.capacity = capacity        # max tokens in the bucket
        self.refill_rate = refill_rate  # tokens per second

    async def is_allowed(self, key: str) -> tuple[bool, dict]:
        now = time.time()

        # Lua script for atomic operation — no race conditions
        script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        local elapsed = now - last_refill
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        local allowed = 0
        if tokens >= 1 then
            tokens = tokens - 1
            allowed = 1
        end

        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
        redis.call('EXPIRE', key, 3600)
        return {allowed, math.floor(tokens)}
        """

        result = await self.redis.eval(script, 1, key, self.capacity, self.refill_rate, now)
        allowed, remaining = result[0], result[1]

        return bool(allowed), {
            "X-RateLimit-Limit": str(self.capacity),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(int(now + 60))
        }
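The Lua version above needs a running Redis. To see the smoothing behavior on its own, here's the same math in-memory — fine for a single process, useless with multiple workers, which is exactly why the production version lives in Redis:

```python
import time


class LocalTokenBucket:
    """Same math as the Lua script above, minus Redis — single process only."""

    def __init__(self, capacity: int = 10, refill_rate: float = 1.0):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def is_allowed(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.last_refill) * self.refill_rate,
        )
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = LocalTokenBucket(capacity=5, refill_rate=2.0)  # burst of 5, 2 req/s sustained
results = [bucket.is_allowed() for _ in range(7)]
# The full bucket absorbs the first 5 requests; 6 and 7 are rejected
# until ~0.5s passes and a fresh token drips in.
```

No 100-requests-then-silence cliff: capacity caps the burst, refill_rate sets the sustained pace.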

2. Security Middleware

Trusted Hosts, CORS, Headers

CORS with allow_origins=["*"] is like putting a lock on your door but taping the code next to it. We don't do that.

from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.cors import CORSMiddleware

# Only requests to our domain — guards against Host header injection
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["myapp.com", "*.myapp.com"]
)

# CORS — NEVER use allow_origins=["*"] in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
    max_age=3600
)

app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.middleware("http")
async def security_headers_middleware(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    response.headers["Server"] = "Server"  # hide server type
    return response

Timeouts: The Right Approach

Wrapping asyncio.wait_for around the entire ASGI application is a dangerous anti-pattern. On timeout, wait_for cancels the wrapped task — a CancelledError is injected at its current await point — and only then raises TimeoutError to the caller. If the endpoint is mid-transaction in SQLAlchemy or inside an httpx call and doesn't explicitly handle the cancellation, the connection never returns to the pool. After a few of these incidents, the pool is exhausted and the entire API goes down. That's how Friday-evening incidents are born.
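A minimal repro of the failure mode — no FastAPI needed. fake_transaction stands in for any handler holding a pooled connection; note that the cleanup line is never reached:

```python
import asyncio

leaked = []  # stands in for connections checked out of a pool


async def fake_transaction():
    leaked.append("conn")    # connection checked out
    await asyncio.sleep(10)  # long query — CancelledError lands on this await
    leaked.remove("conn")    # never reached after cancellation


async def main():
    try:
        # The anti-pattern: a blanket timeout wrapped around the whole handler
        await asyncio.wait_for(fake_transaction(), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    print(leaked)  # ['conn'] — the handler "finished", the connection didn't come back


asyncio.run(main())
```

A try/finally would patch this one case, but the general rule stands: put timeouts in the clients that actually wait, not around the application.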

Rule: set timeouts where the actual waiting happens — in clients, not around the application.

import httpx
from sqlalchemy.ext.asyncio import create_async_engine

# Timeout on external HTTP requests
async def call_external_api(url: str):
    timeout = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=2.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.get(url)
        return response.json()

# Timeout on DB queries — in the engine config
engine = create_async_engine(
    DATABASE_URL,
    pool_timeout=10,        # wait time for a free pool connection
    pool_recycle=1800,      # reopen connections every 30 min
    connect_args={"command_timeout": 15},  # SQL query timeout (PostgreSQL)
)

The global timeout at the TCP level lives in Nginx — that's where it makes sense, because Nginx doesn't hold transactions:

proxy_read_timeout    30s;   # wait no longer than 30s for FastAPI response
proxy_connect_timeout  5s;   # connection establishment timeout
proxy_send_timeout    10s;   # request send timeout

3. Slowloris Protection

☠️ Slowloris opens thousands of connections and sends HTTP headers one byte at a time every few seconds. The server waits for the request to complete, connections pile up. A single laptop can take down your server. Named after the slow-moving primate — and yes, it's just as unstoppable without the right protection.

Uvicorn Settings

# run.py
import uvicorn

uvicorn.run(
    "main:app",
    host="0.0.0.0",
    port=8000,
    workers=4,
    timeout_keep_alive=5,           # Slowloris dies in 5 seconds
    limit_max_requests=1000,
    h11_max_incomplete_event_size=16384,  # 16KB header limit
    backlog=100,
)

Gunicorn + Uvicorn Workers (Production)

# gunicorn.conf.py
import multiprocessing

workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000

timeout = 30
keepalive = 2
graceful_timeout = 30

max_requests = 1000        # restart worker every N requests — cures memory leaks
max_requests_jitter = 100  # randomize for smoothness (more on jitter below)

bind = "0.0.0.0:8000"
backlog = 100
gunicorn -c gunicorn.conf.py main:app

4. Redis Distributed Rate Limiter

With multiple FastAPI instances behind a load balancer — each process has no idea what limits others have counted. Like three security guards at the entrance, each counting visitors from zero independently. Together they let in three times too many.

Redis fixes this.

Sliding Window Counter — The Right Algorithm

The internet is full of implementations using Redis ZSET (Sorted Set), where every request is stored as a separate element. That's Sliding Window Log — accurate, but catastrophic under attack: 20,000 req/s from one IP → 20,000 entries in Redis memory for a single key. ZREMRANGEBYSCORE becomes O(log N + M) and creates CPU spikes in single-threaded Redis.

The correct approach — Sliding Window Counter: split time into discrete intervals and store only counters. O(1) memory per IP instead of O(N). The difference is like storing every step a person takes vs just a step counter.

import redis.asyncio as aioredis
import time
from fastapi import Request, HTTPException, Depends


class SlidingWindowCounter:
    """
    Sliding Window Counter via Redis HASH.
    Window is split into 1-second intervals.
    We store only counters — not timestamps.
    Memory is O(window_size) per IP, not O(requests_count).
    """

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def is_allowed(self, key: str, limit: int, window: int) -> tuple[bool, int]:
        now = int(time.time())
        pipe = self.redis.pipeline()

        # Sum across all second-buckets in the window
        buckets = [f"{key}:{now - i}" for i in range(window)]
        for bucket in buckets:
            pipe.hget("rate_counters", bucket)

        results = await pipe.execute()
        total = sum(int(r or 0) for r in results)

        if total >= limit:
            return False, 0

        # Atomically increment the current bucket
        current_bucket = f"{key}:{now}"
        await self.redis.hincrby("rate_counters", current_bucket, 1)
        # TTL on the whole hash — auto-cleanup
        await self.redis.expire("rate_counters", jittered_ttl(window + 5))  # jittered_ttl — see section 5

        return True, limit - total - 1


sliding_counter = SlidingWindowCounter(aioredis.from_url("redis://localhost:6379"))


def rate_limit(limit: int = 100, window: int = 60):
    """
    Dependency factory. FastAPI can't introspect a lambda's parameters,
    and a lambda wouldn't await the coroutine anyway — so we return a
    proper async dependency with the limits baked in.
    """
    async def dependency(request: Request) -> int:
        key = f"{request.client.host}:{request.url.path}"
        allowed, remaining = await sliding_counter.is_allowed(key, limit, window)

        if not allowed:
            raise HTTPException(
                status_code=429,
                detail={"error": "Rate limit exceeded", "retry_after": window},
                headers={"Retry-After": str(window), "X-RateLimit-Remaining": "0"}
            )
        return remaining
    return dependency


@app.get("/api/expensive")
async def expensive_endpoint(
    request: Request,
    remaining: int = Depends(rate_limit(limit=10, window=60))
):
    return {"data": "..."}

IP Blacklist via Redis

from ipaddress import ip_address, ip_network
from starlette.responses import JSONResponse

WHITELIST_NETWORKS = [
    ip_network("10.0.0.0/8"),    # internal network
    ip_network("172.16.0.0/12"), # Docker networks
    ip_network("127.0.0.0/8"),   # localhost
]


class IPFilterMiddleware:
    def __init__(self, app, redis_client):
        self.app = app
        self.redis = redis_client

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        client_ip = scope["client"][0]
        ip = ip_address(client_ip)

        # Whitelist — pass through without checks
        for network in WHITELIST_NETWORKS:
            if ip in network:
                await self.app(scope, receive, send)
                return

        # Blacklist check
        is_blocked = await self.redis.get(f"blacklist:{client_ip}")
        if is_blocked:
            response = JSONResponse(status_code=403, content={"detail": "Access denied"})
            await response(scope, receive, send)
            return

        await self.app(scope, receive, send)


@app.post("/admin/blacklist/{ip}")
async def block_ip(ip: str, duration: int = 3600):
    await redis_client.setex(f"blacklist:{ip}", duration, 1)
    return {"blocked": ip, "duration": duration}

5. Thundering Herd — TTL Jitter Against the Stampede Effect

We have 9 Gunicorn workers, Redis, a Circuit Breaker with recovery_timeout = 30. Ask yourself: what happens when that timeout expires on all workers simultaneously?

A real-life scenario: the database goes down at 14:32:00. Circuit Breaker enters OPEN state. Exactly at 14:32:30 all 9 workers synchronously transition to HALF_OPEN and fire a probe request to the DB. If the DB is still unstable — it gets an instant connection spike and goes down again. Circuit Breaker goes back to OPEN. At 14:33:00, the cycle repeats. This is the Thundering Herd — the stampede effect where your protection mechanism is what breaks the thing it's protecting.

Solution: Jitter (random TTL spread). Never use static expiration times for states that expire across multiple workers simultaneously. Desynchronize them — let each worker wake up at its own moment.

import random


def jittered_ttl(base_seconds: int, variance: float = 0.15) -> int:
    """
    Adds ±15% random spread to TTL.
    base=30s → actual TTL between 26s and 34s (randint is integer-valued).
    Workers wake up at different times — no spike.
    """
    delta = int(base_seconds * variance)
    return base_seconds + random.randint(-delta, delta)

Apply it in RedisCircuitBreaker:

class RedisCircuitBreaker:
    # ... (previous code)

    async def _on_failure(self, now: float):
        # Before: open_ttl = self.recovery_timeout * 2  — all workers in sync
        # After: each worker gets a slightly different TTL — beautiful
        ttl = jittered_ttl(self.recovery_timeout * 2)
        await self.redis.eval(
            self._RECORD_FAILURE_SCRIPT, 1,
            self.key, self.failure_threshold, ttl, now
        )

And in SlidingWindowCounter — window expirations across workers shouldn't be synchronized:

# Before: await self.redis.expire("rate_counters", window + 5)
await self.redis.expire("rate_counters", jittered_ttl(window + 5))

General rule: any EXPIRE in Redis that could fire across multiple consumers simultaneously must have jitter. This is especially critical for Circuit Breaker recovery_timeout and TTL on hot cache keys. A small spread — a huge relief for your database.
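A quick simulation makes the effect visible. jittered_ttl is repeated here so the snippet runs standalone; 100 consumers share a hot key with a 30s base TTL:

```python
import random
from collections import Counter


def jittered_ttl(base_seconds: int, variance: float = 0.15) -> int:
    delta = int(base_seconds * variance)
    return base_seconds + random.randint(-delta, delta)


random.seed(42)  # fixed seed only to make the example reproducible

static_ttls = Counter(30 for _ in range(100))
jittered_ttls = Counter(jittered_ttl(30) for _ in range(100))

# Static TTL: all 100 consumers expire in the same second — the herd.
assert static_ttls[30] == 100

# Jittered TTL: expirations spread across 26..34s; the worst single
# second now sees only a fraction of the consumers.
assert min(jittered_ttls) >= 26 and max(jittered_ttls) <= 34
assert max(jittered_ttls.values()) < 30
```

Nine seconds of spread instead of one synchronized instant — that's the whole trick.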


6. Circuit Breaker — Distributed, via Redis

If the database is down — Circuit Breaker quickly rejects requests instead of waiting for timeouts. But there's a critical catch: state stored in process memory doesn't work with multiple workers.

With gunicorn running cpu_count() * 2 + 1 workers, you have ~9 isolated Python processes. Worker A opens the circuit, workers B–I keep hammering the dead DB, thinking it's still closed. It's like one security guard knowing the building is on fire while the other eight keep letting people in. State must live in Redis.

from enum import Enum
import redis.asyncio as aioredis
import time

from fastapi import HTTPException


class CircuitState(Enum):
    CLOSED    = "closed"     # normal operation
    OPEN      = "open"       # all requests blocked
    HALF_OPEN = "half_open"  # probe request to check recovery


class RedisCircuitBreaker:
    """
    Circuit Breaker with state stored in Redis.
    All workers see the same state.
    Lua script guarantees atomic state transitions.
    """

    _RECORD_FAILURE_SCRIPT = """
    local key = KEYS[1]
    local threshold = tonumber(ARGV[1])
    local open_ttl  = tonumber(ARGV[2])
    local now       = tonumber(ARGV[3])

    local failures = redis.call('HINCRBY', key, 'failures', 1)
    redis.call('HSET', key, 'last_failure', now)

    if failures >= threshold then
        redis.call('HSET', key, 'state', 'open', 'opened_at', now)
        redis.call('EXPIRE', key, open_ttl * 2)
    end
    return failures
    """

    def __init__(
        self,
        redis: aioredis.Redis,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2,
    ):
        self.redis = redis
        self.key = f"circuit:{name}"
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

    async def _get_state(self) -> dict:
        data = await self.redis.hgetall(self.key)
        return {k.decode(): v.decode() for k, v in data.items()} if data else {}

    async def call(self, func, *args, **kwargs):
        state = await self._get_state()
        circuit_state = state.get("state", "closed")
        now = time.time()

        if circuit_state == "open":
            opened_at = float(state.get("opened_at", 0))
            if now - opened_at >= self.recovery_timeout:
                await self.redis.hset(self.key, mapping={"state": "half_open", "successes": 0})
            else:
                raise HTTPException(503, "Service temporarily unavailable")

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except Exception:
            await self._on_failure(now)
            raise

    async def _on_success(self):
        state = await self._get_state()
        if state.get("state") == "half_open":
            successes = await self.redis.hincrby(self.key, "successes", 1)
            if successes >= self.success_threshold:
                await self.redis.delete(self.key)
        else:
            await self.redis.hset(self.key, "failures", 0)

    async def _on_failure(self, now: float):
        ttl = jittered_ttl(self.recovery_timeout * 2)  # <- jitter here
        await self.redis.eval(
            self._RECORD_FAILURE_SCRIPT, 1,
            self.key, self.failure_threshold, ttl, now
        )


db_circuit = RedisCircuitBreaker(redis_client, name="postgres", failure_threshold=5)  # redis_client — your aioredis instance

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return await db_circuit.call(fetch_user_from_db, user_id)

7. Nginx — Correct Configuration

http {
    limit_req_zone  $binary_remote_addr  zone=api:10m   rate=10r/s;
    limit_req_zone  $binary_remote_addr  zone=heavy:10m rate=1r/s;
    limit_conn_zone $binary_remote_addr  zone=conn:10m;
    limit_conn_status 429;
    limit_req_status  429;

    server {
        listen 443 ssl http2;
        server_name myapp.com;

        # Slowloris timeouts — wait no more than 10 seconds for headers
        client_header_timeout 10s;
        client_body_timeout   10s;
        send_timeout          10s;
        keepalive_timeout     5s;

        client_max_body_size        10m;
        client_header_buffer_size   1k;
        large_client_header_buffers 4 8k;

        # Max 10 connections per IP — prevents Slowloris from sticking
        limit_conn conn 10;

        location /api/ {
            limit_req zone=api burst=20 nodelay;

            proxy_pass            http://127.0.0.1:8000;
            proxy_read_timeout    30s;
            proxy_connect_timeout 5s;
            proxy_set_header      X-Real-IP        $remote_addr;
            proxy_set_header      X-Forwarded-For  $proxy_add_x_forwarded_for;
        }

        location ~* ^/api/(search|export|report) {
            limit_req zone=heavy burst=5;
            proxy_pass         http://127.0.0.1:8000;
            proxy_read_timeout 120s;
        }

        # Block popular scanners — looking at you, nikto
        if ($http_user_agent ~* (nikto|sqlmap|masscan|nmap)) {
            return 403;
        }

        # Empty User-Agent — close connection silently
        if ($http_user_agent = "") {
            return 444;
        }
    }
}

💡 Status code 444 in Nginx closes the TCP connection with no HTTP response. Cheaper than sending a 403 — we don't waste bandwidth on aggressive bots. Zero bytes in response = zero resources spent talking to someone who doesn't deserve a reply.

Geo-blocking for Targeted Attacks

# Requires ngx_http_geoip2_module
geoip2 /etc/nginx/GeoLite2-Country.mmdb {
    $geoip2_data_country_code country iso_code;
}

map $geoip2_data_country_code $allowed_country {
    default 0;
    RU 1;
    KZ 1;
    US 1;
}

server {
    if ($allowed_country = 0) {
        return 403;
    }
}

8. Cloudflare — CDN Layer

Feature | Free | Pro ($20/mo) | Business
L3/L4 DDoS Protection | ✓ | ✓ | ✓
WAF | — | ✓ | ✓
Advanced Rate Limiting | — | — | ✓
Bot Fight Mode | ✓ | ✓ | ✓
Custom Rules | 5 | 20 | 100+

Cloudflare Worker — Rate Limiting at the Edge

Requests are blocked before they even reach your server. It's like a bouncer at the club — they turn away troublemakers on the street, not at the bar.

const RATE_LIMIT = 100;
const WINDOW = 60; // seconds

export default {
    async fetch(request, env) {
        const ip = request.headers.get('CF-Connecting-IP');
        const key = `rate:${ip}:${Math.floor(Date.now() / 1000 / WINDOW)}`;

        const current = parseInt(await env.RATE_KV.get(key) || '0');

        if (current >= RATE_LIMIT) {
            return new Response('Too Many Requests', {
                status: 429,
                headers: {
                    'Retry-After': String(WINDOW),
                    'X-RateLimit-Remaining': '0'
                }
            });
        }

        await env.RATE_KV.put(key, String(current + 1), { expirationTtl: WINDOW });
        return fetch(request);
    }
};

Blocking Direct Server Access — This Is Mandatory

If ports 80/443 are open directly — an attacker can bypass Cloudflare by hitting your server's IP. All your CDN protection becomes meaningless. It's like having an armored door next to a hole in the wall.

# Download current Cloudflare IP ranges
wget -O /tmp/cf-ips.txt https://www.cloudflare.com/ips-v4

iptables -F INPUT
iptables -A INPUT -i lo -j ACCEPT                # loopback stays open
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT  # don't kill live sessions
iptables -A INPUT -p tcp --dport 22 -j ACCEPT    # Keep SSH — otherwise you're locked out

while IFS= read -r ip; do
    iptables -A INPUT -p tcp --dport 80  -s "$ip" -j ACCEPT
    iptables -A INPUT -p tcp --dport 443 -s "$ip" -j ACCEPT
done < /tmp/cf-ips.txt

iptables -A INPUT -j DROP  # everything else — no

9. OS Kernel: eBPF/XDP vs iptables

In the previous section we locked down direct access using iptables. That's the standard — but it has an architectural ceiling that becomes visible under truly serious volumetric attacks.

The iptables problem under volumetric attack. Even if a rule drops a packet, the Linux kernel still allocates an sk_buff buffer for every incoming packet before iptables processes it. At 10–20 Mpps (millions of packets per second) this exhausts kernel memory before the rules even fire. You're dropping packets, but you already paid the memory cost — a paradox of protection.

Solution: XDP (eXpress Data Path) + eBPF. XDP lets you attach an eBPF program directly to the NIC driver. The packet is dropped before the kernel even sees it — before sk_buff allocation, before the IP stack, before iptables. Like a doorman who turns away unwanted guests in the parking garage — not at reception.

Without XDP: NIC → sk_buff alloc → TCP stack → iptables → DROP  (memory wasted)
With XDP:    NIC → eBPF prog → XDP_DROP                          (memory untouched)

Minimal XDP program in C (compiled via clang, loaded via ip link):

// xdp_drop_syn.c
#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/tcp.h>
#include <bpf/bpf_helpers.h>  // provides the SEC() macro

SEC("xdp")
int block_syn_flood(struct xdp_md *ctx) {
    void *data     = (void *)(long)ctx->data;
    void *data_end = (void *)(long)ctx->data_end;

    struct ethhdr *eth = data;
    if ((void *)(eth + 1) > data_end) return XDP_PASS;
    if (eth->h_proto != __constant_htons(ETH_P_IP)) return XDP_PASS;

    struct iphdr *ip = (void *)(eth + 1);
    if ((void *)(ip + 1) > data_end) return XDP_PASS;
    if (ip->protocol != IPPROTO_TCP) return XDP_PASS;

    struct tcphdr *tcp = (void *)(ip + 1);  // assumes no IP options (ihl == 5)
    if ((void *)(tcp + 1) > data_end) return XDP_PASS;

    // Drop bare SYN without ACK — classic SYN flood
    if (tcp->syn && !tcp->ack) {
        // In production: check against eBPF map of blocked IPs
        return XDP_DROP;
    }

    return XDP_PASS;
}

char _license[] SEC("license") = "GPL";  // required when using GPL-only helpers
# Load program onto the interface (native mode — maximum speed)
ip link set dev eth0 xdp obj xdp_drop_syn.o sec xdp

Layer allocation rule: iptables — for stateful rules and port-based filtering. eBPF/XDP — for high-speed dropping of volumetric L3/L4 attacks at the NIC level. For bare-metal servers under serious attack the difference in effectiveness is an order of magnitude. This isn't a replacement — it's an additional layer in front of everything else.


10. Monitoring and Automatic Auto-ban

The previous version of the anomaly detector stored request history in defaultdict(list) in process memory. That's two bugs in one:

  1. Memory leak: 100,000 unique IPs make one request each and disappear — their keys stay in memory until Uvicorn restarts. A slow death by OOM.
  2. Event loop blocking: list comprehension over 5,000 timestamps on every request — a synchronous operation in async context, delaying all other requests. One bad IP slows down all good users.

All state goes to Redis. Counters with TTLs handle the cleanup automatically:

import redis.asyncio as aioredis
import logging
import time

from fastapi import Request

logger = logging.getLogger(__name__)


class AnomalyDetector:
    """
    All state in Redis — no memory leaks, no event loop blocking.
    LPUSH/LTRIM keep only the last N timestamps.
    TTL automatically cleans up inactive IPs.
    """

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def analyze_request(self, request: Request) -> dict:
        ip = request.client.host
        signals = {}

        # 1. High frequency — counter with TTL instead of an in-memory list
        freq_key = f"freq:{ip}"
        count = await self.redis.incr(freq_key)
        if count == 1:
            # Set the TTL only on the first hit — re-setting it on every
            # request would keep the window sliding forever
            await self.redis.expire(freq_key, 10)
        signals["high_frequency"] = count > 50

        # 2. Suspicious User-Agent (basic check — extended by JA3 below)
        ua = request.headers.get("user-agent", "")
        signals["suspicious_ua"] = any(
            bot in ua.lower()
            for bot in ["python-requests", "curl", "wget", "scrapy", "go-http"]
        )

        # 3. JA3 TLS fingerprint — real client identification (see section 11)
        KNOWN_BAD_JA3 = {
            "6734f37431670b3ab4292b8f60f29984",  # python-requests
            "b32309a26951912be7dba376398abc3b",  # curl
            "4d7a28d6f2263ed61de88ca66eb011e3",  # Scrapy
        }
        ja3 = request.headers.get("X-JA3-Fingerprint", "")
        signals["bad_tls_fingerprint"] = ja3 in KNOWN_BAD_JA3

        # 4. High 4xx error rate
        error_count = await self.redis.get(f"errors:{ip}")
        signals["high_errors"] = int(error_count or 0) > 20

        # 5. Scanning known vulnerable paths
        scan_paths = ["/admin", "/.env", "/wp-admin", "/phpMyAdmin", "/.git"]
        signals["scanning"] = any(request.url.path.startswith(p) for p in scan_paths)

        threat_score = sum(1 for v in signals.values() if v)
        if threat_score >= 2:
            await self._auto_block(ip, threat_score, signals)

        return signals

    async def record_error(self, ip: str):
        """Call this from middleware on 4xx responses."""
        key = f"errors:{ip}"
        count = await self.redis.incr(key)
        if count == 1:
            await self.redis.expire(key, 300)

    async def _auto_block(self, ip: str, score: int, signals: dict):
        duration = 86400 if score >= 3 else 3600
        # SET NX — don't overwrite if already blocked
        blocked = await self.redis.set(f"blacklist:{ip}", score, ex=duration, nx=True)
        if blocked:
            logger.warning(f"AUTO-BLOCKED {ip} for {duration}s | score={score} | {signals}")
            await self._send_telegram_alert(ip, score, signals)

    async def _send_telegram_alert(self, ip, score, signals):
        import httpx
        # BOT_TOKEN and CHAT_ID come from your settings / environment
        message = f"🚨 DDoS Alert!\nIP: {ip}\nScore: {score}/5\nSignals: {signals}"
        async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
            await client.post(
                f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
                json={"chat_id": CHAT_ID, "text": message}
            )


@app.middleware("http")
async def track_errors_middleware(request: Request, call_next):
    response = await call_next(request)
    if 400 <= response.status_code < 500:
        await anomaly_detector.record_error(request.client.host)
    return response

Prometheus Metrics

from prometheus_client import Counter, Histogram
from prometheus_fastapi_instrumentator import Instrumentator

RATE_LIMIT_HITS = Counter(
    "rate_limit_hits_total",
    "Total rate limit violations",
    ["endpoint"]  # don't label by IP — unbounded label cardinality will kill Prometheus
)

REQUEST_DURATION = Histogram(
    "request_duration_seconds",
    "Request processing time",
    ["method", "endpoint", "status"],
    buckets=[.005, .01, .025, .05, .1, .5, 1, 5]
)

Instrumentator().instrument(app).expose(app, endpoint="/metrics")

11. JA3/JA4 TLS Fingerprinting — Detect Bots by Their Handshake

In AnomalyDetector we check the User-Agent. That's necessary, but naive: any script changes that header in one line. It's like trying to identify a professional burglar by asking their name.

The problem: HTTP headers are controlled by the attacker. The TLS handshake is not.

When establishing an HTTPS connection, the client sends a ClientHello — a list of supported cipher suites, extensions, and elliptic curve groups. Chrome sends one specific set, Python requests another, Go net/http yet another. This set is hashed into a JA3 fingerprint (an MD5, 32 hex characters). The fingerprint changes when the library is updated, but within one version it's stable. Faking it means swapping out the TLS stack itself (tools like curl-impersonate exist for exactly this) — orders of magnitude more effort than changing a header.

Chrome 120:       ja3 = "cd08e31494f9531f560d64c695473da9"  ← legitimate user
Python requests:  ja3 = "6734f37431670b3ab4292b8f60f29984"  ← instant red flag
curl 8.x:         ja3 = "b32309a26951912be7dba376398abc3b"  ← a tool, not a browser
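For intuition: per the JA3 spec, the fingerprint is the MD5 of five comma-separated ClientHello fields (TLS version, ciphers, extensions, curves, point formats), each field dash-joined. A toy sketch with made-up field values:

```python
import hashlib

# Toy ClientHello fields (decimal values, dash-joined) — illustrative numbers only
tls_version = "771"              # TLS 1.2 as seen in the ClientHello
ciphers = "4865-4866-4867"
extensions = "0-10-11-13"
curves = "29-23-24"
point_formats = "0"

ja3_string = ",".join([tls_version, ciphers, extensions, curves, point_formats])
ja3 = hashlib.md5(ja3_string.encode()).hexdigest()
print(ja3)  # 32 hex chars — this is what gets compared against blocklists
```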

Getting JA3 in Nginx

# Requires: openresty or nginx + lua-nginx-module + lua-resty-ja3
http {
    lua_shared_dict ja3_block 10m;

    server {
        access_by_lua_block {
            local ja3 = require("resty.ja3")
            local fingerprint = ja3.fingerprint()

            -- List of known bad fingerprints (botnets, scanners)
            local blocklist = {
                ["6734f37431670b3ab4292b8f60f29984"] = true,  -- python-requests
                ["b32309a26951912be7dba376398abc3b"] = true,  -- curl
            }

            if blocklist[fingerprint] then
                ngx.header["X-JA3-Blocked"] = fingerprint
                ngx.exit(ngx.HTTP_FORBIDDEN)
            end

            -- Forward fingerprint to FastAPI for logging and analysis
            ngx.req.set_header("X-JA3-Fingerprint", fingerprint)
        }
    }
}

FastAPI receives the fingerprint via the X-JA3-Fingerprint header and AnomalyDetector handles it (see the bad_tls_fingerprint signal above).

The Cloudflare path (no Nginx config needed): with the Bot Management add-on enabled, JA3/JA4 are exposed as cf.bot_management.ja3_hash in Custom Rules — without a single line of server-side code. One click in the UI — and all Python bots are gone.

Important caveat: JA3 blocking is not a silver bullet. Sophisticated bots use browser engines (Playwright, Puppeteer) with a real Chrome TLS stack and a real Chrome fingerprint. But it effectively cuts out 80–90% of automated traffic from scripts and custom clients — the ones that didn't bother. And that's the majority.


12. Chaos Engineering — Proving Your Protection Actually Works

We wrote a Circuit Breaker, AnomalyDetector, Sliding Window. Beautiful code. Passed code review. Everyone said "looks good."

But how do you know it actually works in production and not just in unit tests with a mocked Redis? The honest answer: you can't — unless you deliberately broke things in staging first.

The enterprise answer: Chaos Engineering — intentionally injecting failures into staging to verify graceful degradation. Netflix calls this the "Design for failure" principle. We call it "let's break this before it breaks itself at 3 AM."

Toxiproxy — Injecting Failures at the TCP Layer

# Run Toxiproxy next to your staging DB
docker run -d -p 8474:8474 -p 5433:5433 ghcr.io/shopify/toxiproxy
# chaos_tests/test_circuit_breaker.py
import pytest
import httpx
import asyncio
from toxiproxy import Toxiproxy


@pytest.fixture
def proxy():
    client = Toxiproxy()
    # Proxy PostgreSQL: localhost:5433 → postgres:5432
    proxy = client.create(name="postgres", listen="0.0.0.0:5433", upstream="postgres:5432")
    yield proxy
    proxy.destroy()


@pytest.mark.asyncio  # requires pytest-asyncio; bare async tests won't run otherwise
async def test_circuit_opens_under_latency(proxy):
    """
    Inject 3-second latency into PostgreSQL.
    Circuit Breaker should open after 5 timeouts
    and start returning 503 — fast, not hanging.
    """
    proxy.add_toxic("latency", type="latency", attributes={"latency": 3000})

    failed = 0
    circuit_opened = False

    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        for i in range(20):
            r = await client.get("/users/1")
            if r.status_code == 503:
                circuit_opened = True
                break
            if r.status_code == 504:
                failed += 1

    assert circuit_opened, f"Circuit didn't open after {failed} timeouts — something's wrong"


@pytest.mark.asyncio
async def test_circuit_recovers_after_heal(proxy):
    """
    After the circuit opens, remove the toxic.
    After recovery_timeout the system should self-heal.
    No restart. No 3 AM calls.
    """
    proxy.add_toxic("down", type="limit_data", attributes={"bytes": 0})
    await asyncio.sleep(2)
    proxy.remove_toxic("down")

    await asyncio.sleep(35)  # wait recovery_timeout + buffer

    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        r = await client.get("/users/1")
        assert r.status_code == 200, "System didn't recover — Circuit stuck in OPEN"


@pytest.mark.asyncio
async def test_rate_limiter_under_flood(proxy):
    """
    200 requests from one IP at once.
    Should get 429, not 500.
    500 = your code crashed. 429 = your protection is working.
    """
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        tasks = [client.get("/api/expensive") for _ in range(200)]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    status_codes = [r.status_code for r in responses if isinstance(r, httpx.Response)]
    assert 429 in status_codes, "Rate limiter didn't fire under flood"
    assert 500 not in status_codes, "Server returned 500 — this is a crash, not controlled degradation"
# Run in CI/CD pipeline (STAGING ONLY — never do this in production)
pytest chaos_tests/ -v --timeout=120

What to Test in Chaos Tests

| Scenario | Tool | Expected behavior |
| --- | --- | --- |
| 3s latency on PostgreSQL | Toxiproxy `latency` | Circuit opens, 503 instead of 504 |
| Full DB outage | Toxiproxy `limit_data` | Circuit open, recovery in N seconds |
| Redis unavailable | `docker pause redis` | Graceful degradation, no crash |
| 1000 req/s flood | wrk / locust | 429 at limit, zero 500s |
| Worker OOM | `kill -9 <pid>` | Gunicorn spawns a new one, no traffic loss |

The core Chaos Engineering insight: we don't care whether the system falls over under load (it will — that's normal). We care whether it degrades gracefully. Correct degradation: 429 instead of 500, 503 instead of hanging, predictable recovery. Incorrect: OOM crash, connection leak, infinite retry storm. You only find out which one you have by breaking things intentionally — or accidentally in production.
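The "Redis unavailable" row is worth spelling out: the rate limiter must fail open when its own store dies. A minimal sketch with a fixed-window check (`is_allowed` and the key scheme are illustrative; with redis-py you would catch redis.exceptions.ConnectionError rather than the builtin):

```python
import logging

logger = logging.getLogger("ratelimit")

async def is_allowed(redis, key: str, limit: int, window: int = 60) -> bool:
    """Fail-open rate check: a dead Redis must not become a second outage."""
    try:
        count = await redis.incr(key)
        if count == 1:
            await redis.expire(key, window)  # start the window on the first hit
        return count <= limit
    except ConnectionError:
        # Redis is down: log loudly and let the request through.
        # Denying everyone because the limiter's store died is a self-inflicted DDoS.
        logger.error("Redis unreachable, rate limiter failing open for %s", key)
        return True
```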


13. Production Docker Compose

version: '3.9'

services:
  nginx:
    image: nginx:alpine
    ports: ["443:443", "80:80"]
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/nginx/certs
    depends_on: [fastapi]

  fastapi:
    build: .
    command: gunicorn -c gunicorn.conf.py main:app
    environment:
      REDIS_URL: redis://redis:6379
    depends_on: [redis]
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: '1.0'

  redis:
    image: redis:7-alpine
    # allkeys-lru: on low memory evict old keys, don't crash
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes: [redis_data:/data]

  prometheus:
    image: prom/prometheus
    volumes: [./prometheus.yml:/etc/prometheus/prometheus.yml]

  grafana:
    image: grafana/grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: strongpassword  # please change this. seriously.
    ports: ["3000:3000"]

volumes:
  redis_data:
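The compose file mounts ./prometheus.yml without showing it. A minimal scrape config matching the service names above (the fastapi:8000 target is an assumption — adjust to whatever your gunicorn.conf.py binds):

```yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: fastapi
    metrics_path: /metrics          # exposed by Instrumentator above
    static_configs:
      - targets: ["fastapi:8000"]   # assumes gunicorn binds 0.0.0.0:8000
```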

Pre-Deploy Checklist

Print it out. Pin it above your monitor. Save it in Notion. Tattoo it on your wrist — your choice.

  • [ ] Nginx sits in front of FastAPI with limit_req_zone and limit_conn
  • [ ] client_max_body_size 10m in Nginx — not Python middleware (bypassed via Transfer-Encoding: chunked)
  • [ ] Redis connected for distributed rate limiting
  • [ ] Rate limiting uses Sliding Window Counter (HINCRBY), not ZSET Log
  • [ ] TTL Jitter applied to Circuit Breaker recovery_timeout and all caches
  • [ ] Timeouts set in httpx.AsyncClient and DB engine — not via asyncio.wait_for around the app
  • [ ] proxy_read_timeout in Nginx as global backstop
  • [ ] CORS configured strictly — only allowed origins, not *
  • [ ] Security headers set: HSTS, CSP, X-Frame-Options
  • [ ] Direct server access blocked via iptables — only Cloudflare IPs allowed
  • [ ] Cloudflare with Bot Fight Mode enabled
  • [ ] JA3 fingerprint checked at Nginx or Cloudflare Custom Rules level
  • [ ] Prometheus + Grafana monitoring configured
  • [ ] Telegram/PagerDuty alerts on anomalies
  • [ ] Circuit Breaker with Redis state — not in process memory
  • [ ] AnomalyDetector fully on Redis — no defaultdict in worker memory
  • [ ] Login and critical endpoints have their own strict limits
  • [ ] Chaos tests (Toxiproxy) run in staging and passed
  • [ ] XDP/eBPF considered for bare-metal under volumetric L4 attacks

Summary: 8 Layers of Defense

eBPF/XDP (NIC level — before the kernel)
  └─ CDN (L3/L4 — Cloudflare + JA3 fingerprinting)
       └─ Firewall (IP filtering, iptables)
            └─ Nginx (connection + rate limits, Slowloris timeouts)
                 └─ FastAPI Middleware (headers, CORS, security)
                      └─ Rate Limiter (Redis Sliding Window + Jitter)
                           └─ Business Logic (Circuit Breaker → Redis)
                                └─ Monitoring (Anomaly Detection + Auto-ban + Chaos Tests)
Enter fullscreen mode Exit fullscreen mode

Each layer is cheaper than the one above it in CPU cost. An attack has to pass through all eight — almost nothing reaches the business logic. And if something does — monitoring already sent an alert and auto-ban is running.


If this was useful — leave a ❤️ and bookmark it. The next part will cover JWT security and credential stuffing protection. There'll be production bugs there too — there always are.

💼 Python Backend & Automation Engineer | Specialized in Scalable Data Systems & API Architecture. Relocating to Saskatoon, SK via SINP Tech Pathway — Open to joining high-growth Canadian engineering teams.
