DEV Community

ZNY
ZNY

Posted on

DEV.TO ARTICLE 47: Monitoring and Observability for AI Applications in Production

Target Keyword: "ai application monitoring observability"
Tags: monitoring,devops,ai,programming,developer
Type: Guide


Content

Monitoring and Observability for AI Applications in Production

AI applications behave differently from traditional software. Model outputs vary, latency is higher, and failures are less predictable. Here's how to build a complete observability stack for AI applications.

Why AI Monitoring Is Different

Traditional monitoring: "Did the function return correctly?"
AI monitoring: "Did the output meet quality standards? Was the response appropriate?"

Key differences:

  1. Non-deterministic outputs — Same input may produce different outputs
  2. Higher latency — AI calls take seconds, not milliseconds
  3. Cost per request — Every token costs money
  4. Quality drift — Model behavior may change over time

Core Metrics to Track

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import time

@dataclass
class AIMetrics:
    """One AI request observation: timing, token usage, cost, and quality signals.

    Quality fields default to unset so a record can be created at request time
    and enriched later (e.g. once user feedback or a moderation flag arrives).
    """
    # Request metrics
    request_id: str
    timestamp: datetime
    latency_seconds: float
    tokens_used: int
    model: str

    # Cost metrics
    cost_usd: float

    # Quality metrics (can be computed post-hoc)
    user_rating: Optional[int] = None  # None until the user rates the response
    was_flagged: bool = False  # set when the response was flagged — presumably moderation; confirm with caller

    # Context
    user_id: Optional[str] = None
    prompt_length: int = 0
    response_length: int = 0

class MetricsCollector:
    """In-memory store of AIMetrics records with simple aggregate queries.

    Optionally mirrors latency to a Prometheus client when one is configured.
    """

    def __init__(self):
        # All recorded metrics in arrival order. Unbounded: a long-running
        # process should periodically flush or rotate this list.
        self.metrics: list["AIMetrics"] = []
        self.prometheus_client = None  # Initialize if available

    def record(self, metric: "AIMetrics"):
        """Store one metric and, if configured, export its latency to Prometheus."""
        self.metrics.append(metric)

        # Also push to Prometheus
        if self.prometheus_client:
            self.prometheus_client.gauge(
                'ai_request_latency_seconds',
                metric.latency_seconds,
                labels={'model': metric.model}
            )

    def get_daily_cost(self) -> float:
        """Return the total cost (USD) of all metrics recorded today (local date)."""
        today = datetime.now().date()
        return sum(
            m.cost_usd for m in self.metrics
            if m.timestamp.date() == today
        )

    def get_p95_latency(self) -> float:
        """Return the 95th-percentile latency in seconds (nearest-rank method).

        Returns 0.0 when no metrics have been recorded; the original raised
        IndexError on an empty collector.
        """
        if not self.metrics:
            return 0.0
        sorted_latencies = sorted(m.latency_seconds for m in self.metrics)
        # Clamp defensively so the rank can never index past the end.
        idx = min(int(len(sorted_latencies) * 0.95), len(sorted_latencies) - 1)
        return sorted_latencies[idx]
Enter fullscreen mode Exit fullscreen mode

Request Tracing

import uuid
from contextvars import ContextVar
from typing import Optional

request_id_ctx: ContextVar[Optional[str]] = ContextVar('request_id', default=None)

class AIRequestTracer:
    """Lightweight in-process tracer for AI request lifecycles.

    The active trace id is propagated through a ContextVar so spans can be
    attached from anywhere in the call stack (including async code) without
    threading the id through every function signature.
    """

    def __init__(self):
        # trace_id -> trace record (start/end times, spans, metadata).
        self.traces: dict[str, dict] = {}

    def start_trace(self, user_id: Optional[str] = None) -> str:
        """Open a new trace, make it the current one, and return its id."""
        trace_id = str(uuid.uuid4())
        self.traces[trace_id] = {
            'id': trace_id,
            'user_id': user_id,
            'start_time': time.time(),
            'spans': [],
            'metadata': {}
        }
        request_id_ctx.set(trace_id)
        return trace_id

    def add_span(self, name: str, duration_ms: float, metadata: Optional[dict] = None):
        """Attach a timed span to the current trace; silently no-op without one.

        Fix: the parameter was annotated plain ``dict`` despite defaulting to
        None — it is Optional.
        """
        trace_id = request_id_ctx.get()
        if not trace_id or trace_id not in self.traces:
            return

        self.traces[trace_id]['spans'].append({
            'name': name,
            'duration_ms': duration_ms,
            'timestamp': time.time(),
            'metadata': metadata or {}
        })

    def end_trace(self, success: bool = True):
        """Close the current trace, recording total duration and outcome."""
        trace_id = request_id_ctx.get()
        if not trace_id or trace_id not in self.traces:
            return

        trace = self.traces[trace_id]
        trace['end_time'] = time.time()
        trace['total_duration_ms'] = (trace['end_time'] - trace['start_time']) * 1000
        trace['success'] = success

    def get_trace(self, trace_id: str) -> Optional[dict]:
        """Return the trace record for *trace_id*, or None if unknown."""
        return self.traces.get(trace_id)
Enter fullscreen mode Exit fullscreen mode

Prompt Performance Analysis

