DEV Community

ZNY
ZNY

Posted on

AI 2026AI

AI 应用可观测性完全指南:2026年构建可监控、可调试的AI系统

前言

AI 应用的行为比传统软件更难预测。当 AI 输出错误结果时,如何快速定位问题?当 AI 响应变慢时,如何找到瓶颈?

可观测性是 2026 年 AI 工程化的核心挑战之一。本文介绍构建 AI 可观测性体系的方法和工具。

什么是 AI 可观测性

传统可观测性 vs AI 可观测性

| 维度 | 传统可观测性 | AI 可观测性 |
|------|------------|------------|
| 核心指标 | 延迟、错误率、QPS | 回答质量、Token 消耗、幻觉率 |
| 日志类型 | 结构化日志 | Prompt/Response 对 |
| Trace | 函数调用链 | Agent 决策链 |
| 告警 | 错误告警 | 质量下降告警 |
| 调试 | 重放日志 | 重放 Prompt |

AI 可观测性三大支柱


├── Metrics(指标)

│   ├── Token 消耗

│   ├── 响应延迟

│   ├── API 错误率

│   ├── 模型调用成本

│   └── 质量评分

├── Logs(日志)

│   ├── Prompt 日志

│   ├── Response 日志

│   ├── Token 使用明细

│   └── 错误详情

└── Traces(追踪)

    ├── Agent 决策链
    ├── Tool 调用链
    ├── RAG 检索链
    └── 多 Agent 协作链

Enter fullscreen mode Exit fullscreen mode

核心指标体系

Token 消耗监控


from dataclasses import dataclass

from typing import Optional

import time

@dataclass
class TokenUsage:
    """Token accounting for a single model call.

    Positional construction follows the declared field order:
    ``prompt_tokens, completion_tokens, total_tokens, cost, timestamp, model``.
    """

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cost: float
    # Seconds since the epoch. Left as None by the caller to mean "now";
    # __post_init__ fills it in per instance. A plain ``= time.time()``
    # default would be evaluated once, when the class is created, and every
    # record would share that stale value.
    timestamp: Optional[float] = None
    # Model identifier, read as ``r.model`` by TokenMonitor.get_cost_by_model.
    # Declared last and with a default so positional callers are unaffected.
    model: str = "unknown"

    def __post_init__(self):
        """Stamp the record with the current time when none was supplied."""
        if self.timestamp is None:
            self.timestamp = time.time()

class TokenMonitor:
    """Collect token-usage records and warn when the daily budget is exceeded."""

    def __init__(self):
        self.usage_records = []
        self.daily_limit = 1_000_000  # 1M tokens per day
        self.monthly_limit = 20_000_000  # 20M tokens per month (not checked in this class)

    def record(self, usage: "TokenUsage"):
        """Store one usage record and alert if the daily limit is now exceeded.

        Args:
            usage: Record exposing ``total_tokens``, ``cost`` and ``timestamp``
                (and optionally ``model``).
        """
        self.usage_records.append(usage)

        daily = self.get_daily_usage()
        if daily > self.daily_limit:
            self.alert(f"日 Token 消耗 {daily:,} 超过限制 {self.daily_limit:,}")

    def get_daily_usage(self) -> int:
        """Return the tokens consumed during the last 24 hours.

        The window is rolling (now minus 86400 seconds), not the calendar day.
        """
        window_start = time.time() - 86400
        return sum(
            r.total_tokens
            for r in self.usage_records
            if r.timestamp >= window_start
        )

    def get_cost_by_model(self) -> dict:
        """Aggregate tokens and cost per model.

        Returns:
            ``{model: {"tokens": int, "cost": number}}``. Records without a
            ``model`` attribute are grouped under ``"unknown"``.
        """
        costs = {}
        for r in self.usage_records:
            model = getattr(r, "model", "unknown")
            bucket = costs.setdefault(model, {"tokens": 0, "cost": 0})
            bucket["tokens"] += r.total_tokens
            bucket["cost"] += r.cost
        return costs

    def alert(self, message: str):
        """Emit an alert; currently just prints it."""
        print(f"[ALERT] {message}")
        # In production this should be forwarded to a real alerting system.

