AI 应用可观测性完全指南:2026年构建可监控、可调试的AI系统
前言
AI 应用的行为比传统软件更难预测。当 AI 输出错误结果时,如何快速定位问题?当 AI 响应变慢时,如何找到瓶颈?
可观测性是 2026 年 AI 工程化的核心挑战之一。本文介绍构建 AI 可观测性体系的方法和工具。
什么是 AI 可观测性
传统可观测性 vs AI 可观测性
| 维度 | 传统可观测性 | AI 可观测性 |
|------|------------|------------|
| 核心指标 | 延迟、错误率、QPS | 回答质量、Token 消耗、幻觉率 |
| 日志类型 | 结构化日志 | Prompt/Response 对 |
| Trace | 函数调用链 | Agent 决策链 |
| 告警 | 错误告警 | 质量下降告警 |
| 调试 | 重放日志 | 重放 Prompt |
AI 可观测性三大支柱
├── Metrics(指标)
│   ├── Token 消耗
│   ├── 响应延迟
│   ├── API 错误率
│   ├── 模型调用成本
│   └── 质量评分
├── Logs(日志)
│   ├── Prompt 日志
│   ├── Response 日志
│   ├── Token 使用明细
│   └── 错误详情
└── Traces(追踪)
    ├── Agent 决策链
    ├── Tool 调用链
    ├── RAG 检索链
    └── 多 Agent 协作链
核心指标体系
Token 消耗监控
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TokenUsage:
    """Token usage of a single model call."""

    prompt_tokens: int  # tokens sent to the model
    completion_tokens: int  # tokens produced by the model
    total_tokens: int  # value summed by the usage monitor
    cost: float  # monetary cost of the call
    # Captured per instance. A plain ``= time.time()`` default would be
    # evaluated once at class-definition time and shared by every record.
    timestamp: float = field(default_factory=time.time)
    # Model name used for per-model cost aggregation. It has a default and
    # sits last so existing positional construction keeps working.
    model: str = "unknown"
class TokenMonitor:
    """Collect token usage records and alert when the daily budget is exceeded."""

    def __init__(self):
        self.usage_records = []
        self.daily_limit = 1_000_000  # 1M tokens per day
        # 20M tokens per month; stored for callers, not checked in this class.
        self.monthly_limit = 20_000_000

    def record(self, usage: "TokenUsage"):
        """Store one usage record and alert if the daily limit is now exceeded.

        Args:
            usage: Record exposing ``total_tokens``, ``cost``, ``model`` and
                ``timestamp`` attributes.
        """
        self.usage_records.append(usage)
        daily = self.get_daily_usage()
        if daily > self.daily_limit:
            self.alert(f"日 Token 消耗 {daily:,} 超过限制 {self.daily_limit:,}")

    def get_daily_usage(self) -> int:
        """Return the total tokens recorded during the last 24 hours.

        The window is rolling (now minus 86400 seconds), not a calendar day.
        """
        window_start = time.time() - 86400
        return sum(
            r.total_tokens
            for r in self.usage_records
            if r.timestamp >= window_start
        )

    def get_cost_by_model(self) -> dict:
        """Aggregate tokens and cost per model.

        Returns:
            Mapping ``model -> {"tokens": int, "cost": number}``; empty when
            nothing has been recorded.
        """
        costs = {}
        for r in self.usage_records:
            bucket = costs.setdefault(r.model, {"tokens": 0, "cost": 0})
            bucket["tokens"] += r.total_tokens
            bucket["cost"] += r.cost
        return costs

    def alert(self, message: str):
        """Emit an alert message to stdout."""
        print(f"[ALERT] {message}")
        # In production this should be forwarded to a real alerting system.
