DEV Community

Cover image for Building an AI Agent Registry Server with FastAPI: Enabling Seamless Agent Discovery via A2A Protocol -Part III
Seenivasa Ramadurai
Seenivasa Ramadurai

Posted on

Building an AI Agent Registry Server with FastAPI: Enabling Seamless Agent Discovery via A2A Protocol -Part III

In today’s AI-driven world, intelligent agents are transforming how enterprises operate from chatbots that handle customer queries to autonomous assistants making complex decisions. But as organizations develop specialized AI agents using different tools and technologies, a pressing question emerges:

How can AI agents built by different teams or companies communicate and collaborate smoothly?

Here comes Agent2Agent (A2A) Protocol — an open, standardized communication framework designed to bridge that gap.

What is the A2A Protocol?

The Agent2Agent Protocol is an open standard that enables autonomous AI agents, regardless of their origin or technology stack, to discover, communicate, and coordinate with each other effectively.

Imagine a Talent Acquisition pipeline powered entirely by AI:

A Resume Parser Agent extracts structured data from resumes.

An Interview Scheduler Agent finds suitable time slots for interviews.

A Candidate Evaluator Agent assesses technical skills and experience.

A Profile Creator Agent sets up employee profiles post-hiring.

Without a standardized communication method, orchestrating these agents involves brittle, one-off integrations that are difficult to build and maintain at scale. The A2A Protocol solves this by providing a common language and workflow for agents to interact reliably and securely.

How Does A2A Enable Multi-Agent Collaboration?

The AI Agent Server Registry — The Phone Book of Agents
For agents to collaborate, they first need to find each other. This is where the AI Agent Server Registry plays a crucial role — acting like a phone book for AI agents.

Teams can publish their agents’ "Agent Cards" to this registry, making their capabilities discoverable by other agents. Once discovered, agents can initiate conversations, exchange data, and collaborate seamlessly.

Core Specifications of the A2A Protocol

A2A defines a clear standard for agent communication:

Common Transport & Format: JSON-RPC 2.0 over HTTP(S) ensures messages are structured and transmitted consistently.

Discovery Mechanisms: Agents advertise their capabilities via Agent Cards—digital business cards providing identity, skills, endpoints, and authentication details.

Task Management Workflows: Standardized protocols to initiate, track, and complete tasks, including long-running or multi-turn interactions.

Support for Multiple Data Types: Beyond text, agents can exchange files, structured forms, and rich media.

Security & Asynchronicity: Built-in principles for secure, asynchronous communication, including human-in-the-loop support.

Agent Discovery: Finding the Right Agent for the Task
The Agent Card is key to discovery—a JSON document summarizing an agent’s identity, skills, endpoint URL, and authentication requirements. How can client agents find these Agent Cards?

1. Well-Known URI

Each agent can host its Agent Card at a standard URL:
https://{agent-domain}/.well-known/agent.json
Clients simply fetch this endpoint to discover an agent’s capabilities. This method is simple, automated, and scalable — perfect for public or organization-wide discovery.

2. Curated Registries

In enterprise settings, a central registry holds Agent Cards. Agents register themselves here, and clients query the registry based on capabilities or tags. This centralized approach supports governance, policy enforcement, and curated access.

3. Direct Configuration / Private Discovery

For private, tightly coupled systems or during development, clients may be pre-configured with Agent Card URLs or details, bypassing dynamic discovery for simplicity.

Securing Agent Cards and Communications

Agent Cards might include sensitive info such as internal URLs or authentication hints. Protecting this data is crucial:

Use access controls and require authentication for Agent Card endpoints.

Employ mTLS and network restrictions to secure communication.

Registries can return tailored Agent Cards based on client permissions.

Avoid embedding secrets directly in Agent Cards; instead, handle credentials out-of-band.

Bringing It All Together:

Building an A2A-Enabled Ecosystem In this blog series, I’ll walk you through building a FastAPI-based Agent Registry Server and a sample Math Agent, Enterprise Doc search Agent that registers itself for discovery. We’ll demonstrate how other agents or client applications can find and interact with registered agents, all powered by the A2A protocol.

1. FastAPI-Based Python Registry-server

# File Name : registry_server.py
# This program Creates In-memory based AI Agents regisry-server using Google A2A protocol
# Author: Sreeni Ramadurai 


import logging
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import time
import asyncio
from datetime import datetime, timedelta
from contextlib import asynccontextmanager

