Production systems generate thousands of log lines per minute. Most are noise. The signals that matter — a failed authentication spike, an unexpected process crash, a database query that suddenly takes 10x longer — are buried in that noise. Rule-based alerting catches what you anticipated; anomaly detection catches what you didn't.
Why Rule-Based Alerting Falls Short
Writing alert rules requires knowing what failure looks like before it happens. That works for well-understood failure modes but leaves you blind to novel attacks, silent degradation, or emergent issues across services. A statistical model can learn what "normal" looks like from historical data and flag deviations automatically — without you enumerating every failure scenario upfront.
This article shows how to build a working log anomaly detector in Python that:
- Parses raw log lines into numeric features
- Trains an Isolation Forest on a baseline window
- Scores new lines in near real-time and surfaces the most anomalous ones
- Optionally routes flagged lines through a language model for triage
Parsing Logs Into Features
Raw log text is the worst possible input for a statistical model. We need numeric features. The simplest useful set includes: log level, HTTP status code, response latency, message length, and a coarse hash of the message template.
import re
import hashlib
from dataclasses import dataclass
LOG_PATTERN = re.compile(
r'\S+\s+(?P<level>DEBUG|INFO|WARN|ERROR|FATAL)\s+'
r'(?P<status>\d{3})?\s*(?P<latency_ms>\d+)?ms?\s*(?P<message>.+)'
)
LEVEL_MAP = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3, "FATAL": 4}
@dataclass
class LogFeatures:
level: int
status: int
latency_ms: int
message_len: int
message_hash: int
def parse_line(raw: str) -> LogFeatures | None:
m = LOG_PATTERN.match(raw.strip())
if not m:
return None
return LogFeatures(
level=LEVEL_MAP.get(m.group("level"), 1),
status=int(m.group("status") or 200),
latency_ms=int(m.group("latency_ms") or 0),
message_len=len(m.group("message")),
message_hash=int(hashlib.md5(m.group("message").encode()).hexdigest()[:8], 16) % 10_000,
)
message_hash is a cheap way to bucket log message templates without full NLP. It handles the common case where the same event type repeats with different variable values — the hash groups them while still separating genuinely novel messages from the baseline.
Training an Isolation Forest
Isolation Forest is a strong fit for log anomaly detection: it requires no labeled examples, handles high-dimensional sparse data well, and its anomaly score is easy to interpret. The algorithm isolates anomalies by randomly partitioning the feature space — outliers require fewer partitions because they live in sparse regions.
import numpy as np
from sklearn.ensemble import IsolationForest
def build_matrix(lines: list[str]) -> np.ndarray:
features = [parse_line(l) for l in lines]
features = [f for f in features if f is not None]
return np.array([
[f.level, f.status, f.latency_ms, f.message_len, f.message_hash]
for f in features
], dtype=float)
def train_detector(baseline_lines: list[str], contamination: float = 0.01):
X = build_matrix(baseline_lines)
model = IsolationForest(
n_estimators=200,
contamination=contamination,
random_state=42,
n_jobs=-1,
)
model.fit(X)
return model
contamination=0.01 tells the model to expect roughly 1% of the baseline to already be anomalous — a defensible prior for most production environments. If your baseline window included an incident, push this to 0.05. If it was an unusually clean period, drop it to 0.005. The parameter is worth measuring rather than guessing: sample 500 lines, review them manually, and set contamination from the actual observed fraction of weird lines.
Scoring Lines and Surfacing Anomalies
Once trained, scoring is fast. predict returns -1 for anomalies and 1 for normal samples. decision_function gives a continuous score — lower values are more anomalous — which lets you prioritize when dozens of anomalies appear at once.
def score_lines(model, lines: list[str]) -> list[dict]:
parsed = [(l, parse_line(l)) for l in lines]
valid = [(raw, f) for raw, f in parsed if f is not None]
if not valid:
return []
X = np.array([
[f.level, f.status, f.latency_ms, f.message_len, f.message_hash]
for _, f in valid
], dtype=float)
predictions = model.predict(X)
scores = model.decision_function(X)
results = []
for (raw, _), pred, score in zip(valid, predictions, scores):
if pred == -1:
results.append({"line": raw.strip(), "score": round(float(score), 4)})
return sorted(results, key=lambda r: r["score"])
You can pipe results directly into any alerting channel — Slack webhook, PagerDuty, Telegram bot. Sorting by score ascending puts the most extreme outliers first.
Tailing a Live Log File
For production use, wire the detector into a file tail loop with a rolling baseline. Retrain periodically so the model adapts to deployment changes and organic traffic growth.
import time
from collections import deque
BASELINE_MAX = 50_000
RETRAIN_EVERY = 500 # batches
def tail_and_detect(log_path: str, contamination: float = 0.01):
baseline: deque[str] = deque(maxlen=BASELINE_MAX)
model = None
batch_n = 0
with open(log_path) as fh:
fh.seek(0, 2) # start at EOF — only tail new lines
while True:
lines = fh.readlines()
if not lines:
time.sleep(0.5)
continue
for line in lines:
baseline.append(line)
if len(baseline) >= 1_000 and model is None:
model = train_detector(list(baseline), contamination)
print(f"Model trained on {len(baseline)} lines")
if model is not None:
for a in score_lines(model, lines):
print(f"[ANOMALY {a['score']:.4f}] {a['line']}")
batch_n += 1
if model is not None and batch_n % RETRAIN_EVERY == 0:
model = train_detector(list(baseline), contamination)
print("Model retrained on rolling baseline")
In a real deployment, replace print with structured JSON output, add a watchdog that alerts if the detector stops processing (a silent failure is worse than no detector), and persist the trained model to disk so restarts don't lose the accumulated baseline.
Optional: LLM-Based Triage
The Isolation Forest flags outliers but cannot explain them in plain language. For the most severe anomalies, route the flagged line and its surrounding context to a language model for a human-readable summary:
def triage_anomaly(line: str, context: list[str], llm_client) -> str:
prompt = (
"You are an SRE reviewing a log anomaly.\n\n"
f"Context (10 lines before):\n{''.join(context[-10:])}\n\n"
f"Anomalous line:\n{line}\n\n"
"In 2-3 sentences: what is likely happening and how urgent is it?"
)
# Works with any chat-completions-compatible API (OpenAI, Ollama, Mistral, etc.)
resp = llm_client.chat.completions.create(
model="your-model-id",
messages=[{"role": "user", "content": prompt}],
max_tokens=200,
)
return resp.choices[0].message.content
This two-stage pattern — statistical detection first, LLM triage second — keeps costs under control. You only spend LLM tokens on the small fraction of lines flagged as anomalous, not the full stream. On a service generating 10k lines/minute with a 0.5% anomaly rate, that's at most 50 LLM calls per minute.
For teams building security-aware systems, it is worth auditing your logging and monitoring posture before shipping. We publish free security hardening checklists covering log retention policies, alerting gaps, and incident response readiness.
The Takeaway
This architecture is production-ready today with no exotic dependencies — just scikit-learn and Python's standard library for the core detector:
- Parse into features first. Do not pass raw log text to statistical models; numeric features are orders of magnitude more effective.
- Use a rolling baseline. Log patterns shift with every deployment. A static baseline from three months ago generates too many false positives.
-
Tune
contaminationfrom measurement. Sample your actual logs, review them, then set the parameter from real data rather than intuition. - Reserve LLM calls for triage. Detection must be fast and cheap; explanation can afford to be slower.
- Add a watchdog. A detector that silently stops processing is worse than having no detector at all.
The full pipeline runs under 200 lines of Python, has no database dependency, and handles tens of thousands of log lines per second on a single core.
I run AYI NEDJIMI Consultants, a cybersecurity consulting firm. We publish free security hardening checklists — PDF and Excel.
Top comments (0)