Enter fullscreen mode Exit fullscreen mode

回答质量评分


class QualityScorer:
    """Score an AI response on relevance, accuracy, completeness and safety."""

    def __init__(self):
        self.llm_as_judge = LLMJudge()

    def score(self, prompt: str, response: str, expected: str = None) -> dict:
        """Evaluate a response and return the per-dimension scores.

        Args:
            prompt: The prompt sent to the model.
            response: The model's answer.
            expected: Optional reference answer; when truthy an ``accuracy``
                score is added.

        Returns:
            Dict with ``relevance``, ``completeness``, ``safety``, optionally
            ``accuracy``, plus ``overall`` (the unweighted mean of the others).
        """
        scores = {}

        scores["relevance"] = self._score_relevance(prompt, response)

        # Accuracy is only meaningful when a reference answer is available.
        if expected:
            scores["accuracy"] = self._score_accuracy(response, expected)

        scores["completeness"] = self._score_completeness(prompt, response)
        scores["safety"] = self._score_safety(response)

        scores["overall"] = sum(scores.values()) / len(scores)
        return scores

    def _score_relevance(self, prompt: str, response: str) -> float:
        """Return the embedding cosine similarity between prompt and response."""
        prompt_emb = get_embedding(prompt)
        response_emb = get_embedding(response)
        return cosine_similarity(prompt_emb, response_emb)

    def _score_accuracy(self, response: str, expected: str) -> float:
        """Return a lexical similarity in [0, 1] between response and reference.

        NOTE(review): ``score`` calls this method but no implementation is
        present in the class; this character-level ratio is a simple baseline
        so the call does not raise AttributeError. Swap in a semantic metric
        if one is intended.
        """
        from difflib import SequenceMatcher

        return SequenceMatcher(None, response, expected).ratio()

    def _score_safety(self, response: str) -> float:
        """Return 0.0 if the response contains a blocked keyword, else 1.0."""
        harmful_keywords = ["暴力", "色情", "仇恨", "犯罪"]
        if any(keyword in response for keyword in harmful_keywords):
            return 0.0
        return 1.0

    def _score_completeness(self, prompt: str, response: str) -> float:
        """Return a length-based completeness score in [0, 1].

        A response at least twice as long as the prompt scores 1.0; shorter
        responses score proportionally.
        """
        min_length = len(prompt) * 2
        if len(response) < min_length:
            return len(response) / min_length
        return 1.0

Enter fullscreen mode Exit fullscreen mode

Prompt/Response 日志

结构化日志记录


import json

from datetime import datetime

from typing import Optional

