DEV Community

Ayi NEDJIMI

Implementing AI-Powered Log Anomaly Detection

Production log volumes make manual inspection impossible, and rule-based alerting only catches failures you've already seen. Novel attack patterns, cascading service degradations, and subtle configuration drift routinely escape detection until they become outages. A language model can read logs more the way a senior engineer would, weighing context, sequence, and semantic meaning, and flag what doesn't belong.

The Problem with Rule-Based Alerting

Every ops team starts the same way: if ERROR appears more than N times in M minutes, page someone. This works until it doesn't. The failure modes are predictable:

  • Rules alert on noise (a single retry storm) and miss signals (five suspicious auth events spread over an hour)
  • New service behavior means writing new rules — which requires knowing what to look for before the incident
  • High-severity anomalies often look like low-severity events individually
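
To make the first failure mode concrete, here is a minimal sketch of a count-based rule. The function name `threshold_alert`, the threshold of 10 errors per 5 minutes, and both sample event lists are assumptions made up for illustration; they are not taken from any particular alerting product.

```python
from datetime import datetime, timedelta


def threshold_alert(events, level="ERROR", max_count=10, window=timedelta(minutes=5)):
    """Return True if more than max_count events of `level` fall inside any window."""
    times = sorted(ts for ts, lvl, _ in events if lvl == level)
    start = 0
    for end, ts in enumerate(times):
        # Slide the left edge forward until the span fits inside the window.
        while ts - times[start] > window:
            start += 1
        if end - start + 1 > max_count:
            return True
    return False


base = datetime(2024, 1, 1, 12, 0, 0)

# Hypothetical retry storm: 11 errors in 11 seconds.
retry_storm = [
    (base + timedelta(seconds=i), "ERROR", "upstream timeout, retrying")
    for i in range(11)
]

# Hypothetical slow probe: five failed admin logins spread over an hour, logged as WARNING.
slow_auth = [
    (base + timedelta(minutes=12 * i), "WARNING", "failed login for admin")
    for i in range(5)
]

print(threshold_alert(retry_storm))  # True: the noisy but harmless burst pages someone
print(threshold_alert(slow_auth))    # False: the suspicious sequence never trips the rule
```

The rule only sees a level and a count, so the burst fires and the spread-out sequence stays invisible regardless of what the messages say.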

Statistical approaches (Z-score on error rate, isolation forests) improve coverage, but they need a representative baseline and hand-built numeric features, and they don't understand log semantics: they see counts and tokens, not meaning. An LLM treats a log batch as structured text and can reason about it more the way a person would.
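
For comparison, a Z-score check on per-window error counts can be sketched in a few lines. The `history` values are invented counts and the cutoff of 3 is a common convention, not a recommendation from this article.

```python
from statistics import mean, stdev


def zscore(history: list[int], current: int) -> float:
    """Standard score of the current window's error count against past windows."""
    sigma = stdev(history)
    if sigma == 0:
        return 0.0  # flat baseline: no meaningful score
    return (current - mean(history)) / sigma


# Hypothetical error counts for eight previous 5-minute windows.
history = [4, 6, 5, 7, 5, 6, 4, 5]

print(zscore(history, 30) > 3)  # True: a volume spike stands out
print(zscore(history, 6) > 3)   # False: six errors looks normal, whatever they say
```

The detector's only input is a number. Six routine timeouts and six failed root logins produce the same score.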

Architecture Overview

The system works in three stages:

  1. Collection: aggregate logs into time-windowed batches (e.g., 5-minute windows)
  2. Analysis: send each batch to a language model with a structured prompt requesting JSON output
  3. Routing: if anomalies are detected, publish to a notification channel (Slack, PagerDuty, etc.)

You run this pipeline asynchronously on a schedule. It is not a real-time per-line filter; it is a second-pass analysis layer on top of your existing log infrastructure. The tradeoff compared to streaming detection is a few minutes of latency in exchange for a better signal-to-noise ratio.
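
The three stages can be sketched as one function that takes each stage as a callable. The names `run_once`, `fake_collect`, and `fake_analyze` are placeholders invented here so the sketch runs without a log file or a model; a scheduler such as cron would call `run_once` on each tick.

```python
from typing import Callable, Iterable

Batch = list[dict]


def run_once(
    collect: Callable[[], Iterable[Batch]],
    analyze: Callable[[Batch], dict],
    route: Callable[[dict], None],
) -> int:
    """Run one scheduled pass and return how many batches were routed."""
    routed = 0
    for batch in collect():          # stage 1: time-windowed batches
        result = analyze(batch)      # stage 2: model verdict as a dict
        if result.get("anomalies"):  # stage 3: only anomalous windows go out
            route(result)
            routed += 1
    return routed


# Stand-in stages (assumptions for the demo, not real integrations).
def fake_collect() -> Iterable[Batch]:
    yield [{"level": "INFO", "message": "batch_complete"}]
    yield [{"level": "ERROR", "message": "connection pool exhausted"}]


def fake_analyze(batch: Batch) -> dict:
    hits = [e for e in batch if e["level"] == "ERROR"]
    return {"anomalies": [{"severity": "critical", "summary": e["message"]} for e in hits]}


sent: list[dict] = []
print(run_once(fake_collect, fake_analyze, sent.append))  # 1
```

Passing the stages in as callables keeps the loop testable: the real collector, model client, and notifier can each be swapped for a stub.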

Ingesting and Batching Logs

The functions below parse each line of a log file and group the parsed entries into time windows:

import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Generator

LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"
    r"\s+(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)"
    r"\s+(?P<service>\S+)"
    r"\s+(?P<message>.+)"
)

def parse_log_line(line: str) -> dict | None:
    m = LOG_PATTERN.match(line.strip())
    if not m:
        return None
    return {
        "timestamp": m.group("timestamp"),
        "level": m.group("level"),
        "service": m.group("service"),
        "message": m.group("message"),
    }

def batch_logs(
    log_path: Path,
    window_minutes: int = 5,
    max_lines: int = 200,
) -> Generator[list[dict], None, None]:
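    """Yield non-overlapping batches of parsed log entries.

    A batch closes when the window has lasted window_minutes or when it
    reaches max_lines, whichever comes first.
    """
    current_batch: list[dict] = []
    window_start: datetime | None = None

    # errors="replace" keeps one undecodable byte from aborting the whole run.
    with log_path.open(encoding="utf-8", errors="replace") as fh:
        for raw_line in fh:
            entry = parse_log_line(raw_line)
            if entry is None:
                # Lines that don't match LOG_PATTERN (e.g. stack-trace
                # continuations) are skipped.
                continue

            ts = datetime.fromisoformat(entry["timestamp"])

            # Windows are anchored at the first entry seen, not at clock boundaries.
            if window_start is None:
                window_start = ts

            # >= makes each window half-open: [start, start + window_minutes).
            if ts - window_start >= timedelta(minutes=window_minutes):
                if current_batch:
                    yield current_batch
                current_batch = [entry]
                window_start = ts
            else:
                current_batch.append(entry)
                if len(current_batch) >= max_lines:
                    # Cap the batch size; the next entry starts a fresh window.
                    yield current_batch
                    current_batch = []
                    window_start = None

    # Flush whatever is left once the file is exhausted.
    if current_batch:
        yield current_batch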
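
It helps to check which line shapes the parser accepts before pointing it at real logs. The snippet below copies the article's pattern so it runs on its own; the three sample lines are invented for illustration.

```python
import re

# Copy of the article's LOG_PATTERN so this snippet is self-contained.
LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"
    r"\s+(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)"
    r"\s+(?P<service>\S+)"
    r"\s+(?P<message>.+)"
)

# Hypothetical sample lines.
samples = [
    "2024-05-01T12:00:03 ERROR payments-api connection pool exhausted (size=20)",
    "    at com.example.Pool.acquire(Pool.java:88)",              # stack-trace continuation
    "2024-05-01T12:00:03.512Z INFO payments-api batch_complete",  # fractional seconds and Z
]

for line in samples:
    matched = LOG_PATTERN.match(line.strip())
    print("parsed " if matched else "skipped", "|", line.strip()[:50])
```

Only the first line parses. Continuation lines and timestamps with fractional seconds or a timezone suffix are dropped silently, so the pattern likely needs adjusting to match your own log format.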

Querying the Language Model

The prompt is the critical piece. Give the model enough context to distinguish normal chatter from real signals, and constrain it to return machine-readable output:

import json
import httpx

LLM_API_URL = "http://localhost:11434/v1/chat/completions"  # Ollama-compatible endpoint
LLM_MODEL = "llama3.2"

SYSTEM_PROMPT = """You are a production reliability engineer analyzing application logs.
Identify anomalies: patterns that suggest errors, security incidents, or degraded service behavior.

Return ONLY valid JSON with this schema:
{
  "anomalies": [
    {
      "severity": "low|medium|high|critical",
      "type": "error_spike|auth_failure|latency|security|unknown",
      "summary": "one sentence description",
      "evidence": ["relevant log line 1", "relevant log line 2"],
      "recommended_action": "what an engineer should do"
    }
  ],
  "window_health": "normal|degraded|critical",
  "total_lines_analyzed": 0
}

If no anomalies are found, return an empty anomalies array.
Do not include markdown, explanation, or any text outside the JSON object."""

def analyze_batch(entries: list[dict]) -> dict:
    log_text = "\n".join(
        f"[{e['timestamp']}] {e['level']} {e['service']}: {e['message']}"
        for e in entries
    )

    payload = {
        "model": LLM_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f"Analyze these {len(entries)} log entries:\n\n{log_text}",
            },
        ],
        "temperature": 0.1,
        "response_format": {"type": "json_object"},
    }

    resp = httpx.post(LLM_API_URL, json=payload, timeout=60)
    resp.raise_for_status()
    return json.loads(resp.json()["choices"][0]["message"]["content"])


def run_pipeline(log_path: Path) -> None:
    all_anomalies: list[dict] = []

    for batch in batch_logs(log_path):
        result = analyze_batch(batch)
        anomalies = result.get("anomalies", [])
        if anomalies:
            all_anomalies.extend(anomalies)
            health = result.get("window_health", "unknown").upper()
            print(f"[{health}] {len(anomalies)} anomaly(ies) detected")
            for a in anomalies:
                # The model may omit fields, so read them defensively.
                print(
                    f"  [{a.get('severity', 'unknown')}] "
                    f"{a.get('type', 'unknown')}: {a.get('summary', '')}"
                )

    if all_anomalies:
        critical = [a for a in all_anomalies if a.get("severity") == "critical"]
        if critical:
            print(f"\n!!! {len(critical)} CRITICAL anomaly(ies) require immediate attention !!!")
    else:
        print("No anomalies detected in this log file.")


if __name__ == "__main__":
    run_pipeline(Path("/var/log/app/service.log"))

A few implementation notes worth calling out:

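  • temperature: 0.1 keeps the output close to deterministic. Higher values tend to produce more speculative findings, though a low temperature does not by itself stop the model from inventing anomalies that are not in the data.
  • response_format: json_object asks the backend to constrain output to syntactically valid JSON, on servers that support it. It does not enforce the schema from the prompt, so do not rely on prompt instructions alone and still validate the fields you read.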
  • The Ollama endpoint is used here, but any OpenAI-compatible API works: swap the URL and model name accordingly.
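
Valid JSON is not the same as the expected schema, so it is worth checking the response before acting on it. The sketch below is one way to do that; `validate_result`, the decision to drop anomalies without a summary, and the "medium" fallback for unknown severities are all assumptions rather than part of the pipeline above.

```python
import json

ALLOWED_SEVERITIES = {"low", "medium", "high", "critical"}
ALLOWED_HEALTH = {"normal", "degraded", "critical"}


def validate_result(raw: str) -> dict:
    """Parse model output and keep only well-formed anomalies.

    Raises ValueError when the payload is not a JSON object.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model returned invalid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object at the top level")

    raw_anomalies = data.get("anomalies")
    if not isinstance(raw_anomalies, list):
        raw_anomalies = []

    clean = []
    for item in raw_anomalies:
        if not isinstance(item, dict) or not item.get("summary"):
            continue  # assumption: an anomaly without a summary is not actionable
        severity = str(item.get("severity", "")).lower()
        if severity not in ALLOWED_SEVERITIES:
            severity = "medium"  # assumption: unknown severities default to medium
        clean.append({**item, "severity": severity})

    health = data.get("window_health")
    if not isinstance(health, str) or health not in ALLOWED_HEALTH:
        health = "unknown"
    return {"anomalies": clean, "window_health": health}


# Hypothetical model output with one off-schema severity and one empty anomaly.
raw = (
    '{"anomalies": [{"severity": "SEVERE", "summary": "pool exhausted"},'
    ' {"severity": "low"}], "window_health": "degraded"}'
)
print(validate_result(raw))
```

In `analyze_batch`, the final `json.loads(...)` call could be replaced by a call to a validator like this one, so that downstream code only ever sees the four known severity values.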

Reducing Noise in Production

LLM detection has its own failure modes. Here is what actually matters once you move beyond a test environment:

Batch size discipline: 50–200 lines per batch is the practical range. Too few lines and the model lacks context. Too many and signals get diluted — the model starts summarizing instead of detecting.

Suppression windows: if an anomaly type fires, suppress re-alerting for the same type for 15–30 minutes. Cascading failures produce repetitive log patterns; without suppression you will get dozens of identical alerts for a single root cause.
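
A suppression window needs little more than a dictionary of last-sent times. This is a minimal in-memory sketch; the class name `AlertSuppressor` and the 20-minute default are assumptions, and a multi-process deployment would need shared storage instead.

```python
from datetime import datetime, timedelta


class AlertSuppressor:
    """Allow at most one alert per anomaly type per cooldown period."""

    def __init__(self, cooldown: timedelta = timedelta(minutes=20)) -> None:
        self.cooldown = cooldown
        self._last_sent: dict[str, datetime] = {}

    def should_alert(self, anomaly_type: str, now: datetime) -> bool:
        last = self._last_sent.get(anomaly_type)
        if last is not None and now - last < self.cooldown:
            return False  # same type fired recently: stay quiet
        self._last_sent[anomaly_type] = now
        return True


t0 = datetime(2024, 5, 1, 3, 0)
suppressor = AlertSuppressor()
print(suppressor.should_alert("error_spike", t0))                           # True
print(suppressor.should_alert("error_spike", t0 + timedelta(minutes=5)))    # False
print(suppressor.should_alert("auth_failure", t0 + timedelta(minutes=5)))   # True
print(suppressor.should_alert("error_spike", t0 + timedelta(minutes=25)))   # True
```

The cooldown is measured from the last alert that was actually sent, so a long-running incident re-alerts once per cooldown rather than going silent indefinitely.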

Severity-based routing: send critical anomalies to PagerDuty, medium/high to Slack, low to a file for weekly review. Not everything needs to wake someone up at 3 AM.
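
The routing rule above maps directly onto a lookup table. In this sketch the destination names are plain labels, not real integrations, and sending anomalies with a missing or unrecognized severity to Slack is an assumption chosen so they are not silently buried.

```python
# Severity -> destination label, following the routing described above.
ROUTES = {
    "critical": "pagerduty",
    "high": "slack",
    "medium": "slack",
    "low": "file",
}


def route_anomalies(anomalies: list[dict]) -> dict[str, list[dict]]:
    """Group anomalies by destination so each sink gets one delivery."""
    routed: dict[str, list[dict]] = {"pagerduty": [], "slack": [], "file": []}
    for anomaly in anomalies:
        # Assumption: unknown or missing severities go to Slack for human review.
        destination = ROUTES.get(anomaly.get("severity"), "slack")
        routed[destination].append(anomaly)
    return routed


# Hypothetical anomalies as the model might return them.
found = [
    {"severity": "critical", "summary": "connection pool exhausted"},
    {"severity": "low", "summary": "deprecated config key in use"},
    {"summary": "severity field missing"},
]
for destination, items in route_anomalies(found).items():
    print(destination, len(items))
```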

Service-specific context in the system prompt: add two or three sentences describing what normal operations look like for your service. "This is a payment processing service. Background jobs run every 5 minutes and produce INFO-level lines with 'batch_complete'. Connection pool exhaustion is always critical." This alone cuts false positives significantly.
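
One way to keep that context maintainable is to store it as a short list per service and append it to the base prompt. The helper `build_system_prompt` and the section heading it emits are assumptions; the three context lines come from the example above.

```python
BASE_PROMPT = "You are a production reliability engineer analyzing application logs."


def build_system_prompt(base: str, service_context: list[str]) -> str:
    """Append service-specific notes to the base system prompt."""
    if not service_context:
        return base
    notes = "\n".join(f"- {line}" for line in service_context)
    return f"{base}\n\nKnown facts about this service:\n{notes}"


payment_context = [
    "This is a payment processing service.",
    "Background jobs run every 5 minutes and produce INFO-level lines with 'batch_complete'.",
    "Connection pool exhaustion is always critical.",
]

print(build_system_prompt(BASE_PROMPT, payment_context))
```

Keeping the notes in data rather than in the prompt string makes it easier to review changes during prompt iteration.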

For teams building security-focused log pipelines, the security hardening checklists at AYI NEDJIMI Consultants include baseline anomaly categories and log retention guidance useful for bootstrapping detection rules.

Data sensitivity: if your logs contain PII, credentials, or tokens, run inference entirely on-premises. Never send raw production logs to an external LLM API — pre-screen and redact before batching.
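
A redaction pass can run on each message before it is added to a batch. The patterns below are a deliberately small, illustrative set and will not catch every secret format; treat the list, the placeholder strings, and the function name `redact` as assumptions to adapt to your own logs.

```python
import re

# (pattern, replacement) pairs applied in order. Illustrative, not exhaustive.
REDACTIONS = [
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "<EMAIL>"),
    (re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), "<IP>"),
    (re.compile(r"(?i)\b(password|passwd|token|api_key|secret)=\S+"), r"\1=<REDACTED>"),
    (re.compile(r"(?i)\bBearer\s+[A-Za-z0-9._~+/-]+=*"), "Bearer <REDACTED>"),
]


def redact(message: str) -> str:
    """Replace common sensitive substrings with placeholders."""
    for pattern, replacement in REDACTIONS:
        message = pattern.sub(replacement, message)
    return message


# Hypothetical log messages.
print(redact("login failed for alice@example.com from 10.0.0.5 password=hunter2"))
print(redact("upstream call with Authorization: Bearer abc.def-123"))
```

The IPv4 pattern also matches dotted version strings such as 1.2.3.4, which is usually an acceptable cost when the alternative is leaking addresses.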

The Takeaway

LLM-based log analysis is not a replacement for metrics and structured alerting. It is a second layer that catches semantically unusual events, novel failure modes, and slow drifts that rules miss. The implementation is straightforward: parse logs into batched windows, send each window to a language model with a tight structured prompt, parse the JSON response, and route by severity.

The real work is in the system prompt. Spend time describing what normal looks like for your specific service — that context is what separates useful anomaly reports from an expensive noise machine. Start small: run the pipeline against a week of historical logs to calibrate before enabling live alerting. False positive rates drop significantly after one round of prompt iteration.


I run AYI NEDJIMI Consultants, a cybersecurity consulting firm. We publish free security hardening checklists — PDF and Excel.