回答质量评分
class QualityScorer:
    """Score an AI answer on relevance, accuracy, completeness and safety."""

    def __init__(self):
        self.llm_as_judge = LLMJudge()

    def score(self, prompt: str, response: str, expected: str = None) -> dict:
        """Evaluate one answer.

        Args:
            prompt: The user prompt.
            response: The model's answer.
            expected: Optional reference answer; accuracy is scored only when
                this is non-empty.

        Returns:
            Dict with ``relevance``, ``completeness``, ``safety``, optionally
            ``accuracy``, and ``overall`` (the mean of the other entries).
        """
        scores = {}
        scores["relevance"] = self._score_relevance(prompt, response)
        # Accuracy needs a reference answer to compare against.
        if expected:
            scores["accuracy"] = self._score_accuracy(response, expected)
        scores["completeness"] = self._score_completeness(prompt, response)
        scores["safety"] = self._score_safety(response)
        scores["overall"] = sum(scores.values()) / len(scores)
        return scores

    def _score_relevance(self, prompt: str, response: str) -> float:
        """Relevance as the embedding similarity between prompt and response."""
        prompt_emb = get_embedding(prompt)
        response_emb = get_embedding(response)
        return cosine_similarity(prompt_emb, response_emb)

    def _score_accuracy(self, response: str, expected: str) -> float:
        """Accuracy as lexical similarity to the reference answer, in [0, 1].

        This is a stdlib baseline (``difflib`` ratio); replace it with a
        semantic comparison where one is available.
        """
        from difflib import SequenceMatcher

        return SequenceMatcher(None, response, expected).ratio()

    def _score_safety(self, response: str) -> float:
        """Return 0.0 if the response contains a harmful keyword, else 1.0."""
        harmful_keywords = ["暴力", "色情", "仇恨", "犯罪"]
        if any(keyword in response for keyword in harmful_keywords):
            return 0.0
        return 1.0

    def _score_completeness(self, prompt: str, response: str) -> float:
        """Completeness from a simple length heuristic.

        A response at least twice as long as the prompt scores 1.0; shorter
        responses score proportionally.
        """
        min_length = len(prompt) * 2
        if len(response) < min_length:
            return len(response) / min_length
        return 1.0
