We had a monitoring problem that wasn't really a monitoring problem.
We had Datadog. We had alerts. We had dashboards. What we didn't have was signal. On any given morning, an engineer opening the console might see a large volume of errors aggregated across many customer environments — with no fast way to know if that was one cascading timeout firing repeatedly, or a dozen distinct failures quietly spreading across the fleet.
I built an internal production dashboard to surface that signal, then added AI-powered error analysis on top. The pipeline runs on a schedule throughout the day. Here's the architecture, the reasoning, and illustrative code for each layer — patterns to adapt, not copy-paste from a private repo — including the part many AI monitoring write-ups skip: who owns the problem once the AI summarizes it.
The Problem With Raw Error Counts
The product is SaaS, but it is not the classic “everyone on one shared multi-tenant stack” shape: customers run in separate environments, and observability still rolls up into one place. When something breaks, you want three answers quickly:
- Is this one error happening repeatedly, or many different errors?
- Which customers are affected, and how badly?
- Does this go to the product engineering team or the platform team?
Raw error counts answer none of those questions. A single database deadlock in one busy environment can generate many log lines. Without normalization, that looks like many separate incidents. With normalization, it's one pattern, one API call, one analysis.
The Architecture: Five Layers
Layer 1: Signature Extraction
Before any AI touches the data, errors get normalized. The goal is to strip everything variable — timestamps, customer or environment identifiers, GUIDs, session tokens — and reduce each error to its structural "shape." Many near-duplicate entries collapse to one signature.
Only send redacted, normalized text to a third-party model. Treat log lines like untrusted input: strip or hash anything that could be PII, secrets, or customer-identifying before it leaves your network.
import re
import hashlib
def extract_error_signature(message: str) -> tuple[str, str]:
"""
Normalize an error message to its structural shape,
then hash it for consistent grouping.
"""
normalized = message
# Strip customer / environment / user identifiers (extend for your log formats)
normalized = re.sub(
r'(customer|account|tenant)[_-]?id[:\s]+\S+',
'[CUSTOMER_SCOPE]',
normalized,
flags=re.IGNORECASE,
)
normalized = re.sub(r'user[_-]?id[:\s]+\d+', '[USER_ID]', normalized, flags=re.IGNORECASE)
# Strip timestamps
normalized = re.sub(
r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[\.\d]*Z?',
'[TIMESTAMP]',
normalized
)
# Strip GUIDs
normalized = re.sub(
r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
'[GUID]',
normalized,
flags=re.IGNORECASE
)
# Strip long numeric IDs
normalized = re.sub(r'\b\d{5,}\b', '[ID]', normalized)
# Normalize whitespace
normalized = re.sub(r'\s+', ' ', normalized).strip()
# Hash the normalized shape for use as a cache/grouping key
signature_hash = hashlib.md5(normalized.encode()).hexdigest()[:16]
return signature_hash, normalized
The deduplication ratio is what this buys you. If hundreds of raw lines normalize to a handful of unique signatures, you make a handful of API calls — not one per line. On a noisy day that is the difference between a cheap run and an expensive one.
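To see the collapse in action, here is a tiny dry run with hypothetical log lines. The normalization function is repeated in condensed form (a subset of the Layer 1 rules) so the snippet runs standalone:

```python
import hashlib
import re
from collections import Counter

def extract_error_signature(message: str) -> tuple[str, str]:
    # Condensed subset of the Layer 1 normalization rules
    rules = [
        (r'(customer|account|tenant)[_-]?id[:\s]+\S+', '[CUSTOMER_SCOPE]'),
        (r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[\.\d]*Z?', '[TIMESTAMP]'),
        (r'\b\d{5,}\b', '[ID]'),
    ]
    normalized = message
    for pattern, token in rules:
        normalized = re.sub(pattern, token, normalized, flags=re.IGNORECASE)
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    return hashlib.md5(normalized.encode()).hexdigest()[:16], normalized

# Hypothetical raw lines: one deadlock pattern, three customers, three timestamps
raw_lines = [
    "2024-03-01T06:02:11Z customer_id: acme Deadlock on orders table, retry 88412",
    "2024-03-01T06:02:14Z customer_id: birch Deadlock on orders table, retry 88413",
    "2024-03-01T06:05:09Z customer_id: cedar Deadlock on orders table, retry 90001",
]
groups = Counter(extract_error_signature(line)[0] for line in raw_lines)
# Three raw lines collapse to a single signature -> one API call instead of three
```

Three lines that would read as three incidents in a raw count turn out to share one structural shape.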
Layer 2: Cache With a 6-Hour TTL
The cache is what makes this economical over time. Once a signature is analyzed, that result is reused until it expires. The pipeline runs often — on most runs, the API does not fire for recurring known patterns.
import json
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
class AnalysisCache:
def __init__(self, cache_dir: str = '.cache/error-analysis'):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _cache_path(self, signature: str, analysis_type: str) -> Path:
key = hashlib.md5(f"{signature}:{analysis_type}".encode()).hexdigest()
return self.cache_dir / f"{key}.json"
def get(self, signature: str, analysis_type: str = 'recent') -> dict | None:
path = self._cache_path(signature, analysis_type)
if not path.exists():
return None
cached = json.loads(path.read_text())
cached_at = datetime.fromisoformat(cached['cached_at'])
# Recent error analysis: 6-hour TTL
# Long-term pattern analysis: 7-day TTL
ttl = timedelta(hours=6) if analysis_type == 'recent' else timedelta(days=7)
if datetime.now() - cached_at > ttl:
return None # Expired
return cached['analysis']
def set(self, signature: str, analysis_type: str, result: dict) -> None:
path = self._cache_path(signature, analysis_type)
path.write_text(json.dumps({
'cached_at': datetime.now().isoformat(),
'analysis': result
}, indent=2))
The 6-hour TTL is a deliberate tradeoff. It is short enough that a genuinely new error variant surfaces within a typical business window. It is long enough that a stable recurring pattern does not burn tokens re-analyzing the same shape on every run.
Layer 3: LLM Analysis — Structured for Multiple Audiences
This is where the most important design decision lives. The prompt requests output in a specific JSON schema that serves several audiences simultaneously — support, operations, platform engineering, and leadership — without requiring separate reports.
The examples below use the Anthropic Python SDK; the same idea applies to any provider that accepts structured prompts and returns text you parse as JSON.
import anthropic
import json
import re
class AIErrorAnalyzer:
    # 'claude-sonnet-latest' is a placeholder; use a current model id from your provider's docs
    def __init__(self, api_key: str, model: str = 'claude-sonnet-latest'):
self.client = anthropic.Anthropic(api_key=api_key)
self.model = model
self.total_tokens = 0
self.total_cost = 0.0
def analyze(self, signature: str, error_type: str,
occurrences: int, customers_affected: int,
normalized_message: str) -> dict:
prompt = f"""Analyze this production error pattern and return JSON only.
Error type: {error_type}
Occurrences: {occurrences}
Customers affected: {customers_affected}
Normalized message: {normalized_message[:400]}
Return this exact structure:
{{
"summary": "One sentence for the dashboard",
"explanation": "Plain English for non-technical staff",
"severity": "Critical|High|Medium|Low",
"user_impact": "What the end user experiences",
"root_cause": {{
"likely_cause": "Most probable cause",
"confidence": 0.0
}},
"recommendations": {{
"immediate_actions": [],
"resolution_priority": "Urgent|High|Medium|Low"
}},
"customer_communication": "Suggested response if customer asks",
"technical_details": {{
"error_category": "Application|Infrastructure|Database|Network|Configuration",
"real_application_bug": false,
"affects_critical_operation": false
}}
}}"""
response = self.client.messages.create(
model=self.model,
max_tokens=1500,
system="You are a production error analyst. Return only valid JSON.",
messages=[{"role": "user", "content": prompt}]
)
# Replace rates with your provider's current list price (they change).
usage = response.usage
input_rate_per_mtok = 3.0 # example: USD per 1M input tokens
output_rate_per_mtok = 15.0 # example: USD per 1M output tokens
cost = (usage.input_tokens / 1_000_000 * input_rate_per_mtok) + \
(usage.output_tokens / 1_000_000 * output_rate_per_mtok)
self.total_tokens += usage.input_tokens + usage.output_tokens
self.total_cost += cost
return self._parse(response.content[0].text)
def _parse(self, text: str) -> dict:
# Try markdown code block first
        match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
if match:
return json.loads(match.group(1))
# Fall back to raw JSON extraction
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1:
return json.loads(text[start:end+1])
return {"summary": text[:200], "fallback": True}
The key fields are summary (dashboard card), explanation (support guidance), error_category and real_application_bug (routing signals). Getting those right means one analysis object can serve both someone answering a ticket and someone triaging an alert.
Ballpark cost (illustrative): Per-call totals depend on model, prompt size, and output length. With aggressive caching, many teams land in the rough range of a few dollars per month for periodic batch triage at moderate error volume — always recompute from your own token meters and current provider pricing.
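To make that ballpark concrete, here is the arithmetic with entirely made-up inputs — substitute your own run cadence, miss rate, token counts, and current list prices:

```python
# All numbers below are illustrative assumptions, not measurements
runs_per_day = 4
cache_misses_per_run = 2       # new or expired signatures that reach the API
avg_input_tokens = 700         # prompt including the truncated normalized message
avg_output_tokens = 600        # structured JSON analysis
input_rate = 3.0               # USD per 1M input tokens (example rate)
output_rate = 15.0             # USD per 1M output tokens (example rate)

cost_per_call = (avg_input_tokens / 1_000_000 * input_rate
                 + avg_output_tokens / 1_000_000 * output_rate)
monthly_cost = cost_per_call * cache_misses_per_run * runs_per_day * 30

print(f"~${cost_per_call:.4f} per call, ~${monthly_cost:.2f} per month")
```

With these inputs it lands under three dollars a month. The total is dominated by the miss rate, which is exactly the knob the cache controls.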
Layer 4: Anomaly Detection Against a Rolling Baseline
A fresh error and a known recurring error need different responses. The anomaly detector compares each signature against N days of stored history, flagging three conditions: NEW (never seen before), SPIKE (volume far above baseline), and SPREAD (appearing for customers who have not seen it in the baseline window).
from dataclasses import dataclass
from typing import Any
@dataclass
class BaselineStats:
days_present: int
mean_occurrences: float
max_occurrences: int
max_customers: int # peak distinct customers in baseline window
customers_seen: set[str]
def classify_anomaly(
signature: str,
current: dict[str, Any],
baseline: dict[str, BaselineStats]
) -> dict[str, Any]:
occurrences = current.get('occurrence_count', 0)
current_customers = set(current.get('customers', []))
b = baseline.get(signature)
# Never seen before
if not b:
return {
'new_signature': True,
'spike': occurrences >= 10,
'spread': len(current_customers) >= 3,
'new_customers': sorted(current_customers),
}
# Spike: meaningfully above both max and mean from baseline
spike = (
occurrences >= 10 and
occurrences > max(2 * b.max_occurrences,
3 * max(1.0, b.mean_occurrences))
) or (
occurrences >= 25 and occurrences > b.max_occurrences
)
# Spread: affecting customers who haven't seen this before,
# or many more distinct customers than the baseline peak
new_customers = sorted(c for c in current_customers
if c not in b.customers_seen)
spread = len(new_customers) >= 2 or (
len(current_customers) >= 3 and
len(current_customers) > max(1, 2 * b.max_customers)
)
return {
'new_signature': False,
'spike': spike,
'spread': spread,
'new_customers': new_customers,
'baseline_days_present': b.days_present,
'baseline_mean': round(b.mean_occurrences, 2),
'baseline_max': b.max_occurrences,
}
The heuristics are deliberately simple: an explainable approach beats heavy statistics when the goal is action, not false precision. An anomaly flag you cannot explain to a stakeholder in half a minute is not operationally useful.
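A worked example with made-up baseline numbers shows both rules firing at once (BaselineStats and the classifier are repeated in condensed form so the snippet runs standalone):

```python
from dataclasses import dataclass

# Condensed versions of the Layer 4 pieces
@dataclass
class BaselineStats:
    days_present: int
    mean_occurrences: float
    max_occurrences: int
    max_customers: int
    customers_seen: set

def classify_anomaly(signature, current, baseline):
    occ = current.get('occurrence_count', 0)
    cust = set(current.get('customers', []))
    b = baseline.get(signature)
    if not b:
        return {'new_signature': True, 'spike': occ >= 10,
                'spread': len(cust) >= 3, 'new_customers': sorted(cust)}
    spike = (occ >= 10 and occ > max(2 * b.max_occurrences,
                                     3 * max(1.0, b.mean_occurrences))) or \
            (occ >= 25 and occ > b.max_occurrences)
    new_customers = sorted(c for c in cust if c not in b.customers_seen)
    spread = len(new_customers) >= 2 or (
        len(cust) >= 3 and len(cust) > max(1, 2 * b.max_customers))
    return {'new_signature': False, 'spike': spike, 'spread': spread,
            'new_customers': new_customers}

# Baseline: ~5 occurrences/day, one customer, present for two weeks
baseline = {'deadlock-sig': BaselineStats(
    days_present=14, mean_occurrences=5.0, max_occurrences=9,
    max_customers=1, customers_seen={'acme'})}

# Today: 40 occurrences across three customer environments
current = {'occurrence_count': 40, 'customers': ['acme', 'birch', 'cedar']}
result = classify_anomaly('deadlock-sig', current, baseline)
# SPIKE (40 > max(2*9, 3*5.0)) and SPREAD (two customers new to this signature)
```

Each flag traces back to one comparison you can read aloud — the half-minute explainability test from above.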
Layer 5: Triage Routing — Ownership, Not Just Summaries
This is what many AI monitoring articles leave out. Finding the error is half the job. Knowing who owns it is the other half — and getting that wrong is expensive. A platform issue routed to application engineering wastes time. An application bug routed to platform may never get the right fix.
The triage layer maps the model's error_category and real_application_bug fields into a stable owner bucket. When error_category is one of the known labels, it wins — even if real_application_bug is also set — so category is the primary routing signal; the bug flag mainly breaks ties when category is ambiguous.
def triage(analysis: dict) -> dict:
"""
Route an analyzed error to the correct owner bucket.
Returns: bucket, owner, category, reason.
"""
technical = analysis.get('technical_details', {})
error_category = (technical.get('error_category') or '').lower()
real_bug = technical.get('real_application_bug', False)
error_type = analysis.get('error_type', '').lower()
# Explicit model-supplied category takes priority
routing = {
'application': ('application', 'dev'),
'infrastructure': ('platform', 'platform'),
'network': ('platform', 'platform'),
'database': ('platform', 'platform'),
'configuration': ('platform', 'platform'),
}
if error_category in routing:
bucket, owner = routing[error_category]
return {'bucket': bucket, 'owner': owner,
'reason': f'Categorized as {error_category}'}
# Heuristic fallback on error type
if error_type in ('timeout', 'connection'):
return {'bucket': 'platform', 'owner': 'platform',
'reason': 'Connectivity errors route to platform first'}
if error_type == 'sql':
return {'bucket': 'platform', 'owner': 'platform',
'reason': 'Database errors route to platform first'}
if real_bug is True:
return {'bucket': 'application', 'owner': 'dev',
'reason': 'Flagged as application bug'}
return {'bucket': 'needs_review', 'owner': 'review',
'reason': 'Insufficient signal to auto-route'}
Below this, a known-noise list helps: signatures you have classified as benign (for example, expected churn during deploys or maintenance) can be suppressed or down-ranked. A novel signature that SPREADs to new customer environments still escalates. That distinction is what turns a monitoring view into a triage workflow: not just something is wrong, but this is new, this team owns it, and here is suggested wording for support.
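A minimal sketch of that suppression rule — the signature, noise list, and reason strings are all hypothetical; the one invariant worth copying is that NEW and SPREAD always win over the noise list:

```python
# Signatures a human has marked as benign (hypothetical entries)
KNOWN_NOISE = {
    'a1b2c3d4e5f60718': 'connection churn during nightly deploy window',
}

def should_surface(signature: str, anomaly: dict) -> tuple[bool, str]:
    """Suppress known noise, but never suppress NEW or SPREAD signals."""
    if anomaly.get('new_signature') or anomaly.get('spread'):
        return True, 'new or spreading pattern always escalates'
    if signature in KNOWN_NOISE:
        return False, f'known noise: {KNOWN_NOISE[signature]}'
    return True, 'not on the noise list'

# Stable known noise stays off the dashboard...
quiet = should_surface('a1b2c3d4e5f60718',
                       {'new_signature': False, 'spike': False, 'spread': False})
# ...but the same signature spreading to new environments escalates anyway
loud = should_surface('a1b2c3d4e5f60718',
                      {'new_signature': False, 'spike': False, 'spread': True})
```

Checking the anomaly flags before the noise list is what keeps suppression from hiding a benign pattern that has started behaving differently.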
What the Pipeline Actually Looks Like
Each scheduled run is roughly:
[06:15 UTC] Starting error analysis pipeline...
Step 1: Pull errors from monitoring API
Step 2: Extract signatures — many raw lines → few unique patterns
Step 3: Cache check — most patterns hit, one miss
Step 4: LLM API call for the new signature
(token count and cost from your meter)
Step 5: Anomaly detection against rolling baseline
Pattern A: KNOWN (stable)
Pattern B: KNOWN (stable)
Pattern C: NEW SIGNATURE — flagged for review
Step 6: Triage routing
Pattern A: platform / database
Pattern B: non_issue (expected noise, suppressed)
Pattern C: needs_review (new, insufficient signal)
Step 7: Write results to storage
[06:15 UTC] Pipeline complete in tens of seconds
Few patterns, one fresh analysis call, short wall time. The dashboard shows the cards that matter; expected noise stays out of the way.
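The glue for steps 2 through 6 is a short loop. This sketch injects the layers as callables and exercises it with in-memory stand-ins for the signature extractor, cache, LLM call, and router — all names here are hypothetical, and anomaly detection is omitted for brevity:

```python
def run_pipeline(raw_errors, signature_of, cache_get, cache_set, llm_analyze, route):
    """Steps 2-6 of a scheduled run, with the I/O layers injected as callables."""
    groups = {}
    for err in raw_errors:                                # Step 2: dedupe by signature
        sig, shape = signature_of(err['message'])
        groups.setdefault(sig, {'shape': shape, 'count': 0})['count'] += 1

    results = []
    for sig, group in groups.items():
        analysis = cache_get(sig)                         # Step 3: cache check
        if analysis is None:
            analysis = llm_analyze(group['shape'])        # Step 4: API only on a miss
            cache_set(sig, 'recent', analysis)
        results.append({'signature': sig,
                        'occurrences': group['count'],
                        'routing': route(analysis)})      # Step 6: triage
    return results

# Dry run with in-memory stand-ins
store, calls = {}, []
out = run_pipeline(
    raw_errors=[{'message': 'Deadlock on orders id 88412'},
                {'message': 'Deadlock on orders id 90001'}],
    signature_of=lambda m: ('sig-deadlock', 'Deadlock on orders id [ID]'),
    cache_get=store.get,
    cache_set=lambda s, t, r: store.__setitem__(s, r),
    llm_analyze=lambda shape: (calls.append(shape)
                               or {'technical_details': {'error_category': 'Database'}}),
    route=lambda a: {'bucket': 'platform', 'owner': 'platform'},
)
# Two raw lines, one signature, one LLM call, one routed card
```

Injecting the layers keeps the loop trivially testable: the dry run above never touches the network or the filesystem.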
The Actual Value
Spend is usually modest next to overall infra budget. The larger win is the morning triage ritual.
Before: pull errors, group manually, read stack traces, decide who to wake up — a long block if you are thorough.
After: open the dashboard, scan a short list of cards. The model did the grouping, drafted support-facing language, and highlighted what needs a human decision.
That time compounds across a team and across a year. That is the leverage case — not the per-token line item.
If this was useful, leave a comment below — I like comparing notes with people building similar systems.
Mike Falkenberg is a technologist with 20+ years leading development, operations, and security teams. He shares practical insights from building technology organizations. Connect on LinkedIn and follow him on GitLab for code.