from python_a2a import AgentCard
from python_a2a.discovery import AgentRegistry

# Data model for Agent registration
class AgentRegistration(BaseModel):
    name: str
    description: str
    url: str
    version: str
    capabilities: dict = {}
    skills: List[dict] = []

class HeartbeatRequest(BaseModel):
    url: str

# Create registry server and FastAPI app
registry_server = AgentRegistry(
    name="A2A Registry Server",
    description="Registry server for agent discovery"
)

# Constants for cleanup
HEARTBEAT_TIMEOUT = 30  # seconds
CLEANUP_INTERVAL = 10   # seconds

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the cleanup task when the server starts."""
    cleanup_task = asyncio.create_task(cleanup_stale_agents())
    yield
    cleanup_task.cancel()

app = FastAPI(title="A2A Agent Registry Server", description="FastAPI server for agent discovery", lifespan=lifespan)

async def cleanup_stale_agents():
    """Periodically clean up agents that haven't sent heartbeats."""
    while True:
        try:
            current_time = time.time()
            agents_to_remove = []

            # Check each agent's last heartbeat time
            for url, last_seen in registry_server.last_seen.items():
                if current_time - last_seen > HEARTBEAT_TIMEOUT:
                    agents_to_remove.append(url)
                    logging.warning(f"Agent {url} has not sent heartbeat for {HEARTBEAT_TIMEOUT} seconds, removing from registry")

            # Remove stale agents
            for url in agents_to_remove:
                registry_server.unregister_agent(url)
                logging.info(f"Removed stale agent: {url}")

        except Exception as e:
            logging.error(f"Error during cleanup: {e}")

        await asyncio.sleep(CLEANUP_INTERVAL)

@app.post("/registry/register", response_model=AgentCard, status_code=201)
async def register_agent(registration: AgentRegistration):
    """Registers a new agent with the registry."""
    agent_card = AgentCard(**registration.dict())
    registry_server.register_agent(agent_card)
    return agent_card

@app.get("/registry/agents", response_model=List[AgentCard])
async def list_registered_agents():
    """Lists all currently registered agents."""
    return list(registry_server.get_all_agents())

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}

@app.post("/registry/heartbeat")
async def heartbeat(request: HeartbeatRequest):
    """Handle agent heartbeat."""
    try:
        if request.url in registry_server.agents:
            registry_server.last_seen[request.url] = time.time()
            logging.info(f"Received heartbeat from agent at {request.url}")
            return {"success": True}
        logging.warning(f"Received heartbeat from unregistered agent: {request.url}")
        return {"success": False, "error": "Agent not registered"}, 404
    except Exception as e:
        logging.error(f"Error processing heartbeat: {e}")
        return {"success": False, "error": str(e)}, 400

@app.get("/registry/agents/{url}", response_model=AgentCard)
async def get_agent(url: str):
    """Get a specific agent by URL."""
    agent = registry_server.get_agent(url)
    if agent:
        return agent
    raise HTTPException(status_code=404, detail=f"Agent with URL '{url}' not found")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    uvicorn.run(app, host="0.0.0.0", port=8000) 
Enter fullscreen mode Exit fullscreen mode

  1. Simple A2A protocol based Agent -
import sys
import time
import threading
import argparse
import socket
import logging
from typing import Optional, List

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
import requests

