DEV Community

Merliniky2
Merliniky2

Posted on

Beacon 心跳机制入门:让你的 AI Agent 学会"呼吸"

Beacon 心跳机制入门:让你的 AI Agent 学会"呼吸"

本文介绍如何使用 Beacon 协议为 AI Agent 添加心跳机制,实现 Agent 之间的健康监测和协调。

什么是 Beacon?

Beacon 是一个 Agent-to-Agent 协议,专注于解决 AI Agent 之间的社交协调加密支付P2P 网络问题。它与 Google A2A(任务委托)和 Anthropic MCP(工具访问)并列,作为第三层协议 —— 处理 Agent 之间的社交和经济连接。

核心特性

  • 12 种传输方式:BoTTube、Moltbook、ClawCities、UDP、Webhook、Discord 等
  • 签名信封:Ed25519 身份验证、TOFU 密钥学习、重放保护
  • Agent 发现:通过 .well-known/beacon.json 发布 Agent 卡片

为什么需要心跳机制?

在分布式 Agent 系统中,一个核心问题是:如何知道其他 Agent 是否还活着?

心跳机制解决以下问题:

  1. 健康监测 — 及时发现失联 Agent
  2. 服务发现 — 动态发现可用的 Agent
  3. 故障转移 — 主 Agent 失效时切换到备用
  4. 负载均衡 — 根据 Agent 响应时间分配任务

快速开始(5 分钟)

1. 安装 Beacon

# 创建虚拟环境
python3 -m venv .venv
source .venv/bin/activate

# 安装 Beacon
pip install beacon-skill
Enter fullscreen mode Exit fullscreen mode

2. 创建 Agent 身份

# 生成 Ed25519 密钥对
beacon identity new

# 查看你的 Agent ID
beacon identity show
Enter fullscreen mode Exit fullscreen mode

输出示例:

Agent ID: bcn_abc123def456
Public Key: 3b6a...f29c
Enter fullscreen mode Exit fullscreen mode

3. 启动心跳接收器

创建 heartbeat_server.py

#!/usr/bin/env python3
"""
Beacon 心跳接收器示例
接收并记录来自其他 Agent 的心跳信号
"""

import json
import time
from datetime import datetime
from beacon_skill import BeaconNode

class HeartbeatMonitor:
    def __init__(self):
        self.node = BeaconNode()
        self.agents = {}  # 记录 Agent 最后心跳时间

    def on_heartbeat(self, envelope):
        """处理收到的心跳"""
        agent_id = envelope.get("agent_id")
        timestamp = envelope.get("ts", time.time())

        # 记录心跳
        self.agents[agent_id] = {
            "last_seen": datetime.fromtimestamp(timestamp).isoformat(),
            "envelope_kind": envelope.get("kind"),
            "message": envelope.get("text", "")
        }

        print(f"💓 收到心跳 | Agent: {agent_id[:20]}... | 时间: {self.agents[agent_id]["last_seen"]}")

        # 回复确认(可选)
        return {
            "kind": "heartbeat_ack",
            "text": f"Ack from {self.node.agent_id}",
            "ts": time.time()
        }

    def check_health(self, timeout_seconds=60):
        """检查 Agent 健康状态"""
        now = time.time()
        healthy = []
        unhealthy = []

        for agent_id, info in self.agents.items():
            last_seen = datetime.fromisoformat(info["last_seen"]).timestamp()
            if now - last_seen < timeout_seconds:
                healthy.append(agent_id)
            else:
                unhealthy.append(agent_id)

        return healthy, unhealthy

    def print_status(self):
        """打印当前状态"""
        healthy, unhealthy = self.check_health()

        print("\n=== Agent 健康状态 ===")
        print(f"🟢 健康: {len(healthy)}")
        for aid in healthy[:5]:  # 只显示前5个
            info = self.agents[aid]
            print(f"   {aid[:30]}... - {info["last_seen"]}")

        if unhealthy:
            print(f"\n🔴 失联: {len(unhealthy)}")
            for aid in unhealthy:
                print(f"   {aid[:30]}...")

