事件(Event):让 Agent 学会"喊话"
FROST 五维元模型系列 · 第 2 篇
从"哑巴 Agent"到"喊话 Agent"
上一篇文章我们讲了武器(Weapon)——Agent 的技能军械库。
但光有技能还不够。想象一个团队,每个人都会干活,但谁也不说话:
- A 干完了,不知道通知 B
- B 不知道 A 干完了,自顾自地继续
- C 发现问题了,没法喊人来看
- D 任务完成了,没人知道结果
这不是团队,是一群孤立的个体。
当前大多数 Agent 框架就是这样:每个 Agent 只会闷头干活,干完就完了。没有通知,没有广播,没有协作。
FROST 的答案是:事件(Event)。
事件总线:Agent 的神经网络
在 FROST-SOP V2.0 中,我们引入了 EventBus(事件总线)。
事件总线是 Agent 家族的神经网络。 Agent 通过发布/订阅事件通信,而不是直接调用彼此。
from core.event_bus import get_event_bus, Event, EventType
# 获取全局事件总线
bus = get_event_bus()
# 发布一个事件
bus.publish(Event(
event_type=EventType.STEP_COMPLETED,
source="agent_scout",
data={"step": "fetch_data", "result": {"count": 42}}
))
这就是 Agent"喊话"的方式——不需要知道谁在听,喊一声就行。
事件类型:Agent 能喊什么?
FROST 定义了清晰的事件类型体系:
| 层级 | 事件类型 | 含义 |
|---|---|---|
| 任务 | TASK_CREATED |
新任务被创建 |
TASK_DECOMPOSED |
祖辈完成任务分解 | |
TASK_COMPLETED |
任务全部阶段完成 | |
TASK_FAILED |
任务失败(不可恢复) | |
| 阶段 | STAGE_STARTED |
SOP 阶段开始执行 |
STAGE_COMPLETED |
SOP 阶段执行完成 | |
STAGE_FAILED |
SOP 阶段执行失败 | |
| 步骤 | STEP_COMPLETED |
Agent 单步 Skill 执行完成 |
| Agent | AGENT_CREATED |
Agent 被创建 |
AGENT_DESTROYED |
Agent 被销毁 |
命名规范是 <层级>_<动词>,一看就懂。
发布/订阅模式:谁想听谁举手
事件总线的核心是发布/订阅模式:
- 发布者(Publisher):Agent 完成某件事,喊一声
- 订阅者(Subscriber):其他 Agent 注册监听,有消息就收到通知
- 事件总线(EventBus):中间人,负责传递
bus = get_event_bus()
# 定义一个监听器
def on_task_completed(event: Event):
print(f"任务完成: {event.data}")
# 订阅事件
bus.subscribe("task_completed", on_task_completed)
# 另一个 Agent 发布事件
bus.publish(Event(
event_type="task_completed",
source="agent_parent",
data={"task_id": "task_001", "output": "report.pdf"}
))
解耦是关键:发布者不需要知道谁在听,订阅者不需要知道谁发的。
Agent 生命周期事件:出生与死亡
每个 Agent 都有生命周期:
创建 (AGENT_CREATED) → 运行 (running) → 销毁 (AGENT_DESTROYED)
FROST-SOP V2.0 让 Agent 自动发布这两个事件:
class Agent:
def __init__(self, name, event_driven=True):
self.name = name
if event_driven:
self._publish_event("agent_created", {
"agent_name": self.name,
"generation": self.generation,
})
def destroy(self):
self._write_agent_status("destroyed", "")
if self._event_driven:
self._publish_event("agent_destroyed", {
"agent_name": self.name,
"destroyed_at": datetime.now().isoformat(),
})
这让实时监控 Agent 生命周期成为可能。
步骤完成事件:SOP 执行的里程碑
Agent 执行 SOP 时,每一步完成都会发布 STEP_COMPLETED 事件。
这让实时监控 SOP 执行进度成为可能:
# 监控系统订阅步骤完成事件
def on_step_completed(event: Event):
data = event.data
print(f"[{data["agent_name"]}] 步骤 {data["step_name"]} 完成")
bus.subscribe("step_completed", on_step_completed)
异步事件总线:V3.0 的进化
FROST-SOP V3.0 引入了 AsyncEventBus(异步事件总线):
from core.event_bus import get_async_event_bus, Event
async_bus = get_async_event_bus()
# 支持异步订阅者
async def on_task_completed_async(event: Event):
await send_notification(event.data)
async_bus.subscribe_async("task_completed", on_task_completed_async)
# 异步发布
await async_bus.publish(Event(
event_type="task_completed",
source="agent_parent",
data={"task_id": "task_001"}
))
为什么需要异步版本?
- V2.0 EventBus(同步):适用于 Streamlit UI 和简单场景
- V3.0 AsyncEventBus:适用于异步任务编排
事件持久化:让过去有据可查
事件不仅实时分发,还持久化到数据库:
db.insert("event_log", {
"event_id": event.event_id,
"event_type": event.event_type,
"source": event.source,
"data": json.dumps(event.data),
"timestamp": event.timestamp.isoformat(),
})
这意味着:审计追溯、问题排查、性能分析都成为可能。
为什么叫"事件"而不是"消息"?
在传统消息队列中,消息是数据——只是传递信息。
FROST 的事件是事实——代表系统中真实发生的事情:
-
AGENT_CREATED= 真的有一个 Agent 出生了 -
STEP_COMPLETED= 真的有一个步骤完成了 -
TASK_FAILED= 真的有一个任务失败了
事件不只是数据,更是系统的记忆。
下一步:任务(Task)
武器解决了"Agent 能做什么",事件解决了"Agent 之间怎么通信"。
但谁来组织这一切?
下一篇预告: 《任务(Task):让 Agent 学会协作》
相关链接
- FROST 教学框架:https://gitee.com/liao_liang_7514/frost
- FROST-SOP 工程平台:https://gitee.com/liao_liang_7514/frost-sop
- 第一篇《武器(Weapon)》:让 Agent 知道"我会什么"
FROST:Giving agents lineage, memory, and honor.
Top comments (0)