DEV Community

Mike Falkenberg

How I Built an AI-Powered Error Triage System for SaaS at Scale — And What It Actually Costs

We had a monitoring problem that wasn't really a monitoring problem.

We had Datadog. We had alerts. We had dashboards. What we didn't have was signal. On any given morning, an engineer opening the console might see a large volume of errors aggregated across many customer environments — with no fast way to know if that was one cascading timeout firing repeatedly, or a dozen distinct failures quietly spreading across the fleet.

I built an internal production dashboard to surface that signal, then added AI-powered error analysis to it. The pipeline runs on a schedule throughout the day. Below are the architecture, the reasoning, and illustrative code for each layer. The code shows patterns you can adapt; it is not copied from a private repo. It also covers the part many AI monitoring write-ups skip: who owns the problem once the AI has summarized it.


The Problem With Raw Error Counts

The product is SaaS, but it is not the classic “everyone on one shared multi-tenant stack” shape: customers run in separate environments, and observability still rolls up into one place. When something breaks, you want three answers quickly:

  1. Is this one error happening repeatedly, or many different errors?
  2. Which customers are affected, and how badly?
  3. Does this go to the product engineering team or the platform team?

Raw error counts answer none of those questions. A single database deadlock in one busy environment can generate many log lines. Without normalization, that looks like many separate incidents. With normalization, it's one pattern, one API call, one analysis.


The Architecture: Five Layers

Layer 1: Signature Extraction

Before any AI touches the data, errors get normalized. The goal is to strip everything variable — timestamps, customer or environment identifiers, GUIDs, session tokens — and reduce each error to its structural "shape." Many near-duplicate entries collapse to one signature.

Only send redacted, normalized text to a third-party model. Treat log lines like untrusted input: strip or hash anything that could be PII, secrets, or customer-identifying before it leaves your network.

import re
import hashlib

def extract_error_signature(message: str) -> tuple[str, str]:
    """
    Normalize an error message to its structural shape,
    then hash it for consistent grouping.
    """
    normalized = message

    # Strip customer / environment / user identifiers (extend for your log formats)
    normalized = re.sub(
        r'(customer|account|tenant)[_-]?id[:\s]+\S+',
        '[CUSTOMER_SCOPE]',
        normalized,
        flags=re.IGNORECASE,
    )
    normalized = re.sub(r'user[_-]?id[:\s]+\d+', '[USER_ID]', normalized, flags=re.IGNORECASE)

    # Strip timestamps
    normalized = re.sub(
        r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[\.\d]*Z?',
        '[TIMESTAMP]',
        normalized
    )

    # Strip GUIDs
    normalized = re.sub(
        r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
        '[GUID]',
        normalized,
        flags=re.IGNORECASE
    )

    # Strip long numeric IDs
    normalized = re.sub(r'\b\d{5,}\b', '[ID]', normalized)

    # Normalize whitespace
    normalized = re.sub(r'\s+', ' ', normalized).strip()

    # Hash the normalized shape for use as a cache/grouping key
    signature_hash = hashlib.md5(normalized.encode()).hexdigest()[:16]

    return signature_hash, normalized

The deduplication ratio is what this buys you. If hundreds of raw lines normalize to a handful of unique signatures, you make a handful of API calls — not one per line. On a noisy day that is the difference between a cheap run and an expensive one.
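
To make the deduplication ratio concrete, here is a minimal sketch that groups a few raw lines by signature and reports raw lines per unique pattern. The helper names (signature_of, dedup_ratio) and the sample log lines are made up for illustration, and the normalizer is a trimmed-down stand-in for extract_error_signature, not the full version.

```python
import hashlib
import re
from collections import Counter

# Hypothetical, reduced normalizer: three of the patterns from the layer above.
_PATTERNS = [
    (re.compile(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[\.\d]*Z?'), '[TIMESTAMP]'),
    (re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
                re.IGNORECASE), '[GUID]'),
    (re.compile(r'\b\d{5,}\b'), '[ID]'),
]

def signature_of(message: str) -> str:
    """Normalize a message and return a short hash of its shape."""
    normalized = message
    for pattern, placeholder in _PATTERNS:
        normalized = pattern.sub(placeholder, normalized)
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    return hashlib.md5(normalized.encode()).hexdigest()[:16]

def dedup_ratio(messages: list[str]) -> tuple[Counter, float]:
    """Return (raw-line count per signature, raw lines per unique signature)."""
    counts = Counter(signature_of(m) for m in messages)
    ratio = len(messages) / len(counts) if counts else 0.0
    return counts, ratio

# Made-up log lines, for illustration only.
sample = [
    "2024-05-01T06:15:02Z Deadlock on order 1234567 request 3f2b8c1e-9a4d-4e2b-8f1a-0c9d8e7f6a5b",
    "2024-05-01T06:15:03Z Deadlock on order 7654321 request 7e1c2d3f-4a5b-4c6d-8e9f-0a1b2c3d4e5f",
    "2024-05-01 06:15:04 Deadlock on order 1111111 request 00000000-0000-4000-8000-000000000000",
    "2024-05-01T06:16:00Z Timeout calling billing service",
]
counts, ratio = dedup_ratio(sample)
print(f"{len(sample)} raw lines, {len(counts)} signatures, ratio {ratio:.1f}")
```

Tracking this ratio per run is a cheap way to notice when a new log format slips past the normalizer: the ratio drops toward 1 and the number of model calls climbs.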


Layer 2: Cache With a 6-Hour TTL

The cache is what makes this economical over time. Once a signature is analyzed, that result is reused until it expires. The pipeline runs often — on most runs, the API does not fire for recurring known patterns.

import json
import hashlib
from datetime import datetime, timedelta
from pathlib import Path

class AnalysisCache:

    def __init__(self, cache_dir: str = '.cache/error-analysis'):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _cache_path(self, signature: str, analysis_type: str) -> Path:
        key = hashlib.md5(f"{signature}:{analysis_type}".encode()).hexdigest()
        return self.cache_dir / f"{key}.json"

    def get(self, signature: str, analysis_type: str = 'recent') -> dict | None:
        path = self._cache_path(signature, analysis_type)
        if not path.exists():
            return None

        cached = json.loads(path.read_text())
        cached_at = datetime.fromisoformat(cached['cached_at'])

        # Recent error analysis: 6-hour TTL
        # Long-term pattern analysis: 7-day TTL
        ttl = timedelta(hours=6) if analysis_type == 'recent' else timedelta(days=7)

        if datetime.now() - cached_at > ttl:
            return None  # Expired

        return cached['analysis']

    def set(self, signature: str, analysis_type: str, result: dict) -> None:
        path = self._cache_path(signature, analysis_type)
        path.write_text(json.dumps({
            'cached_at': datetime.now().isoformat(),
            'analysis': result
        }, indent=2))

The 6-hour TTL is a deliberate tradeoff. It is short enough that a genuinely new error variant surfaces within a typical business window. It is long enough that a stable recurring pattern does not burn tokens re-analyzing the same shape on every run.


Layer 3: LLM Analysis — Structured for Multiple Audiences

This is where the most important design decision lives. The prompt requests output in a specific JSON schema that serves several audiences simultaneously — support, operations, platform engineering, and leadership — without requiring separate reports.

The examples below use the Anthropic Python SDK; the same idea applies to any provider that accepts structured prompts and returns text you parse as JSON.

import anthropic
import json
import re

class AIErrorAnalyzer:

    # 'claude-sonnet-latest' is a placeholder; pass a model ID your account actually supports.
    def __init__(self, api_key: str, model: str = 'claude-sonnet-latest'):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model = model
        self.total_tokens = 0
        self.total_cost = 0.0

    def analyze(self, signature: str, error_type: str,
                occurrences: int, customers_affected: int,
                normalized_message: str) -> dict:

        prompt = f"""Analyze this production error pattern and return JSON only.

Error type: {error_type}
Occurrences: {occurrences}
Customers affected: {customers_affected}
Normalized message: {normalized_message[:400]}

Return this exact structure:
{{
  "summary": "One sentence for the dashboard",
  "explanation": "Plain English for non-technical staff",
  "severity": "Critical|High|Medium|Low",
  "user_impact": "What the end user experiences",
  "root_cause": {{
    "likely_cause": "Most probable cause",
    "confidence": 0.0
  }},
  "recommendations": {{
    "immediate_actions": [],
    "resolution_priority": "Urgent|High|Medium|Low"
  }},
  "customer_communication": "Suggested response if customer asks",
  "technical_details": {{
    "error_category": "Application|Infrastructure|Database|Network|Configuration",
    "real_application_bug": false,
    "affects_critical_operation": false
  }}
}}"""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=1500,
            system="You are a production error analyst. Return only valid JSON.",
            messages=[{"role": "user", "content": prompt}]
        )

        # Replace rates with your provider's current list price (they change).
        usage = response.usage
        input_rate_per_mtok = 3.0   # example: USD per 1M input tokens
        output_rate_per_mtok = 15.0  # example: USD per 1M output tokens
        cost = (usage.input_tokens / 1_000_000 * input_rate_per_mtok) + \
               (usage.output_tokens / 1_000_000 * output_rate_per_mtok)
        self.total_tokens += usage.input_tokens + usage.output_tokens
        self.total_cost += cost

        return self._parse(response.content[0].text)

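    def _parse(self, text: str) -> dict:
        # Try a fenced markdown code block first
        match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
        candidates = [match.group(1)] if match else []
        # Fall back to the outermost {...} span in the raw text
        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end > start:
            candidates.append(text[start:end + 1])
        for candidate in candidates:
            try:
                return json.loads(candidate)
            except json.JSONDecodeError:
                continue  # malformed JSON: try the next candidate
        # Nothing parseable: return a marked fallback so callers can detect it
        return {"summary": text[:200], "fallback": True}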

The key fields are summary (dashboard card), explanation (support guidance), error_category and real_application_bug (routing signals). Getting those right means one analysis object can serve both someone answering a ticket and someone triaging an alert.
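
Because routing depends on those fields, it can help to check the parsed object before trusting it. The following is a minimal sketch, not part of the original pipeline: validate_analysis is a hypothetical helper, the allowed labels are copied from the prompt schema, and the 0 to 1 range for confidence is an assumption.

```python
from typing import Any

# Allowed labels, copied from the prompt schema.
SEVERITIES = {"Critical", "High", "Medium", "Low"}
CATEGORIES = {"Application", "Infrastructure", "Database", "Network", "Configuration"}

def validate_analysis(analysis: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the object looks usable."""
    problems: list[str] = []

    for key in ("summary", "explanation"):
        value = analysis.get(key)
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{key} is missing or empty")

    severity = analysis.get("severity")
    if not isinstance(severity, str) or severity not in SEVERITIES:
        problems.append("severity is not one of the allowed labels")

    technical = analysis.get("technical_details")
    if not isinstance(technical, dict):
        problems.append("technical_details is missing")
    else:
        category = technical.get("error_category")
        if not isinstance(category, str) or category not in CATEGORIES:
            problems.append("error_category is not one of the allowed labels")
        if not isinstance(technical.get("real_application_bug"), bool):
            problems.append("real_application_bug is not a boolean")

    # Assumption: confidence is a plain number between 0 and 1.
    root_cause = analysis.get("root_cause")
    confidence = root_cause.get("confidence") if isinstance(root_cause, dict) else None
    is_number = isinstance(confidence, (int, float)) and not isinstance(confidence, bool)
    if not is_number or not 0.0 <= confidence <= 1.0:
        problems.append("root_cause.confidence is not a number in [0, 1]")

    return problems

good = {
    "summary": "Deadlock on order writes",
    "explanation": "Two requests tried to update the same order at once.",
    "severity": "High",
    "root_cause": {"likely_cause": "Lock ordering", "confidence": 0.7},
    "technical_details": {"error_category": "Database", "real_application_bug": False},
}
fallback = {"summary": "raw model text", "fallback": True}

good_problems = validate_analysis(good)
fallback_problems = validate_analysis(fallback)
```

An object that fails validation, including the fallback dict returned when parsing fails, is probably better sent to manual review than auto-routed.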

Ballpark cost (illustrative): Per-call totals depend on model, prompt size, and output length. With aggressive caching, many teams land in the rough range of a few dollars per month for periodic batch triage at moderate error volume — always recompute from your own token meters and current provider pricing.
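
The arithmetic behind such an estimate is short enough to write down. In this sketch the function name and every input in the usage call (runs per day, cache misses per run, tokens per call) are hypothetical; only the two example rates come from the analyzer code above, and they should be replaced with current list prices.

```python
def estimate_monthly_cost(
    runs_per_day: int,
    cache_misses_per_run: float,
    input_tokens_per_call: int,
    output_tokens_per_call: int,
    input_rate_per_mtok: float = 3.0,    # example USD per 1M input tokens
    output_rate_per_mtok: float = 15.0,  # example USD per 1M output tokens
    days: int = 30,
) -> float:
    """Estimate monthly spend: only cache misses trigger a billable call."""
    cost_per_call = (
        input_tokens_per_call / 1_000_000 * input_rate_per_mtok
        + output_tokens_per_call / 1_000_000 * output_rate_per_mtok
    )
    calls_per_month = runs_per_day * cache_misses_per_run * days
    return round(calls_per_month * cost_per_call, 2)

# Made-up inputs: hourly runs, one new signature per run on average.
estimate = estimate_monthly_cost(
    runs_per_day=24,
    cache_misses_per_run=1,
    input_tokens_per_call=600,
    output_tokens_per_call=500,
)
print(f"Estimated monthly cost: ${estimate:.2f}")
```

With these inputs, each call costs a little under one cent and 720 calls a month come to about $6.70. The lever that matters most is cache_misses_per_run, which is why the normalization and caching layers come before the model call.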


Layer 4: Anomaly Detection Against a Rolling Baseline

A fresh error and a known recurring error need different responses. The anomaly detector compares each signature against N days of stored history, flagging three conditions: NEW (never seen before), SPIKE (volume far above baseline), and SPREAD (appearing for customers who have not seen it in the baseline window).

from dataclasses import dataclass
from typing import Any

@dataclass
class BaselineStats:
    days_present: int
    mean_occurrences: float
    max_occurrences: int
    max_customers: int  # peak distinct customers in baseline window
    customers_seen: set[str]

def classify_anomaly(
    signature: str,
    current: dict[str, Any],
    baseline: dict[str, BaselineStats]
) -> dict[str, Any]:

    occurrences = current.get('occurrence_count', 0)
    current_customers = set(current.get('customers', []))
    b = baseline.get(signature)

    # Never seen before
    if not b:
        return {
            'new_signature': True,
            'spike': occurrences >= 10,
            'spread': len(current_customers) >= 3,
            'new_customers': sorted(current_customers),
        }

    # Spike: meaningfully above both max and mean from baseline
    spike = (
        occurrences >= 10 and
        occurrences > max(2 * b.max_occurrences,
                          3 * max(1.0, b.mean_occurrences))
    ) or (
        occurrences >= 25 and occurrences > b.max_occurrences
    )

    # Spread: affecting customers who haven't seen this before,
    # or many more distinct customers than the baseline peak
    new_customers = sorted(c for c in current_customers
                           if c not in b.customers_seen)
    spread = len(new_customers) >= 2 or (
        len(current_customers) >= 3 and
        len(current_customers) > max(1, 2 * b.max_customers)
    )

    return {
        'new_signature': False,
        'spike': spike,
        'spread': spread,
        'new_customers': new_customers,
        'baseline_days_present': b.days_present,
        'baseline_mean': round(b.mean_occurrences, 2),
        'baseline_max': b.max_occurrences,
    }

The heuristics are deliberately simple: an explainable approach beats heavy statistics when the goal is action, not false precision. An anomaly flag you cannot explain to a stakeholder in half a minute is not operationally useful.
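
The classifier takes the baseline as given, so here is one way the rolling baseline could be assembled from stored daily snapshots. This is a sketch under assumptions: build_baseline and the snapshot shape (one dict per day, keyed by signature) are hypothetical, and the mean is taken over the days a signature appeared, not over every day in the window.

```python
from dataclasses import dataclass

@dataclass
class BaselineStats:
    days_present: int
    mean_occurrences: float
    max_occurrences: int
    max_customers: int  # peak distinct customers in baseline window
    customers_seen: set[str]

def build_baseline(daily_snapshots: list[dict[str, dict]]) -> dict[str, BaselineStats]:
    """
    Aggregate N days of history into per-signature baseline stats.
    Each snapshot maps signature -> {'occurrence_count': int, 'customers': list[str]}.
    """
    # Collect each signature's daily records across the window.
    per_signature: dict[str, list[dict]] = {}
    for day in daily_snapshots:
        for signature, stats in day.items():
            per_signature.setdefault(signature, []).append(stats)

    baseline: dict[str, BaselineStats] = {}
    for signature, days in per_signature.items():
        counts = [d.get('occurrence_count', 0) for d in days]
        customer_sets = [set(d.get('customers', [])) for d in days]
        baseline[signature] = BaselineStats(
            days_present=len(days),
            mean_occurrences=sum(counts) / len(counts),  # mean over days present
            max_occurrences=max(counts),
            max_customers=max(len(s) for s in customer_sets),
            customers_seen=set().union(*customer_sets),
        )
    return baseline

# Made-up two-day history for illustration.
history = [
    {'sigA': {'occurrence_count': 4, 'customers': ['c1']}},
    {'sigA': {'occurrence_count': 8, 'customers': ['c1', 'c2']},
     'sigB': {'occurrence_count': 2, 'customers': ['c3']}},
]
baseline = build_baseline(history)
```

Averaging over days present keeps intermittent errors from looking quieter than they are; averaging over the full window is the other reasonable choice and would make the spike threshold easier to trip.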


Layer 5: Triage Routing — Ownership, Not Just Summaries

This is what many AI monitoring articles leave out. Finding the error is half the job. Knowing who owns it is the other half — and getting that wrong is expensive. A platform issue routed to application engineering wastes time. An application bug routed to platform may never get the right fix.

The triage layer maps the model's error_category and real_application_bug fields into a stable owner bucket. When error_category is one of the known labels, it wins, even if real_application_bug is also set, so category is the primary routing signal. The bug flag is consulted only when the category is missing or unrecognized and the error-type heuristics do not match.

def triage(analysis: dict) -> dict:
    """
    Route an analyzed error to the correct owner bucket.
    Returns a dict with bucket, owner, and reason.
    """
    technical = analysis.get('technical_details', {})
    error_category = (technical.get('error_category') or '').lower()
    real_bug = technical.get('real_application_bug', False)
    # error_type is not part of the model's JSON schema; the caller must copy it
    # into the analysis dict (from the analyze() input) for the fallback below to fire.
    error_type = (analysis.get('error_type') or '').lower()

    # Explicit model-supplied category takes priority
    routing = {
        'application':    ('application', 'dev'),
        'infrastructure': ('platform',    'platform'),
        'network':        ('platform',    'platform'),
        'database':       ('platform',    'platform'),
        'configuration':  ('platform',    'platform'),
    }
    if error_category in routing:
        bucket, owner = routing[error_category]
        return {'bucket': bucket, 'owner': owner,
                'reason': f'Categorized as {error_category}'}

    # Heuristic fallback on error type
    if error_type in ('timeout', 'connection'):
        return {'bucket': 'platform', 'owner': 'platform',
                'reason': 'Connectivity errors route to platform first'}

    if error_type == 'sql':
        return {'bucket': 'platform', 'owner': 'platform',
                'reason': 'Database errors route to platform first'}

    if real_bug is True:
        return {'bucket': 'application', 'owner': 'dev',
                'reason': 'Flagged as application bug'}

    return {'bucket': 'needs_review', 'owner': 'review',
            'reason': 'Insufficient signal to auto-route'}

Below this, a known-noise list helps: signatures you have classified as benign (for example, expected churn during deploys or maintenance) can be suppressed or down-ranked. A novel signature that SPREADs to new customer environments still escalates. That distinction is what turns a monitoring view into a triage workflow: not just something is wrong, but this is new, this team owns it, and here is suggested wording for support.
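
A known-noise list can be applied as a thin layer on top of the triage result. The sketch below is illustrative only: apply_known_noise is a hypothetical helper, the 'non_issue' bucket name is borrowed from the sample run output, and letting the spread flag override suppression is an assumption about how escalation might work.

```python
def apply_known_noise(
    signature: str,
    triage_result: dict,
    anomaly: dict,
    known_noise: set[str],
) -> dict:
    """
    Suppress signatures classified as benign, unless the anomaly flags show
    the pattern spreading to customers outside the baseline.
    """
    if signature in known_noise and not anomaly.get('spread', False):
        return {
            'bucket': 'non_issue',
            'owner': triage_result.get('owner', 'review'),
            'reason': 'Known noise (suppressed)',
        }
    # Unknown signature, or known noise that is now spreading: keep the routing.
    return triage_result

# Made-up signatures and results for illustration.
known_noise = {'deploy-churn-sig'}
routed = {'bucket': 'platform', 'owner': 'platform',
          'reason': 'Categorized as infrastructure'}

quiet = apply_known_noise('deploy-churn-sig', routed, {'spread': False}, known_noise)
spreading = apply_known_noise('deploy-churn-sig', routed, {'spread': True}, known_noise)
novel = apply_known_noise('brand-new-sig', routed, {'new_signature': True}, known_noise)
```

Keeping the original owner on a suppressed entry means someone is still accountable if the noise classification later turns out to be wrong.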


What the Pipeline Actually Looks Like

Each scheduled run is roughly:

[06:15 UTC] Starting error analysis pipeline...
  Step 1: Pull errors from monitoring API
  Step 2: Extract signatures — many raw lines → few unique patterns
  Step 3: Cache check (most patterns hit, one miss)
  Step 4: LLM API call for the new signature
          (token count and cost from your meter)
  Step 5: Anomaly detection against rolling baseline
          Pattern A: KNOWN (stable)
          Pattern B: KNOWN (stable)
          Pattern C: NEW SIGNATURE — flagged for review
  Step 6: Triage routing
          Pattern A: platform / database
          Pattern B: non_issue (expected noise, suppressed)
          Pattern C: needs_review (new, insufficient signal)
  Step 7: Write results to storage

[06:15 UTC] Pipeline complete in tens of seconds

Few patterns, one fresh analysis call, short wall time. The dashboard shows the cards that matter; expected noise stays out of the way.


The Actual Value

Spend is usually modest next to overall infra budget. The larger win is the morning triage ritual.

Before: pull errors, group manually, read stack traces, decide who to wake up — a long block if you are thorough.

After: open the dashboard, scan a short list of cards. The model did the grouping, drafted support-facing language, and highlighted what needs a human decision.

That time compounds across a team and across a year. That is the leverage case — not the per-token line item.

If this was useful, leave a comment below — I like comparing notes with people building similar systems.



Mike Falkenberg is a technologist with 20+ years leading development, operations, and security teams. He shares practical insights from building technology organizations. Connect on LinkedIn and follow GitLab for code.
