Adjerese Precious

How I Built a Real-Time HTTP Traffic Anomaly Detector for a Cloud Storage Platform

Introduction

Imagine you're running a cloud storage platform that serves thousands of users around the clock. One day, a wave of suspicious traffic hits your server — thousands of requests per second from a single IP address, trying to overwhelm your system. How do you detect it? How do you stop it automatically before real users are affected?
That's exactly what I built for this project — a real-time anomaly detection engine that watches all incoming HTTP traffic, learns what normal looks like, and automatically blocks anything that deviates from that normal. No human intervention needed.
In this post I'll walk you through exactly how I built it, piece by piece, in plain English. No security experience required.

What the Project Does

The system sits alongside a Nextcloud cloud storage server and does five things continuously:

  1. Watches every HTTP request coming into the server in real time
  2. Learns what normal traffic looks like using statistics
  3. Detects when traffic from a single IP or globally looks suspicious
  4. Blocks suspicious IPs automatically using Linux firewall rules
  5. Alerts your team on Slack within 10 seconds

The Technology Stack

  1. Python 3.11 — the detector daemon
  2. Nginx — reverse proxy that logs all traffic as JSON
  3. Nextcloud — the cloud storage application being protected
  4. Docker + Docker Compose — runs everything together
  5. iptables — Linux kernel firewall that blocks IPs
  6. Slack — receives instant alerts
  7. Flask — serves the live metrics dashboard

Part 1 — Reading the Logs in Real Time

The first challenge is reading Nginx access logs as they are written — not after the fact, but live, line by line.
Nginx is configured to write every request as a JSON object:

{
  "source_ip": "1.2.3.4",
  "timestamp": "2026-04-28T10:08:10+00:00",
  "method": "GET",
  "path": "/",
  "status": 200,
  "response_size": 1234
}

The detector uses a technique called log tailing — similar to the Linux tail -f command. Here's how it works in simple terms:

import os
import time

# Open the log file
with open("/var/log/nginx/hng-access.log") as f:
    # Jump to the end — ignore old entries
    f.seek(0, os.SEEK_END)

    while True:
        line = f.readline()

        if not line:
            # Nothing new yet; wait briefly
            time.sleep(0.1)
            continue

        # Parse the JSON and process it
        entry = parse_log_line(line)
        detector.process(entry)

Every new line that Nginx writes gets picked up within 100 milliseconds. That's fast enough to detect attacks as they happen.
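The loop above calls a `parse_log_line` helper that isn't shown; here's a minimal sketch, assuming the field names from the JSON log format above (the helper name comes from the snippet, everything else is illustrative):

```python
import json

def parse_log_line(line):
    """Parse one JSON-formatted Nginx access log line into a dict.
    Returns None for blank or malformed lines (e.g. partial writes)."""
    line = line.strip()
    if not line:
        return None
    try:
        entry = json.loads(line)
    except json.JSONDecodeError:
        return None  # skip non-JSON noise rather than crash the tailer
    # Keep only the fields the detector cares about
    return {
        "source_ip": entry.get("source_ip"),
        "method": entry.get("method"),
        "path": entry.get("path"),
        "status": int(entry.get("status", 0)),
    }
```

Silently skipping malformed lines matters here: a log line read mid-write is truncated JSON, and the tailer will see the rest of it on the next iteration.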

Part 2 — The Sliding Window (How We Count Requests)

To detect whether an IP is sending too many requests, we need to count how many requests it sent in the last 60 seconds: not a fixed minute that resets on the clock, but a true rolling 60-second window.
We use Python's collections.deque for this; think of it as a list that automatically stays the right size.
Here's the concept:

Time →  10s  11s  12s  13s  14s  15s  16s  17s  ...
IP X:    .    .    .    req  req  req  req  req  ...

Window at t=17s: [13, 14, 15, 16, 17]  → 5 requests in 60s → 0.08 req/s
Window at t=70s: [14, 15, 16, 17]      → old entries evicted from left

Eviction logic. This is the key part:

from collections import deque

# Store timestamps of every request from this IP
ip_window = deque()

def process_request(ip, timestamp):
    cutoff = timestamp - 60  # 60-second window

    # Remove old timestamps from the LEFT
    while ip_window and ip_window[0] < cutoff:
        ip_window.popleft()    # O(1) operation

    # Add new timestamp to the RIGHT
    ip_window.append(timestamp)

    # Current rate = count / window size
    rate = len(ip_window) / 60  # requests per second
    return rate

Why a deque? Because popping from the left is O(1). Removing from the front of a regular Python list is O(n), which gets slow for large windows.
We maintain two windows simultaneously:

  1. Per-IP window — one deque per IP address
  2. Global window — one deque for all requests combined
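Maintaining both windows together can be sketched with a `defaultdict` (a sketch; the names are illustrative, not the daemon's actual ones):

```python
from collections import defaultdict, deque

WINDOW = 60  # seconds

global_window = deque()          # timestamps of ALL requests
ip_windows = defaultdict(deque)  # one deque per source IP

def record(ip, timestamp):
    """Record one request; return (ip_rate, global_rate) in req/s."""
    cutoff = timestamp - WINDOW
    for window in (global_window, ip_windows[ip]):
        while window and window[0] < cutoff:
            window.popleft()     # evict entries older than 60 s
        window.append(timestamp)
    return len(ip_windows[ip]) / WINDOW, len(global_window) / WINDOW
```

Both windows share the same eviction logic, so one pass over the pair keeps them in sync.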

Part 3 — The Rolling Baseline (Learning What Normal Looks Like)

Here's the clever part: the system doesn't use a hardcoded threshold like "block anyone over 100 req/s." Instead it learns what normal traffic looks like and flags anything that deviates significantly.
Every second we record how many requests arrived:

Second 1:  3 requests
Second 2:  2 requests  
Second 3:  5 requests
Second 4:  1 request
...
Second 1800: 4 requests  (30 minutes of data)

Every 60 seconds we compute the mean and standard deviation of the last 30 minutes:

import math

def compute_baseline(counts):
    n = len(counts)
    mean = sum(counts) / n
    variance = sum((x - mean) ** 2 for x in counts) / n
    stddev = math.sqrt(variance)
    return mean, stddev

For example, if normal traffic is 5 req/s with occasional spikes to 10:

  • Mean = 5.0 req/s
  • Stddev = 2.0
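As a quick sanity check, here's compute_baseline run on a small sample that happens to produce exactly those numbers (the sample data is mine, chosen for round results):

```python
import math

def compute_baseline(counts):
    n = len(counts)
    mean = sum(counts) / n
    variance = sum((x - mean) ** 2 for x in counts) / n
    return mean, math.sqrt(variance)

# Per-second request counts with a couple of spikes
mean, stddev = compute_baseline([2, 4, 4, 4, 5, 5, 7, 9])
print(mean, stddev)  # → 5.0 2.0
```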

This baseline automatically updates as traffic patterns change. Quiet at night, busy in the morning: the baseline adapts to both.
We also keep per-hour slots so the quiet 3am baseline doesn't affect the busy 9am detection:

# Each hour gets its own baseline
hour_slots = {
    0:  [1, 2, 1, 0, 1, ...],   # midnight traffic
    9:  [8, 12, 10, 9, 11, ...], # morning traffic
    14: [15, 18, 14, 16, ...],   # afternoon traffic
}

When detecting anomalies we use the current hour's baseline if it has enough data, otherwise fall back to the 30-minute rolling window.
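That fallback can be sketched like this (a sketch; `min_samples` is an assumed tuning knob, not a documented value):

```python
def select_baseline(hour, hour_slots, rolling_counts, min_samples=300):
    """Prefer the current hour's per-second counts when there is enough
    history; otherwise fall back to the 30-minute rolling window."""
    slot = hour_slots.get(hour, [])
    counts = slot if len(slot) >= min_samples else rolling_counts
    n = len(counts)
    mean = sum(counts) / n
    stddev = (sum((x - mean) ** 2 for x in counts) / n) ** 0.5
    return mean, stddev
```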

Part 4 — Anomaly Detection (Making the Decision)

With the sliding window rate and the baseline computed, detection is simple math.
We use a z-score — a measure of how many standard deviations above normal the current rate is:
z = (current_rate - baseline_mean) / baseline_stddev
Examples:

Normal traffic:  z = (5 - 5) / 2 = 0.0   → safe
Slightly high:   z = (9 - 5) / 2 = 2.0   → watching
Attack traffic:  z = (50 - 5) / 2 = 22.5 → ANOMALY!
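The computation itself is one line, plus a guard for a perfectly flat baseline (the zero-stddev guard is my addition, not necessarily in the original daemon):

```python
def compute_zscore(rate, mean, stddev):
    """How many standard deviations above normal the current rate is."""
    if stddev == 0:
        # Flat baseline: any deviation at all is infinitely surprising
        return 0.0 if rate == mean else float("inf")
    return (rate - mean) / stddev

print(compute_zscore(50, 5, 2))  # → 22.5
```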

We fire an alert when either condition is true — whichever fires first:

# Condition 1 — statistical anomaly
if zscore > 3.0:
    fire_alert(ip, f"z-score {zscore:.2f} exceeds threshold")

# Condition 2 — absolute rate spike (catches early attacks)
elif rate > 5.0 * baseline_mean:
    fire_alert(ip, f"rate is {rate/baseline_mean:.1f}x the baseline")

We also detect error surges — if an IP is generating lots of 4xx/5xx errors (like scanning for vulnerabilities) we tighten the thresholds automatically:

if error_rate >= 3 * baseline_error_rate:
    # This IP is behaving badly — be more sensitive
    zscore_threshold = 2.0   # tighter than normal 3.0
    rate_threshold = 3.0     # tighter than normal 5.0

Part 5 — Blocking with iptables

When an IP is flagged we block it using iptables, the Linux kernel's built-in firewall. This happens at the network level, before the request even reaches Nginx or Nextcloud.

import subprocess

def block_ip(ip):
    subprocess.run([
        "iptables", "-I", "INPUT", "1",
        "-s", ip,
        "-m", "comment", "--comment", "hng-detector-ban",
        "-j", "DROP",
    ], check=True)

This inserts a rule at the top of the INPUT chain that says:
Any packet from IP 5.6.7.8 → DROP (silently discard)

The banned IP receives no response — their connection just times out. They can't reach the server at all.
Auto-unban with backoff — a first offense doesn't mean a permanent ban. The system escalates through a progressive backoff schedule:


ban_durations = [600, 1800, 7200, -1]  # seconds; -1 means permanent

def get_ban_duration(strike):
    index = min(strike - 1, len(ban_durations) - 1)
    return ban_durations[index]

A background thread checks every 30 seconds if any bans have expired and removes them automatically.
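A sketch of how that sweeper could look (structure and names are mine; the actual iptables removal is shown only as a comment):

```python
import threading
import time

bans = {}                    # ip → unban unix time (-1 means permanent)
bans_lock = threading.Lock()

def sweep_expired(now=None):
    """Remove and return IPs whose ban has expired; -1 bans are kept."""
    now = time.time() if now is None else now
    with bans_lock:
        expired = [ip for ip, until in bans.items()
                   if until != -1 and until <= now]
        for ip in expired:
            del bans[ip]
            # real daemon would also run: iptables -D INPUT -s <ip> -j DROP
        return expired

def unban_loop():
    # In the daemon this runs forever on a background thread:
    #   threading.Thread(target=unban_loop, daemon=True).start()
    while True:
        sweep_expired()
        time.sleep(30)
```

The lock matters because the detection loop adds bans on one thread while the sweeper removes them on another.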

Part 6 — Slack Alerts

Every detection event sends a Slack message via webhook:

import requests

def send_slack_alert(ip, condition, rate, baseline, duration):
    message = f"""
🚨 *IP BANNED*
- *IP:* `{ip}`
- *Reason:* {condition}
- *Current rate:* {rate:.2f} req/s
- *Baseline:* {baseline:.2f} req/s
- *Ban duration:* {duration}
    """
    requests.post(webhook_url, json={"text": message})

Alerts are sent asynchronously, so they never slow down the detection engine.
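One simple way to get that fire-and-forget behaviour (a sketch; the daemon's actual mechanism may differ):

```python
import threading

def send_async(send_fn, *args):
    """Run the Slack POST on a daemon thread so a slow webhook
    can never stall the detection loop."""
    t = threading.Thread(target=send_fn, args=args, daemon=True)
    t.start()
    return t

# e.g. send_async(send_slack_alert, ip, condition, rate, baseline, duration)
```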

Part 7 — The Live Dashboard

A Flask web server runs on port 8888 and serves a dashboard that updates every 3 seconds, showing:

  • Global requests per second
  • Baseline mean and standard deviation
  • Currently banned IPs
  • Top 10 source IPs by request rate
  • CPU and memory usage
  • System uptime
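The dashboard is a thin Flask layer over a metrics snapshot; here's a sketch of the payload builder behind it (field names are my assumptions, not the daemon's actual schema, and in the real system a Flask route on port 8888 would return this):

```python
import json
import time

START = time.time()

def metrics_snapshot(global_rate, mean, stddev, banned_ips, top_ips):
    """Build the JSON payload the dashboard polls every 3 seconds."""
    return json.dumps({
        "global_rps": round(global_rate, 2),
        "baseline": {"mean": round(mean, 2), "stddev": round(stddev, 2)},
        "banned_ips": banned_ips,
        "top_ips": top_ips,           # [(ip, req/s), ...]
        "uptime_s": int(time.time() - START),
    })
```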

Results

After deploying to AWS EC2, the system successfully:

  • Detected a flood of 1000 requests from IP 5.6.7.8
  • Fired a Slack alert within 3 seconds
  • Added an iptables DROP rule within 10 seconds
  • Auto-unbanned after 10 minutes
  • Sent an unban Slack notification automatically
  • Baseline adapted from 1.0 req/s idle to 40 req/s under load

Key Lessons

  1. Deques are perfect for sliding windows: O(1) eviction from the left makes them ideal for real-time rate tracking.
  2. Z-scores beat hardcoded thresholds: a z-score of 3.0 works whether your baseline is 1 req/s or 1000 req/s.
  3. Per-hour baselines matter: quiet nights shouldn't make morning traffic look like an attack.
  4. Block at the kernel level: iptables blocks before the application sees the request, saving server resources during an attack.
  5. Always whitelist your own IPs: accidentally banning your Docker gateway or monitoring system is embarrassing.

Source Code

Full source code is available at:

HNG Anomaly Detection Engine

Real-time HTTP traffic anomaly detector for cloud.ng (Nextcloud) — built for HNG DevSecOps challenge.

Language: Python 3.11
Why Python? The asyncio/threading model is ideal for I/O-bound log tailing; the standard library covers deques, statistics, and subprocess; and Flask gives us a dashboard in ~100 lines. Python's readability also makes the detection logic easy to audit and comment.


Live Links (fill in after deployment)

  • Nextcloud: http://YOUR_SERVER_IP
  • Metrics Dashboard: http://YOUR_DASHBOARD_DOMAIN:8888
  • GitHub Repo: https://github.com/YOUR_USERNAME/hng-anomaly-detector
  • Blog Post: https://YOUR_BLOG_URL

Architecture

Internet
    │
    ▼
[Nginx :80]  ──── JSON logs ──▶  HNG-nginx-logs (Docker volume)
    │                                      │
    ▼                                      ▼ (read-only mount)
[Nextcloud]                     [Detector Daemon]
                                    ├── LogMonitor   (tail log)
                                    ├── AnomalyDetector (sliding windows)
                                    ├── BaselineEngine  (rolling stats)
                                    ├── Blocker     (iptables)
                                    ├── Notifier    (Slack)
                                    └── Dashboard   (Flask :8888)

How the Sliding Window Works

Two collections.deque structures are maintained simultaneously:

_global_window: deque[float]          # timestamps of ALL requests
_ip_windows: dict[str, deque[float]]  # per-IP timestamp deques
