DEV Community

babaolu

How I Built a Real-Time Anomaly Detection Engine for a Cloud Storage Platform

Introduction

Imagine you're running a cloud storage platform — thousands of users uploading files, downloading documents, sharing links — all day, every day. Now imagine a hacker decides to hammer your server with thousands of fake requests per second. Without protection, your server slows to a crawl, real users get locked out, and your business takes a hit.

That's exactly the problem I was asked to solve. My task: build a tool that watches all incoming web traffic in real time, learns what "normal" looks like, and automatically blocks attackers — without any human intervention.

In this post I'll walk you through exactly how I built it, piece by piece, in beginner-friendly terms. No security background required.


What We're Building (The Big Picture)

Here's the system at a glance:

Internet → Nginx (reverse proxy) → Nextcloud (the app)
                ↓
         JSON access logs
                ↓
         Detection Daemon (our tool)
         ├── Reads logs line by line
         ├── Tracks request rates
         ├── Learns what's normal
         ├── Detects anomalies
         ├── Blocks bad IPs via iptables
         ├── Sends Slack alerts
         └── Shows a live dashboard

The whole thing runs as Docker containers sitting alongside Nextcloud (our cloud storage app). Our tool never touches Nextcloud directly — it only watches the logs that Nginx writes.


The Tech Stack

  • Python 3.11 — for the detection daemon (readable, great standard library)
  • Nginx — the web server that sits in front of Nextcloud and writes JSON logs
  • Docker + Docker Compose — to run everything together
  • iptables — the Linux firewall, used to drop traffic from bad IPs
  • Flask — a tiny Python web framework for the live dashboard
  • Slack webhooks — to send ban/unban notifications

Part 1: Getting Nginx to Write JSON Logs

The first thing our detector needs is data — specifically, a log of every HTTP request that hits the server. Nginx can write logs in any format, and we chose JSON because it's easy to parse in Python.

Here's the key part of our Nginx config:

log_format hng_json escape=json
    '{'
        '"source_ip":"$remote_addr",'
        '"timestamp":"$time_iso8601",'
        '"method":"$request_method",'
        '"path":"$request_uri",'
        '"status":$status,'
        '"response_size":$body_bytes_sent'
    '}';

access_log /var/log/nginx/hng-access.log hng_json;

Every time someone visits the site, Nginx writes one line like this:

{"source_ip":"203.0.113.42","timestamp":"2026-04-28T22:15:01+00:00","method":"GET","path":"/login","status":200,"response_size":4823}

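We also configured Nginx to trust the X-Forwarded-For header, but only when the request arrives from an upstream address in the private 172.16.0.0/12 range. This gives us the real client IP even when traffic passes through a load balancer, without letting arbitrary clients spoof the header: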

set_real_ip_from 172.16.0.0/12;
real_ip_header   X-Forwarded-For;

The log file lives in a shared Docker volume called HNG-nginx-logs. Nginx writes to it; our detector reads from it. They never need to talk directly.
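
To make the log format concrete, here is a minimal sketch of how one of these JSON lines could be turned into a typed record on the Python side. The names LogEntry and parse_log_line are illustrative and not taken from the project; the field names match the log_format above.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LogEntry:
    """One request, mirroring the fields of the hng_json log format."""
    source_ip: str
    timestamp: str
    method: str
    path: str
    status: int
    response_size: int


def parse_log_line(line: str) -> Optional[LogEntry]:
    """Return a LogEntry, or None if the line is malformed or incomplete."""
    try:
        raw = json.loads(line)
        return LogEntry(
            source_ip=str(raw["source_ip"]),
            timestamp=str(raw["timestamp"]),
            method=str(raw["method"]),
            path=str(raw["path"]),
            status=int(raw["status"]),
            response_size=int(raw["response_size"]),
        )
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # Bad JSON, a missing field, or a non-numeric status: skip the line
        return None
```

Returning None instead of raising keeps one bad line from stopping the daemon.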


Part 2: Tailing the Log (monitor.py)

Our monitor.py module continuously reads the log file, one line at a time, as new lines appear. This is called "tailing" — just like the Linux tail -f command.

with open(self.log_path, "r") as f:
    f.seek(0, 2)  # Jump to end of file so old logs aren't replayed
    while True:
        line = f.readline()
        if not line:
            time.sleep(0.05)  # Brief pause when no new data
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # Skip malformed or partially written lines
        # Process entry...

f.seek(0, 2) jumps to the end of the file on startup so we don't process logs from before the daemon started. Then we loop forever, reading one line at a time as Nginx writes them.
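
The same loop can be written as a generator, which is easier to test. This is a sketch rather than the project's actual monitor.py: follow_json and max_idle_polls are hypothetical names, and max_idle_polls exists only so the loop can end in a test. It also buffers a line until its trailing newline arrives, in case Nginx is caught mid-write.

```python
import io
import json
import time
from typing import Iterator, Optional, TextIO


def follow_json(f: TextIO, poll_sec: float = 0.05,
                max_idle_polls: Optional[int] = None) -> Iterator[dict]:
    """Yield one dict per complete JSON line appended to f."""
    pending = ""  # partially written line, kept until its newline arrives
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        chunk = f.readline()
        if not chunk:
            idle += 1
            time.sleep(poll_sec)  # nothing new yet
            continue
        idle = 0
        pending += chunk
        if not pending.endswith("\n"):
            continue  # wait for the rest of the line
        line, pending = pending, ""
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip malformed lines
        if isinstance(entry, dict):
            yield entry


# Example with an in-memory file standing in for the Nginx log
demo_log = io.StringIO('{"a": 1}\nnot json\n{"b": 2}\n{"c"')
demo_entries = list(follow_json(demo_log, poll_sec=0, max_idle_polls=2))
```

In the daemon you would leave max_idle_polls at None so the generator never stops.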


Part 3: The Sliding Window — Tracking Request Rates

This is the heart of the detection system. We need to answer the question: "How many requests has this IP sent in the last 60 seconds?"

The naive approach would be to count requests per minute in a simple counter. But that has a problem — it resets every minute, so a burst of 1000 requests in seconds 58-62 gets split across two windows and looks less severe than it is.
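
A small worked example makes the problem visible. The numbers are made up for illustration: 1000 requests spread evenly over seconds 58 to 62, counted once with fixed one-minute buckets and once over the 60 seconds ending at the last request.

```python
from collections import Counter

# 1000 requests spread evenly from t = 58.0 up to (but not including) 62.0
burst = [58 + 4 * i / 1000 for i in range(1000)]

# Fixed one-minute buckets: the counter "resets" at t = 60
fixed_counts = Counter(int(ts // 60) for ts in burst)

# Sliding view: everything in the 60 seconds ending at the last request
now = burst[-1]
sliding_count = sum(1 for ts in burst if ts >= now - 60)

print(dict(fixed_counts))  # each bucket sees only half of the burst
print(sliding_count)       # the sliding view sees all of it
```

Each fixed bucket reports 500 requests, while the 60-second span reports the full 1000.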

Instead, we use a sliding window built on Python's collections.deque.

What is a deque?

A deque (pronounced "deck", short for double-ended queue) is a sequence that you can efficiently add to or remove from at either end. Think of it like a conveyor belt: new items go on the right, old items fall off the left.

How our sliding window works

For every IP address, we maintain a deque of timestamps — one entry per request:

ip_windows["203.0.113.42"] = deque([
    1714341237.7,  # oldest request (61 seconds ago, will be evicted)
    1714341245.8,
    1714341250.2,
    1714341298.7,  # newest request (just now)
])

On every new request from that IP, we:

  1. Append the current timestamp to the right
  2. Evict any timestamps older than 60 seconds from the left
  3. The length of the deque is the request count in the last 60 seconds

def _append_and_evict(self, dq: deque, ts: float, now: float) -> None:
    dq.append(ts)                          # Add new request
    cutoff = now - self.window_sec         # 60 seconds ago
    while dq and dq[0] < cutoff:
        dq.popleft()                       # Evict stale entries

Then the rate is simply:

ip_rate = len(self.ip_windows[ip]) / self.window_sec   # requests per second

We maintain two windows simultaneously:

  • Per-IP window: one deque per source IP address
  • Global window: one deque for all traffic combined

This lets us detect both a single aggressive IP and a distributed attack from many IPs at once.
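
Putting the per-IP and global windows together might look like the sketch below. The class and method names are illustrative, not the project's; one assumption worth noting is that rates are also evicted on read, so an IP that has gone quiet does not keep reporting a stale rate.

```python
from collections import defaultdict, deque


class SlidingWindows:
    """Per-IP and global request timestamps over the last window_sec seconds."""

    def __init__(self, window_sec: float = 60.0):
        self.window_sec = window_sec
        self.ip_windows = defaultdict(deque)  # ip -> deque of timestamps
        self.global_window = deque()          # timestamps of all requests

    def _evict(self, dq: deque, now: float) -> None:
        cutoff = now - self.window_sec
        while dq and dq[0] < cutoff:
            dq.popleft()

    def record(self, ip: str, ts: float) -> None:
        """Register one request from ip at time ts."""
        for dq in (self.ip_windows[ip], self.global_window):
            dq.append(ts)
            self._evict(dq, ts)

    def ip_rate(self, ip: str, now: float) -> float:
        """Requests per second from ip over the window ending at now."""
        dq = self.ip_windows.get(ip)
        if dq is None:
            return 0.0
        self._evict(dq, now)
        return len(dq) / self.window_sec

    def global_rate(self, now: float) -> float:
        """Requests per second across all IPs over the window ending at now."""
        self._evict(self.global_window, now)
        return len(self.global_window) / self.window_sec
```

A long-running daemon would also want to delete per-IP deques once they are empty, otherwise the dictionary grows with every address ever seen.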


Part 4: The Rolling Baseline — Learning What's Normal

Z-scores and rate multipliers are meaningless without a reference point. We need to know: what does normal traffic look like on this server?

The trick is to never hardcode this. Traffic at 2am is different from traffic at noon. Monday is different from Friday. Our baseline must adapt.

How the baseline works

Every second, we record how many requests arrived in that second:

second 1: 3 requests
second 2: 5 requests
second 3: 2 requests
...

We keep 30 minutes (1800 seconds) of these counts in a rolling deque. Old entries fall off the left automatically.

Every 60 seconds, we compute the mean (average) and standard deviation (how much traffic varies) from those counts:

@staticmethod
def _mean_stddev(values):
    n = len(values)
    if n == 0:
        return 0.0, 0.0  # no samples yet; avoid dividing by zero
    mean = sum(values) / n
    variance = sum((x - mean) ** 2 for x in values) / n  # population variance
    return mean, math.sqrt(variance)
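
Here is one way the per-second counting could be sketched. PerSecondCounts is a hypothetical name, and filling quiet seconds with zeros is my assumption about how gaps should be treated: without it, the mean would only reflect seconds that had traffic. The statistics module's pstdev computes the same population standard deviation as the helper above.

```python
import statistics
from collections import deque


class PerSecondCounts:
    """Rolling per-second request counts over the last horizon_sec seconds."""

    def __init__(self, horizon_sec: int = 1800):
        self.horizon_sec = horizon_sec
        # Each item is [second, count]; maxlen drops the oldest automatically
        self.buckets = deque(maxlen=horizon_sec)

    def add(self, ts: float) -> None:
        sec = int(ts)
        if self.buckets and sec <= self.buckets[-1][0]:
            self.buckets[-1][1] += 1  # same (or late-arriving) second
            return
        if self.buckets:
            # Seconds with no traffic still count as zeros in the baseline
            first_gap = max(self.buckets[-1][0] + 1, sec - self.horizon_sec)
            for quiet in range(first_gap, sec):
                self.buckets.append([quiet, 0])
        self.buckets.append([sec, 1])

    def mean_stddev(self):
        counts = [count for _, count in self.buckets]
        if not counts:
            return 0.0, 0.0
        return statistics.fmean(counts), statistics.pstdev(counts)
```

For example, requests at 100.1, 100.5, 101.2 and 103.9 produce the counts 2, 1, 0, 1, with second 102 recorded as a zero.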

Per-hour slots

We also keep separate "buckets" for each hour of the day (24 buckets total). If the current hour has at least 2 minutes of data, we prefer that hour's baseline over the full 30-minute average. This makes the baseline more accurate — 3am traffic patterns shouldn't pollute the 3pm baseline.

current_hour_slot = int(now // 3600) % 24
slot = self._hour_slots[current_hour_slot]

if len(slot) >= 120:   # 2 minutes of per-second data
    counts = [c for (_, c) in slot]
else:
    counts = [c for (_, c) in self._rolling]  # fall back to 30-min window
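
The slot selection can be isolated into two small functions, sketched here with hypothetical names (hour_slot, pick_counts). Because Unix timestamps count from midnight UTC, int(now // 3600) % 24 is the hour of day in UTC, not in the server's local time zone.

```python
from datetime import datetime, timezone


def hour_slot(now: float) -> int:
    """Hour of day (0-23, UTC) for a Unix timestamp."""
    return int(now // 3600) % 24


def pick_counts(now, hour_slots, rolling, min_samples=120):
    """Prefer the current hour's samples; fall back to the rolling window.

    hour_slots maps an hour (0-23) to a sequence of (second, count) pairs;
    rolling is the 30-minute sequence of (second, count) pairs.
    """
    slot = hour_slots[hour_slot(now)]
    source = slot if len(slot) >= min_samples else rolling
    return [count for (_, count) in source]


# 23:15:01 UTC falls into slot 23
example_now = datetime(2026, 4, 28, 23, 15, 1, tzinfo=timezone.utc).timestamp()
example_slot = hour_slot(example_now)
```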

Floor values

What if traffic is so quiet that stddev is zero or close to it? A zero stddev would cause a division by zero in the z-score formula, and a tiny one would turn harmless blips into huge z-scores. So we apply floors:

self.effective_mean   = max(computed_mean,   0.1)   # at least 0.1 req/s
self.effective_stddev = max(computed_stddev, 0.05)  # at least 0.05

Part 5: Anomaly Detection — Spotting the Attack

With a sliding window for current rates and a rolling baseline for normal rates, we can now make decisions. Our detector.py uses two independent checks on every single log line:

Check 1: Z-Score

The z-score tells us how many standard deviations above normal the current rate is:

z_score = (current_rate - mean) / stddev

If the mean is 2 req/s and stddev is 0.5, and we see 8 req/s:

z_score = (8 - 2) / 0.5 = 12.0

That's 12 standard deviations above normal, far outside anything ordinary variation would produce. We flag any z-score of 3.0 or higher.

def _z_score(self, rate: float) -> float:
    mean   = self.baseline.effective_mean
    stddev = self.baseline.effective_stddev
    return (rate - mean) / stddev

Check 2: Rate Multiplier

The z-score works well for stable traffic. But what if traffic is genuinely variable? A large stddev shrinks every z-score, so a real spike can slip under the threshold. As a second safety net, we also flag any rate that is at least 5× the baseline mean, regardless of stddev.

ip_rate_check = (mean > 0 and ip_rate >= 5.0 * mean)

Whichever check fires first triggers the response. This makes the system robust against both smooth and spiky normal traffic patterns.
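
The two checks, together with the floors from Part 4, can be combined into one function. This is a sketch under assumptions: check_rate is a hypothetical name, and the condition string for the multiplier check is invented for illustration (only the zscore=...>=3.0 form appears in the project's logs).

```python
from typing import Optional

MEAN_FLOOR = 0.1     # req/s, same floor as in Part 4
STDDEV_FLOOR = 0.05


def check_rate(rate: float, mean: float, stddev: float,
               z_thresh: float = 3.0, rate_mult: float = 5.0) -> Optional[str]:
    """Return a description of the check that fired, or None if rate is normal."""
    eff_mean = max(mean, MEAN_FLOOR)
    eff_stddev = max(stddev, STDDEV_FLOOR)

    # Check 1: how many standard deviations above the mean?
    z = (rate - eff_mean) / eff_stddev
    if z >= z_thresh:
        return f"zscore={z:.2f}>={z_thresh}"

    # Check 2: a plain multiple of the mean, independent of stddev
    if rate >= rate_mult * eff_mean:
        return f"rate={rate:.2f}>={rate_mult}x_mean"

    return None
```

With mean 2 and stddev 0.5, a rate of 8 trips the z-score check. With mean 2 and stddev 4, a rate of 10 has a z-score of only 2.0, but the multiplier check still catches it.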

Error Surge: Detecting Credential Stuffing

Some attacks are slow and low-volume but hammer error codes — like brute-forcing a login page. These might not hit rate thresholds, but they'll produce floods of 401 or 403 responses.

We track each IP's error rate separately. If it's 3× the baseline error rate, we automatically tighten the detection thresholds (halving the z-score threshold and rate multiplier) for that IP:

if ip_error_rate >= 3.0 * baseline_error_mean:
    effective_zscore_thresh = 3.0 / 2.0   # 1.5 instead of 3.0
    effective_rate_mult     = 5.0 / 2.0   # 2.5x instead of 5x
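
Wrapped in a function, the tightening step might look like this sketch. effective_thresholds and surge_factor are hypothetical names. The guard on baseline_error_mean > 0 is my addition: without it, a baseline error rate of zero would make the condition true for every IP.

```python
from typing import Tuple


def effective_thresholds(ip_error_rate: float, baseline_error_mean: float,
                         z_thresh: float = 3.0, rate_mult: float = 5.0,
                         surge_factor: float = 3.0) -> Tuple[float, float]:
    """Return (z-score threshold, rate multiplier) to apply to this IP."""
    surging = (
        baseline_error_mean > 0
        and ip_error_rate >= surge_factor * baseline_error_mean
    )
    if surging:
        # Error surge: halve both thresholds for this IP
        return z_thresh / 2.0, rate_mult / 2.0
    return z_thresh, rate_mult
```

An IP producing 0.9 errors per second against a baseline of 0.2 gets the tightened pair (1.5, 2.5); one producing 0.3 keeps the defaults.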

Part 6: Blocking with iptables

When an anomaly is detected for a specific IP, we need to drop their traffic at the network level — before it even reaches Nginx.

Linux has a built-in firewall called iptables. Think of it as a bouncer at the door. We can tell it: "Drop all packets from this IP address."

import subprocess

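def _iptables(action: str, ip: str) -> bool:
    """Run iptables with action "-I" (ban) or "-D" (unban); True on success."""
    cmd = ["iptables", action, "INPUT", "-s", ip, "-j", "DROP"]
    result = subprocess.run(cmd, timeout=5)
    return result.returncode == 0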

  • iptables -I INPUT -s 203.0.113.42 -j DROP → ban (insert at top of chain)
  • iptables -D INPUT -s 203.0.113.42 -j DROP → unban (delete rule)

The -I flag inserts the rule at position 1, which means it's evaluated first — highest priority. The attacker's packets get dropped at the kernel level before any Python or Nginx code even sees them.
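
Since the IP comes out of a log file, it is worth validating before it ends up on a command line. The sketch below only builds the argument list and does not run anything; build_iptables_cmd is a hypothetical helper, and choosing ip6tables for IPv6 addresses is my assumption, since the post only shows IPv4.

```python
import ipaddress
from typing import List

_ACTIONS = {"ban": "-I", "unban": "-D"}


def build_iptables_cmd(action: str, ip: str) -> List[str]:
    """Build the firewall command for a ban or unban.

    Raises KeyError for an unknown action and ValueError for a string
    that is not a valid IP address.
    """
    flag = _ACTIONS[action]
    addr = ipaddress.ip_address(ip)  # rejects anything that is not an IP
    binary = "iptables" if addr.version == 4 else "ip6tables"
    return [binary, flag, "INPUT", "-s", str(addr), "-j", "DROP"]
```

The returned list can be passed straight to subprocess.run, which avoids going through a shell.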

The Backoff Schedule

We don't ban forever on the first offense. We use a backoff schedule that gets harsher with repeat offenders:

Offense    Ban Duration
1st        10 minutes
2nd        30 minutes
3rd        2 hours
4th+       Permanent

This is stored in config.yaml so you can tune it without touching code:

ban:
  schedule:
    - 600     # 10 min
    - 1800    # 30 min
    - 7200    # 2 hours
    - -1      # permanent
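
Looking up a ban duration from that schedule takes only a few lines. This is a sketch with hypothetical function names; the schedule values are the ones from config.yaml above, and -1 marks a permanent ban in the same way the unbanner expects.

```python
SCHEDULE = [600, 1800, 7200, -1]  # seconds; -1 means permanent


def ban_duration(offense: int, schedule=SCHEDULE) -> int:
    """Duration for the Nth offense (1-based); later offenses reuse the last entry."""
    if offense < 1:
        raise ValueError("offense numbers start at 1")
    return schedule[min(offense, len(schedule)) - 1]


def ban_until(now: float, offense: int, schedule=SCHEDULE) -> float:
    """Unix time at which the ban ends, or -1 for a permanent ban."""
    duration = ban_duration(offense, schedule)
    return -1 if duration == -1 else now + duration
```

A fifth or tenth offense falls through to the last entry, so it stays permanent.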

Part 7: Auto-Unban (unbanner.py)

A background thread polls every 10 seconds and checks if any ban has expired:

def _check_and_unban(self) -> None:
    now = time.time()
    for ip, info in list(self.state["banned"].items()):
        if info["until"] == -1:
            continue    # permanent, skip
        if now >= info["until"]:
            # Remove iptables rule
            subprocess.run(["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"])
            # Remove from our state
            del self.state["banned"][ip]
            # Send Slack notification
            self.notifier.unban_alert(ip, info)

Every unban also sends a Slack notification and writes an audit log entry.
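
Separating the "which bans have expired?" question from the side effects makes the unbanner easy to test. The sketch below uses hypothetical names and takes the unban action and the clock as parameters; a threading.Event lets the polling loop shut down cleanly instead of sleeping blindly. A real daemon would also need a lock around the shared ban state.

```python
import threading
import time
from typing import Callable, Dict, List


def expired_bans(banned: Dict[str, dict], now: float) -> List[str]:
    """IPs whose ban has run out; permanent bans (until == -1) are skipped."""
    return [
        ip for ip, info in banned.items()
        if info["until"] != -1 and now >= info["until"]
    ]


def run_unbanner(banned: Dict[str, dict], unban: Callable[[str], None],
                 stop: threading.Event, interval: float = 10.0,
                 clock: Callable[[], float] = time.time) -> None:
    """Poll every interval seconds until stop is set."""
    while not stop.is_set():
        for ip in expired_bans(banned, clock()):
            unban(ip)        # e.g. remove the iptables rule and notify
            del banned[ip]
        stop.wait(interval)  # sleeps, but wakes immediately on stop.set()
```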


Part 8: Slack Alerts (notifier.py)

Slack has a feature called "Incoming Webhooks" — a URL you can POST JSON to and it appears as a message in a channel. We use this for real-time alerts.

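def ban_alert(self, ip, condition, rate, baseline, duration, offense):
    # UTC time of the alert (needs: from datetime import datetime, timezone)
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    text = (
        f":rotating_light: *IP BANNED* `{ip}`\n"
        f"• *Condition:* {condition}\n"
        f"• *Current rate:* {rate:.2f} req/s\n"
        f"• *Baseline mean:* {baseline:.4f} req/s\n"
        f"• *Ban duration:* {duration}s (offense #{offense})\n"
        f"• *Timestamp:* {timestamp}"
    )
    # The timeout keeps a slow Slack endpoint from hanging the request forever
    requests.post(self.webhook_url, json={"text": text}, timeout=5)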

All alerts are fired in a background thread so they never block the detection loop. If Slack is slow, your detector doesn't slow down.
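
One common way to get that behaviour is a queue drained by a single worker thread. The sketch below is not the project's notifier.py: AsyncNotifier is a hypothetical class, and the send callable stands in for whatever actually posts to the webhook.

```python
import queue
import threading
from typing import Callable


class AsyncNotifier:
    """Hand alerts to a worker thread so the caller never waits on the network."""

    def __init__(self, send: Callable[[str], None]):
        self._send = send
        self._queue: "queue.Queue[str]" = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def notify(self, text: str) -> None:
        self._queue.put(text)  # returns immediately

    def _run(self) -> None:
        while True:
            text = self._queue.get()
            try:
                self._send(text)
            except Exception:
                pass  # a failed alert must not kill the worker
            finally:
                self._queue.task_done()

    def flush(self) -> None:
        """Block until every queued alert has been attempted."""
        self._queue.join()
```

In the detector, send would wrap the requests.post call, and the detection loop would only ever call notify.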


Part 9: The Live Dashboard (dashboard.py)

We built a Flask web app that shows everything happening in real time:

  • Global req/s — current traffic rate vs. baseline
  • Banned IPs — with offense count, condition that triggered the ban, and time until unban
  • Top 10 source IPs — most active IPs in the last 60 seconds
  • CPU and memory usage — system health
  • Effective mean and stddev — what the baseline currently looks like

The page uses <meta http-equiv="refresh" content="3"> to auto-reload every 3 seconds. There's also a /api/metrics endpoint that returns everything as JSON, useful for monitoring tools or external dashboards.

@app.route("/api/metrics")
def metrics():
    return jsonify({
        "global_rps":       state["global_rps"],
        "baseline_mean":    baseline.effective_mean,
        "baseline_stddev":  baseline.effective_stddev,
        "banned":           state["banned"],
        "top_ips":          state["top_ips"],
        "cpu_percent":      psutil.cpu_percent(),
        "mem_percent":      psutil.virtual_memory().percent,
        "uptime_seconds":   int(time.time() - state["start_time"]),
    })
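
The top_ips value can be derived directly from the per-IP sliding windows, since each deque's length is that IP's request count for the last 60 seconds. A sketch, assuming ip_windows maps each IP to its deque of timestamps as in Part 3 (the function name top_ips is mine):

```python
from collections import Counter, deque
from typing import Dict, List, Tuple


def top_ips(ip_windows: Dict[str, deque], n: int = 10) -> List[Tuple[str, int]]:
    """Return the n busiest IPs as (ip, request_count) pairs, busiest first."""
    counts = Counter({ip: len(dq) for ip, dq in ip_windows.items() if dq})
    return counts.most_common(n)
```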

Part 10: The Audit Log

Every significant event is written to a structured audit log file:

[2026-04-28T23:15:01Z] BAN ip=203.0.113.42 | condition=zscore=4.21>=3.0 | rate=18.3200 | baseline=2.1000 | duration=600s | offense=1
[2026-04-28T23:25:01Z] UNBAN ip=203.0.113.42 | condition=zscore=4.21>=3.0 | rate=N/A | baseline=N/A | duration=600s | offense=1
[2026-04-28T23:26:40Z] BASELINE_RECALC ip=global | source=hour_slot[23] | mean=2.1450 | stddev=0.3821 | samples=960

This gives you a full history of what happened, when, and why — invaluable for incident review.
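
A formatter for lines in this shape can be quite small. The sketch below reproduces the layout of the BAN example above; audit_line is a hypothetical helper, and callers are assumed to pass values that are already formatted (for instance "18.3200" rather than a raw float).

```python
from datetime import datetime, timezone


def audit_line(ts: float, action: str, ip: str, **fields) -> str:
    """Format one audit entry: [UTC time] ACTION ip=... | key=value | ..."""
    stamp = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Keyword arguments keep their call order, so fields appear as passed
    parts = [f"ip={ip}"] + [f"{key}={value}" for key, value in fields.items()]
    return f"[{stamp}] {action} " + " | ".join(parts)
```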


Putting It All Together with Docker Compose

All of this runs as Docker containers defined in a single docker-compose.yml:

services:
  db:        # MariaDB database for Nextcloud
  nextcloud: # The cloud storage app
  nginx:     # Reverse proxy that writes JSON logs
  detector:  # Our anomaly detection daemon

The magic glue is the shared Docker volume HNG-nginx-logs. Nginx writes logs to it; the detector reads from it. They never talk directly — they just share a filesystem path.

volumes:
  HNG-nginx-logs:   # Named volume shared between nginx and detector

The detector runs with network_mode: host and cap_add: NET_ADMIN so it can modify iptables rules on the host machine.


Testing It

To verify the system works, you can simulate an attack using Apache Bench:

ab -n 5000 -c 100 http://YOUR_SERVER_IP/

This fires 5000 requests with 100 concurrent connections. Within seconds you should see:

  1. The detector logs: IP ANOMALY 1.2.3.4: zscore=8.4>=3.0
  2. A Slack message: "🚨 IP BANNED 1.2.3.4"
  3. An iptables rule: DROP all -- 1.2.3.4 anywhere
  4. The dashboard updating with the banned IP

Key Lessons Learned

1. Never hardcode thresholds. What's "too much" traffic depends on time of day, day of week, and the specific server. A baseline that learns from real traffic is usually more accurate than a hardcoded number.

2. Two detection signals are better than one. Z-score alone can miss slow attacks on variable traffic. Rate multiplier alone can miss subtle statistical anomalies. Together they cover each other's blind spots.

3. Deques are perfect for sliding windows. Python's collections.deque evicts from the left in O(1) time. It's the ideal data structure for time-window problems.

4. Block at the network layer, not the application layer. iptables drops packets before they reach Nginx or Python, so a blocked request costs no web server or application time. That makes it cheaper than application-level rate limiting.

5. Alert on everything, block on IP anomalies. Global traffic spikes might be legitimate (a viral post, a TV mention). We alert on global anomalies but only auto-block individual IPs where the evidence is clear.


Conclusion

Building this project taught me that security tooling doesn't have to be mysterious. At its core, this is just:

  • Read logs line by line
  • Count requests in a time window using a deque
  • Compare current rates against a learned baseline
  • Act when something looks wrong

The maths is simple (mean, standard deviation, z-score). The data structures are simple (deque). The blocking mechanism is a single shell command. But combined, they form a system that responds to attacks in under 10 seconds, adapts to changing traffic patterns, and gets progressively stricter with repeat offenders.

If you're interested in security, DevOps, or systems programming, I hope this gives you a clear mental model for how real anomaly detection works — and the confidence to build your own.


The full source code is available at: https://github.com/babaolu/HNG_c14DO3

Built as part of the HNG Internship Stage 3 DevSecOps challenge.
