DEV Community

ZNY
ZNY

Posted on

AI 2026AI

AI 应用可观测性完全指南:2026年生产环境AI监控实战

前言

2026 年,AI 应用已经广泛应用于生产环境。但 AI 应用有其独特性:模型输出不稳定、延迟高、成本难以预测。

传统的应用监控(APM)无法满足 AI 监控的需求。本文介绍 AI 应用可观测性的核心方法。

什么是 AI 可观测性

传统监控 vs AI 监控

| 维度 | 传统监控 | AI 监控 |

|------|---------|---------|

| 延迟 | HTTP 请求耗时 | API 调用 + 模型推理耗时 |

| 错误率 | 4xx/5xx 状态码 | 拒绝、幻觉、格式错误 |

| 成本 | 固定云资源 | Token 消耗波动 |

| 质量 | 可精确测量 | 需要额外评估 |

AI 可观测性四大支柱


├── Logging(AI 请求日志)

├── Metrics(Token 消耗、延迟、成本)

├── Tracing(AI 调用链路追踪)

└── Evaluation(输出质量评估)

Enter fullscreen mode Exit fullscreen mode

核心指标体系

1. 延迟指标


import time

from functools import wraps

class AILatencyTracker:

def __init__(self):

self.latencies = []

def track(self, func):

"""装饰器追踪延迟"""

@wraps(func)

async def async_wrapper(*args, **kwargs):

start = time.time()

result = await func(*args, **kwargs)

elapsed = time.time() - start

self.record("success", elapsed)

return result

except Exception as e:

elapsed = time.time() - start

self.record("error", elapsed)

@wraps(func)

def sync_wrapper(*args, **kwargs):

start = time.time()

result = func(*args, **kwargs)

elapsed = time.time() - start

self.record("success", elapsed)

return result

except Exception as e:

elapsed = time.time() - start

self.record("error", elapsed)

import asyncio

if asyncio.iscoroutinefunction(func):

return async_wrapper

return sync_wrapper

def record(self, status: str, latency: float):

