DEV Community

g-wellsa
g-wellsa

Posted on

Swarm Agents 架构设计 | 多智能体系统完整方案

Swarm Agents — 架构设计文档

二次开发基础: DeerFlow 2.0 (Lead Agent + Middleware Chain + Subagents)
配置驱动: OpenClaw soul.md 模式 + Skills 三级加载
核心模式: Origin-X (天书) + 隔离执行 (观澜/执戈) + 异步消息总线 (Redis Streams)
设计哲学: 决策唯一中心、执行完全隔离、记忆按需检索、Token 极致优化


决策记录 (D01-D17)

# 决策项 选型 理由
D01 天书实现方式 DeerFlow 2.0 make_lead_agent 二次开发 复用成熟的中间件链与 LangGraph 基础设施
D02 中间件链 保留 12 层 + 新增 1 层 = 13 层 恢复 Title/Memory,新增 Acceptance,裁剪 Guardrail/LoopDetection
D03 任务派发通道 Redis Streams 异步消息总线 解耦 Origin-X 与 Guardian,支持消费者组与持久化
D04 同步/异步模型 异步执行 + 关键路径 ACK 确认 派发不阻塞,但接收与结果必须强制确认
D05 执行模型 Origin-X (调度) → Guardian (常驻分解) → Worker (按需执行) 兼顾常驻响应与弹性伸缩
D06 跨 Agent 通信 严禁直连,必须经 Origin-X 路由中转 避免网状调用导致上下文污染与管理爆炸
D07 TaskPool 存储 SQLite (主表) + Redis Hash (子任务/运行时状态) 主数据持久化查询,临时状态高性能读写
D08 监控独立部署 MonitorFlow 独立进程 Origin-X 挂掉时仍可检测心跳与发送中断指令
D09 配置格式 config.yaml (主) + agents/{id}/config.local.yaml (从) + soul.md 遵循 DeerFlow 官方规范,主从分层
D10 置信度/评估 Guardian 自评 + Origin-X 复核 + 来源渠道优先级加成 观澜按渠道权重自评,天书用统一标准验收
D11 质量评分 task_type 独立定义维度与权重 情报/代码/图片/音视频/建站各有验收标准
D12 记忆架构 Global (天书) / Domain (观澜) / Skill (执戈) / Ephemeral (子实例) 业务从属但记忆独立,防止精神分裂与 Token 浪费
D13 结果交付 FileBrowser 目录映射公网 URL + 多类型 MIME 预览 用户不登后台,直接点击超链接预览/下载
D14 子任务监控 三级中断:消息层 → asyncio.Task.cancel() → 沙箱进程 kill 随时可停,返回中间结果
D15 硬件自适应 三档检测 (低/中/高) + 运行时热重载 防止多任务卡死,动态调整并发与超时
D16 执戈技能接口 6 大技能统一注册,按 task_type 快速切换 代码/图片/音视频/建站/数据/文档一套接口
D17 任务路由识别 正则/关键词匹配 task_types,支持直派/链式/并行 不是所有任务都需要观澜,按需路由
D18 工具防爆截断 工具返回值超 Token 阈值强制截断 + 警告 防止 read_file 读大文件撑爆上下文窗口
D19 子任务依赖编排 Guardian 内 DAG 解析,depends_on 声明依赖 线性链不够用时,支持网状依赖并发执行
D20 死循环熔断 Subagent max_turns=15 硬限制 + timeout_seconds 兜底 Worker 陷入"改 Bug→报错→再改"循环时物理掐断

一、系统总览

1.1 架构全景

┌─────────────────────────────────────────────────────────────────────────────┐
│                              Swarm Agents v3.5 架构                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────┐     ┌───────────────────────────────────────────────────┐    │
│   │  用户     │◄───►│              天书 (Origin-X Agent)                    │    │
│   └──────────┘     │  • 13 层中间件链  • 任务路由识别 (D17)              │    │
│                    │  • FactQuery 防幻觉 • 验收评估 (D10/D11)           │    │
│                    │  • 全局记忆 (Global Memory) • 异步消息总线客户端    │    │
│                    └───────────────────────┬───────────────────────────┘    │
│                                           │ Redis Streams (Pub/Sub)        │
│                    ┌──────────────────────┼───────────────────────────┐     │
│                    ▼                      ▼                           ▼     │
│          ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐  │
│          │ 观澜 Guardian     │ │ 执戈 Guardian     │ │ 扩展 Guardian     │  │
│          │ (情报常驻)        │ │ (多技能常驻)      │ │ (动态插拔)        │  │
│          │                   │ │                   │ │                   │  │
│          │ • 领域记忆加载    │ │ • 技能记忆加载    │ │ • 独立记忆/配置   │  │
│          │ • 任务分解        │ │ • 技能路由切换    │ │ • 任务分解        │  │
│          │ • 异步并发控制    │ │ • 外部 API 调度   │ │ • 并发控制        │  │
│          │   (Semaphore)     │ │   (comfyUI/..)    │ │                   │  │
│          │ • 结果摘要生成    │ │ • 摘要/链接返回   │ │ • 摘要返回        │  │
│          └────────┬──────────┘ └────────┬──────────┘ └────────┬──────────┘  │
│                   │                     │                     │              │
│          ┌────────┴──────────┐ ┌────────┴──────────┐ ┌────────┴──────────┐  │
│          │ 观澜 Worker 池    │ │ 执戈 Worker 池    │ │ 扩展 Worker 池    │  │
│          │ (无状态, 用完销毁) │ │ (无状态, 用完销毁) │ │ (无状态, 用完销毁) │  │
│          │ • 独立 Prompt     │ │ • 按需加载 Skill  │ │ • 独立上下文      │  │
│          │ • 临时记忆        │ │ • 外部 API 调用   │ │ • 临时记忆        │  │
│          └───────────────────┘ └───────────────────┘ └───────────────────┘  │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │ 侧边支撑组件 (Sidecars)                                               │   │
│  │ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │   │
│  │ │ Memory Engine│ │ Summary Engine│ │ TaskPool     │ │ MonitorFlow  │ │   │
│  │ │ (检索/合并)  │ │ (3级摘要)     │ │ (SQLite+Redis)│ │ (独立进程)   │ │   │
│  │ │ Redis: mem:* │ │ 上下文/结果   │ │ 硬件自适应   │ │ 心跳/超时/   │ │   │
│  │ └─────────────┘ └──────────────┘ └───────────────┘ │ 中断/降级    │ │   │
│  │ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │   │
│  │ │ FileBrowser │ │ Ext. APIs    │ │ Agent Registry│ │ RAG Engine   │ │   │
│  │ │ (公网映射)  │ │ (comfyUI等)  │ │ (热重载)      │ │ (v2 入口预留)│ │   │
│  │ └─────────────┘ └──────────────┘ └───────────────┘ └──────────────┘ │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

1.2 核心设计原则

  1. Origin-X 唯一调度: 天书是决策中心、记忆写入中心、验收中心。子 Agent 严禁直连。
  2. 隔离执行: Guardian 常驻分解,Worker 无状态执行。Worker 不共享上下文,不持长期记忆。
  3. 按需检索 (Retrieval): 记忆绝不全量加载。基于任务意图做相似度检索,只注入 Top-K 相关片段。
  4. 摘要驱动 (Summary-First): 跨 Agent 传递只传 Summary + Key Facts + 数据链接。Worker 需细节时主动 read_file 读取。
  5. 配置热插拔: 新增 Agent = 建目录 + 写 soul.md + 改 config.yaml。Agent Manager 自动启停。
  6. 硬件自适应: 启动检测 CPU/内存分档,运行时动态调整并发/超时,基础流程绝不卡死。

二、核心模块

2.1 天书 (Origin-X Agent)

基于 DeerFlow make_lead_agent 改造,保留 13 层中间件,新增验收与事实查询层。

2.1.1 中间件链 (13 层)

ThreadDataUploadsSandboxDanglingToolCallSummarizationTodoTitleMemory(全局检索)ViewImageDeferredToolFilterSubagentLimitAcceptance(验收)Clarification(最后)

⚠️ 设计约束(不可违背):

  1. ClarificationMiddleware 必须在最后,用于拦截澄清请求并中断执行流
  2. AcceptanceMiddlewareSubagentLimit 之后、Clarification 之前,只验收子 Agent 返回结果,不拦截用户输入
  3. MemoryMiddleware 执行异步检索(不阻塞主流程),Top-3 相关片段注入 SystemMessage
  4. TitleMiddleware 自动生成对话标题,用户必须能区分不同会话(不可裁剪)
  5. 裁剪掉的 GuardrailMiddlewareLoopDetectionMiddleware 在 v1 阶段不恢复

2.1.2 事实查询保障 (FactQueryMiddleware)

防止 LLM 编造进度/配置。用户提问时自动拦截,查询 SQLite/Redis 注入真实数据。

class FactQueryMiddleware(Middleware):
    INTENT_MAP = {
        "task_status": ["进度", "到哪了", "状态"],
        "agent_status": ["观澜", "执戈", "在线"],
        "config": ["配置", "模型", "工具"],
        "deliverables": ["文件", "报告", "链接"]
    }
    # 检测到意图 → 查 SQLite/Redis → 注入 SystemMessage: "【事实数据】... 必须基于此回答"
Enter fullscreen mode Exit fullscreen mode

2.1.3 任务类型识别与路由 (D17)

天书接收请求后,先匹配 task_types 定义,决定路由策略:

  • simple_qa → 天书直接回答
  • image_process / code_dev / web_dev直派执戈 (跳过观澜)
  • intelligence → 派观澜
  • research_then_web / doc / data链式: 观澜 → (验收) → 执戈
  • parallel → 并行派发

⚠️ 设计约束(不可违背):

  1. 匹配算法必须支持三级优先级:精确短语 > 正则 > 关键词,最长匹配优先
  2. 多个任务类型同时匹配时,取分数最高者,不允许随机选或全部派发
  3. 所有类型都匹配不上时,fallback 到天书 LLM 自主决策,不允许拒绝服务
  4. 链式路由的 input_from: previous 必须通过 chain_context JSON 传递,禁止让下一个 Agent 去翻数据库
  5. 关键词匹配区分任务边界:"裁剪图片" ≠ "分析图片数据",不能仅用 in 判断

