Imagine you run a website. Everything is working fine: users are logging in, uploading files, and going about their day. Then a bad actor floods your server with thousands of requests per second. Your server slows down, legitimate users can't get in, and your business grinds to a halt. This is a denial-of-service (DoS) attack. When the flood comes from many machines at once, it is a distributed denial-of-service (DDoS) attack, and it happens to companies of all sizes every single day.
In this post, I'll walk you through how I built a tool that watches all incoming traffic in real time, learns what "normal" looks like, and automatically blocks attackers, all without using any rate-limiting libraries, just pure Python logic.
By the end of this post, you'll understand:
- How traffic is monitored in real time
- What a sliding window is and why it matters
- How the system learns a "normal" baseline
- How anomaly detection makes a decision
- How iptables blocks an attacker at the network level
Let's dive in!
What I Built
Here's the big picture of the system:
```
Internet Traffic
       ↓
Nginx (reverse proxy)
       ↓
writes JSON logs
       ↓
Our Detector Daemon  ←── runs 24/7
    ↓          ↓            ↓
Block IP    Slack Alert   Dashboard
(iptables)  (webhook)     (website)
```
The stack has four moving parts:
Nginx sits in front of the application and writes every single HTTP request to a log file in JSON format — things like the source IP, the URL, the status code, and the timestamp.
Nextcloud is the actual web application users interact with. I don't touch it.
Our Detector is a Python daemon — a program that runs forever in the background. It reads the Nginx log file line by line, tracks request rates, computes a baseline, and fires an alert when something looks wrong.
The Dashboard is a simple web page that shows live stats, banned IPs, request rates, top talkers, and CPU usage, refreshing every 3 seconds.
Part 1: Reading the Log File in Real Time
The first challenge was: how do you read a file that is constantly being written to?
Think of it like reading a book while someone else is adding new pages at the end. You need to keep your finger on the last page and check for new content every fraction of a second.
This technique is called tailing a file — like the Unix tail -f command. Here's how I did it in Python:
```python
import time

def tail_log(filepath):
    """Yield new lines appended to filepath, like `tail -f`."""
    with open(filepath, "r") as f:
        # Jump to the end of the file first
        # so I only read NEW lines going forward
        f.seek(0, 2)
        while True:
            line = f.readline()
            if line:
                yield line.strip()  # give the line to whoever called us
            else:
                time.sleep(0.1)  # no new line yet, wait a moment
```
The key trick here is f.seek(0, 2) — this jumps the reading cursor to the very end of the file. That way I don't re-process old traffic from before the daemon started.
Every new line is a JSON object that looks like this:
```json
{
  "source_ip": "0.0.0.0",
  "timestamp": "YYYY-MM-DDT09:53:21+00:00",
  "method": "GET",
  "path": "/",
  "status": 200,
  "response_size": 1234
}
```
I parse this with Python's built-in json.loads() to extract the IP address, status code, and other fields.
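As a rough sketch of that parsing step (not the exact code from the detector): the helper name `parse_log_line` and the shape of the returned dict are placeholders of mine, and the sample IP is made up. It returns `None` for anything it cannot parse, so a half-written or malformed line does not crash the daemon.

```python
import json

def parse_log_line(line):
    """Return the fields the detector needs, or None if the line is unusable."""
    try:
        entry = json.loads(line)
        return {
            "ip": entry["source_ip"],
            "status": int(entry.get("status", 0)),
            "path": entry.get("path", ""),
        }
    except (ValueError, KeyError, TypeError, AttributeError):
        # Not JSON, not an object, or missing source_ip: skip this line
        return None

sample = '{"source_ip": "203.0.113.7", "status": 404, "path": "/login"}'
print(parse_log_line(sample))
print(parse_log_line("not json"))
```

Skipping bad lines rather than raising is a design choice for a long-running process; a stricter version might count them and alert if they pile up.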
Part 2: The Sliding Window: Tracking Request Rates
Now that I'm reading requests in real time, I need to answer the question: how many requests has this IP sent in the last 60 seconds?
A naive approach would be to count requests in fixed one-minute buckets. But that has a problem: if an attacker sends 1,000 requests at 00:59 and another 1,000 at 01:01, the two halves land in different buckets. Each bucket sees only 1,000 requests, and the counter never notices that 2,000 arrived within two seconds.
A sliding window solves this. Instead of fixed time buckets, I track a moving 60-second window that always ends at "right now."
Here's how it works using Python's deque (a double-ended queue — you can efficiently add to one end and remove from the other):
```python
from collections import deque
import time

# One deque per IP: stores timestamps of each request
ip_windows = {}

def record_request(ip):
    now = time.time()
    cutoff = now - 60  # 60 seconds ago

    if ip not in ip_windows:
        ip_windows[ip] = deque()

    # Add this request's timestamp
    ip_windows[ip].append(now)

    # Remove timestamps older than 60 seconds from the LEFT side
    while ip_windows[ip] and ip_windows[ip][0] < cutoff:
        ip_windows[ip].popleft()

    # How many requests in the last 60 seconds?
    count = len(ip_windows[ip])
    rate = count / 60  # requests per second
    return rate
```
Why deque? Because removing old entries from the left end of a deque is O(1) — instant — whereas doing the same with a regular Python list would be O(n) — slow for large lists.
Eviction logic: Every time a new request comes in, I check the leftmost (oldest) timestamp. If it's older than 60 seconds, I pop it off. I keep doing this until all remaining timestamps are within the window. This means the deque always contains only the last 60 seconds of activity.
I maintain two of these windows in parallel:
- One per IP: to detect a single aggressive attacker
- One global: to detect a flood from many different IPs at once
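One way to keep both windows in step is to wrap the deque logic in a small class and feed every request to two instances. This is a sketch under my own assumptions: the `SlidingWindow` class and the `observe` helper are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to check without waiting a minute.

```python
from collections import defaultdict, deque

class SlidingWindow:
    """Timestamps of the requests seen in the last `size` seconds."""

    def __init__(self, size=60):
        self.size = size
        self.stamps = deque()

    def add(self, now):
        self.stamps.append(now)
        self._evict(now)

    def rate(self, now):
        self._evict(now)
        return len(self.stamps) / self.size  # requests per second

    def _evict(self, now):
        cutoff = now - self.size
        while self.stamps and self.stamps[0] < cutoff:
            self.stamps.popleft()

per_ip = defaultdict(SlidingWindow)  # one window per source IP
global_window = SlidingWindow()      # one window for all traffic

def observe(ip, now):
    """Record one request in both windows and return (ip_rate, global_rate)."""
    per_ip[ip].add(now)
    global_window.add(now)
    return per_ip[ip].rate(now), global_window.rate(now)

# 30 requests from each of two made-up IPs, all in the same second
for _ in range(30):
    observe("198.51.100.1", 1000.0)
    observe("198.51.100.2", 1000.0)

print(per_ip["198.51.100.1"].rate(1000.0))  # 0.5 (30 requests / 60 s)
print(global_window.rate(1000.0))           # 1.0 (60 requests / 60 s)
```

Neither IP looks alarming on its own here, but the global window sees their combined rate, which is the point of tracking both.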
Part 3: The Rolling Baseline: Learning What Normal Looks Like
Knowing the current rate isn't enough. I need to know whether that rate is unusual. A server that normally handles 100 requests per second shouldn't be alarmed by 80, but a server that normally handles 2 requests per second absolutely should be alarmed by 80.
This is why I built a rolling baseline, a statistical picture of what normal traffic looks like over the past 30 minutes.
Here's the approach:
```python
import math
import time
from collections import deque

# Rolling window stores (timestamp, per_second_count) tuples
window = deque()
current_second = int(time.time())
current_count = 0

def record_request():
    global current_second, current_count
    now = int(time.time())
    if now != current_second:
        # Save the completed second into the rolling window.
        # Seconds that pass with no request in between are skipped
        # rather than stored as 0.
        window.append((current_second, current_count))
        # Evict entries older than 30 minutes
        cutoff = now - (30 * 60)
        while window and window[0][0] < cutoff:
            window.popleft()
        current_second = now
        current_count = 0
    current_count += 1

def recalculate_baseline():
    # Extract just the counts
    data = [count for _, count in window]
    if len(data) < 2:
        return None  # not enough data yet
    mean = sum(data) / len(data)
    # Population variance (divide by N, not N - 1)
    variance = sum((x - mean) ** 2 for x in data) / len(data)
    stddev = math.sqrt(variance)
    return mean, stddev
```
I recalculate this every 60 seconds. The result is two numbers:
- mean: the average requests per second over the last 30 minutes
- stddev: how much the traffic normally varies
I also apply floor values — minimum values of 0.5 for mean and 0.3 for stddev. This prevents false positives when traffic is near zero (a single request would look like a massive spike if mean is 0.001).
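To see why the floors matter, here is a small worked example. The floor values come from the paragraph above; the helper name `apply_floors` and the "quiet server" numbers are illustrative assumptions, not measurements.

```python
MEAN_FLOOR = 0.5    # req/s, value from the post
STDDEV_FLOOR = 0.3  # req/s, value from the post

def apply_floors(mean, stddev):
    """Clamp the baseline so near-zero traffic cannot produce huge z-scores."""
    return max(mean, MEAN_FLOOR), max(stddev, STDDEV_FLOOR)

# Hypothetical quiet server: the raw baseline is almost zero
raw_mean, raw_stddev = 0.001, 0.002
rate = 1 / 60  # a single request in the 60-second window

raw_z = (rate - raw_mean) / raw_stddev
mean, stddev = apply_floors(raw_mean, raw_stddev)
floored_z = (rate - mean) / stddev

print(f"raw z={raw_z:.2f}, floored z={floored_z:.2f}")
# raw z=7.83, floored z=-1.61
```

Without the floors, one request scores well above the 2.0 limit; with them, it is comfortably below.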
I also maintain per-hour slots — separate baselines for each hour of the day. This way, if your server is always busier at 9am than at 3am, the system knows that and doesn't panic during the morning rush.
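The post does not show how the hourly slots are stored, so the following is only one possible shape. It assumes a dict keyed by hour of day (UTC), a hypothetical `MIN_SAMPLES` cutoff, and a fallback to the 30-minute baseline when a slot has too little data. A real version would also need to cap or age out old samples.

```python
import math
from collections import defaultdict
from datetime import datetime, timezone

MIN_SAMPLES = 2  # hypothetical: data points needed before a slot is trusted

hourly_counts = defaultdict(list)  # hour of day (0-23) -> per-second counts

def hour_of(timestamp):
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).hour

def record_second(timestamp, count):
    """Store one completed second's request count in its hour slot."""
    hourly_counts[hour_of(timestamp)].append(count)

def baseline_for(timestamp, fallback):
    """Return (mean, stddev) for this hour, or `fallback` if the slot is thin."""
    data = hourly_counts[hour_of(timestamp)]
    if len(data) < MIN_SAMPLES:
        return fallback
    mean = sum(data) / len(data)
    stddev = math.sqrt(sum((x - mean) ** 2 for x in data) / len(data))
    return mean, stddev

# Arbitrary example date; only the hour matters
nine_am = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc).timestamp()
three_am = datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc).timestamp()

record_second(nine_am, 40)
record_second(nine_am + 1, 60)

print(baseline_for(nine_am + 2, fallback=(0.5, 0.3)))  # (50.0, 10.0)
print(baseline_for(three_am, fallback=(0.5, 0.3)))     # (0.5, 0.3)
```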
Part 4: Anomaly Detection: Making the Decision
Now I have two things:
- The current rate for an IP (from the sliding window)
- The baseline mean and stddev (from the rolling window)
I use these to calculate a z-score — a standard statistical measure of how far something is from normal, measured in standard deviations:
z-score = (current_rate - baseline_mean) / baseline_stddev
For example, if baseline mean is 2 req/s, stddev is 0.5, and an IP is sending 10 req/s:
z-score = (10 - 2) / 0.5 = 16
A z-score of 16 is extremely abnormal. I flag anything above 2.0 as suspicious.
I also have a second check on a rate multiplier. If the current rate is more than 3× the baseline mean, it's flagged regardless of z-score. This catches attacks that happen to have a low z-score because of high variance in the baseline.
Whichever check fires first wins:
```python
def check_anomaly(ip_rate, baseline_mean, baseline_stddev):
    # Assumes baseline_stddev has already been floored, so it is never zero
    zscore = (ip_rate - baseline_mean) / baseline_stddev
    if zscore > 2.0:
        return True, f"z-score={zscore:.2f} rate={ip_rate:.2f}/s"
    if ip_rate > baseline_mean * 3.0:
        return True, f"rate={ip_rate:.2f}/s is 3x baseline"
    return False, ""
```
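A quick way to see why both checks exist is to run a few numbers through them. The helper below is a stripped-down stand-in (the name `which_check_fires` is mine) that reports which rule triggered. The multiplier rule can only be the deciding one when the baseline stddev is larger than the mean, which is exactly the noisy-baseline case described above.

```python
def which_check_fires(rate, mean, stddev, z_limit=2.0, multiplier=3.0):
    """Return the name of the first rule that flags this rate, or None."""
    zscore = (rate - mean) / stddev
    if zscore > z_limit:
        return "z-score"
    if rate > mean * multiplier:
        return "multiplier"
    return None

print(which_check_fires(10, 2, 0.5))  # z = 16.0             -> z-score
print(which_check_fires(7, 2, 5))     # z = 1.0, but 7 > 6   -> multiplier
print(which_check_fires(3, 2, 0.5))   # z = 2.0, not above 2 -> None
```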
I also have an error surge feature. If an IP's 4xx/5xx error rate is 3× the normal error rate, I tighten its detection thresholds automatically — because legitimate users don't usually generate lots of errors.
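The post does not say by how much the thresholds are tightened, so the sketch below picks an arbitrary factor to show the idea. `TIGHTEN` and the function name `thresholds_for` are assumptions; the 2.0, 3.0, and 3x values are the ones quoted in this section. Error rates can be in any unit as long as both arguments use the same one.

```python
Z_LIMIT = 2.0             # from the post
RATE_MULTIPLIER = 3.0     # from the post
ERROR_SURGE_FACTOR = 3.0  # from the post: 3x the normal error rate
TIGHTEN = 0.5             # hypothetical: shrink thresholds by half on a surge

def thresholds_for(ip_error_rate, baseline_error_rate):
    """Return (z_limit, rate_multiplier) to use for one IP."""
    surging = (
        baseline_error_rate > 0
        and ip_error_rate >= ERROR_SURGE_FACTOR * baseline_error_rate
    )
    if surging:
        return Z_LIMIT * TIGHTEN, RATE_MULTIPLIER * TIGHTEN
    return Z_LIMIT, RATE_MULTIPLIER

print(thresholds_for(0.30, 0.05))  # surge: (1.0, 1.5)
print(thresholds_for(0.10, 0.05))  # normal: (2.0, 3.0)
```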
Part 5: Blocking with iptables
Once I decide an IP is malicious, I need to block it at the network level, before it even reaches our application. I use iptables for this.
iptables is the standard command-line tool for managing the Linux kernel's built-in packet filter. The kernel inspects every network packet and decides what to do with it based on the rules you define.
To block an IP:
```bash
iptables -I INPUT -s 192.168.1.100 -j DROP
```
This tells the kernel: "For any incoming packet from 192.168.1.100, DROP it — don't even acknowledge it."
I call this from Python using subprocess:
```python
import subprocess

def ban_ip(ip):
    subprocess.run(
        ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"],
        check=True
    )
    print(f"Banned {ip}")
```
To unban:
```python
def unban_ip(ip):
    subprocess.run(
        ["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"],
        check=True
    )
    print(f"Unbanned {ip}")
```
I use -I (insert) to add at the top of the chain so it takes effect immediately, and -D (delete) to remove it when the ban expires.
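Because the IP string comes straight out of a log file, it may be worth validating it before handing it to a root-level command. The sketch below is an optional hardening step, not something the post describes: `build_ban_command` is a hypothetical helper that uses the standard-library `ipaddress` module and only builds the argument list, so it can be checked without touching the firewall.

```python
import ipaddress

def build_ban_command(ip, action="-I"):
    """Return the iptables argv for banning ("-I") or unbanning ("-D") an IP."""
    if action not in ("-I", "-D"):
        raise ValueError(f"unsupported action: {action}")
    addr = ipaddress.ip_address(ip)  # raises ValueError on malformed input
    if addr.version != 4:
        # IPv6 is handled by ip6tables, which this sketch does not cover
        raise ValueError("only IPv4 addresses are supported here")
    return ["iptables", action, "INPUT", "-s", str(addr), "-j", "DROP"]

print(build_ban_command("192.168.1.100"))
print(build_ban_command("192.168.1.100", action="-D"))
```

The returned list can be passed to `subprocess.run(..., check=True)` in place of the hand-built one.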
Part 6: Auto-Unban with Backoff Schedule
I don't ban IPs forever on the first offence. I use a backoff schedule:
| Offence | Ban Duration |
|---------|--------------|
| 1st     | 10 minutes   |
| 2nd     | 30 minutes   |
| 3rd     | 2 hours      |
| 4th+    | Permanent    |
A background thread checks every 30 seconds whether any ban has expired and releases it automatically, sending a Slack notification each time.
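The bookkeeping behind that schedule could look roughly like this. The durations match the table above; everything else (the dict layout, the function names, using `None` to mean "permanent") is an assumption for illustration. Timestamps are passed in so the logic can be exercised without real waiting.

```python
BAN_SCHEDULE = [10 * 60, 30 * 60, 2 * 60 * 60]  # seconds: 1st, 2nd, 3rd offence

offences = {}  # ip -> number of bans so far
bans = {}      # ip -> expiry timestamp, or None for a permanent ban

def ban_duration(offence_number):
    """Seconds to ban for the Nth offence; None means permanent."""
    if offence_number <= len(BAN_SCHEDULE):
        return BAN_SCHEDULE[offence_number - 1]
    return None

def register_ban(ip, now):
    offences[ip] = offences.get(ip, 0) + 1
    duration = ban_duration(offences[ip])
    bans[ip] = None if duration is None else now + duration
    return duration

def release_expired(now):
    """Drop bans whose time is up and return the released IPs."""
    released = [ip for ip, expiry in bans.items()
                if expiry is not None and expiry <= now]
    for ip in released:
        del bans[ip]  # the daemon would also call unban_ip(ip) and notify Slack
    return released

print(register_ban("203.0.113.9", now=0))  # 600
print(release_expired(now=599))            # []
print(release_expired(now=600))            # ['203.0.113.9']
```

The background thread would then amount to calling `release_expired(time.time())` in a loop with a 30-second sleep.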
Part 7: Slack Alerts
Every ban and unban sends a message to a Slack channel via webhook. A webhook is just a URL you POST a JSON message to:
```python
import json
import requests

def send_alert(webhook_url, message):
    requests.post(
        webhook_url,
        data=json.dumps({"text": message}),
        headers={"Content-Type": "application/json"},
        timeout=5,  # don't let a slow Slack call stall the detector
    )
```
A ban alert looks like this in Slack:
🚨 IP BANNED
IP: 0.0.0.0
Condition: z-score=16.20 rate=8.33/s
Baseline mean: 0.50 req/s
Ban duration: 10 min
Time: 2026-04-29 09:53:21
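A message like that can be assembled with a small formatting helper. The function below is a guess at how it might be built, with a hypothetical name and parameter list; it only produces the text, which would then be passed to `send_alert`.

```python
from datetime import datetime

def format_ban_alert(ip, condition, baseline_mean, ban_seconds, when):
    """Build the Slack text for a ban; ban_seconds=None means permanent."""
    duration = "permanent" if ban_seconds is None else f"{ban_seconds // 60} min"
    return "\n".join([
        "🚨 IP BANNED",
        f"IP: {ip}",
        f"Condition: {condition}",
        f"Baseline mean: {baseline_mean:.2f} req/s",
        f"Ban duration: {duration}",
        f"Time: {when:%Y-%m-%d %H:%M:%S}",
    ])

text = format_ban_alert(
    ip="0.0.0.0",
    condition="z-score=16.20 rate=8.33/s",
    baseline_mean=0.5,
    ban_seconds=600,
    when=datetime(2026, 4, 29, 9, 53, 21),
)
print(text)
```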
Part 8: The Live Dashboard
The dashboard is a simple Flask web app that renders an HTML page with live stats and refreshes every 3 seconds using an HTML meta refresh tag. It shows banned IPs, global request rate, top 10 source IPs, CPU and memory usage, baseline mean and stddev, and uptime.
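The auto-refresh itself is a single HTML tag. To keep this example free of third-party imports, the sketch below only builds the page as a string; in the Flask app, a route handler would presumably return something like it. The `render_dashboard` name and the shape of the `stats` dict are assumptions.

```python
from html import escape

REFRESH_SECONDS = 3  # from the post

def render_dashboard(stats):
    """Build the dashboard HTML from a dict of stats (hypothetical shape)."""
    banned = "".join(f"<li>{escape(ip)}</li>" for ip in stats["banned_ips"])
    return (
        "<!doctype html><html><head>"
        # The browser reloads the page every REFRESH_SECONDS seconds
        f'<meta http-equiv="refresh" content="{REFRESH_SECONDS}">'
        "<title>Detector</title></head><body>"
        f"<p>Global rate: {stats['global_rate']:.2f} req/s</p>"
        f"<p>Baseline: mean {stats['mean']:.2f}, stddev {stats['stddev']:.2f}</p>"
        f"<h2>Banned IPs</h2><ul>{banned}</ul>"
        "</body></html>"
    )

page = render_dashboard({
    "banned_ips": ["203.0.113.7"],
    "global_rate": 4.2,
    "mean": 0.5,
    "stddev": 0.3,
})
print(page)
```

A meta refresh reloads the whole page each time, which is simple but coarse; it suits a small status page like this one.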
Lessons Learned
Hardcoded thresholds don't work. Early versions had fixed numbers like "block anything above 100 req/s." This caused false positives during legitimate traffic spikes and missed attacks that stayed under the limit, even when they were far above what the server normally sees. The rolling baseline fixed this.
The sliding window must use timestamps, not counters. A simple per-minute counter misses burst attacks that span minute boundaries. Storing actual timestamps in a deque and evicting by time gives accurate real-time rates.
iptables needs root. The detector container must run as root and have the NET_ADMIN capability to execute iptables commands. This is a necessary tradeoff for network-level blocking.
Floor values matter. Without minimum baseline values, any traffic on a quiet server looks like an attack. Setting floors of 0.5 req/s for the mean and 0.3 for the stddev prevents false alarms during low-traffic periods.
Conclusion
Building this taught me that security tooling doesn't have to be magic. At its core, anomaly detection is just statistics — mean, standard deviation, and z-scores applied to real-time data streams. The sliding window gives you accurate rate measurements. The rolling baseline gives you context. And iptables gives you the power to act.
The full source code is available at:
The live dashboard is running at: