A hands-on exercise to describe MCP server and agents in a basic understandable (so I hope) way!
Introduction — The General Role of an MCP Server
MCP — TLDR;
In the world of multi-agent systems and distributed intelligence, effective communication and coordination between autonomous entities are paramount. The Model Context Protocol (MCP) emerges as a foundational framework designed to facilitate seamless, structured, and context-aware interactions between independent agents.
At its core, MCP addresses the challenge of managing dynamic workflows where agents need to exchange not just raw data, but also shared understanding, task delegation, status updates, and actionable insights. It provides a standardized mechanism for agents to:
- Identify Themselves: Registering their presence and capabilities within a shared environment (often mediated by a central server).
- Delegate Tasks: Agent A can assign specific computations or data processing steps to Agent B.
- Exchange Contextual Messages: Beyond simple data, messages carry intent, state, and relevance, allowing agents to react intelligently.
- Receive Status Updates: Agents can be informed about the progress or completion of tasks they’ve delegated.
- Coordinate Complex Workflows: Enabling a series of interdependent actions where one agent’s output becomes another’s input, leading to a unified goal. By defining clear communication structures and encouraging a “speak the same language” approach, MCP ensures that even diverse agents can collaborate effectively, transforming raw data into processed information and ultimately, into informed decisions. This protocol lays the groundwork for robust, scalable, and intelligent distributed systems, moving us closer to truly collaborative AI ecosystems.
The motivation for this code
During a recent discussion with a business partner, the dialog was oriented towards the foundational concepts driving the current technological wave: agentic systems and the significant buzz surrounding terms like ‘agents’ and the Model Context Protocol (MCP). It quickly became apparent that while these concepts are at the forefront of innovation and generating considerable hype, their practical implications aren’t universally understood. I made a concrete, and basic demonstrator designed to demystify these ideas and show their potential in a clear and simple manner.
Some useful refernces (I used among others…)
- MCP explained: https://medium.com/@elisowski/mcp-explained-the-new-standard-connecting-ai-to-everything-79c5a1c98288 (by Edwin Lisowski)
- What is MCP: https://modelcontextprotocol.io/docs/getting-started/intro
- What is the Model Context Protocol: https://www.cloudflare.com/learning/ai/what-is-model-context-protocol-mcp/
- What is an MCP server: https://www.k2view.com/blog/mcp-server/ (by Oren Ezra)
- Understanding MCP Servers: https://modelcontextprotocol.io/docs/learn/server-concepts
- Example of Servers: https://modelcontextprotocol.io/examples
The Basic MCP Server (server.py
)
Befors jumping into the code, one important disclaimer here!
One might ask why I do not use/import one on the many ‘very’ popular mcp servers from the GitHub repositories? This is some intentional idea for this demonstration. In this case, the term “MCP Server” (Model Context Protocol Server) is conceptual, referring to the specific role and functionality of the Custom Implementation, showcasing;
- Listen for network connections.
- Accept agent connections.
- Handle identity/registration (mapping Agent IDs to sockets).
- Route/Relay messages between agents based on the MCP structure ({"type": "message", "recipient": "AgentB", ...}).
# server.py
import socket
import threading
import json
import time
import os
import logging
from datetime import datetime
HOST = '127.0.0.1'
PORT = 65432
BUFFER_SIZE = 1024
OUTPUT_DIR = "./output" # Define the output directory
def setup_logging(name):
"""Configures file-based, timestamped logging for the given name."""
os.makedirs(OUTPUT_DIR, exist_ok=True) # Create output directory if it doesn't exist
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{name}_{timestamp}.log")
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
# File handler
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Optional: Console handler for immediate feedback when not detached
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
class MCPServer:
def __init__(self, host, port, logger):
self.host = host
self.port = port
self.logger = logger
self.agents = {}
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.running = True
def start(self):
try:
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
self.logger.info(f"MCP Server started and listening on {self.host}:{self.port}")
self.logger.info(f"Type 'q' and press Enter to quit cleanly.")
threading.Thread(target=self.accept_clients, daemon=True).start()
self.listen_for_commands()
except Exception as e:
self.logger.error(f"Failed to start server: {e}. Kill the old process or try again later.")
self.shutdown()
def listen_for_commands(self):
while self.running:
try:
command = input()
if command.lower() == 'q':
self.logger.info("'q' received. Initiating graceful shutdown...")
self.shutdown()
break
except EOFError:
self.logger.info("EOF received. Initiating graceful shutdown...")
self.shutdown()
break
except Exception as e:
self.logger.error(f"Error listening for commands: {e}")
pass
def shutdown(self):
self.running = False
try:
for agent_id, conn in list(self.agents.items()): # Iterate on a copy
try:
conn.shutdown(socket.SHUT_RDWR)
conn.close()
self.logger.info(f"Closed connection for agent {agent_id}.")
except Exception as e:
self.logger.error(f"Error closing agent {agent_id} connection: {e}")
if self.server_socket:
self.server_socket.close()
self.logger.info("Server main socket closed.")
except Exception as e:
self.logger.error(f"Error during server shutdown: {e}")
def accept_clients(self):
while self.running:
try:
self.server_socket.settimeout(1.0)
conn, addr = self.server_socket.accept()
self.server_socket.settimeout(None)
self.logger.info(f"New connection established with {addr}")
threading.Thread(target=self.handle_client, args=(conn, addr), daemon=True).start()
except socket.timeout:
continue
except Exception as e:
if self.running: # Only log error if not explicitly shutting down
self.logger.error(f"Error accepting client connection: {e}")
break
def handle_client(self, conn, addr):
agent_id = None
while self.running:
try:
data = conn.recv(BUFFER_SIZE).decode('utf-8').strip()
if not data:
break
for raw_msg in data.split('\n'):
if not raw_msg: continue
try:
message = json.loads(raw_msg)
self.logger.info(f"RECVD from {agent_id if agent_id else addr}: {message}")
except json.JSONDecodeError:
self.logger.error(f"[{addr}] Received invalid JSON: {raw_msg}")
continue
if message.get("type") == "identity":
agent_id = message.get("sender")
if agent_id:
self.agents[agent_id] = conn
self.logger.info(f"Agent Registered: {agent_id} ({addr}). Active agents: {list(self.agents.keys())}")
status_msg = {"type": "status", "content": f"Welcome, {agent_id}! You are connected."}
self.send_message(conn, status_msg)
else:
self.logger.error(f"[{addr}] Identity message missing 'sender'. Disconnecting.")
break
elif message.get("type") == "message":
recipient = message.get("recipient")
content = message.get("content")
if recipient and content:
self.relay_message(agent_id, recipient, content)
else:
self.logger.warning(f"[{agent_id}] Malformed message: missing recipient or content. Message: {message}")
else:
self.logger.warning(f"[{agent_id}] Unhandled message type: {message.get('type')}. Message: {message}")
except ConnectionResetError:
self.logger.info(f"Connection reset by client {agent_id if agent_id else addr}.")
break
except Exception as e:
if self.running: # Log error only if server is not in shutdown phase
self.logger.error(f"Error handling client {agent_id if agent_id else addr}: {e}")
break
if agent_id in self.agents:
del self.agents[agent_id]
self.logger.info(f"Agent Disconnected: {agent_id}. Active agents: {list(self.agents.keys())}")
conn.close()
def relay_message(self, sender, recipient, content):
self.logger.info(f"RELAY from {sender} to {recipient}: '{content}'")
if recipient in self.agents:
recipient_conn = self.agents[recipient]
final_message = {
"type": "message",
"sender": sender,
"content": content,
"timestamp": time.time()
}
self.send_message(recipient_conn, final_message)
else:
self.logger.error(f"Recipient '{recipient}' not found. Notifying sender {sender}.")
if sender in self.agents:
error_msg = {"type": "status", "content": f"Error: Agent '{recipient}' is offline."}
self.send_message(self.agents[sender], error_msg)
def send_message(self, conn, message_dict):
try:
data = json.dumps(message_dict) + '\n'
conn.sendall(data.encode('utf-8'))
except Exception as e:
self.logger.error(f"Error sending message: {e}")
if __name__ == '__main__':
server_logger = setup_logging("server")
server = MCPServer(HOST, PORT, server_logger)
server.start()
server_logger.info("Server process exiting.")
The Agent(s)
The first Agent — AgentA
# agentA.py
import socket
import threading
import json
import time
import os
import logging
from datetime import datetime
HOST = '127.0.0.1'
PORT = 65432
BUFFER_SIZE = 1024
OUTPUT_DIR = "./output"
def setup_logging(name):
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{name}_{timestamp}.log")
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
class Agent:
def __init__(self, agent_id, logger):
self.agent_id = agent_id
self.logger = logger # Use the provided logger
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connected = False
self.running = True
self.state = "INITIATING"
self.data_to_process = [10, 25, 40, 50, 75]
self.final_result = None
def connect(self):
try:
self.sock.connect((HOST, PORT))
self.connected = True
self.logger.info(f"{self.agent_id} connected to server.")
identity_msg = {"type": "identity", "sender": self.agent_id}
self.send_message(identity_msg)
threading.Thread(target=self.listen_for_messages, daemon=True).start()
return True
except ConnectionRefusedError:
self.logger.error(f"Connection failed for {self.agent_id}. Server might be offline.")
return False
except Exception as e:
self.logger.error(f"Connection error for {self.agent_id}: {e}")
return False
def send_message(self, message_dict):
if not self.connected: return
try:
data = json.dumps(message_dict) + '\n'
self.sock.sendall(data.encode('utf-8'))
except Exception as e:
self.logger.error(f"Error sending message from {self.agent_id}: {e}")
self.connected = False
def send_to_agent(self, recipient_id, content):
message = {
"type": "message",
"sender": self.agent_id,
"recipient": recipient_id,
"content": content
}
self.logger.info(f"Sending to {recipient_id}: '{content}'")
self.send_message(message)
def listen_for_messages(self):
while self.running and self.connected:
try:
data = self.sock.recv(BUFFER_SIZE).decode('utf-8').strip()
if not data: break
for raw_msg in data.split('\n'):
if not raw_msg: continue
try:
message = json.loads(raw_msg)
self.process_message(message)
except json.JSONDecodeError:
self.logger.error(f"Received invalid JSON: {raw_msg}")
except ConnectionResetError:
self.logger.info(f"Server disconnected.")
break
except Exception as e:
self.logger.error(f"Listening error: {e}")
break
self.connected = False
self.sock.close()
def process_message(self, message):
m_type = message.get("type")
if m_type == "status":
self.logger.info(f"STATUS: {message.get('content')}")
elif m_type == "message":
sender = message.get("sender", "UNKNOWN")
content = message.get("content", "No Content")
self.logger.info(f"RECEIVED from {sender}: '{content}' (Current State: {self.state})")
if self.state == "AWAITING_CALC_RESULT" and content.startswith("Result:"):
try:
result_str = content.split("Value is ")[-1].strip('.')
self.final_result = float(result_str)
if self.final_result > 150:
decision = "Proceed"
else:
decision = "Abort"
self.logger.info(f"*** DECISION MADE ***: {decision} (Sum: {self.final_result})")
self.state = "TASK_COMPLETE"
self.send_to_agent(sender, f"Decision: {decision}. Thank you for the calculation.")
except ValueError as e:
self.logger.error(f"ERROR: Could not parse calculation result from '{content}'. Reason: {e}")
self.state = "TASK_FAILED"
except Exception as e:
self.logger.error(f"Unexpected error processing result: {e}")
self.state = "TASK_FAILED"
def disconnect(self):
self.running = False
if self.connected:
self.sock.close()
self.logger.info(f"{self.agent_id} disconnected.")
# --- Main
if __name__ == '__main__':
agent_a_logger = setup_logging("agentA")
agent_a = Agent("AgentA", agent_a_logger)
if agent_a.connect():
agent_a_logger.info("AgentA connected. Stabilizing network for other agents to come online...")
time.sleep(5) # AgentB is registered?
data_str = json.dumps(agent_a.data_to_process)
agent_a.state = "AWAITING_CALC_RESULT"
initial_request = f"TASK: Calculate the sum of the following data points: {data_str}"
agent_a.send_to_agent("AgentB", initial_request)
try:
while agent_a.connected and agent_a.state not in ["TASK_COMPLETE", "TASK_FAILED"]:
time.sleep(0.1)
if agent_a.state == "TASK_COMPLETE":
agent_a_logger.info("Automatic shutdown triggered by workflow completion.")
time.sleep(1)
elif agent_a.state == "TASK_FAILED":
agent_a_logger.warning("AgentA workflow failed. Shutting down.")
except KeyboardInterrupt:
agent_a_logger.info("Manually stopped.")
finally:
agent_a.disconnect()
else:
agent_a_logger.error("AgentA could not connect. Ensure server.py is running.")
agent_a_logger.info("AgentA process exiting.")
The second Agent — AgentB
# agentB.py
import socket
import threading
import json
import time
import math
import os
import logging
from datetime import datetime
HOST = '127.0.0.1'
PORT = 65432
BUFFER_SIZE = 1024
OUTPUT_DIR = "./output"
def setup_logging(name):
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{name}_{timestamp}.log")
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
class Agent:
def __init__(self, agent_id, logger):
self.agent_id = agent_id
self.logger = logger
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connected = False
self.running = True
def calculate_sum(self, data_list):
self.logger.info(f"Worker: Starting summation on {len(data_list)} items...")
time.sleep(1.5)
return sum(data_list)
def connect(self):
try:
self.sock.connect((HOST, PORT))
self.connected = True
self.logger.info(f"{self.agent_id} connected to server.")
identity_msg = {"type": "identity", "sender": self.agent_id}
self.send_message(identity_msg)
threading.Thread(target=self.listen_for_messages, daemon=True).start()
return True
except ConnectionRefusedError:
self.logger.error(f"Connection failed for {self.agent_id}. Server might be offline.")
return False
except Exception as e:
self.logger.error(f"Connection error for {self.agent_id}: {e}")
return False
def send_message(self, message_dict):
if not self.connected: return
try:
data = json.dumps(message_dict) + '\n'
self.sock.sendall(data.encode('utf-8'))
except Exception as e:
self.logger.error(f"Error sending message from {self.agent_id}: {e}")
self.connected = False
def send_to_agent(self, recipient_id, content):
message = {
"type": "message",
"sender": self.agent_id,
"recipient": recipient_id,
"content": content
}
self.logger.info(f"Sending to {recipient_id}: '{content}'")
self.send_message(message)
def listen_for_messages(self):
while self.running and self.connected:
try:
data = self.sock.recv(BUFFER_SIZE).decode('utf-8').strip()
if not data: break
for raw_msg in data.split('\n'):
if not raw_msg: continue
try:
message = json.loads(raw_msg)
self.process_message(message)
except json.JSONDecodeError:
self.logger.error(f"Received invalid JSON: {raw_msg}")
except ConnectionResetError:
self.logger.info(f"Server disconnected.")
break
except Exception as e:
self.logger.error(f"Listening error: {e}")
break
self.connected = False
self.sock.close()
def process_message(self, message):
m_type = message.get("type")
if m_type == "status":
self.logger.info(f"STATUS: {message.get('content')}")
elif m_type == "message":
sender = message.get("sender", "UNKNOWN")
content = message.get("content", "No Content")
self.logger.info(f"RECEIVED from {sender}: '{content}'")
if content.startswith("TASK: Calculate the sum of the following data points:"):
self.logger.info("Worker: Received calculation request.")
try:
data_str = content.split("data points: ")[-1]
data_list = json.loads(data_str)
result = self.calculate_sum(data_list)
result_message = f"Result: Calculation complete. Sum Value is {result:.2f}."
self.send_to_agent(sender, result_message)
except (json.JSONDecodeError, IndexError, TypeError) as e:
error_msg = f"ERROR: Could not process data for calculation. Reason: {e}"
self.logger.error(error_msg)
self.send_to_agent(sender, error_msg)
except Exception as e:
self.logger.error(f"Unexpected error in calculation task: {e}")
self.send_to_agent(sender, f"ERROR: Internal calculation error: {e}")
def disconnect(self):
self.running = False
if self.connected:
self.sock.close()
self.logger.info(f"{self.agent_id} disconnected.")
# --- Main
if __name__ == '__main__':
agent_b_logger = setup_logging("agentB")
agent_b = Agent("AgentB", agent_b_logger)
if agent_b.connect():
agent_b_logger.info("AgentB connected, awaiting instructions.")
try:
while agent_b.connected:
time.sleep(0.1)
except KeyboardInterrupt:
agent_b_logger.info("Manually stopped.")
finally:
agent_b.disconnect()
else:
agent_b_logger.error("AgentB could not connect. Ensure server.py is running.")
agent_b_logger.info("AgentB process exiting.")
Some useful refernces (I used among others…)
- Enabling Agent-to-Agent Interactions through MCP: https://yia333.medium.com/enabling-agent-to-agent-interactions-through-mcp-3f2a3ea3ab85 (by Yi Ai)
- mcp-use: https://github.com/mcp-use/mcp-use
- mcp-agent: https://github.com/lastmile-ai/mcp-agent
- Example of clients: https://modelcontextprotocol.io/clients
Putting All Together
Below are provided the steps in order to test this basic implementation.
- Preparation of my Python environment 🧑🍳
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
- Running the server in terminal 1 👷♂️
python server.py
- Running the agents (agentA.py & agentB.py) code in terminals 2 & 3🕵️
# terminal 2
python agentA.py
# terminal 3
python agentB.py
- The exeution log files 📒 (if you’re somewhat familiar with my posts, you get that I’m always interested in logs and outputs 😱)
#server_2025-10-10_14-58-31.log
2025-10-10 14:58:31,501 - INFO - MCP Server started and listening on 127.0.0.1:65432
2025-10-10 14:58:31,502 - INFO - Type 'q' and press Enter to quit cleanly.
2025-10-10 14:58:36,441 - INFO - New connection established with ('127.0.0.1', 64483)
2025-10-10 14:58:36,441 - INFO - RECVD from ('127.0.0.1', 64483): {'type': 'identity', 'sender': 'AgentA'}
2025-10-10 14:58:36,441 - INFO - Agent Registered: AgentA (('127.0.0.1', 64483)). Active agents: ['AgentA']
2025-10-10 14:58:41,445 - INFO - RECVD from AgentA: {'type': 'message', 'sender': 'AgentA', 'recipient': 'AgentB', 'content': 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'}
2025-10-10 14:58:41,445 - INFO - RELAY from AgentA to AgentB: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 14:58:41,445 - ERROR - Recipient 'AgentB' not found. Notifying sender AgentA.
2025-10-10 14:58:59,634 - INFO - New connection established with ('127.0.0.1', 64486)
2025-10-10 14:58:59,634 - INFO - RECVD from ('127.0.0.1', 64486): {'type': 'identity', 'sender': 'AgentB'}
2025-10-10 14:58:59,634 - INFO - Agent Registered: AgentB (('127.0.0.1', 64486)). Active agents: ['AgentA', 'AgentB']
2025-10-10 15:00:15,798 - INFO - Agent Disconnected: AgentA. Active agents: ['AgentB']
2025-10-10 15:00:17,545 - INFO - New connection established with ('127.0.0.1', 64491)
2025-10-10 15:00:17,545 - INFO - RECVD from ('127.0.0.1', 64491): {'type': 'identity', 'sender': 'AgentA'}
2025-10-10 15:00:17,545 - INFO - Agent Registered: AgentA (('127.0.0.1', 64491)). Active agents: ['AgentB', 'AgentA']
2025-10-10 15:00:22,551 - INFO - RECVD from AgentA: {'type': 'message', 'sender': 'AgentA', 'recipient': 'AgentB', 'content': 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'}
2025-10-10 15:00:22,551 - INFO - RELAY from AgentA to AgentB: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 15:00:24,054 - INFO - RECVD from AgentB: {'type': 'message', 'sender': 'AgentB', 'recipient': 'AgentA', 'content': 'Result: Calculation complete. Sum Value is 200.00.'}
2025-10-10 15:00:24,055 - INFO - RELAY from AgentB to AgentA: 'Result: Calculation complete. Sum Value is 200.00.'
2025-10-10 15:00:24,055 - INFO - RECVD from AgentA: {'type': 'message', 'sender': 'AgentA', 'recipient': 'AgentB', 'content': 'Decision: Proceed. Thank you for the calculation.'}
2025-10-10 15:00:24,056 - INFO - RELAY from AgentA to AgentB: 'Decision: Proceed. Thank you for the calculation.'
2025-10-10 15:00:25,111 - INFO - Agent Disconnected: AgentA. Active agents: ['AgentB']
2025-10-10 15:00:33,594 - INFO - Agent Disconnected: AgentB. Active agents: []
2025-10-10 15:00:38,709 - INFO - 'q' received. Initiating graceful shutdown...
2025-10-10 15:00:38,709 - INFO - Server main socket closed.
2025-10-10 15:00:38,709 - INFO - Server process exiting.
#agentA_2025-10-10_15-00-17.log
2025-10-10 15:00:17,544 - INFO - AgentA connected to server.
2025-10-10 15:00:17,545 - INFO - AgentA connected. Stabilizing network for other agents to come online...
2025-10-10 15:00:17,545 - INFO - STATUS: Welcome, AgentA! You are connected.
2025-10-10 15:00:22,550 - INFO - Sending to AgentB: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 15:00:24,055 - INFO - RECEIVED from AgentB: 'Result: Calculation complete. Sum Value is 200.00.' (Current State: AWAITING_CALC_RESULT)
2025-10-10 15:00:24,055 - INFO - *** DECISION MADE ***: Proceed (Sum: 200.0)
2025-10-10 15:00:24,055 - INFO - Sending to AgentB: 'Decision: Proceed. Thank you for the calculation.'
2025-10-10 15:00:24,106 - INFO - Automatic shutdown triggered by workflow completion.
2025-10-10 15:00:25,111 - ERROR - Listening error: [Errno 9] Bad file descriptor
2025-10-10 15:00:25,111 - INFO - AgentA disconnected.
2025-10-10 15:00:25,111 - INFO - AgentA process exiting.
#agentB_2025-10-10_14-58-59.log
2025-10-10 14:58:59,634 - INFO - AgentB connected to server.
2025-10-10 14:58:59,634 - INFO - AgentB connected, awaiting instructions.
2025-10-10 14:58:59,635 - INFO - STATUS: Welcome, AgentB! You are connected.
2025-10-10 15:00:22,551 - INFO - RECEIVED from AgentA: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 15:00:22,552 - INFO - Worker: Received calculation request.
2025-10-10 15:00:22,552 - INFO - Worker: Starting summation on 5 items...
2025-10-10 15:00:24,054 - INFO - Sending to AgentA: 'Result: Calculation complete. Sum Value is 200.00.'
2025-10-10 15:00:24,056 - INFO - RECEIVED from AgentA: 'Decision: Proceed. Thank you for the calculation.'
2025-10-10 15:00:33,593 - INFO - Manually stopped.
2025-10-10 15:00:33,594 - ERROR - Listening error: [Errno 9] Bad file descriptor
2025-10-10 15:00:33,594 - INFO - AgentB disconnected.
2025-10-10 15:00:33,594 - INFO - AgentB process exiting.
Et voilà 🌟
Bonus 🔷
Having said all mentioned above… I also tried to implement the same logic with a popular framework; FastAPI! It looks a bit more professional!
pip install fastapi uvicorn websockets httpx pydantic
# fastapi_server.py
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from pydantic import BaseModel
from typing import Dict, Set, Optional
import asyncio
import os
import logging
from datetime import datetime
# --- Configuration & Logging ---
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"fastapi_server_{timestamp}.log")
logger = logging.getLogger("fastapi_server")
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler())
app = FastAPI(title="MCP FastAPI Server")
active_connections: Dict[str, WebSocket] = {}
known_agents: Set[str] = set()
class AgentMessage(BaseModel):
sender: str
recipient: str
content: str
type: str = "message"
class StatusMessage(BaseModel):
content: str
type: str = "status"
sender: str = "SERVER"
@app.websocket("/ws/{agent_id}")
async def websocket_endpoint(websocket: WebSocket, agent_id: str):
await websocket.accept()
active_connections[agent_id] = websocket
known_agents.add(agent_id)
logger.info(f"Agent Registered: {agent_id}. Active agents: {list(known_agents)}")
await websocket.send_json(StatusMessage(content=f"Welcome, {agent_id}! You are connected.").dict())
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
logger.info(f"Agent Disconnected: {agent_id}. Cleaning up.")
except Exception as e:
logger.error(f"Error in WebSocket for {agent_id}: {e}")
finally:
# Cleanup
if agent_id in active_connections:
del active_connections[agent_id]
if agent_id in known_agents:
known_agents.remove(agent_id)
logger.info(f"Agent {agent_id} disconnected. Active agents: {list(known_agents)}")
@app.post("/send_message", status_code=200)
async def send_message_to_agent(message: AgentMessage):
logger.info(f"RELAY from {message.sender} to {message.recipient}: '{message.content}' (via HTTP POST)")
if message.recipient not in known_agents or message.recipient not in active_connections:
logger.error(f"Recipient '{message.recipient}' is offline.")
if message.sender in active_connections:
sender_ws = active_connections[message.sender]
error_msg = StatusMessage(content=f"Error: Agent '{message.recipient}' is offline.")
await sender_ws.send_json(error_msg.dict())
raise HTTPException(status_code=404, detail=f"Recipient '{message.recipient}' is offline.")
recipient_ws = active_connections[message.recipient]
try:
await recipient_ws.send_json(message.dict())
logger.info(f"Message successfully relayed to {message.recipient}.")
return {"status": "success", "message": "Message sent."}
except Exception as e:
logger.error(f"Error relaying message to {message.recipient}: {e}")
raise HTTPException(status_code=500, detail="Failed to relay message.")
# To run
uvicorn fastapi_server:app --host 127.0.0.1 --port 8000 --log-config None
# fastapi_agentA.py
import asyncio
import websockets
import httpx
import json
import logging
import os
from datetime import datetime
import time # Used for the initial delay (race condition fix)
# --- Configuration & Logging ---
AGENT_ID = "AgentA"
SERVER_WS_URL = "ws://127.0.0.1:8000/ws"
SERVER_HTTP_URL = "http://127.0.0.1:8000/send_message"
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{AGENT_ID}_{timestamp}.log")
logger = logging.getLogger(AGENT_ID)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler())
# --- Agent Class ---
class AgentA:
def __init__(self):
self.agent_id = AGENT_ID
self.ws_connection = None
self.http_client = httpx.AsyncClient()
self.running = True
self.state = "INITIATING"
self.data_to_process = [10, 25, 40, 50, 75]
async def connect(self):
logger.info(f"{self.agent_id} attempting to connect to WebSocket server...")
try:
self.ws_connection = await websockets.connect(f"{SERVER_WS_URL}/{self.agent_id}")
logger.info(f"{self.agent_id} connected to WebSocket server.")
# Start a background task to listen for messages
asyncio.create_task(self.listen_for_messages())
return True
except Exception as e:
logger.error(f"{self.agent_id} failed to connect to WebSocket: {e}")
return False
async def listen_for_messages(self):
try:
while self.running:
message_str = await self.ws_connection.recv()
self.process_message(json.loads(message_str))
except websockets.exceptions.ConnectionClosedOK:
logger.info(f"{self.agent_id} WebSocket connection closed gracefully.")
except Exception as e:
logger.error(f"{self.agent_id} WebSocket listening error: {e}")
finally:
self.running = False
await self.http_client.aclose()
logger.info(f"{self.agent_id} stopped listening.")
async def send_message_to_agent(self, recipient: str, content: str):
payload = {
"sender": self.agent_id,
"recipient": recipient,
"content": content
}
logger.info(f"Sending to {recipient}: '{content}'")
try:
response = await self.http_client.post(SERVER_HTTP_URL, json=payload)
response.raise_for_status()
except httpx.HTTPStatusError as e:
# This catches 4xx and 5xx errors from the server (e.g., 404 for AgentB offline)
logger.error(f"HTTP error sending message: {e.response.json().get('detail', e)}")
except Exception as e:
logger.error(f"Error sending message via HTTP: {e}")
def process_message(self, message):
m_type = message.get("type")
if m_type == "status":
logger.info(f"STATUS: {message.get('content')}")
elif m_type == "message":
sender = message.get("sender", "UNKNOWN")
content = message.get("content", "No Content")
logger.info(f"RECEIVED from {sender}: '{content}' (State: {self.state})")
# State 2: Awaiting the calculation result
if self.state == "AWAITING_CALC_RESULT" and content.startswith("Result:"):
try:
result_str = content.split("Value is ")[-1].strip('.')
final_result = float(result_str)
decision = "Proceed" if final_result > 150 else "Abort"
logger.info(f"*** DECISION MADE ***: {decision} (Sum: {final_result})")
self.state = "TASK_COMPLETE"
except Exception as e:
logger.error(f"ERROR: Could not process result: {e}")
self.state = "TASK_FAILED"
# --- Main Execution ---
async def main():
agent_a = AgentA()
if await agent_a.connect():
logger.info("AgentA connected. Stabilizing network for other agents to come online...")
# FIX FOR RACE CONDITION: Give AgentB time to start and register.
await asyncio.sleep(5)
# Step 1: AgentA sends data and requests processing
data_str = json.dumps(agent_a.data_to_process)
agent_a.state = "AWAITING_CALC_RESULT"
initial_request = f"TASK: Calculate the sum of the following data points: {data_str}"
await agent_a.send_message_to_agent("AgentB", initial_request)
# Keep AgentA running until the task is complete
while agent_a.running and agent_a.state not in ["TASK_COMPLETE", "TASK_FAILED"]:
await asyncio.sleep(0.1)
if agent_a.state == "TASK_COMPLETE":
logger.info("Automatic shutdown triggered by workflow completion.")
logger.info("AgentA process exiting.")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("AgentA manually stopped.")
fastapi_agentB.py
import asyncio
import websockets
import httpx
import json
import logging
import os
from datetime import datetime
import time
AGENT_ID = "AgentB"
SERVER_WS_URL = "ws://127.0.0.1:8000/ws"
SERVER_HTTP_URL = "http://127.0.0.1:8000/send_message"
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{AGENT_ID}_{timestamp}.log")
logger = logging.getLogger(AGENT_ID)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler())
class AgentB:
def __init__(self):
self.agent_id = AGENT_ID
self.ws_connection = None
self.http_client = httpx.AsyncClient()
self.running = True
def calculate_sum(self, data_list):
logger.info(f"Worker: Starting summation on {len(data_list)} items...")
time.sleep(1.5)
return sum(data_list)
async def connect(self):
logger.info(f"{self.agent_id} attempting to connect to WebSocket server...")
try:
self.ws_connection = await websockets.connect(f"{SERVER_WS_URL}/{self.agent_id}")
logger.info(f"{self.agent_id} connected to WebSocket server.")
asyncio.create_task(self.listen_for_messages())
return True
except Exception as e:
logger.error(f"{self.agent_id} failed to connect to WebSocket: {e}")
return False
async def listen_for_messages(self):
try:
while self.running:
message_str = await self.ws_connection.recv()
self.process_message(json.loads(message_str))
except websockets.exceptions.ConnectionClosedOK:
logger.info(f"{self.agent_id} WebSocket connection closed gracefully.")
except Exception as e:
logger.error(f"{self.agent_id} WebSocket listening error: {e}")
finally:
self.running = False
await self.http_client.aclose()
logger.info(f"{self.agent_id} stopped listening.")
async def send_message_to_agent(self, recipient: str, content: str):
payload = {
"sender": self.agent_id,
"recipient": recipient,
"content": content
}
logger.info(f"Sending to {recipient}: '{content}'")
try:
response = await self.http_client.post(SERVER_HTTP_URL, json=payload)
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error sending message: {e.response.json().get('detail', e)}")
except Exception as e:
logger.error(f"Error sending message via HTTP: {e}")
def process_message(self, message):
m_type = message.get("type")
if m_type == "status":
logger.info(f"STATUS: {message.get('content')}")
elif m_type == "message":
sender = message.get("sender", "UNKNOWN")
content = message.get("content", "No Content")
logger.info(f"RECEIVED from {sender}: '{content}'")
if content.startswith("TASK: Calculate the sum of the following data points:"):
logger.info("Worker: Received calculation request.")
try:
data_str = content.split("data points: ")[-1]
data_list = json.loads(data_str)
result = self.calculate_sum(data_list)
result_message = f"Result: Calculation complete. Sum Value is {result:.2f}."
asyncio.create_task(self.send_message_to_agent(sender, result_message))
except Exception as e:
error_msg = f"ERROR: Could not process data for calculation. Reason: {e}"
logger.error(error_msg)
asyncio.create_task(self.send_message_to_agent(sender, error_msg))
# --- Main
async def main():
agent_b = AgentB()
await agent_b.connect()
while agent_b.running:
await asyncio.sleep(0.1)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("AgentB manually stopped.")
Conclusion
Considering all said above… a lengthy conclusion on the Model Context Protocol (MCP) feels almost redundant!
To bring these abstract concepts to life, the code above implements a straightforward multi-agent ecosystem. This included a basic MCP server designed specifically to handle registration and routing, and two distinct agents. Through a simple, coordinated task, it illustrates how tasks are initiated and executed autonomously by AgentA and AgentB, with the MCP server managing the entire communication flow and acting as the governing hub for the agents collaboration.
Top comments (0)