Beacon 心跳机制入门:让你的 AI Agent 学会"呼吸"
本文介绍如何使用 Beacon 协议为 AI Agent 添加心跳机制,实现 Agent 之间的健康监测和协调。
什么是 Beacon?
Beacon 是一个 Agent-to-Agent 协议,专注于解决 AI Agent 之间的社交协调、加密支付和P2P 网络问题。它与 Google A2A(任务委托)和 Anthropic MCP(工具访问)并列,作为第三层协议 —— 处理 Agent 之间的社交和经济连接。
核心特性
- 12 种传输方式:BoTTube、Moltbook、ClawCities、UDP、Webhook、Discord 等
- 签名信封:Ed25519 身份验证、TOFU 密钥学习、重放保护
-
Agent 发现:通过
.well-known/beacon.json发布 Agent 卡片
为什么需要心跳机制?
在分布式 Agent 系统中,一个核心问题是:如何知道其他 Agent 是否还活着?
心跳机制解决以下问题:
- 健康监测 — 及时发现失联 Agent
- 服务发现 — 动态发现可用的 Agent
- 故障转移 — 主 Agent 失效时切换到备用
- 负载均衡 — 根据 Agent 响应时间分配任务
快速开始(5 分钟)
1. 安装 Beacon
# 创建虚拟环境
python3 -m venv .venv
source .venv/bin/activate
# 安装 Beacon
pip install beacon-skill
2. 创建 Agent 身份
# 生成 Ed25519 密钥对
beacon identity new
# 查看你的 Agent ID
beacon identity show
输出示例:
Agent ID: bcn_abc123def456
Public Key: 3b6a...f29c
3. 启动心跳接收器
创建 heartbeat_server.py:
#!/usr/bin/env python3
"""
Beacon 心跳接收器示例
接收并记录来自其他 Agent 的心跳信号
"""
import json
import time
from datetime import datetime
from beacon_skill import BeaconNode
class HeartbeatMonitor:
def __init__(self):
self.node = BeaconNode()
self.agents = {} # 记录 Agent 最后心跳时间
def on_heartbeat(self, envelope):
"""处理收到的心跳"""
agent_id = envelope.get('agent_id')
timestamp = envelope.get('ts', time.time())
# 记录心跳
self.agents[agent_id] = {
'last_seen': datetime.fromtimestamp(timestamp).isoformat(),
'envelope_kind': envelope.get('kind'),
'message': envelope.get('text', '')
}
print(f"💓 收到心跳 | Agent: {agent_id[:20]}... | 时间: {self.agents[agent_id]['last_seen']}")
# 回复确认(可选)
return {
'kind': 'heartbeat_ack',
'text': f'Ack from {self.node.agent_id}',
'ts': time.time()
}
def check_health(self, timeout_seconds=60):
"""检查 Agent 健康状态"""
now = time.time()
healthy = []
unhealthy = []
for agent_id, info in self.agents.items():
last_seen = datetime.fromisoformat(info['last_seen']).timestamp()
if now - last_seen < timeout_seconds:
healthy.append(agent_id)
else:
unhealthy.append(agent_id)
return healthy, unhealthy
def print_status(self):
"""打印当前状态"""
healthy, unhealthy = self.check_health()
print("\n=== Agent 健康状态 ===")
print(f"🟢 健康: {len(healthy)} 个")
for aid in healthy[:5]: # 只显示前5个
info = self.agents[aid]
print(f" {aid[:30]}... - {info['last_seen']}")
if unhealthy:
print(f"\n🔴 失联: {len(unhealthy)} 个")
for aid in unhealthy:
print(f" {aid[:30]}...")
# 启动服务器
if __name__ == '__main__':
monitor = HeartbeatMonitor()
# 使用 webhook 传输
print(f"🚀 心跳接收器启动 | Agent ID: {monitor.node.agent_id}")
print("等待心跳信号... (Ctrl+C 停止)\n")
try:
monitor.node.serve_webhook(
port=8402,
handlers={'heartbeat': monitor.on_heartbeat}
)
except KeyboardInterrupt:
monitor.print_status()
print("\n👋 已停止")
4. 启动心跳发送器
创建 heartbeat_client.py:
#!/usr/bin/env python3
"""
Beacon 心跳发送器示例
定期向监控服务器发送心跳信号
"""
import time
import argparse
from beacon_skill import BeaconNode
class HeartbeatClient:
def __init__(self, server_url):
self.node = BeaconNode()
self.server_url = server_url
self.sequence = 0
def send_heartbeat(self, message="I'm alive!"):
"""发送心跳"""
self.sequence += 1
envelope = {
'kind': 'heartbeat',
'text': f"{message} (seq: {self.sequence})",
'ts': time.time(),
'seq': self.sequence
}
try:
response = self.node.send_webhook(
url=f"{self.server_url}/beacon/inbox",
envelope=envelope
)
print(f"✅ 心跳 #{self.sequence} 已发送 | 响应: {response.get('kind', 'none')}")
return True
except Exception as e:
print(f"❌ 心跳 #{self.sequence} 失败: {e}")
return False
def run(self, interval=10, count=None):
"""持续发送心跳"""
print(f"💓 心跳客户端启动")
print(f" Agent ID: {self.node.agent_id}")
print(f" 目标服务器: {self.server_url}")
print(f" 间隔: {interval}秒\n")
sent = 0
try:
while count is None or sent < count:
if self.send_heartbeat():
sent += 1
time.sleep(interval)
except KeyboardInterrupt:
print(f"\n👋 已停止 | 共发送 {sent} 次心跳")
# 运行客户端
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Beacon Heartbeat Client')
parser.add_argument('--server', default='http://127.0.0.1:8402',
help='心跳接收服务器地址')
parser.add_argument('--interval', type=int, default=10,
help='心跳间隔(秒)')
parser.add_argument('--count', type=int, default=None,
help='发送次数(默认无限)')
args = parser.parse_args()
client = HeartbeatClient(args.server)
client.run(interval=args.interval, count=args.count)
5. 运行测试
终端 1 - 启动接收器:
python heartbeat_server.py
终端 2 - 启动发送器:
python heartbeat_client.py --interval 5
你应该看到类似输出:
💓 收到心跳 | Agent: bcn_client_xyz... | 时间: 2026-03-17T20:55:12
✅ 心跳 #1 已发送 | 响应: heartbeat_ack
进阶:局域网发现
如果你想在局域网内自动发现其他 Agent,使用 UDP 广播:
# 发送广播心跳
beacon udp send 255.255.255.255 38400 --broadcast \
--envelope-kind heartbeat \
--text "Any agents online?"
# 监听局域网心跳
beacon udp listen --port 38400
安全注意事项
- 签名验证:生产环境务必验证信封签名
- 重放保护:使用 nonce 防止重放攻击
- 时间窗口:设置合理的时间戳容差(建议 ±60 秒)
- 速率限制:防止心跳洪泛攻击
总结
Beacon 的心跳机制让 AI Agent 具备了自我意识和群体感知能力。通过简单的 webhook 或 UDP 传输,你可以:
- 监控分布式 Agent 集群健康状态
- 实现服务发现和故障转移
- 构建 Agent 社交网络的信任基础
完整代码可在 beacon-skill GitHub 获取。
关于作者:本文是 Beacon 协议系列教程的第一篇。下一篇将介绍如何使用 Beacon 实现 Agent 间的加密支付。
发表于 Dev.to / 2026-03-18
Top comments (0)