DEV Community

anitaalicloud

How I Built an Anomaly Detection Engine for DDoS Protection

Introduction
Imagine you run a busy website. On a normal day, it handles about 50 requests per second. Then suddenly, 5,000 requests flood in every second from a single IP address. Your server crashes. Your real users can't access anything. This is a denial-of-service (DoS) attack; when the flood comes from many machines at once, it is a DDoS (Distributed Denial of Service) attack.
In this post, I'll explain how I built a tool that watches incoming traffic in real time, learns what "normal" looks like, and automatically blocks attackers before they can cause damage.

What Does the Tool Do?
My anomaly detection engine does 6 things automatically:

Watches every HTTP request coming into the server
Learns what normal traffic looks like over time
Detects when traffic becomes abnormal
Blocks the attacker using the Linux firewall
Alerts me on Slack within 10 seconds
Unbans the IP automatically after a timeout

The Architecture
Internet Traffic

Nginx (logs every request as JSON)

Nextcloud (the actual app)

Detector Daemon reads Nginx logs

Sliding Window → tracks request rates
Rolling Baseline → learns normal traffic
Z-score Detection → spots anomalies
iptables → blocks attackers
Slack → sends alerts
Dashboard → shows live metrics

Part 1 — How the Sliding Window Works
Think of the sliding window as a camera that only ever shows the last 60 seconds 🎥
Every request that comes in gets a timestamp. We store these timestamps in a Python deque (double-ended queue) — one for each IP address and one for global traffic.

from collections import deque
import time

# One deque of request timestamps per IP
ip_windows = {}

def record_request(ip):
    now = time.time()

    if ip not in ip_windows:
        ip_windows[ip] = deque()

    # Add this request
    ip_windows[ip].append(now)

    # Remove requests older than 60 seconds from the LEFT
    cutoff = now - 60
    while ip_windows[ip] and ip_windows[ip][0] < cutoff:
        ip_windows[ip].popleft()

    # Current rate = average requests per second over the last 60 seconds
    current_rate = len(ip_windows[ip]) / 60
    return current_rate

The magic is the eviction: every time a request is recorded, timestamps older than 60 seconds are popped off the left side of the deque. So right after a request, the deque contains only the last 60 seconds of requests for that IP, and the current rate is simply the length of the deque divided by 60.
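To watch the eviction happen without waiting a real minute, here is a minimal sketch of the same idea with the timestamp passed in by the caller. The class name `SlidingWindow` and the `now` parameter are illustrative assumptions, not names from the project.

```python
from collections import deque


class SlidingWindow:
    """Keeps the timestamps of the last `size` seconds (hypothetical helper)."""

    def __init__(self, size=60):
        self.size = size
        self.timestamps = deque()

    def record(self, now):
        # Add the new request on the right.
        self.timestamps.append(now)
        # Evict everything older than the window from the left.
        cutoff = now - self.size
        while self.timestamps and self.timestamps[0] < cutoff:
            self.timestamps.popleft()
        # Average requests per second over the window.
        return len(self.timestamps) / self.size


window = SlidingWindow(size=60)
for t in (0, 10, 20):
    window.record(t)      # three requests inside the first minute

rate = window.record(75)  # cutoff is 15, so t=0 and t=10 are evicted
```

After the last call only the requests at t=20 and t=75 remain, so the rate is 2 / 60.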

Part 2 — How the Baseline Learns from Traffic
The baseline answers one question: "What is normal?"
We can't hardcode this because every website is different. A news site might normally get 1000 req/s. A small blog might get 2 req/s. So we let the system learn.
Every second we record how many requests came in. Every 60 seconds we look at the last 30 minutes of data and calculate:

import math

def recalculate_baseline(per_second_counts):
    # With no samples yet, fall back to the floor values
    if not per_second_counts:
        return 1.0, 1.0

    # Calculate average requests per second
    mean = sum(per_second_counts) / len(per_second_counts)

    # Calculate how much it normally varies (population standard deviation)
    variance = sum((x - mean) ** 2 for x in per_second_counts) / len(per_second_counts)
    stddev = math.sqrt(variance)

    # Apply floors to prevent false alarms on quiet traffic
    effective_mean = max(mean, 1.0)
    effective_stddev = max(stddev, 1.0)

    return effective_mean, effective_stddev

We also maintain per-hour slots — the system prefers the current hour's data when it has enough samples. This means the baseline adapts to time-of-day patterns. Rush hour traffic looks different from 3 AM traffic!
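One way the per-hour preference could work is sketched below. The `MIN_SAMPLES` threshold, the `hourly_counts` dictionary, and the function name are assumptions made for illustration; the real project may choose its slot differently.

```python
MIN_SAMPLES = 300  # assumed: at least 5 minutes of per-second counts


def pick_baseline_samples(hourly_counts, rolling_counts, hour):
    """Prefer the current hour's samples, fall back to the rolling window."""
    current_hour = hourly_counts.get(hour, [])
    if len(current_hour) >= MIN_SAMPLES:
        return current_hour
    return rolling_counts


hourly_counts = {
    3: [2] * 50,     # 3 AM: too few samples so far
    17: [40] * 600,  # 5 PM: plenty of samples
}
rolling_counts = [20] * 1800  # last 30 minutes, all hours mixed

quiet = pick_baseline_samples(hourly_counts, rolling_counts, hour=3)
busy = pick_baseline_samples(hourly_counts, rolling_counts, hour=17)
```

Here the 3 AM slot falls back to the rolling window, while the 5 PM slot uses its own data.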

Part 3 — How the Detection Logic Makes a Decision
Once we have the baseline we use a Z-score to decide if current traffic is anomalous.
The Z-score answers: "How many standard deviations away from normal is this?"

def is_anomalous(current_rate, mean, stddev):
    # mean and stddev come from the baseline, where both are floored at 1.0,
    # so neither division below can divide by zero

    # Z-score calculation
    z_score = (current_rate - mean) / stddev

    # Rate multiplier
    rate_multiplier = current_rate / mean

    # Flag as anomalous if EITHER condition fires
    if z_score > 3.0:
        return True, "z-score exceeded 3.0"

    if rate_multiplier > 5.0:
        return True, "rate exceeded 5x baseline"

    return False, None

Example:

Normal traffic: 50 req/s (mean=50, stddev=10)
Attack traffic: 5000 req/s from one IP
Z-score = (5000 - 50) / 10 = 495
495 > 3.0 → ANOMALY DETECTED! 🚨
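The second condition matters when the baseline itself is noisy. The numbers below are made up for illustration: with a large standard deviation the Z-score stays under 3.0, but the rate multiplier still fires.

```python
mean, stddev = 10.0, 20.0  # assumed bursty baseline
current_rate = 60.0

z_score = (current_rate - mean) / stddev   # (60 - 10) / 20 = 2.5
rate_multiplier = current_rate / mean      # 60 / 10 = 6.0

z_fires = z_score > 3.0                    # False
multiplier_fires = rate_multiplier > 5.0   # True
```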

We also detect error surges — if an IP is getting lots of 404/500 errors it might be scanning for vulnerabilities. In that case we tighten the thresholds automatically.
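The post doesn't show how the thresholds are tightened, so the following is only one possible shape. The 30% error-rate cutoff, the tightened values of 2.0 and 3.0, and the function name are all assumptions.

```python
DEFAULT_THRESHOLDS = (3.0, 5.0)    # (z-score limit, rate multiplier limit)
TIGHTENED_THRESHOLDS = (2.0, 3.0)  # assumed stricter values
ERROR_RATE_CUTOFF = 0.3            # assumed: more than 30% error responses


def thresholds_for(status_codes):
    """Return stricter limits when an IP's recent responses are mostly errors."""
    if not status_codes:
        return DEFAULT_THRESHOLDS
    errors = sum(1 for code in status_codes if code >= 400)
    if errors / len(status_codes) > ERROR_RATE_CUTOFF:
        return TIGHTENED_THRESHOLDS
    return DEFAULT_THRESHOLDS


normal_ip = thresholds_for([200, 200, 304, 200, 404])   # 1 error in 5
scanner_ip = thresholds_for([404, 404, 500, 200, 404])  # 4 errors in 5
```

An IP with the occasional 404 keeps the default limits, while one that mostly triggers errors is judged against the stricter pair.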

Part 4 — How iptables Blocks an IP
iptables is Linux's built-in firewall. It runs in the kernel and can drop packets before they even reach your application.
When we detect an attack:

import subprocess

def ban_ip(ip):
    # Add a DROP rule: silently discard all packets from this IP
    # (needs root; check=True raises CalledProcessError if iptables fails)
    subprocess.run(
        ["iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"],
        check=True,
    )
    print(f"Banned {ip}")

def unban_ip(ip):
    # Remove the matching DROP rule
    subprocess.run(
        ["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"],
        check=True,
    )
    print(f"Unbanned {ip}")

The -j DROP means "jump to the DROP target": the packet is silently discarded. The attacker doesn't even get an error message back. From their perspective the server just stopped responding.
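Because the IP comes out of a log line, it is worth validating before it reaches the firewall command. The helper below is a hypothetical addition, not part of the project: it only builds the argument list and leaves running it to the caller.

```python
import ipaddress


def build_ban_command(ip):
    """Return the iptables argv for banning `ip`, or raise ValueError."""
    # ip_address() raises ValueError for anything that is not a valid address.
    address = ipaddress.ip_address(ip)
    # IPv6 rules live in ip6tables, not iptables.
    binary = "ip6tables" if address.version == 6 else "iptables"
    return [binary, "-A", "INPUT", "-s", str(address), "-j", "DROP"]


command = build_ban_command("203.0.113.7")
```

Passing the arguments as a list, as the functions above already do, also means no shell ever interprets the string.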
Bans lift automatically on a backoff schedule:

1st offence → 10 minutes
2nd offence → 30 minutes
3rd offence → 2 hours
4th+ → permanent
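The schedule above maps naturally onto a small lookup. This is a sketch with assumed names (`BAN_SCHEDULE_SECONDS`, `ban_duration`), using `None` to stand for a permanent ban.

```python
BAN_SCHEDULE_SECONDS = [10 * 60, 30 * 60, 2 * 60 * 60]  # 1st, 2nd, 3rd offence


def ban_duration(offence_count):
    """Return the ban length in seconds, or None for a permanent ban."""
    if offence_count < 1:
        raise ValueError("offence_count starts at 1")
    if offence_count > len(BAN_SCHEDULE_SECONDS):
        return None  # 4th offence onwards: never lifted
    return BAN_SCHEDULE_SECONDS[offence_count - 1]


durations = [ban_duration(n) for n in (1, 2, 3, 4)]
```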

Part 5 — The Live Dashboard
The dashboard is a simple web page that refreshes every 3 seconds showing:

Global requests per second
Currently banned IPs
Top 10 source IPs
CPU and memory usage
Current baseline mean and stddev
System uptime

It's built using Python's built-in http.server — no web framework needed!
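A minimal sketch of such a page with http.server might look like this. The metric names, `render_page`, `DashboardHandler`, and `make_server` are assumptions, and the meta refresh tag is just one simple way to get a 3 second reload; the real dashboard may do it differently.

```python
from html import escape
from http.server import BaseHTTPRequestHandler, HTTPServer

# Placeholder values; a real daemon would update these from live data.
METRICS = {"Global req/s": 12.0, "Banned IPs": 0, "Baseline mean": 12.0}


def render_page(metrics):
    """Build a tiny HTML page that asks the browser to reload every 3 seconds."""
    rows = "".join(
        f"<tr><td>{escape(str(name))}</td><td>{escape(str(value))}</td></tr>"
        for name, value in metrics.items()
    )
    return (
        '<html><head><meta http-equiv="refresh" content="3"></head>'
        f"<body><table>{rows}</table></body></html>"
    )


class DashboardHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = render_page(METRICS).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def make_server(port=8080):
    # Binds to localhost only; the caller starts it with serve_forever().
    return HTTPServer(("127.0.0.1", port), DashboardHandler)
```

Calling `make_server().serve_forever()` would start serving the page on port 8080.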

Part 6 — Slack Alerts
When an IP gets banned, a Slack message arrives within 10 seconds:
🚨 IP BANNED
IP: 192.168.1.100
Condition: Anomalous request rate
Current Rate: 450.00 req/s
Baseline: 12.00 req/s
Ban Duration: 10 minutes
Timestamp: 2026-04-28 03:22:36 UTC
And when the ban expires:
✅ IP UNBANNED
IP: 192.168.1.100
Reason: ban-expired
Timestamp: 2026-04-28 03:32:36 UTC
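Alerts like these can be sent through a Slack incoming webhook, which accepts a JSON body with a `text` field. The sketch below assumes that approach; the function names are hypothetical and the webhook URL has to come from your own Slack app settings.

```python
import json
import urllib.request


def build_ban_alert(ip, rate, baseline, duration):
    """Format the ban message as a Slack incoming-webhook payload."""
    text = (
        "🚨 IP BANNED\n"
        f"IP: {ip}\n"
        f"Current Rate: {rate:.2f} req/s\n"
        f"Baseline: {baseline:.2f} req/s\n"
        f"Ban Duration: {duration}"
    )
    return {"text": text}


def send_alert(webhook_url, payload):
    """POST the payload as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    # A short timeout keeps a slow Slack response from stalling the detector.
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status


payload = build_ban_alert("192.168.1.100", 450, 12, "10 minutes")
```

Keeping the formatting separate from the network call makes the message easy to check without touching Slack.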

What I Learned
Building this project taught me:

Z-scores are powerful: a simple maths formula can detect attacks that would be hard to catch with hardcoded thresholds
Baselines must be dynamic: hardcoding "block if > 100 req/s" is wrong because normal traffic varies by time of day
iptables is incredibly fast: kernel-level packet dropping happens before the request even reaches Python
Threading needs care: shared data structures need locks to prevent race conditions
Deques are perfect for sliding windows: O(1) append and popleft make them ideal for real-time rate tracking
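To make the threading point concrete, here is a small sketch of a shared counter guarded by a lock. The names `request_counts`, `counts_lock`, and `count_request` are illustrative and not taken from the project.

```python
import threading

request_counts = {}
counts_lock = threading.Lock()


def count_request(ip):
    # The lock makes the read-modify-write on the dict atomic across threads.
    with counts_lock:
        request_counts[ip] = request_counts.get(ip, 0) + 1


def worker():
    for _ in range(10_000):
        count_request("203.0.113.7")


threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

With the lock in place, four threads of 10,000 increments each always add up to 40,000.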

Try It Yourself
The full source code is available at:
https://github.com/AnitaAliCloud/hng-stage3-devops
The live dashboard is running at:
http://anitacloud.duckdns.org:8080

Built as part of the HNG14 DevOps internship programme