class AIPromptLogger:
    """Append prompt/response pairs to daily JSONL files and query them back."""

    def __init__(self, storage_path: str = "./logs/ai_interactions"):
        self.storage_path = storage_path
        self.current_file = self._get_log_file()

    def log(
        self,
        session_id: str,
        prompt: str,
        response: str,
        model: str,
        metadata: dict = None
    ):
        """Record one prompt/response pair as a JSON line.

        Args:
            session_id: Identifier used later by ``search`` and ``replay``.
            prompt: Prompt text.
            response: Response text.
            model: Model name.
            metadata: Optional extra fields; stored as ``{}`` when omitted.
        """
        import os

        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "session_id": session_id,
            "model": model,
            "prompt": prompt,
            "response": response,
            "prompt_length": len(prompt),
            "response_length": len(response),
            "metadata": metadata or {}
        }

        # Re-resolve the target so a long-running process rolls over to a new
        # file when the date changes, and make sure the directory exists.
        self.current_file = self._get_log_file()
        os.makedirs(self.storage_path, exist_ok=True)

        # ensure_ascii=False writes raw non-ASCII text, so pin the encoding
        # instead of relying on the platform default.
        with open(self.current_file, 'a', encoding="utf-8") as f:
            f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')

    def search(
        self,
        session_id: str = None,
        keyword: str = None,
        date_range: tuple = None
    ) -> list:
        """Return logged entries matching every filter that is given.

        Args:
            session_id: Keep only entries of this session.
            keyword: Keep only entries whose prompt or response contains it.
            date_range: Optional ``(start, end)`` passed to ``_get_log_files``.
        """
        results = []

        for log_file in self._get_log_files(date_range):
            with open(log_file, 'r', encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    entry = json.loads(line)

                    if session_id and entry.get("session_id") != session_id:
                        continue
                    if (
                        keyword
                        and keyword not in entry.get("prompt", "")
                        and keyword not in entry.get("response", "")
                    ):
                        continue

                    results.append(entry)

        return results

    def replay(self, session_id: str) -> list:
        """Return every logged interaction of one session."""
        return self.search(session_id=session_id)

    def _get_log_file(self) -> str:
        """Return the path of today's log file (``<storage_path>/YYYYMMDD.jsonl``)."""
        today = datetime.now().strftime("%Y%m%d")
        return f"{self.storage_path}/{today}.jsonl"

    def _get_log_files(self, date_range: tuple = None) -> list:
        """Return the ``*.jsonl`` files under ``storage_path``, oldest first.

        Args:
            date_range: Optional ``(start, end)``, inclusive. Each bound may be
                None (open-ended), a ``YYYYMMDD`` string, or a date/datetime.
                NOTE(review): the expected bound type is not specified by the
                callers in this file; confirm.
        """
        import glob
        import os

        paths = sorted(glob.glob(os.path.join(self.storage_path, "*.jsonl")))
        if not date_range:
            return paths

        start, end = (self._as_day(bound) for bound in date_range)
        selected = []
        for path in paths:
            # File names are YYYYMMDD, so string order is chronological order.
            day = os.path.splitext(os.path.basename(path))[0]
            if start is not None and day < start:
                continue
            if end is not None and day > end:
                continue
            selected.append(path)
        return selected

    @staticmethod
    def _as_day(bound):
        """Normalise a date-range bound to a ``YYYYMMDD`` string (or None)."""
        if bound is None:
            return None
        if hasattr(bound, "strftime"):
            return bound.strftime("%Y%m%d")
        return str(bound)

Enter fullscreen mode Exit fullscreen mode

敏感信息过滤


class SecureAIPromptLogger(AIPromptLogger):
    """Prompt logger that redacts sensitive values before they reach disk."""

    # Applied in insertion order. Longer or more specific patterns come first,
    # and the numeric patterns are bounded by non-digits; otherwise the phone
    # pattern would match an 11-digit run inside an ID-card number or API key
    # and leave the remainder unredacted.
    SENSITIVE_PATTERNS = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "api_key": r"sk-[a-zA-Z0-9]{32,}",
        "id_card": r"(?<!\d)\d{17}[\dXx](?!\d)",
        "phone": r"(?<!\d)1[3-9]\d{9}(?!\d)",
    }

    def log(self, session_id: str, prompt: str, response: str, model: str, metadata: dict = None):
        """Redact prompt and response, then delegate to the parent logger.

        ``metadata`` is passed through unchanged.
        """
        filtered_prompt = self._filter_sensitive(prompt)
        filtered_response = self._filter_sensitive(response)

        super().log(
            session_id, filtered_prompt, filtered_response,
            model, metadata
        )

    def _filter_sensitive(self, text: str) -> str:
        """Replace each sensitive match with a ``[<type>]`` placeholder."""
        import re

        filtered = text
        for data_type, pattern in self.SENSITIVE_PATTERNS.items():
            filtered = re.sub(pattern, f"[{data_type}]", filtered)
        return filtered

Enter fullscreen mode Exit fullscreen mode

分布式追踪

LangChain Trace


from langchain.callbacks.tracing_v2 import tracing_v2_enabled

# Enable LangChain tracing for everything executed inside the block.
# NOTE(review): `endpoint` does not appear among the parameters documented for
# tracing_v2_enabled (project_name, example_id, tags, client) — confirm against
# the installed LangChain version; the endpoint may need to be configured
# through the client or environment instead.
with tracing_v2_enabled(
    project_name="my-ai-app",
    endpoint="http://localhost:4318"
):
    # Your LangChain code goes here.
    chain = create_chain()
    result = chain.invoke({"input": "用户问题"})
    # The full call chain is recorded automatically.