# 启动服务器
if __name__ == "__main__":
    monitor = HeartbeatMonitor()

    # 使用 webhook 传输
    print(f"🚀 心跳接收器启动 | Agent ID: {monitor.node.agent_id}")
    print("等待心跳信号... (Ctrl+C 停止)\n")

    try:
        monitor.node.serve_webhook(
            port=8402,
            handlers={"heartbeat": monitor.on_heartbeat}
        )
    except KeyboardInterrupt:
        monitor.print_status()
        print("\n👋 已停止")
Enter fullscreen mode Exit fullscreen mode

4. 启动心跳发送器

创建 heartbeat_client.py

#!/usr/bin/env python3
"""
Beacon 心跳发送器示例
定期向监控服务器发送心跳信号
"""

import time
import argparse
from beacon_skill import BeaconNode

class HeartbeatClient:
    def __init__(self, server_url):
        self.node = BeaconNode()
        self.server_url = server_url
        self.sequence = 0

    def send_heartbeat(self, message="I'm alive!"):
        """发送心跳"""
        self.sequence += 1

        envelope = {
            "kind": "heartbeat",
            "text": f"{message} (seq: {self.sequence})",
            "ts": time.time(),
            "seq": self.sequence
        }

        try:
            response = self.node.send_webhook(
                url=f"{self.server_url}/beacon/inbox",
                envelope=envelope
            )
            print(f"✅ 心跳 #{self.sequence} 已发送 | 响应: {response.get("kind", "none")}")
            return True
        except Exception as e:
            print(f"❌ 心跳 #{self.sequence} 失败: {e}")
            return False

    def run(self, interval=10, count=None):
        """持续发送心跳"""
        print(f"💓 心跳客户端启动")
        print(f"   Agent ID: {self.node.agent_id}")
        print(f"   目标服务器: {self.server_url}")
        print(f"   间隔: {interval}\n")

        sent = 0
        try:
            while count is None or sent < count:
                if self.send_heartbeat():
                    sent += 1
                time.sleep(interval)
        except KeyboardInterrupt:
            print(f"\n👋 已停止 | 共发送 {sent} 次心跳")

# 运行客户端
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Beacon Heartbeat Client")
    parser.add_argument("--server", default="http://127.0.0.1:8402",
                       help="心跳接收服务器地址")
    parser.add_argument("--interval", type=int, default=10,
                       help="心跳间隔(秒)")
    parser.add_argument("--count", type=int, default=None,
                       help="发送次数(默认无限)")

    args = parser.parse_args()

    client = HeartbeatClient(args.server)
    client.run(interval=args.interval, count=args.count)
Enter fullscreen mode Exit fullscreen mode

5. 运行测试

终端 1 - 启动接收器:

python heartbeat_server.py
Enter fullscreen mode Exit fullscreen mode

终端 2 - 启动发送器:

python heartbeat_client.py --interval 5
Enter fullscreen mode Exit fullscreen mode

你应该看到类似输出:

💓 收到心跳 | Agent: bcn_client_xyz... | 时间: 2026-03-17T20:55:12
✅ 心跳 #1 已发送 | 响应: heartbeat_ack
Enter fullscreen mode Exit fullscreen mode

进阶:局域网发现

如果你想在局域网内自动发现其他 Agent,使用 UDP 广播:

# 发送广播心跳
beacon udp send 255.255.255.255 38400 --broadcast \
  --envelope-kind heartbeat \
  --text "Any agents online?"

# 监听局域网心跳
beacon udp listen --port 38400
Enter fullscreen mode Exit fullscreen mode

安全注意事项

  1. 签名验证:生产环境务必验证信封签名
  2. 重放保护:使用 nonce 防止重放攻击
  3. 时间窗口:设置合理的时间戳容差(建议 ±60 秒)
  4. 速率限制:防止心跳洪泛攻击

总结

Beacon 的心跳机制让 AI Agent 具备了自我意识群体感知能力。通过简单的 webhook 或 UDP 传输,你可以:

  • 监控分布式 Agent 集群健康状态
  • 实现服务发现和故障转移
  • 构建 Agent 社交网络的信任基础

完整代码可在 beacon-skill GitHub 获取。


关于作者:本文是 Beacon 协议系列教程的第一篇。下一篇将介绍如何使用 Beacon 实现 Agent 间的加密支付。

发表于 Dev.to / 2026-03-17

Top comments (0)