from python_a2a import AgentCard, A2AServer, run_server, Message, TextContent, MessageRole
from python_a2a.discovery import AgentRegistry, enable_discovery

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Find an available port
def find_free_port() -> int:
    """Find an available port to use."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('0.0.0.0', 0))
        return s.getsockname()[1]

class SampleAgent(A2AServer):
    """A sample agent that registers with a remote registry."""

    def __init__(self, name: str, description: str, url: str, registry_url: str):
        """Initialize the sample agent and attempt to register."""
        agent_card = AgentCard(
            name=name,
            description=description,
            url=url,
            version="1.0.0",
            capabilities={
                "streaming": False,
                "pushNotifications": False,
                "stateTransitionHistory": False,
                "google_a2a_compatible": True,
                "parts_array_format": True
            }
        )
        super().__init__(agent_card=agent_card)
        self.registry_url = registry_url
        self._registration_retries = 3
        self._heartbeat_interval = 30  # seconds
        self._discovery_client = None

    async def setup(self):
        """Registers the agent with the registry."""
        if self.registry_url:
            for attempt in range(self._registration_retries):
                try:
                    # Register with discovery
                    self._discovery_client = enable_discovery(
                        self, 
                        registry_url=self.registry_url, 
                        heartbeat_interval=self._heartbeat_interval
                    )

                    # Add heartbeat logging
                    def heartbeat_callback(results):
                        for result in results:
                            if result.get("success"):
                                logger.info(f"Heartbeat successful with registry {result['registry']}")
                            else:
                                logger.warning(f"Heartbeat failed with registry {result['registry']}: {result.get('message', 'Unknown error')}")

                    # Set the callback
                    self._discovery_client.heartbeat_callback = heartbeat_callback

                    # Verify registration
                    response = requests.get(f"{self.registry_url}/registry/agents")
                    if response.status_code == 200:
                        agents = response.json()
                        if any(agent["url"] == self.url for agent in agents):
                            logger.info(
                                f"Agent '{self.agent_card.name}' registered successfully with registry: {self.registry_url}"
                            )
                            return  # Success, exit the retry loop
                        else:
                            logger.warning(f"Registration verification failed (attempt {attempt + 1}/{self._registration_retries})")
                    else:
                        logger.warning(f"Failed to verify registration: {response.status_code} (attempt {attempt + 1}/{self._registration_retries})")

                    # Wait before retrying
                    time.sleep(2)
                except Exception as e:
                    logger.error(f"Error during registration attempt {attempt + 1}: {e}")
                    if attempt < self._registration_retries - 1:
                        time.sleep(2)

            logger.error(f"Failed to register agent after {self._registration_retries} attempts")

    def handle_message(self, message: Message) -> Message:
        """Handle incoming messages."""
        return Message(
            content=TextContent(
                text=f"Hello from {self.agent_card.name}! I received: {message.content.text}"
            ),
            role=MessageRole.AGENT,
            parent_message_id=message.message_id,
            conversation_id=message.conversation_id
        )

def run_agent(name: str, port: int, registry_url: str):
    """Runs a sample agent that registers with the specified registry."""
    agent = SampleAgent(
        name=name,
        description=f"Sample agent '{name}' demonstrating remote registration.",
        url=f"http://localhost:{port}",
        registry_url=registry_url
    )
    run_server(agent, host="0.0.0.0", port=port)
    logger.info(f"Agent '{name}' started on http://localhost:{port}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A2A Sample Agent")
    parser.add_argument("--name", type=str, default="SampleAgent", help="Name of the agent")
    parser.add_argument("--port", type=int, default=None, help="Port for the agent to run on")
    parser.add_argument("--registry-url", type=str, required=True, help="URL of the agent registry server")
    args = parser.parse_args()

    agent_port = args.port or find_free_port()
    run_agent(args.name, agent_port, args.registry_url)
Enter fullscreen mode Exit fullscreen mode

A2A -Out of the box Agent Hosting using run_agent method

run_agent(args.name, agent_port, args.registry_url)

Agent-Card

Agent Self Registration and Heartbeat

A2A Registry-server get_agents Endpoint method invocation and Result

Invoking A2A-Math Agent

A2A Agent Discovery Demo

Next Blog I will talk more about building different Agents using different vendors framework and add Streaming , Push-notifications capabilities , Task , Conversation , LLM powered agents etc..

Until then, happy building agentic AI applications!.

Thanks
Sreeni Ramadorai

Top comments (8)

Collapse
 
sinan_20c9b68e9d8bc3221fa profile image
Sinan

Can i know where are you storing the agent details. Suppose we have many agents , should we store in a database?

Collapse
 
sreeni5018 profile image
Seenivasa Ramadurai • Edited

Hi Sinan
The blog shows in memory . However we should be using Any NoSQL or RDBMS with text column where you can store agent card as JSON . Hope this helps

Thanks
Sreeni

Collapse
 
sinan_20c9b68e9d8bc3221fa profile image
Sinan

Thank you @sreeni5018

Thread Thread
 
sreeni5018 profile image
Seenivasa Ramadurai

You are welcome Sinan!!

Collapse
 
pankaj_jainani_e2ef8fc5d9 profile image
Pankaj Jainani

Too good Sreeni boss

Collapse
 
sreeni5018 profile image
Seenivasa Ramadurai

Thank you Pankaj.

Collapse
 
michael_liang_0208 profile image
Michael Liang

Great post!

Collapse
 
sreeni5018 profile image
Seenivasa Ramadurai

Thank you Michael