Enter fullscreen mode Exit fullscreen mode

自定义 Agent Trace


from typing import List, Dict, Any

from dataclasses import dataclass, field

import time

@dataclass
class AgentSpan:
    """One timed step of an agent run.

    A span returned by ``AgentTracer.start_span`` can be used as a context
    manager; leaving the ``with`` block closes it through the tracer.
    """

    start_time: float
    end_time: float = field(default=None)  # None until the span is closed
    input_data: Any = None
    output_data: Any = None
    metadata: Dict = field(default_factory=dict)
    children: List['AgentSpan'] = field(default_factory=list)
    # Read by AgentTracer._serialize_span. Declared after the other fields and
    # with a default so their positional order is unchanged.
    name: str = ""
    # Back-references set by AgentTracer.start_span. Excluded from repr and
    # comparison so parent <-> child links cannot recurse.
    _tracer: Any = field(default=None, repr=False, compare=False)
    _parent: Any = field(default=None, repr=False, compare=False)

    def duration(self) -> float:
        """Return elapsed seconds; for an open span, the time elapsed so far."""
        # Compare with None explicitly: an end_time of 0.0 is a valid value.
        end = self.end_time if self.end_time is not None else time.time()
        return end - self.start_time

    def __enter__(self) -> 'AgentSpan':
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Close through the owning tracer so its current span is restored.
        # Exceptions are never suppressed.
        if self._tracer is not None:
            self._tracer.end_span(self, self.output_data)
        return False


class AgentTracer:
    """Build a tree of AgentSpan objects from start/end calls."""

    def __init__(self):
        self.spans = []  # root spans only; nested spans hang off `children`
        self.current_span = None

    def start_span(self, name: str, metadata: dict = None) -> AgentSpan:
        """Open a span nested under the current one, or a new root span.

        Args:
            name: Label of the step.
            metadata: Optional extra data stored on the span.

        Returns:
            The new span, which becomes the current span.
        """
        span = AgentSpan(
            start_time=time.time(),
            metadata=metadata or {},
            name=name,
        )
        span._tracer = self
        span._parent = self.current_span

        # Attach to exactly one place so export() does not list a child twice
        # or count its duration twice.
        if self.current_span is not None:
            self.current_span.children.append(span)
        else:
            self.spans.append(span)

        self.current_span = span
        return span

    def end_span(self, span: AgentSpan, output_data: Any = None):
        """Close ``span``, record its output, and make its parent current."""
        span.end_time = time.time()
        span.output_data = output_data
        # Return to the parent rather than None so later sibling spans are
        # still nested correctly.
        self.current_span = span._parent

    def get_trace_tree(self) -> List[AgentSpan]:
        """Return the root spans of the trace."""
        return self.spans

    def export(self) -> dict:
        """Return the trace as plain dicts plus the summed root durations."""
        return {
            "spans": [self._serialize_span(s) for s in self.spans],
            "total_duration": sum(s.duration() for s in self.spans)
        }

    def _serialize_span(self, span: AgentSpan) -> dict:
        """Recursively convert a span and its children into a dict."""
        return {
            "name": span.name,
            "duration": span.duration(),
            "input": span.input_data,
            "output": span.output_data,
            "metadata": span.metadata,
            "children": [self._serialize_span(c) for c in span.children]
        }

# Example: trace one user query end to end.
# NOTE(review): the `with` statements require the object returned by
# start_span to support the context-manager protocol; router, retriever, llm,
# post_processor, user_input and prompt are expected to be defined by the
# surrounding application.
import json

tracer = AgentTracer()

with tracer.start_span("user_query", {"user_id": "123"}):
    with tracer.start_span("route"):
        route_result = router.route(user_input)

    with tracer.start_span("generate"):
        with tracer.start_span("retrieve_context"):
            context = retriever.retrieve(user_input)

        with tracer.start_span("llm_call"):
            response = llm.generate(prompt)

        with tracer.start_span("post_process"):
            final_response = post_processor.format(response)

trace_data = tracer.export()
print(json.dumps(trace_data, indent=2))

Enter fullscreen mode Exit fullscreen mode