2.2 异步消息总线 (Redis Streams)

解耦 Origin-X 与 Guardian,支持消费组、持久化、死信重试。

Topic 定义:
| Stream | 生产者 | 消费者组 | 用途 |
|--------|--------|----------|------|
| tasks.dispatch | 天书 | guardians | 派发主任务/链式步骤 |
| tasks.ack | Guardian | origin-x | 接收确认 + 预估时间 |
| tasks.progress | Guardian/Worker | origin-x | 实时进度推送 |
| tasks.complete | Guardian | origin-x | 任务完成 + 摘要 + 链接 |
| tasks.failed | Guardian | origin-x | 失败归档 |
| tasks.subtask | Guardian | subtask-workers | 子任务派发/结果 |
| tasks.interrupt | 天书/用户 | guardians | 强制中断信号 |
| agents.heartbeat | 所有进程 | monitor | 心跳保活 |

2.3 记忆与摘要引擎 (Memory & Summary)

2.3.1 三层记忆隔离 (D12)

层级 Key 内容 权限 更新时机
Global mem:global 用户偏好、项目目标、跨 Agent 历史 天书读写,其他只读 任务验收后合并
Domain mem:vanguard 权威数据源、历史报告摘要、避坑指南 观澜读写,天书监督 观澜任务完成后
Skill mem:ironclad 代码规范、服务器配置、API 密钥/环境 执戈读写,天书监督 执戈任务完成后
Ephemeral 进程内存 当前 Worker 的中间变量、搜索草稿 Worker 私有 任务结束即销毁

2.3.2 检索加载 (Retrieval-Augmented Loading)

Agent 启动/接单时,绝不全量加载记忆

核心逻辑说明
记忆加载分为 意图提取 → 相似度检索 → Top-K 注入 三步,确保 Prompt 只包含相关片段。

# ─────────────────────────────────────────────────────────────
# Memory Engine — 检索加载实现
# ─────────────────────────────────────────────────────────────

class MemoryEngine:
    """记忆引擎 — 检索加载 + 沉淀合并"""

    def __init__(self, redis_client, vector_store=None):
        self.redis = redis_client
        self.vector_store = vector_store  # 可选:向量检索 (如 FAISS/RedisVL)
        self.use_vector = vector_store is not None

    async def retrieve(self, task_type: str, keywords: list[str],
                       memory_scope: str = "all", top_k: int = 3) -> dict:
        """
        根据任务意图检索记忆

        参数:
            task_type: 任务类型 (intelligence / web_development / ...)
            keywords: 关键词列表 ["Vue3", "2024 AI", "响应式"]
            memory_scope: 检索范围 "global" | "vanguard" | "ironclad" | "all"
            top_k: 返回最相关的 Top-K 条

        返回:
            结构化记忆片段,直接注入 Prompt
        """
        results = {}

        # ── 步骤 1: 确定检索范围 ──
        scopes = self._resolve_scope(memory_scope)

        for scope in scopes:
            mem_key = f"mem:{scope}"
            raw_mem = await self.redis.json().get(mem_key)
            if not raw_mem:
                continue

            # ── 步骤 2: 相似度检索 ──
            if self.use_vector:
                # 向量检索:将 keywords 合并为查询向量,搜索最相似的记忆片段
                query_vec = self.vector_store.encode(" ".join(keywords))
                scored = self.vector_store.search(query_vec, scope=scope, top_k=top_k)
                results[scope] = scored
            else:
                # 轻量模式:基于关键词的标签/字段匹配 + 相关性打分
                scored = self._keyword_match(raw_mem, keywords, top_k)
                if scored:
                    results[scope] = scored

        # ── 步骤 3: 全局去重 + 排序 ──
        return self._dedup_and_rank(results, top_k)

    def _keyword_match(self, raw_mem: dict, keywords: list[str], top_k: int) -> list:
        """
        关键词匹配 (轻量模式,无需向量库)

        策略:
        1. 对记忆中的每条记录,计算与关键词的 overlap 分数
        2. 字段权重:tags (3x) > key (2x) > value (1x)
        3. 返回 Top-K
        """
        scored = []
        for key, value in raw_mem.items():
            score = 0
            text_blob = f"{key} {self._flatten(value)}".lower()

            # 检查 tags 字段 (最高权重)
            tags = raw_mem.get("tags", [])
            if isinstance(tags, list):
                for kw in keywords:
                    if any(kw.lower() in t.lower() for t in tags):
                        score += 3

            # 关键词匹配
            for kw in keywords:
                if kw.lower() in text_blob:
                    score += 1

            if score > 0:
                scored.append({"key": key, "value": value, "score": score})

        # 按分数降序,返回 Top-K
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:top_k]

    def _resolve_scope(self, memory_scope: str) -> list[str]:
        """解析检索范围"""
        if memory_scope == "all":
            return ["global", "vanguard", "ironclad"]
        return [memory_scope]

    def _flatten(self, value) -> str:
        """将嵌套字典展平为字符串"""
        if isinstance(value, str):
            return value
        if isinstance(value, dict):
            return " ".join(str(v) for v in value.values())
        return str(value)

    def _dedup_and_rank(self, results: dict, top_k: int) -> dict:
        """全局去重 + 排序"""
        all_items = []
        for scope, items in results.items():
            for item in items:
                item["scope"] = scope
                all_items.append(item)

        # 按分数排序,取 Top-K
        all_items.sort(key=lambda x: x["score"], reverse=True)
        return {"memories": all_items[:top_k]}


# ─────────────────────────────────────────────────────────────
# 使用示例 — 执戈接任务时的记忆加载
# ─────────────────────────────────────────────────────────────

# 执戈守护实例收到任务: "基于情报报告生成可视化网页"
# → 提取意图关键词: ["web", "visualization", "chart", "responsive"]
# → 检索 mem:ironclad (技能记忆) + mem:global (全局记忆)

memories = await memory_engine.retrieve(
    task_type="web_development",
    keywords=["web", "visualization", "chart", "responsive"],
    memory_scope="all",
    top_k=3
)

# 返回:
# {
#   "memories": [
#     {
#       "scope": "ironclad",
#       "key": "coding_style",
#       "value": "优先使用 Vue3 + TailwindCSS + Chart.js",
#       "score": 5
#     },
#     {
#       "scope": "ironclad",
#       "key": "server_config",
#       "value": {"host": "192.168.1.100", "port": 8080},
#       "score": 2
#     },
#     {
#       "scope": "global",
#       "key": "user_preferences",
#       "value": {"language": "zh-CN", "report_style": "数据驱动"},
#       "score": 1
#     }
#   ]
# }

# → 注入执戈 Prompt:
# """
# 【历史记忆】
# - 编码风格: 优先使用 Vue3 + TailwindCSS + Chart.js
# - 服务器配置: 192.168.1.100:8080
# - 用户偏好: 中文界面,数据驱动风格
# """
Enter fullscreen mode Exit fullscreen mode

2.3.3 三级摘要架构 (D-Summary)

  1. 上下文摘要 (Context): 对话 Token > 80% 时触发滑动窗口压缩,保留最近 3 轮,历史折叠为 summary_context
  2. 结果摘要 (Result): Guardian 完成任务后,LLM 生成结构化摘要 JSON (Summary + Key Facts + Links)。链式传递只传此 JSON,不传原文
  3. 记忆摘要 (Memory): 异步 ConsolidateMemory 任务。从本次结果中提取新事实/偏好,合并到对应 Memory 层级。冲突时以最新为准。

2.3.3.1 ConsolidateMemory — 记忆沉淀详细逻辑

核心问题:记忆不是堆砌,是"按需索取"且"持续进化"的。每次任务完成后,必须从结果中提炼有价值的信息,写入对应记忆层。

沉淀时机:天书验收通过后,后台异步触发(不阻塞用户交付)。

沉淀流程

# ─────────────────────────────────────────────────────────────
# ConsolidateMemory — 记忆沉淀实现
# ─────────────────────────────────────────────────────────────