class PromptAnalyzer:
    """Aggregates collected metrics for requests matching a prompt template."""

    def __init__(self, metrics_collector: "MetricsCollector"):
        self.collector = metrics_collector

    def analyze_prompt_effectiveness(self, prompt_template: str) -> dict:
        """Analyze how well a prompt template performs.

        Returns aggregate latency/cost/token/quality stats over every recorded
        metric whose repr contains the template, or an error dict when none
        match. ``avg_user_rating`` is None when no matching metric was rated.
        """
        matching_metrics = [
            m for m in self.collector.metrics
            if prompt_template in str(m)  # Simplified matching; TODO: match on a real prompt hash
        ]

        if not matching_metrics:
            return {"error": "No data for this prompt"}

        n = len(matching_metrics)
        # Fix: the original averaged ratings via truthiness, which silently
        # dropped a legitimate rating of 0; filter on `is not None` instead.
        ratings = [m.user_rating for m in matching_metrics if m.user_rating is not None]

        return {
            "total_requests": n,
            "avg_latency_ms": sum(m.latency_seconds for m in matching_metrics) / n * 1000,
            "avg_cost_usd": sum(m.cost_usd for m in matching_metrics) / n,
            "avg_tokens": sum(m.tokens_used for m in matching_metrics) / n,
            "flagged_rate": sum(1 for m in matching_metrics if m.was_flagged) / n,
            "avg_user_rating": sum(ratings) / len(ratings) if ratings else None,
        }
Enter fullscreen mode Exit fullscreen mode

Alerting on Anomalies

class AIAlerting:
    """Threshold-based anomaly alerting over collected AI metrics."""

    def __init__(self, metrics_collector: "MetricsCollector"):
        self.collector = metrics_collector
        # Every alert this instance has raised, newest last.
        self.alerts: list[dict] = []

    def check_anomalies(self):
        """Run periodic checks for anomalies."""
        self._check_high_latency()
        self._check_cost_spike()
        self._check_quality_drift()
        self._check_error_rate()

    def _check_high_latency(self):
        # Warn when P95 latency exceeds the 10-second budget.
        p95 = self.collector.get_p95_latency()
        if p95 > 10.0:  # 10 seconds
            self._send_alert(
                severity="warning",
                title="High AI latency detected",
                description=f"P95 latency is {p95:.2f}s, exceeding 10s threshold"
            )

    def _check_cost_spike(self):
        # Alert when today's spend crosses a fixed budget.
        # Fix: removed the dead `recent_costs` list — it called
        # get_daily_cost() seven times and was never read; a real rolling
        # average needs historical tracking first.
        daily_cost = self.collector.get_daily_cost()
        if daily_cost > 100:  # Configurable threshold
            self._send_alert(
                severity="critical",
                title="AI cost spike detected",
                description=f"Daily cost ${daily_cost:.2f} exceeds $100 threshold"
            )

    def _check_quality_drift(self):
        # Fix: check_anomalies() called this method but it was never defined,
        # so every sweep died with AttributeError. Placeholder until
        # rating-trend tracking exists.
        pass

    def _check_error_rate(self):
        # Fix: same missing-method crash as above; placeholder until error
        # outcomes are recorded per request.
        pass

    def _send_alert(self, severity: str, title: str, description: str):
        # Fix: the original signature was corrupted
        # (`title: "str, description: "str):\""`) and did not parse.
        alert = {
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'title': title,
            'description': description
        }
        self.alerts.append(alert)
        # Send to PagerDuty, Slack, etc.
        print(f"ALERT [{severity.upper()}] {title}: {description}")
Enter fullscreen mode Exit fullscreen mode

Dashboard Setup

# Grafana dashboard JSON (simplified)
{
  "dashboard": {
    "title": "AI Application Monitoring",
    "panels": [
      {
        "title": "Request Latency (P50/P95/P99)",
        "targets": [
          {"expr": "histogram_quantile(0.50, ai_request_latency)"},
          {"expr": "histogram_quantile(0.95, ai_request_latency)"},
          {"expr": "histogram_quantile(0.99, ai_request_latency)"}
        ]
      },
      {
        "title": "Daily AI Cost",
        "targets": [
          {"expr": "sum(ai_cost_total)"}
        ]
      },
      {
        "title": "Requests per Minute",
        "targets": [
          {"expr": "rate(ai_requests_total[1m])"}
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {"expr": "rate(ai_errors_total[5m]) / rate(ai_requests_total[5m])"}
        ]
      }
    ]
  }
}
Enter fullscreen mode Exit fullscreen mode

Cost Optimization

class CostOptimizer:
    """Surfaces the most expensive request patterns from collected metrics."""

    def __init__(self, metrics_collector: "MetricsCollector"):
        self.collector = metrics_collector

    def find_costly_patterns(self) -> list[dict]:
        """Find prompts that are expensive relative to value.

        Groups requests by prompt length (a stand-in for real prompt hashing)
        and returns the top 10 groups by total cost, most expensive first.
        """
        # Fix: the original key was hash(str(prompt_length)); str hashing is
        # randomized per process (PYTHONHASHSEED), so pattern_ids changed on
        # every run. Group on the length itself — stable and just as distinct.
        patterns: dict[int, dict] = {}
        for m in self.collector.metrics:
            key = m.prompt_length  # TODO: replace with an actual prompt hash
            bucket = patterns.setdefault(key, {'metrics': [], 'cost': 0})
            bucket['metrics'].append(m)
            bucket['cost'] += m.cost_usd

        # Highest total cost first.
        sorted_patterns = sorted(
            patterns.items(),
            key=lambda item: item[1]['cost'],
            reverse=True
        )

        return [
            {
                'pattern_id': key,
                'total_cost': info['cost'],
                'request_count': len(info['metrics']),
                'avg_cost_per_request': info['cost'] / len(info['metrics'])
            }
            for key, info in sorted_patterns[:10]
        ]
Enter fullscreen mode Exit fullscreen mode

Getting Started

Build observable AI applications with ofox.ai — their API provides detailed usage metrics that integrate with your monitoring stack.

👉 Get started with ofox.ai


This article contains affiliate links.


Tags: monitoring,devops,ai,programming,developer
Canonical URL: https://dev.to/zny10289

Top comments (0)