self.latencies.append({

"timestamp": time.time(),

"status": status,

"latency_ms": latency * 1000

def get_stats(self) -> dict:

"""获取统计信息"""

if not self.latencies:

latencies = [l["latency_ms"] for l in self.latencies]

"count": len(latencies),

"avg_ms": sum(latencies) / len(latencies),

"p50_ms": sorted(latencies)[len(latencies) // 2],

"p95_ms": sorted(latencies)[int(len(latencies) * 0.95)],

"p99_ms": sorted(latencies)[int(len(latencies) * 0.99)],

Enter fullscreen mode Exit fullscreen mode

2. Token 消耗指标


class TokenTracker:

def __init__(self):

self.records = []

self.total_input_tokens = 0

self.total_output_tokens = 0

def record(self, model: str, input_tokens: int, output_tokens: int, cost: float):

"""记录 Token 消耗"""

self.total_input_tokens += input_tokens

self.total_output_tokens += output_tokens

self.records.append({

"timestamp": time.time(),

"model": model,

"input_tokens": input_tokens,

"output_tokens": output_tokens,

"total_tokens": input_tokens + output_tokens,

"cost": cost

def get_daily_cost(self) -> dict:

"""获取每日成本"""

today = time.time() - 86400  # 24小时前

recent = [r for r in self.records if r["timestamp"] > today]

total_cost = sum(r["cost"] for r in recent)

total_tokens = sum(r["total_tokens"] for r in recent)

"cost_today": total_cost,

"tokens_today": total_tokens,

"avg_cost_per_request": total_cost / len(recent) if recent else 0

def get_model_breakdown(self) -> dict:

"""按模型分类统计"""

breakdown = {}

for r in self.records:

model = r["model"]

if model not in breakdown:

breakdown[model] = {"cost": 0, "tokens": 0, "count": 0}

breakdown[model]["cost"] += r["cost"]

breakdown[model]["tokens"] += r["total_tokens"]

breakdown[model]["count"] += 1

return breakdown

Enter fullscreen mode Exit fullscreen mode

3. 错误分类


class AIErrorClassifier:

ERROR_TYPES = {

"rate_limit": {"retry": True, "severity": "medium"},

"auth_error": {"retry": False, "severity": "high"},

"model_error": {"retry": True, "severity": "medium"},

"timeout": {"retry": True, "severity": "low"},

"invalid_request": {"retry": False, "severity": "high"},

"content_filtered": {"retry": False, "severity": "medium"},

@classmethod

def classify(cls, error: Exception) -> dict:

"""分类错误类型"""

error_str = str(error).lower()

if "429" in error_str or "rate_limit" in error_str:

return {"type": "rate_limit", **cls.ERROR_TYPES["rate_limit"]}

elif "401" in error_str or "auth" in error_str:

return {"type": "auth_error", **cls.ERROR_TYPES["auth_error"]}

elif "500" in error_str or "internal" in error_str:

return {"type": "model_error", **cls.ERROR_TYPES["model_error"]}

elif "timeout" in error_str:

return {"type": "timeout", **cls.ERROR_TYPES["timeout"]}

elif "400" in error_str or "invalid" in error_str:

return {"type": "invalid_request", **cls.ERROR_TYPES["invalid_request"]}

elif "filtered" in error_str or "content" in error_str:

return {"type": "content_filtered", **cls.ERROR_TYPES["content_filtered"]}

return {"type": "unknown", "retry": False, "severity": "high"}

@classmethod

def should_retry(cls, error: Exception) -> bool:

"""判断是否应该重试"""

classification = cls.classify(error)

return classification.get("retry", False)

Enter fullscreen mode Exit fullscreen mode

日志体系

结构化 AI 日志


import json

import logging

from datetime import datetime

class AILogger:

def __init__(self, log_file: str = "ai_logs.jsonl"):

self.log_file = log_file

self.logger = logging.getLogger("ai")

self.logger.setLevel(logging.INFO)

handler = logging.FileHandler(log_file)

handler.setFormatter(logging.Formatter('%(message)s'))

self.logger.addHandler(handler)

def log_request(self,

request_id: str,

model: str,

prompt: str,

response: str = None,

latency_ms: float = None,

tokens_used: int = None,

cost: float = None,

error: str = None):

"""记录 AI 请求"""

log_entry = {

"timestamp": datetime.utcnow().isoformat(),

"type": "ai_request",

"request_id": request_id,

"model": model,

"prompt_length": len(prompt),

"response_length": len(response) if response else None,

"latency_ms": latency_ms,

"tokens_used": tokens_used,

"cost": cost,

"error": error,

"success": error is None

self.logger.info(json.dumps(log_entry, ensure_ascii=False))

def log_evaluation(self, request_id: str, quality_score: float, categories: dict):

"""记录质量评估结果"""

log_entry = {

"timestamp": datetime.utcnow().isoformat(),

"type": "quality_evaluation",

"request_id": request_id,

"quality_score": quality_score,

"categories": categories

self.logger.info(json.dumps(log_entry, ensure_ascii=False))

ai_logger = AILogger("ai_production_logs.jsonl")

ai_logger.log_request(

request_id="req_001",

model="gpt-5.4",

prompt="解释什么是机器学习",

response="机器学习是...",

latency_ms=250,

tokens_used=1500,

Enter fullscreen mode Exit fullscreen mode

日志分析查询


import json

class LogAnalyzer:

def __init__(self, log_file: str):

self.log_file = log_file

def load_logs(self, limit: int = None):

with open(self.log_file, 'r') as f:

for i, line in enumerate(f):

if limit and i >= limit:

logs.append(json.loads(line))

return logs

def get_error_rate(self, hours: int = 24) -> float:

"""计算错误率"""

cutoff = datetime.utcnow().timestamp() - hours * 3600

logs = self.load_logs()

recent = [l for l in logs if datetime.fromisoformat(l["timestamp"]).timestamp() > cutoff]

if not recent:

errors = sum(1 for l in recent if not l.get("success", True))

return errors / len(recent)

def get_expensive_requests(self, top_n: int = 10) -> list:

"""获取最贵的请求"""

logs = self.load_logs()

sorted_logs = sorted(

[l for l in logs if l.get("cost")],

key=lambda x: x.get("cost", 0),

reverse=True

return sorted_logs[:top_n]

def get_slow_requests(self, threshold_ms: float = 5000) -> list:

"""获取慢请求"""

logs = self.load_logs()

return [l for l in logs if l.get("latency_ms", 0) > threshold_ms]

Enter fullscreen mode Exit fullscreen mode

追踪链路

LangChain + OpenTelemetry


from opentelemetry import trace

from opentelemetry.sdk.trace import TracerProvider

from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()

processor = BatchSpanProcessor(ConsoleSpanExporter())

provider.add_span_processor(processor)

trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

class AIServiceWithTracing:

def __init__(self):

self.llm = OpenAI()

self.vector_db = VectorDB()

@tracer.start_as_current_span("ai_request")

async def process_request(self, user_input: str, user_id: str):

span = trace.get_current_span()

span.set_attribute("user_id", user_id)

span.set_attribute("input_length", len(user_input))

# 1. 检索相关文档

with tracer.start_as_current_span("retrieve_context") as span:

docs = self.vector_db.search(user_input)

span.set_attribute("docs_retrieved", len(docs))

# 2. 调用 LLM

with tracer.start_as_current_span("llm_call") as span:

start = time.time()

response = self.llm.generate(user_input, docs)

span.set_attribute("model", "gpt-5.4")

span.set_attribute("latency_ms", (time.time() - start) * 1000)

span.set_attribute("response_length", len(response))

span.set_attribute("success", True)

return response

except Exception as e:

span.set_attribute("success", False)

span.set_attribute("error", str(e))

Enter fullscreen mode Exit fullscreen mode

输出质量评估

自动质量评估


class AIOutputEvaluator:

def __init__(self):

self.llm = OpenAI()

def evaluate(self, prompt: str, response: str) -> dict:

"""评估输出质量"""

evaluation_prompt = f"""

评估以下 AI 输出的质量:

用户输入:{prompt}

AI 输出:{response}

评估维度(每项 1-5 分):

1. 相关性:输出是否与问题相关

2. 准确性:信息是否正确

3. 完整性:是否完整回答了问题

4. 清晰度:表达是否清晰易读

5. 安全性:是否有不当内容

"relevance": 4,

"accuracy": 5,

"completeness": 4,

"clarity": 5,

"safety": 5,

"overall_score": 4.6,

"issues": ["问题1", "问题2"],

"suggestions": ["建议1", "建议2"]

result = self.llm.generate(evaluation_prompt)

return json.loads(result)

return {"error": "评估解析失败", "raw": result}

def batch_evaluate(self, requests: list) -> list:

results = []

for req in requests:

evaluation = self.evaluate(req["prompt"], req["response"])

results.append({

"request_id": req["id"],

**evaluation

return results

def detect_hallucination(self, response: str, context: str) -> dict:

detection_prompt = f"""

检测以下回答是否存在幻觉(编造不存在的信息):

上下文/背景:{context}

AI 回答{response}

1. 是否有具体事实人名日期数字需要验证

2. 这些事实是否在上下文中

3. 是否有明显编造的内容

"has_hallucination": true/false,

"confidence": 0.85,

"risky_content": ["具体可疑内容"],

"reason": "判断理由"

result = self.llm.generate(detection_prompt)

return json.loads(result)

return {"has_hallucination": False, "confidence": 0}

Enter fullscreen mode Exit fullscreen mode

Prometheus 监控面板

指标导出


from prometheus_client import Counter, Histogram, Gauge, generate_latest

REQUEST_COUNT = Counter(

'ai_requests_total',

'Total AI requests',

['model', 'status']

REQUEST_LATENCY = Histogram(

'ai_request_latency_seconds',

'AI request latency',

TOKEN_USAGE = Counter(

'ai_tokens_used_total',

'Total tokens used',

['model', 'type']  # type: input/output

COST_USAGE = Counter(

'ai_cost_total',

'Total API cost',

ACTIVE_REQUESTS = Gauge(

'ai_active_requests',

'Number of active requests',

@app.middleware("http")

async def track_requests(request: Request, call_next):

model = request.headers.get("X-Model", "unknown")

ACTIVE_REQUESTS.labels(model=model).inc()

start = time.time()

response = await call_next(request)

latency = time.time() - start

REQUEST_COUNT.labels(model=model, status=response.status_code).inc()

REQUEST_LATENCY.labels(model=model).observe(latency)

ACTIVE_REQUESTS.labels(model=model).dec()

return response

@app.get("/metrics")

def metrics():

return Response(content=generate_latest())

Enter fullscreen mode Exit fullscreen mode

告警配置

关键告警规则


# alertmanager.yml 或监控配置

- name: ai_application

- alert: HighAIErrorRate

sum(rate(ai_requests_total{status="error"}[5m]))

sum(rate(ai_requests_total[5m])) > 0.05

severity: critical

annotations:

summary: "AI 请求错误率超过 5%"

- alert: HighAILatency

histogram_quantile(0.95,

sum(rate(ai_request_latency_seconds_bucket[5m])) by (le)

severity: warning

annotations:

summary: "AI 请求 P95 延迟超过 10 秒"

- alert: HighAICost

increase(ai_cost_total[1h]) > 100

severity: warning

annotations:

summary: "AI 调用成本小时增长超过 $100"

- alert: AIRateLimit

increase(ai_requests_total{status="429"}[5m]) > 10

severity: warning

annotations:

summary: "AI API 限流频繁发生"

Enter fullscreen mode Exit fullscreen mode

Grafana 仪表板

关键面板


┌─────────────────────────────────────────────────────────────┐

│  AI Application Dashboard                                    │

├─────────────────────────────────────────────────────────────┤

│                                                             │

│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │

│  │ Requests    │  │ Error Rate  │  │ Avg Latency │         │

│  │ 12,345     │  │ 2.3%       │  │ 1.2s        │         │

│  └─────────────┘  └─────────────┘  └─────────────┘         │

│                                                             │

│  ┌─────────────────────────────────────────────────────┐   │

│  │ Token Usage Over Time                               │   │

│  │ ████████████████░░░░░░░░░░░░░░░░░░                  │   │

│  └─────────────────────────────────────────────────────┘   │

│                                                             │

│  ┌─────────────────────────────────────────────────────┐   │

│  │ Cost by Model                                       │   │

│  │ GPT-5.4: $45.2 (67%)                              │   │

│  │ Claude: $22.1 (33%)                                │   │

│  └─────────────────────────────────────────────────────┘   │

│                                                             │

│  ┌─────────────────────────────────────────────────────┐   │

│  │ Quality Score Distribution                           │   │

│  │ ██████████████████████████░░░░░░░░░░░░              │   │

│  └─────────────────────────────────────────────────────┘   │

└─────────────────────────────────────────────────────────────┘

Enter fullscreen mode Exit fullscreen mode

最佳实践

1. 数据采样


class SamplingLogger:

"""采样记录,避免存储成本过高"""

SAMPLE_RATE = 0.1  # 10% 采样

def __init__(self):

self.full_logger = AILogger()

self.sample_count = 0

def should_log(self) -> bool:

"""判断是否应该记录完整日志"""

self.sample_count += 1

if self.sample_count % int(1 / self.SAMPLE_RATE) == 0:

return True

return False

def log(self, entry: dict):

if self.should_log():

self.full_logger.log_request(**entry)

Enter fullscreen mode Exit fullscreen mode

2. 成本预警


class CostAlert:

def __init__(self, threshold_daily: float = 100):

self.threshold_daily = threshold_daily

self.token_tracker = TokenTracker()

def check_and_alert(self):

"""检查成本并告警"""

daily = self.token_tracker.get_daily_cost()

if daily["cost_today"] > self.threshold_daily:

"alert": True,

"message": f"今日 AI 成本 ${daily['cost_today']:.2f} 超过阈值 ${self.threshold_daily}",

"action": "review_recent_requests"

return {"alert": False}

Enter fullscreen mode Exit fullscreen mode

总结

AI 应用可观测性是生产环境的必备:

  1. 延迟追踪:P50/P95/P99 延迟指标

  2. Token 消耗:按模型、按时间的成本分析

  3. 错误分类:区分可重试和不可重试错误

  4. 质量评估:自动评估输出质量,检测幻觉

  5. 告警配置:错误率、延迟、成本告警

没有可观测性,就没有 AI 应用的生产治理。

本文是 AI 工程化系列之一。


This article contains affiliate links. If you sign up through the links above, I may earn a commission at no additional cost to you.

Ready to Build Your AI Business?

Get started with Systeme.io for free — All-in-one platform for building your online business with AI tools.

Top comments (0)