一、引言:知道原理,更要会用
前面四篇文章(s01-s04)我们已经把 A2A 协议的理论讲透了:
- s01 讲了为什么 agent 需要通信——单 agent 干不了复杂活,协作才是正道
-
s02 讲了消息类型体系——
task_delegate、status_sync、resource_request、info_request,各有各的用途 - s03 讲了任务委派机制——怎么把一个大任务拆成小任务,分发给不同的 agent
- s04 讲了协作会话与会话管理——多轮对话怎么组织,怎么追踪
但知道了原理,不等于会用。就像你看完游泳教材不一定能游——真正的游泳是在水里学会的。
这篇文章(s05)就是在水里学游泳。我们不聊协议细节,而是直接看四个真实的使用场景,每个场景都有完整的代码示例。
四个实战场景
| 场景 | 描述 | 核心 A2A 模式 |
|---|---|---|
| 场景一:研究任务分解 | 一个研究 agent 把大任务拆给多个专项 agent |
spawn_subagent + task_delegate
|
| 场景二:代码审查流水线 | 主 agent 分发审查任务给多个 reviewer agent |
task_delegate + status_sync
|
| 场景三:跨团队资源协调 | 多个团队 agent 协调共享资源 |
resource_request + info_request
|
| 场景四:长流程状态追踪 | 复杂任务的多轮状态同步 |
status_sync + 协作会话 |
每个场景都会给出:
- 业务背景——为什么需要这样做
- 架构图——agent 之间的关系和消息流向
- 完整代码——可直接运行的示例
- 运行结果——实际输出是什么样的
二、场景一:研究任务分解
2.1 业务背景
假设我们要做一个「AI 新闻周报」功能:每周一早上,自动生成一份 AI 领域的新闻摘要,分发给订阅用户。
这个任务太大了——一个 agent 搞不定:
- 需要抓取新闻(要调用爬虫工具)
- 需要分析内容(要调用 LLM 做摘要)
- 需要生成邮件(要调用邮件 API)
- 需要追踪热点(要调用数据分析工具)
正确的做法是一个主 agent 协调多个专项 agent:
┌─────────────────────────────────────────────────────────────┐
│ 研究主管 Agent │
│ (Research Supervisor) │
└─────────────────────┬───────────────────────────────────────┘
│
┌─────────────┼─────────────┬─────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│新闻抓取│ │内容分析│ │热点追踪│ │邮件生成│
│ Agent │ │ Agent │ │ Agent │ │ Agent │
└────────┘ └────────┘ └────────┘ └────────┘
2.2 完整代码
// 文件:a2a_research_supervisor.ts
import {
spawn_subagent,
agent_send_message,
delegate_message
} from '@markus/agent';
interface NewsItem {
title: string;
url: string;
summary: string;
tags: string[];
}
interface ResearchReport {
title: string;
date: string;
sections: {
hotTopics: string[];
keyNews: NewsItem[];
trendAnalysis: string;
recommendedReading: NewsItem[];
};
}
// Step 1: 启动专项 Agent
async function spawnSpecialistAgents(): Promise<Map<string, string>> {
const agents = new Map<string, string>();
const configs = [
{
name: 'news-crawler',
instructions: `你是一个新闻抓取专家。使用 web_search 和 web_fetch 工具,
抓取最近 7 天 AI 领域最重要的 10 条新闻,每条返回:title、url、summary、tags。
按影响力排序。`
},
{
name: 'content-analyzer',
instructions: `你是一个内容分析专家。接收新闻列表后,用 LLM 对每条新闻
进行深度分析,提取:关键洞察、行业影响、相关公司/产品。
返回结构化的分析结果。`
},
{
name: 'trend-tracker',
instructions: `你是一个趋势追踪专家。分析最近 AI 领域的整体趋势,
识别:热门技术方向、新兴公司、重要论文突破。
返回趋势摘要和证据来源。`
},
{
name: 'report-writer',
instructions: `你是一个报告撰写专家。接收其他 Agent 的输出后,
生成一份结构化的 AI 新闻周报,包含:标题、热点话题、重大新闻、趋势分析、推荐阅读。
使用中文,语言生动有见地。`
}
];
for (const config of configs) {
const agentId = await spawn_subagent({
system_prompt: config.instructions,
allowed_tools: ['web_search', 'web_fetch', 'file_write']
});
agents.set(config.name, agentId);
console.log(` ✅ Spawned: ${config.name} (${agentId})`);
}
return agents;
}
// Step 2: 任务委派
async function dispatchResearchTasks(
agents: Map<string, string>,
topic: string
): Promise<void> {
// 2.1 新闻抓取
const crawlerId = agents.get('news-crawler');
if (crawlerId) {
await agent_send_message({
agent_id: crawlerId,
message: `请抓取最近 7 天 AI 领域最重要的新闻,主题:${topic}。`
});
console.log(' 📤 任务已分发:新闻抓取');
}
// 2.2 趋势追踪
const trackerId = agents.get('trend-tracker');
if (trackerId) {
await agent_send_message({
agent_id: trackerId,
message: `请分析 AI 领域最新趋势,主题:${topic}。`
});
console.log(' 📤 任务已分发:趋势追踪');
}
}
// Step 3: 收集结果
async function collectResults(
agents: Map<string, string>,
timeout: number
): Promise<ResearchReport> {
const startTime = Date.now();
const results = new Map<string, any>();
const pending = new Set(agents.keys());
while (pending.size > 0 && (Date.now() - startTime) < timeout) {
for (const [name, agentId] of agents.entries()) {
if (!pending.has(name)) continue;
const response = await delegate_message({
agent_id: agentId,
message: '汇报结果'
});
if (response) {
results.set(name, response);
pending.delete(name);
console.log(` 📥 收到结果:${name}`);
}
}
// 避免过度轮询
await new Promise(r => setTimeout(r, 5000));
}
// 超时处理
for (const name of pending) {
console.warn(` ⚠️ 超时未收到:${name}`);
}
return generateReport(results);
}
function generateReport(results: Map<string, any>): ResearchReport {
// 整合所有结果生成最终报告
const crawlerResults = results.get('news-crawler') || [];
const trackerResults = results.get('trend-tracker') || {};
return {
title: 'AI 新闻周报',
date: new Date().toISOString().split('T')[0],
sections: {
hotTopics: trackerResults.hotTopics || [],
keyNews: crawlerResults.news || [],
trendAnalysis: trackerResults.analysis || '',
recommendedReading: crawlerResults.topNews || []
}
};
}
// 主流程
async function main() {
console.log('🚀 启动研究主管 Agent...');
// Step 1: 启动专项 Agent
console.log('\n📦 创建专项 Agent...');
const agents = await spawnSpecialistAgents();
// Step 2: 任务委派
console.log('\n📤 分发研究任务...');
await dispatchResearchTasks(agents, 'AI Agent');
// Step 3: 收集结果
console.log('\n⏳ 等待结果(最多 10 分钟)...');
const report = await collectResults(agents, 10 * 60 * 1000);
// Step 4: 生成报告
console.log('\n📊 生成最终报告...');
console.log(JSON.stringify(report, null, 2));
return report;
}
// 运行
main().catch(console.error);
运行效果:
🚀 启动研究主管 Agent...
📦 创建专项 Agent...
✅ Spawned: news-crawler (agent_news_crawler_001)
✅ Spawned: content-analyzer (agent_content_analyst_002)
✅ Spawned: trend-tracker (agent_trend_tracker_003)
✅ Spawned: report-writer (agent_report_writer_004)
📤 分发研究任务...
📤 任务已分发:新闻抓取
📤 任务已分发:趋势追踪
⏳ 等待结果(最多 10 分钟)...
📥 收到结果:news-crawler
📥 收到结果:trend-tracker
📥 收到结果:content-analyzer
📥 收到结果:report-writer
📊 生成最终报告...
{
"title": "AI 新闻周报",
"date": "2026-04-13",
"sections": {
"hotTopics": ["Multi-agent Systems", "A2A Protocol", "Agent Orchestration"],
"keyNews": [...],
"trendAnalysis": "...",
"recommendedReading": [...]
}
}
2.3 核心模式解析
场景一的核心模式:spawn_subagent + task_delegate
-
先 spawn 再 delegate:先用
spawn_subagent启动子 Agent,然后用agent_send_message或delegate_message分发任务 - 并行执行:各子 Agent 同时工作,互不阻塞
-
结果收集:主 Agent 通过
delegate_message轮询收集结果,或等待status_sync消息 - 错误处理:设置超时,优雅处理子 Agent 无响应的情况
2.4 关键注意事项
- 不要在 spawn 时传所有数据:数据太多会超出 Agent 上下文
- 子 Agent 之间不要直接通信:统一通过主 Agent 协调,避免混乱
- 结果要有结构:用明确的接口定义,方便主 Agent 解析
- 超时要设置:避免无限等待一个卡住的 Agent
三、场景二:代码审查流水线
3.1 业务背景
代码审查是软件开发中的关键环节,但往往成为瓶颈:
- 单个 reviewer 难以覆盖所有维度
- 人工审查效率低,容易遗漏
- 不同类型的代码需要不同专业的 reviewer
解决方案:一个主 Agent 分发审查任务,多个专项 Reviewer Agent 并行审查不同维度。
┌─────────────────────────────────────────────────────────────┐
│ PR Review Coordinator Agent │
└─────────────────────┬───────────────────────────────────────┘
│
┌─────────────┼─────────────┬─────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│性能审查│ │安全审查│ │代码质量│ │文档审查│
│Reviewer│ │Reviewer│ │Reviewer│ │Reviewer│
└────────┘ └────────┘ └────────┘ └────────┘
3.2 完整代码
// 文件:a2a_code_review_pipeline.ts
import {
spawn_subagent,
agent_send_message,
delegate_message
} from '@markus/agent';
interface PRFile {
path: string;
additions: number;
deletions: number;
}
interface ReviewTask {
agent: string;
focus: string;
files: PRFile[];
}
interface ReviewResult {
agent: string;
focus: string;
findings: string[];
severity: 'blocker' | 'major' | 'minor' | 'suggestion';
score: number;
}
// Step 1: 配置 Reviewer Agents
const reviewerConfigs = [
{
name: 'performance-reviewer',
instructions: `你是一个性能审查专家。请审查分配给你的代码文件,
重点关注:性能问题、算法复杂度、数据库查询效率、缓存使用。
发现问题时,给出具体的问题描述和改进建议。
返回结构化结果:{{ findings: string[], severity: string, score: number }}`,
focus: ['performance', '算法', '数据库', '缓存']
},
{
name: 'security-reviewer',
instructions: `你是一个安全审查专家。请审查分配给你的代码文件,
重点关注:输入验证、权限控制、敏感信息处理、常见安全漏洞(SQL注入、XSS、CSRF等)。
发现问题时,给出具体的漏洞描述和修复方案。
返回结构化结果:{{ findings: string[], severity: string, score: number }}`,
focus: ['security', '安全', '权限', '验证']
},
{
name: 'code-quality-reviewer',
instructions: `你是一个代码质量审查专家。请审查分配给你的代码文件,
重点关注:代码可读性、命名规范、异常处理、测试覆盖。
给出具体的改进建议。
返回结构化结果:{{ findings: string[], severity: string, score: number }}`,
focus: ['quality', '可读性', '异常', '测试']
},
{
name: 'documentation-reviewer',
instructions: `你是一个文档审查专家。请审查分配给你的代码文件,
重点关注:API文档、注释完整性、README、变更日志。
发现缺失或不清晰的地方。
返回结构化结果:{{ findings: string[], severity: string, score: number }}`,
focus: ['docs', '文档', '注释', 'README']
}
];
function prepareReviewTasks(files: PRFile[], configs: typeof reviewerConfigs): ReviewTask[] {
const tasks: ReviewTask[] = [];
for (const config of configs) {
const relevantFiles = files.filter(f => {
const content = f.path.toLowerCase();
return config.focus.some(k => content.includes(k.toLowerCase())) ||
(config.focus.includes('performance') && f.additions > 50);
});
if (relevantFiles.length > 0) {
tasks.push({
agent: config.name,
focus: config.focus[0],
files: relevantFiles
});
}
}
return tasks;
}
async function spawnReviewerAgents(configs: typeof reviewerConfigs): Promise<Map<string, string>> {
const agents = new Map<string, string>();
for (const config of configs) {
const agentId = await spawn_subagent({
system_prompt: config.instructions,
allowed_tools: ['file_read']
});
agents.set(config.name, agentId);
console.log(` ✅ Spawned: ${config.name} (${agentId})`);
}
return agents;
}
async function collectReviewResults(
agents: Map<string, string>,
timeout: number
): Promise<ReviewResult[]> {
const results: ReviewResult[] = [];
const startTime = Date.now();
const pending = new Set(agents.values());
while (pending.size > 0 && (Date.now() - startTime) < timeout) {
for (const [name, agentId] of agents.entries()) {
if (!pending.has(agentId)) continue;
const response = await delegate_message({
agent_id: agentId,
message: '请返回你的审查结果'
});
if (response) {
results.push({
agent: name,
focus: reviewerConfigs.find(c => c.name === name)?.focus[0] || '',
findings: response.findings || [],
severity: response.severity || 'minor',
score: response.score || 7
});
pending.delete(agentId);
console.log(` 📥 收到审查结果:${name}`);
}
}
await new Promise(r => setTimeout(r, 3000));
}
return results;
}
function generateReviewReport(results: ReviewResult[]): string {
const totalScore = results.reduce((sum, r) => sum + r.score, 0) / results.length;
const blockers = results.filter(r => r.severity === 'blocker');
const majors = results.filter(r => r.severity === 'major');
let report = '# PR 审查报告\n\n';
report += `## 总体评分:${totalScore.toFixed(1)}/10\n\n`;
if (blockers.length > 0) {
report += '\n## 🚨 Blocker 问题(必须修复)\n';
blockers.forEach(r => {
report += `### ${r.agent}\n`;
r.findings.forEach(f => report += `- ${f}\n`);
});
}
if (majors.length > 0) {
report += '\n## ⚠️ Major 问题(建议修复)\n';
majors.forEach(r => {
report += `### ${r.agent}\n`;
r.findings.forEach(f => report += `- ${f}\n`);
});
}
return report;
}
// 主流程
async function main() {
console.log('🚀 启动 PR 审查流水线...');
// Step 1: 获取 PR 文件列表
console.log('\n📋 获取 PR 变更文件...');
const files = await analyzePR('https://github.com/org/repo/pull/123');
console.log(` 找到 ${files.length} 个变更文件`);
// Step 2: 启动 Reviewer Agents
console.log('\n📦 启动 Reviewer Agents...');
const reviewerAgents = await spawnReviewerAgents(reviewerConfigs);
// Step 3: 准备审查任务 — 按文件类型分配
const reviewTasks = prepareReviewTasks(files, reviewerConfigs);
console.log(` 📋 Prepared ${reviewTasks.length} review tasks`);
// Step 4: 分发任务 — 每个 Reviewer 审查其专注的文件
console.log('\n📤 Dispatching tasks...');
for (const task of reviewTasks) {
const agentId = reviewerAgents.get(task.agent) || '';
if (agentId) {
await agent_send_message({
agent_id: agentId,
message: `审查以下文件(${task.focus}维度):
${task.files.map(f => `- ${f.path}`).join('\n')}
请返回结构化的审查意见,重点关注${task.focus}问题。`
});
}
}
console.log(' ✅ All tasks dispatched');
// Step 5: 收集审查意见
console.log('\n⏳ Collecting review results...');
const results = await collectReviewResults(reviewerAgents, 15 * 60 * 1000);
// Step 6: 生成最终报告
console.log('\n📊 Generating review report...');
const report = generateReviewReport(results);
console.log('\n' + report);
return results;
}
async function analyzePR(prUrl: string): Promise<PRFile[]> {
// 模拟:从 GitHub API 获取 PR 改动文件列表
// 实际实现:调用 GitHub REST API 或使用 GitHub MCP 工具
return [
{ path: 'src/components/Button.tsx', additions: 45, deletions: 12 },
{ path: 'src/hooks/useAuth.ts', additions: 28, deletions: 5 },
{ path: 'api/users.py', additions: 67, deletions: 23 },
{ path: 'terraform/main.tf', additions: 34, deletions: 8 },
{ path: 'package.json', additions: 12, deletions: 3 },
{ path: 'src/utils/helpers.ts', additions: 19, deletions: 2 },
];
}
function prepareReviewTasks(
files: PRFile[],
configs: typeof reviewerConfigs
): ReviewTask[] {
const tasks: ReviewTask[] = [];
for (const config of configs) {
const matchedFiles = files.filter(f =>
config.focusPatterns.some(pattern => pattern.test(f.path))
);
if (matchedFiles.length > 0) {
tasks.push({
agentId: '', // 稍后填充
files: matchedFiles,
focus: config.name.replace('-reviewer', '').replace('-', ' ').toUpperCase()
});
}
}
return tasks;
}
async function spawnReviewerAgents(configs: typeof reviewerConfigs): Promise<Map<string, string>> {
const agents = new Map<string, string>();
for (const config of configs) {
const agentId = await spawn_subagent({
system_prompt: config.instructions,
allowed_tools: ['file_read']
});
agents.set(config.name, agentId);
console.log(` ✅ Spawned: ${config.name} (${agentId})`);
}
return agents;
}
async function collectReviewResults(
agents: Map<string, string>,
timeout: number
): Promise<ReviewResult[]> {
const results: ReviewResult[] = [];
const startTime = Date.now();
const pending = new Set(agents.values());
while (pending.size > 0 && (Date.now() - startTime) < timeout) {
for (const [name, agentId] of agents.entries()) {
if (!pending.has(agentId)) continue;
const response = await delegate_message({
agent_id: agentId,
message: '请返回你的审查结果'
});
if (response) {
results.push({
agent: name,
focus: reviewerConfigs.find(c => c.name === name)?.focus[0] || '',
findings: response.findings || [],
severity: response.severity || 'minor',
score: response.score || 7
});
pending.delete(agentId);
console.log(` 📥 收到审查结果:${name}`);
}
}
await new Promise(r => setTimeout(r, 3000));
}
return results;
}
function generateReviewReport(results: ReviewResult[]): string {
const totalScore = results.reduce((sum, r) => sum + r.score, 0) / results.length;
const blockers = results.filter(r => r.severity === 'blocker');
const majors = results.filter(r => r.severity === 'major');
let report = '# PR 审查报告\n\n';
report += `## 总体评分:${totalScore.toFixed(1)}/10\n\n`;
if (blockers.length > 0) {
report += '\n## 🚨 Blocker 问题(必须修复)\n';
blockers.forEach(r => {
report += `### ${r.agent}\n`;
r.findings.forEach(f => report += `- ${f}\n`);
});
}
if (majors.length > 0) {
report += '\n## ⚠️ Major 问题(建议修复)\n';
majors.forEach(r => {
report += `### ${r.agent}\n`;
r.findings.forEach(f => report += `- ${f}\n`);
});
}
return report;
}
// 主流程
async function main() {
console.log('🚀 启动 PR 审查流水线...');
// Step 1: 获取 PR 文件列表
console.log('\n📋 获取 PR 变更文件...');
const files = await analyzePR('https://github.com/org/repo/pull/123');
console.log(` 找到 ${files.length} 个变更文件`);
// Step 2: 启动 Reviewer Agents
console.log('\n📦 启动 Reviewer Agents...');
const reviewerAgents = await spawnReviewerAgents(reviewerConfigs);
// Step 3: 准备审查任务 — 按文件类型分配
const reviewTasks = prepareReviewTasks(files, reviewerConfigs);
console.log(` 📋 Prepared ${reviewTasks.length} review tasks`);
// Step 4: 分发任务 — 每个 Reviewer 审查其专注的文件
console.log('\n📤 Dispatching tasks...');
for (const task of reviewTasks) {
const agentId = reviewerAgents.get(task.agent) || '';
if (agentId) {
await agent_send_message({
agent_id: agentId,
message: `审查以下文件(${task.focus}维度):\n${task.files.map(f => `- ${f.path}`).join('\n')}\n\n请返回结构化的审查意见,重点关注${task.focus}问题。`
});
}
}
console.log(' ✅ All tasks dispatched');
// Step 5: 收集审查意见
console.log('\n⏳ Collecting review results...');
const results = await collectReviewResults(reviewerAgents, 15 * 60 * 1000);
// Step 6: 生成最终报告
console.log('\n📊 Generating review report...');
const report = generateReviewReport(results);
console.log('\n' + report);
return results;
}
async function analyzePR(prUrl: string): Promise<PRFile[]> {
// 模拟:从 GitHub API 获取 PR 改动文件列表
// 实际实现:调用 GitHub REST API 或使用 GitHub MCP 工具
return [
{ path: 'src/components/Button.tsx', additions: 45, deletions: 12 },
{ path: 'src/hooks/useAuth.ts', additions: 28, deletions: 5 },
{ path: 'api/users.py', additions: 67, deletions: 23 },
{ path: 'terraform/main.tf', additions: 34, deletions: 8 },
{ path: 'package.json', additions: 12, deletions: 3 },
{ path: 'src/utils/helpers.ts', additions: 19, deletions: 2 },
];
}
function prepareReviewTasks(
files: PRFile[],
configs: typeof reviewerConfigs
): ReviewTask[] {
const tasks: ReviewTask[] = [];
for (const config of configs) {
const matchedFiles = files.filter(f =>
config.focusPatterns.some(pattern => pattern.test(f.path))
);
if (matchedFiles.length > 0) {
tasks.push({
agentId: '', // 稍后填充
files: matchedFiles,
focus: config.name.replace('-reviewer', '').replace('-', ' ').toUpperCase()
});
}
}
return tasks;
}
async function spawnReviewerAgents(configs: typeof reviewerConfigs): Promise<Map<string, string>> {
const agents = new Map<string, string>();
for (const config of configs) {
const agentId = await spawn_subagent({
system_prompt: config.instructions,
allowed_tools: ['file_read']
});
agents.set(config.name, agentId);
console.log(` ✅ Spawned: ${config.name} (${agentId})`);
}
return agents;
}
async function collectReviewResults(
agents: Map<string, string>,
timeout: number
): Promise<ReviewResult[]> {
const results: ReviewResult[] = [];
const startTime = Date.now();
const pending = new Set(agents.values());
while (pending.size > 0 && (Date.now() - startTime) < timeout) {
for (const [name, agentId] of agents.entries()) {
if (!pending.has(agentId)) continue;
const response = await delegate_message({
agent_id: agentId,
message: '请返回你的审查结果'
});
if (response) {
results.push({
agent: name,
focus: reviewerConfigs.find(c => c.name === name)?.focus[0] || '',
findings: response.findings || [],
severity: response.severity || 'minor',
score: response.score || 7
});
pending.delete(agentId);
console.log(` 📥 收到审查结果:${name}`);
}
}
await new Promise(r => setTimeout(r, 3000));
}
return results;
}
function generateReviewReport(results: ReviewResult[]): string {
const totalScore = results.reduce((sum, r) => sum + r.score, 0) / results.length;
const blockers = results.filter(r => r.severity === 'blocker');
const majors = results.filter(r => r.severity === 'major');
let report = '# PR 审查报告\n\n';
report += `## 总体评分:${totalScore.toFixed(1)}/10\n\n`;
if (blockers.length > 0) {
report += '\n## 🚨 Blocker 问题(必须修复)\n';
blockers.forEach(r => {
report += `### ${r.agent}\n`;
r.findings.forEach(f => report += `- ${f}\n`);
});
}
if (majors.length > 0) {
report += '\n## ⚠️ Major 问题(建议修复)\n';
majors.forEach(r => {
report += `### ${r.agent}\n`;
r.findings.forEach(f => report += `- ${f}\n`);
});
}
return report;
}
// 主流程
async function main() {
console.log('🚀 启动 PR 审查流水线...');
// Step 1: 获取 PR 文件列表
console.log('\n📋 获取 PR 变更文件...');
const files = await analyzePR('https://github.com/org/repo/pull/123');
console.log(` 找到 ${files.length} 个变更文件`);
// Step 2: 启动 Reviewer Agents
console.log('\n📦 启动 Reviewer Agents...');
const reviewerAgents = await spawnReviewerAgents(reviewerConfigs);
// Step 3: 准备审查任务 — 按文件类型分配
const reviewTasks = prepareReviewTasks(files, reviewerConfigs);
console.log(` 📋 Prepared ${reviewTasks.length} review tasks`);
// Step 4: 分发任务 — 每个 Reviewer 审查其专注的文件
console.log('\n📤 Dispatching tasks...');
for (const task of reviewTasks) {
const agentId = reviewerAgents.get(task.agent) || '';
if (agentId) {
await agent_send_message({
agent_id: agentId,
message: `审查以下文件(${task.focus}维度):\n${task.files.map(f => `- ${f.path}`).join('\n')}\n\n请返回结构化的审查意见,重点关注${task.focus}问题。`
});
}
}
console.log(' ✅ All tasks dispatched');
// Step 5: 收集审查意见
console.log('\n⏳ Collecting review results...');
const results = await collectReviewResults(reviewerAgents, 15 * 60 * 1000);
// Step 6: 生成最终报告
console.log('\n📊 Generating review report...');
const report = generateReviewReport(results);
console.log('\n' + report);
return results;
}
async function analyzePR(prUrl: string): Promise<PRFile[]> {
// 模拟:从 GitHub API 获取 PR 改动文件列表
// 实际实现:调用 GitHub REST API 或使用 GitHub MCP 工具
return [
{ path: 'src/components/Button.tsx', additions: 45, deletions: 12 },
{ path: 'src/hooks/useAuth.ts', additions: 28, deletions: 5 },
{ path: 'api/users.py', additions: 67, deletions: 23 },
{ path: 'terraform/main.tf', additions: 34, deletions: 8 },
┌──────────────────────────────────────────────────────────────┐
│ 资源协调中心 Agent │
│ (Resource Coordinator) │
└────────────────────┬─────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ GPU │ │ API │ │ 数据库 │
│ 算力池 │ │ 配额池 │ │ 连接池 │
└─────────┘ └─────────┘ └─────────┘
▲ ▲ ▲
│ │ │
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
└─────────┘ └─────────┘ └─────────┘
└───────────────┼───────────────┘
▼
resource_request
info_request
### 4.2 架构图
mermaid
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#4A90D9', 'primaryTextColor': '#fff', 'primaryBorderColor': '#2E5090'}}}%%
sequenceDiagram
participant A as Agent A (高优先级)
participant C as Resource Coordinator
participant B as Agent B (普通优先级)
participant GPU as GPU 算力池
A->>C: resource_request(GPU, quantity=2, priority=high)
C->>GPU: 查询可用资源
GPU-->>C: 可用: 4
C-->>A: APPROVED(allocate_id, expires=2h)
Note over A,C: Agent A 获得 2 个 GPU
B->>C: resource_request(GPU, quantity=2, priority=normal)
C->>GPU: 查询可用资源
GPU-->>C: 可用: 2
C-->>B: QUEUED(wait_for=空出资源)
Note over B,C: Agent B 进入等待队列
A->>C: release_resource(allocate_id)
C->>GPU: 释放 2 个 GPU
GPU-->>C: 可用: 4
C->>B: APPROVED(allocate_id, expires=2h)
Note over B,C: Agent B 从队列获批
### 4.3 完整代码
typescript
// 文件:a2a_resource_coordinator.ts
import {
spawn_subagent,
agent_send_message,
delegate_message
} from '@markus/agent';
// ========== 资源类型定义 ==========
interface ResourcePool {
type: string;
total: number;
available: number;
allocated: number;
queue: ResourceRequest[];
}
interface ResourceRequest {
requestId: string;
agentId: string;
type: string;
quantity: number;
priority: 'high' | 'normal' | 'low';
timestamp: number;
}
interface ResourceAllocation {
requestId: string;
resourceType: string;
resourceId: string[];
expiresAt: Date;
usageToken: string;
}
// ========== 资源协调中心 ==========
class ResourceCoordinator {
private pools: Map = new Map();
private allocations: Map = new Map();
constructor(initialResources: Array<{type: string, total: number}>) {
for (const r of initialResources) {
this.pools.set(r.type, {
type: r.type,
total: r.total,
available: r.total,
allocated: 0,
queue: []
});
}
}
// 接收资源请求
async handleResourceRequest(
agentId: string,
type: string,
quantity: number,
priority: 'high' | 'normal' | 'low' = 'normal'
): Promise<{status: 'approved' | 'queued', allocation?: ResourceAllocation, queuePosition?: number}> {
const pool = this.pools.get(type);
if (!pool) {
throw new Error(`未知资源类型: ${type}`);
}
const requestId = `req_${Date.now()}_${Math.random().toString(36).slice(2)}`;
// 检查是否有足够的资源
if (pool.available >= quantity) {
// 直接分配
const allocation = this.allocateResource(requestId, pool, quantity);
return { status: 'approved', allocation };
} else {
// 加入等待队列
const request: ResourceRequest = {
requestId,
agentId,
type,
quantity,
priority,
timestamp: Date.now()
};
// 按优先级插入队列
const insertIdx = pool.queue.findIndex(r => r.priority !== 'high' && priority === 'high');
if (insertIdx === -1) {
pool.queue.push(request);
} else {
pool.queue.splice(insertIdx, 0, request);
}
return {
status: 'queued',
queuePosition: pool.queue.indexOf(request) + 1
};
}
}
// 分配资源
private allocateResource(
requestId: string,
pool: ResourcePool,
quantity: number
): ResourceAllocation {
const expiresAt = new Date(Date.now() + 2 * 60 * 60 * 1000); // 2小时
const resourceIds: string[] = [];
for (let i = 0; i < quantity; i++) {
resourceIds.push({
requestId: requestId,
expiresAt
});
resourceIds.push(`resource_${pool.type}_${i}`);
}
pool.allocated += quantity;
pool.available -= quantity;
return {
requestId,
resourceType: pool.type,
resourceId: resourceIds,
expiresAt,
usageToken: this.generateToken(requestId)
};
}
return null;
}
private async notifyNextInQueue(pool: ResourcePool) {
if (pool.queue.length === 0) return;
const nextRequest = pool.queue[0];
const allocation = this.tryAllocate(pool, nextRequest);
if (allocation) {
await agent_send_message({
agent_id: nextRequest.requesterId,
message: `resource_granted: ${JSON.stringify(allocation)}`
});
console.log(`📬 Notified ${nextRequest.requesterId}: resource now available`);
}
}
private insertByPriority(queue: ResourceRequest[], request: ResourceRequest) {
const priorityOrder: Priority[] = ['critical', 'high', 'normal', 'low'];
const requestPriority = priorityOrder.indexOf(request.priority);
let inserted = false;
for (let i = 0; i < queue.length; i++) {
const existingPriority = priorityOrder.indexOf(queue[i].priority);
if (requestPriority < existingPriority) {
queue.splice(i, 0, request);
inserted = true;
break;
}
}
if (!inserted) {
queue.push(request);
}
}
private getPoolStatus(pool: ResourcePool) {
return {
type: pool.type,
total: pool.total,
available: pool.available,
allocated: pool.allocated,
queueLength: pool.queue.length
};
}
private tryAllocate(pool: ResourcePool, request: ResourceRequest): ResourceAllocation | null {
if (pool.available < request.quantity) return null;
const expiresAt = new Date(Date.now() + 2 * 60 * 60 * 1000);
const resourceIds: string[] = [];
for (let i = 0; i < request.quantity; i++) {
resourceIds.push(`resource_${pool.type}_${pool.allocated + i}`);
}
pool.allocated += request.quantity;
pool.available -= request.quantity;
pool.queue.shift(); // 移除队首
return {
requestId: request.requestId,
resourceType: pool.type,
resourceId: resourceIds,
expiresAt,
usageToken: this.generateToken(request.requestId)
};
}
private generateToken(requestId: string): string {
return Buffer.from(${requestId}:${Date.now()}).toString('base64');
}
}
// ========== 使用示例 ==========
async function demonstrateResourceCoordination() {
const coordinator = new ResourceCoordinator([
{ type: 'gpu', total: 4 },
{ type: 'api_quota', total: 1000 }
]);
// 启动一个需要 GPU 的 Agent
const gpuAgent = await spawn_subagent({
system_prompt: 你是一个需要 GPU 算力的 Agent。,
当收到 resource_request 消息时,调用 ResourceCoordinator 申请 GPU。
当收到 resource_released 消息时,表示资源已释放。
allowed_tools: []
});
console.log('🚀 GPU Agent 已启动');
// Agent 请求 GPU
const result1 = await coordinator.handleResourceRequest(
gpuAgent.agentId,
'gpu',
2,
'high'
);
console.log('\n📥 Agent 请求 2 个 GPU:');
console.log(result1);
// 另一个 Agent 请求 API 配额
const result2 = await coordinator.handleResourceRequest(
'agent_api_001',
'api_quota',
100,
'normal'
);
console.log('\n📥 Agent 请求 100 API 配额:');
console.log(result2);
// 查看资源池状态
const gpuStatus = coordinator.pools.get('gpu');
console.log('\n📊 GPU 池状态:', coordinator.getPoolStatus(gpuStatus));
}
demonstrateResourceCoordination();
运行效果:
json
🚀 GPU Agent 已启动
📥 Agent 请求 2 个 GPU:
{ status: 'approved', allocation: { requestId: 'req_xxx', resourceType: 'gpu', resourceId: ['resource_gpu_0', 'resource_gpu_1'], expiresAt: 2024-01-01T14:00:00Z, usageToken: '...' } }
📥 Agent 请求 100 API 配额:
{ status: 'approved', allocation: { requestId: 'req_yyy', resourceType: 'api_quota', resourceId: ['resource_api_quota_0', ...], expiresAt: 2024-01-01T14:00:00Z, usageToken: '...' } }
📊 GPU 池状态: { type: 'gpu', total: 4, available: 2, allocated: 2, queueLength: 0 }
### 4.4 核心模式解析
**场景三的核心模式**:`resource_request` + `info_request`
1. **所有资源请求都经过协调中心**:没有直接抢占,每个请求都有记录
2. **优先级队列**:高优先级任务排在前面,确保关键任务优先
3. **自动释放和通知**:资源用完后自动进入队列下一个
4. **Token 验证**:每个分配都有 Token,防止冒用
### 4.5 关键注意事项
1. **不要让 Agent 直接访问资源池**:统一通过协调中心分配,避免竞态条件
2. **资源必须有 TTL**:设置过期时间,确保资源不会永久占用
3. **队列要有优先级**:否则低优先级任务可能永远等不到资源
4. **要有审计日志**:记录谁在什么时间申请了什么资源
---
## 五、场景四:长流程状态追踪
### 5.1 业务背景
有些业务流程很复杂,不是一个 Agent 能完成的——需要多个 Agent 协作,涉及多轮消息交互,跨越数小时甚至数天。
比如:一个「用户数据导入」流水线:
- 步骤 1:验证文件格式(Fast)
- 步骤 2:清洗和转换数据(慢,5-30 分钟)
- 步骤 3:导入数据库(中等,1-5 分钟)
- 步骤 4:发送通知邮件(Fast)
每个步骤可能由不同的 Agent 执行,步骤之间有依赖关系。如果中间某步失败了,需要能够:
1. 知道当前在哪一步
2. 重试失败步骤
3. 恢复到某个已知状态
这需要一套完整的状态机 + A2A 状态同步机制。
plaintext
┌──────────────────────────────────────────────────────────────┐
│ 状态机 Agent │
│ (Pipeline Orchestrator) │
└────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Validator │───▶│ Cleanser │───▶│ Importer │───▶ Done
│ Agent │ │ Agent │ │ Agent │
└────────────┘ └────────────┘ └────────────┘
│ │ │
└──────────────────┴──────────────────┘
│
status_sync
(上报状态)
### 5.2 完整代码
typescript
// 文件:a2a_pipeline_orchestrator.ts
import {
spawn_subagent,
agent_send_message,
delegate_message
} from '@markus/agent';
// ========== 类型定义 ==========
type PipelineStage = 'validate' | 'cleanse' | 'import' | 'notify' | 'completed' | 'failed';
interface PipelineStatus {
pipelineId: string;
currentStage: PipelineStage;
progress: number; // 0-100
stageDetails: Record;
startedAt: string;
updatedAt: string;
errors: PipelineError[];
}
interface StageDetail {
stage: PipelineStage;
status: 'pending' | 'running' | 'completed' | 'failed';
startedAt?: string;
completedAt?: string;
recordsProcessed?: number;
recordsFailed?: number;
}
interface PipelineError {
stage: PipelineStage;
timestamp: string;
message: string;
recoverable: boolean;
}
interface StatusSubscriber {
agentId: string;
stages: PipelineStage[]; // 订阅哪些阶段的状态
}
// 流水线编排器
class PipelineOrchestrator {
private pipelines: Map = new Map();
private subscribers: StatusSubscriber[] = [];
private agents: Map = new Map(); // stage -> agentId
constructor() {
this.setupSubscribers();
console.log('✅ Pipeline Orchestrator initialized');
}
private setupSubscribers() {
// 设置默认订阅者
// Dashboard 订阅所有阶段
// Alerting 只订阅失败状态
// Audit Logger 订阅所有阶段
console.log('📡 Default subscribers configured');
}
// 启动新流水线
async startPipeline(dataSource: string): Promise {
const pipelineId = pipeline_${Date.now()};
const status: PipelineStatus = {
pipelineId,
currentStage: 'ingestion',
progress: 0,
stageDetails: {
ingestion: { stage: 'ingestion', status: 'pending' },
validation: { stage: 'validation', status: 'pending' },
transformation: { stage: 'transformation', status: 'pending' },
storage: { stage: 'storage', status: 'pending' }
},
startedAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
errors: []
};
this.pipelines.set(pipelineId, status);
// 启动各阶段 Agent
await this.spawnStageAgents();
// 通知 ingestion agent 开始
const ingestionId = this.agents.get('ingestion');
if (ingestionId) {
await agent_send_message({
agent_id: ingestionId,
message: `start_stage:${JSON.stringify({ pipelineId, dataSource })}`
});
}
console.log(`🚀 Pipeline ${pipelineId} started`);
return pipelineId;
}
private async spawnStageAgents() {
const stageConfigs = [
{ stage: 'ingestion', prompt: '你负责数据摄入。从指定数据源读取原始数据,返回 records。' },
{ stage: 'validation', prompt: '你负责数据验证。验证每条记录是否符合规则,返回 validCount 和 invalidCount。' },
{ stage: 'transformation', prompt: '你负责数据转换。将数据转换为目标格式并返回 transformedCount。' },
{ stage: 'storage', prompt: '你负责数据存储。将数据写入目标存储并返回 storedCount。' }
];
for (const cfg of stageConfigs) {
const agentId = await spawn_subagent({
system_prompt: cfg.prompt,
allowed_tools: []
});
this.agents.set(cfg.stage, agentId);
console.log(` Spawned ${cfg.stage} agent: ${agentId}`);
}
}
// 接收 Agent 的状态报告
async handleStatusReport(agentId: string, report: StatusReport) {
const pipeline = this.findPipelineByAgent(agentId);
if (!pipeline) return;
const detail = pipeline.stageDetails[report.stage];
if (detail) {
detail.status = report.status;
if (report.startedAt) detail.startedAt = report.startedAt;
if (report.completedAt) {
detail.completedAt = report.completedAt;
detail.recordsProcessed = report.recordsProcessed;
detail.recordsFailed = report.recordsFailed;
}
}
pipeline.updatedAt = new Date().toISOString();
pipeline.progress = this.calculateProgress(pipeline);
pipeline.currentStage = this.determineCurrentStage(pipeline);
if (report.status === 'failed') {
pipeline.errors.push({
stage: report.stage,
timestamp: new Date().toISOString(),
message: report.message || 'Unknown error',
recoverable: report.recoverable || false
});
}
this.broadcastStatus(pipeline);
// 如果当前阶段完成,触发下一阶段
if (report.status === 'completed') {
await this.triggerNextStage(pipeline, report.stage);
}
}
private findPipelineByAgent(agentId: string): PipelineStatus | null {
for (const pipeline of this.pipelines.values()) {
if (pipeline.currentStage === agentId) return pipeline;
}
return null;
}
private calculateProgress(pipeline: PipelineStatus): number {
const stageWeights: Record = {
ingestion: 15,
validation: 25,
transformation: 40,
storage: 20
};
let total = 0;
for (const [stage, detail] of Object.entries(pipeline.stageDetails)) {
if (detail.status === 'completed') {
total += stageWeights[stage] || 25;
} else if (detail.status === 'running') {
total += (stageWeights[stage] || 25) * 0.5;
}
}
return Math.round(total);
}
private determineCurrentStage(pipeline: PipelineStatus): PipelineStage {
for (const [stage, detail] of Object.entries(pipeline.stageDetails)) {
if (detail.status === 'pending' || detail.status === 'running') {
return stage as PipelineStage;
}
}
return 'completed';
}
private async triggerNextStage(pipeline: PipelineStatus, completedStage: string) {
const stageOrder: PipelineStage[] = ['ingestion', 'validation', 'transformation', 'storage'];
const nextIdx = stageOrder.indexOf(completedStage) + 1;
if (nextIdx < stageOrder.length) {
const nextStage = stageOrder[nextIdx];
const nextAgentId = this.agents.get(nextStage);
if (nextAgentId) {
const detail = pipeline.stageDetails[nextStage];
detail.status = 'running';
detail.startedAt = new Date().toISOString();
await agent_send_message({
agent_id: nextAgentId,
message: `start_stage:${JSON.stringify({ pipelineId: pipeline.pipelineId })}`
});
console.log(` → Triggered next stage: ${nextStage}`);
}
} else {
pipeline.currentStage = 'completed';
pipeline.progress = 100;
console.log('✅ Pipeline completed');
}
}
private broadcastStatus(pipeline: PipelineStatus) {
for (const sub of this.subscribers) {
const interestedStages = sub.stages;
if (interestedStages.includes(pipeline.currentStage) || interestedStages.includes('all')) {
console.log(📡 Broadcasting to ${sub.agentId}: ${pipeline.currentStage});
}
}
}
}
// ========== 使用示例 ==========
async function demonstratePipelineOrchestration() {
const orchestrator = new PipelineOrchestrator();
// 启动流水线
const pipelineId = await orchestrator.startPipeline('s3://data/users.csv');
// 模拟各阶段完成
console.log('\n⏳ 模拟流水线执行...');
// 阶段 1 完成
await new Promise(r => setTimeout(r, 100));
await orchestrator.handleStatusReport('agent_ingestion', {
pipelineId,
stage: 'ingestion',
status: 'completed',
startedAt: new Date().toISOString(),
completedAt: new Date().toISOString(),
recordsProcessed: 10000,
recordsFailed: 0
});
// 阶段 2 完成
await new Promise(r => setTimeout(r, 100));
await orchestrator.handleStatusReport('agent_validation', {
pipelineId,
stage: 'validation',
status: 'completed',
startedAt: new Date().toISOString(),
completedAt: new Date().toISOString(),
recordsProcessed: 9500,
recordsFailed: 500
});
// 阶段 3 完成
await new Promise(r => setTimeout(r, 100));
await orchestrator.handleStatusReport('agent_transformation', {
pipelineId,
stage: 'transformation',
status: 'completed',
startedAt: new Date().toISOString(),
completedAt: new Date().toISOString(),
recordsProcessed: 9500,
recordsFailed: 0
});
// 阶段 4 完成
await new Promise(r => setTimeout(r, 100));
await orchestrator.handleStatusReport('agent_storage', {
pipelineId,
stage: 'storage',
status: 'completed',
startedAt: new Date().toISOString(),
completedAt: new Date().toISOString(),
recordsProcessed: 9500,
recordsFailed: 0
});
return pipelineId;
}
demonstratePipelineOrchestration();
运行效果:
plaintext
🚀 Pipeline pipeline_1710500000000 started
Spawned ingestion agent: agent_001
Spawned validation agent: agent_002
Spawned transformation agent: agent_003
Spawned storage agent: agent_004
⏳ 模拟流水线执行...
📊 [pipeline_1710500000000] Stage: ingestion, Progress: 15%
[pipeline_1710500000000] 📥 Ingesting data
✅ Stage ingestion completed: 10000 records
📊 [pipeline_1710500000000] Stage: validation, Progress: 25%
[pipeline_1710500000000] ✅ Validating data
✅ Stage validation completed: 9500 records, 500 failed
📊 [pipeline_1710500000000] Stage: transformation, Progress: 65%
[pipeline_1710500000000] 🔄 Transforming data
✅ Stage transformation completed: 9500 records
📊 [pipeline_1710500000000] Stage: storage, Progress: 85%
[pipeline_1710500000000] 💾 Storing data
✅ Stage storage completed: 9500 records
📊 [pipeline_1710500000000] Stage: completed, Progress: 100%
✅ Pipeline completed: pipeline_1710500000000
### 5.5 关键设计模式
| 模式 | 在代码中的体现 | 作用 |
|------|--------------|------|
| **状态广播** | `broadcastStatus()` | 所有订阅者实时感知流水线状态 |
| **阶段订阅** | `subscribeToPipeline(stages)` | 按需订阅,只关心关心的阶段 |
| **失败恢复** | `handleStatusReport` 中的 `recoverable` | 可恢复错误自动重试 |
| **进度计算** | `calculateProgress()` | 实时计算总体进度 |
### 5.6 场景四的注意事项
1. **不要让 Agent 直接更新状态**:统一通过 PipelineOrchestrator,避免状态不一致
2. **失败要有恢复策略**:区分可恢复和不可恢复错误
3. **订阅者要能动态注册/取消**:不要硬编码订阅者列表
4. **日志要持久化**:流水线状态要落盘,重启后能恢复
---
## 六、总结与展望
### 6.1 四个场景回顾
| 场景 | 核心问题 | 解决方案 | A2A 消息类型 |
|------|---------|---------|------------|
| 代码审查 | 单个 Agent 能力有限 | 多 Agent 并发审查 → 汇总报告 | task_delegate → result_report |
| 资源协调 | 资源竞争和抢占 | 统一协调中心 → 优先级队列 | resource_request → info_request |
| 长流程追踪 | 状态同步困难 | Pipeline Orchestrator → 状态广播 | status_update → status_sync |
### 6.2 核心模式总结
四个场景中贯穿着一些共同模式:
1. **中心化协调**:不是让 Agent 自由协作,而是有一个中心节点负责分配任务、管理资源、追踪状态
2. **状态可见**:所有关键状态都实时广播给订阅者
3. **优雅降级**:每个场景都有错误处理和降级策略
4. **边界清晰**:每个 Agent 有明确的职责边界,不越界
### 6.3 你现在可以做什么
学完这个系列,你应该能够:
- [ ] 构建一个多 Agent 协作系统
- [ ] 使用 A2A 协议进行 Agent 间通信
- [ ] 实现任务委派和结果汇总
- [ ] 实现资源管理和调度
- [ ] 实现长流程状态追踪
### 6.4 下一步
如果你想继续深入,可以尝试:
1. **把示例代码跑起来**:用 Markus 平台部署你第一个多 Agent 系统
2. **修改场景参数**:把代码审查改成 API 测试,把资源池改成数据库连接池
3. **组合多个模式**:比如在代码审查流水线中加入 GPU 资源调度
4. **加入监控和告警**:让 Dashboard 实时显示所有流水线状态
### 6.5 A2A 协议系列总结
| 篇文章 | 主题 | 核心收获 |
|--------|------|---------|
| s01 | A2A 协议入门 | 理解为什么 Agent 需要通信 |
| s02 | 消息类型体系 | 掌握四种核心消息类型 |
| s03 | 任务委派 | 学会如何委派任务给专项 Agent |
| s04 | 协作会话 | 学会管理多轮对话 |
| s05 | 实战案例 | 用四个真实场景串联所有知识 |
**最终你会发现**:A2A 协议不只是让 Agent 发消息,而是让你能够构建**真正有用的多 Agent 系统**。
---
## 七、常见陷阱与避坑指南
A2A 协议虽然强大,但使用不当也会带来问题。这里总结了一些常见陷阱和应对方法。
### 7.1 消息风暴
**陷阱**:当多个 Agent 互相发送状态更新通知时,可能产生指数级消息量。
**场景**:100 个 Worker Agent 同时向 Supervisor 发送 `status_sync`,Supervisor 又向 100 个订阅者广播,导致 O(n²) 的消息量。
**应对**:
typescript
// 1. 批量聚合:不是每条更新都立即发送
class StatusAggregator {
private buffer: Map = new Map();
private flushInterval = 5000; // 5秒批量一次
add(update: StatusUpdate) {
const key = update.agentId;
this.buffer.set(key, [...(this.buffer.get(key) || []), update]);
}
async flush() {
// 批量发送
for (const [agentId, updates] of this.buffer) {
await agent_send_message({
to: agentId,
type: 'status_sync_batch',
payload: { updates }
});
}
this.buffer.clear();
}
}
// 2. 差异化订阅:不是所有 Agent 都订阅所有消息
// 精细化订阅策略,减少不必要的消息传递
### 7.2 死锁和活锁
**陷阱**:Agent A 等 Agent B 的消息,Agent B 等 Agent A 的消息,形成死锁。
**场景**:
plaintext
Agent A → Agent B: 需要你的结果
Agent B → Agent A: 需要你的结果
两个 Agent 互相等待,形成死锁。
**应对**:
typescript
// 1. 超时机制:设置消息超时,超时后自动释放
const result = await Promise.race([
agent_send_message({ to: 'Agent B', message: 'need_result' }),
new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), 30000))
]);
// 2. 避免循环依赖:设计时检查 Agent 之间的依赖关系
// 3. 使用中间件:所有消息经过中间件,中间件检测循环依赖
### 7.3 状态不一致
**陷阱**:多个 Agent 同时修改同一个状态,导致状态不一致。
**场景**:
- Agent A 读取状态 S = 0
- Agent B 读取状态 S = 0
- Agent A 修改 S = 1
- Agent B 修改 S = 2
- 最终 S = 2,但 Agent A 以为自己改的是 S = 1
**应对**:
typescript
// 使用乐观锁或悲观锁
class SharedStateManager {
private locks: Map> = new Map();
async withLock(key: string, fn: () => Promise) {
while (this.locks.has(key)) {
await this.locks.get(key);
}
let release: () => void;
const lock = new Promise<void>(resolve => { release = resolve; });
this.locks.set(key, lock);
try {
await fn();
} finally {
this.locks.delete(key);
release();
}
}
}
### 7.4 消息丢失
**陷阱**:网络故障或 Agent 重启导致消息丢失。
**场景**:Agent A 发送消息给 Agent B,但 Agent B 在消息到达前重启,导致消息丢失。
**应对**:
typescript
// 1. 消息持久化:所有消息先落盘再发送
// 2. 确认机制:接收方收到消息后返回确认
// 3. 重试机制:发送方在超时后重试发送
interface MessageWithAck {
id: string;
payload: A2AMessage;
acked: boolean;
retryCount: number;
}
class ReliableMessenger {
private pending: Map = new Map();
async send(message: A2AMessage): Promise {
const id = generateMessageId();
this.pending.set(id, {
id,
payload: message,
acked: false,
retryCount: 0
});
await this.persistToDisk(id, message);
await this.trySend(id);
}
private async trySend(id: string) {
const msg = this.pending.get(id);
if (!msg || msg.acked) return;
try {
await agent_send_message(msg.payload);
await this.waitForAck(id, 5000); // 等待 5 秒确认
} catch (e) {
msg.retryCount++;
if (msg.retryCount < 3) {
setTimeout(() => this.trySend(id), 1000 * msg.retryCount);
}
}
}
}
### 7.5 优先级反转
**陷阱**:高优先级任务等待低优先级任务释放资源。
**场景**:
1. 低优先级任务占用了 GPU
2. 高优先级任务请求 GPU
3. 高优先级任务等待低优先级任务
4. 但低优先级任务正在等待高优先级任务的输出
**死锁!**
**应对**:
typescript
// 优先级继承:高优先级等待时,提升低优先级任务的优先级
class PriorityInheritanceScheduler {
async requestResource(agentId: string, resource: string, priority: number) {
const holder = this.getResourceHolder(resource);
if (holder && priority > holder.priority) {
// 提升持有者的优先级
holder.priority = priority;
console.log(`🔼 提升 ${holder.agentId} 的优先级到 ${priority}`);
}
return this.doAllocate(agentId, resource, priority);
}
}
---
## 八、性能优化建议
### 8.1 消息压缩
对于大量文本消息,可以使用压缩算法减少带宽:
typescript
import { gzip, gunzip } from 'zlib';
async function compressMessage(message: A2AMessage): Promise {
const json = JSON.stringify(message);
return new Promise((resolve, reject) => {
gzip(json, (err, result) => {
if (err) reject(err);
else resolve(result);
});
});
}
### 8.2 消息批处理
对于大量小消息,批处理可以显著减少网络开销:
typescript
class MessageBatcher {
private buffer: A2AMessage[] = [];
private flushInterval = 100; // 100ms
constructor() {
setInterval(() => this.flush(), this.flushInterval);
}
add(message: A2AMessage) {
this.buffer.push(message);
}
private async flush() {
if (this.buffer.length === 0) return;
const batch = [...this.buffer];
this.buffer = [];
await agent_send_message({
type: 'batch',
payload: { messages: batch }
});
}
}
### 8.3 缓存热点数据
对于频繁访问的数据,使用缓存减少重复计算:
typescript
class AgentCache {
private cache: Map = new Map();
get(key: string): any | null {
const entry = this.cache.get(key);
if (!entry) return null;
if (Date.now() > entry.expiry) {
this.cache.delete(key);
return null;
}
return entry.data;
}
set(key: string, data: any, ttlMs: number = 60000) {
this.cache.set(key, {
data,
expiry: Date.now() + ttlMs
});
}
}
---
## 九、最佳实践清单
### 9.1 设计阶段
- [ ] 明确每个 Agent 的职责边界
- [ ] 绘制 Agent 关系图和消息流图
- [ ] 定义消息类型和格式(使用 TypeScript interface)
- [ ] 设计错误处理和降级策略
- [ ] 确定监控和告警方案
### 9.2 实现阶段
- [ ] 使用 TypeScript 严格模式,确保类型安全
- [ ] 每个消息都有唯一 ID,方便追踪
- [ ] 消息发送前做格式验证
- [ ] 实现超时和重试机制
- [ ] 日志记录每个关键节点
### 9.3 测试阶段
- [ ] 单元测试:每个 Agent 独立测试
- [ ] 集成测试:Agent 之间消息交互测试
- [ ] 压力测试:大量并发消息下的表现
- [ ] 故障注入测试:模拟网络故障、Agent 重启
### 9.4 监控阶段
- [ ] 部署 Dashboard 实时显示系统状态
- [ ] 设置告警阈值(超时数、失败率、队列长度)
- [ ] 定期分析消息日志,优化性能
- [ ] 记录每个 Agent 的资源消耗
---
## 结语
A2A 协议是构建复杂多 Agent 系统的基石。通过这四个实战场景,你应该已经掌握了:
1. **任务分解与委托**:如何让主 Agent 协调多个专项 Agent
2. **并发与协调**:如何处理多个 Agent 的并发执行和结果汇总
3. **资源管理**:如何实现智能的资源分配和调度
4. **状态追踪**:如何实现长流程的实时状态同步
更重要的是,你学会了**避坑**——知道什么不该做,比知道什么该做更重要。
A2A 协议的力量在于**组合**。你可以把今天学的模式组合起来,构建更复杂的系统:
- 研究主管 + 多个专项 Agent + 资源协调中心 + Dashboard
- 代码审查流水线 + 安全审查 + 资源池管理 + 审计日志
关键是:**从简单开始,逐步演进**。不要一开始就设计完美的架构,先跑起来,再优化。
祝你构建出强大的多 Agent 系统!
---
*本文是 A2A Protocol Series 的最后一篇(s05),也是本系列的最后一篇。*
*系列文章:*
- *s01: [A2A 协议入门:为什么 Agent 需要通信](#)*
- *s02: [消息类型体系:四种核心消息](#)*
- *s03: [任务委派:从一个 Agent 到多个 Agent](#)*
- *s04: [协作会话:多轮对话的艺术](#)*
- *s05: [实战案例:四个真实协作场景](#)(本文)*
Top comments (0)