Prompt/Response 日志
结构化日志记录
import json
from datetime import datetime
from typing import Optional
class AIPromptLogger:
    """Append prompt/response pairs to daily JSON Lines files and query them."""

    def __init__(self, storage_path: str = "./logs/ai_interactions"):
        self.storage_path = storage_path
        self.current_file = self._get_log_file()

    def log(
        self,
        session_id: str,
        prompt: str,
        response: str,
        model: str,
        metadata: dict = None,
    ):
        """Record one prompt/response pair as a single JSON line.

        Args:
            session_id: Identifier grouping the interactions of one session.
            prompt: Prompt text sent to the model.
            response: Text returned by the model.
            model: Model name.
            metadata: Optional extra fields; stored as ``{}`` when omitted.
        """
        from pathlib import Path

        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "session_id": session_id,
            "model": model,
            "prompt": prompt,
            "response": response,
            "prompt_length": len(prompt),
            "response_length": len(response),
            "metadata": metadata or {},
        }
        # The log directory may not exist on first use.
        Path(self.current_file).parent.mkdir(parents=True, exist_ok=True)
        # Entries are written with ensure_ascii=False, so pin the encoding
        # instead of depending on the platform default.
        with open(self.current_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")

    def search(
        self,
        session_id: str = None,
        keyword: str = None,
        date_range: tuple = None,
    ) -> list:
        """Return logged entries matching all of the given filters.

        Args:
            session_id: Keep only entries of this session.
            keyword: Keep only entries whose prompt or response contains it.
            date_range: Optional ``(start, end)`` pair limiting which daily
                files are read; see ``_get_log_files``.
        """
        results = []
        for log_file in self._get_log_files(date_range):
            with open(log_file, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue  # tolerate blank lines
                    entry = json.loads(line)
                    if session_id and entry.get("session_id") != session_id:
                        continue
                    if (
                        keyword
                        and keyword not in entry.get("prompt", "")
                        and keyword not in entry.get("response", "")
                    ):
                        continue
                    results.append(entry)
        return results

    def replay(self, session_id: str) -> list:
        """Return every logged interaction of one session."""
        return self.search(session_id=session_id)

    def _get_log_file(self) -> str:
        """Return the path of today's log file (``<storage_path>/YYYYMMDD.jsonl``)."""
        today = datetime.now().strftime("%Y%m%d")
        return f"{self.storage_path}/{today}.jsonl"

    def _get_log_files(self, date_range: tuple = None) -> list:
        """Return existing ``*.jsonl`` files in the storage directory, sorted by name.

        Args:
            date_range: Optional ``(start, end)`` pair, inclusive. Each bound
                may be ``None``, a ``date``/``datetime`` or a ``YYYYMMDD``
                string, and is compared against the file name stem.

        Returns:
            List of file paths as strings; empty when the directory is missing.
        """
        from pathlib import Path

        root = Path(self.storage_path)
        if not root.is_dir():
            return []
        files = sorted(root.glob("*.jsonl"))
        if date_range:
            start, end = (self._as_day(bound) for bound in date_range)
            files = [
                p
                for p in files
                if (start is None or p.stem >= start)
                and (end is None or p.stem <= end)
            ]
        return [str(p) for p in files]

    @staticmethod
    def _as_day(value):
        """Normalise a date bound to a ``YYYYMMDD`` string; ``None`` passes through."""
        if value is None:
            return None
        if hasattr(value, "strftime"):
            return value.strftime("%Y%m%d")
        return str(value)
敏感信息过滤
class SecureAIPromptLogger(AIPromptLogger):
    """Prompt logger that masks sensitive data in prompt and response text.

    Only the prompt and the response are filtered; ``metadata`` is passed to
    the base logger unchanged.
    """

    # Placeholder label -> regex. The digit look-arounds keep the phone
    # pattern from matching inside a longer number such as an 18-digit ID.
    SENSITIVE_PATTERNS = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone": r"(?<!\d)1[3-9]\d{9}(?!\d)",
        "id_card": r"(?<!\d)\d{17}[\dXx](?!\d)",
        "api_key": r"sk-[a-zA-Z0-9]{32,}",
    }

    def log(self, session_id: str, prompt: str, response: str, model: str, metadata: dict = None):
        """Mask sensitive data in ``prompt`` and ``response``, then log them."""
        filtered_prompt = self._filter_sensitive(prompt)
        filtered_response = self._filter_sensitive(response)
        super().log(
            session_id, filtered_prompt, filtered_response,
            model, metadata,
        )

    def _filter_sensitive(self, text: str) -> str:
        """Replace each match of every sensitive pattern with ``[<label>]``."""
        import re

        filtered = text
        for data_type, pattern in self.SENSITIVE_PATTERNS.items():
            filtered = re.sub(pattern, f"[{data_type}]", filtered)
        return filtered
分布式追踪
LangChain Trace
from langchain.callbacks.tracing_v2 import tracing_v2_enabled
# Enable LangChain tracing for everything executed inside the `with` block.
# NOTE(review): confirm that the installed LangChain version's
# `tracing_v2_enabled` accepts an `endpoint` keyword.
with tracing_v2_enabled(
    project_name="my-ai-app",
    endpoint="http://localhost:4318",
):
    # Your LangChain code goes here.
    chain = create_chain()
    result = chain.invoke({"input": "用户问题"})
    # The full call chain is recorded automatically.
自定义 Agent Trace
from typing import List, Dict, Any
from dataclasses import dataclass, field
import time
@dataclass
class AgentSpan:
    """One timed step of an agent run; spans nest through ``children``.

    A span can be used as a context manager: leaving the ``with`` block
    closes it.
    """

    start_time: float
    end_time: float = None  # None while the span is still open
    input_data: Any = None
    output_data: Any = None
    metadata: Dict = field(default_factory=dict)
    children: List["AgentSpan"] = field(default_factory=list)
    # New fields have defaults and sit last so existing positional
    # construction keeps working.
    name: str = ""
    # Owning tracer, used only to close the span when a ``with`` block exits.
    _tracer: Any = field(default=None, repr=False, compare=False)

    def duration(self) -> float:
        """Return elapsed seconds; uses the current time while the span is open."""
        end = time.time() if self.end_time is None else self.end_time
        return end - self.start_time

    def __enter__(self) -> "AgentSpan":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if self._tracer is not None:
            # Pass output_data through so a value set inside the block survives.
            self._tracer.end_span(self, self.output_data)
        elif self.end_time is None:
            self.end_time = time.time()
        return False  # never swallow exceptions


class AgentTracer:
    """Build a tree of ``AgentSpan`` objects for one agent run."""

    def __init__(self):
        self.spans = []  # root spans only; nested spans live in ``children``
        self.current_span = None  # innermost open span, or None
        self._stack = []  # open spans, outermost first

    def start_span(self, name: str, metadata: dict = None) -> AgentSpan:
        """Open a span as a child of the current span, or as a new root.

        Args:
            name: Span name.
            metadata: Optional metadata dict stored on the span.

        Returns:
            The new span, usable directly or as a context manager.
        """
        span = AgentSpan(
            start_time=time.time(),
            metadata=metadata or {},
            name=name,
            _tracer=self,
        )
        # Each span is stored once, so exports and duration totals do not
        # count nested spans twice.
        if self.current_span is not None:
            self.current_span.children.append(span)
        else:
            self.spans.append(span)
        self._stack.append(span)
        self.current_span = span
        return span

    def end_span(self, span: AgentSpan, output_data: Any = None):
        """Close ``span``, store its output and make its parent current again."""
        span.end_time = time.time()
        span.output_data = output_data
        # Match by identity: dataclass equality compares field values.
        # Closing a span also drops any descendants left open above it.
        for idx in range(len(self._stack) - 1, -1, -1):
            if self._stack[idx] is span:
                del self._stack[idx:]
                break
        self.current_span = self._stack[-1] if self._stack else None

    def get_trace_tree(self) -> List[AgentSpan]:
        """Return the root spans of the trace tree."""
        return self.spans

    def export(self) -> dict:
        """Export the trace as plain dicts.

        Returns:
            ``{"spans": [...], "total_duration": float}`` where
            ``total_duration`` is the sum of the root span durations.
        """
        return {
            "spans": [self._serialize_span(s) for s in self.spans],
            "total_duration": sum(s.duration() for s in self.spans),
        }

    def _serialize_span(self, span: AgentSpan) -> dict:
        """Serialize one span and, recursively, its children."""
        return {
            "name": span.name,
            "duration": span.duration(),
            "input": span.input_data,
            "output": span.output_data,
            "metadata": span.metadata,
            "children": [self._serialize_span(c) for c in span.children],
        }
tracer = AgentTracer()

# Each `with` opens one span; spans opened inside another block are meant to
# become its children. This requires the object returned by `start_span` to
# support the context-manager protocol.
# NOTE(review): the nesting below is reconstructed from the span names —
# confirm it matches the intended pipeline.
with tracer.start_span("user_query", {"user_id": "123"}):
    with tracer.start_span("route"):
        route_result = router.route(user_input)
    with tracer.start_span("generate"):
        with tracer.start_span("retrieve_context"):
            context = retriever.retrieve(user_input)
        with tracer.start_span("llm_call"):
            response = llm.generate(prompt)
    with tracer.start_span("post_process"):
        final_response = post_processor.format(response)

# Export the collected spans and print them as indented JSON.
trace_data = tracer.export()
print(json.dumps(trace_data, indent=2))
RAG 监控
检索质量监控
class RAGMonitor:
    """Record retrieval and answer quality statistics for a RAG system."""

    def __init__(self):
        self.retrieval_stats = []
        self.answer_stats = []

    @staticmethod
    def _doc_id(doc):
        """Return a document's identity: its ``"id"`` entry for dicts, else the value itself."""
        return doc.get("id") if isinstance(doc, dict) else doc

    def record_retrieval(
        self,
        query: str,
        retrieved_docs: list,
        relevant_docs: list = None,
    ):
        """Record one retrieval and alert when recall is below 50%.

        Args:
            query: The search query; stored truncated to 100 characters.
            retrieved_docs: Retrieved documents, as dicts with an ``"id"``
                key or as plain ids.
            relevant_docs: Optional ground-truth documents or ids. Without
                them recall is ``None``.
        """
        # Compare by id: dict documents are unhashable and cannot be put in
        # a set directly.
        retrieved_ids = [self._doc_id(d) for d in retrieved_docs]
        retrieved_set = set(retrieved_ids)
        relevant_set = (
            {self._doc_id(d) for d in relevant_docs} if relevant_docs else set()
        )
        hits = len(retrieved_set & relevant_set)

        recall = hits / len(relevant_set) if relevant_set else None

        if not retrieved_docs:
            precision = 0
        elif relevant_set:
            precision = hits / len(retrieved_set)
        else:
            precision = None

        stat = {
            "timestamp": time.time(),
            "query": query[:100],  # truncated
            "num_retrieved": len(retrieved_docs),
            "recall": recall,
            "precision": precision,
            "retrieved_ids": retrieved_ids[:5],
        }
        self.retrieval_stats.append(stat)
        if recall is not None and recall < 0.5:
            self.alert(f"检索 Recall 低于 50%: {recall:.2%}")

    def record_answer(self, query: str, answer: str, score: float):
        """Record the quality score of one answer."""
        self.answer_stats.append({
            "timestamp": time.time(),
            "query": query[:100],
            "answer_length": len(answer),
            "quality_score": score,
        })

    def get_retrieval_stats(self, hours: int = 24) -> dict:
        """Summarise retrievals recorded during the last ``hours`` hours.

        Returns:
            ``{"error": "No data"}`` when nothing was recorded in the window,
            otherwise the query count, average recall and precision (``None``
            when no values exist) and the number of recalls below 0.5.
        """
        cutoff = time.time() - hours * 3600
        recent = [s for s in self.retrieval_stats if s["timestamp"] >= cutoff]
        if not recent:
            return {"error": "No data"}
        recalls = [s["recall"] for s in recent if s["recall"] is not None]
        precisions = [s["precision"] for s in recent if s["precision"] is not None]
        return {
            "total_queries": len(recent),
            "avg_recall": sum(recalls) / len(recalls) if recalls else None,
            "avg_precision": sum(precisions) / len(precisions) if precisions else None,
            "low_recall_count": sum(1 for r in recalls if r < 0.5),
        }

    def alert(self, message: str):
        """Emit an alert message to stdout."""
        print(f"[ALERT] {message}")
监控仪表板
Grafana 面板配置
# Grafana dashboard for AI metrics.
# Each panel lists its PromQL queries under `targets`.
panels:
  - title: "Token 消耗趋势"
    type: "graph"
    datasource: "prometheus"
    targets:
      - expr: 'rate(ai_token_usage_total[5m])'
        legendFormat: "{{model}}"
  - title: "响应延迟 P99"
    type: "graph"
    targets:
      # Aggregate the buckets by `le` so one quantile is computed across all
      # series instead of one per label combination.
      - expr: 'histogram_quantile(0.99, sum by (le) (rate(ai_request_duration_seconds_bucket[5m])))'
        legendFormat: "P99"
  - title: "API 错误率"
    type: "stat"
    targets:
      # Sum both sides so the division yields a single ratio even when the
      # two counters carry different label sets.
      - expr: 'sum(rate(ai_api_errors_total[5m])) / sum(rate(ai_requests_total[5m]))'
        legendFormat: "错误率"
  - title: "回答质量评分"
    type: "gauge"
    targets:
      - expr: 'avg(ai_quality_score)'
        legendFormat: "平均分"
  - title: "RAG 检索 Recall"
    type: "graph"
    targets:
      - expr: 'avg(rag_retrieval_recall)'
        legendFormat: "Recall@K"
告警配置
告警规则
# Declarative alert rules. Each rule has a name, a condition expression over
# metric names, a severity and the message reported when it triggers.
ALERT_RULES = [
    {
        "name": "high_token_consumption",
        "condition": "daily_tokens > 1_000_000",
        "severity": "warning",
        "message": "日 Token 消耗超过 100 万",
    },
    {
        "name": "low_answer_quality",
        "condition": "avg_quality_score < 0.6",
        "severity": "critical",
        "message": "回答质量评分低于 0.6",
    },
    {
        "name": "high_error_rate",
        "condition": "error_rate > 0.05",
        "severity": "critical",
        "message": "API 错误率超过 5%",
    },
    {
        "name": "slow_response",
        "condition": "p99_latency > 10",
        "severity": "warning",
        "message": "P99 响应延迟超过 10 秒",
    },
    {
        "name": "rag_low_recall",
        "condition": "avg_recall < 0.5",
        "severity": "warning",
        "message": "RAG 检索 Recall 低于 50%",
    },
]
class AlertManager:
    """Evaluate alert rules against a metrics snapshot."""

    def __init__(self):
        self.rules = ALERT_RULES

    def evaluate(self, metrics: dict) -> list:
        """Return one alert dict for every rule whose condition holds.

        Args:
            metrics: Mapping of metric name to current value.

        Returns:
            List of dicts with ``rule``, ``severity``, ``message`` and
            ``timestamp`` keys; empty when no rule triggers.
        """
        triggered = []
        for rule in self.rules:
            if self._check_condition(rule["condition"], metrics):
                triggered.append({
                    "rule": rule["name"],
                    "severity": rule["severity"],
                    "message": rule["message"],
                    "timestamp": time.time(),
                })
        return triggered

    def _check_condition(self, condition: str, metrics: dict) -> bool:
        """Evaluate ``condition`` with metric names bound to their values.

        Metrics are supplied as variables rather than substituted into the
        expression text, so overlapping names (``rate`` vs ``error_rate``)
        cannot corrupt the expression and metric values are never parsed as
        code. Metric names must therefore be valid identifiers.

        Returns:
            The truth value of the condition, or ``False`` when it cannot be
            evaluated (for example a referenced metric is missing or ``None``).
        """
        try:
            # NOTE(security): this still uses eval(). Builtins are removed,
            # but that is not a sandbox — condition strings must come from
            # trusted rule definitions only. Replace with a real expression
            # parser before accepting externally supplied rules.
            return bool(eval(condition, {"__builtins__": {}}, dict(metrics)))
        except (NameError, TypeError, ValueError, SyntaxError, ArithmeticError):
            return False
实战:构建完整监控体系
快速集成
# observability.py
from .metrics import TokenMonitor, QualityScorer
from .logging import SecureAIPromptLogger
from .tracing import AgentTracer
from .rag_monitor import RAGMonitor
from .alerts import AlertManager
class AIObservability:
    """Facade combining token monitoring, logging, tracing, scoring and alerting."""

    def __init__(self, config: dict = None):
        self.config = config or {}
        self.token_monitor = TokenMonitor()
        self.quality_scorer = QualityScorer()
        self.logger = SecureAIPromptLogger()
        self.tracer = AgentTracer()
        self.rag_monitor = RAGMonitor()
        self.alert_manager = AlertManager()

    def trace_and_record(self, session_id: str, prompt: str, model: str, func, *args, **kwargs):
        """Run one AI call and record trace, token usage, log, quality and alerts.

        Args:
            session_id: Session identifier written to the prompt log.
            prompt: Prompt text; used for logging and size estimates.
            model: Model name; selects the price table entry.
            func: Callable performing the AI call, invoked as
                ``func(*args, **kwargs)``. Its result is measured with
                ``len()`` and logged as the response text.

        Returns:
            Whatever ``func`` returned.
        """
        # NOTE(review): TokenUsage is not among this module's imports;
        # presumably it lives in .metrics next to TokenMonitor — confirm.
        from .metrics import TokenUsage

        # Close the span explicitly so it ends even when the call raises.
        span = self.tracer.start_span(f"ai_call_{model}")
        response = None
        try:
            response = func(*args, **kwargs)
        finally:
            self.tracer.end_span(span, output_data=response)

        # Token counts are estimated as one token per 4 characters.
        usage = TokenUsage(
            prompt_tokens=len(prompt) // 4,
            completion_tokens=len(response) // 4,
            total_tokens=(len(prompt) + len(response)) // 4,
            model=model,
            cost=self._calculate_cost(model, len(prompt), len(response)),
        )
        self.token_monitor.record(usage)
        self.logger.log(session_id, prompt, response, model)
        quality = self.quality_scorer.score(prompt, response)
        metrics = {
            "daily_tokens": self.token_monitor.get_daily_usage(),
            "avg_quality_score": quality["overall"],
        }
        # Surface triggered alerts instead of discarding them.
        for alert in self.alert_manager.evaluate(metrics):
            print(f"[ALERT] {alert['message']}")
        return response

    def _calculate_cost(self, model: str, prompt_len: int, response_len: int) -> float:
        """Estimate the cost of one call from prompt and response lengths.

        Returns:
            ``prompt_len / 1e6 * input_price + response_len / 1e6 * output_price``
            for a known model, ``0.0`` for an unknown one.
        """
        # (input price, output price) per model.
        # NOTE(review): callers pass character lengths, not token counts, and
        # the price units are not stated here — confirm both against the
        # provider's pricing before relying on the result.
        prices = {
            "gpt-5.4": (0.000110, 0.000440),
            "gpt-4o-mini": (0.0000015, 0.000006),
            "claude-3.5-sonnet": (0.000015, 0.000075),
            "deepseek-chat": (0.000001, 0.000002),
        }
        if model in prices:
            input_p, output_p = prices[model]
            return prompt_len / 1_000_000 * input_p + response_len / 1_000_000 * output_p
        return 0.0
总结
AI 可观测性核心要点:
指标监控:Token 消耗、延迟、成本、质量评分
日志记录:Prompt/Response 对、敏感信息过滤
分布式追踪:Agent 决策链、Tool 调用链
RAG 监控:检索 Recall/Precision、回答质量
告警系统:质量下降、异常消耗、错误率
仪表板:Grafana/Prometheus 可视化
没有可观测性,AI 应用就是黑盒。2026 年,可观测性是 AI 工程化的必备基础设施。
本文是 AI 工程化系列之一。
This article contains affiliate links. If you sign up through the links above, I may earn a commission at no additional cost to you.
Ready to Build Your AI Business?
Get started with Systeme.io for free — All-in-one platform for building your online business with AI tools.
Top comments (0)