class MemoryConsolidator:
    """记忆沉淀器 — 从任务结果中提取新知,合并入记忆库"""

    def __init__(self, memory_engine: MemoryEngine, redis_client):
        self.memory = memory_engine
        self.redis = redis_client

    async def consolidate(self, task_result: dict, user_feedback: str | None = None):
        """
        记忆沉淀主流程

        输入:
            task_result: 天书验收后的完整任务结果
            user_feedback: 用户反馈 (可选,如"把 Tailwind 换成 Bootstrap")
        """
        task_type = task_result["task_type"]
        agent_id = task_result["guard_id"]
        result_summary = task_result["result_summary"]
        confidence = task_result.get("confidence", 0)
        quality_score = task_result.get("quality_score", 0)

        # ── 步骤 1: 事实提取 (Fact Extraction) ──
        # 从情报报告中提取新的事实,写入 mem:global
        if task_type == "intelligence":
            await self._extract_facts(result_summary, task_result)

        # ── 步骤 2: 偏好修正 (Preference Update) ──
        # 如果用户修改了产出物,更新技能记忆
        if user_feedback:
            await self._update_preferences(user_feedback, agent_id)

        # ── 步骤 3: 错误复盘 (Error Retrospective) ──
        # 如果有子任务失败,记录避坑指南
        if task_result.get("errors"):
            await self._record_lessons_learned(task_result, agent_id)

        # ── 步骤 4: 去重合并 (Dedup & Merge) ──
        # 新旧冲突时以最新为准
        await self._dedup_and_merge(agent_id, task_result)

    async def _extract_facts(self, summary: str, result: dict):
        """
        事实提取 — 从情报摘要中提取新事实写入 mem:global

        示例:
            摘要: "AI 芯片市场突破 500 亿美元"
            → 写入 mem:global["market_facts"]["ai_chips_2024"] = "500亿美元"
        """
        # 使用 LLM 提取结构化事实
        facts = await self._llm_extract_facts(summary)

        current_global = await self.redis.json().get("mem:global") or {}
        current_global.setdefault("market_facts", {})

        for fact_key, fact_value in facts.items():
            current_global["market_facts"][fact_key] = fact_value

        await self.redis.json().set("mem:global", "$", current_global)

    async def _update_preferences(self, feedback: str, agent_id: str):
        """
        偏好修正 — 根据用户反馈更新技能记忆

        示例:
            用户反馈: "把 Tailwind 换成 Bootstrap"
            → 更新 mem:ironclad["coding_style"] = "优先使用 Vue3 + Bootstrap + Chart.js"
        """
        mem_key = f"mem:{agent_id}"
        current_mem = await self.redis.json().get(mem_key) or {}

        # 使用 LLM 解析用户反馈,识别要修改的字段
        updates = await self._llm_parse_feedback(feedback, current_mem)

        for key, new_value in updates.items():
            current_mem[key] = new_value
            current_mem.setdefault("tags", []).append(f"updated:{key}")

        await self.redis.json().set(mem_key, "$", current_mem)

    async def _record_lessons_learned(self, result: dict, agent_id: str):
        """
        错误复盘 — 记录失败经验到技能记忆

        示例:
            错误: "comfyUI 生成超时,模型 SDXL 过大"
            → 写入 mem:ironclad["past_issues"]["comfyui_sdxl_timeout"] = {
                "issue": "comfyUI SDXL 模型生成超时",
                "avoid": "使用 SD-Turbo 或设置 timeout  =600",
                "date": "2025-01-15"
              }
        """
        mem_key = f"mem:{agent_id}"
        current_mem = await self.redis.json().get(mem_key) or {}
        current_mem.setdefault("past_issues", {})

        for error in result["errors"]:
            issue_key = self._normalize_error_key(error)
            current_mem["past_issues"][issue_key] = {
                "issue": error["message"],
                "avoid": error.get("suggestion", "避免使用相同配置"),
                "date": result.get("completed_at", "unknown"),
            }

        await self.redis.json().set(mem_key, "$", current_mem)

    async def _dedup_and_merge(self, agent_id: str, result: dict):
        """
        去重合并 — 新旧记忆冲突时以最新为准

        策略:
        1. 检查 mem 中是否有 tags 标记为 "deprecated" 的条目
        2. 如果同一 key 有新值,旧值自动过期
        3. 定期清理 (每 10 次任务后触发)
        """
        mem_key = f"mem:{agent_id}"
        current_mem = await self.redis.json().get(mem_key) or {}

        # 标记过期 tags
        tags = current_mem.get("tags", [])
        deprecated = [t for t in tags if t.startswith("deprecated:")]
        for tag in deprecated:
            old_key = tag.replace("deprecated:", "")
            if old_key in current_mem:
                del current_mem[old_key]

        # 清理过期 tags
        current_mem["tags"] = [t for t in tags if not t.startswith("deprecated:")]

        await self.redis.json().set(mem_key, "$", current_mem)

    async def _llm_extract_facts(self, summary: str) -> dict:
        """调用 LLM 从摘要中提取结构化事实"""
        # Prompt: "从以下情报摘要中提取 3-5 个关键事实,返回 JSON: {fact_key: fact_value}"
        ...

    async def _llm_parse_feedback(self, feedback: str, current_mem: dict) -> dict:
        """调用 LLM 解析用户反馈,识别要修改的记忆字段"""
        # Prompt: "用户说'{feedback}',当前记忆是{current_mem}。请识别要修改的字段和新值"
        ...

    def _normalize_error_key(self, error: dict) -> str:
        """将错误信息规范化为记忆键"""
        msg = error["message"][:50].lower().replace(" ", "_").replace("/", "_")
        return f"issue_{msg}"


# ─────────────────────────────────────────────────────────────
# 使用示例 — 天书验收后触发记忆沉淀
# ─────────────────────────────────────────────────────────────

# 场景 1: 观澜完成情报任务 → 事实提取
task_result = {
    "task_type": "intelligence",
    "guard_id": "vanguard-s",
    "result_summary": "AI 芯片市场突破 500 亿美元,多模态模型成本下降 90%",
    "confidence": 0.86,
}

consolidator = MemoryConsolidator(memory_engine, redis_client)
await consolidator.consolidate(task_result)
# → mem:global["market_facts"]["ai_chip_market_2024"] = "500亿美元"
# → mem:global["market_facts"]["multimodal_cost_reduction"] = "下降90%"

# 场景 2: 用户反馈修改执戈产出 → 偏好修正
user_feedback = "下次生成的网页用深色主题,Tailwind 换成 Bootstrap"
await consolidator.consolidate(task_result, user_feedback=user_feedback)
# → mem:ironclad["coding_style"] = "Vue3 + Bootstrap + 深色主题"
# → mem:ironclad["ui_preference"] = "dark_mode"

# 场景 3: 子任务失败 → 错误复盘
task_result_with_error = {
    "task_type": "image_processing",
    "guard_id": "ironclad",
    "errors": [
        {"message": "comfyUI 生成超时,SDXL 模型加载过慢"}
    ],
}
await consolidator.consolidate(task_result_with_error)
# → mem:ironclad["past_issues"]["comfyui_sdxl_timeout"] = {
#     "issue": "comfyUI 生成超时,SDXL 模型加载过慢",
#     "avoid": "使用 SD-Turbo 或设置 timeout=600",
#     "date": "2025-01-15"
#   }
Enter fullscreen mode Exit fullscreen mode

记忆沉淀数据流示意

任务完成 → 天书验收通过
              │
              ▼
     ┌────────────────────┐
     │ ConsolidateMemory  │  (后台异步,不阻塞用户交付)
     └────────┬───────────┘
              │
      ┌───────┼────────┐
      ▼       ▼        ▼
  事实提取  偏好修正   错误复盘
      │       │        │
      ▼       ▼        ▼
  mem:global  mem:{agent}  mem:{agent}
  (新知)     (编码风格)   (避坑指南)
      │       │        │
      └───────┴────────┘
              │
              ▼
     ┌────────────────────┐
     │  去重合并           │  (冲突时以最新为准)
     │  过期标记清理        │
     └────────────────────┘
Enter fullscreen mode Exit fullscreen mode

2.3.4 链式按需加载 (Lazy Loading)

天书向执戈派发时,Payload 结构:

"chain_context": {
  "demand": "生成可视化网页",
  "summary": "2024 AI 趋势:成本降 90%,Agent 自主化...",
  "key_facts": ["GPT-4o 支持实时语音", "AI 芯片市场 500 亿"],
  "data_sources": { "full_report": "https://filebrowser.../report.md" }
}
Enter fullscreen mode Exit fullscreen mode

执戈生成 Prompt 时注入 summary + key_facts。执行中若需具体数据,主动调用 read_file(data_sources.full_report)

2.4 Guardian 与并发控制

2.4.1 Guardian 职责

常驻进程。负责:加载专属记忆 → 匹配技能 → 分解子任务 → asyncio.Semaphore 控制并发 → 监听 tasks.interrupt → 汇总生成摘要 → 发布 tasks.complete

2.4.2 异步并发控制 (Semaphore)

废弃 ThreadPoolExecutor,使用 asyncio.Semaphore(max_workers)

class GuardianConcurrency:
    def __init__(self, limit): self.sem = asyncio.Semaphore(limit)
    async def run(self, subtask_id, coro):
        async with self.sem:
            task = asyncio.create_task(coro)
            self.active[subtask_id] = task
            try: return await task
            finally: del self.active[subtask_id]

    async def interrupt(self, subtask_id=None):
        targets = [self.active[subtask_id]] if subtask_id else self.active.values()
        for t in targets:
            if not t.done(): t.cancel()
Enter fullscreen mode Exit fullscreen mode

⚠️ 设计约束(不可违背):

  1. 防递归深度: max_subagent_depth = 2(天书→守护→子实例,禁止更深),子实例严禁派发子子任务
  2. 工具剥离: 创建子实例时,必须从工具集中移除 task/delegate/dispatch 类工具,子实例只能使用执行类工具
  3. 中断安全: interrupt() 必须返回中间结果(已完成的部分),不允许丢弃已执行的工作
  4. 并发上限: Guardian 的 max_workers 受硬件自适应参数控制,不允许写死固定值
  5. 子实例超时: 每个子任务必须有独立 timeout,超时自动 cancel,不阻塞 Guardian 主循环

2.4.3 执戈技能接口 (D16)

agents/ironclad/skills/ 下按 task_type 组织。启动时扫描 L1 元数据,接单时按需加载 L2 指令与工具集。支持外部 API (comfyUI/seedance) 声明与自动初始化。

2.4.4 工具防爆截断 (D18)

痛点: read_file 读一个 10MB 文件 → 30k+ Token 直接吃掉上下文窗口 → Agent 崩溃或丢失历史。

核心机制:所有工具执行完毕后,强制通过 Token 截断器。

# ─────────────────────────────────────────────────────────────
# Tool Output Truncation — 工具返回值物理截断
# ─────────────────────────────────────────────────────────────

import tiktoken

# 默认阈值 (可配置,根据模型上下文窗口动态调整)
DEFAULT_TOOL_MAX_TOKENS = 8000
ENCODING = tiktoken.get_encoding("cl100k_base")

def truncate_tool_output(tool_name: str, output: str, max_tokens: int = DEFAULT_TOOL_MAX_TOKENS) -> str:
    """
    截断工具返回值,防止撑爆上下文窗口

    策略:
    1. 计算输出 Token 数
    2. 如果超限 → 截断前 N 个 Token + 尾部保留 500 Token
    3. 注入截断警告
    """
    tokens = ENCODING.encode(output)

    if len(tokens) <= max_tokens:
        return output

    # 截断: 头部 75% + 尾部 25%,中间用警告替代
    head_keep = int(max_tokens * 0.75)
    tail_keep = max_tokens - head_keep

    head = ENCODING.decode(tokens[:head_keep])
    tail = ENCODING.decode(tokens[-tail_keep:]) if tail_keep > 0 else ""
    truncated_tokens = len(tokens) - max_tokens

    warning = f"\n\n⚠️ [系统拦截] 工具 '{tool_name}' 输出超过 {max_tokens} Token 限制 (实际 {len(tokens)} Token)。\n"
    warning += f"已截断 {truncated_tokens} Token。请使用 ask_document / read_file_chunk 分块读取,或缩小操作范围。"

    return head + warning + tail


# 在 Guardian/Worker 执行工具后统一调用:
# result = some_tool.execute(...)
# result = truncate_tool_output("read_file", result)
Enter fullscreen mode Exit fullscreen mode

特殊工具处理

