Hezekiah Umoh

How I Built a Real-Time DDoS Detection Engine from Scratch (No Fail2Ban, No Libraries)

A beginner-friendly walkthrough of how I built a system that watches live web traffic, learns what "normal" looks like, and automatically blocks attackers — all from scratch using Python.


Why This Project Exists

Imagine you run a cloud storage platform. Thousands of users upload and download files every day. Then one morning, a single IP address starts sending 500 requests per second to your server — way more than any normal user would ever send.

Your server starts slowing down. Real users can't log in. Files won't upload. Your platform is under attack.

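This is a denial-of-service attack. When the flood comes from a single address it is a DoS attack; when it comes from many addresses at once it is a DDoS (Distributed Denial of Service) attack. Either way the goal is simple: flood your server with so much traffic that it can't serve real users anymore.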

My job in this project was to build a tool that:

  1. Watches all incoming traffic in real time
  2. Learns what normal traffic looks like
  3. Detects when something is wrong
  4. Automatically blocks the attacker
  5. Sends a Slack alert so the team knows what happened

And I had to do it without using Fail2Ban or any rate-limiting library. Everything had to be built from scratch.

Let's walk through how it works — step by step.


The Big Picture

Before diving into code, here's what the system looks like at a high level:

Internet Traffic
      ↓
   Nginx (reverse proxy)
      ↓ writes JSON logs
   /var/log/nginx/hng-access.log
      ↓ tailed continuously
   monitor.py (sliding windows)
      ↓ feeds counts
   baseline.py (learns normal)
      ↓ compares
   detector.py (flags anomalies)
      ↓ if anomaly found
   blocker.py → iptables DROP rule
   notifier.py → Slack alert
   audit.py → audit log
      ↓ always running
   dashboard.py → live web UI

Every component runs as a daemon — a background process that never stops. It's not a cron job that runs once a minute. It's always watching, always learning.


Part 1: Watching the Logs (monitor.py)

What is Nginx doing?

Nginx is a web server that sits in front of our Nextcloud application. Every time someone makes a request — loading a page, uploading a file, logging in — Nginx writes a line to an access log.

I configured Nginx to write logs in JSON format so they're easy to parse:

{
  "source_ip": "45.33.10.5",
  "timestamp": "2024-01-15T12:34:56+00:00",
  "method": "GET",
  "path": "/index.php",
  "status": 200,
  "response_size": 4521
}

One line per request. Millions of lines per day on a busy server.
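
The article never shows the parser itself, so the following is only a minimal sketch of what a helper such as parse_line might look like. It assumes the JSON fields shown above and returns an (ip, status) pair, which is what the main loop later in this post unpacks; the real implementation may return more fields.

```python
import json

def parse_line(line):
    """Return (source_ip, status) for one JSON log line, or None if unusable."""
    try:
        entry = json.loads(line)
        return entry["source_ip"], int(entry["status"])
    except (ValueError, KeyError, TypeError):
        # Malformed JSON, a missing field, or a non-numeric status: skip the line
        return None
```

Returning None for bad lines lets the caller skip them instead of crashing the daemon on a single corrupt log entry.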

How do we read the log in real time?

You know how tail -f in Linux shows you new lines as they appear in a file? That's exactly what monitor.py does — but in Python.

import time

def tail_log(log_path):
    with open(log_path, "r") as fh:
        fh.seek(0, 2)   # jump to end of file (whence=2), skipping old history

        while True:
            line = fh.readline()

            if line:
                parsed = parse_line(line)
                if parsed:
                    yield parsed   # send to main loop
            else:
                time.sleep(0.05)  # no new data, wait a moment

The key line is fh.seek(0, 2) — this moves our reading position to the end of the file when we start. We don't want to process yesterday's logs, just new traffic from this moment forward.

Then we loop forever: read a line, parse it, yield the result. The yield makes this a generator — it produces one request at a time for the main detection loop to process.

The Sliding Window — tracking who's doing what

Now here's where it gets interesting. For every request that comes in, we need to answer: "How many requests has this IP sent in the last 60 seconds?"

The naive approach would be to count all requests and reset every minute. But that has a problem: what if someone sends 100 requests at 11:59:59 and 100 more at 12:00:00? A per-minute counter would show 100 for each minute and never see the 200-request burst that happened within about a second.

The right approach is a sliding window using a deque (double-ended queue).

Think of a deque like a conveyor belt. New requests go on the right. Old requests fall off the left. The length of the belt is always exactly 60 seconds.

import time
from collections import deque, defaultdict

WINDOW = 60  # seconds

global_window = deque()              # timestamps of all requests
ip_windows = defaultdict(deque)      # per-IP request timestamps

def add_request(ip, status):
    # status is accepted to match the caller; this snippet only tracks timestamps
    now = time.time()

    # Add this request to the right of both deques
    global_window.append(now)
    ip_windows[ip].append(now)

    # Evict entries older than 60 seconds from the left
    cutoff = now - WINDOW

    while global_window and global_window[0] < cutoff:
        global_window.popleft()

    # Iterate over a copy so idle IPs can be deleted during the loop
    for addr, dq in list(ip_windows.items()):
        while dq and dq[0] < cutoff:
            dq.popleft()
        if not dq:
            del ip_windows[addr]   # drop idle IPs so the dict does not grow forever

Every entry in the deque is just a timestamp. So to get the current rate:

ip_rate = len(ip_windows["45.33.10.5"])   # requests from this IP in last 60s
global_rate = len(global_window)           # all requests in last 60s

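No division needed to get the count. No rounding errors. Just count how many timestamps are still in the window. Keep in mind that this is a count per 60-second window; to compare it with a per-second baseline, divide by WINDOW first.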
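
To make the boundary problem concrete, here is a small self-contained sketch that replays the burst described earlier (100 requests just before a minute boundary and 100 just after) through both a fixed per-minute counter and a sliding window. The SlidingCounter class and fixed_minute_counts function are illustrative names, not part of the project, and timestamps are passed in explicitly so the example does not depend on the wall clock.

```python
from collections import deque

class SlidingCounter:
    """Counts events seen in the last `window` seconds."""

    def __init__(self, window=60):
        self.window = window
        self.events = deque()

    def add(self, now):
        self.events.append(now)

    def count(self, now):
        # Evict timestamps that have fallen out of the window, then count the rest
        cutoff = now - self.window
        while self.events and self.events[0] < cutoff:
            self.events.popleft()
        return len(self.events)

def fixed_minute_counts(timestamps):
    """Naive approach: one counter per calendar minute."""
    counts = {}
    for t in timestamps:
        minute = int(t // 60)
        counts[minute] = counts.get(minute, 0) + 1
    return counts

# 100 requests in the last second of minute 0, 100 in the first second of minute 1
burst = [59.0 + i / 100 for i in range(100)] + [60.0 + i / 100 for i in range(100)]

print(fixed_minute_counts(burst))   # {0: 100, 1: 100}

counter = SlidingCounter(window=60)
for t in burst:
    counter.add(t)
print(counter.count(burst[-1]))     # 200
```

The fixed counter reports two unremarkable minutes, while the sliding window sees all 200 requests at once.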


Part 2: Learning What "Normal" Looks Like (baseline.py)

Here's a critical insight: you can't hardcode what "too many requests" means.

At 3am, getting 5 requests per second might be unusual. At noon, getting 50 requests per second might be perfectly normal. If you hardcode a threshold of "more than 20 req/s = attack", you'll get false alarms every busy afternoon and miss attacks at night.

The solution is a rolling baseline — the system learns what normal looks like by watching recent traffic.

How the baseline is calculated

Every second, we record how many requests came in that second:

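import time
from collections import deque

history = deque()   # stores (timestamp, count, error_count), one entry per second

_current_count = 0    # requests seen in the current second
_current_errors = 0   # error responses seen in the current second

def record_request(is_error=False):
    global _current_count, _current_errors
    # Increment current-second counters
    _current_count += 1
    if is_error:
        _current_errors += 1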

Every second, we flush the current count into our history:

def _flush():
    global _current_count, _current_errors
    now = int(time.time())
    history.append((now, _current_count, _current_errors))
    _current_count = 0     # start counting the next second from zero
    _current_errors = 0

    # Remove data older than 30 minutes
    cutoff = now - 1800
    while history and history[0][0] < cutoff:
        history.popleft()

Every 60 seconds, we recalculate the baseline:

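from math import sqrt

baseline = {"mean": 1.0, "std": 0.5}   # assumed initial values: start at the floors

def _compute():
    data = [entry[1] for entry in history]
    if not data:
        return   # nothing recorded yet; keep the previous baseline

    mean = sum(data) / len(data)
    variance = sum((x - mean)**2 for x in data) / len(data)
    std = sqrt(variance)

    baseline["mean"] = max(mean, 1.0)   # floor: never treat "normal" as below 1 req/s
    baseline["std"]  = max(std, 0.5)    # floor: keeps the z-score division well-behaved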
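
The floors matter more than they might appear to. The sketch below restates the same calculation as a pure function (compute_baseline is a hypothetical name, and the floor values are the ones used above) so it can be tried on hand-picked samples, including a completely idle period.

```python
from math import sqrt

def compute_baseline(samples, mean_floor=1.0, std_floor=0.5):
    """Population mean and standard deviation of per-second counts, with floors."""
    if not samples:
        return {"mean": mean_floor, "std": std_floor}
    mean = sum(samples) / len(samples)
    variance = sum((x - mean) ** 2 for x in samples) / len(samples)
    return {"mean": max(mean, mean_floor), "std": max(sqrt(variance), std_floor)}

# Ordinary traffic: mean is 10.0, std is sqrt(2), roughly 1.41
print(compute_baseline([8, 10, 12, 10]))

# A completely idle period: without the floors both values would be 0
print(compute_baseline([0, 0, 0]))
```

Without the floor on the standard deviation, an idle period would yield a std of 0 and the z-score calculation in the next part would divide by zero.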

The per-hour slot trick

Traffic patterns change throughout the day. Morning rush hour is different from midnight. So instead of one global rolling average, we keep per-hour slots:

hourly = defaultdict(list)   # { hour_of_day -> [counts] }

# When adding a sample:
hour = time.localtime().tm_hour
hourly[hour].append(count)

When computing the baseline, we prefer the current hour's data if it has enough samples:

current_hour = time.localtime().tm_hour
hour_data = hourly.get(current_hour, [])

if len(hour_data) >= 10:
    data = hour_data        # use today's 2pm data to judge 2pm traffic
else:
    data = full_30min_window   # not enough hour data yet, use rolling window

This means at 2pm, the baseline reflects what 2pm traffic normally looks like, not 3am traffic from 11 hours earlier.
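
The selection logic can be sketched as a pair of small functions. The names add_sample and pick_samples are hypothetical, and the cap of 3600 samples per slot is an assumption added here so that each hour's list cannot grow without bound; the threshold of 10 samples is the one used above.

```python
from collections import defaultdict, deque

MIN_HOUR_SAMPLES = 10     # threshold from the article
MAX_HOUR_SAMPLES = 3600   # assumed cap: one hour of per-second samples per slot

# hour_of_day -> bounded deque of per-second counts
hourly = defaultdict(lambda: deque(maxlen=MAX_HOUR_SAMPLES))

def add_sample(hour, count):
    hourly[hour].append(count)

def pick_samples(hour, rolling_window):
    """Prefer the hour's own samples; fall back to the rolling window."""
    hour_data = hourly.get(hour)   # .get() does not create an empty slot
    if hour_data is not None and len(hour_data) >= MIN_HOUR_SAMPLES:
        return list(hour_data)
    return list(rolling_window)
```

Passing the hour in as an argument, rather than reading the clock inside the function, makes the fallback behaviour easy to test.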


Part 3: Detecting Attacks (detector.py)

Now we have two numbers:

  • current_rate — how many requests this IP sent in the last 60 seconds
  • baseline_mean and baseline_std — what normal looks like

The question is: how different does the current rate need to be before we call it an attack?

Z-score: the statistical approach

A z-score tells you how many standard deviations away from the mean a value is. The formula is:

z = (current_value - mean) / standard_deviation

For example:

  • Mean = 10 req/s, Std = 2 req/s
  • Current rate = 16 req/s
  • Z-score = (16 - 10) / 2 = 3.0

A z-score of 3.0 means the value is 3 standard deviations above normal. If traffic were normally distributed, a value that far above the mean would occur by chance only about 0.13% of the time (about 0.27% counting both tails). That's suspicious.
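
The arithmetic above can be checked with a few lines of standard-library Python. This is only an illustration: it assumes request rates follow a normal distribution, which real traffic does only approximately, and z_score is a helper name chosen for this example.

```python
from statistics import NormalDist

def z_score(value, mean, std):
    """How many standard deviations `value` lies above `mean`."""
    return (value - mean) / std

z = z_score(16, 10, 2)
print(z)   # 3.0

# Probability of landing at least this far above the mean under a normal model
tail = 1 - NormalDist().cdf(z)
print(f"{tail:.2%}")
```

The one-sided tail probability comes out at roughly 0.13%, which is why 3.0 is a common cut-off for "this is probably not noise".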

def detect_ip(ip_rate, mean, std, ip_error_rate=0, baseline_error_rate=0):
    # The two error-rate arguments feed the error-surge check described below
    z = (ip_rate - mean) / std

    # Check z-score first
    if z > 3.0:
        return True, f"z-score={z:.2f}>3.0"

    # Also check the raw multiplier (catches large rates when a noisy
    # baseline inflates std and keeps the z-score under the limit)
    if ip_rate > mean * 5.0:
        return True, f"{ip_rate:.1f}req/s > 5x baseline"

    return False, None

We use two conditions because they catch different attack patterns:

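  • The z-score catches rates that are unusual relative to how much traffic normally varies
  • The 5x multiplier catches rates that are large in absolute terms, even when a noisy baseline (large standard deviation) keeps the z-score below 3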
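
To see that the two checks are not redundant, here is a small sketch with made-up numbers in which only one of them fires at a time. The checks function is illustrative; the 3.0 and 5.0 limits are the ones used in detect_ip.

```python
def checks(rate, mean, std, z_limit=3.0, mult=5.0):
    """Report which of the two conditions would fire for this rate."""
    z = (rate - mean) / std
    return {"z_score": z > z_limit, "multiplier": rate > mean * mult}

# Steady baseline (small std): a modest rise already has z = 4.0,
# but 18 is nowhere near 5x the mean of 10
print(checks(rate=18, mean=10, std=2))    # {'z_score': True, 'multiplier': False}

# Noisy baseline (large std): 60 req/s is only z = 2.5,
# yet it is more than 5x the mean
print(checks(rate=60, mean=10, std=20))   # {'z_score': False, 'multiplier': True}
```

With only the z-score, the second case would slip through; with only the multiplier, the first would.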

Error surge tightening

Here's a clever trick: if an IP is generating lots of 404 errors or failed login attempts (4xx/5xx responses), it's probably a scanner or brute-force attack. We tighten the thresholds automatically:

# If the IP's error rate is more than 3x the baseline error rate...
error_surge = ip_error_rate > 3 * baseline_error_rate

z_limit = 2.0 if error_surge else 3.0   # tighter z-score threshold during a surge
mult    = 3.0 if error_surge else 5.0   # tighter multiplier during a surge

This means suspicious IPs get caught faster, even if their total request rate isn't extreme yet.
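
Putting the tightening together with the earlier checks might look like the sketch below. It is one possible way to combine the pieces shown so far, not necessarily how the project's detector.py is organised, and the parameter name baseline_error_rate is assumed.

```python
def detect_ip(ip_rate, mean, std, ip_error_rate=0.0, baseline_error_rate=0.0):
    """Return (is_anomaly, reason), tightening limits during an error surge."""
    error_surge = ip_error_rate > 3 * baseline_error_rate
    z_limit = 2.0 if error_surge else 3.0
    mult = 3.0 if error_surge else 5.0

    z = (ip_rate - mean) / std
    if z > z_limit:
        return True, f"z-score={z:.2f}>{z_limit}"
    if ip_rate > mean * mult:
        return True, f"{ip_rate:.1f}req/s > {mult}x baseline"
    return False, None

# Same request rate, judged with and without an error surge
print(detect_ip(15, 10, 2))
print(detect_ip(15, 10, 2, ip_error_rate=0.5, baseline_error_rate=0.1))
```

A rate of 15 against a baseline of 10 ± 2 has a z-score of 2.5. That passes the normal limit of 3.0, but it is flagged once the IP's error rate pushes the limit down to 2.0.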


Part 4: Blocking the Attacker (blocker.py)

Once we detect an anomaly, we need to block the IP within 10 seconds. We use iptables — Linux's built-in firewall.

import subprocess

def block_ip(ip, condition, rate, baseline_mean):
    # condition, rate and baseline_mean are not used in this excerpt
    # Add a DROP rule at the top of the INPUT chain
    subprocess.run([
        "iptables", "-I", "INPUT", "1",
        "-s", ip,        # source IP
        "-j", "DROP"     # drop all packets from this IP
    ], check=True)       # raise CalledProcessError if iptables fails

The -I INPUT 1 means "insert at position 1" — the very top of the firewall rules. This ensures the block takes effect immediately for all subsequent packets.
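
Because the address comes straight from a log line, it may be worth validating it before handing it to iptables. The sketch below is an optional hardening step, not something the article's blocker.py is known to do: build_block_cmd is a hypothetical helper that uses the standard ipaddress module and returns the argument list without running it.

```python
import ipaddress

def build_block_cmd(ip):
    """Validate `ip` and return the firewall command as an argument list."""
    addr = ipaddress.ip_address(ip)   # raises ValueError on malformed input
    # IPv6 rules are managed with ip6tables rather than iptables
    binary = "ip6tables" if addr.version == 6 else "iptables"
    return [binary, "-I", "INPUT", "1", "-s", str(addr), "-j", "DROP"]

print(build_block_cmd("45.33.10.5"))
```

Separating "build the command" from "run the command" also makes the blocker testable on a machine where you do not want to touch the real firewall.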

The backoff schedule

We don't permanently ban IPs on the first offense — they might be a misconfigured bot, not a malicious attacker. Instead, we use a backoff schedule:

Offense   Ban Duration
1st       10 minutes
2nd       30 minutes
3rd       2 hours
4th+      Permanent

BAN_SCHEDULE = [600, 1800, 7200, -1]   # seconds (-1 = permanent)

ban_count = {}   # ip -> number of previous offenses

def get_duration(ip):
    offense_count = ban_count.get(ip, 0)
    # Clamp the index so the 4th and later offenses all map to the last entry
    idx = min(offense_count, len(BAN_SCHEDULE) - 1)
    duration = BAN_SCHEDULE[idx]
    ban_count[ip] = offense_count + 1
    return duration

When a ban expires, unblock_expired() removes the iptables rule automatically.
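
The expiry bookkeeping is not shown in the article, so here is one way it could work, offered as a sketch under assumptions. The active_bans dictionary and register_ban function are hypothetical, and unlike the project's unblock_expired() this version takes the rule-removal function and the current time as arguments so it can be exercised without root privileges.

```python
import time

active_bans = {}   # ip -> expiry timestamp, or None for a permanent ban

def register_ban(ip, duration, now=None):
    """Record when a ban should end; a duration of -1 means never."""
    now = time.time() if now is None else now
    active_bans[ip] = None if duration == -1 else now + duration

def unblock_expired(remove_rule, now=None):
    """Call remove_rule(ip) for every ban whose expiry has passed."""
    now = time.time() if now is None else now
    expired = [ip for ip, expiry in active_bans.items()
               if expiry is not None and expiry <= now]
    for ip in expired:
        remove_rule(ip)
        del active_bans[ip]
    return expired
```

In a real deployment, remove_rule would run something like iptables -D INPUT -s <ip> -j DROP, which deletes the rule that the block step inserted.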


Part 5: Slack Alerts (notifier.py)

The team needs to know when something happens. We send structured Slack messages for every ban, unban, and global anomaly.

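import os
from datetime import datetime, timezone

import requests

# Read the webhook from the environment; the variable name here is an assumption
WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]

def send_ban(ip, condition, rate, baseline_mean, duration):
    msg = (
        f":rotating_light: *IP BANNED*\n"
        f"*IP:* `{ip}`\n"
        f"*Condition:* {condition}\n"
        f"*Rate:* {rate:.2f} req/s\n"
        f"*Baseline:* {baseline_mean:.2f} req/s\n"
        f"*Duration:* {duration}\n"
        f"*Time:* {datetime.now(timezone.utc)}"   # utcnow() is deprecated since Python 3.12
    )
    # A timeout keeps a slow Slack endpoint from stalling the detection loop
    requests.post(WEBHOOK_URL, json={"text": msg}, timeout=5)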

The webhook URL is stored as an environment variable — never hardcoded in source code. This is important for security: if you accidentally push your code to GitHub, your webhook won't be exposed.
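
One small readability improvement for the alert: the ban schedule stores durations in seconds, with -1 for permanent, so printing the raw value gives messages like "Duration: 600" or "Duration: -1". A hypothetical format_duration helper, sketched below on the assumption that durations are whole minutes, turns those into the wording used in the backoff table.

```python
def format_duration(seconds):
    """Render a ban duration in seconds as human-readable text."""
    if seconds == -1:
        return "permanent"
    if seconds % 3600 == 0:
        hours = seconds // 3600
        return f"{hours} hour" + ("s" if hours != 1 else "")
    minutes = seconds // 60
    return f"{minutes} minute" + ("s" if minutes != 1 else "")

for value in [600, 1800, 7200, -1]:
    print(format_duration(value))
```

For the schedule above this prints "10 minutes", "30 minutes", "2 hours" and "permanent".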


Part 6: The Live Dashboard (dashboard.py)

The dashboard is a Flask web app that shows live metrics and refreshes every 3 seconds:

from flask import Flask
app = Flask(__name__)

@app.route("/")
def home():
    return f"""
    <html>
    <head><meta http-equiv="refresh" content="3"></head>
    <body>
        <h1>Global Req/s: {get_global_rate()}</h1>
        <h2>Baseline Mean: {baseline["mean"]:.2f}</h2>
        <h2>Banned IPs: {len(get_blocked_list())}</h2>
        <!-- ... more stats ... -->
    </body>
    </html>
    """

The <meta http-equiv="refresh" content="3"> tag makes the browser automatically reload every 3 seconds — no JavaScript needed.


Part 7: Putting It All Together (main.py)

The main loop ties everything together. It's beautifully simple:

# Start background threads
threading.Thread(target=baseline.loop, daemon=True).start()
threading.Thread(target=dashboard.run, daemon=True).start()

# Main detection loop
for ip, status in tail_log(log_file):
    # 1. Add to sliding windows
    add_request(ip, status)
    baseline.record_request(is_error=(status >= 400))

    # 2. Unban expired IPs
    blocker.unblock_expired()

    # 3. Get current stats
    mean = baseline.baseline["mean"]
    std  = baseline.baseline["std"]
    ip_rate = get_ip_rate(ip)

    # 4. Check for IP anomaly
    anomaly, reason = detector.detect_ip(ip_rate, mean, std)
    if anomaly:
        blocker.block_ip(ip, reason, ip_rate, mean)

    # 5. Check for global anomaly
    global_rate = get_global_rate()
    g_anomaly, g_reason = detector.detect_global(global_rate, mean, std)
    if g_anomaly:
        notifier.send_global_alert(g_reason, global_rate, mean)

That's it. For every single HTTP request that hits the server, this code runs in milliseconds — checking whether it's part of an attack.
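
One thing to watch in a loop like this: step 5 runs on every request, so while the global rate stays anomalous it would call send_global_alert once per request. A simple cooldown is one way to avoid flooding the channel. The Cooldown class below is a hypothetical addition, not part of the project, and the 60-second interval in the usage note is an arbitrary choice.

```python
import time

class Cooldown:
    """Allows an action at most once per `interval` seconds."""

    def __init__(self, interval):
        self.interval = interval
        self._last = None   # time of the last allowed action

    def ready(self, now=None):
        # time.monotonic() is immune to wall-clock adjustments
        now = time.monotonic() if now is None else now
        if self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

In the main loop this could be used as `if g_anomaly and global_alert_cooldown.ready():` with `global_alert_cooldown = Cooldown(60)` created once before the loop starts.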


Deploying with Docker

The entire stack runs in Docker containers:

services:
  nextcloud:    # the actual app (pre-built image, not modified)
  nginx:        # reverse proxy + JSON logging
  detector:     # our Python daemon

The Nginx logs are shared via a named Docker volume called HNG-nginx-logs. Nginx writes to it, and our detector reads from it — even though they're in separate containers.

The detector runs with network_mode: host and privileged: true so that iptables commands affect the actual host machine's firewall, not just the container's network namespace.


Lessons Learned

1. Never hardcode thresholds. What's "too many requests" depends entirely on your traffic patterns. Build a system that learns.

2. Deques are perfect for sliding windows. Python's collections.deque with manual eviction by timestamp is exactly the right data structure for time-based windows. (maxlen caps the number of entries, not their age, so it suits count-based windows instead.)

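3. Two detection methods are better than one. The z-score catches rates that are unusual relative to normal variation. The rate multiplier catches rates that are large in absolute terms even when noisy traffic keeps the z-score low. Together they cover more attack patterns.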

4. Store secrets in environment variables. Never commit API keys, webhook URLs, or passwords to git. Use .env files that are gitignored.

5. Daemons beat cron jobs. A continuously running daemon reacts in milliseconds. A cron job that runs every minute can miss a 30-second attack entirely.


The Result

After all this work, here's what the live dashboard looks like:

  • Global Req/s updating in real time
  • Baseline Mean and Std Dev learned from actual traffic
  • Active bans with conditions and durations
  • Top 10 source IPs
  • Audit log showing every ban, unban, and baseline recalculation

When an attack comes in, the sequence is:

  1. Request arrives → sliding window updated
  2. Z-score computed → exceeds 3.0
  3. iptables rule added within 10 seconds
  4. Slack alert sent to team
  5. Audit log entry written
  6. Dashboard updates to show new ban

All of this happens automatically, 24/7, without any human intervention.


Resources


Built for HNG Internship Stage 3 — DevOps Track
https://github.com/ntonous/hng14-stage3-ddos-detector.git
