Abraham Acha
How I Built a Real-Time DDoS Detection Engine from Scratch (No Fail2Ban)

When my boss walked in and said "we're getting hit with suspicious traffic and I need you to build something that detects and responds to it automatically," I had two choices: reach for Fail2Ban like everyone else, or build something I actually understood from the ground up.

I chose to build it from scratch. This post explains exactly how I did it — in plain English, with real code, and without assuming you've ever touched security tooling before.

By the end of this post you will understand:

  • What the system does and why it matters
  • How a sliding window works (and why it's not just "count requests per minute")
  • How the system learns what normal traffic looks like
  • How it decides something is an attack
  • How it uses iptables to cut off an attacker at the firewall level

Let's go.


What This Project Does and Why It Matters

We run a cloud storage platform built on Nextcloud. It's public-facing, which means anyone on the internet can send requests to it — including bots, scanners, and attackers.

Two types of attacks concern us most:

A single aggressive IP — one machine sending hundreds of requests per second, trying to brute-force logins, scrape data, or simply overwhelm the server. This is a classic denial-of-service attack from a single source.

A global traffic spike — thousands of different IPs each sending a small number of requests. No single IP looks suspicious, but the total volume is crushing the server. This is a distributed DDoS.

The tool I built watches every HTTP request in real time, learns what "normal" traffic looks like, and fires an automatic response the moment something deviates — whether it's a single aggressive IP or a global spike.

For a single aggressive IP: it adds an iptables firewall rule to drop every packet from that IP, sends a Slack alert, and automatically releases the ban on a backoff schedule (10 minutes, then 30, then 2 hours, then permanent for repeat offenders).

For a global spike: it sends a Slack alert (no single IP to block).

The whole thing runs as a daemon — a continuously running background process — alongside the Nextcloud stack in Docker. It never sleeps, never needs a cron job, and never relies on hardcoded thresholds.

Here's the architecture at a glance:

Internet → Nginx (reverse proxy) → Nextcloud
                ↓ writes JSON logs
         [HNG-nginx-logs volume]
                ↓ reads logs
         Python Daemon
           ├── monitor.py    (tail the log file)
           ├── baseline.py   (learn normal traffic)
           ├── detector.py   (spot anomalies)
           ├── blocker.py    (iptables + audit log)
           ├── unbanner.py   (auto-release bans)
           ├── notifier.py   (Slack alerts)
           └── dashboard.py  (live web UI)

Everything is wired together by main.py and configured through a single config.yaml file — no hardcoded numbers anywhere in the code.
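The post doesn't reproduce config.yaml itself, but based on the thresholds used throughout, a sketch might look like this (key names are illustrative guesses, not the actual schema):

```yaml
# Illustrative config sketch: key names are hypothetical,
# values match the numbers used in this post
detection:
  window_seconds: 60
  zscore_threshold: 3.0
  multiplier_threshold: 5.0
baseline:
  history_seconds: 1800        # 30 minutes of per-second buckets
  floor_mean: 1.0              # req/s
  floor_stddev: 0.5
blocking:
  backoff: ["10m", "30m", "2h", "permanent"]
notifier:
  slack_webhook_url: "https://hooks.slack.com/services/..."
```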


Part 1: Reading the Traffic — The Log Monitor

Before we can detect anything, we need to see the traffic. Nginx is our reverse proxy — every HTTP request passes through it. We configure Nginx to write each request as a JSON object to a log file.

Here's the Nginx log format we defined:

log_format hng_json escape=json
    '{'
        '"source_ip":"$remote_addr",'
        '"timestamp":"$time_iso8601",'
        '"method":"$request_method",'
        '"path":"$uri",'
        '"status":$status,'
        '"response_size":$body_bytes_sent,'
        '"request_time":$request_time,'
        '"http_user_agent":"$http_user_agent"'
    '}';

access_log /var/log/nginx/hng-access.log hng_json;

Every request now produces a line like this in the log file:

{
  "source_ip": "1.2.3.4",
  "timestamp": "2026-04-26T14:08:21+00:00",
  "method": "GET",
  "path": "/index.php/login",
  "status": 401,
  "response_size": 1842,
  "request_time": 0.043,
  "http_user_agent": "Mozilla/5.0..."
}

Now our Python daemon needs to read these lines as they appear — in real time, without missing any, and without crashing if the log file gets rotated. This is called "tailing" a file, like tail -f in Linux.

Here's how monitor.py does it:

import os
import time
import json
import queue

def parse_line(raw: str) -> dict | None:
    """Parse one JSON log line. Returns None if malformed."""
    raw = raw.strip()
    if not raw:
        return None
    try:
        entry = json.loads(raw)
    except json.JSONDecodeError:
        return None  # drop malformed lines silently

    # Make sure all required fields are present
    required = {"source_ip", "timestamp", "method", "path", "status", "response_size"}
    if required - entry.keys():
        return None

    # Coerce types and add a wall-clock timestamp for window math
    entry["status"] = int(entry["status"])
    entry["response_size"] = int(entry["response_size"])
    entry["_parsed_at"] = time.time()
    return entry


class LogMonitor:
    def __init__(self, log_path: str, out_queue: queue.Queue):
        self.log_path = log_path
        self.out_queue = out_queue

    def run(self):
        fh = open(self.log_path, "r")
        fh.seek(0, 2)  # jump to END of file — don't replay old traffic

        current_inode = os.stat(self.log_path).st_ino

        while True:
            line = fh.readline()
            if line:
                entry = parse_line(line)
                if entry:
                    self.out_queue.put(entry)  # send downstream
            else:
                # Check if the file was rotated (inode changed)
                new_inode = os.stat(self.log_path).st_ino
                if new_inode != current_inode:
                    fh.close()
                    fh = open(self.log_path, "r")
                    current_inode = new_inode
                else:
                    time.sleep(0.1)  # no new data, yield CPU briefly

Three things worth understanding here:

fh.seek(0, 2) — When we start the daemon, the log file already has entries from before we launched. We don't want to process all of those as if they're live traffic. The second argument is the whence value: 2 means os.SEEK_END, so seek(0, 2) moves to offset 0 from the end of the file. From there we only see new lines written after the daemon starts.

Inode check — Log files get rotated (renamed and replaced with a fresh empty file) periodically. If we don't detect this, we'd keep reading the old file and miss all new traffic. The inode (a unique file identifier in Linux) changes when the file is replaced, so we check it on every "no new data" cycle.

The queue — We put parsed entries into a queue.Queue. This decouples reading from processing. The monitor thread just reads and queues; the main thread drains the queue and feeds entries to the baseline and detector. If the detector is temporarily slow, entries buffer in the queue rather than being dropped.


Part 2: How the Sliding Window Works

This is the core data structure of the entire detection system. Get this right and everything else falls into place.

The wrong way: per-minute counters

The naive approach is to count requests per minute:

import time

# WRONG — this is not a sliding window
requests_this_minute = 0
last_minute = int(time.time() / 60)

def on_request():
    global requests_this_minute, last_minute
    current_minute = int(time.time() / 60)
    if current_minute != last_minute:
        requests_this_minute = 0  # reset at the minute boundary
        last_minute = current_minute
    requests_this_minute += 1

The problem: imagine an attacker sends 1000 requests at 14:00:59 and another 1000 at 14:01:01. The per-minute counter resets at 14:01:00, so each minute only shows 1000 requests. But in reality, 2000 requests arrived in a 2-second window — a massive spike that the counter completely misses.
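A short simulation of that exact burst shows the blind spot (timestamps are synthetic):

```python
# Simulate the burst: 1000 requests at 14:00:59 and 1000 at 14:01:01,
# expressed as seconds past 14:00:00
events = [59.0] * 1000 + [61.0] * 1000

# Naive per-minute counter: bucket requests by minute number
per_minute = {}
for t in events:
    minute = int(t // 60)
    per_minute[minute] = per_minute.get(minute, 0) + 1

print(per_minute)  # {0: 1000, 1: 1000}: each minute looks modest

# A true 60-second window ending at t=61 covers both halves of the burst
in_window = sum(1 for t in events if 61 - 60 <= t <= 61)
print(in_window)   # 2000: the counter hid half the burst
```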

The right way: a deque of timestamps

A sliding window stores the exact timestamp of every request in a collections.deque. When you want the current rate, you evict old entries and count what remains.

from collections import deque
import time

# One deque per IP, one global
ip_window = deque()    # stores timestamps of requests from this IP
WINDOW = 60            # seconds

def on_request(ip: str, now: float):
    # Step 1: Add this request's timestamp
    ip_window.append(now)

    # Step 2: Evict entries older than 60 seconds from the LEFT
    cutoff = now - WINDOW
    while ip_window and ip_window[0] < cutoff:
        ip_window.popleft()   # O(1) — this is why we use deque, not list

    # Step 3: Rate = requests still in window / window size
    rate = len(ip_window) / WINDOW
    return rate  # requests per second over the last 60 seconds

Let's trace through a real example:

Time 14:00:00  → request arrives → deque: [14:00:00]             rate = 1/60 = 0.017 req/s
Time 14:00:01  → request arrives → deque: [14:00:00, 14:00:01]   rate = 2/60 = 0.033 req/s
...normal traffic...
Time 14:01:30  → attacker starts sending 100 req/s
Time 14:01:31  → deque has 100 new entries + a few old ones
               → evict everything before 14:00:31
               → 100 entries remain → rate = 100/60 = 1.67 req/s  ← anomaly detected

The window "slides" forward with real time. Old entries fall off the left; new entries pile up on the right. The rate you get is always "how many requests from this IP in the last 60 seconds."

Why deque and not list?

list.pop(0) is O(n) — it has to shift every element one position to the left every time you evict. With thousands of entries under a DDoS, this becomes very slow.

deque.popleft() is O(1) — it just moves a pointer. No matter how many entries are in the deque, eviction is instant.
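A rough micro-benchmark makes this concrete (absolute timings are machine-dependent; the ratio is the point):

```python
import time
from collections import deque

N = 50_000

# Evict everything from the front of a list: each pop(0) shifts all
# remaining elements left, so the total work is O(N^2)
lst = list(range(N))
t0 = time.perf_counter()
while lst:
    lst.pop(0)
list_secs = time.perf_counter() - t0

# Same eviction with deque.popleft(): O(1) per pop, O(N) total
dq = deque(range(N))
t0 = time.perf_counter()
while dq:
    dq.popleft()
deque_secs = time.perf_counter() - t0

print(f"list.pop(0): {list_secs:.4f}s   deque.popleft(): {deque_secs:.4f}s")
```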

Here's the actual implementation from our detector.py:

from collections import deque, defaultdict
from dataclasses import dataclass, field

@dataclass
class IPState:
    """All per-IP sliding window state."""
    timestamps: deque = field(default_factory=deque)  # request timestamps
    error_ts: deque = field(default_factory=deque)     # 4xx/5xx timestamps

# One IPState per IP address, created on first request
ip_states = defaultdict(IPState)

# One global window for all traffic combined
global_ts: deque = deque()

def _evict(dq: deque, window: float, now: float):
    """Remove timestamps older than `window` seconds from the left."""
    cutoff = now - window
    while dq and dq[0] < cutoff:
        dq.popleft()

def _rate(dq: deque, window: float) -> float:
    """Requests per second over the window."""
    return len(dq) / window

def process(entry: dict):
    ip = entry["source_ip"]
    now = entry["_parsed_at"]

    state = ip_states[ip]

    # Update per-IP window
    state.timestamps.append(now)
    _evict(state.timestamps, 60, now)
    ip_rate = _rate(state.timestamps, 60)

    # Update global window
    global_ts.append(now)
    _evict(global_ts, 60, now)
    global_rate = _rate(global_ts, 60)

We maintain two separate windows: one per IP (so we can detect individual attackers) and one global (so we can detect distributed attacks from many IPs). Both are updated on every single request.


Part 3: How the Baseline Learns from Traffic

Detecting anomalies requires knowing what normal looks like. If normal traffic is 50 req/s, then 100 req/s is suspicious. But if normal traffic is 5 req/s, then even 15 req/s might be an attack.

The key insight: normal changes over time. A file-sharing platform sees more traffic during business hours than at 3am. A global service sees more traffic from Asia during Asian business hours. You cannot hardcode a threshold — you have to learn it.

Per-second bucketing

Our baseline engine counts requests per second and stores those counts in a rolling 30-minute window:

from collections import deque
import math
import time
from datetime import datetime

# Stores one count per second, up to 30 minutes worth (1800 buckets)
second_buckets = deque(maxlen=1800)

current_second = int(time.time())
current_count = 0.0

def record(entry: dict):
    global current_second, current_count

    now = entry["_parsed_at"]
    bucket_sec = int(now)

    # When the clock second changes, flush the completed bucket
    if bucket_sec > current_second:
        # Fill any empty seconds (gaps where no traffic arrived)
        for s in range(current_second, bucket_sec):
            second_buckets.append(current_count)
            current_count = 0.0
        current_second = bucket_sec

    current_count += 1.0

So if 47 requests arrived in the second ending at 14:05:00, the deque gets 47.0 appended. If the next second was quiet (zero requests), it gets 0.0. The deque holds up to 1800 of these values — 30 minutes of history.
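The same bucketing logic, reduced to a self-contained toy run with synthetic timestamps:

```python
from collections import deque

second_buckets = deque(maxlen=1800)
current_second = 100
current_count = 0.0

def record(ts: float):
    """Count one request into the current one-second bucket."""
    global current_second, current_count
    bucket_sec = int(ts)
    if bucket_sec > current_second:
        # Flush the finished bucket, then zeros for any silent seconds
        for _ in range(current_second, bucket_sec):
            second_buckets.append(current_count)
            current_count = 0.0
        current_second = bucket_sec
    current_count += 1.0

# 3 requests in second 100, nothing in second 101, 1 request in second 102
for ts in (100.1, 100.5, 100.9, 102.2):
    record(ts)

print(list(second_buckets))  # [3.0, 0.0]: second 100 had 3, second 101 had 0
```

The still-open bucket for second 102 (count 1.0) is only flushed once the clock rolls past it.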

Computing mean and standard deviation

Every 60 seconds, we recompute the baseline from whatever is in the bucket deque:

def _recalculate():
    samples = list(second_buckets)
    n = len(samples)

    if n < 30:
        return  # not enough data yet, keep the floor values

    # Mean: average requests per second over the last 30 minutes
    mean = sum(samples) / n

    # Standard deviation: how much does traffic vary?
    variance = sum((x - mean) ** 2 for x in samples) / (n - 1)
    stddev = math.sqrt(variance)

    # Apply floor values to prevent false positives during quiet periods
    mean = max(mean, 1.0)      # never let mean drop below 1 req/s
    stddev = max(stddev, 0.5)  # never let stddev drop below 0.5

    # Publish as module-level state so the detector can read it
    global effective_mean, effective_stddev
    effective_mean = mean
    effective_stddev = stddev

Why do we need floor values?

Imagine the server is quiet at 3am with 0.1 req/s average. Without floor values:

  • mean = 0.1, stddev = 0.05
  • A burst to just 0.25 req/s gives z-score = (0.25 - 0.1) / 0.05 = 3.0 → false alarm!

With floor_mean = 1.0:

  • The denominator stays reasonable
  • You need a genuine spike to trigger detection
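That worked example can be checked with a few lines (a sketch; in the real engine the floors are applied inside _recalculate):

```python
def zscore(rate, mean, stddev, floor_mean=1.0, floor_stddev=0.5):
    """Z-score with the baseline floors applied before dividing."""
    mean = max(mean, floor_mean)
    stddev = max(stddev, floor_stddev)
    return (rate - mean) / stddev

# Quiet 3am baseline (mean=0.1, stddev=0.05) and a tiny burst to 0.25 req/s
raw = (0.25 - 0.1) / 0.05          # ~3.0: would cross the z=3 threshold
floored = zscore(0.25, 0.1, 0.05)  # (0.25 - 1.0) / 0.5 = -1.5: stays quiet
print(raw, floored)
```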

Per-hour preference

Traffic patterns repeat by hour of day. We also maintain 24 separate per-hour slot deques:

hour_slots = {h: deque(maxlen=1800) for h in range(24)}

# When flushing a bucket, also file it into the current hour's slot
hour = datetime.fromtimestamp(current_second).hour
hour_slots[hour].append(current_count)

At recalculation time, if the current hour's slot has at least 30 samples, we use it instead of the 30-minute window. This means 9am traffic is compared to yesterday's 9am baseline, not to the 8:30am baseline — much more accurate.


Part 4: How the Detection Logic Makes a Decision

We have a rate (from the sliding window) and a baseline (mean and stddev). How do we decide if the rate is an attack?

Z-score: how many standard deviations above normal?

The z-score is a way of asking "how unusual is this number compared to what we normally see?"

z-score = (current_rate - baseline_mean) / baseline_stddev

If the baseline is mean=5 req/s, stddev=2:

  • Current rate 7 req/s → z = (7-5)/2 = 1.0 → normal variation
  • Current rate 11 req/s → z = (11-5)/2 = 3.0 → suspicious
  • Current rate 25 req/s → z = (25-5)/2 = 10.0 → attack

We flag anything with z-score > 3.0. In a normal distribution, only 0.13% of values naturally exceed z=3. So a false positive rate of 0.13% is acceptable.
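That 0.13% figure comes straight from the normal distribution's upper tail, which you can compute with the standard error function rather than memorize:

```python
import math

def upper_tail(z: float) -> float:
    """P(X > z) for a standard normal random variable."""
    return 0.5 * math.erfc(z / math.sqrt(2))

print(f"{upper_tail(3.0):.4%}")  # ≈ 0.1350%
```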

Multiplier: absolute rate check

Z-score alone has a weakness: if traffic is very consistent (low stddev), even a modest real attack might not produce a high z-score. So we add a second condition:

if rate > 5 × mean → flag it

If normal is 5 req/s and someone hits 26 req/s, that's more than 5× — flag it regardless of stddev.

Either condition firing triggers the response. Here's the actual detection code:

def _zscore(rate: float, mean: float, stddev: float) -> float:
    if stddev == 0:
        return 0.0
    return (rate - mean) / stddev

def check_anomaly(ip_rate, mean, stddev):
    z_thresh = 3.0   # loaded from config.yaml
    m_thresh = 5.0   # loaded from config.yaml

    ip_z = _zscore(ip_rate, mean, stddev)

    # Either condition triggers the alert
    is_anomaly = (ip_z > z_thresh) or (ip_rate > m_thresh * mean and mean > 0)
    return is_anomaly, ip_z

Error surge tightening

There's a third detection mode for slow attacks. An attacker doing credential stuffing (trying username/password combinations) won't necessarily send huge traffic volumes — but they'll generate lots of 401 Unauthorized responses.

We track error rates separately:

# If this IP's 4xx/5xx rate is 3× the baseline error rate,
# tighten the thresholds for this IP
if err_rate >= 3.0 * baseline_error_mean:
    z_thresh = 2.0   # lower bar → easier to trigger
    m_thresh = 3.0   # lower bar → easier to trigger
    state.tightened = True

This catches slow, stealthy attacks that would otherwise fly under the radar.

Putting it all together

Here's the full process() function that runs on every log line:

def process(self, entry: dict):
    ip = entry["source_ip"]
    now = entry["_parsed_at"]
    status = entry.get("status", 200)

    mean, stddev = self._baseline.get_baseline()

    with self._lock:
        state = self._ip_states[ip]

        # 1. Update sliding windows
        state.timestamps.append(now)
        self._evict(state.timestamps, 60, now)

        self._global_ts.append(now)
        self._evict(self._global_ts, 60, now)

        ip_rate = len(state.timestamps) / 60
        global_rate = len(self._global_ts) / 60

        # 2. Check error surge → tighten thresholds if needed
        if status >= 400:
            state.error_ts.append(now)
        self._evict(state.error_ts, 60, now)
        err_rate = len(state.error_ts) / 60

        z_thresh = 2.0 if state.tightened else 3.0
        m_thresh = 3.0 if state.tightened else 5.0

        # 3. Score and decide
        ip_z = (ip_rate - mean) / stddev if stddev > 0 else 0
        ip_anomaly = (ip_z > z_thresh) or (ip_rate > m_thresh * mean)

        # 4. Fire callback if anomalous (with 10-second cooldown)
        if ip_anomaly:
            last = self._last_fired.get(ip, 0)
            if now - last > 10:
                self._last_fired[ip] = now
                self.on_ip_anomaly(ip, {"ip_rate": ip_rate, "zscore": ip_z, ...})

        # 5. Check global anomaly separately
        g_z = (global_rate - mean) / stddev if stddev > 0 else 0
        if g_z > 3.0 or global_rate > 5.0 * mean:
            if now - self._global_last_fired > 30:
                self._global_last_fired = now
                self.on_global_anomaly({"global_rate": global_rate, ...})

When on_ip_anomaly fires, it calls the blocker. When on_global_anomaly fires, it calls the notifier directly (no ban — you can't ban the whole internet).


Part 5: How iptables Blocks an IP

iptables is the Linux kernel's firewall. It processes every network packet that enters or leaves your machine and applies rules to decide what to do with each one.

Think of it like a bouncer at a club. The bouncer has a list of rules:

  1. Allow anyone with a VIP pass (ACCEPT)
  2. Drop anyone on the blacklist (DROP)
  3. Default: let everyone in (ACCEPT)

When we detect an attack from IP 1.2.3.4, we add a rule to the top of the INPUT chain (the chain that handles incoming packets):

iptables -I INPUT -s 1.2.3.4 -j DROP

Breaking this down:

  • -I INPUT — Insert at the top of the INPUT chain (before other rules)
  • -s 1.2.3.4 — Match packets with source IP 1.2.3.4
  • -j DROP — Silently drop matching packets (no response sent to attacker)

The -I (insert) rather than -A (append) is important. Rules are checked top-to-bottom. By inserting at the top, our DROP rule is checked before any ACCEPT rules — the attacker is blocked immediately.

You can verify the rule was added:

$ iptables -L INPUT -n -v
Chain INPUT (policy ACCEPT)
target     prot opt source               destination
DROP       all  --  1.2.3.4             0.0.0.0/0
ACCEPT     all  --  0.0.0.0/0           0.0.0.0/0  state RELATED,ESTABLISHED
...

Here's the Python code that issues this command:

import subprocess

def _iptables_add(ip: str):
    """Add a DROP rule for this IP. Check first to avoid duplicates."""
    # Check if rule already exists
    check = subprocess.run(
        ["iptables", "-C", "INPUT", "-s", ip, "-j", "DROP"],
        capture_output=True
    )
    if check.returncode == 0:
        return  # already banned, nothing to do

    # Add the rule
    subprocess.run(
        ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"],
        check=True,
        capture_output=True
    )

def _iptables_remove(ip: str):
    """Remove the DROP rule. Loop in case it was added more than once."""
    while True:
        result = subprocess.run(
            ["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"],
            capture_output=True
        )
        if result.returncode != 0:
            break  # no more rules for this IP

The backoff schedule

We don't ban forever on the first offence. The ban duration follows a backoff schedule:

Offence Duration
1st ban 10 minutes
2nd ban 30 minutes
3rd ban 2 hours
4th+ ban Permanent
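The schedule maps naturally to a small lookup (a sketch; the post notes the real values live in config.yaml):

```python
# Ban durations in seconds; None means permanent. Values mirror the
# schedule above; the real blocker loads them from config.yaml.
BACKOFF = [600, 1800, 7200, None]   # 10m, 30m, 2h, permanent

def ban_duration(ban_count: int):
    """Duration for the Nth ban of an IP (1-indexed)."""
    idx = min(ban_count, len(BACKOFF)) - 1
    return BACKOFF[idx]

print(ban_duration(1))  # 600
print(ban_duration(3))  # 7200
print(ban_duration(7))  # None: repeat offender, permanent ban
```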

A background thread (the unbanner) wakes up every 30 seconds and checks whether any bans have expired:

import threading
from datetime import datetime

class UnbannerThread(threading.Thread):
    def __init__(self, blocker, notifier):
        super().__init__(daemon=True)
        self._stop = threading.Event()
        self._blocker = blocker
        self._notifier = notifier

    def run(self):
        while not self._stop.wait(30):  # check every 30 seconds
            expired = self._blocker.get_expired_bans()
            for ip in expired:
                record = self._blocker.unban(ip)
                if record:
                    self._notifier.send_unban(record)  # Slack alert

# On the blocker:
def get_expired_bans(self) -> list[str]:
    now = datetime.utcnow()
    return [
        ip for ip, rec in self._bans.items()
        if rec.expires_at is not None and now >= rec.expires_at
    ]

When an IP is unbanned, iptables -D INPUT -s <ip> -j DROP removes the rule, and a Slack notification goes out so the team knows the ban was lifted.


Part 6: Slack Alerts

Every ban, unban, and global anomaly sends a Slack message. We use Slack's Incoming Webhooks — a URL you POST JSON to, and it appears as a message in your channel.

import json
from datetime import datetime, timezone

import requests

WEBHOOK_URL = "https://hooks.slack.com/services/..."  # loaded from config.yaml

def send_ban(ip: str, info: dict, record):
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    text = (
        f":rotating_light: *IP BANNED* — `{ip}`\n"
        f"*Rate:* `{info['ip_rate']:.2f} req/s`  "
        f"*Baseline:* `{info['mean']:.2f}`  "
        f"*Z-score:* `{info['zscore']:.2f}`\n"
        f"*Duration:* `{record.duration_label}`  (ban #{record.ban_count})\n"
        f"*Time:* {timestamp}"
    )
    requests.post(
        WEBHOOK_URL,
        data=json.dumps({"text": text}),
        headers={"Content-Type": "application/json"},
        timeout=5
    )

Critically, Slack calls happen in a thread pool — they never block the main detection loop. If Slack is slow or down, we don't miss detecting the next attack:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def send_ban_async(ip, info, record):
    executor.submit(_post_to_slack, ip, info, record)

Part 7: The Live Dashboard

The dashboard is a Flask web app that serves a single HTML page which polls /api/metrics every 3 seconds via JavaScript fetch().

from flask import Flask, jsonify
import psutil

app = Flask(__name__)

@app.route("/api/metrics")
def metrics():
    snap = detector.get_snapshot()
    mean, stddev = baseline.get_baseline()
    banned = blocker.get_banned_ips()

    return jsonify({
        "uptime": get_uptime_string(),
        "global_rate": snap["global_rate"],
        "top_ips": snap["top_ips"],
        "mean": round(mean, 3),
        "stddev": round(stddev, 3),
        "banned": [{"ip": r.ip, "duration": r.duration_label, ...} for r in banned],
        "cpu_pct": psutil.cpu_percent(),
        "mem_pct": psutil.virtual_memory().percent,
        "audit_tail": read_last_20_audit_lines(),
    })

The frontend JavaScript refreshes this every 3 seconds and updates the DOM without a full page reload:

async function refresh() {
    const r = await fetch('/api/metrics');
    const d = await r.json();

    document.getElementById('global-rate').textContent = d.global_rate.toFixed(2) + ' req/s';
    document.getElementById('mean').textContent = d.mean;

    // Color the rate red if it's approaching the anomaly threshold
    const rateEl = document.getElementById('global-rate');
    rateEl.style.color = d.global_rate > d.mean * 3 ? '#f87171' : '#4ade80';

    // Render banned IPs table
    const bannedRows = d.banned.map(b =>
        `<tr><td>${b.ip}</td><td>${b.duration}</td><td>${b.banned_at}</td></tr>`
    ).join('');
    document.getElementById('banned-table').innerHTML = bannedRows || '<tr><td colspan="3">No active bans ✓</td></tr>';
}

setInterval(refresh, 3000);
refresh(); // immediate first load

Part 8: The Audit Log

Every significant event is written to a structured audit log in a format designed to be both human-readable and grep-able:

[2026-04-26T14:08:21Z] BAN ip=1.2.3.4 | condition=zscore=4.21 rate=87.3req/s mean=5.2 | rate=87.300 | baseline=5.200 | duration=10m
[2026-04-26T14:18:22Z] UNBAN ip=1.2.3.4 | condition=zscore=4.21 rate=87.3req/s mean=5.2 | rate=87.300 | baseline=5.200 | duration=10m
[2026-04-26T14:07:06Z] BASELINE_RECALC ip=N/A | condition=recalculation | rate=5.200 | baseline=1.100 | duration=N/A
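As a sketch of how such lines could be produced (the field layout is inferred from the samples above; the real blocker's writer may differ):

```python
from datetime import datetime, timezone

def audit_line(event: str, ip: str, condition: str,
               rate: float, baseline: float, duration: str) -> str:
    """Format one audit entry in the pipe-delimited layout shown above."""
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return (f"[{ts}] {event} ip={ip} | condition={condition} "
            f"| rate={rate:.3f} | baseline={baseline:.3f} | duration={duration}")

line = audit_line("BAN", "1.2.3.4", "zscore=4.21 rate=87.3req/s mean=5.2",
                  87.3, 5.2, "10m")
print(line)
```

Keeping every field as `key=value` is what makes the grep commands below reliable.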

You can search these logs:

# All bans today ("] BAN " avoids also matching UNBAN entries)
grep "] BAN " /var/log/detector/audit.log | grep "2026-04-26"

# All recalculations (to see baseline evolution)
grep "BASELINE_RECALC" /var/log/detector/audit.log

# How often was 1.2.3.4 banned?
grep "ip=1.2.3.4" /var/log/detector/audit.log | grep -c "] BAN "

Putting It All Together

Here's main.py — the entry point that wires every component together:

import queue
import signal
import threading

# 1. Build all components
log_queue = queue.Queue(maxsize=100_000)
monitor = LogMonitor(log_path, log_queue)
baseline = BaselineEngine(cfg)
blocker = BlockerEngine(cfg, audit_path)
notifier = NotifierEngine(cfg)
detector = DetectorEngine(cfg, baseline)

# 2. Wire the anomaly callbacks
def on_ip_anomaly(ip, info):
    if blocker.is_banned(ip):
        return  # already banned, skip
    record = blocker.ban(ip, condition=..., rate=info["ip_rate"], baseline=info["mean"])
    notifier.send_ban(ip, info, record)

def on_global_anomaly(info):
    notifier.send_global_anomaly(info)

detector.on_ip_anomaly = on_ip_anomaly
detector.on_global_anomaly = on_global_anomaly

# 3. Start background threads (Flask's server blocks, so it gets its own thread)
monitor.start()
UnbannerThread(blocker, notifier).start()
threading.Thread(
    target=run_dashboard, kwargs={"host": "0.0.0.0", "port": 8080}, daemon=True
).start()

# 4. Main loop — drain queue, feed baseline and detector
while True:
    try:
        entry = log_queue.get(timeout=0.05)
        baseline.record(entry)   # update statistics
        detector.process(entry)  # check for anomalies
    except queue.Empty:
        pass  # no new log lines, loop again

The design is intentionally simple: one hot path (queue drain → baseline → detector) runs in the main thread. All I/O — Slack posts, iptables calls, dashboard requests — happens in separate threads and never blocks detection.


What I Learned

Building this from scratch taught me things that using Fail2Ban never would have:

Deques are the right tool for sliding windows. The O(1) popleft makes a real difference when you're processing thousands of events per second. A list would have worked fine until the DDoS actually started — which is exactly when you need it to be fast.

Baselines must have floors. My first version had no floor values. It triggered constantly at night because the stddev got so low that normal morning traffic looked like an anomaly. One line of code (mean = max(mean, 1.0)) fixed it.

Per-hour slots matter more than you'd think. Without them, the 30-minute window picks up the tail of the previous hour's traffic and skews the baseline for the current hour. The hour slots give you a cleaner "what does this hour normally look like" answer.

iptables rules apply to the host kernel, not the container. The detector container has to run with network_mode: host and CAP_NET_ADMIN. If you run it in a bridge network and add iptables rules inside the container, those rules only affect traffic inside the container's network namespace — not the actual packets arriving at the server.

Thread safety is non-negotiable. The log monitor, the main loop, the Flask dashboard, the unbanner, and the Slack notifier all touch shared state. A threading.Lock() around every mutation took 20 minutes to add and saved hours of debugging mysterious state corruption.


Running It Yourself

The full source code is on GitHub: github.com/your-username/hng-detector

To run it on a fresh Ubuntu VPS:

# Install Docker
curl -fsSL https://get.docker.com | sh

# Clone and configure
git clone https://github.com/your-username/hng-detector.git
cd hng-detector
cp .env.example .env
nano .env  # add your Slack webhook URL and server IP

# Launch everything
docker compose up -d --build

# Watch the detector work
docker logs -f hng-detector

# Test it with a traffic burst
apt install apache2-utils
ab -n 2000 -c 150 http://localhost/

# Check iptables for the ban
iptables -L INPUT -n -v

The dashboard will be live at http://your-server-ip:8080.


Conclusion

Security tooling doesn't have to be a black box. The core ideas here — a deque-based sliding window, a rolling statistical baseline, z-score anomaly detection, and iptables for enforcement — are each individually simple. The power comes from wiring them together into a continuously running daemon that never sleeps.

If you're new to security tooling, I hope this post demystified what's happening under the hood of the tools you've always treated as magic. Go build something.


Found this useful? Share it with someone learning DevSecOps. Questions? Drop them in the comments.
