AI 应用可观测性完全指南:2026年生产环境AI监控实战
前言
2026 年,AI 应用已经广泛部署于生产环境。但 AI 应用有其独特性:模型输出不稳定、延迟高、成本难以预测。
传统的应用监控(APM)无法满足 AI 监控的需求。本文介绍 AI 应用可观测性的核心方法。
什么是 AI 可观测性
传统监控 vs AI 监控
| 维度 | 传统监控 | AI 监控 |
|------|---------|---------|
| 延迟 | HTTP 请求耗时 | API 调用 + 模型推理耗时 |
| 错误率 | 4xx/5xx 状态码 | 拒绝、幻觉、格式错误 |
| 成本 | 固定云资源 | Token 消耗波动 |
| 质量 | 可精确测量 | 需要额外评估 |
AI 可观测性四大支柱
├── Logging(AI 请求日志)
├── Metrics(Token 消耗、延迟、成本)
├── Tracing(AI 调用链路追踪)
└── Evaluation(输出质量评估)
核心指标体系
1. 延迟指标
import time
from functools import wraps
class AILatencyTracker:
    """Collect latencies of AI calls in memory and summarize them.

    Each sample is stored in ``self.latencies`` as a dict with the keys
    ``timestamp`` (epoch seconds), ``status`` and ``latency_ms``.
    """

    def __init__(self):
        self.latencies = []

    def track(self, func):
        """Decorate ``func`` so that every call is timed and recorded.

        Supports both coroutine functions and plain functions. A call that
        raises is recorded with status ``"error"`` and the exception is
        re-raised unchanged, so the decorator never hides failures.

        Args:
            func: The callable to wrap.

        Returns:
            A wrapper with the same signature as ``func``.
        """
        # inspect.iscoroutinefunction is the non-deprecated spelling of the
        # asyncio helper; imported locally to keep the snippet self-contained.
        import inspect

        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # perf_counter is monotonic, unlike time.time().
                start = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    self.record("error", time.perf_counter() - start)
                    raise
                self.record("success", time.perf_counter() - start)
                return result

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                self.record("error", time.perf_counter() - start)
                raise
            self.record("success", time.perf_counter() - start)
            return result

        return sync_wrapper

    def record(self, status: str, latency: float):
        """Store one sample.

        Args:
            status: Outcome label, e.g. ``"success"`` or ``"error"``.
            latency: Elapsed time in seconds (stored in milliseconds).
        """
        self.latencies.append({
            "timestamp": time.time(),
            "status": status,
            "latency_ms": latency * 1000,
        })

    def get_stats(self) -> dict:
        """Return count, mean and P50/P95/P99 latency in milliseconds.

        Returns:
            An empty dict when nothing has been recorded yet, otherwise a
            dict with ``count``, ``avg_ms``, ``p50_ms``, ``p95_ms``, ``p99_ms``.
        """
        if not self.latencies:
            return {}

        # Sort once and reuse for every percentile.
        ordered = sorted(entry["latency_ms"] for entry in self.latencies)
        count = len(ordered)
        return {
            "count": count,
            "avg_ms": sum(ordered) / count,
            "p50_ms": ordered[count // 2],
            # int(count * q) is always < count for q < 1, so no IndexError.
            "p95_ms": ordered[int(count * 0.95)],
            "p99_ms": ordered[int(count * 0.99)],
        }
2. Token 消耗指标
class TokenTracker:
    """Accumulate per-request token usage and cost in memory."""

    def __init__(self):
        self.records = []
        self.total_input_tokens = 0
        self.total_output_tokens = 0

    def record(self, model: str, input_tokens: int, output_tokens: int, cost: float):
        """Record the token usage and cost of one request.

        Args:
            model: Model identifier the request was sent to.
            input_tokens: Number of prompt tokens.
            output_tokens: Number of completion tokens.
            cost: Cost of the request, in whatever currency the caller uses.
        """
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.records.append({
            "timestamp": time.time(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "cost": cost,
        })

    def get_daily_cost(self) -> dict:
        """Summarize cost and tokens over the trailing 24 hours.

        Note that this is a rolling window ending now, not the current
        calendar day, even though the result keys say "today".

        Returns:
            Dict with ``cost_today``, ``tokens_today`` and
            ``avg_cost_per_request`` (0 when the window is empty).
        """
        cutoff = time.time() - 86400  # 24 hours ago
        recent = [r for r in self.records if r["timestamp"] > cutoff]
        total_cost = sum(r["cost"] for r in recent)
        total_tokens = sum(r["total_tokens"] for r in recent)
        return {
            "cost_today": total_cost,
            "tokens_today": total_tokens,
            "avg_cost_per_request": total_cost / len(recent) if recent else 0,
        }

    def get_model_breakdown(self) -> dict:
        """Aggregate cost, tokens and request count per model.

        Returns:
            Mapping of model name to ``{"cost", "tokens", "count"}``.
        """
        breakdown = {}
        for r in self.records:
            stats = breakdown.setdefault(
                r["model"], {"cost": 0, "tokens": 0, "count": 0}
            )
            stats["cost"] += r["cost"]
            stats["tokens"] += r["total_tokens"]
            stats["count"] += 1
        return breakdown
3. 错误分类
class AIErrorClassifier:
    """Map exceptions from AI providers to a coarse error category.

    Classification is heuristic: it inspects only the lower-cased string form
    of the exception.
    """

    # Retry policy and severity for every known category.
    ERROR_TYPES = {
        "rate_limit": {"retry": True, "severity": "medium"},
        "auth_error": {"retry": False, "severity": "high"},
        "model_error": {"retry": True, "severity": "medium"},
        "timeout": {"retry": True, "severity": "low"},
        "invalid_request": {"retry": False, "severity": "high"},
        "content_filtered": {"retry": False, "severity": "medium"},
    }

    @classmethod
    def classify(cls, error: Exception) -> dict:
        """Classify ``error`` into one of ``ERROR_TYPES`` or ``"unknown"``.

        Rules are evaluated in order and the first match wins. HTTP status
        codes only match as standalone three-digit numbers, so a message such
        as "timeout after 5000ms" is not mistaken for an HTTP 500.

        Args:
            error: The exception to classify.

        Returns:
            A new dict with ``type``, ``retry`` and ``severity``.
        """
        import re  # local import keeps this snippet self-contained

        error_str = str(error).lower()
        # Three-digit numbers not embedded in a longer digit run.
        codes = set(re.findall(r"(?<!\d)\d{3}(?!\d)", error_str))

        # (category, status code or None, keyword substrings)
        rules = (
            ("rate_limit", "429", ("rate_limit",)),
            ("auth_error", "401", ("auth",)),
            ("model_error", "500", ("internal",)),
            ("timeout", None, ("timeout",)),
            ("invalid_request", "400", ("invalid",)),
            ("content_filtered", None, ("filtered", "content")),
        )
        for error_type, code, keywords in rules:
            if code in codes or any(word in error_str for word in keywords):
                return {"type": error_type, **cls.ERROR_TYPES[error_type]}

        # Unknown errors are treated as non-retryable and severe.
        return {"type": "unknown", "retry": False, "severity": "high"}

    @classmethod
    def should_retry(cls, error: Exception) -> bool:
        """Return True when the classified category is marked retryable."""
        classification = cls.classify(error)
        return classification.get("retry", False)
日志体系
结构化 AI 日志
import json
import logging
from datetime import datetime
class AILogger:
    """Write structured AI request and evaluation records as JSON Lines.

    All instances share the ``"ai"`` logger, so records are delivered to
    every file handler attached to it.
    """

    def __init__(self, log_file: str = "ai_logs.jsonl"):
        """Attach a UTF-8 file handler for ``log_file`` to the "ai" logger.

        Args:
            log_file: Path of the JSON Lines file to append to.
        """
        import os  # local import keeps this snippet self-contained

        self.log_file = log_file
        self.logger = logging.getLogger("ai")
        self.logger.setLevel(logging.INFO)

        # getLogger returns the same object every time; without this check a
        # second AILogger for the same file would duplicate every line.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in self.logger.handlers
        )
        if not already_attached:
            # Entries are dumped with ensure_ascii=False, so the file must be
            # UTF-8 regardless of the platform default encoding.
            handler = logging.FileHandler(log_file, encoding="utf-8")
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Same format as the deprecated ``datetime.utcnow().isoformat()``, so
        existing log consumers keep working.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def log_request(self,
                    request_id: str,
                    model: str,
                    prompt: str,
                    response: str = None,
                    latency_ms: float = None,
                    tokens_used: int = None,
                    cost: float = None,
                    error: str = None):
        """Log one AI request.

        Only the lengths of ``prompt`` and ``response`` are stored, not their
        text. ``success`` is derived from ``error`` being ``None``.

        Args:
            request_id: Caller-supplied identifier of the request.
            model: Model identifier.
            prompt: Prompt text.
            response: Response text, or ``None`` if there was none.
            latency_ms: Request latency in milliseconds, if known.
            tokens_used: Total tokens consumed, if known.
            cost: Request cost, if known.
            error: Error description, or ``None`` on success.
        """
        log_entry = {
            "timestamp": self._utc_timestamp(),
            "type": "ai_request",
            "request_id": request_id,
            "model": model,
            "prompt_length": len(prompt),
            # An empty response has length 0; only a missing one is None.
            "response_length": len(response) if response is not None else None,
            "latency_ms": latency_ms,
            "tokens_used": tokens_used,
            "cost": cost,
            "error": error,
            "success": error is None,
        }
        self.logger.info(json.dumps(log_entry, ensure_ascii=False))

    def log_evaluation(self, request_id: str, quality_score: float, categories: dict):
        """Log the quality evaluation result for a request.

        Args:
            request_id: Identifier of the evaluated request.
            quality_score: Overall quality score.
            categories: Per-category scores; must be JSON-serializable.
        """
        log_entry = {
            "timestamp": self._utc_timestamp(),
            "type": "quality_evaluation",
            "request_id": request_id,
            "quality_score": quality_score,
            "categories": categories,
        }
        self.logger.info(json.dumps(log_entry, ensure_ascii=False))
# Example usage: log a single request to the production log file.
# The optional `cost` argument is omitted here and defaults to None.
ai_logger = AILogger("ai_production_logs.jsonl")
ai_logger.log_request(
    request_id="req_001",
    model="gpt-5.4",
    prompt="解释什么是机器学习",
    response="机器学习是...",
    latency_ms=250,
    tokens_used=1500,
)
日志分析查询
import json
class LogAnalyzer:
    """Query a JSON Lines log file with one JSON object per line."""

    def __init__(self, log_file: str):
        self.log_file = log_file

    def load_logs(self, limit: int = None):
        """Load log entries from the file.

        Args:
            limit: Maximum number of entries to return. ``None`` or 0 means
                no limit.

        Returns:
            List of decoded entries in file order. Blank lines are skipped.
        """
        logs = []
        with open(self.log_file, 'r', encoding="utf-8") as f:
            for line in f:
                if limit and len(logs) >= limit:
                    break
                line = line.strip()
                if not line:
                    continue  # tolerate trailing or blank lines
                logs.append(json.loads(line))
        return logs

    def get_error_rate(self, hours: int = 24) -> float:
        """Return the share of failed AI requests in the last ``hours`` hours.

        Only ``ai_request`` entries are counted (entries without a ``type``
        are treated as requests), so evaluation records do not dilute the
        rate. Naive timestamps are interpreted as UTC.

        Returns:
            Errors divided by requests, or 0.0 when there are no requests.
        """
        from datetime import datetime, timedelta, timezone

        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        recent = []
        for entry in self.load_logs():
            if entry.get("type", "ai_request") != "ai_request":
                continue
            stamp = datetime.fromisoformat(entry["timestamp"])
            if stamp.tzinfo is None:
                # Compare in UTC rather than via local-time .timestamp().
                stamp = stamp.replace(tzinfo=timezone.utc)
            if stamp > cutoff:
                recent.append(entry)

        if not recent:
            return 0.0
        errors = sum(1 for entry in recent if not entry.get("success", True))
        return errors / len(recent)

    def get_expensive_requests(self, top_n: int = 10) -> list:
        """Return the ``top_n`` entries with the highest non-zero cost."""
        priced = [entry for entry in self.load_logs() if entry.get("cost")]
        priced.sort(key=lambda entry: entry["cost"], reverse=True)
        return priced[:top_n]

    def get_slow_requests(self, threshold_ms: float = 5000) -> list:
        """Return entries whose latency exceeds ``threshold_ms``.

        Entries with a missing or null ``latency_ms`` count as 0 ms.
        """
        return [
            entry for entry in self.load_logs()
            # `or 0` also covers an explicit JSON null latency.
            if (entry.get("latency_ms") or 0) > threshold_ms
        ]
追踪链路
LangChain + OpenTelemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
# Build the SDK tracer provider that owns span processing for this process.
provider = TracerProvider()
# Batch finished spans and hand them to the console exporter.
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
# Register the provider globally before any tracer is requested.
trace.set_tracer_provider(provider)
# Module-level tracer used by the decorators and `with` blocks below.
tracer = trace.get_tracer(__name__)
class AIServiceWithTracing:
    """Retrieval + LLM pipeline that emits one span per stage."""

    def __init__(self):
        # NOTE(review): OpenAI and VectorDB are not imported in the visible
        # source; presumably they are provided elsewhere — confirm.
        self.llm = OpenAI()
        self.vector_db = VectorDB()

    @tracer.start_as_current_span("ai_request")
    async def process_request(self, user_input: str, user_id: str):
        """Answer ``user_input`` and trace retrieval and the LLM call.

        Args:
            user_input: The user's question.
            user_id: Identifier recorded on the request span.

        Returns:
            Whatever ``self.llm.generate`` returns.

        Raises:
            Exception: Any error from the LLM call is recorded on the
                ``llm_call`` span and then re-raised.
        """
        request_span = trace.get_current_span()
        request_span.set_attribute("user_id", user_id)
        request_span.set_attribute("input_length", len(user_input))

        # 1. Retrieve related documents.
        with tracer.start_as_current_span("retrieve_context") as retrieve_span:
            docs = self.vector_db.search(user_input)
            retrieve_span.set_attribute("docs_retrieved", len(docs))

        # 2. Call the LLM. Spans get distinct names so the request span is
        # not shadowed.
        with tracer.start_as_current_span("llm_call") as llm_span:
            llm_span.set_attribute("model", "gpt-5.4")
            start = time.time()
            try:
                response = self.llm.generate(user_input, docs)
            except Exception as e:
                llm_span.set_attribute("latency_ms", (time.time() - start) * 1000)
                llm_span.set_attribute("success", False)
                llm_span.set_attribute("error", str(e))
                # Re-raise so the caller sees the failure instead of None.
                raise
            llm_span.set_attribute("latency_ms", (time.time() - start) * 1000)
            llm_span.set_attribute("response_length", len(response))
            llm_span.set_attribute("success", True)
            return response
输出质量评估
自动质量评估
class AIOutputEvaluator:
    """Use an LLM as a judge to score responses and flag hallucinations."""

    def __init__(self):
        # NOTE(review): OpenAI is not imported in the visible source;
        # presumably it is provided elsewhere — confirm.
        self.llm = OpenAI()

    @staticmethod
    def _parse_json_reply(raw: str):
        """Decode a JSON reply, tolerating a surrounding Markdown code fence."""
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (``` or ```json) and the closer.
            text = text.split("\n", 1)[1] if "\n" in text else ""
            text = text.rsplit("```", 1)[0]
        return json.loads(text)

    def evaluate(self, prompt: str, response: str) -> dict:
        """Score ``response`` to ``prompt`` on five 1-5 dimensions.

        Args:
            prompt: The original user input.
            response: The AI output to evaluate.

        Returns:
            The decoded JSON reply of the judge model, or
            ``{"error": ..., "raw": ...}`` when the call or parsing fails.
        """
        # Literal JSON braces are doubled because this is an f-string.
        evaluation_prompt = f"""
        评估以下 AI 输出的质量:

        用户输入:{prompt}
        AI 输出:{response}

        评估维度(每项 1-5 分):
        1. 相关性:输出是否与问题相关
        2. 准确性:信息是否正确
        3. 完整性:是否完整回答了问题
        4. 清晰度:表达是否清晰易读
        5. 安全性:是否有不当内容

        请仅返回如下格式的 JSON:
        {{
            "relevance": 4,
            "accuracy": 5,
            "completeness": 4,
            "clarity": 5,
            "safety": 5,
            "overall_score": 4.6,
            "issues": ["问题1", "问题2"],
            "suggestions": ["建议1", "建议2"]
        }}
        """
        # Bound up front so the fallback can reference it even when the LLM
        # call itself raises.
        result = None
        try:
            result = self.llm.generate(evaluation_prompt)
            return self._parse_json_reply(result)
        except Exception:
            # Best-effort boundary around an opaque client.
            return {"error": "评估解析失败", "raw": result}

    def batch_evaluate(self, requests: list) -> list:
        """Evaluate several requests sequentially.

        Args:
            requests: Dicts with the keys ``id``, ``prompt`` and ``response``.

        Returns:
            One dict per request: ``request_id`` plus the evaluation fields.
        """
        results = []
        for req in requests:
            evaluation = self.evaluate(req["prompt"], req["response"])
            results.append({
                "request_id": req["id"],
                **evaluation,
            })
        return results

    def detect_hallucination(self, response: str, context: str) -> dict:
        """Ask the judge model whether ``response`` invents facts.

        Args:
            response: The AI answer to check.
            context: Background the answer should be grounded in.

        Returns:
            The decoded JSON reply, or
            ``{"has_hallucination": False, "confidence": 0}`` when the call or
            parsing fails. Confidence 0 marks that result as unverified.
        """
        detection_prompt = f"""
        检测以下回答是否存在幻觉(编造不存在的信息):

        上下文/背景:{context}
        AI 回答:{response}

        检查要点:
        1. 是否有具体事实(人名、日期、数字)需要验证
        2. 这些事实是否在上下文中
        3. 是否有明显编造的内容

        请仅返回如下格式的 JSON:
        {{
            "has_hallucination": true/false,
            "confidence": 0.85,
            "risky_content": ["具体可疑内容"],
            "reason": "判断理由"
        }}
        """
        try:
            result = self.llm.generate(detection_prompt)
            return self._parse_json_reply(result)
        except Exception:
            # Best-effort boundary around an opaque client.
            return {"has_hallucination": False, "confidence": 0}
Prometheus 监控面板
指标导出
from prometheus_client import Counter, Histogram, Gauge, generate_latest
# Requests, labelled by model and outcome.
REQUEST_COUNT = Counter(
    'ai_requests_total',
    'Total AI requests',
    ['model', 'status'],
)

# Request latency in seconds. The default buckets stop at 10s, which caps
# histogram_quantile() at 10 and would keep a "P95 > 10s" alert from ever
# firing, so the buckets extend to 120s.
REQUEST_LATENCY = Histogram(
    'ai_request_latency_seconds',
    'AI request latency',
    ['model'],
    buckets=(0.1, 0.25, 0.5, 1, 2.5, 5, 10, 20, 30, 60, 120),
)

# Tokens consumed, split into input and output.
TOKEN_USAGE = Counter(
    'ai_tokens_used_total',
    'Total tokens used',
    ['model', 'type'],  # type: input/output
)

# Accumulated API cost per model.
COST_USAGE = Counter(
    'ai_cost_total',
    'Total API cost',
    ['model'],
)

# Requests currently in flight per model.
ACTIVE_REQUESTS = Gauge(
    'ai_active_requests',
    'Number of active requests',
    ['model'],
)
@app.middleware("http")
async def track_requests(request: Request, call_next):
    """Record count, latency and in-flight gauge for every HTTP request.

    The model label is read from the ``X-Model`` request header.
    Bookkeeping runs in ``finally`` so the in-flight gauge is decremented
    even when the downstream handler raises; such requests are counted with
    ``status="error"``.
    """
    # NOTE(review): X-Model is client-controlled; unbounded values would
    # inflate label cardinality — consider validating against known models.
    model = request.headers.get("X-Model", "unknown")

    ACTIVE_REQUESTS.labels(model=model).inc()
    start = time.time()
    status = "error"  # overwritten once a response is produced
    try:
        response = await call_next(request)
        status = str(response.status_code)
        return response
    finally:
        REQUEST_LATENCY.labels(model=model).observe(time.time() - start)
        REQUEST_COUNT.labels(model=model, status=status).inc()
        ACTIVE_REQUESTS.labels(model=model).dec()
@app.get("/metrics")
def metrics():
    """Expose all registered metrics in the Prometheus text format."""
    # Imported locally because the file-level import omits this constant.
    from prometheus_client import CONTENT_TYPE_LATEST

    # Declare the exposition content type explicitly.
    # NOTE(review): assumes Response accepts `media_type`, as the
    # FastAPI/Starlette Response does — confirm.
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
告警配置
关键告警规则
# Prometheus alerting rules. Load this file through `rule_files` in
# prometheus.yml; it is not Alertmanager configuration.
# NOTE(review): the `for:` durations are reconstructed values — tune them.
groups:
  - name: ai_application
    rules:
      # The middleware labels `status` with the HTTP status code, so match
      # 5xx codes as well as a literal "error" label.
      - alert: HighAIErrorRate
        expr: |
          sum(rate(ai_requests_total{status=~"5..|error"}[5m]))
          /
          sum(rate(ai_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "AI 请求错误率超过 5%"

      # Needs histogram buckets above 10s to be able to fire.
      - alert: HighAILatency
        expr: |
          histogram_quantile(0.95,
            sum(rate(ai_request_latency_seconds_bucket[5m])) by (le)
          ) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "AI 请求 P95 延迟超过 10 秒"

      - alert: HighAICost
        expr: increase(ai_cost_total[1h]) > 100
        labels:
          severity: warning
        annotations:
          summary: "AI 调用成本小时增长超过 $100"

      - alert: AIRateLimit
        expr: increase(ai_requests_total{status="429"}[5m]) > 10
        labels:
          severity: warning
        annotations:
          summary: "AI API 限流频繁发生"
Grafana 仪表板
关键面板
┌─────────────────────────────────────────────────────────────┐
│ AI Application Dashboard │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Requests │ │ Error Rate │ │ Avg Latency │ │
│ │ 12,345 │ │ 2.3% │ │ 1.2s │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Token Usage Over Time │ │
│ │ ████████████████░░░░░░░░░░░░░░░░░░ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Cost by Model │ │
│ │ GPT-5.4: $45.2 (67%) │ │
│ │ Claude: $22.1 (33%) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Quality Score Distribution │ │
│ │ ██████████████████████████░░░░░░░░░░░░ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
最佳实践
1. 数据采样
class SamplingLogger:
    """Log only a fraction of requests to keep storage cost down.

    Sampling is deterministic: every N-th call is logged, where N is the
    rounded inverse of ``SAMPLE_RATE``.
    """

    SAMPLE_RATE = 0.1  # 10% sampling

    def __init__(self):
        self.full_logger = AILogger()
        self.sample_count = 0

    def should_log(self) -> bool:
        """Return True when the current call falls on a sampling slot.

        A rate of 0 or below never logs; a rate of 1 or above logs every
        call.
        """
        self.sample_count += 1
        rate = self.SAMPLE_RATE
        if rate <= 0:
            return False
        # round() avoids truncation from float error, and max() keeps the
        # modulus non-zero for rates above 1.
        interval = max(1, round(1 / rate))
        return self.sample_count % interval == 0

    def log(self, entry: dict):
        """Forward ``entry`` as keyword arguments to the logger if sampled.

        Args:
            entry: Keyword arguments for ``log_request``.
        """
        if self.should_log():
            self.full_logger.log_request(**entry)
2. 成本预警
class CostAlert:
    """Raise an alert when the trailing-24h AI cost exceeds a threshold."""

    def __init__(self, threshold_daily: float = 100, token_tracker=None):
        """Create the alert.

        Args:
            threshold_daily: Cost above which an alert is produced.
            token_tracker: Tracker that already records the application's
                usage; must provide ``get_daily_cost()``. When omitted, a
                fresh empty ``TokenTracker`` is created, which cannot trigger
                until something records into ``self.token_tracker``.
        """
        self.threshold_daily = threshold_daily
        self.token_tracker = (
            token_tracker if token_tracker is not None else TokenTracker()
        )

    def check_and_alert(self):
        """Compare the current daily cost with the threshold.

        Returns:
            ``{"alert": True, "message": ..., "action": ...}`` when over the
            threshold, otherwise ``{"alert": False}``.
        """
        daily = self.token_tracker.get_daily_cost()

        if daily["cost_today"] > self.threshold_daily:
            return {
                "alert": True,
                "message": f"今日 AI 成本 ${daily['cost_today']:.2f} 超过阈值 ${self.threshold_daily}",
                "action": "review_recent_requests",
            }

        return {"alert": False}
总结
AI 应用可观测性是生产环境的必备能力,核心包括以下几个方面:
延迟追踪:P50/P95/P99 延迟指标
Token 消耗:按模型、按时间的成本分析
错误分类:区分可重试和不可重试错误
质量评估:自动评估输出质量,检测幻觉
告警配置:错误率、延迟、成本告警
没有可观测性,就没有 AI 应用的生产治理。
本文是 AI 工程化系列之一。
This article contains affiliate links. If you sign up through the links above, I may earn a commission at no additional cost to you.
Ready to Build Your AI Business?
Get started with Systeme.io for free — All-in-one platform for building your online business with AI tools.
Top comments (0)