Swarm Agents — 架构设计文档
二次开发基础: DeerFlow 2.0 (Lead Agent + Middleware Chain + Subagents)
配置驱动: OpenClaw soul.md 模式 + Skills 三级加载
核心模式: Origin-X (天书) + 隔离执行 (观澜/执戈) + 异步消息总线 (Redis Streams)
设计哲学: 决策唯一中心、执行完全隔离、记忆按需检索、Token 极致优化
决策记录 (D01-D17)
| # | 决策项 | 选型 | 理由 |
|---|---|---|---|
| D01 | 天书实现方式 | DeerFlow 2.0 make_lead_agent 二次开发 |
复用成熟的中间件链与 LangGraph 基础设施 |
| D02 | 中间件链 | 保留 12 层 + 新增 1 层 = 13 层 | 恢复 Title/Memory,新增 Acceptance,裁剪 Guardrail/LoopDetection |
| D03 | 任务派发通道 | Redis Streams 异步消息总线 | 解耦 Origin-X 与 Guardian,支持消费者组与持久化 |
| D04 | 同步/异步模型 | 异步执行 + 关键路径 ACK 确认 | 派发不阻塞,但接收与结果必须强制确认 |
| D05 | 执行模型 | Origin-X (调度) → Guardian (常驻分解) → Worker (按需执行) | 兼顾常驻响应与弹性伸缩 |
| D06 | 跨 Agent 通信 | 严禁直连,必须经 Origin-X 路由中转 | 避免网状调用导致上下文污染与管理爆炸 |
| D07 | TaskPool 存储 | SQLite (主表) + Redis Hash (子任务/运行时状态) | 主数据持久化查询,临时状态高性能读写 |
| D08 | 监控独立部署 | MonitorFlow 独立进程 | Origin-X 挂掉时仍可检测心跳与发送中断指令 |
| D09 | 配置格式 |
config.yaml (主) + agents/{id}/config.local.yaml (从) + soul.md
|
遵循 DeerFlow 官方规范,主从分层 |
| D10 | 置信度/评估 | Guardian 自评 + Origin-X 复核 + 来源渠道优先级加成 | 观澜按渠道权重自评,天书用统一标准验收 |
| D11 | 质量评分 | 按 task_type 独立定义维度与权重 |
情报/代码/图片/音视频/建站各有验收标准 |
| D12 | 记忆架构 | Global (天书) / Domain (观澜) / Skill (执戈) / Ephemeral (子实例) | 业务从属但记忆独立,防止精神分裂与 Token 浪费 |
| D13 | 结果交付 | FileBrowser 目录映射公网 URL + 多类型 MIME 预览 | 用户不登后台,直接点击超链接预览/下载 |
| D14 | 子任务监控 | 三级中断:消息层 → asyncio.Task.cancel() → 沙箱进程 kill |
随时可停,返回中间结果 |
| D15 | 硬件自适应 | 三档检测 (低/中/高) + 运行时热重载 | 防止多任务卡死,动态调整并发与超时 |
| D16 | 执戈技能接口 | 6 大技能统一注册,按 task_type 快速切换 |
代码/图片/音视频/建站/数据/文档一套接口 |
| D17 | 任务路由识别 | 正则/关键词匹配 task_types,支持直派/链式/并行 |
不是所有任务都需要观澜,按需路由 |
| D18 | 工具防爆截断 | 工具返回值超 Token 阈值强制截断 + 警告 | 防止 read_file 读大文件撑爆上下文窗口 |
| D19 | 子任务依赖编排 | Guardian 内 DAG 解析,depends_on 声明依赖 |
线性链不够用时,支持网状依赖并发执行 |
| D20 | 死循环熔断 | Subagent max_turns=15 硬限制 + timeout_seconds 兜底 |
Worker 陷入"改 Bug→报错→再改"循环时物理掐断 |
一、系统总览
1.1 架构全景
┌─────────────────────────────────────────────────────────────────────────────┐
│ Swarm Agents v3.5 架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌───────────────────────────────────────────────────┐ │
│ │ 用户 │◄───►│ 天书 (Origin-X Agent) │ │
│ └──────────┘ │ • 13 层中间件链 • 任务路由识别 (D17) │ │
│ │ • FactQuery 防幻觉 • 验收评估 (D10/D11) │ │
│ │ • 全局记忆 (Global Memory) • 异步消息总线客户端 │ │
│ └───────────────────────┬───────────────────────────┘ │
│ │ Redis Streams (Pub/Sub) │
│ ┌──────────────────────┼───────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │
│ │ 观澜 Guardian │ │ 执戈 Guardian │ │ 扩展 Guardian │ │
│ │ (情报常驻) │ │ (多技能常驻) │ │ (动态插拔) │ │
│ │ │ │ │ │ │ │
│ │ • 领域记忆加载 │ │ • 技能记忆加载 │ │ • 独立记忆/配置 │ │
│ │ • 任务分解 │ │ • 技能路由切换 │ │ • 任务分解 │ │
│ │ • 异步并发控制 │ │ • 外部 API 调度 │ │ • 并发控制 │ │
│ │ (Semaphore) │ │ (comfyUI/..) │ │ │ │
│ │ • 结果摘要生成 │ │ • 摘要/链接返回 │ │ • 摘要返回 │ │
│ └────────┬──────────┘ └────────┬──────────┘ └────────┬──────────┘ │
│ │ │ │ │
│ ┌────────┴──────────┐ ┌────────┴──────────┐ ┌────────┴──────────┐ │
│ │ 观澜 Worker 池 │ │ 执戈 Worker 池 │ │ 扩展 Worker 池 │ │
│ │ (无状态, 用完销毁) │ │ (无状态, 用完销毁) │ │ (无状态, 用完销毁) │ │
│ │ • 独立 Prompt │ │ • 按需加载 Skill │ │ • 独立上下文 │ │
│ │ • 临时记忆 │ │ • 外部 API 调用 │ │ • 临时记忆 │ │
│ └───────────────────┘ └───────────────────┘ └───────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 侧边支撑组件 (Sidecars) │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │ │
│ │ │ Memory Engine│ │ Summary Engine│ │ TaskPool │ │ MonitorFlow │ │ │
│ │ │ (检索/合并) │ │ (3级摘要) │ │ (SQLite+Redis)│ │ (独立进程) │ │ │
│ │ │ Redis: mem:* │ │ 上下文/结果 │ │ 硬件自适应 │ │ 心跳/超时/ │ │ │
│ │ └─────────────┘ └──────────────┘ └───────────────┘ │ 中断/降级 │ │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │ │
│ │ │ FileBrowser │ │ Ext. APIs │ │ Agent Registry│ │ RAG Engine │ │ │
│ │ │ (公网映射) │ │ (comfyUI等) │ │ (热重载) │ │ (v2 入口预留)│ │ │
│ │ └─────────────┘ └──────────────┘ └───────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
1.2 核心设计原则
- Origin-X 唯一调度: 天书是决策中心、记忆写入中心、验收中心。子 Agent 严禁直连。
- 隔离执行: Guardian 常驻分解,Worker 无状态执行。Worker 不共享上下文,不持长期记忆。
- 按需检索 (Retrieval): 记忆绝不全量加载。基于任务意图做相似度检索,只注入 Top-K 相关片段。
-
摘要驱动 (Summary-First): 跨 Agent 传递只传
Summary + Key Facts + 数据链接。Worker 需细节时主动read_file读取。 - 配置热插拔: 新增 Agent = 建目录 + 写 soul.md + 改 config.yaml。Agent Manager 自动启停。
- 硬件自适应: 启动检测 CPU/内存分档,运行时动态调整并发/超时,基础流程绝不卡死。
二、核心模块
2.1 天书 (Origin-X Agent)
基于 DeerFlow make_lead_agent 改造,保留 13 层中间件,新增验收与事实查询层。
2.1.1 中间件链 (13 层)
ThreadData → Uploads → Sandbox → DanglingToolCall → Summarization → Todo → Title → Memory(全局检索) → ViewImage → DeferredToolFilter → SubagentLimit → Acceptance(验收) → Clarification(最后)
⚠️ 设计约束(不可违背):
ClarificationMiddleware必须在最后,用于拦截澄清请求并中断执行流AcceptanceMiddleware在SubagentLimit之后、Clarification之前,只验收子 Agent 返回结果,不拦截用户输入MemoryMiddleware执行异步检索(不阻塞主流程),Top-3 相关片段注入 SystemMessageTitleMiddleware自动生成对话标题,用户必须能区分不同会话(不可裁剪)- 裁剪掉的
GuardrailMiddleware和LoopDetectionMiddleware在 v1 阶段不恢复
2.1.2 事实查询保障 (FactQueryMiddleware)
防止 LLM 编造进度/配置。用户提问时自动拦截,查询 SQLite/Redis 注入真实数据。
class FactQueryMiddleware(Middleware):
INTENT_MAP = {
"task_status": ["进度", "到哪了", "状态"],
"agent_status": ["观澜", "执戈", "在线"],
"config": ["配置", "模型", "工具"],
"deliverables": ["文件", "报告", "链接"]
}
# 检测到意图 → 查 SQLite/Redis → 注入 SystemMessage: "【事实数据】... 必须基于此回答"
2.1.3 任务类型识别与路由 (D17)
天书接收请求后,先匹配 task_types 定义,决定路由策略:
-
simple_qa→ 天书直接回答 -
image_process / code_dev / web_dev→ 直派执戈 (跳过观澜) -
intelligence→ 派观澜 -
research_then_web / doc / data→ 链式: 观澜 → (验收) → 执戈 -
parallel→ 并行派发
⚠️ 设计约束(不可违背):
- 匹配算法必须支持三级优先级:精确短语 > 正则 > 关键词,最长匹配优先
- 多个任务类型同时匹配时,取分数最高者,不允许随机选或全部派发
- 所有类型都匹配不上时,fallback 到天书 LLM 自主决策,不允许拒绝服务
- 链式路由的
input_from: previous必须通过chain_contextJSON 传递,禁止让下一个 Agent 去翻数据库- 关键词匹配区分任务边界:"裁剪图片" ≠ "分析图片数据",不能仅用
in判断
2.2 异步消息总线 (Redis Streams)
解耦 Origin-X 与 Guardian,支持消费组、持久化、死信重试。
Topic 定义:
| Stream | 生产者 | 消费者组 | 用途 |
|--------|--------|----------|------|
| tasks.dispatch | 天书 | guardians | 派发主任务/链式步骤 |
| tasks.ack | Guardian | origin-x | 接收确认 + 预估时间 |
| tasks.progress | Guardian/Worker | origin-x | 实时进度推送 |
| tasks.complete | Guardian | origin-x | 任务完成 + 摘要 + 链接 |
| tasks.failed | Guardian | origin-x | 失败归档 |
| tasks.subtask | Guardian | subtask-workers | 子任务派发/结果 |
| tasks.interrupt | 天书/用户 | guardians | 强制中断信号 |
| agents.heartbeat | 所有进程 | monitor | 心跳保活 |
2.3 记忆与摘要引擎 (Memory & Summary)
2.3.1 三层记忆隔离 (D12)
| 层级 | Key | 内容 | 权限 | 更新时机 |
|---|---|---|---|---|
| Global | mem:global |
用户偏好、项目目标、跨 Agent 历史 | 天书读写,其他只读 | 任务验收后合并 |
| Domain | mem:vanguard |
权威数据源、历史报告摘要、避坑指南 | 观澜读写,天书监督 | 观澜任务完成后 |
| Skill | mem:ironclad |
代码规范、服务器配置、API 密钥/环境 | 执戈读写,天书监督 | 执戈任务完成后 |
| Ephemeral | 进程内存 | 当前 Worker 的中间变量、搜索草稿 | Worker 私有 | 任务结束即销毁 |
2.3.2 检索加载 (Retrieval-Augmented Loading)
Agent 启动/接单时,绝不全量加载记忆。
核心逻辑说明:
记忆加载分为 意图提取 → 相似度检索 → Top-K 注入 三步,确保 Prompt 只包含相关片段。
# ─────────────────────────────────────────────────────────────
# Memory Engine — 检索加载实现
# ─────────────────────────────────────────────────────────────
class MemoryEngine:
"""记忆引擎 — 检索加载 + 沉淀合并"""
def __init__(self, redis_client, vector_store=None):
self.redis = redis_client
self.vector_store = vector_store # 可选:向量检索 (如 FAISS/RedisVL)
self.use_vector = vector_store is not None
async def retrieve(self, task_type: str, keywords: list[str],
memory_scope: str = "all", top_k: int = 3) -> dict:
"""
根据任务意图检索记忆
参数:
task_type: 任务类型 (intelligence / web_development / ...)
keywords: 关键词列表 ["Vue3", "2024 AI", "响应式"]
memory_scope: 检索范围 "global" | "vanguard" | "ironclad" | "all"
top_k: 返回最相关的 Top-K 条
返回:
结构化记忆片段,直接注入 Prompt
"""
results = {}
# ── 步骤 1: 确定检索范围 ──
scopes = self._resolve_scope(memory_scope)
for scope in scopes:
mem_key = f"mem:{scope}"
raw_mem = await self.redis.json().get(mem_key)
if not raw_mem:
continue
# ── 步骤 2: 相似度检索 ──
if self.use_vector:
# 向量检索:将 keywords 合并为查询向量,搜索最相似的记忆片段
query_vec = self.vector_store.encode(" ".join(keywords))
scored = self.vector_store.search(query_vec, scope=scope, top_k=top_k)
results[scope] = scored
else:
# 轻量模式:基于关键词的标签/字段匹配 + 相关性打分
scored = self._keyword_match(raw_mem, keywords, top_k)
if scored:
results[scope] = scored
# ── 步骤 3: 全局去重 + 排序 ──
return self._dedup_and_rank(results, top_k)
def _keyword_match(self, raw_mem: dict, keywords: list[str], top_k: int) -> list:
"""
关键词匹配 (轻量模式,无需向量库)
策略:
1. 对记忆中的每条记录,计算与关键词的 overlap 分数
2. 字段权重:tags (3x) > key (2x) > value (1x)
3. 返回 Top-K
"""
scored = []
for key, value in raw_mem.items():
score = 0
text_blob = f"{key} {self._flatten(value)}".lower()
# 检查 tags 字段 (最高权重)
tags = raw_mem.get("tags", [])
if isinstance(tags, list):
for kw in keywords:
if any(kw.lower() in t.lower() for t in tags):
score += 3
# 关键词匹配
for kw in keywords:
if kw.lower() in text_blob:
score += 1
if score > 0:
scored.append({"key": key, "value": value, "score": score})
# 按分数降序,返回 Top-K
scored.sort(key=lambda x: x["score"], reverse=True)
return scored[:top_k]
def _resolve_scope(self, memory_scope: str) -> list[str]:
"""解析检索范围"""
if memory_scope == "all":
return ["global", "vanguard", "ironclad"]
return [memory_scope]
def _flatten(self, value) -> str:
"""将嵌套字典展平为字符串"""
if isinstance(value, str):
return value
if isinstance(value, dict):
return " ".join(str(v) for v in value.values())
return str(value)
def _dedup_and_rank(self, results: dict, top_k: int) -> dict:
"""全局去重 + 排序"""
all_items = []
for scope, items in results.items():
for item in items:
item["scope"] = scope
all_items.append(item)
# 按分数排序,取 Top-K
all_items.sort(key=lambda x: x["score"], reverse=True)
return {"memories": all_items[:top_k]}
# ─────────────────────────────────────────────────────────────
# 使用示例 — 执戈接任务时的记忆加载
# ─────────────────────────────────────────────────────────────
# 执戈守护实例收到任务: "基于情报报告生成可视化网页"
# → 提取意图关键词: ["web", "visualization", "chart", "responsive"]
# → 检索 mem:ironclad (技能记忆) + mem:global (全局记忆)
memories = await memory_engine.retrieve(
task_type="web_development",
keywords=["web", "visualization", "chart", "responsive"],
memory_scope="all",
top_k=3
)
# 返回:
# {
# "memories": [
# {
# "scope": "ironclad",
# "key": "coding_style",
# "value": "优先使用 Vue3 + TailwindCSS + Chart.js",
# "score": 5
# },
# {
# "scope": "ironclad",
# "key": "server_config",
# "value": {"host": "192.168.1.100", "port": 8080},
# "score": 2
# },
# {
# "scope": "global",
# "key": "user_preferences",
# "value": {"language": "zh-CN", "report_style": "数据驱动"},
# "score": 1
# }
# ]
# }
# → 注入执戈 Prompt:
# """
# 【历史记忆】
# - 编码风格: 优先使用 Vue3 + TailwindCSS + Chart.js
# - 服务器配置: 192.168.1.100:8080
# - 用户偏好: 中文界面,数据驱动风格
# """
2.3.3 三级摘要架构 (D-Summary)
-
上下文摘要 (Context): 对话 Token > 80% 时触发滑动窗口压缩,保留最近 3 轮,历史折叠为
summary_context。 - 结果摘要 (Result): Guardian 完成任务后,LLM 生成结构化摘要 JSON (Summary + Key Facts + Links)。链式传递只传此 JSON,不传原文。
-
记忆摘要 (Memory): 异步
ConsolidateMemory任务。从本次结果中提取新事实/偏好,合并到对应 Memory 层级。冲突时以最新为准。
2.3.3.1 ConsolidateMemory — 记忆沉淀详细逻辑
核心问题:记忆不是堆砌,是"按需索取"且"持续进化"的。每次任务完成后,必须从结果中提炼有价值的信息,写入对应记忆层。
沉淀时机:天书验收通过后,后台异步触发(不阻塞用户交付)。
沉淀流程:
# ─────────────────────────────────────────────────────────────
# ConsolidateMemory — 记忆沉淀实现
# ─────────────────────────────────────────────────────────────
class MemoryConsolidator:
"""记忆沉淀器 — 从任务结果中提取新知,合并入记忆库"""
def __init__(self, memory_engine: MemoryEngine, redis_client):
self.memory = memory_engine
self.redis = redis_client
async def consolidate(self, task_result: dict, user_feedback: str | None = None):
"""
记忆沉淀主流程
输入:
task_result: 天书验收后的完整任务结果
user_feedback: 用户反馈 (可选,如"把 Tailwind 换成 Bootstrap")
"""
task_type = task_result["task_type"]
agent_id = task_result["guard_id"]
result_summary = task_result["result_summary"]
confidence = task_result.get("confidence", 0)
quality_score = task_result.get("quality_score", 0)
# ── 步骤 1: 事实提取 (Fact Extraction) ──
# 从情报报告中提取新的事实,写入 mem:global
if task_type == "intelligence":
await self._extract_facts(result_summary, task_result)
# ── 步骤 2: 偏好修正 (Preference Update) ──
# 如果用户修改了产出物,更新技能记忆
if user_feedback:
await self._update_preferences(user_feedback, agent_id)
# ── 步骤 3: 错误复盘 (Error Retrospective) ──
# 如果有子任务失败,记录避坑指南
if task_result.get("errors"):
await self._record_lessons_learned(task_result, agent_id)
# ── 步骤 4: 去重合并 (Dedup & Merge) ──
# 新旧冲突时以最新为准
await self._dedup_and_merge(agent_id, task_result)
async def _extract_facts(self, summary: str, result: dict):
"""
事实提取 — 从情报摘要中提取新事实写入 mem:global
示例:
摘要: "AI 芯片市场突破 500 亿美元"
→ 写入 mem:global["market_facts"]["ai_chips_2024"] = "500亿美元"
"""
# 使用 LLM 提取结构化事实
facts = await self._llm_extract_facts(summary)
current_global = await self.redis.json().get("mem:global") or {}
current_global.setdefault("market_facts", {})
for fact_key, fact_value in facts.items():
current_global["market_facts"][fact_key] = fact_value
await self.redis.json().set("mem:global", "$", current_global)
async def _update_preferences(self, feedback: str, agent_id: str):
"""
偏好修正 — 根据用户反馈更新技能记忆
示例:
用户反馈: "把 Tailwind 换成 Bootstrap"
→ 更新 mem:ironclad["coding_style"] = "优先使用 Vue3 + Bootstrap + Chart.js"
"""
mem_key = f"mem:{agent_id}"
current_mem = await self.redis.json().get(mem_key) or {}
# 使用 LLM 解析用户反馈,识别要修改的字段
updates = await self._llm_parse_feedback(feedback, current_mem)
for key, new_value in updates.items():
current_mem[key] = new_value
current_mem.setdefault("tags", []).append(f"updated:{key}")
await self.redis.json().set(mem_key, "$", current_mem)
async def _record_lessons_learned(self, result: dict, agent_id: str):
"""
错误复盘 — 记录失败经验到技能记忆
示例:
错误: "comfyUI 生成超时,模型 SDXL 过大"
→ 写入 mem:ironclad["past_issues"]["comfyui_sdxl_timeout"] = {
"issue": "comfyUI SDXL 模型生成超时",
"avoid": "使用 SD-Turbo 或设置 timeout =600",
"date": "2025-01-15"
}
"""
mem_key = f"mem:{agent_id}"
current_mem = await self.redis.json().get(mem_key) or {}
current_mem.setdefault("past_issues", {})
for error in result["errors"]:
issue_key = self._normalize_error_key(error)
current_mem["past_issues"][issue_key] = {
"issue": error["message"],
"avoid": error.get("suggestion", "避免使用相同配置"),
"date": result.get("completed_at", "unknown"),
}
await self.redis.json().set(mem_key, "$", current_mem)
async def _dedup_and_merge(self, agent_id: str, result: dict):
"""
去重合并 — 新旧记忆冲突时以最新为准
策略:
1. 检查 mem 中是否有 tags 标记为 "deprecated" 的条目
2. 如果同一 key 有新值,旧值自动过期
3. 定期清理 (每 10 次任务后触发)
"""
mem_key = f"mem:{agent_id}"
current_mem = await self.redis.json().get(mem_key) or {}
# 标记过期 tags
tags = current_mem.get("tags", [])
deprecated = [t for t in tags if t.startswith("deprecated:")]
for tag in deprecated:
old_key = tag.replace("deprecated:", "")
if old_key in current_mem:
del current_mem[old_key]
# 清理过期 tags
current_mem["tags"] = [t for t in tags if not t.startswith("deprecated:")]
await self.redis.json().set(mem_key, "$", current_mem)
async def _llm_extract_facts(self, summary: str) -> dict:
"""调用 LLM 从摘要中提取结构化事实"""
# Prompt: "从以下情报摘要中提取 3-5 个关键事实,返回 JSON: {fact_key: fact_value}"
...
async def _llm_parse_feedback(self, feedback: str, current_mem: dict) -> dict:
"""调用 LLM 解析用户反馈,识别要修改的记忆字段"""
# Prompt: "用户说'{feedback}',当前记忆是{current_mem}。请识别要修改的字段和新值"
...
def _normalize_error_key(self, error: dict) -> str:
"""将错误信息规范化为记忆键"""
msg = error["message"][:50].lower().replace(" ", "_").replace("/", "_")
return f"issue_{msg}"
# ─────────────────────────────────────────────────────────────
# 使用示例 — 天书验收后触发记忆沉淀
# ─────────────────────────────────────────────────────────────
# 场景 1: 观澜完成情报任务 → 事实提取
task_result = {
"task_type": "intelligence",
"guard_id": "vanguard-s",
"result_summary": "AI 芯片市场突破 500 亿美元,多模态模型成本下降 90%",
"confidence": 0.86,
}
consolidator = MemoryConsolidator(memory_engine, redis_client)
await consolidator.consolidate(task_result)
# → mem:global["market_facts"]["ai_chip_market_2024"] = "500亿美元"
# → mem:global["market_facts"]["multimodal_cost_reduction"] = "下降90%"
# 场景 2: 用户反馈修改执戈产出 → 偏好修正
user_feedback = "下次生成的网页用深色主题,Tailwind 换成 Bootstrap"
await consolidator.consolidate(task_result, user_feedback=user_feedback)
# → mem:ironclad["coding_style"] = "Vue3 + Bootstrap + 深色主题"
# → mem:ironclad["ui_preference"] = "dark_mode"
# 场景 3: 子任务失败 → 错误复盘
task_result_with_error = {
"task_type": "image_processing",
"guard_id": "ironclad",
"errors": [
{"message": "comfyUI 生成超时,SDXL 模型加载过慢"}
],
}
await consolidator.consolidate(task_result_with_error)
# → mem:ironclad["past_issues"]["comfyui_sdxl_timeout"] = {
# "issue": "comfyUI 生成超时,SDXL 模型加载过慢",
# "avoid": "使用 SD-Turbo 或设置 timeout=600",
# "date": "2025-01-15"
# }
记忆沉淀数据流示意:
任务完成 → 天书验收通过
│
▼
┌────────────────────┐
│ ConsolidateMemory │ (后台异步,不阻塞用户交付)
└────────┬───────────┘
│
┌───────┼────────┐
▼ ▼ ▼
事实提取 偏好修正 错误复盘
│ │ │
▼ ▼ ▼
mem:global mem:{agent} mem:{agent}
(新知) (编码风格) (避坑指南)
│ │ │
└───────┴────────┘
│
▼
┌────────────────────┐
│ 去重合并 │ (冲突时以最新为准)
│ 过期标记清理 │
└────────────────────┘
2.3.4 链式按需加载 (Lazy Loading)
天书向执戈派发时,Payload 结构:
"chain_context": {
"demand": "生成可视化网页",
"summary": "2024 AI 趋势:成本降 90%,Agent 自主化...",
"key_facts": ["GPT-4o 支持实时语音", "AI 芯片市场 500 亿"],
"data_sources": { "full_report": "https://filebrowser.../report.md" }
}
执戈生成 Prompt 时注入 summary + key_facts。执行中若需具体数据,主动调用 read_file(data_sources.full_report)。
2.4 Guardian 与并发控制
2.4.1 Guardian 职责
常驻进程。负责:加载专属记忆 → 匹配技能 → 分解子任务 → asyncio.Semaphore 控制并发 → 监听 tasks.interrupt → 汇总生成摘要 → 发布 tasks.complete。
2.4.2 异步并发控制 (Semaphore)
废弃 ThreadPoolExecutor,使用 asyncio.Semaphore(max_workers)。
class GuardianConcurrency:
def __init__(self, limit): self.sem = asyncio.Semaphore(limit)
async def run(self, subtask_id, coro):
async with self.sem:
task = asyncio.create_task(coro)
self.active[subtask_id] = task
try: return await task
finally: del self.active[subtask_id]
async def interrupt(self, subtask_id=None):
targets = [self.active[subtask_id]] if subtask_id else self.active.values()
for t in targets:
if not t.done(): t.cancel()
⚠️ 设计约束(不可违背):
- 防递归深度:
max_subagent_depth = 2(天书→守护→子实例,禁止更深),子实例严禁派发子子任务- 工具剥离: 创建子实例时,必须从工具集中移除
task/delegate/dispatch类工具,子实例只能使用执行类工具- 中断安全:
interrupt()必须返回中间结果(已完成的部分),不允许丢弃已执行的工作- 并发上限: Guardian 的
max_workers受硬件自适应参数控制,不允许写死固定值- 子实例超时: 每个子任务必须有独立
timeout,超时自动 cancel,不阻塞 Guardian 主循环
2.4.3 执戈技能接口 (D16)
agents/ironclad/skills/ 下按 task_type 组织。启动时扫描 L1 元数据,接单时按需加载 L2 指令与工具集。支持外部 API (comfyUI/seedance) 声明与自动初始化。
2.4.4 工具防爆截断 (D18)
痛点:
read_file读一个 10MB 文件 → 30k+ Token 直接吃掉上下文窗口 → Agent 崩溃或丢失历史。
核心机制:所有工具执行完毕后,强制通过 Token 截断器。
# ─────────────────────────────────────────────────────────────
# Tool Output Truncation — 工具返回值物理截断
# ─────────────────────────────────────────────────────────────
import tiktoken
# 默认阈值 (可配置,根据模型上下文窗口动态调整)
DEFAULT_TOOL_MAX_TOKENS = 8000
ENCODING = tiktoken.get_encoding("cl100k_base")
def truncate_tool_output(tool_name: str, output: str, max_tokens: int = DEFAULT_TOOL_MAX_TOKENS) -> str:
"""
截断工具返回值,防止撑爆上下文窗口
策略:
1. 计算输出 Token 数
2. 如果超限 → 截断前 N 个 Token + 尾部保留 500 Token
3. 注入截断警告
"""
tokens = ENCODING.encode(output)
if len(tokens) <= max_tokens:
return output
# 截断: 头部 75% + 尾部 25%,中间用警告替代
head_keep = int(max_tokens * 0.75)
tail_keep = max_tokens - head_keep
head = ENCODING.decode(tokens[:head_keep])
tail = ENCODING.decode(tokens[-tail_keep:]) if tail_keep > 0 else ""
truncated_tokens = len(tokens) - max_tokens
warning = f"\n\n⚠️ [系统拦截] 工具 '{tool_name}' 输出超过 {max_tokens} Token 限制 (实际 {len(tokens)} Token)。\n"
warning += f"已截断 {truncated_tokens} Token。请使用 ask_document / read_file_chunk 分块读取,或缩小操作范围。"
return head + warning + tail
# 在 Guardian/Worker 执行工具后统一调用:
# result = some_tool.execute(...)
# result = truncate_tool_output("read_file", result)
特殊工具处理:
| 工具 | 截断策略 | 替代方案 |
|---|---|---|
read_file (大文件) |
> 50KB 文件禁止全量读取 |
read_file_chunk(path, start_line, end_line) 分块读取 |
bash (长输出) |
只保留最后 200 行 |
bash 命令自带 ` |
{% raw %}web_fetch (长网页) |
只提取正文 (readability) | 内置 readability 过滤,去除广告/导航 |
| 代码执行输出 | 保留 stdout 前 500 行 + stderr 全部 | stderr 全保留因为错误信息不能丢 |
⚠️ 设计约束(不可违背):
- 截断是硬截断,不由 LLM 决定是否截断
- 截断警告必须出现在工具返回值中(不是日志),让 LLM 明确知道自己拿到的不是完整数据
read_file_chunk每次最多读取 100 行,防止 LLM 通过循环调用分块读取来绕过截断- stderr (错误输出)永远不截断,因为错误信息是调试的关键
2.4.5 子任务依赖编排 (D19)
痛点: 线性链 (观澜→执戈) 不够用。全栈开发需要"建表→后端→前端→部署"这种网状依赖。
核心机制:Guardian 将任务拆解为 DAG (有向无环图),按依赖关系动态派发。
// 子任务声明式依赖 (tasks.subtask dispatch payload)
{
"subtask_id": "st-003",
"parent_task_id": "task-0a1b2c3d",
"action": "dispatch",
"describe": "编写 HTML/Tailwind 代码",
"demand": "基于 UI 结构和配图生成落地页代码",
"depends_on": ["st-001", "st-002"], // 声明依赖
"timeout": 300
}
DAG 调度逻辑:
# ─────────────────────────────────────────────────────────────
# DAG Subtask Scheduler — Guardian 内存维护依赖图
# ─────────────────────────────────────────────────────────────
from collections import defaultdict
import asyncio
class DAGScheduler:
"""守护实例内的子任务 DAG 调度器"""
def __init__(self, subtasks: list[dict], semaphore: asyncio.Semaphore):
self.semaphore = semaphore
self.tasks = {st["subtask_id"]: st for st in subtasks}
self.results: dict[str, dict] = {}
self.depends_on: dict[str, list[str]] = {
st["subtask_id"]: st.get("depends_on", [])
for st in subtasks
}
self.completed: set[str] = set()
self.running: set[str] = set()
self._events: dict[str, asyncio.Event] = {
st["subtask_id"]: asyncio.Event() for st in subtasks
}
async def run_all(self):
"""执行所有子任务,按依赖关系动态解锁"""
# 第一轮: 派发所有无依赖的任务
for task_id, deps in self.depends_on.items():
if not deps:
self._dispatch(task_id)
# 等待所有任务完成
await asyncio.gather(*[
self._events[tid].wait()
for tid in self.tasks
])
def _dispatch(self, task_id: str):
"""派发单个子任务 (受 Semaphore 控制并发)"""
self.running.add(task_id)
task = self.tasks[task_id]
asyncio.create_task(self._run_with_deps(task_id, task))
async def _run_with_deps(self, task_id: str, task: dict):
"""等待依赖完成后,执行子任务"""
deps = self.depends_on.get(task_id, [])
# 等待所有依赖完成
for dep_id in deps:
if dep_id not in self.completed:
await self._events[dep_id].wait()
# 依赖全部完成 → 通过 Semaphore 执行
async with self.semaphore:
result = await self._execute_task(task_id, task)
# 标记完成
self.completed.add(task_id)
self.running.discard(task_id)
self.results[task_id] = result
self._events[task_id].set()
# 解锁依赖此任务的后续任务
self._unlock_dependents(task_id)
def _unlock_dependents(self, completed_task_id: str):
"""检查哪些任务依赖当前完成的任务,如果所有依赖都满足则派发"""
for task_id, deps in self.depends_on.items():
if task_id in self.completed or task_id in self.running:
continue
# 检查是否所有依赖都已完成
if all(dep in self.completed for dep in deps):
self._dispatch(task_id)
async def _execute_task(self, task_id: str, task: dict) -> dict:
"""实际执行子任务 (通过 Redis Streams 派发)"""
# ... 通过 message_bus.publish("tasks.subtask", ...) 派发
# ... 轮询等待结果
# ... 返回结果
...
DAG 示意:
执戈 Guardian 收到任务: "生成电商落地页"
┌──────────┐ ┌──────────┐
│ st-01 │ │ st-02 │ ← 无依赖,同时派发
│ 生成UI结构│ │ 生成配图 │ ← Semaphore 控制并发
└────┬─────┘ └────┬─────┘
│ │
└──────┬───────┘
▼
┌──────────┐
│ st-03 │ ← 依赖 st-01 + st-02
│ 编写代码 │ ← 等前两个都完成后才派发
└────┬─────┘
▼
┌──────────┐
│ st-04 │ ← 依赖 st-03
│ 部署上线 │
└──────────┘
物理深度始终 = 2 (Guardian → Worker)
逻辑依赖无限层
⚠️ 设计约束(不可违背):
- DAG 只在 Guardian 内存中维护,不持久化到 SQLite/Redis,降低复杂度
- 如果检测到循环依赖(A→B→A),Guardian 立即拒绝该任务图,返回错误给天书
- 任一子任务失败 → 所有依赖它的后续任务自动取消,返回中间结果给天书
- Guardian 生成的 DAG 最多 10 个节点,超过则拆分为多个独立任务
2.5 TaskPool 与硬件自适应 (D15)
2.5.1 统一字段模型
SQLite tasks 表结构精简,区分基础字段与类型专属 JSON。
CREATE TABLE tasks (
id TEXT PRIMARY KEY, name TEXT, priority TEXT, describe TEXT, demand TEXT,
task_type TEXT, creator TEXT, dispatch_to TEXT, status TEXT DEFAULT 'pending',
result_summary TEXT, confidence REAL, quality_score REAL,
confidence_breakdown TEXT, quality_breakdown TEXT,
result_detail TEXT, -- JSON: 按 task_type 定义不同 schema
deliverables TEXT, -- JSON: 统一文件列表 [{name, type, url, size}]
error TEXT, created_at TEXT, dispatched_at TEXT, acked_at TEXT,
started_at TEXT, completed_at TEXT, expired_at TEXT, last_updated TEXT,
metadata TEXT DEFAULT '{}'
);
-- 注: 子任务状态全在 Redis Hash task:{id}:subtasks 中,SQLite 不存 subtasks 表
2.5.2 硬件自适应与热重载
启动 psutil 检测分档 (Low/Mid/High)。运行时每 5 分钟检测 CPU/内存,若超载则降档 (心跳间隔×1.5, 最大并发减半),通过 ConfigHotReloader 广播新参数给 TaskPool/MonitorFlow。
⚠️ 设计约束(不可违背):
- 分档阈值: 低配
CPU≤4核 OR 内存≤8GB;中配CPU≤8核 OR 内存≤16GB;高配>8核 AND >16GB- 三档参数基准: 低配
max_concurrent=3, heartbeat=90s, ack_timeout=60s;中配10, 60s, 30s;高配30, 30s, 15s- 降档条件: CPU>90% AND 内存>85%,连续 2 次检测才降,避免抖动
- 升档条件: CPU<50% AND 内存<60%,连续 3 次检测才升,当前不是最高档
- 降档上限: 最低降到中配,不允许降到 1 个并发(系统必须保持基本可用性)
- TaskPool/MonitorFlow 参数联动: 两者复用同一
MonitorConfig对象,不允许各自维护独立参数
2.6 外部 API 与容错交付
2.6.1 外部 API 容错链
执戈调用 comfyUI/seedance 等外部服务时,必须带超时与降级:
async def generate_with_fallback(prompt, primary_api, backup_api, timeout=300):
try: return await asyncio.wait_for(primary_api(prompt), timeout)
except Exception:
try: return await asyncio.wait_for(backup_api(prompt), timeout)
except Exception as e: raise GenerationFailedError(f"所有生成服务不可用: {e}")
2.6.2 FileBrowser 交付
执戈产出文件写入 /mnt/user-data/outputs/{task_id}/。通过预设规则拼接公网 URL 注入 deliverables JSON。用户收到的是可直接点击的 preview_url / download_url,无需后台登录。
2.7 MonitorFlow (独立进程)
职责:心跳检测 (消费 agents.heartbeat)、超时/停滞检测、告警。
定位:执行层的 asyncio.wait_for 是第一道防线 (主动熔断)。MonitorFlow 是第二道防线 (兜底监控)。若执行层超时未触发,MonitorFlow 检测到停滞则发布 tasks.interrupt 强制清理。异步非阻塞,过载时自动降级仅保留心跳检测。
2.8 用户交互机制 (Human Communication)
核心原则: 系统不是闭门干活的工具,而是会沟通、会请示、会求助的助手。在关键节点主动与用户交互,而不是等到最后丢一个可能很差的结果。
2.8.1 交互场景总览
| # | 场景 | 触发条件 | 交互类型 | 超时策略 |
|---|---|---|---|---|
| I1 | 任务澄清 | 天书无法确定任务意图或需求模糊 | 天书 → 用户提问 | 2 分钟无响应 → 天书自主决策 |
| I2 | 能力边界 | 任务超出系统当前能力范围 | 天书 → 用户说明限制 + 替代方案 | 用户确认后继续/取消 |
| I3 | 高风险操作 | 需要安装系统级依赖、修改服务器配置、执行破坏性命令 | 天书 → 用户告知风险 + 请求确认 | 3 分钟无响应 → 取消操作 |
| I4 | 低置信度警告 | 任何任务 confidence < 0.6
|
天书 → 用户告知数据质量低 + 建议人工复核 | 通知即可,不阻塞 |
| I5 | 执行中补充 | Guardian/Worker 执行中缺少关键信息(如 Logo、品牌色、API Key) | Guardian → 用户请求补充 | 3 分钟无响应 → 使用默认值或跳过 |
| I6 | 质量低预警 | 天书验收不通过,质量评分 < 60 | 天书 → 用户告知质量低 + 正在重做 | 通知即可,不阻塞 |
| I7 | 重做失败 | 重试 2 次后仍不通过 | 天书 → 用户说明失败原因 + 建议人工介入 | 用户决定下一步 |
| I8 | 成本预警 | 预估成本超过 $1 或实际消耗超过预估 200% | 天书 → 用户告知当前花费 + 预估总成本 | 2 分钟无响应 → 继续执行 |
| I9 | 链式里程碑确认 | 链式任务每一步完成后(Phase 8) | 天书 → 用户汇报 + 请求确认下一步 | 5+5 分钟 → 自动 STOP |
2.8.2 交互消息格式
所有用户交互统一通过以下消息结构,由 Gateway API 推送给前端:
{
"type": "user_interaction",
"interaction_id": "ui-0a1b2c3d",
"task_id": "task-0a1b2c3d",
"scenario": "I1 | I2 | I3 | I4 | I5 | I6 | I7 | I8 | I9",
"severity": "info | warning | critical",
"timestamp": "2025-01-15T10:00:05Z",
"message": {
"title": "需要确认任务需求",
"body": "你提到'做个网页',请问是以下哪种类型?",
"context": {
"original_request": "帮我做个网页",
"ambiguity_reason": "任务描述过于宽泛,无法确定具体类型"
}
},
"actions": [
{
"id": "action_1",
"label": "个人主页",
"type": "select"
},
{
"id": "action_2",
"label": "数据看板",
"type": "select"
},
{
"id": "action_3",
"label": "其他(请描述)",
"type": "text_input"
}
],
"timeout": {
"seconds": 120,
"on_timeout": "auto_decide"
}
}
2.8.3 各场景详细流程
I1: 任务澄清 (Clarification)
触发: 天书 TaskRouter 匹配分数 < 阈值 (如最高分 < 30)
或 LLM 自主判断需求模糊
流程:
天书 → 用户: 发送 clarification 消息,包含 2-4 个选项 + 自由输入
用户 → 天书: 选择选项 或 输入补充描述
天书: 根据用户反馈重新匹配任务类型
超时: 2 分钟无响应 → 天书使用 LLM 自主决策,记录日志
示例:
用户: "帮我做个网页"
天书: "请问你需要什么类型的网页?
[1] 个人主页/简历
[2] 数据看板/报告
[3] 产品展示页
[4] 其他(请描述)"
I2: 能力边界 (Capability Boundary)
触发: Guardian 接单后发现超出能力范围
(如"实时视频流处理"、"硬件调试")
流程:
Guardian → 天书: 报告无法完成的原因
天书 → 用户: 坦诚说明限制 + 提供替代方案
用户 → 天书: 确认尝试替代方案 / 取消任务
示例:
天书: "⚠️ 你要求的'实时视频流处理'超出我目前的能力范围。
我可以做的是:
[1] 离线视频剪辑(上传视频后处理)
[2] 视频格式转换
[3] 取消任务"
I3: 高风险操作 (High Risk)
触发: 需要执行以下操作之一:
- 安装系统级依赖 (apt install, pip install --system)
- 修改服务器配置 (nginx/nginx.conf, iptables)
- 执行可能影响系统稳定性的命令
- 删除/覆盖已有文件
流程:
Guardian → 天书: 标记操作为高风险
天书 → 用户: 告知具体风险 + 请求确认
用户 → 天书: 确认 / 拒绝
天书: 确认则放行,拒绝则跳过该操作或标记失败
示例:
天书: "⚠️ 下一步需要安装系统级依赖 ffmpeg,可能影响服务器环境。
操作: apt install -y ffmpeg
影响: 新增约 200MB 系统包
[1] 确认安装
[2] 跳过(功能可能不可用)
[3] 取消任务"
I4: 低置信度警告 (Low Confidence)
触发: 任何任务完成后 `confidence < 0.6`
流程:
天书: 自动判定 confidence 等级
天书 → 用户: 告知数据质量低 + 标注薄弱来源 + 建议人工复核
不阻塞: 结果仍交付,但附带警告
示例:
天书: "⚠️ 情报报告已生成,但置信度较低 (0.45)。
原因:
- 所有来源均为自媒体/博客,无官方或权威媒体
- 关键数据点仅有 2 条,不足以支撑结论
建议: 请人工复核后再使用此报告。
[预览报告](https://...) [下载](https://...)"
I5: 执行中补充 (In-Flight Supplement)
触发: Guardian/Worker 执行中发现缺少关键信息
(如需要 Logo 图片、品牌色、API Key、数据库密码)
流程:
Guardian → 天书: 报告缺失信息
天书 → 用户: 请求补充具体信息
用户 → 天书: 提供信息 / 表示没有
天书: 有信息 → 转发 Guardian 继续执行
没有 → 使用默认值或跳过该步骤
超时: 3 分钟无响应 → 使用默认值继续
示例:
天书: "生成网页需要你提供以下信息:
1. Logo 图片(可选,拖拽上传)
2. 品牌主色(如 #3B82F6,不填则用默认蓝色)
3. 网站标题
[提交并继续] [跳过,用默认值]"
I6: 质量低预警 (Low Quality Alert)
触发: 天书验收不通过,`quality_score < 60`,触发自动重做
流程:
天书: 自动打回 Guardian 重做
天书 → 用户: 告知质量低 + 正在重做 + 预计额外时间
不阻塞: 用户无需操作,只是知情
示例:
天书: "📝 生成的网页质量评分较低 (55/100),正在自动重做。
问题: 缺少响应式适配,移动端显示异常
预计额外时间: 2 分钟
无需你的操作,完成后会通知你。"
I7: 重做失败 (Retry Exhausted)
触发: 重试 2 次后仍不通过验收
流程:
天书: 标记任务为 failed
天书 → 用户: 详细说明失败原因 + 已尝试的措施 + 建议人工介入
用户 → 天书: 决定下一步(取消 / 修改需求重新尝试 / 人工处理)
示例:
天书: "❌ 任务未能通过验收,已尝试 2 次重做。
失败原因:
- 网页加载速度 > 5s (要求 < 2s)
- 图片资源未压缩 (4.2MB → 要求 < 500KB)
已尝试: 图片压缩、代码分割、懒加载
建议:
[1] 降低要求: 接受当前质量 (加载速度 ~5s)
[2] 修改需求: 减少页面上的图片数量
[3] 取消任务,人工处理
[4] 继续尝试 (可能仍无法达标)"
I8: 成本预警 (Cost Warning)
触发: 预估成本 > $1 或 实际消耗 > 预估 200%
流程:
天书: 监控当前 Token/API 消耗
天书 → 用户: 告知当前花费 + 预估总成本
用户 → 天书: 确认继续 / 降低质量以节省成本 / 取消
超时: 2 分钟无响应 → 继续执行(不中断正在进行的任务)
示例:
天书: "💰 成本提醒
当前已消耗: $0.52
预估总成本: $2.10 (超出预估 $1 的 110%)
主要消耗: GPT-4o 情报采集 ($0.38), Claude 代码生成 ($0.14)
[1] 继续执行
[2] 降低模型档位节省成本 (质量可能下降)
[3] 取消任务"
2.8.4 交互决策流
用户提交任务
│
▼
┌─────────────────┐
│ I1: 任务澄清? │ ── 需求模糊 → 提问用户
└────────┬────────┘
│
▼
┌─────────────────┐
│ I2: 超出能力? │ ── 是 → 说明限制 + 替代方案
└────────┬────────┘
│
▼
┌─────────────────┐
│ I3: 高风险操作? │ ── 是 → 告知风险 + 请求确认
└────────┬────────┘
│
▼
执行任务...
│
┌────┼────────┐
▼ ▼ ▼
┌────┐┌────┐ ┌──────┐
│I5: ││I8: │ │I4: │
│补充││成本│ │低置信│
│信息││预警│ │度 │
└─┬──┘└─┬──┘ └──┬───┘
│ │ │
▼ ▼ ▼
执行完成 → 天书验收
│
┌────┴─────┐
▼ ▼
quality≥60 quality<60
│ │
│ ┌────┴─────┐
│ │ I6: 质量 │
│ │ 低预警 │
│ └────┬─────┘
│ │
│ 重试 ≤ 2?
│ ┌──┬─┘
│ 是 否
│ │ │
│ │ ▼
│ │ ┌────────┐
│ │ │I7: 重做│
│ │ │失败 │
│ │ └───┬────┘
│ │ │
▼ ▼ ▼
┌──────────────────┐
│ I9: 链式里程碑 │ ── 如为链式任务
└────────┬─────────┘
│
▼
交付用户
⚠️ 设计约束(不可违背):
- 用户知情权: I4 (低置信度)、I6 (质量低预警) 仅通知不阻塞,不允许因为质量差就偷偷丢弃结果
- 用户决定权: I2 (能力边界)、I3 (高风险)、I7 (重做失败) 必须等待用户确认,不允许自动跳过
- 超时默认行为: I1 任务澄清超时 → 天书自主决策;I3 高风险超时 → 取消操作(安全第一);I5 补充信息超时 → 使用默认值
- 交互频率上限: 同一个任务在 5 分钟内最多触发 3 次交互,避免骚扰用户
- 交互记录: 所有交互历史写入 task_events 表,可追溯 "系统问了几次、用户怎么回的、最后怎么定的"
- I9 与其他交互互斥: 链式里程碑确认 (I9) 进行时,不允许同时触发 I5/I6/I8,按优先级排队
2.9 RAG 引擎 (v2 入口预留)
定位: v1 不需要。预留接口定义、调用位置、数据流向,为 v2 接入私有知识库做准备。
2.9.1 接口定义
# ─────────────────────────────────────────────────────────────
# RAG Engine — 接口契约 (v2 实现,v1 只定义抽象层)
# ─────────────────────────────────────────────────────────────
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class RAGQuery:
"""RAG 查询请求"""
query: str # 自然语言问题
collection: str = "default" # 知识库集合名 (支持多知识库)
top_k: int = 3 # 返回最相关片段数
filter_tags: list[str] | None = None # 可选标签过滤
@dataclass
class RAGChunk:
"""检索到的文档片段"""
content: str # 片段正文
source: str # 来源 (文件路径 / URL)
metadata: dict # 附加元数据 (页码/章节/标签等)
score: float # 相似度分数 (0-1)
@dataclass
class RAGResult:
"""RAG 查询结果"""
query: str
chunks: list[RAGChunk] # 相关片段
answer: str | None = None # LLM 基于片段生成的答案 (可选)
used_chunks: int = 0 # 实际用于生成答案的片段数
class RAGEngine(ABC):
"""RAG 引擎抽象接口 — v2 实现,v1 只定义契约"""
@abstractmethod
async def query(self, q: RAGQuery) -> RAGResult:
"""检索知识库,返回相关片段"""
...
@abstractmethod
async def ingest(self, collection: str, documents: list[dict]) -> int:
"""写入文档到知识库,返回成功入库数量"""
...
@abstractmethod
async def delete(self, collection: str, doc_ids: list[str]) -> int:
"""删除文档,返回删除数量"""
...
@abstractmethod
async def list_collections(self) -> list[str]:
"""列出所有知识库集合"""
...
# ─────────────────────────────────────────────────────────────
# v1 空实现 — 确保调用链不报错
# ─────────────────────────────────────────────────────────────
class RAGEngineStub(RAGEngine):
"""v1 空实现,query 返回空结果,ingest/delete 记录日志"""
async def query(self, q: RAGQuery) -> RAGResult:
return RAGResult(query=q.query, chunks=[])
async def ingest(self, collection: str, documents: list[dict]) -> int:
return 0 # v1 不存储
async def delete(self, collection: str, doc_ids: list[str]) -> int:
return 0
async def list_collections(self) -> list[str]:
return []
2.9.2 调用位置预留
RAG 引擎在架构中有 3 个潜在调用点,v1 不接入,但预留接入位置:
┌─────────────────────────────────────────────────────────────┐
│ RAG 调用位置 (v2 接入) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 位置 1: Worker 工具层 │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 工具名: ask_document(query, collection="default") │ │
│ │ 触发: LLM 主动调用,替代 read_file 读取大文件 │ │
│ │ 流程: │ │
│ │ LLM → ask_document("品牌色是什么") │ │
│ │ → RAGEngine.query(query, top_k=3) │ │
│ │ → 返回 Top-3 相关片段 │ │
│ │ → 注入 LLM 上下文 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ 位置 2: Guardian 任务分解 │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 触发: Guardian 收到任务时,主动检索相关历史经验 │ │
│ │ 流程: │ │
│ │ Guardian → RAGEngine.query("电商落地页开发经验") │ │
│ │ → 返回历史项目片段、踩坑记录 │ │
│ │ → 注入 Guardian Prompt 作为参考 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ 位置 3: 用户直接提问 │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 触发: 用户输入 "在 XX 文档里找 YY 信息" │ │
│ │ 路由: 天书识别为知识库查询 → 直调 RAG │ │
│ │ 流程: │ │
│ │ 用户 → 天书 → RAGEngine.query() │ │
│ │ → 返回答案 + 引用来源 │ │
│ │ → 推送用户 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
2.9.3 数据流入站 (Ingestion Pipeline)
v2 实现时的文档入库流程:
用户上传 PDF/Markdown/HTML
│
▼
┌──────────────────┐
│ Document Parser │ 提取文本 + 元数据 (标题/章节/页码)
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Text Chunker │ 分块策略 (按段落/语义/固定长度)
│ │ 默认: 500 字/块, 重叠 50 字
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Embedding Model │ 向量化 (OpenAI/本地模型)
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Vector Store │ 存储 (Chroma/FAISS/RedisVL)
└──────────────────┘
⚠️ 设计约束(不可违背):
- v1 不调用 RAG,使用
RAGEngineStub空实现ask_document工具在 v1 中不存在,Worker 使用read_file_chunk替代- RAG 引擎的接口必须是抽象类,确保 v2 实现时不需要改调用方代码
- RAG 查询结果必须带 source 和 score,让 LLM 能判断引用可信度
- v2 实现时,RAG 查询超时 ≤ 3s,不得阻塞 Worker 执行流
三、全链路数据流转 (JSON Trace)
案例: "调研 2024 年 AI 趋势并生成可视化报告网页" → research_then_web (链式)
Phase 1: 用户提交 → 天书识别任务类型
用户输入:
{
"user_id": "user-001",
"request": "帮我调研 2024 年 AI 趋势并生成可视化报告网页"
}
天书 TaskRouter 识别结果(内存中,不持久化):
{
"matched_type": "research_then_web",
"route_type": "chain",
"confidence": 0.92,
"chain_steps": [
{ "agent": "vanguard-s", "task_type": "intelligence", "input_from": null },
{ "agent": "ironclad", "task_type": "web_development", "input_from": "previous" }
]
}
Phase 2: 天书创建任务 → 写入 SQLite tasks
{
"写入者": "天书 (TaskPool)",
"存储": "SQLite tasks 表",
"数据": {
"id": "task-0a1b2c3d",
"name": "调研 2024 年 AI 趋势并生成可视化报告网页",
"priority": "B",
"describe": "调研 2024 年 AI 行业技术趋势、市场规模、关键玩家,生成可视化报告网页",
"demand": "生成包含数据图表、来源引用的可视化 HTML 网页",
"task_type": "research_then_web",
"creator": "user-001",
"dispatch_to": null,
"status": "pending",
"retry": 3,
"result_summary": null,
"confidence": null,
"quality_score": null,
"result_detail": null,
"deliverables": null,
"created_at": "2025-01-15T10:00:00Z",
"last_updated": "2025-01-15T10:00:00Z",
"metadata": {
"route_type": "chain",
"chain_steps": ["vanguard-s:intelligence", "ironclad:web_development"],
"current_chain_index": 0
}
}
}
同时写入 SQLite task_events:
{
"写入者": "天书 (TaskPool)",
"存储": "SQLite task_events 表",
"数据": {
"task_id": "task-0a1b2c3d",
"event_type": "created",
"details": { "matched_type": "research_then_web", "route_type": "chain" },
"created_at": "2025-01-15T10:00:00Z"
}
}
Phase 3: 天书派发观澜 → 更新 SQLite + 发布 Redis
更新 SQLite tasks:
{
"写入者": "天书 (TaskPool)",
"存储": "SQLite tasks 表 (UPDATE)",
"数据": {
"status": "dispatched",
"dispatch_to": "vanguard-s",
"dispatched_at": "2025-01-15T10:00:03Z",
"last_updated": "2025-01-15T10:00:03Z"
}
}
发布 Redis Stream tasks.dispatch:
{
"写入者": "天书 (MessageBus)",
"存储": "Redis Stream: tasks.dispatch",
"数据": {
"trace_id": "trace-0a1b2c3d",
"message_id": "1673489003-0",
"topic": "tasks.dispatch",
"timestamp": "2025-01-15T10:00:03Z",
"source": "origin-x-1",
"payload": {
"task_id": "task-0a1b2c3d",
"name": "调研 2024 年 AI 趋势并生成可视化报告网页",
"priority": "B",
"describe": "调研 2024 年 AI 行业技术趋势、市场规模、关键玩家",
"demand": "生成包含数据图表、来源引用的情报报告,格式为 Markdown",
"task_type": "intelligence",
"creator": "user-001",
"chain_context": null
}
}
}
写入 Redis Hash task:0a1b2c3d (运行时状态):
{
"写入者": "天书 (MessageBus)",
"存储": "Redis Hash: task:0a1b2c3d",
"数据": {
"status": "dispatched",
"dispatch_to": "vanguard-s",
"dispatched_at": "2025-01-15T10:00:03Z",
"chain_index": "0",
"ack_timeout_at": "2025-01-15T10:00:33Z"
}
}
Phase 4: 观澜接收 → 检索加载记忆 + ACK
观澜加载记忆(不持久化,进程内存):
{
"加载者": "观澜守护实例",
"来源 1": "Redis mem:global",
"内容 1": {
"user_preferences": { "language": "zh-CN", "report_style": "数据驱动" },
"project_context": { "project_name": "AI 行业研究 2024" }
},
"来源 2": "Redis mem:vanguard",
"检索逻辑": "根据任务意图 '2024 AI Trends' 检索相关历史报告摘要",
"内容 2": {
"preferred_sources": ["gartner.com", "idc.com", "mckinsey.com"],
"avoid_sources": ["unknown-blog.com"],
"past_reports_summary": "2023 年 AI 报告侧重于大模型参数量,2024 年转向 Agent 应用"
}
}
观澜 ACK → 发布 Redis tasks.ack + 任务分解:
{
"写入者": "观澜守护实例",
"存储 1": "Redis Stream: tasks.ack",
"数据 1": {
"trace_id": "trace-0a1b2c3d",
"topic": "tasks.ack",
"timestamp": "2025-01-15T10:00:08Z",
"source": "vanguard-guard-1",
"payload": {
"task_id": "task-0a1b2c3d",
"guard_id": "vanguard-guard-1",
"ack": true,
"estimated_time": 300,
"subtasks": [
{ "id": "st-001", "describe": "搜索 2024 年 AI 技术趋势", "status": "dispatched" },
{ "id": "st-002", "describe": "抓取 Gartner/IDC 市场报告", "status": "pending" },
{ "id": "st-003", "describe": "汇总生成情报报告", "status": "pending" }
]
}
},
"存储 2": "Redis Hash: task:0a1b2c3d",
"数据 2": {
"status": "running",
"acked_at": "2025-01-15T10:00:08Z",
"subtasks": "[{\"id\":\"st-001\",\"status\":\"dispatched\"},{\"id\":\"st-002\",\"status\":\"pending\"},{\"id\":\"st-003\",\"status\":\"pending\"}]"
},
"存储 3": "SQLite tasks 表 (UPDATE)",
"数据 3": {
"status": "running",
"acked_at": "2025-01-15T10:00:08Z",
"started_at": "2025-01-15T10:00:08Z"
}
}
Phase 5: 观澜派发子实例 → DAG 依赖编排 + 异步并发控制
观澜将任务拆解为 DAG,按依赖关系动态派发:
DAG 结构:
st-001: 搜索 AI 技术趋势 → 无依赖 → 立即派发
st-002: 抓取 Gartner/IDC 报告 → 无依赖 → 立即派发
st-003: 汇总生成情报报告 → 依赖 [st-001, st-002] → 等前两个完成后派发
发布 Redis Stream tasks.subtask(3 条消息,含依赖声明):
{
"写入者": "观澜守护实例",
"存储": "Redis Stream: tasks.subtask",
"数据": {
"trace_id": "trace-0a1b2c3d",
"topic": "tasks.subtask",
"timestamp": "2025-01-15T10:00:10Z",
"source": "vanguard-guard-1",
"payload": {
"subtask_id": "st-001",
"parent_task_id": "task-0a1b2c3d",
"action": "dispatch",
"describe": "搜索 2024 年 AI 技术趋势",
"demand": "搜索 2024 年 AI 关键技术突破:大模型、Agent、多模态",
"depends_on": [],
"timeout": 180
}
}
}
子实例加载技能(不持久化):
{
"加载者": "观澜子实例 st-001",
"来源": "agents/vanguard-s/skills/web_search/SKILL.md",
"内容": {
"skill_name": "web_search",
"instructions": "使用 Tavily/DuckDuckGo 搜索,返回 Top10 结果,标注来源 URL",
"tools": ["web_search", "web_fetch"]
}
}
Phase 6: 子实例执行 + 进度推送
子实例 st-001 执行完成 → 推送进度:
{
"写入者": "观澜子实例 st-001",
"存储 1": "Redis Stream: tasks.progress",
"数据 1": {
"trace_id": "trace-0a1b2c3d",
"topic": "tasks.progress",
"timestamp": "2025-01-15T10:00:25Z",
"source": "vanguard-w-a1b2c3",
"payload": {
"task_id": "task-0a1b2c3d",
"subtask_id": "st-001",
"action": "complete",
"result_summary": "找到 23 条技术趋势,涵盖 GPT-4o、Claude 3.5、Gemini 2.0、Sora",
"raw_data": "[{\"title\":\"OpenAI GPT-4o\",\"url\":\"openai.com\",...},...]"
}
},
"存储 2": "Redis Hash: subtask:st-001",
"数据 2": {
"status": "completed",
"worker_id": "vanguard-w-a1b2c3",
"completed_at": "2025-01-15T10:00:25Z",
"result_summary": "找到 23 条技术趋势"
}
}
Phase 7: 观澜完成 → Summary 摘要生成 + 自评置信度
关键步骤:Summary 引擎生成结果级摘要 (Lazy Loading 基础)
观澜汇总 3 个子任务结果后(假设原始报告 5000 字):
{
"原始结果 (不入消息,存 FileBrowser)": {
"content": "## 核心结论\n2024 年 AI 行业呈现四大趋势...\n\n## 详细分析\n(5000 字详细内容)",
"sources": [
{ "url": "https://gartner.com/...", "channel_type": "authoritative", "title": "Gartner Top AI Trends" },
{ "url": "https://openai.com/...", "channel_type": "official", "title": "GPT-4o" }
]
},
"摘要引擎输出 (Summary, 用于链式传递)": {
"summary_type": "intelligence_report",
"content_summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化,AI 芯片市场突破 500 亿美元。",
"key_facts": [
"GPT-4o 支持实时语音/视觉/文本",
"AI 推理成本从$0.03/千 token 降至$0.003",
"AI Agent 框架从 200+ 收敛至 5 个主流"
],
"data_links": {
"full_report": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
"charts": ["https://filebrowser.example.com/outputs/task-0a1b2c3d/chart1.png"]
},
"source_count": 5,
"source_credibility": 0.88
}
}
观澜发布 tasks.complete (只带摘要,不带原文!):
{
"写入者": "观澜守护实例",
"存储": "Redis Stream: tasks.complete",
"数据": {
"trace_id": "trace-0a1b2c3d",
"topic": "tasks.complete",
"timestamp": "2025-01-15T10:01:30Z",
"source": "vanguard-guard-1",
"payload": {
"task_id": "task-0a1b2c3d",
"guard_id": "vanguard-guard-1",
"task_type": "intelligence",
"result_summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化,AI 芯片市场突破 500 亿美元。",
"key_facts": [
"GPT-4o 支持实时语音/视觉/文本",
"AI 推理成本从$0.03/千 token 降至$0.003",
"AI Agent 框架从 200+ 收敛至 5 个主流"
],
"deliverables": [
{
"name": "2024 年 AI 趋势情报报告.md",
"delivery_type": "text",
"size_bytes": 18500,
"preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
"download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/report.md"
}
],
"confidence": 0.86,
"confidence_breakdown": {
"data_sufficiency": 0.90,
"source_credibility": 0.88,
"structure_completeness": 0.85,
"timeliness": 0.82
},
"completed_at": "2025-01-15T10:01:30Z"
}
}
}
Phase 8: 天书验收评估 → AcceptanceMiddleware
天书消费 complete → 评估 → 写入 SQLite:
{
"写入者": "天书 (AcceptanceMiddleware)",
"存储": "SQLite tasks 表 (UPDATE)",
"数据": {
"status": "completed",
"result_summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化...",
"confidence": 0.86,
"confidence_breakdown": "{\"data_sufficiency\":0.90,\"source_credibility\":0.88,...}",
"quality_score": 90,
"quality_breakdown": "{\"data_sufficiency\":0.90,\"source_credibility\":0.88,...}",
"deliverables": "[{\"name\":\"2024 年 AI 趋势情报报告.md\",\"preview_url\":\"...\"}]",
"completed_at": "2025-01-15T10:01:30Z",
"metadata": {
"route_type": "chain",
"chain_steps_completed": "1/2",
"current_chain_index": 1,
"chain_next": { "agent": "ironclad", "task_type": "web_development" }
}
}
}
Phase 8: 关键节点汇报与用户确认 (HITL Checkpoint)
核心逻辑: 链式任务的关键节点(如情报转开发),Origin-X 必须暂停,向用户汇报“优劣”(置信度/摘要),获得确认后才执行下一步。防止盲目烧 Token。
Origin-X 生成《里程碑汇报》 (Milestone Report):
{
"发送者": "天书 (Origin-X Agent)",
"接收者": "用户 (User)",
"类型": "MILESTONE_REPORT",
"数据": {
"task_id": "task-0a1b2c3d",
"current_step": "情报采集 (观澜) 已完成",
"next_step": "网页开发 (执戈)",
"evaluation": {
// 【优劣汇报 - 置信度】
"confidence": 0.86,
"pros": ["数据来源权威 (Gartner/IDC)", "关键数据点充足"],
"cons": ["缺乏针对中小企业落地案例的实证数据"],
"recommendation": "数据基础扎实,建议进入网页开发阶段。"
},
"next_step_cost": {
"estimated_time": "3 分钟",
"estimated_token_cost": "$0.05"
},
"deliverables_preview": {
"report_url": "https://filebrowser.../report.md"
}
}
}
用户决策分支:
- APPROVE (确认): 用户点击继续 → 触发 Phase 9 (派发执戈)。
- MODIFY (修改): 用户输入"补充 2026 年预测" → Origin-X 重新派发观澜 (带修正指令)。
- STOP (终止): 用户取消 → 发布
tasks.interrupt,结束任务,保留已产出文件。
⚠️ 设计约束(不可违背):
- 超时策略: 用户 5 分钟未响应 → Origin-X 发送提醒,再 5 分钟无响应 → 自动 STOP(不允许自动 APPROVE)
- 链式确认频率: 链式任务每一步都需要 HITL,不允许跳过中间步骤直接到最后
- MODIFY 语义: Origin-X 必须理解用户自然语言修改意见,翻译为具体的修正指令(如"补充2026年预测"→ 更新观澜的 demand 字段),不允许直接把用户原话丢给下一个 Agent
- 低置信度自动拦截: 如果
confidence < 0.6,Origin-X 不得向用户请求确认,直接打回 Guardian 重做(节省用户注意力)- 已产出文件保留: 无论 HITL 结果如何(APPROVE/MODIFY/STOP),已生成的文件永久保留在 FileBrowser,不允许清理
Phase 9: 天书派发执戈 → 注入摘要 (Lazy Loading Context)
关键:执戈的 Prompt 只包含摘要,不包含 5000 字报告,按需读取
发布 Redis tasks.dispatch:
{
"写入者": "天书 (MessageBus)",
"存储": "Redis Stream: tasks.dispatch",
"数据": {
"trace_id": "trace-0a1b2c3d",
"topic": "tasks.dispatch",
"timestamp": "2025-01-15T10:01:35Z",
"source": "origin-x-1",
"payload": {
"task_id": "task-0a1b2c3d",
"name": "基于情报报告生成可视化网页",
"priority": "B",
"describe": "基于以下 AI 趋势摘要生成可视化 HTML 网页",
"demand": "生成响应式网页,包含:标题区、核心结论区、数据图表区、来源引用区",
"task_type": "web_development",
"chain_context": {
// 1. 用户的最终诉求(决定做什么 - 必须注入 Prompt)
"original_demand": "基于情报报告生成可视化网页,包含数据图表、来源引用",
// 2. 核心摘要(决定大方向 - 注入 Prompt)
"summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化,AI 芯片市场突破 500 亿美元。",
// 3. 关键数据点(直接支撑摘要 - 注入 Prompt)
"key_facts": ["GPT-4o 支持实时语音/视觉/文本", "AI 推理成本从$0.03 降至$0.003"],
// 4. 详细资料链接(决定细节 - 不注入 Prompt,Worker 按需 read_file 读取)
"data_links": {
"full_report": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
"chart_data": "https://filebrowser.example.com/outputs/task-0a1b2c3d/data.csv"
}
},
"creator": "user-001",
"dispatched_at": "2025-01-15T10:01:35Z"
}
}
}
Phase 10: 执戈执行 → 检索记忆 + 技能加载 + 分解子任务
核心逻辑:Lazy Loading (按需加载) 的子任务分解
执戈接任务时,不需要读取 5000 字的情报原文。它通过 chain_context 中的 summary + key_facts 就能理解大方向并拆解任务。
- 读取摘要:确定网页结构(标题、结论区、图表区)。
- 制定计划:决定需要哪些子任务(如:st-004 开发网页结构,st-005 生成 AI 概念图)。
- 按需读取:当具体绘制图表需要精确数据时,Worker 主动调用
read_file(data_links.full_report)读取原文细节,而不是在 Prompt 中等待。
执戈加载记忆(相似度检索):
{
"加载者": "执戈守护实例",
"来源 1": "Redis mem:global",
"内容 1": { "user_preferences": { "language": "zh-CN", "report_style": "数据驱动" } },
"来源 2": "Redis mem:ironclad",
"检索逻辑": "根据任务意图 'Web Development' 检索相关编码规范与服务器配置",
"内容 2": {
"coding_style": "优先使用 Vue3 + TailwindCSS",
"server_config": { "host": "192.168.1.100", "port": 8080 },
"past_issues": "上次部署时 CDN 缓存未刷新,需提醒用户"
}
}
执戈加载技能:
{
"技能匹配": { "task_type": "web_development" },
"加载来源": "agents/ironclad/skills/web_development/SKILL.md",
"加载内容": {
"skill_name": "web_development",
"instructions": "使用 HTML/CSS/JS + Chart.js 生成数据图表网页...",
"tools": ["read_file", "write_file", "bash"],
"external_apis": []
}
}
⚠️ 子实例执行防护(D18 + D20):
- 工具截断 (D18): 所有工具返回值经
truncate_tool_output()处理,超 8000 Token 强制截断 + 警告- 死循环熔断 (D20): DeerFlow
SubagentConfig.max_turns = 15(硬限制),Worker 陷入"改 Bug→报错→再改"循环时,15 轮后自动终止- 超时兜底 (D20):
SubagentConfig.timeout_seconds = 300(5 分钟),超时触发CancelledError+ 清理资源- 上下文折叠 (D24): DeerFlow
SummarizationMiddleware已内置,配置trigger: token_ratio=0.7+keep: last_3_turns,无需额外开发
Phase 11: 执戈完成任务 → 文件交付
执戈汇总结果 → 发布 tasks.complete:
{
"写入者": "执戈守护实例",
"存储": "Redis Stream: tasks.complete",
"数据": {
"trace_id": "trace-0a1b2c3d",
"topic": "tasks.complete",
"timestamp": "2025-01-15T10:06:30Z",
"source": "ironclad-guard-1",
"payload": {
"task_id": "task-0a1b2c3d",
"guard_id": "ironclad-guard-1",
"task_type": "web_development",
"result_summary": "已生成可视化网页,包含 3 个数据图表、5 个来源引用、响应式布局",
"deliverables": [
{
"name": "2024 年 AI 趋势情报报告.md",
"delivery_type": "text",
"preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
"download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/report.md"
},
{
"name": "AI 趋势可视化报告.html",
"delivery_type": "webpage",
"preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.html",
"download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/report.html"
},
{
"name": "AI 市场规模图表.png",
"delivery_type": "image",
"preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/chart.png",
"download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/chart.png"
}
],
"confidence": 0.90,
"quality_score": 95,
"quality_breakdown": {
"feature_completeness": 0.95,
"page_performance": 0.90,
"cross_browser_compat": 0.95,
"security": 1.0
},
"completed_at": "2025-01-15T10:06:30Z"
}
}
}
Phase 12: 天书最终验收 + 记忆沉淀 + 推送用户
⚠️ 设计约束(不可违背):
- confidence 计算: 必须基于来源渠道加权平均(official=0.80×2.0权重, authoritative=0.70×2.0, industry=0.60×1.0, social=0.40×1.0, anonymous=0.20×1.0),不允许 LLM 自评拍脑袋
- quality_score 计算: 必须按
task_type预定义的维度权重累加(如代码开发: 编译通过40%+测试30%+质量20%+安全10%),不允许单一维度打分- 来源优先级衰减: 如果结果中没有任何 official/authoritative 来源,最终 confidence × 0.8 衰减
- 天书验收决策:
confidence >= min_confidence AND quality_score >= 60才 PASS,两个条件必须同时满足- 验收不通过处理: 自动打回 Guardian 重做,最多重试 2 次,超过后标记 failed 并通知用户
- tasks.complete 消息格式: 必须包含
confidence+confidence_breakdown+quality_score+quality_breakdown+deliverables五个字段,缺一不可
天书更新 SQLite:
{
"写入者": "天书",
"存储": "SQLite tasks 表",
"数据": {
"status": "completed",
"result_summary": "任务完成:情报采集 + 网页生成,质量评分 95/100",
"confidence": 0.90,
"quality_score": 95,
"deliverables": "[{\"name\":\"AI 趋势可视化报告.html\",\"preview_url\":\"https://filebrowser.example.com/outputs/task-0a1b2c3d/report.html\"},...]",
"completed_at": "2025-01-15T10:06:30Z",
"metadata": {
"route_type": "chain",
"chain_steps_completed": "2/2",
"total_duration_seconds": 390
}
}
}
异步记忆沉淀 (ConsolidateMemory 逻辑说明):
这是一个后台任务,由天书在验收通过后触发,负责将本次任务的“新知”提取入库:
- 事实提取: 从观澜的报告摘要中提取新的事实(如"AI 芯片市场达 500 亿")存入
mem:global。 - 偏好修正: 如果用户在执戈生成的代码上做了修改(如“把 Tailwind 换成 Bootstrap"),天书捕获到反馈后,更新
mem:ironclad中的coding_style。 - 错误复盘: 如果某子任务失败(如 comfyUI 超时),记录 "avoid_comfyui_heavy_models" 到
mem:ironclad的past_issues中,防止下次踩坑。 - 去重合并: 如果新旧记忆冲突(如旧记录说 server_ip 是 192.168.1.50,新任务是 192.168.1.100),以最新任务的记录为准进行覆盖。
{
"执行者": "天书 (后台 ConsolidateMemory)",
"触发时机": "任务验收通过后",
"输入": {
"task_summary": "调研 2024 年 AI 趋势并生成网页",
"result": "观澜置信度 0.86, 执戈质量 95/100",
"user_feedback": null
},
"输出 1": "Redis mem:global",
"数据 1": {
"user_preferences": { "language": "zh-CN", "report_style": "数据驱动,包含图表" },
"project_progress": { "AI 行业研究 2024": "已完成" }
},
"输出 2": "Redis mem:vanguard",
"数据 2": {
"successful_sources": ["gartner.com", "openai.com"],
"failed_sources": []
},
"输出 3": "Redis mem:ironclad",
"数据 3": {
"coding_style": "Vue3 + TailwindCSS + Chart.js",
"server_config": { "host": "192.168.1.100", "port": 8080 }
}
}
推送用户:
{
"发送者": "天书",
"接收者": "用户",
"数据": {
"task_id": "task-0a1b2c3d",
"status": "completed",
"summary": "任务完成!链式任务 2/2 全部通过验收,总耗时 6 分 30 秒。",
"chain_results": [
{ "agent": "vanguard-s", "task": "情报采集", "confidence": 0.86, "quality": 90 },
{ "agent": "ironclad", "task": "网页开发", "confidence": 0.90, "quality": 95 }
],
"deliverables": [
{ "name": "AI 趋势情报报告.md", "preview": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md" },
{ "name": "AI 趋势可视化报告.html", "preview": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.html" },
{ "name": "AI 市场规模图表.png", "preview": "https://filebrowser.example.com/outputs/task-0a1b2c3d/chart.png" }
]
}
}
Top comments (0)