RAG 监控

检索质量监控


class RAGMonitor:
    """Track retrieval quality (recall/precision) and answer quality for RAG."""

    def __init__(self):
        self.retrieval_stats = []
        self.answer_stats = []

    @staticmethod
    def _doc_key(doc):
        """Return a hashable identity for a document.

        Dict documents are identified by their ``"id"`` entry; anything else
        is used as-is. Dicts cannot go into a set directly.
        """
        if isinstance(doc, dict):
            return doc.get("id")
        return doc

    def record_retrieval(
        self,
        query: str,
        retrieved_docs: list,
        relevant_docs: list = None
    ):
        """Record one retrieval and alert when recall drops below 50%.

        Args:
            query: The search query; only the first 100 characters are stored.
            retrieved_docs: Documents returned by the retriever.
            relevant_docs: Optional ground-truth documents. Without them
                recall is None, and precision is None (or 0 when nothing was
                retrieved).
        """
        retrieved_ids = [self._doc_key(d) for d in retrieved_docs]

        if relevant_docs:
            retrieved_set = set(retrieved_ids)
            relevant_set = {self._doc_key(d) for d in relevant_docs}
            hits = len(retrieved_set & relevant_set)
            recall = hits / len(relevant_set)
            precision = hits / len(retrieved_set) if retrieved_set else 0
        else:
            recall = None
            precision = None if retrieved_docs else 0

        stat = {
            "timestamp": time.time(),
            "query": query[:100],  # truncated
            "num_retrieved": len(retrieved_docs),
            "recall": recall,
            "precision": precision,
            "retrieved_ids": retrieved_ids[:5]
        }
        self.retrieval_stats.append(stat)

        if recall is not None and recall < 0.5:
            self.alert(f"检索 Recall 低于 50%: {recall:.2%}")

    def record_answer(self, query: str, answer: str, score: float):
        """Record the quality score of one generated answer."""
        self.answer_stats.append({
            "timestamp": time.time(),
            "query": query[:100],
            "answer_length": len(answer),
            "quality_score": score
        })

    def get_retrieval_stats(self, hours: int = 24) -> dict:
        """Summarise retrievals recorded within the last ``hours`` hours.

        Returns:
            ``{"error": "No data"}`` when nothing falls inside the window,
            otherwise totals and averages; an average is None when no record
            carries that metric.
        """
        cutoff = time.time() - hours * 3600
        recent = [s for s in self.retrieval_stats if s["timestamp"] >= cutoff]

        if not recent:
            return {"error": "No data"}

        recalls = [s["recall"] for s in recent if s["recall"] is not None]
        precisions = [s["precision"] for s in recent if s["precision"] is not None]

        return {
            "total_queries": len(recent),
            "avg_recall": sum(recalls) / len(recalls) if recalls else None,
            "avg_precision": sum(precisions) / len(precisions) if precisions else None,
            "low_recall_count": sum(1 for r in recalls if r < 0.5)
        }

    def alert(self, message: str):
        """Emit an alert; currently just prints it."""
        print(f"[ALERT] {message}")

Enter fullscreen mode Exit fullscreen mode

监控仪表板

Grafana 面板配置


# Grafana Dashboard for AI Metrics
# Each list item is one panel; the queries for a panel are nested under
# `targets`.
- title: "Token 消耗趋势"
  type: "graph"
  datasource: "prometheus"
  targets:
    - expr: 'rate(ai_token_usage_total[5m])'
      legendFormat: "{{model}}"

- title: "响应延迟 P99"
  type: "graph"
  targets:
    - expr: 'histogram_quantile(0.99, rate(ai_request_duration_seconds_bucket[5m]))'
      legendFormat: "P99"

- title: "API 错误率"
  type: "stat"
  targets:
    - expr: 'rate(ai_api_errors_total[5m]) / rate(ai_requests_total[5m])'
      legendFormat: "错误率"

- title: "回答质量评分"
  type: "gauge"
  targets:
    - expr: 'avg(ai_quality_score)'
      legendFormat: "平均分"

- title: "RAG 检索 Recall"
  type: "graph"
  targets:
    - expr: 'avg(rag_retrieval_recall)'
      legendFormat: "Recall@K"

