Introduction
Imagine you run a cloud storage platform. Thousands of users upload files, share documents, and collaborate every day. Then one morning, a single IP address sends 500 requests in 60 seconds. Your server slows to a crawl. Users can't log in. Files won't upload. You're under attack.
This is a DDoS attack — Distributed Denial of Service. The goal is simple: flood your server with so many requests that it can't serve real users anymore.
In this post, I'll walk you through how I built an anomaly detection engine that watches all incoming HTTP traffic in real time, learns what normal looks like, and automatically blocks attackers — all without any third-party security libraries.
Here's what the system does:
- Reads Nginx access logs line by line as they are written
- Tracks request rates using sliding time windows
- Learns normal traffic patterns using a rolling statistical baseline
- Detects anomalies using z-score math
- Blocks attacking IPs using Linux firewall rules (iptables)
- Sends Slack alerts within 10 seconds
- Shows a live web dashboard
Let's break down each piece.
The Architecture
Before diving into code, here's how all the pieces connect:
Internet → Nginx (reverse proxy) → Nextcloud
↓ writes JSON logs
/var/log/nginx/hng-access.log
↓ reads logs
Detector Daemon (Python)
├── monitor.py — tails the log file
├── baseline.py — learns what normal looks like
├── detector.py — spots anomalies
├── blocker.py — blocks IPs with iptables
├── unbanner.py — auto-releases bans
├── notifier.py — sends Slack alerts
└── dashboard.py — live web UI
Nginx sits in front of Nextcloud and writes every HTTP request as a JSON line to a log file. The detector daemon tails that file continuously, processes each line, and acts when something looks wrong.
Everything runs in Docker. Nginx, Nextcloud, and the detector are three separate containers sharing a named volume so the log file is accessible to all of them.
Step 1 — Reading the Log File in Real Time
The first challenge is reading a log file as it grows. This is called "tailing" — like running tail -f in your terminal.
Here's the core idea:
f = open("/var/log/nginx/hng-access.log", "r")
f.seek(0, 2) # jump to the end of the file
while True:
line = f.readline()
if line:
process(line)
else:
time.sleep(0.05) # nothing new yet, wait 50ms and try again
f.seek(0, 2) moves the file cursor to the very end. This means we only see new lines written after the daemon starts — not the entire history.
readline() returns an empty string "" when there's nothing new. That's our signal to sleep briefly and retry.
Handling log rotation: Nginx periodically rotates its log file (renames the old one, creates a new one). If we don't handle this, we'd keep reading the old file forever. The fix is to compare the file's inode (a unique ID the OS assigns to every file) on each empty read:
import os

current_inode = os.stat(log_path).st_ino
if current_inode != saved_inode:
    f.close()
    f = open(log_path, "r")  # reopen the new file from the beginning
    saved_inode = current_inode
Each parsed JSON line gives us: source_ip, timestamp, method, path, status, response_size.
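Parsing one of those lines is a couple of lines of standard-library code. Here's a sketch; the exact JSON keys depend on the log_format block configured in Nginx, so treat the field names as assumptions:

import json

def parse_line(raw):
    # One JSON object per log line; skip anything malformed or half-written.
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Field names are assumptions matching the list above.
    return (
        event.get("source_ip"),
        event.get("timestamp"),
        event.get("method"),
        event.get("path"),
        int(event.get("status", 0)),
        int(event.get("response_size", 0)),
    )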
Step 2 — The Sliding Window
Now that we're reading log lines, we need to answer one question per request:
How many requests has this IP sent in the last 60 seconds?
A naive approach would be to store a counter per IP and reset it every minute. But that's wrong — it doesn't give you a true 60-second window. If an attacker sends 499 requests at 11:59:59 and 1 more at 12:00:01, the counter resets at 12:00:00, so neither minute ever shows the full spike.
The correct approach is a sliding window using a deque (double-ended queue).
Here's the idea:
from collections import deque
import time

ip_windows = {}  # one deque per IP

def record_request(ip):
    now = time.time()
    if ip not in ip_windows:
        ip_windows[ip] = deque()

    # add this request's timestamp to the right
    ip_windows[ip].append(now)

    # evict timestamps older than 60 seconds from the left
    cutoff = now - 60
    while ip_windows[ip] and ip_windows[ip][0] < cutoff:
        ip_windows[ip].popleft()

    # the length is now the exact request count in the last 60 seconds
    return len(ip_windows[ip])
Visually, the deque looks like this:
ip_windows["1.2.3.4"] = deque([
1714000001.1, ← oldest (left side)
1714000001.9,
1714000002.3,
1714000059.8 ← newest (right side)
])
As time passes, old timestamps fall off the left. New ones are added to the right. len(deque) always gives you the exact count for the last 60 seconds.
Why deque and not a regular list? Because deque is O(1) on both ends — appending to the right and removing from the left are both instant, regardless of how many items are in it. A regular list's pop(0) is O(n) — it shifts every element left, which gets slow under heavy traffic.
We maintain three sets of windows:
- One deque per IP (per-IP request rate)
- One global deque (total traffic rate across all IPs)
- One error deque per IP (only 4xx/5xx responses)
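Here's a minimal sketch of how those three structures might be kept side by side. The names (error_windows, global_window) and the combined record helper are illustrative, not the exact layout of monitor.py:

from collections import deque
import time

WINDOW_SECONDS = 60

ip_windows = {}          # per-IP request timestamps
error_windows = {}       # per-IP timestamps of 4xx/5xx responses only
global_window = deque()  # timestamps of every request across all IPs

def record(ip, status):
    now = time.time()
    cutoff = now - WINDOW_SECONDS

    # update the per-IP and global windows
    for window in (ip_windows.setdefault(ip, deque()), global_window):
        window.append(now)
        while window and window[0] < cutoff:
            window.popleft()

    # failed requests get their own window per IP
    if status >= 400:
        errors = error_windows.setdefault(ip, deque())
        errors.append(now)
        while errors and errors[0] < cutoff:
            errors.popleft()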
Step 3 — The Rolling Baseline
The sliding window tells us the current rate. But is that rate normal or abnormal? To answer that, we need to know what normal looks like — and that changes over time. Traffic at 3am is different from traffic at 3pm.
This is the rolling baseline: a statistical model of recent traffic that updates automatically.
How it works:
Every second, we count how many requests arrived in that second and store it as a bucket:
second_bucket = int(time.time())  # e.g. 1714000002, one bucket per second
counts[second_bucket] += 1        # counts maps each second to its request total
Every 60 seconds, we look at the last 30 minutes of these per-second counts (up to 1800 buckets) and compute:
mean = sum(counts) / len(counts)
stddev = sqrt(sum((c - mean)**2 for c in counts) / len(counts))
mean is the average requests per second over the last 30 minutes. stddev (standard deviation) measures how much the traffic varies — a low stddev means traffic is steady, a high stddev means it's spiky.
Per-hour slots: We also store counts grouped by UTC hour. If the current hour has 60 or more data points, we prefer it over the mixed 30-minute window. This makes the baseline more accurate — 2am traffic shouldn't influence the 2pm baseline.
Floor values: On a quiet server, mean and stddev could be very close to zero. If stddev is 0, dividing by it in the z-score formula would crash the program. So we apply a floor:
mean = max(computed_mean, 1.0)
stddev = max(computed_stddev, 0.5)
This ensures the math always works, even on a server with almost no traffic.
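Putting those rules together, the recompute step that runs every 60 seconds might look roughly like this. It's a sketch: the function name, the hourly_counts structure, and the bucket handling are assumptions, but the 30-minute window, the 60-point hourly threshold, and the floor values come straight from the description above.

import math
import time

WINDOW_SECONDS = 1800  # 30 minutes of per-second buckets
MEAN_FLOOR, STDDEV_FLOOR = 1.0, 0.5

def recompute_baseline(counts, hourly_counts):
    # counts: {unix_second: requests seen in that second}
    # hourly_counts: {utc_hour: [per-second counts collected in that hour]}
    now = int(time.time())
    recent = [counts.get(sec, 0) for sec in range(now - WINDOW_SECONDS, now)]

    # prefer the current UTC hour's slot once it has enough data points
    hour_slot = hourly_counts.get(time.gmtime(now).tm_hour, [])
    values = hour_slot if len(hour_slot) >= 60 else recent

    mean = sum(values) / len(values)
    stddev = math.sqrt(sum((c - mean) ** 2 for c in values) / len(values))

    # floors keep the z-score division safe on a quiet server
    return max(mean, MEAN_FLOOR), max(stddev, STDDEV_FLOOR)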
Step 4 — Detecting Anomalies
Now we have:
- current_rate — requests from this IP in the last 60 seconds
- mean — average requests per second over the last 30 minutes
- stddev — how much traffic normally varies
We use z-score to decide if the current rate is abnormal:
z = (current_rate - mean) / stddev
The z-score tells you how many standard deviations above normal the current rate is. A z-score of 1.0 means slightly above average. A z-score of 3.0 means extremely unusual — statistically, only about 0.1% of normally distributed traffic would ever sit that far above the mean.
We flag an IP as anomalous if either condition fires:
z = (ip_rate - mean) / stddev
if z > 3.0 or ip_rate > 5 * mean:
    ...  # anomaly detected
The 5x mean rule is a safety net — if traffic is so extreme that even a high stddev wouldn't catch it, the multiplier rule fires first.
Error surge tightening: If an IP is also generating a lot of 4xx/5xx errors (failed login attempts, scanning for vulnerabilities), we tighten the detection threshold from 3.0 down to 2.0. This means we act faster on IPs that are both high-volume and generating errors.
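Here's a minimal sketch of the per-IP check with that tightening folded in. The function name and the 10-error cutoff are assumptions for illustration, not the exact values in detector.py:

def is_anomalous(ip_rate, error_count, mean, stddev):
    # Tighten the threshold for IPs that are also producing lots of
    # 4xx/5xx responses (the 10-error cutoff is an assumption).
    threshold = 2.0 if error_count > 10 else 3.0

    z = (ip_rate - mean) / stddev
    return z > threshold or ip_rate > 5 * mean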
The same logic applies globally — if total traffic across all IPs spikes, we send a Slack alert (but don't block a single IP since the attack may be distributed).
Step 5 — Blocking with iptables
When an IP is flagged as anomalous, we block it at the Linux kernel level using iptables.
iptables is Linux's built-in firewall. It processes every network packet before it reaches your application. A DROP rule tells the kernel to silently discard all packets from a specific IP — they never reach Nginx, never reach Nextcloud, never consume any application resources.
The command to block an IP:
iptables -I INPUT -s 1.2.3.4 -j DROP
Breaking this down:
- -I INPUT — insert a rule into the INPUT chain (incoming traffic)
- -s 1.2.3.4 — match packets from this source IP
- -j DROP — silently drop them (no response sent to the attacker)
In Python, we run this as a subprocess:
import subprocess

subprocess.run(
    ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"],
    check=True,
    capture_output=True,
)
We use a list (not a shell string) to avoid shell injection vulnerabilities. check=True raises an exception if the command fails so we can log it.
To unban:
iptables -D INPUT -s 1.2.3.4 -j DROP
-D deletes the rule instead of inserting it.
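The Python side of the unban mirrors the ban call. A sketch, assuming we tolerate the rule already being gone (for example after a container restart):

import subprocess

def unban(ip):
    # -D removes the matching DROP rule for this IP
    result = subprocess.run(
        ["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"],
        capture_output=True,
    )
    if result.returncode != 0:
        print(f"unban {ip} failed: {result.stderr.decode().strip()}")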
Step 6 — Auto-Unban with Backoff
We don't ban IPs forever (unless they keep attacking). The unban schedule uses exponential backoff:
| Offense | Ban Duration |
|---|---|
| 1st | 10 minutes |
| 2nd | 30 minutes |
| 3rd | 2 hours |
| 4th+ | Permanent |
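One way to encode that schedule is a plain lookup keyed by the offense count. This is a sketch; the BAN_SCHEDULE name and the float("inf") sentinel for permanent bans are assumptions:

# minutes of ban per offense count
BAN_SCHEDULE = {1: 10, 2: 30, 3: 120}

def ban_duration_minutes(offense_count):
    # 4th offense and beyond: effectively permanent (never expires)
    return BAN_SCHEDULE.get(offense_count, float("inf"))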
A background thread checks every 30 seconds whether any ban has expired:
# iterate over a copy so unban() can safely remove entries mid-loop
for ip, record in list(banned_ips.items()):
    elapsed_minutes = (now - record["banned_at"]) / 60
    if elapsed_minutes >= record["duration_minutes"]:
        unban(ip)
        send_slack_unban_alert(ip)
The offense count persists across ban/unban cycles. So if an IP gets banned, serves its 10-minute ban, gets released, and attacks again — the next ban is 30 minutes.
Step 7 — Slack Alerts
Every ban, unban, and global anomaly sends a Slack message via webhook. The webhook is just an HTTPS POST to a URL Slack gives you:
import requests

requests.post(
    webhook_url,
    json={"text": ":rotating_light: *IP BANNED* ..."},
    timeout=8,
)
timeout=8 ensures we complete within the 10-second requirement even if Slack is slow. The POST runs in a background thread so it never blocks the detector loop.
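A minimal sketch of that fire-and-forget pattern (the notify helper is illustrative, not the exact code in notifier.py):

import threading
import requests

def notify(webhook_url, text):
    def _post():
        try:
            requests.post(webhook_url, json={"text": text}, timeout=8)
        except requests.RequestException:
            pass  # a Slack hiccup should never take down the detector

    # daemon thread: fire and forget, the detector loop keeps running
    threading.Thread(target=_post, daemon=True).start()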
A ban alert looks like this in Slack:
🚨 IP BANNED
• IP: 203.0.113.42
• Condition: z=4.21
• Current rate: 312 req/60s
• Baseline mean: 8.3421 | stddev: 2.1500
• Ban duration: 10 min
• Timestamp: 2025-04-20 14:32:01 UTC
Step 8 — The Live Dashboard
The dashboard is a FastAPI web app with two endpoints:
- GET /metrics — returns live JSON data
- GET / — returns an HTML page that polls /metrics every 3 seconds
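A minimal sketch of the FastAPI side, showing only a couple of fields; the state dict stands in for the daemon's real shared state, which is read under a lock:

from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

# stand-in for the detector's shared state (assumption for this sketch)
state = {"global_req_per_60s": 0, "banned_ips": {}}

@app.get("/metrics")
def metrics():
    return {
        "global_req_per_60s": state["global_req_per_60s"],
        "banned_ips": list(state["banned_ips"].keys()),
    }

@app.get("/", response_class=HTMLResponse)
def index():
    # the real endpoint returns the full dashboard page; trimmed here
    return "<html><body><span id='global-rate'></span></body></html>"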
The HTML page uses plain JavaScript to fetch /metrics and update the DOM without reloading the page:
async function refresh() {
  const r = await fetch('/metrics');
  const d = await r.json();
  document.getElementById('global-rate').textContent = d.global_req_per_60s;
  // ... update other elements
}
setInterval(refresh, 3000);
The dashboard shows:
- Banned IPs with ban time, duration, and offense count
- Global requests per 60 seconds
- Top 10 source IPs
- CPU and memory usage
- Effective baseline mean and stddev
- Daemon uptime
Putting It All Together
All the modules run as threads inside a single Python process:
main.py
├── Thread: monitor → tails log, pushes events to queue
├── Thread: detector → consumes queue, runs detection
├── Thread: baseline → recalculates every 60 seconds
├── Thread: unbanner → checks ban expiry every 30 seconds
└── Thread: dashboard → uvicorn serving FastAPI on port 8080
The main thread just keeps the process alive with while True: sleep(1) and handles KeyboardInterrupt for clean shutdown.
All shared state (banned IPs, sliding windows, baseline values) is protected by threading.Lock() to prevent race conditions between threads.
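Roughly, main.py boils down to something like this sketch (the loop functions here are stubs standing in for the real entry points in monitor.py, detector.py, and friends):

import threading
import time

state_lock = threading.Lock()  # guards banned IPs, windows, and baseline values

# stubs standing in for the real module entry points
def monitor_loop():   time.sleep(3600)
def detector_loop():  time.sleep(3600)
def baseline_loop():  time.sleep(3600)
def unbanner_loop():  time.sleep(3600)
def run_dashboard():  time.sleep(3600)

def main():
    for target in (monitor_loop, detector_loop, baseline_loop, unbanner_loop, run_dashboard):
        threading.Thread(target=target, daemon=True).start()

    try:
        while True:
            time.sleep(1)  # main thread just keeps the process alive
    except KeyboardInterrupt:
        print("shutting down")  # daemon threads exit with the process

if __name__ == "__main__":
    main()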
Key Takeaways
- Sliding windows with deques give you exact 60-second rate tracking with O(1) updates at both ends
- Rolling baselines let the system adapt to real traffic patterns instead of relying on hardcoded thresholds
- Z-score detection is statistically sound — it flags things that are genuinely unusual relative to recent history
- iptables blocks at the kernel level — the most efficient place to stop an attack
- Backoff unbanning balances security with fairness — short bans for first offenses, longer for repeat attackers
The full source code is available at: https://github.com/nielvid/anomaly-detector
Built as part of the HNG DevOps Track — Stage 3