工具 截断策略 替代方案
read_file (大文件) > 50KB 文件禁止全量读取 read_file_chunk(path, start_line, end_line) 分块读取
bash (长输出) 只保留最后 200 行 bash 命令自带 `
{% raw %}web_fetch (长网页) 只提取正文 (readability) 内置 readability 过滤,去除广告/导航
代码执行输出 保留 stdout 前 500 行 + stderr 全部 stderr 全保留因为错误信息不能丢

⚠️ 设计约束(不可违背):

  1. 截断是硬截断,不由 LLM 决定是否截断
  2. 截断警告必须出现在工具返回值中(不是日志),让 LLM 明确知道自己拿到的不是完整数据
  3. read_file_chunk 每次最多读取 100 行,防止 LLM 通过循环调用分块读取来绕过截断
  4. stderr (错误输出)永远不截断,因为错误信息是调试的关键

2.4.5 子任务依赖编排 (D19)

痛点: 线性链 (观澜→执戈) 不够用。全栈开发需要"建表→后端→前端→部署"这种网状依赖。

核心机制:Guardian 将任务拆解为 DAG (有向无环图),按依赖关系动态派发。

// 子任务声明式依赖 (tasks.subtask dispatch payload)
{
  "subtask_id": "st-003",
  "parent_task_id": "task-0a1b2c3d",
  "action": "dispatch",
  "describe": "编写 HTML/Tailwind 代码",
  "demand": "基于 UI 结构和配图生成落地页代码",
  "depends_on": ["st-001", "st-002"],  // 声明依赖
  "timeout": 300
}
Enter fullscreen mode Exit fullscreen mode

DAG 调度逻辑

# ─────────────────────────────────────────────────────────────
# DAG Subtask Scheduler — Guardian 内存维护依赖图
# ─────────────────────────────────────────────────────────────

from collections import defaultdict
import asyncio

class DAGScheduler:
    """守护实例内的子任务 DAG 调度器"""

    def __init__(self, subtasks: list[dict], semaphore: asyncio.Semaphore):
        self.semaphore = semaphore
        self.tasks = {st["subtask_id"]: st for st in subtasks}
        self.results: dict[str, dict] = {}
        self.depends_on: dict[str, list[str]] = {
            st["subtask_id"]: st.get("depends_on", [])
            for st in subtasks
        }
        self.completed: set[str] = set()
        self.running: set[str] = set()
        self._events: dict[str, asyncio.Event] = {
            st["subtask_id"]: asyncio.Event() for st in subtasks
        }

    async def run_all(self):
        """执行所有子任务,按依赖关系动态解锁"""
        # 第一轮: 派发所有无依赖的任务
        for task_id, deps in self.depends_on.items():
            if not deps:
                self._dispatch(task_id)

        # 等待所有任务完成
        await asyncio.gather(*[
            self._events[tid].wait()
            for tid in self.tasks
        ])

    def _dispatch(self, task_id: str):
        """派发单个子任务 (受 Semaphore 控制并发)"""
        self.running.add(task_id)
        task = self.tasks[task_id]

        asyncio.create_task(self._run_with_deps(task_id, task))

    async def _run_with_deps(self, task_id: str, task: dict):
        """等待依赖完成后,执行子任务"""
        deps = self.depends_on.get(task_id, [])

        # 等待所有依赖完成
        for dep_id in deps:
            if dep_id not in self.completed:
                await self._events[dep_id].wait()

        # 依赖全部完成 → 通过 Semaphore 执行
        async with self.semaphore:
            result = await self._execute_task(task_id, task)

        # 标记完成
        self.completed.add(task_id)
        self.running.discard(task_id)
        self.results[task_id] = result
        self._events[task_id].set()

        # 解锁依赖此任务的后续任务
        self._unlock_dependents(task_id)

    def _unlock_dependents(self, completed_task_id: str):
        """检查哪些任务依赖当前完成的任务,如果所有依赖都满足则派发"""
        for task_id, deps in self.depends_on.items():
            if task_id in self.completed or task_id in self.running:
                continue
            # 检查是否所有依赖都已完成
            if all(dep in self.completed for dep in deps):
                self._dispatch(task_id)

    async def _execute_task(self, task_id: str, task: dict) -> dict:
        """实际执行子任务 (通过 Redis Streams 派发)"""
        # ... 通过 message_bus.publish("tasks.subtask", ...) 派发
        # ... 轮询等待结果
        # ... 返回结果
        ...
Enter fullscreen mode Exit fullscreen mode

DAG 示意

执戈 Guardian 收到任务: "生成电商落地页"

┌──────────┐  ┌──────────┐
│ st-01    │  │ st-02    │   ← 无依赖,同时派发
│ 生成UI结构│  │ 生成配图  │   ← Semaphore 控制并发
└────┬─────┘  └────┬─────┘
     │              │
     └──────┬───────┘
            ▼
     ┌──────────┐
     │ st-03    │      ← 依赖 st-01 + st-02
     │ 编写代码  │      ← 等前两个都完成后才派发
     └────┬─────┘
          ▼
     ┌──────────┐
     │ st-04    │      ← 依赖 st-03
     │ 部署上线  │
     └──────────┘

物理深度始终 = 2 (Guardian → Worker)
逻辑依赖无限层
Enter fullscreen mode Exit fullscreen mode

⚠️ 设计约束(不可违背):

  1. DAG 只在 Guardian 内存中维护,不持久化到 SQLite/Redis,降低复杂度
  2. 如果检测到循环依赖(A→B→A),Guardian 立即拒绝该任务图,返回错误给天书
  3. 任一子任务失败 → 所有依赖它的后续任务自动取消,返回中间结果给天书
  4. Guardian 生成的 DAG 最多 10 个节点,超过则拆分为多个独立任务

2.5 TaskPool 与硬件自适应 (D15)

2.5.1 统一字段模型

SQLite tasks 表结构精简,区分基础字段与类型专属 JSON。

CREATE TABLE tasks (
    id TEXT PRIMARY KEY, name TEXT, priority TEXT, describe TEXT, demand TEXT,
    task_type TEXT, creator TEXT, dispatch_to TEXT, status TEXT DEFAULT 'pending',
    result_summary TEXT, confidence REAL, quality_score REAL,
    confidence_breakdown TEXT, quality_breakdown TEXT,
    result_detail TEXT,  -- JSON: 按 task_type 定义不同 schema
    deliverables TEXT,   -- JSON: 统一文件列表 [{name, type, url, size}]
    error TEXT, created_at TEXT, dispatched_at TEXT, acked_at TEXT,
    started_at TEXT, completed_at TEXT, expired_at TEXT, last_updated TEXT,
    metadata TEXT DEFAULT '{}'
);
-- 注: 子任务状态全在 Redis Hash task:{id}:subtasks 中,SQLite 不存 subtasks 表
Enter fullscreen mode Exit fullscreen mode

2.5.2 硬件自适应与热重载

启动 psutil 检测分档 (Low/Mid/High)。运行时每 5 分钟检测 CPU/内存,若超载则降档 (心跳间隔×1.5, 最大并发减半),通过 ConfigHotReloader 广播新参数给 TaskPool/MonitorFlow。

⚠️ 设计约束(不可违背):

  1. 分档阈值: 低配 CPU≤4核 OR 内存≤8GB;中配 CPU≤8核 OR 内存≤16GB;高配 >8核 AND >16GB
  2. 三档参数基准: 低配 max_concurrent=3, heartbeat=90s, ack_timeout=60s;中配 10, 60s, 30s;高配 30, 30s, 15s
  3. 降档条件: CPU>90% AND 内存>85%,连续 2 次检测才降,避免抖动
  4. 升档条件: CPU<50% AND 内存<60%,连续 3 次检测才升,当前不是最高档
  5. 降档上限: 最低降到中配,不允许降到 1 个并发(系统必须保持基本可用性)
  6. TaskPool/MonitorFlow 参数联动: 两者复用同一 MonitorConfig 对象,不允许各自维护独立参数

2.6 外部 API 与容错交付

2.6.1 外部 API 容错链

执戈调用 comfyUI/seedance 等外部服务时,必须带超时与降级:

async def generate_with_fallback(prompt, primary_api, backup_api, timeout=300):
    try: return await asyncio.wait_for(primary_api(prompt), timeout)
    except Exception:
        try: return await asyncio.wait_for(backup_api(prompt), timeout)
        except Exception as e: raise GenerationFailedError(f"所有生成服务不可用: {e}")
Enter fullscreen mode Exit fullscreen mode

2.6.2 FileBrowser 交付

执戈产出文件写入 /mnt/user-data/outputs/{task_id}/。通过预设规则拼接公网 URL 注入 deliverables JSON。用户收到的是可直接点击的 preview_url / download_url,无需后台登录。

2.7 MonitorFlow (独立进程)

职责:心跳检测 (消费 agents.heartbeat)、超时/停滞检测、告警。
定位:执行层的 asyncio.wait_for 是第一道防线 (主动熔断)。MonitorFlow 是第二道防线 (兜底监控)。若执行层超时未触发,MonitorFlow 检测到停滞则发布 tasks.interrupt 强制清理。异步非阻塞,过载时自动降级仅保留心跳检测。

2.8 用户交互机制 (Human Communication)

核心原则: 系统不是闭门干活的工具,而是会沟通、会请示、会求助的助手。在关键节点主动与用户交互,而不是等到最后丢一个可能很差的结果。

2.8.1 交互场景总览

# 场景 触发条件 交互类型 超时策略
I1 任务澄清 天书无法确定任务意图或需求模糊 天书 → 用户提问 2 分钟无响应 → 天书自主决策
I2 能力边界 任务超出系统当前能力范围 天书 → 用户说明限制 + 替代方案 用户确认后继续/取消
I3 高风险操作 需要安装系统级依赖、修改服务器配置、执行破坏性命令 天书 → 用户告知风险 + 请求确认 3 分钟无响应 → 取消操作
I4 低置信度警告 任何任务 confidence < 0.6 天书 → 用户告知数据质量低 + 建议人工复核 通知即可,不阻塞
I5 执行中补充 Guardian/Worker 执行中缺少关键信息(如 Logo、品牌色、API Key) Guardian → 用户请求补充 3 分钟无响应 → 使用默认值或跳过
I6 质量低预警 天书验收不通过,质量评分 < 60 天书 → 用户告知质量低 + 正在重做 通知即可,不阻塞
I7 重做失败 重试 2 次后仍不通过 天书 → 用户说明失败原因 + 建议人工介入 用户决定下一步
I8 成本预警 预估成本超过 $1 或实际消耗超过预估 200% 天书 → 用户告知当前花费 + 预估总成本 2 分钟无响应 → 继续执行
I9 链式里程碑确认 链式任务每一步完成后(Phase 8) 天书 → 用户汇报 + 请求确认下一步 5+5 分钟 → 自动 STOP

2.8.2 交互消息格式

所有用户交互统一通过以下消息结构,由 Gateway API 推送给前端:

{
  "type": "user_interaction",
  "interaction_id": "ui-0a1b2c3d",
  "task_id": "task-0a1b2c3d",
  "scenario": "I1 | I2 | I3 | I4 | I5 | I6 | I7 | I8 | I9",
  "severity": "info | warning | critical",
  "timestamp": "2025-01-15T10:00:05Z",

  "message": {
    "title": "需要确认任务需求",
    "body": "你提到'做个网页',请问是以下哪种类型?",
    "context": {
      "original_request": "帮我做个网页",
      "ambiguity_reason": "任务描述过于宽泛,无法确定具体类型"
    }
  },

  "actions": [
    {
      "id": "action_1",
      "label": "个人主页",
      "type": "select"
    },
    {
      "id": "action_2",
      "label": "数据看板",
      "type": "select"
    },
    {
      "id": "action_3",
      "label": "其他(请描述)",
      "type": "text_input"
    }
  ],

  "timeout": {
    "seconds": 120,
    "on_timeout": "auto_decide"
  }
}
Enter fullscreen mode Exit fullscreen mode

2.8.3 各场景详细流程

I1: 任务澄清 (Clarification)

触发: 天书 TaskRouter 匹配分数 < 阈值 (如最高分 < 30)
      或 LLM 自主判断需求模糊

流程:
  天书 → 用户: 发送 clarification 消息,包含 2-4 个选项 + 自由输入
  用户 → 天书: 选择选项 或 输入补充描述
  天书: 根据用户反馈重新匹配任务类型

超时: 2 分钟无响应 → 天书使用 LLM 自主决策,记录日志

示例:
  用户: "帮我做个网页"
  天书: "请问你需要什么类型的网页?
         [1] 个人主页/简历
         [2] 数据看板/报告
         [3] 产品展示页
         [4] 其他(请描述)"
Enter fullscreen mode Exit fullscreen mode

I2: 能力边界 (Capability Boundary)

触发: Guardian 接单后发现超出能力范围
      (如"实时视频流处理"、"硬件调试")

流程:
  Guardian → 天书: 报告无法完成的原因
  天书 → 用户: 坦诚说明限制 + 提供替代方案
  用户 → 天书: 确认尝试替代方案 / 取消任务

示例:
  天书: "⚠️ 你要求的'实时视频流处理'超出我目前的能力范围。
         我可以做的是:
         [1] 离线视频剪辑(上传视频后处理)
         [2] 视频格式转换
         [3] 取消任务"
Enter fullscreen mode Exit fullscreen mode

I3: 高风险操作 (High Risk)

触发: 需要执行以下操作之一:
      - 安装系统级依赖 (apt install, pip install --system)
      - 修改服务器配置 (nginx/nginx.conf, iptables)
      - 执行可能影响系统稳定性的命令
      - 删除/覆盖已有文件

流程:
  Guardian → 天书: 标记操作为高风险
  天书 → 用户: 告知具体风险 + 请求确认
  用户 → 天书: 确认 / 拒绝
  天书: 确认则放行,拒绝则跳过该操作或标记失败

示例:
  天书: "⚠️ 下一步需要安装系统级依赖 ffmpeg,可能影响服务器环境。
         操作: apt install -y ffmpeg
         影响: 新增约 200MB 系统包
         [1] 确认安装
         [2] 跳过(功能可能不可用)
         [3] 取消任务"
Enter fullscreen mode Exit fullscreen mode

I4: 低置信度警告 (Low Confidence)

触发: 任何任务完成后 `confidence < 0.6`

流程:
  天书: 自动判定 confidence 等级
  天书 → 用户: 告知数据质量低 + 标注薄弱来源 + 建议人工复核
  不阻塞: 结果仍交付,但附带警告

示例:
  天书: "⚠️ 情报报告已生成,但置信度较低 (0.45)。
         原因:
         - 所有来源均为自媒体/博客,无官方或权威媒体
         - 关键数据点仅有 2 条,不足以支撑结论
         建议: 请人工复核后再使用此报告。
         [预览报告](https://...) [下载](https://...)"
Enter fullscreen mode Exit fullscreen mode

I5: 执行中补充 (In-Flight Supplement)

触发: Guardian/Worker 执行中发现缺少关键信息
      (如需要 Logo 图片、品牌色、API Key、数据库密码)

流程:
  Guardian → 天书: 报告缺失信息
  天书 → 用户: 请求补充具体信息
  用户 → 天书: 提供信息 / 表示没有
  天书: 有信息 → 转发 Guardian 继续执行
       没有 → 使用默认值或跳过该步骤

超时: 3 分钟无响应 → 使用默认值继续

示例:
  天书: "生成网页需要你提供以下信息:
         1. Logo 图片(可选,拖拽上传)
         2. 品牌主色(如 #3B82F6,不填则用默认蓝色)
         3. 网站标题
         [提交并继续] [跳过,用默认值]"
Enter fullscreen mode Exit fullscreen mode

I6: 质量低预警 (Low Quality Alert)

触发: 天书验收不通过,`quality_score < 60`,触发自动重做

流程:
  天书: 自动打回 Guardian 重做
  天书 → 用户: 告知质量低 + 正在重做 + 预计额外时间
  不阻塞: 用户无需操作,只是知情

示例:
  天书: "📝 生成的网页质量评分较低 (55/100),正在自动重做。
         问题: 缺少响应式适配,移动端显示异常
         预计额外时间: 2 分钟
         无需你的操作,完成后会通知你。"
Enter fullscreen mode Exit fullscreen mode

I7: 重做失败 (Retry Exhausted)

触发: 重试 2 次后仍不通过验收

流程:
  天书: 标记任务为 failed
  天书 → 用户: 详细说明失败原因 + 已尝试的措施 + 建议人工介入
  用户 → 天书: 决定下一步(取消 / 修改需求重新尝试 / 人工处理)

示例:
  天书: "❌ 任务未能通过验收,已尝试 2 次重做。
         失败原因:
         - 网页加载速度 > 5s (要求 < 2s)
         - 图片资源未压缩 (4.2MB → 要求 < 500KB)
         已尝试: 图片压缩、代码分割、懒加载
         建议:
         [1] 降低要求: 接受当前质量 (加载速度 ~5s)
         [2] 修改需求: 减少页面上的图片数量
         [3] 取消任务,人工处理
         [4] 继续尝试 (可能仍无法达标)"
Enter fullscreen mode Exit fullscreen mode

I8: 成本预警 (Cost Warning)

触发: 预估成本 > $1 或 实际消耗 > 预估 200%

流程:
  天书: 监控当前 Token/API 消耗
  天书 → 用户: 告知当前花费 + 预估总成本
  用户 → 天书: 确认继续 / 降低质量以节省成本 / 取消

超时: 2 分钟无响应 → 继续执行(不中断正在进行的任务)

示例:
  天书: "💰 成本提醒
         当前已消耗: $0.52
         预估总成本: $2.10 (超出预估 $1 的 110%)
         主要消耗: GPT-4o 情报采集 ($0.38), Claude 代码生成 ($0.14)
         [1] 继续执行
         [2] 降低模型档位节省成本 (质量可能下降)
         [3] 取消任务"
Enter fullscreen mode Exit fullscreen mode

2.8.4 交互决策流

用户提交任务
    │
    ▼
┌─────────────────┐
│ I1: 任务澄清?    │ ── 需求模糊 → 提问用户
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ I2: 超出能力?    │ ── 是 → 说明限制 + 替代方案
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ I3: 高风险操作?  │ ── 是 → 告知风险 + 请求确认
└────────┬────────┘
         │
         ▼
    执行任务...
         │
    ┌────┼────────┐
    ▼    ▼        ▼
┌────┐┌────┐  ┌──────┐
│I5: ││I8: │  │I4:   │
│补充││成本│  │低置信│
│信息││预警│  │度    │
└─┬──┘└─┬──┘  └──┬───┘
  │     │        │
  ▼     ▼        ▼
执行完成 → 天书验收
              │
         ┌────┴─────┐
         ▼          ▼
    quality≥60   quality<60
         │          │
         │     ┌────┴─────┐
         │     │ I6: 质量 │
         │     │ 低预警   │
         │     └────┬─────┘
         │          │
         │     重试 ≤ 2?
         │     ┌──┬─┘
         │    是  否
         │     │  │
         │     │  ▼
         │     │ ┌────────┐
         │     │ │I7: 重做│
         │     │ │失败    │
         │     │ └───┬────┘
         │     │     │
         ▼     ▼     ▼
    ┌──────────────────┐
    │ I9: 链式里程碑   │ ── 如为链式任务
    └────────┬─────────┘
             │
             ▼
        交付用户
Enter fullscreen mode Exit fullscreen mode

⚠️ 设计约束(不可违背):

  1. 用户知情权: I4 (低置信度)、I6 (质量低预警) 仅通知不阻塞,不允许因为质量差就偷偷丢弃结果
  2. 用户决定权: I2 (能力边界)、I3 (高风险)、I7 (重做失败) 必须等待用户确认,不允许自动跳过
  3. 超时默认行为: I1 任务澄清超时 → 天书自主决策;I3 高风险超时 → 取消操作(安全第一);I5 补充信息超时 → 使用默认值
  4. 交互频率上限: 同一个任务在 5 分钟内最多触发 3 次交互,避免骚扰用户
  5. 交互记录: 所有交互历史写入 task_events 表,可追溯 "系统问了几次、用户怎么回的、最后怎么定的"
  6. I9 与其他交互互斥: 链式里程碑确认 (I9) 进行时,不允许同时触发 I5/I6/I8,按优先级排队

2.9 RAG 引擎 (v2 入口预留)

定位: v1 不需要。预留接口定义、调用位置、数据流向,为 v2 接入私有知识库做准备。

2.9.1 接口定义

# ─────────────────────────────────────────────────────────────
# RAG Engine — 接口契约 (v2 实现,v1 只定义抽象层)
# ─────────────────────────────────────────────────────────────

from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class RAGQuery:
    """RAG 查询请求"""
    query: str                    # 自然语言问题
    collection: str = "default"   # 知识库集合名 (支持多知识库)
    top_k: int = 3                # 返回最相关片段数
    filter_tags: list[str] | None = None  # 可选标签过滤

@dataclass
class RAGChunk:
    """检索到的文档片段"""
    content: str                  # 片段正文
    source: str                   # 来源 (文件路径 / URL)
    metadata: dict                # 附加元数据 (页码/章节/标签等)
    score: float                  # 相似度分数 (0-1)

@dataclass
class RAGResult:
    """RAG 查询结果"""
    query: str
    chunks: list[RAGChunk]        # 相关片段
    answer: str | None = None     # LLM 基于片段生成的答案 (可选)
    used_chunks: int = 0          # 实际用于生成答案的片段数

class RAGEngine(ABC):
    """RAG 引擎抽象接口 — v2 实现,v1 只定义契约"""

    @abstractmethod
    async def query(self, q: RAGQuery) -> RAGResult:
        """检索知识库,返回相关片段"""
        ...

    @abstractmethod
    async def ingest(self, collection: str, documents: list[dict]) -> int:
        """写入文档到知识库,返回成功入库数量"""
        ...

    @abstractmethod
    async def delete(self, collection: str, doc_ids: list[str]) -> int:
        """删除文档,返回删除数量"""
        ...

    @abstractmethod
    async def list_collections(self) -> list[str]:
        """列出所有知识库集合"""
        ...


# ─────────────────────────────────────────────────────────────
# v1 空实现 — 确保调用链不报错
# ─────────────────────────────────────────────────────────────

class RAGEngineStub(RAGEngine):
    """v1 空实现,query 返回空结果,ingest/delete 记录日志"""

    async def query(self, q: RAGQuery) -> RAGResult:
        return RAGResult(query=q.query, chunks=[])

    async def ingest(self, collection: str, documents: list[dict]) -> int:
        return 0  # v1 不存储

    async def delete(self, collection: str, doc_ids: list[str]) -> int:
        return 0

    async def list_collections(self) -> list[str]:
        return []
Enter fullscreen mode Exit fullscreen mode

2.9.2 调用位置预留

RAG 引擎在架构中有 3 个潜在调用点,v1 不接入,但预留接入位置:

┌─────────────────────────────────────────────────────────────┐
│              RAG 调用位置 (v2 接入)                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  位置 1: Worker 工具层                                       │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ 工具名: ask_document(query, collection="default")    │  │
│  │ 触发: LLM 主动调用,替代 read_file 读取大文件          │  │
│  │ 流程:                                                 │  │
│  │   LLM → ask_document("品牌色是什么")                  │  │
│  │         → RAGEngine.query(query, top_k=3)            │  │
│  │         → 返回 Top-3 相关片段                        │  │
│  │         → 注入 LLM 上下文                            │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
│  位置 2: Guardian 任务分解                                   │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ 触发: Guardian 收到任务时,主动检索相关历史经验         │  │
│  │ 流程:                                                 │  │
│  │   Guardian → RAGEngine.query("电商落地页开发经验")    │  │
│  │            → 返回历史项目片段、踩坑记录                │  │
│  │            → 注入 Guardian Prompt 作为参考            │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
│  位置 3: 用户直接提问                                        │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ 触发: 用户输入 "在 XX 文档里找 YY 信息"               │  │
│  │ 路由: 天书识别为知识库查询 → 直调 RAG                  │  │
│  │ 流程:                                                 │  │
│  │   用户 → 天书 → RAGEngine.query()                    │  │
│  │         → 返回答案 + 引用来源                         │  │
│  │         → 推送用户                                    │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

2.9.3 数据流入站 (Ingestion Pipeline)

v2 实现时的文档入库流程:

  用户上传 PDF/Markdown/HTML
    │
    ▼
  ┌──────────────────┐
  │ Document Parser  │  提取文本 + 元数据 (标题/章节/页码)
  └────────┬─────────┘
           │
           ▼
  ┌──────────────────┐
  │ Text Chunker     │  分块策略 (按段落/语义/固定长度)
  │                  │  默认: 500 字/块, 重叠 50 字
  └────────┬─────────┘
           │
           ▼
  ┌──────────────────┐
  │ Embedding Model  │  向量化 (OpenAI/本地模型)
  └────────┬─────────┘
           │
           ▼
  ┌──────────────────┐
  │ Vector Store     │  存储 (Chroma/FAISS/RedisVL)
  └──────────────────┘
Enter fullscreen mode Exit fullscreen mode

⚠️ 设计约束(不可违背):

  1. v1 不调用 RAG,使用 RAGEngineStub 空实现
  2. ask_document 工具在 v1 中不存在,Worker 使用 read_file_chunk 替代
  3. RAG 引擎的接口必须是抽象类,确保 v2 实现时不需要改调用方代码
  4. RAG 查询结果必须带 source 和 score,让 LLM 能判断引用可信度
  5. v2 实现时,RAG 查询超时 ≤ 3s,不得阻塞 Worker 执行流

三、全链路数据流转 (JSON Trace)

案例: "调研 2024 年 AI 趋势并生成可视化报告网页"research_then_web (链式)

Phase 1: 用户提交 → 天书识别任务类型

用户输入

{
  "user_id": "user-001",
  "request": "帮我调研 2024 年 AI 趋势并生成可视化报告网页"
}
Enter fullscreen mode Exit fullscreen mode

天书 TaskRouter 识别结果(内存中,不持久化):

{
  "matched_type": "research_then_web",
  "route_type": "chain",
  "confidence": 0.92,
  "chain_steps": [
    { "agent": "vanguard-s",  "task_type": "intelligence",    "input_from": null },
    { "agent": "ironclad",    "task_type": "web_development",  "input_from": "previous" }
  ]
}
Enter fullscreen mode Exit fullscreen mode

Phase 2: 天书创建任务 → 写入 SQLite tasks

{
  "写入者": "天书 (TaskPool)",
  "存储": "SQLite tasks 表",
  "数据": {
    "id": "task-0a1b2c3d",
    "name": "调研 2024 年 AI 趋势并生成可视化报告网页",
    "priority": "B",
    "describe": "调研 2024 年 AI 行业技术趋势、市场规模、关键玩家,生成可视化报告网页",
    "demand": "生成包含数据图表、来源引用的可视化 HTML 网页",
    "task_type": "research_then_web",
    "creator": "user-001",
    "dispatch_to": null,
    "status": "pending",
    "retry": 3,
    "result_summary": null,
    "confidence": null,
    "quality_score": null,
    "result_detail": null,
    "deliverables": null,
    "created_at": "2025-01-15T10:00:00Z",
    "last_updated": "2025-01-15T10:00:00Z",
    "metadata": {
      "route_type": "chain",
      "chain_steps": ["vanguard-s:intelligence", "ironclad:web_development"],
      "current_chain_index": 0
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

同时写入 SQLite task_events

{
  "写入者": "天书 (TaskPool)",
  "存储": "SQLite task_events 表",
  "数据": {
    "task_id": "task-0a1b2c3d",
    "event_type": "created",
    "details": { "matched_type": "research_then_web", "route_type": "chain" },
    "created_at": "2025-01-15T10:00:00Z"
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 3: 天书派发观澜 → 更新 SQLite + 发布 Redis

更新 SQLite tasks

{
  "写入者": "天书 (TaskPool)",
  "存储": "SQLite tasks 表 (UPDATE)",
  "数据": {
    "status": "dispatched",
    "dispatch_to": "vanguard-s",
    "dispatched_at": "2025-01-15T10:00:03Z",
    "last_updated": "2025-01-15T10:00:03Z"
  }
}
Enter fullscreen mode Exit fullscreen mode

发布 Redis Stream tasks.dispatch

{
  "写入者": "天书 (MessageBus)",
  "存储": "Redis Stream: tasks.dispatch",
  "数据": {
    "trace_id": "trace-0a1b2c3d",
    "message_id": "1673489003-0",
    "topic": "tasks.dispatch",
    "timestamp": "2025-01-15T10:00:03Z",
    "source": "origin-x-1",
    "payload": {
      "task_id": "task-0a1b2c3d",
      "name": "调研 2024 年 AI 趋势并生成可视化报告网页",
      "priority": "B",
      "describe": "调研 2024 年 AI 行业技术趋势、市场规模、关键玩家",
      "demand": "生成包含数据图表、来源引用的情报报告,格式为 Markdown",
      "task_type": "intelligence",
      "creator": "user-001",
      "chain_context": null
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

写入 Redis Hash task:0a1b2c3d (运行时状态)

{
  "写入者": "天书 (MessageBus)",
  "存储": "Redis Hash: task:0a1b2c3d",
  "数据": {
    "status": "dispatched",
    "dispatch_to": "vanguard-s",
    "dispatched_at": "2025-01-15T10:00:03Z",
    "chain_index": "0",
    "ack_timeout_at": "2025-01-15T10:00:33Z"
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 4: 观澜接收 → 检索加载记忆 + ACK

观澜加载记忆(不持久化,进程内存):

{
  "加载者": "观澜守护实例",
  "来源 1": "Redis mem:global",
  "内容 1": {
    "user_preferences": { "language": "zh-CN", "report_style": "数据驱动" },
    "project_context": { "project_name": "AI 行业研究 2024" }
  },
  "来源 2": "Redis mem:vanguard",
  "检索逻辑": "根据任务意图 '2024 AI Trends' 检索相关历史报告摘要",
  "内容 2": {
    "preferred_sources": ["gartner.com", "idc.com", "mckinsey.com"],
    "avoid_sources": ["unknown-blog.com"],
    "past_reports_summary": "2023 年 AI 报告侧重于大模型参数量,2024 年转向 Agent 应用"
  }
}
Enter fullscreen mode Exit fullscreen mode

观澜 ACK → 发布 Redis tasks.ack + 任务分解

{
  "写入者": "观澜守护实例",
  "存储 1": "Redis Stream: tasks.ack",
  "数据 1": {
    "trace_id": "trace-0a1b2c3d",
    "topic": "tasks.ack",
    "timestamp": "2025-01-15T10:00:08Z",
    "source": "vanguard-guard-1",
    "payload": {
      "task_id": "task-0a1b2c3d",
      "guard_id": "vanguard-guard-1",
      "ack": true,
      "estimated_time": 300,
      "subtasks": [
        { "id": "st-001", "describe": "搜索 2024 年 AI 技术趋势", "status": "dispatched" },
        { "id": "st-002", "describe": "抓取 Gartner/IDC 市场报告", "status": "pending" },
        { "id": "st-003", "describe": "汇总生成情报报告", "status": "pending" }
      ]
    }
  },
  "存储 2": "Redis Hash: task:0a1b2c3d",
  "数据 2": {
    "status": "running",
    "acked_at": "2025-01-15T10:00:08Z",
    "subtasks": "[{\"id\":\"st-001\",\"status\":\"dispatched\"},{\"id\":\"st-002\",\"status\":\"pending\"},{\"id\":\"st-003\",\"status\":\"pending\"}]"
  },
  "存储 3": "SQLite tasks 表 (UPDATE)",
  "数据 3": {
    "status": "running",
    "acked_at": "2025-01-15T10:00:08Z",
    "started_at": "2025-01-15T10:00:08Z"
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 5: 观澜派发子实例 → DAG 依赖编排 + 异步并发控制

观澜将任务拆解为 DAG,按依赖关系动态派发

DAG 结构:
  st-001: 搜索 AI 技术趋势     → 无依赖 → 立即派发
  st-002: 抓取 Gartner/IDC 报告 → 无依赖 → 立即派发
  st-003: 汇总生成情报报告      → 依赖 [st-001, st-002] → 等前两个完成后派发
Enter fullscreen mode Exit fullscreen mode

发布 Redis Stream tasks.subtask(3 条消息,含依赖声明):

{
  "写入者": "观澜守护实例",
  "存储": "Redis Stream: tasks.subtask",
  "数据": {
    "trace_id": "trace-0a1b2c3d",
    "topic": "tasks.subtask",
    "timestamp": "2025-01-15T10:00:10Z",
    "source": "vanguard-guard-1",
    "payload": {
      "subtask_id": "st-001",
      "parent_task_id": "task-0a1b2c3d",
      "action": "dispatch",
      "describe": "搜索 2024 年 AI 技术趋势",
      "demand": "搜索 2024 年 AI 关键技术突破:大模型、Agent、多模态",
      "depends_on": [],
      "timeout": 180
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

子实例加载技能(不持久化):

{
  "加载者": "观澜子实例 st-001",
  "来源": "agents/vanguard-s/skills/web_search/SKILL.md",
  "内容": {
    "skill_name": "web_search",
    "instructions": "使用 Tavily/DuckDuckGo 搜索,返回 Top10 结果,标注来源 URL",
    "tools": ["web_search", "web_fetch"]
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 6: 子实例执行 + 进度推送

子实例 st-001 执行完成 → 推送进度

{
  "写入者": "观澜子实例 st-001",
  "存储 1": "Redis Stream: tasks.progress",
  "数据 1": {
    "trace_id": "trace-0a1b2c3d",
    "topic": "tasks.progress",
    "timestamp": "2025-01-15T10:00:25Z",
    "source": "vanguard-w-a1b2c3",
    "payload": {
      "task_id": "task-0a1b2c3d",
      "subtask_id": "st-001",
      "action": "complete",
      "result_summary": "找到 23 条技术趋势,涵盖 GPT-4o、Claude 3.5、Gemini 2.0、Sora",
      "raw_data": "[{\"title\":\"OpenAI GPT-4o\",\"url\":\"openai.com\",...},...]"
    }
  },
  "存储 2": "Redis Hash: subtask:st-001",
  "数据 2": {
    "status": "completed",
    "worker_id": "vanguard-w-a1b2c3",
    "completed_at": "2025-01-15T10:00:25Z",
    "result_summary": "找到 23 条技术趋势"
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 7: 观澜完成 → Summary 摘要生成 + 自评置信度

关键步骤:Summary 引擎生成结果级摘要 (Lazy Loading 基础)

观澜汇总 3 个子任务结果后(假设原始报告 5000 字):

{
  "原始结果 (不入消息,存 FileBrowser)": {
    "content": "## 核心结论\n2024 年 AI 行业呈现四大趋势...\n\n## 详细分析\n(5000 字详细内容)",
    "sources": [
      { "url": "https://gartner.com/...", "channel_type": "authoritative", "title": "Gartner Top AI Trends" },
      { "url": "https://openai.com/...", "channel_type": "official", "title": "GPT-4o" }
    ]
  },

  "摘要引擎输出 (Summary, 用于链式传递)": {
    "summary_type": "intelligence_report",
    "content_summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化,AI 芯片市场突破 500 亿美元。",
    "key_facts": [
      "GPT-4o 支持实时语音/视觉/文本",
      "AI 推理成本从$0.03/千 token 降至$0.003",
      "AI Agent 框架从 200+ 收敛至 5 个主流"
    ],
    "data_links": {
      "full_report": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
      "charts": ["https://filebrowser.example.com/outputs/task-0a1b2c3d/chart1.png"]
    },
    "source_count": 5,
    "source_credibility": 0.88
  }
}
Enter fullscreen mode Exit fullscreen mode

观澜发布 tasks.complete (只带摘要,不带原文!)

{
  "写入者": "观澜守护实例",
  "存储": "Redis Stream: tasks.complete",
  "数据": {
    "trace_id": "trace-0a1b2c3d",
    "topic": "tasks.complete",
    "timestamp": "2025-01-15T10:01:30Z",
    "source": "vanguard-guard-1",
    "payload": {
      "task_id": "task-0a1b2c3d",
      "guard_id": "vanguard-guard-1",
      "task_type": "intelligence",

      "result_summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化,AI 芯片市场突破 500 亿美元。",

      "key_facts": [
        "GPT-4o 支持实时语音/视觉/文本",
        "AI 推理成本从$0.03/千 token 降至$0.003",
        "AI Agent 框架从 200+ 收敛至 5 个主流"
      ],

      "deliverables": [
        {
          "name": "2024 年 AI 趋势情报报告.md",
          "delivery_type": "text",
          "size_bytes": 18500,
          "preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
          "download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/report.md"
        }
      ],

      "confidence": 0.86,
      "confidence_breakdown": {
        "data_sufficiency": 0.90,
        "source_credibility": 0.88,
        "structure_completeness": 0.85,
        "timeliness": 0.82
      },

      "completed_at": "2025-01-15T10:01:30Z"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 8: 天书验收评估 → AcceptanceMiddleware

天书消费 complete → 评估 → 写入 SQLite

{
  "写入者": "天书 (AcceptanceMiddleware)",
  "存储": "SQLite tasks 表 (UPDATE)",
  "数据": {
    "status": "completed",
    "result_summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化...",
    "confidence": 0.86,
    "confidence_breakdown": "{\"data_sufficiency\":0.90,\"source_credibility\":0.88,...}",
    "quality_score": 90,
    "quality_breakdown": "{\"data_sufficiency\":0.90,\"source_credibility\":0.88,...}",
    "deliverables": "[{\"name\":\"2024 年 AI 趋势情报报告.md\",\"preview_url\":\"...\"}]",
    "completed_at": "2025-01-15T10:01:30Z",
    "metadata": {
      "route_type": "chain",
      "chain_steps_completed": "1/2",
      "current_chain_index": 1,
      "chain_next": { "agent": "ironclad", "task_type": "web_development" }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 8: 关键节点汇报与用户确认 (HITL Checkpoint)

核心逻辑: 链式任务的关键节点(如情报转开发),Origin-X 必须暂停,向用户汇报“优劣”(置信度/摘要),获得确认后才执行下一步。防止盲目烧 Token。

Origin-X 生成《里程碑汇报》 (Milestone Report)

{
  "发送者": "天书 (Origin-X Agent)",
  "接收者": "用户 (User)",
  "类型": "MILESTONE_REPORT",
  "数据": {
    "task_id": "task-0a1b2c3d",
    "current_step": "情报采集 (观澜) 已完成",
    "next_step": "网页开发 (执戈)",

    "evaluation": {
      // 【优劣汇报 - 置信度】
      "confidence": 0.86,
      "pros": ["数据来源权威 (Gartner/IDC)", "关键数据点充足"],
      "cons": ["缺乏针对中小企业落地案例的实证数据"],
      "recommendation": "数据基础扎实,建议进入网页开发阶段。"
    },

    "next_step_cost": {
      "estimated_time": "3 分钟",
      "estimated_token_cost": "$0.05"
    },

    "deliverables_preview": {
      "report_url": "https://filebrowser.../report.md"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

用户决策分支:

  • APPROVE (确认): 用户点击继续 → 触发 Phase 9 (派发执戈)。
  • MODIFY (修改): 用户输入"补充 2026 年预测" → Origin-X 重新派发观澜 (带修正指令)。
  • STOP (终止): 用户取消 → 发布 tasks.interrupt,结束任务,保留已产出文件。

⚠️ 设计约束(不可违背):

  1. 超时策略: 用户 5 分钟未响应 → Origin-X 发送提醒,再 5 分钟无响应 → 自动 STOP(不允许自动 APPROVE)
  2. 链式确认频率: 链式任务每一步都需要 HITL,不允许跳过中间步骤直接到最后
  3. MODIFY 语义: Origin-X 必须理解用户自然语言修改意见,翻译为具体的修正指令(如"补充2026年预测"→ 更新观澜的 demand 字段),不允许直接把用户原话丢给下一个 Agent
  4. 低置信度自动拦截: 如果 confidence < 0.6,Origin-X 不得向用户请求确认,直接打回 Guardian 重做(节省用户注意力)
  5. 已产出文件保留: 无论 HITL 结果如何(APPROVE/MODIFY/STOP),已生成的文件永久保留在 FileBrowser,不允许清理

Phase 9: 天书派发执戈 → 注入摘要 (Lazy Loading Context)

关键:执戈的 Prompt 只包含摘要,不包含 5000 字报告,按需读取

发布 Redis tasks.dispatch

{
  "写入者": "天书 (MessageBus)",
  "存储": "Redis Stream: tasks.dispatch",
  "数据": {
    "trace_id": "trace-0a1b2c3d",
    "topic": "tasks.dispatch",
    "timestamp": "2025-01-15T10:01:35Z",
    "source": "origin-x-1",
    "payload": {
      "task_id": "task-0a1b2c3d",
      "name": "基于情报报告生成可视化网页",
      "priority": "B",
      "describe": "基于以下 AI 趋势摘要生成可视化 HTML 网页",
      "demand": "生成响应式网页,包含:标题区、核心结论区、数据图表区、来源引用区",
      "task_type": "web_development",

      "chain_context": {
        // 1. 用户的最终诉求(决定做什么 - 必须注入 Prompt)
        "original_demand": "基于情报报告生成可视化网页,包含数据图表、来源引用",

        // 2. 核心摘要(决定大方向 - 注入 Prompt)
        "summary": "2024 年 AI 趋势:多模态大模型成本下降 90%,Agent 自主化,AI 芯片市场突破 500 亿美元。",

        // 3. 关键数据点(直接支撑摘要 - 注入 Prompt)
        "key_facts": ["GPT-4o 支持实时语音/视觉/文本", "AI 推理成本从$0.03 降至$0.003"],

        // 4. 详细资料链接(决定细节 - 不注入 Prompt,Worker 按需 read_file 读取)
        "data_links": {
          "full_report": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
          "chart_data": "https://filebrowser.example.com/outputs/task-0a1b2c3d/data.csv"
        }
      },

      "creator": "user-001",
      "dispatched_at": "2025-01-15T10:01:35Z"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 10: 执戈执行 → 检索记忆 + 技能加载 + 分解子任务

核心逻辑:Lazy Loading (按需加载) 的子任务分解

执戈接任务时,不需要读取 5000 字的情报原文。它通过 chain_context 中的 summary + key_facts 就能理解大方向并拆解任务。

  1. 读取摘要:确定网页结构(标题、结论区、图表区)。
  2. 制定计划:决定需要哪些子任务(如:st-004 开发网页结构,st-005 生成 AI 概念图)。
  3. 按需读取:当具体绘制图表需要精确数据时,Worker 主动调用 read_file(data_links.full_report) 读取原文细节,而不是在 Prompt 中等待。

执戈加载记忆(相似度检索)

{
  "加载者": "执戈守护实例",
  "来源 1": "Redis mem:global",
  "内容 1": { "user_preferences": { "language": "zh-CN", "report_style": "数据驱动" } },
  "来源 2": "Redis mem:ironclad",
  "检索逻辑": "根据任务意图 'Web Development' 检索相关编码规范与服务器配置",
  "内容 2": {
    "coding_style": "优先使用 Vue3 + TailwindCSS",
    "server_config": { "host": "192.168.1.100", "port": 8080 },
    "past_issues": "上次部署时 CDN 缓存未刷新,需提醒用户"
  }
}
Enter fullscreen mode Exit fullscreen mode

执戈加载技能

{
  "技能匹配": { "task_type": "web_development" },
  "加载来源": "agents/ironclad/skills/web_development/SKILL.md",
  "加载内容": {
    "skill_name": "web_development",
    "instructions": "使用 HTML/CSS/JS + Chart.js 生成数据图表网页...",
    "tools": ["read_file", "write_file", "bash"],
    "external_apis": []
  }
}
Enter fullscreen mode Exit fullscreen mode

⚠️ 子实例执行防护(D18 + D20):

  1. 工具截断 (D18): 所有工具返回值经 truncate_tool_output() 处理,超 8000 Token 强制截断 + 警告
  2. 死循环熔断 (D20): DeerFlow SubagentConfig.max_turns = 15 (硬限制),Worker 陷入"改 Bug→报错→再改"循环时,15 轮后自动终止
  3. 超时兜底 (D20): SubagentConfig.timeout_seconds = 300 (5 分钟),超时触发 CancelledError + 清理资源
  4. 上下文折叠 (D24): DeerFlow SummarizationMiddleware 已内置,配置 trigger: token_ratio=0.7 + keep: last_3_turns无需额外开发

Phase 11: 执戈完成任务 → 文件交付

执戈汇总结果 → 发布 tasks.complete

{
  "写入者": "执戈守护实例",
  "存储": "Redis Stream: tasks.complete",
  "数据": {
    "trace_id": "trace-0a1b2c3d",
    "topic": "tasks.complete",
    "timestamp": "2025-01-15T10:06:30Z",
    "source": "ironclad-guard-1",
    "payload": {
      "task_id": "task-0a1b2c3d",
      "guard_id": "ironclad-guard-1",
      "task_type": "web_development",

      "result_summary": "已生成可视化网页,包含 3 个数据图表、5 个来源引用、响应式布局",

      "deliverables": [
        {
          "name": "2024 年 AI 趋势情报报告.md",
          "delivery_type": "text",
          "preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md",
          "download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/report.md"
        },
        {
          "name": "AI 趋势可视化报告.html",
          "delivery_type": "webpage",
          "preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.html",
          "download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/report.html"
        },
        {
          "name": "AI 市场规模图表.png",
          "delivery_type": "image",
          "preview_url": "https://filebrowser.example.com/outputs/task-0a1b2c3d/chart.png",
          "download_url": "https://filebrowser.example.com/dl/task-0a1b2c3d/chart.png"
        }
      ],

      "confidence": 0.90,
      "quality_score": 95,
      "quality_breakdown": {
        "feature_completeness": 0.95,
        "page_performance": 0.90,
        "cross_browser_compat": 0.95,
        "security": 1.0
      },

      "completed_at": "2025-01-15T10:06:30Z"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Phase 12: 天书最终验收 + 记忆沉淀 + 推送用户

⚠️ 设计约束(不可违背):

  1. confidence 计算: 必须基于来源渠道加权平均(official=0.80×2.0权重, authoritative=0.70×2.0, industry=0.60×1.0, social=0.40×1.0, anonymous=0.20×1.0),不允许 LLM 自评拍脑袋
  2. quality_score 计算: 必须按 task_type 预定义的维度权重累加(如代码开发: 编译通过40%+测试30%+质量20%+安全10%),不允许单一维度打分
  3. 来源优先级衰减: 如果结果中没有任何 official/authoritative 来源,最终 confidence × 0.8 衰减
  4. 天书验收决策: confidence >= min_confidence AND quality_score >= 60 才 PASS,两个条件必须同时满足
  5. 验收不通过处理: 自动打回 Guardian 重做,最多重试 2 次,超过后标记 failed 并通知用户
  6. tasks.complete 消息格式: 必须包含 confidence + confidence_breakdown + quality_score + quality_breakdown + deliverables 五个字段,缺一不可

天书更新 SQLite

{
  "写入者": "天书",
  "存储": "SQLite tasks 表",
  "数据": {
    "status": "completed",
    "result_summary": "任务完成:情报采集 + 网页生成,质量评分 95/100",
    "confidence": 0.90,
    "quality_score": 95,
    "deliverables": "[{\"name\":\"AI 趋势可视化报告.html\",\"preview_url\":\"https://filebrowser.example.com/outputs/task-0a1b2c3d/report.html\"},...]",
    "completed_at": "2025-01-15T10:06:30Z",
    "metadata": {
      "route_type": "chain",
      "chain_steps_completed": "2/2",
      "total_duration_seconds": 390
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

异步记忆沉淀 (ConsolidateMemory 逻辑说明)

这是一个后台任务,由天书在验收通过后触发,负责将本次任务的“新知”提取入库:

  1. 事实提取: 从观澜的报告摘要中提取新的事实(如"AI 芯片市场达 500 亿")存入 mem:global
  2. 偏好修正: 如果用户在执戈生成的代码上做了修改(如“把 Tailwind 换成 Bootstrap"),天书捕获到反馈后,更新 mem:ironclad 中的 coding_style
  3. 错误复盘: 如果某子任务失败(如 comfyUI 超时),记录 "avoid_comfyui_heavy_models" 到 mem:ironcladpast_issues 中,防止下次踩坑。
  4. 去重合并: 如果新旧记忆冲突(如旧记录说 server_ip 是 192.168.1.50,新任务是 192.168.1.100),以最新任务的记录为准进行覆盖。
{
  "执行者": "天书 (后台 ConsolidateMemory)",
  "触发时机": "任务验收通过后",
  "输入": {
    "task_summary": "调研 2024 年 AI 趋势并生成网页",
    "result": "观澜置信度 0.86, 执戈质量 95/100",
    "user_feedback": null
  },
  "输出 1": "Redis mem:global",
  "数据 1": {
    "user_preferences": { "language": "zh-CN", "report_style": "数据驱动,包含图表" },
    "project_progress": { "AI 行业研究 2024": "已完成" }
  },
  "输出 2": "Redis mem:vanguard",
  "数据 2": {
    "successful_sources": ["gartner.com", "openai.com"],
    "failed_sources": []
  },
  "输出 3": "Redis mem:ironclad",
  "数据 3": {
    "coding_style": "Vue3 + TailwindCSS + Chart.js",
    "server_config": { "host": "192.168.1.100", "port": 8080 }
  }
}
Enter fullscreen mode Exit fullscreen mode

推送用户

{
  "发送者": "天书",
  "接收者": "用户",
  "数据": {
    "task_id": "task-0a1b2c3d",
    "status": "completed",
    "summary": "任务完成!链式任务 2/2 全部通过验收,总耗时 6 分 30 秒。",
    "chain_results": [
      { "agent": "vanguard-s", "task": "情报采集", "confidence": 0.86, "quality": 90 },
      { "agent": "ironclad", "task": "网页开发", "confidence": 0.90, "quality": 95 }
    ],
    "deliverables": [
      { "name": "AI 趋势情报报告.md", "preview": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.md" },
      { "name": "AI 趋势可视化报告.html", "preview": "https://filebrowser.example.com/outputs/task-0a1b2c3d/report.html" },
      { "name": "AI 市场规模图表.png", "preview": "https://filebrowser.example.com/outputs/task-0a1b2c3d/chart.png" }
    ]
  }
}
Enter fullscreen mode Exit fullscreen mode

Top comments (0)