Enter fullscreen mode Exit fullscreen mode

告警配置

告警规则


# Declarative alert rules. Each `condition` is a comparison between a metric
# name and a numeric threshold; AlertManager evaluates it against a metrics
# dict.
ALERT_RULES = [
    {
        "name": "high_token_consumption",
        "condition": "daily_tokens > 1_000_000",
        "severity": "warning",
        "message": "日 Token 消耗超过 100 万"
    },
    {
        "name": "low_answer_quality",
        "condition": "avg_quality_score < 0.6",
        "severity": "critical",
        "message": "回答质量评分低于 0.6"
    },
    {
        "name": "high_error_rate",
        "condition": "error_rate > 0.05",
        "severity": "critical",
        "message": "API 错误率超过 5%"
    },
    {
        "name": "slow_response",
        "condition": "p99_latency > 10",
        "severity": "warning",
        "message": "P99 响应延迟超过 10 秒"
    },
    {
        "name": "rag_low_recall",
        "condition": "avg_recall < 0.5",
        "severity": "warning",
        "message": "RAG 检索 Recall 低于 50%"
    },
]

class AlertManager:
    """Evaluate alert rules against a metrics snapshot."""

    def __init__(self):
        self.rules = ALERT_RULES

    def evaluate(self, metrics: dict) -> list:
        """Return one alert dict for every rule whose condition holds.

        Args:
            metrics: Mapping of metric name to current numeric value.

        Returns:
            List of ``{"rule", "severity", "message", "timestamp"}`` dicts,
            in rule order.
        """
        triggered = []

        for rule in self.rules:
            if self._check_condition(rule["condition"], metrics):
                triggered.append({
                    "rule": rule["name"],
                    "severity": rule["severity"],
                    "message": rule["message"],
                    "timestamp": time.time()
                })

        return triggered

    def _check_condition(self, condition: str, metrics: dict) -> bool:
        """Evaluate ``condition`` with metric names bound to ``metrics``.

        Only numeric/boolean constants, metric names, ``+ - * /``,
        comparisons, ``and``/``or``/``not`` and unary minus are accepted.
        The expression is walked as an AST instead of being passed to
        ``eval`` after textual substitution: substitution corrupts the
        expression when one metric name is a substring of another, and
        ``eval`` would execute arbitrary code.

        Returns:
            The truth value of the condition; False when it is malformed,
            uses an unsupported construct, or references a missing metric.
        """
        import ast
        import operator

        arithmetic = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
        }
        comparison = {
            ast.Lt: operator.lt,
            ast.LtE: operator.le,
            ast.Gt: operator.gt,
            ast.GtE: operator.ge,
            ast.Eq: operator.eq,
            ast.NotEq: operator.ne,
        }

        def resolve(node):
            if isinstance(node, ast.Expression):
                return resolve(node.body)
            if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
                return node.value
            if isinstance(node, ast.Name):
                return metrics[node.id]  # KeyError -> condition is False
            if isinstance(node, ast.UnaryOp):
                if isinstance(node.op, ast.USub):
                    return -resolve(node.operand)
                if isinstance(node.op, ast.Not):
                    return not resolve(node.operand)
            if isinstance(node, ast.BinOp) and type(node.op) in arithmetic:
                return arithmetic[type(node.op)](resolve(node.left), resolve(node.right))
            if isinstance(node, ast.BoolOp):
                values = [resolve(v) for v in node.values]
                return all(values) if isinstance(node.op, ast.And) else any(values)
            if isinstance(node, ast.Compare):
                left = resolve(node.left)
                for op, right_node in zip(node.ops, node.comparators):
                    if type(op) not in comparison:
                        raise ValueError("unsupported comparison operator")
                    right = resolve(right_node)
                    if not comparison[type(op)](left, right):
                        return False
                    left = right  # supports chained comparisons: a < b < c
                return True
            raise ValueError("unsupported expression")

        try:
            return bool(resolve(ast.parse(condition, mode="eval")))
        except (SyntaxError, KeyError, TypeError, ValueError, ZeroDivisionError):
            return False

