Production log volumes make manual inspection impossible, and rule-based alerting only catches failures you've already seen before. Novel attack patterns, cascading service degradations, and subtle configuration drift routinely escape detection until they become outages. A language model can read logs the way a senior engineer would — understanding context, sequence, and semantic meaning — and flag what doesn't belong.
The Problem with Rules-Based Alerting
Every ops team starts the same way: if ERROR appears more than N times in M minutes, page someone. This works until it doesn't. The failure modes are predictable:
- Rules alert on noise (a single retry storm) and miss signals (five suspicious auth events spread over an hour)
- New service behavior means writing new rules — which requires knowing what to look for before the incident
- High-severity anomalies often look like low-severity events individually
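To make the first failure mode concrete, here is a minimal sketch of the "more than N events in M minutes" rule described above. The `ThresholdRule` class, its parameters, and the timestamps are hypothetical and exist only for illustration; real alerting systems evaluate such rules differently.

```python
from collections import deque
from datetime import datetime, timedelta


class ThresholdRule:
    """Fire when more than `max_events` matching events occur within `window`."""

    def __init__(self, max_events: int, window: timedelta) -> None:
        self.max_events = max_events
        self.window = window
        self._timestamps: deque[datetime] = deque()

    def observe(self, ts: datetime) -> bool:
        """Record one matching event; return True if the rule should fire."""
        self._timestamps.append(ts)
        # Drop events that have fallen out of the sliding window.
        while ts - self._timestamps[0] > self.window:
            self._timestamps.popleft()
        return len(self._timestamps) > self.max_events


start = datetime(2024, 1, 1, 12, 0, 0)

# Retry storm: six errors within six seconds trip a "more than 5 in 5 minutes" rule.
storm_rule = ThresholdRule(max_events=5, window=timedelta(minutes=5))
storm_fired = [storm_rule.observe(start + timedelta(seconds=i)) for i in range(6)]

# Five suspicious auth events spaced 12 minutes apart never trip the same rule.
slow_rule = ThresholdRule(max_events=5, window=timedelta(minutes=5))
slow_fired = [slow_rule.observe(start + timedelta(minutes=12 * i)) for i in range(5)]

print(storm_fired[-1], any(slow_fired))
```

The noisy burst pages someone on its sixth event, while the slow pattern stays invisible because each event has aged out of the window before the next one arrives.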
Statistical approaches (Z-score on error rate, isolation forests) improve coverage, but they still need a representative baseline of normal behavior and careful threshold tuning, and they don't understand log semantics: they see numbers and tokens, not meaning. An LLM treats a log batch as structured text and can reason about context and sequence in a way those methods cannot.
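As a rough illustration of what "seeing numbers, not meaning" looks like, here is a sketch of a Z-score check on per-window error counts. The `z_score` helper and the sample counts are assumptions made up for this example.

```python
from statistics import mean, stdev


def z_score(history: list[float], current: float) -> float:
    """Return how many sample standard deviations `current` is from the mean."""
    sigma = stdev(history)
    if sigma == 0:
        # A flat history gives no scale to measure deviation against.
        return 0.0
    return (current - mean(history)) / sigma


# Hypothetical error counts for the previous six 5-minute windows.
history = [4, 5, 6, 5, 4, 6]

spike = z_score(history, 20)   # far above the baseline
quiet = z_score(history, 5)    # exactly the baseline

print(spike > 3, quiet)
```

A window with five errors scores zero regardless of what those five lines say, so a window containing five failed admin logins is indistinguishable from one containing five routine timeouts.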
Architecture Overview
The system works in three stages:
- Collection: aggregate logs into time-windowed batches (e.g., 5-minute windows)
- Analysis: send each batch to a language model with a structured prompt requesting JSON output
- Routing: if anomalies are detected, publish to a notification channel (Slack, PagerDuty, etc.)
You run this pipeline asynchronously on a schedule. It is not a real-time per-line filter — it is a second-pass analysis layer on top of your existing log infrastructure. The tradeoff compared to streaming detection: a few minutes of latency in exchange for dramatically better signal-to-noise ratio.
Ingesting and Batching Logs
A function that reads from a log file and groups entries into time windows:
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Generator

# Expected line shape: "<ISO timestamp> <LEVEL> <service> <message>"
LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"
    r"\s+(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)"
    r"\s+(?P<service>\S+)"
    r"\s+(?P<message>.+)"
)


def parse_log_line(line: str) -> dict | None:
    """Parse one log line into a dict, or return None if it does not match."""
    m = LOG_PATTERN.match(line.strip())
    if not m:
        return None
    return {
        "timestamp": m.group("timestamp"),
        "level": m.group("level"),
        "service": m.group("service"),
        "message": m.group("message"),
    }


def batch_logs(
    log_path: Path,
    window_minutes: int = 5,
    max_lines: int = 200,
) -> Generator[list[dict], None, None]:
    """Yield non-overlapping time windows of parsed log entries."""
    current_batch: list[dict] = []
    window_start: datetime | None = None

    with log_path.open() as fh:
        for raw_line in fh:
            entry = parse_log_line(raw_line)
            if entry is None:
                continue  # skip lines that do not match the expected format

            ts = datetime.fromisoformat(entry["timestamp"])
            if window_start is None:
                window_start = ts

            if ts - window_start > timedelta(minutes=window_minutes):
                # The window has elapsed: emit it and start a new one.
                if current_batch:
                    yield current_batch
                current_batch = [entry]
                window_start = ts
            else:
                current_batch.append(entry)

            # Cap the batch size so a busy window does not overflow the prompt.
            if len(current_batch) >= max_lines:
                yield current_batch
                current_batch = []
                window_start = None

    # Emit whatever is left when the file ends.
    if current_batch:
        yield current_batch
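It helps to check which lines the pattern above actually accepts before trusting the batches. The sketch below repeats the same regular expression so it runs on its own; the sample lines, service name, and messages are invented for illustration.

```python
import re

# Same pattern as in the batching code, repeated so this snippet is standalone.
LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"
    r"\s+(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)"
    r"\s+(?P<service>\S+)"
    r"\s+(?P<message>.+)"
)

samples = [
    # Matches: whole-second timestamp followed by whitespace.
    "2024-05-01T12:00:00 ERROR auth-service login failed for user=alice",
    # Does not match: fractional seconds and a "Z" suffix follow the seconds.
    "2024-05-01T12:00:00.123Z ERROR auth-service login failed",
    # Does not match: stack-trace continuation line with no timestamp.
    "    at com.example.Handler.run(Handler.java:42)",
]

matched = [bool(LOG_PATTERN.match(s.strip())) for s in samples]
print(matched)
```

Because unmatched lines are skipped silently, a log source that emits fractional seconds, time zone suffixes, or multi-line stack traces would lose those lines unless the pattern is adapted to it.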
Querying the Language Model
The prompt is the critical piece. Give the model enough context to distinguish normal chatter from real signals, and constrain it to return machine-readable output:
# Continues the same module as the batching code above (uses Path and batch_logs).
import json

import httpx

LLM_API_URL = "http://localhost:11434/v1/chat/completions"  # Ollama-compatible endpoint
LLM_MODEL = "llama3.2"

SYSTEM_PROMPT = """You are a production reliability engineer analyzing application logs.
Identify anomalies: patterns that suggest errors, security incidents, or degraded service behavior.
Return ONLY valid JSON with this schema:
{
  "anomalies": [
    {
      "severity": "low|medium|high|critical",
      "type": "error_spike|auth_failure|latency|security|unknown",
      "summary": "one sentence description",
      "evidence": ["relevant log line 1", "relevant log line 2"],
      "recommended_action": "what an engineer should do"
    }
  ],
  "window_health": "normal|degraded|critical",
  "total_lines_analyzed": 0
}
If no anomalies are found, return an empty anomalies array.
Do not include markdown, explanation, or any text outside the JSON object."""


def analyze_batch(entries: list[dict]) -> dict:
    """Send one batch of parsed entries to the model and return its JSON verdict."""
    log_text = "\n".join(
        f"[{e['timestamp']}] {e['level']} {e['service']}: {e['message']}"
        for e in entries
    )
    payload = {
        "model": LLM_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"Analyze these {len(entries)} log entries:\n\n{log_text}",
            },
        ],
        "temperature": 0.1,
        "response_format": {"type": "json_object"},
    }
    resp = httpx.post(LLM_API_URL, json=payload, timeout=60)
    resp.raise_for_status()
    # The model's reply is a JSON document embedded as a string in the response.
    return json.loads(resp.json()["choices"][0]["message"]["content"])


def run_pipeline(log_path: Path) -> None:
    """Analyze every batch in the log file and print a summary of anomalies."""
    all_anomalies: list[dict] = []

    for batch in batch_logs(log_path):
        result = analyze_batch(batch)
        anomalies = result.get("anomalies", [])
        if anomalies:
            all_anomalies.extend(anomalies)
            health = result.get("window_health", "unknown").upper()
            print(f"[{health}] {len(anomalies)} anomaly(ies) detected")
            for a in anomalies:
                print(f"  [{a['severity']}] {a['type']}: {a['summary']}")

    if all_anomalies:
        critical = [a for a in all_anomalies if a["severity"] == "critical"]
        if critical:
            print(f"\n!!! {len(critical)} CRITICAL anomaly(ies) require immediate attention !!!")
    else:
        print("No anomalies detected in this log window.")


if __name__ == "__main__":
    run_pipeline(Path("/var/log/app/service.log"))
A few implementation notes worth calling out:
- A low temperature (0.1 here) is non-negotiable: higher values make the model more likely to invent anomalies not present in the data.
- response_format: json_object requests JSON at the API level, so do not rely on prompt instructions alone. It constrains the syntax of the reply, not its conformance to the schema in the prompt.
- The Ollama endpoint is used here, but any OpenAI-compatible API works: swap the URL and model name accordingly.
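Even with JSON enforced at the API level, the reply can be well-formed JSON that ignores the schema. One option is to check the parsed result before acting on it. The sketch below is an assumption about how that could look; `parse_model_output` and `ALLOWED_SEVERITIES` are hypothetical names, and the checks cover only a few of the fields in the prompt's schema.

```python
import json

# Severity values taken from the schema in the system prompt.
ALLOWED_SEVERITIES = {"low", "medium", "high", "critical"}


def parse_model_output(raw: str) -> dict:
    """Parse the model's reply and keep only anomalies that fit the schema."""
    empty = {"anomalies": [], "window_health": "unknown"}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return empty
    if not isinstance(data, dict) or not isinstance(data.get("anomalies"), list):
        return empty

    valid = [
        a
        for a in data["anomalies"]
        if isinstance(a, dict)
        and isinstance(a.get("severity"), str)
        and a["severity"] in ALLOWED_SEVERITIES
        and isinstance(a.get("summary"), str)
    ]
    return {
        "anomalies": valid,
        "window_health": data.get("window_health", "unknown"),
    }


reply = (
    '{"anomalies": ['
    '{"severity": "high", "type": "auth_failure", "summary": "Repeated failed logins"},'
    '{"severity": "urgent", "summary": "Severity outside the schema"}'
    '], "window_health": "degraded"}'
)
checked = parse_model_output(reply)
print(len(checked["anomalies"]), checked["window_health"])
```

Dropping malformed entries silently is a design choice made here for brevity; logging them instead would make it easier to spot a prompt that needs tightening.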
Reducing Noise in Production
LLM detection has its own failure modes. Here is what actually matters once you move beyond a test environment:
Batch size discipline: 50–200 lines per batch is the practical range. Too few lines and the model lacks context. Too many and signals get diluted — the model starts summarizing instead of detecting.
Suppression windows: if an anomaly type fires, suppress re-alerting for the same type for 15–30 minutes. Cascading failures produce repetitive log patterns; without suppression you will get dozens of identical alerts for a single root cause.
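A suppression window can be as small as a dictionary of last-alert times keyed by anomaly type. The following is a minimal in-memory sketch; the `AlertSuppressor` class is hypothetical, and a deployment with several workers would need shared state instead.

```python
from datetime import datetime, timedelta


class AlertSuppressor:
    """Allow at most one alert per anomaly type within the cooldown period."""

    def __init__(self, cooldown: timedelta = timedelta(minutes=15)) -> None:
        self.cooldown = cooldown
        self._last_alert: dict[str, datetime] = {}

    def should_alert(self, anomaly_type: str, now: datetime) -> bool:
        """Return True and record the time if this type is not in cooldown."""
        last = self._last_alert.get(anomaly_type)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_alert[anomaly_type] = now
        return True


suppressor = AlertSuppressor()
t0 = datetime(2024, 1, 1, 3, 0, 0)

first = suppressor.should_alert("error_spike", t0)                           # alerts
repeat = suppressor.should_alert("error_spike", t0 + timedelta(minutes=5))   # suppressed
other = suppressor.should_alert("auth_failure", t0 + timedelta(minutes=5))   # different type
later = suppressor.should_alert("error_spike", t0 + timedelta(minutes=20))   # cooldown over

print(first, repeat, other, later)
```

Note that suppressed alerts do not extend the cooldown in this version, so a long-running incident re-alerts once per cooldown period rather than going silent.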
Severity-based routing: send critical anomalies to PagerDuty, medium/high to Slack, low to a file for weekly review. Not everything needs to wake someone up at 3 AM.
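The routing table described above fits in a small lookup. In this sketch the destination names are placeholders rather than real integrations, and sending unrecognized severities to Slack is an assumption made so that unexpected model output still reaches a human.

```python
# Destination names are placeholders; wire them to real integrations as needed.
ROUTES = {
    "critical": "pagerduty",
    "high": "slack",
    "medium": "slack",
    "low": "review_file",
}


def route_for(anomaly: dict) -> str:
    """Pick a destination from the anomaly's severity."""
    severity = str(anomaly.get("severity", "")).lower()
    # Unknown severities go to Slack so a person can look at them.
    return ROUTES.get(severity, "slack")


def group_by_destination(anomalies: list[dict]) -> dict[str, list[dict]]:
    """Bucket anomalies by destination so each channel gets one message."""
    grouped: dict[str, list[dict]] = {}
    for anomaly in anomalies:
        grouped.setdefault(route_for(anomaly), []).append(anomaly)
    return grouped


sample = [
    {"severity": "critical", "summary": "Connection pool exhausted"},
    {"severity": "medium", "summary": "Elevated 5xx rate"},
    {"severity": "low", "summary": "Deprecated endpoint still in use"},
]
grouped = group_by_destination(sample)
print(sorted(grouped))
```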
Service-specific context in the system prompt: add two or three sentences describing what normal operations look like for your service. "This is a payment processing service. Background jobs run every 5 minutes and produce INFO-level lines with 'batch_complete'. Connection pool exhaustion is always critical." This alone cuts false positives significantly.
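One way to keep that context maintainable is to store it per service and append it to the shared prompt when a batch is analyzed. The helper below is a hypothetical sketch; `build_system_prompt` and the "Service context:" label are assumptions, and the shortened base prompt stands in for the full one.

```python
def build_system_prompt(base_prompt: str, service_context: str) -> str:
    """Append a short description of normal behavior to the base prompt."""
    context = service_context.strip()
    if not context:
        return base_prompt
    return f"{base_prompt}\n\nService context:\n{context}"


# Shortened stand-in for the full system prompt used earlier in the article.
base = "You are a production reliability engineer analyzing application logs."
payments_context = (
    "This is a payment processing service. Background jobs run every 5 minutes "
    "and produce INFO-level lines with 'batch_complete'. "
    "Connection pool exhaustion is always critical."
)

prompt = build_system_prompt(base, payments_context)
print(prompt)
```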
For teams building security-focused log pipelines, the security hardening checklists at AYI NEDJIMI Consultants include baseline anomaly categories and log retention guidance useful for bootstrapping detection rules.
Data sensitivity: if your logs contain PII, credentials, or tokens, run inference entirely on-premises. Never send raw production logs to an external LLM API — pre-screen and redact before batching.
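A redaction pass can run on each message before it is added to a batch. The patterns below are an illustrative sketch only: they cover email addresses, IPv4-looking values, a few `key=value` secrets, and bearer tokens, and they will both miss real secrets and over-match harmless text (version strings such as 1.2.3.4, for example). Treat the rule list and placeholder tokens as assumptions to adapt, not as a complete filter.

```python
import re

# (pattern, replacement) pairs applied in order. Illustrative, not exhaustive.
REDACTION_RULES = [
    (re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"), "<EMAIL>"),
    (re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), "<IP>"),
    (re.compile(r"(?i)\b(password|passwd|token|secret|api_key)=\S+"), r"\1=<REDACTED>"),
    (re.compile(r"(?i)\bBearer\s+[A-Za-z0-9._~+/-]+=*"), "Bearer <REDACTED>"),
]


def redact(message: str) -> str:
    """Replace sensitive-looking substrings with placeholder tokens."""
    for pattern, replacement in REDACTION_RULES:
        message = pattern.sub(replacement, message)
    return message


line = "login failed user=alice@example.com ip=10.0.0.5 token=abc123"
header = "Authorization: Bearer eyJhbGciOi.payload.sig"

print(redact(line))
print(redact(header))
```

Keeping placeholders such as `<IP>` instead of deleting the value preserves the shape of the line, which gives the model something to reason about without exposing the underlying data.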
The Takeaway
LLM-based log analysis is not a replacement for metrics and structured alerting. It is a second layer that catches semantically unusual events, novel failure modes, and slow drifts that rules miss. The implementation is straightforward: parse logs into batched windows, send each window to a language model with a tight structured prompt, parse the JSON response, and route by severity.
The real work is in the system prompt. Spend time describing what normal looks like for your specific service — that context is what separates useful anomaly reports from an expensive noise machine. Start small: run the pipeline against a week of historical logs to calibrate before enabling live alerting. False positive rates drop significantly after one round of prompt iteration.
I run AYI NEDJIMI Consultants, a cybersecurity consulting firm. We publish free security hardening checklists — PDF and Excel.