Introduction
Imagine you're running a cloud storage platform that serves thousands of users around the clock. One day, a wave of suspicious traffic hits your server — thousands of requests per second from a single IP address, trying to overwhelm your system. How do you detect it? How do you stop it automatically before real users are affected?
That's exactly what I built for this project — a real-time anomaly detection engine that watches all incoming HTTP traffic, learns what normal looks like, and automatically blocks anything that deviates from that normal. No human intervention needed.
In this post I'll walk you through exactly how I built it, piece by piece, in plain English. No security experience required.
What the Project Does
The system sits alongside a Nextcloud cloud storage server and does five things continuously:
- Watches every HTTP request coming into the server in real time
- Learns what normal traffic looks like using statistics
- Detects when traffic from a single IP or globally looks suspicious
- Blocks suspicious IPs automatically using Linux firewall rules
- Alerts your team on Slack within 10 seconds
The Technology Stack
- Python 3.11 — the detector daemon
- Nginx — reverse proxy that logs all traffic as JSON
- Nextcloud — the cloud storage application being protected
- Docker + Docker Compose — runs everything together
- iptables — Linux kernel firewall that blocks IPs
- Slack — receives instant alerts
- Flask — serves the live metrics dashboard
Part 1 — Reading the Logs in Real Time
The first challenge is reading Nginx access logs as they are written — not after the fact, but live, line by line.
Nginx is configured to write every request as a JSON object:
{
  "source_ip": "1.2.3.4",
  "timestamp": "2026-04-28T10:08:10+00:00",
  "method": "GET",
  "path": "/",
  "status": 200,
  "response_size": 1234
}
The detector uses a technique called log tailing — similar to the Linux tail -f command. Here's how it works in simple terms:
import os
import time

# Open the log file
with open("/var/log/nginx/hng-access.log") as f:
    # Jump to the end — ignore old entries
    f.seek(0, os.SEEK_END)
    while True:
        line = f.readline()
        if not line:
            # Nothing new yet; wait briefly
            time.sleep(0.1)
            continue
        # Parse the JSON and process it
        entry = parse_log_line(line)
        detector.process(entry)
Every new line that Nginx writes gets picked up within 100 milliseconds. That's fast enough to detect attacks as they happen.
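The parse_log_line helper used above isn't shown in the project snippet; here's a minimal sketch of what it might look like, assuming the JSON field names from the example log entry earlier:

import json
from datetime import datetime

def parse_log_line(line):
    """Turn one JSON log line into a dict, or None if the line is malformed."""
    try:
        entry = json.loads(line)
    except json.JSONDecodeError:
        return None  # Nginx can write partial lines during rotation; skip them
    # Convert the ISO-8601 timestamp into epoch seconds for the sliding windows
    entry["ts"] = datetime.fromisoformat(entry["timestamp"]).timestamp()
    return entry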
Part 2 — The Sliding Window (How We Count Requests)
To detect whether an IP is sending too many requests, we need to count how many requests it sent in the last 60 seconds: not the last calendar minute (which resets every 60 seconds), but a true rolling 60-second window.
We use Python's collections.deque for this; think of it as a list that automatically stays the right size.
Here's the concept:
Time →   10s  11s  12s  13s  14s  15s  16s  17s ...
IP X:     .    .    .   req  req  req  req  req ...

Window at t=17s: [13, 14, 15, 16, 17] → 5 requests in 60s → 0.08 req/s
Window at t=74s: [14, 15, 16, 17]     → old entries evicted from the left
Eviction logic. This is the key part:
from collections import deque

# Store timestamps of every request from this IP
ip_window = deque()

def process_request(ip, timestamp):
    cutoff = timestamp - 60  # 60-second window

    # Remove old timestamps from the LEFT
    while ip_window and ip_window[0] < cutoff:
        ip_window.popleft()  # O(1) operation

    # Add new timestamp to the RIGHT
    ip_window.append(timestamp)

    # Current rate = count / window size
    rate = len(ip_window) / 60  # requests per second
    return rate
Why a deque? Because popping from the left is O(1), effectively instant. A regular Python list would be O(n) for the same operation, because every remaining element has to shift down, which gets slow for large windows.
We maintain two windows simultaneously:
- Per-IP window — one deque per IP address
- Global window — one deque for all requests combined
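Putting the two together, here's a minimal sketch of how both windows could be maintained on every request; the defaultdict-of-deques layout is my assumption, not necessarily the project's exact structure:

from collections import defaultdict, deque

WINDOW = 60  # seconds

global_window = deque()          # timestamps of ALL requests
ip_windows = defaultdict(deque)  # one deque per source IP

def record(ip, timestamp):
    cutoff = timestamp - WINDOW
    for window in (global_window, ip_windows[ip]):
        # Evict expired timestamps from the left, then append the new one
        while window and window[0] < cutoff:
            window.popleft()
        window.append(timestamp)
    # Rolling per-IP and global rates over the last 60 seconds
    return len(ip_windows[ip]) / WINDOW, len(global_window) / WINDOW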
Part 3 — The Rolling Baseline (Learning What Normal Looks Like)
Here's the clever part: the system doesn't use a hardcoded threshold like "block anyone over 100 req/s." Instead, it learns what normal traffic looks like and flags anything that deviates significantly.
Every second we record how many requests arrived:
Second 1: 3 requests
Second 2: 2 requests
Second 3: 5 requests
Second 4: 1 request
...
Second 1800: 4 requests (30 minutes of data)
Every 60 seconds we compute the mean and standard deviation of the last 30 minutes:
import math

def compute_baseline(counts):
    n = len(counts)
    mean = sum(counts) / n
    variance = sum((x - mean) ** 2 for x in counts) / n
    stddev = math.sqrt(variance)
    return mean, stddev
For example, if normal traffic is 5 req/s with occasional spikes to 10:
- Mean = 5.0 req/s
- Stddev = 2.0
This baseline automatically updates as traffic patterns change: quiet at night or busy in the morning, the baseline adapts to both.
We also keep per-hour slots so the quiet 3am baseline doesn't affect the busy 9am detection:
# Each hour gets its own baseline
hour_slots = {
    0:  [1, 2, 1, 0, 1, ...],     # midnight traffic
    9:  [8, 12, 10, 9, 11, ...],  # morning traffic
    14: [15, 18, 14, 16, ...],    # afternoon traffic
}
When detecting anomalies, we use the current hour's baseline if it has enough data; otherwise we fall back to the 30-minute rolling window.
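A sketch of that selection logic, reusing compute_baseline from above; the names and the minimum-sample threshold are mine, chosen for illustration:

MIN_SAMPLES = 300  # hypothetical: require ~5 minutes of per-second counts

def select_baseline(hour, hour_slots, rolling_counts):
    # Prefer the current hour's history; fall back to the rolling window
    counts = hour_slots.get(hour, [])
    if len(counts) < MIN_SAMPLES:
        counts = rolling_counts
    return compute_baseline(counts)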
Part 4 — Anomaly Detection (Making the Decision)
With the sliding window rate and the baseline computed, detection is simple math.
We use a z-score — a measure of how many standard deviations above normal the current rate is:
z = (current_rate - baseline_mean) / baseline_stddev
Examples:
Normal traffic: z = (5 - 5) / 2 = 0.0 → safe
Slightly high: z = (9 - 5) / 2 = 2.0 → watching
Attack traffic: z = (50 - 5) / 2 = 22.5 → ANOMALY!
We fire an alert when either condition is true — whichever fires first:
# Condition 1 — statistical anomaly
if zscore > 3.0:
    fire_alert(ip, f"z-score {zscore:.2f} exceeds threshold")

# Condition 2 — absolute rate spike (catches early attacks)
elif rate > 5.0 * baseline_mean:
    fire_alert(ip, f"rate is {rate/baseline_mean:.1f}x the baseline")
We also detect error surges — if an IP is generating lots of 4xx/5xx errors (like scanning for vulnerabilities), we tighten the thresholds automatically:
if error_rate >= 3 * baseline_error_rate:
    # This IP is behaving badly — be more sensitive
    zscore_threshold = 2.0  # tighter than normal 3.0
    rate_threshold = 3.0    # tighter than normal 5.0
Part 5 — Blocking with iptables
When an IP is flagged, we block it using iptables, the Linux kernel's built-in firewall. This happens at the network level, before the request even reaches Nginx or Nextcloud.
import subprocess

def block_ip(ip):
    subprocess.run([
        "iptables", "-I", "INPUT", "1",
        "-s", ip,
        # --comment requires loading the comment match module (-m comment)
        "-m", "comment", "--comment", "hng-detector-ban",
        "-j", "DROP",
    ])
This inserts a rule at the top of the INPUT chain that says:
Any packet from IP 5.6.7.8 → DROP (silently discard)
The banned IP receives no response — their connection just times out. They can't reach the server at all.
Auto-unban with backoff — we don't ban forever immediately. The system uses a progressive backoff schedule:
ban_durations = [600, 1800, 7200, -1]  # seconds; -1 means permanent

def get_ban_duration(strike):
    # First offense → 10 min, second → 30 min, third → 2 h, then permanent
    index = min(strike - 1, len(ban_durations) - 1)
    return ban_durations[index]
A background thread checks every 30 seconds if any bans have expired and removes them automatically.
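A sketch of what that loop might look like; the deletion command mirrors the insert rule above, but the active_bans bookkeeping and the unban notifier are illustrative names, not the project's actual API:

import subprocess
import threading
import time

active_bans = {}  # ip -> unban deadline in epoch seconds; -1 means permanent

def unban_loop():
    while True:
        now = time.time()
        for ip, deadline in list(active_bans.items()):
            if deadline != -1 and now >= deadline:
                # Delete the exact DROP rule we inserted when banning
                subprocess.run([
                    "iptables", "-D", "INPUT", "-s", ip,
                    "-m", "comment", "--comment", "hng-detector-ban",
                    "-j", "DROP",
                ])
                del active_bans[ip]
                notify_unban(ip)  # hypothetical Slack unban notifier
        time.sleep(30)

threading.Thread(target=unban_loop, daemon=True).start()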
Part 6 — Slack Alerts
Every detection event sends a Slack message via webhook:
import requests

def send_slack_alert(ip, condition, rate, baseline, duration):
    message = f"""
🚨 *IP BANNED*
- *IP:* `{ip}`
- *Reason:* {condition}
- *Current rate:* {rate:.2f} req/s
- *Baseline:* {baseline:.2f} req/s
- *Ban duration:* {duration}
"""
    requests.post(webhook_url, json={"text": message})
Alerts are sent asynchronously, so they never slow down the detection engine.
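One common way to get that behavior is a fire-and-forget worker thread fed by a queue; a minimal sketch under that assumption (the project may use a different mechanism):

import queue
import threading

import requests

alert_queue = queue.Queue()

def alert_worker():
    while True:
        payload = alert_queue.get()
        try:
            requests.post(webhook_url, json=payload, timeout=5)
        except requests.RequestException:
            pass  # a Slack outage must never crash the detector
        alert_queue.task_done()

threading.Thread(target=alert_worker, daemon=True).start()

# The detection path just enqueues and moves on:
# alert_queue.put({"text": message})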
Part 7 — The Live Dashboard
A Flask web server runs on port 8888 and serves a dashboard that refreshes every 3 seconds, showing:
- Global requests per second
- Baseline mean and standard deviation
- Currently banned IPs
- Top 10 source IPs by request rate
- CPU and memory usage
- System uptime
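A minimal sketch of the serving side, assuming the detector updates a shared stats dictionary in place (the names here are illustrative):

from flask import Flask, jsonify

app = Flask(__name__)
stats = {}  # updated in place by the detector threads

@app.route("/metrics")
def metrics():
    # The dashboard page polls this endpoint every 3 seconds
    return jsonify(stats)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8888)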
Results
After deploying to AWS EC2, the system successfully:
- Detected a flood of 1000 requests from IP 5.6.7.8
- Fired a Slack alert within 3 seconds
- Added an iptables DROP rule within 10 seconds
- Auto-unbanned after 10 minutes
- Sent an unban Slack notification automatically
- Baseline adapted from 1.0 req/s idle to 40 req/s under load
Key Lessons
- Deques are perfect for sliding windows: O(1) eviction from the left makes them ideal for real-time rate tracking.
- Z-scores beat hardcoded thresholds: a z-score of 3.0 works whether your baseline is 1 req/s or 1000 req/s.
- Per-hour baselines matter: quiet nights shouldn't make morning traffic look like an attack.
- Block at the kernel level: iptables blocks before the application sees the request, saving server resources during an attack.
- Always whitelist your own IPs: accidentally banning your Docker gateway or monitoring system is embarrassing.
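That last lesson is cheap to implement: a one-line guard in front of the ban path. A sketch with illustrative addresses:

# Illustrative entries; in practice these would come from configuration
WHITELIST = {"127.0.0.1", "172.17.0.1"}  # localhost and the Docker gateway

def maybe_block(ip):
    if ip in WHITELIST:
        return  # never ban ourselves or our own infrastructure
    block_ip(ip)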
Source Code
Full source code is available at:

HNG Anomaly Detection Engine — real-time HTTP traffic anomaly detector for cloud.ng (Nextcloud), built for the HNG DevSecOps challenge. Language: Python 3.11.
Why Python? The asyncio/threading model is ideal for I/O-bound log tailing; the standard library covers deques, statistics, and subprocess; and Flask gives us a dashboard in ~100 lines. Python's readability also makes the detection logic easy to audit and comment.
Live Links (fill in after deployment)
| Resource | URL |
|---|---|
| Nextcloud | http://YOUR_SERVER_IP |
| Metrics Dashboard | http://YOUR_DASHBOARD_DOMAIN:8888 |
| GitHub Repo | https://github.com/YOUR_USERNAME/hng-anomaly-detector |
| Blog Post | https://YOUR_BLOG_URL |
Architecture
   Internet
      │
      ▼
[Nginx :80] ──── JSON logs ──▶ HNG-nginx-logs (Docker volume)
      │                               │  (read-only mount)
      ▼                               ▼
[Nextcloud]                   [Detector Daemon]
                                ├── LogMonitor (tail log)
                                ├── AnomalyDetector (sliding windows)
                                ├── BaselineEngine (rolling stats)
                                ├── Blocker (iptables)
                                ├── Notifier (Slack)
                                └── Dashboard (Flask :8888)
How the Sliding Window Works
Two collections.deque structures are maintained simultaneously:
_global_window: deque[float]          # timestamps of ALL requests
_ip_windows: dict[str, deque[float]]  # per-IP timestamp deques
…