Enter fullscreen mode Exit fullscreen mode

实战:构建完整监控体系

快速集成


# observability.py

from .metrics import TokenMonitor, QualityScorer

from .logging import SecureAIPromptLogger

from .tracing import AgentTracer

from .rag_monitor import RAGMonitor

from .alerts import AlertManager

class AIObservability:
    """Facade wiring token, quality, logging, tracing, RAG and alert components."""

    def __init__(self, config: dict = None):
        self.config = config or {}
        self.token_monitor = TokenMonitor()
        self.quality_scorer = QualityScorer()
        self.logger = SecureAIPromptLogger()
        self.tracer = AgentTracer()
        self.rag_monitor = RAGMonitor()
        self.alert_manager = AlertManager()
        # Alerts raised by the most recent trace_and_record call.
        self.last_alerts = []

    def trace_and_record(self, session_id: str, prompt: str, model: str, func, *args, **kwargs):
        """Run one AI call under a trace span and record usage, log and quality.

        Args:
            session_id: Session identifier used for the prompt log.
            prompt: Prompt text; also used for the token estimate.
            model: Model name, used for the span name and cost lookup.
            func: Callable performing the model call; invoked as
                ``func(*args, **kwargs)``. Its result must support ``len``.

        Returns:
            Whatever ``func`` returned. Triggered alerts are kept on
            ``self.last_alerts``.
        """
        # Imported here because the module-level imports bring in TokenMonitor
        # and QualityScorer but not TokenUsage.
        # NOTE(review): assumes TokenUsage lives in .metrics with them — confirm.
        from .metrics import TokenUsage

        with self.tracer.start_span(f"ai_call_{model}"):
            response = func(*args, **kwargs)

        # Rough estimate: about 4 characters per token.
        usage = TokenUsage(
            prompt_tokens=len(prompt) // 4,
            completion_tokens=len(response) // 4,
            total_tokens=(len(prompt) + len(response)) // 4,
            model=model,
            cost=self._calculate_cost(model, len(prompt), len(response))
        )
        self.token_monitor.record(usage)

        self.logger.log(session_id, prompt, response, model)

        quality = self.quality_scorer.score(prompt, response)

        metrics = {
            "daily_tokens": self.token_monitor.get_daily_usage(),
            "avg_quality_score": quality["overall"]
        }
        # Keep the result instead of discarding it so callers can act on it.
        self.last_alerts = self.alert_manager.evaluate(metrics)

        return response

    def _calculate_cost(self, model: str, prompt_len: int, response_len: int) -> float:
        """Return the estimated cost of one call, or 0.0 for an unknown model.

        NOTE(review): callers pass character counts, and the formula divides
        by 1,000,000 — confirm whether the prices are per million tokens and
        whether lengths should be converted to tokens first.
        """
        # model -> (input price, output price)
        prices = {
            "gpt-5.4": (0.000110, 0.000440),
            "gpt-4o-mini": (0.0000015, 0.000006),
            "claude-3.5-sonnet": (0.000015, 0.000075),
            "deepseek-chat": (0.000001, 0.000002),
        }

        if model in prices:
            input_p, output_p = prices[model]
            return prompt_len / 1_000_000 * input_p + response_len / 1_000_000 * output_p

        return 0.0

Enter fullscreen mode Exit fullscreen mode

总结

AI 可观测性核心要点:

  1. 指标监控:Token 消耗、延迟、成本、质量评分

  2. 日志记录:Prompt/Response 对、敏感信息过滤

  3. 分布式追踪:Agent 决策链、Tool 调用链

  4. RAG 监控:检索 Recall/Precision、回答质量

  5. 告警系统:质量下降、异常消耗、错误率

  6. 仪表板:Grafana/Prometheus 可视化

没有可观测性,AI 应用就是黑盒。2026 年,可观测性是 AI 工程化的必备基础设施。

本文是 AI 工程化系列之一。


This article contains affiliate links. If you sign up through the links above, I may earn a commission at no additional cost to you.

Ready to Build Your AI Business?

Get started with Systeme.io for free — All-in-one platform for building your online business with AI tools.

Top comments (0)