DEV Community

Alain Airom

Implementing a Basic MCP Server and Two Communicating Agents in Python

A hands-on exercise that explains an MCP server and agents in a basic and (I hope) understandable way!

Introduction — The General Role of an MCP Server

MCP — TLDR;

In the world of multi-agent systems and distributed intelligence, effective communication and coordination between autonomous entities are paramount. The Model Context Protocol (MCP) emerges as a foundational framework designed to facilitate seamless, structured, and context-aware interactions between independent agents.

At its core, MCP addresses the challenge of managing dynamic workflows where agents need to exchange not just raw data, but also shared understanding, task delegation, status updates, and actionable insights. It provides a standardized mechanism for agents to:

  • Identify Themselves: Registering their presence and capabilities within a shared environment (often mediated by a central server).
  • Delegate Tasks: Agent A can assign specific computations or data processing steps to Agent B.
  • Exchange Contextual Messages: Beyond simple data, messages carry intent, state, and relevance, allowing agents to react intelligently.
  • Receive Status Updates: Agents can be informed about the progress or completion of tasks they’ve delegated.
  • Coordinate Complex Workflows: Enabling a series of interdependent actions where one agent’s output becomes another’s input, leading to a unified goal.

By defining clear communication structures and encouraging a “speak the same language” approach, MCP ensures that even diverse agents can collaborate effectively, transforming raw data into processed information and, ultimately, into informed decisions. This protocol lays the groundwork for robust, scalable, and intelligent distributed systems, moving us closer to truly collaborative AI ecosystems.
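
To make the list above a bit more concrete, here is a minimal sketch of what such messages can look like on the wire. The field names (type, sender, recipient, content) are the ones used by the code later in this post; the helper names make_identity, make_message and encode are hypothetical and exist only for this illustration.

```python
import json


def make_identity(agent_id):
    """Registration message an agent sends right after connecting."""
    return {"type": "identity", "sender": agent_id}


def make_message(sender, recipient, content):
    """Message one agent asks the server to relay to another agent."""
    return {
        "type": "message",
        "sender": sender,
        "recipient": recipient,
        "content": content,
    }


def encode(message):
    """Serialize one message as a single newline-terminated JSON line."""
    return (json.dumps(message) + "\n").encode("utf-8")


# Example: the bytes AgentA would write to its socket
hello = encode(make_identity("AgentA"))
task = encode(make_message("AgentA", "AgentB", "TASK: ..."))
```

Each message is one JSON object per line, which is what lets the receiving side split the byte stream back into individual messages.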

Image from https://modelcontextprotocol.io/

The motivation for this code

During a recent discussion with a business partner, the conversation turned to the foundational concepts behind the current technological wave: agentic systems and the buzz surrounding terms like ‘agents’ and the Model Context Protocol (MCP). It quickly became apparent that, while these concepts are at the forefront of innovation and generate considerable hype, their practical implications aren’t universally understood. So I built a concrete, basic demonstrator designed to demystify these ideas and show their potential in a clear and simple manner.

Some useful references (I used among others…)

The Basic MCP Server (server.py)

Before jumping into the code, one important disclaimer!

One might ask why I do not use or import one of the many very popular MCP servers available in GitHub repositories. This is intentional for this demonstration. Here, the term “MCP Server” (Model Context Protocol Server) is conceptual: it refers to the role and functionality of the custom implementation, which needs to:

  • Listen for network connections.
  • Accept agent connections.
  • Handle identity/registration (mapping Agent IDs to sockets).
  • Route/Relay messages between agents based on the MCP structure ({"type": "message", "recipient": "AgentB", ...}).
# server.py
import socket
import threading
import json
import time
import os
import logging
from datetime import datetime

HOST = '127.0.0.1'
PORT = 65432
BUFFER_SIZE = 1024
OUTPUT_DIR = "./output" # Define the output directory

def setup_logging(name):
    """Configures file-based, timestamped logging for the given name."""
    os.makedirs(OUTPUT_DIR, exist_ok=True) # Create output directory if it doesn't exist

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_file = os.path.join(OUTPUT_DIR, f"{name}_{timestamp}.log")

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # File handler
    file_handler = logging.FileHandler(log_file)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Optional: Console handler for immediate feedback when not detached
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return logger

class MCPServer:
    def __init__(self, host, port, logger):
        self.host = host
        self.port = port
        self.logger = logger 
        self.agents = {} 
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.running = True 

    def start(self):
        try:
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen()
            self.logger.info(f"MCP Server started and listening on {self.host}:{self.port}")
            self.logger.info(f"Type 'q' and press Enter to quit cleanly.")

            threading.Thread(target=self.accept_clients, daemon=True).start()
            self.listen_for_commands()

        except Exception as e:
            self.logger.error(f"Failed to start server: {e}. Kill the old process or try again later.")
            self.shutdown()

    def listen_for_commands(self):
        while self.running:
            try:
                command = input()
                if command.lower() == 'q':
                    self.logger.info("'q' received. Initiating graceful shutdown...")
                    self.shutdown()
                    break
            except EOFError:
                self.logger.info("EOF received. Initiating graceful shutdown...")
                self.shutdown()
                break
            except Exception as e:
                self.logger.error(f"Error listening for commands: {e}")
                pass

    def shutdown(self):
        self.running = False
        try:
            for agent_id, conn in list(self.agents.items()): # Iterate on a copy
                try:
                    conn.shutdown(socket.SHUT_RDWR)
                    conn.close()
                    self.logger.info(f"Closed connection for agent {agent_id}.")
                except Exception as e:
                    self.logger.error(f"Error closing agent {agent_id} connection: {e}")

            if self.server_socket:
                self.server_socket.close()
            self.logger.info("Server main socket closed.")
        except Exception as e:
            self.logger.error(f"Error during server shutdown: {e}")

    def accept_clients(self):
        while self.running: 
            try:
                self.server_socket.settimeout(1.0) 
                conn, addr = self.server_socket.accept()
                self.server_socket.settimeout(None) 

                self.logger.info(f"New connection established with {addr}")
                threading.Thread(target=self.handle_client, args=(conn, addr), daemon=True).start()
            except socket.timeout:
                continue 
            except Exception as e:
                if self.running: # Only log error if not explicitly shutting down
                    self.logger.error(f"Error accepting client connection: {e}")
                break

    def handle_client(self, conn, addr):
        agent_id = None
        while self.running: 
            try:
                data = conn.recv(BUFFER_SIZE).decode('utf-8').strip()
                if not data:
                    break

                for raw_msg in data.split('\n'):
                    if not raw_msg: continue
                    try:
                        message = json.loads(raw_msg)
                        self.logger.info(f"RECVD from {agent_id if agent_id else addr}: {message}") 
                    except json.JSONDecodeError:
                        self.logger.error(f"[{addr}] Received invalid JSON: {raw_msg}")
                        continue

                    if message.get("type") == "identity":
                        agent_id = message.get("sender")
                        if agent_id:
                            self.agents[agent_id] = conn
                            self.logger.info(f"Agent Registered: {agent_id} ({addr}). Active agents: {list(self.agents.keys())}")

                            status_msg = {"type": "status", "content": f"Welcome, {agent_id}! You are connected."}
                            self.send_message(conn, status_msg)
                        else:
                            self.logger.error(f"[{addr}] Identity message missing 'sender'. Disconnecting.")
                            # A bare 'break' would only leave the inner for-loop, so close and return instead
                            conn.close()
                            return

                    elif message.get("type") == "message":
                        recipient = message.get("recipient")
                        content = message.get("content")

                        if recipient and content:
                            self.relay_message(agent_id, recipient, content)
                        else:
                            self.logger.warning(f"[{agent_id}] Malformed message: missing recipient or content. Message: {message}")

                    else:
                        self.logger.warning(f"[{agent_id}] Unhandled message type: {message.get('type')}. Message: {message}")

            except ConnectionResetError:
                self.logger.info(f"Connection reset by client {agent_id if agent_id else addr}.")
                break
            except Exception as e:
                if self.running: # Log error only if server is not in shutdown phase
                    self.logger.error(f"Error handling client {agent_id if agent_id else addr}: {e}")
                break

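        # Remove the registration only if it still points at this connection,
        # so a reconnect under the same ID is not unregistered by the old handler
        if agent_id and self.agents.get(agent_id) is conn:
            del self.agents[agent_id]
            self.logger.info(f"Agent Disconnected: {agent_id}. Active agents: {list(self.agents.keys())}")
        conn.close()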

    def relay_message(self, sender, recipient, content):
        self.logger.info(f"RELAY from {sender} to {recipient}: '{content}'")

        if recipient in self.agents:
            recipient_conn = self.agents[recipient]

            final_message = {
                "type": "message",
                "sender": sender,
                "content": content,
                "timestamp": time.time()
            }
            self.send_message(recipient_conn, final_message)
        else:
            self.logger.error(f"Recipient '{recipient}' not found. Notifying sender {sender}.")
            if sender in self.agents:
                 error_msg = {"type": "status", "content": f"Error: Agent '{recipient}' is offline."}
                 self.send_message(self.agents[sender], error_msg)

    def send_message(self, conn, message_dict):
        try:
            data = json.dumps(message_dict) + '\n'
            conn.sendall(data.encode('utf-8'))
        except Exception as e:
            self.logger.error(f"Error sending message: {e}")

if __name__ == '__main__':
    server_logger = setup_logging("server")
    server = MCPServer(HOST, PORT, server_logger)
    server.start()
    server_logger.info("Server process exiting.")
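
One caveat about the recv() loop in handle_client: TCP is a byte stream, so a single recv(BUFFER_SIZE) call may return only part of a JSON line, or several lines at once, and a message longer than 1024 bytes would fail to parse. A minimal sketch of a buffer that only hands back complete lines could look like this. LineBuffer and feed are hypothetical names, not part of the server code above.

```python
import json


class LineBuffer:
    """Accumulates raw bytes and returns only complete newline-terminated JSON messages."""

    def __init__(self):
        self._pending = b""  # bytes received so far that do not yet end in a newline

    def feed(self, chunk):
        """Add a chunk from recv() and return the list of messages completed by it."""
        self._pending += chunk
        # Everything before the last newline is complete; the tail stays buffered
        *lines, self._pending = self._pending.split(b"\n")
        messages = []
        for line in lines:
            line = line.strip()
            if line:
                # Raises json.JSONDecodeError on invalid input; callers can catch it
                messages.append(json.loads(line.decode("utf-8")))
        return messages


# Example: one message arriving in two pieces
buf = LineBuffer()
first = buf.feed(b'{"type": "iden')
second = buf.feed(b'tity", "sender": "AgentA"}\n')
```

Keeping the buffer as bytes and decoding only complete lines also avoids decoding a multi-byte UTF-8 character that was cut in half by a chunk boundary.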

The Agent(s)

The first Agent — AgentA

# agentA.py
import socket
import threading
import json
import time
import os
import logging
from datetime import datetime

HOST = '127.0.0.1'
PORT = 65432
BUFFER_SIZE = 1024
OUTPUT_DIR = "./output"

def setup_logging(name):
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_file = os.path.join(OUTPUT_DIR, f"{name}_{timestamp}.log")
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    file_handler = logging.FileHandler(log_file)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger

class Agent:
    def __init__(self, agent_id, logger):
        self.agent_id = agent_id
        self.logger = logger # Use the provided logger
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connected = False
        self.running = True
        self.state = "INITIATING"
        self.data_to_process = [10, 25, 40, 50, 75] 
        self.final_result = None

    def connect(self):
        try:
            self.sock.connect((HOST, PORT))
            self.connected = True
            self.logger.info(f"{self.agent_id} connected to server.")
            identity_msg = {"type": "identity", "sender": self.agent_id}
            self.send_message(identity_msg)
            threading.Thread(target=self.listen_for_messages, daemon=True).start()
            return True
        except ConnectionRefusedError:
            self.logger.error(f"Connection failed for {self.agent_id}. Server might be offline.")
            return False
        except Exception as e:
            self.logger.error(f"Connection error for {self.agent_id}: {e}")
            return False

    def send_message(self, message_dict):
        if not self.connected: return
        try:
            data = json.dumps(message_dict) + '\n'
            self.sock.sendall(data.encode('utf-8'))
        except Exception as e:
            self.logger.error(f"Error sending message from {self.agent_id}: {e}")
            self.connected = False

    def send_to_agent(self, recipient_id, content):
        message = {
            "type": "message",
            "sender": self.agent_id,
            "recipient": recipient_id,
            "content": content
        }
        self.logger.info(f"Sending to {recipient_id}: '{content}'")
        self.send_message(message)

    def listen_for_messages(self):
        while self.running and self.connected:
            try:
                data = self.sock.recv(BUFFER_SIZE).decode('utf-8').strip()
                if not data: break 
                for raw_msg in data.split('\n'):
                    if not raw_msg: continue
                    try:
                        message = json.loads(raw_msg)
                        self.process_message(message)
                    except json.JSONDecodeError:
                        self.logger.error(f"Received invalid JSON: {raw_msg}")
            except ConnectionResetError:
                self.logger.info(f"Server disconnected.")
                break
            except Exception as e:
                self.logger.error(f"Listening error: {e}")
                break
        self.connected = False
        self.sock.close()

    def process_message(self, message):
        m_type = message.get("type")

        if m_type == "status":
            self.logger.info(f"STATUS: {message.get('content')}")

        elif m_type == "message":
            sender = message.get("sender", "UNKNOWN")
            content = message.get("content", "No Content")
            self.logger.info(f"RECEIVED from {sender}: '{content}' (Current State: {self.state})")

            if self.state == "AWAITING_CALC_RESULT" and content.startswith("Result:"):
                try:
                    result_str = content.split("Value is ")[-1].strip('.')
                    self.final_result = float(result_str)

                    if self.final_result > 150:
                        decision = "Proceed"
                    else:
                        decision = "Abort"

                    self.logger.info(f"*** DECISION MADE ***: {decision} (Sum: {self.final_result})")
                    self.state = "TASK_COMPLETE"

                    self.send_to_agent(sender, f"Decision: {decision}. Thank you for the calculation.")

                except ValueError as e:
                    self.logger.error(f"ERROR: Could not parse calculation result from '{content}'. Reason: {e}")
                    self.state = "TASK_FAILED"
                except Exception as e:
                    self.logger.error(f"Unexpected error processing result: {e}")
                    self.state = "TASK_FAILED"

    def disconnect(self):
        self.running = False
        if self.connected:
            self.sock.close()
            self.logger.info(f"{self.agent_id} disconnected.")

# --- Main 
if __name__ == '__main__':
    agent_a_logger = setup_logging("agentA")
    agent_a = Agent("AgentA", agent_a_logger)

    if agent_a.connect():
        agent_a_logger.info("AgentA connected. Stabilizing network for other agents to come online...")

        time.sleep(5) # Crude wait: gives AgentB time to register before the task is sent

        data_str = json.dumps(agent_a.data_to_process)
        agent_a.state = "AWAITING_CALC_RESULT"

        initial_request = f"TASK: Calculate the sum of the following data points: {data_str}"
        agent_a.send_to_agent("AgentB", initial_request)

        try:
            while agent_a.connected and agent_a.state not in ["TASK_COMPLETE", "TASK_FAILED"]:
                time.sleep(0.1)

            if agent_a.state == "TASK_COMPLETE":
                agent_a_logger.info("Automatic shutdown triggered by workflow completion.")
                time.sleep(1) 
            elif agent_a.state == "TASK_FAILED":
                 agent_a_logger.warning("AgentA workflow failed. Shutting down.")

        except KeyboardInterrupt:
            agent_a_logger.info("Manually stopped.")
        finally:
            agent_a.disconnect()
    else:
        agent_a_logger.error("AgentA could not connect. Ensure server.py is running.")
    agent_a_logger.info("AgentA process exiting.")
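
The fixed time.sleep(5) in the main block only guesses that AgentB has registered by the time the task is sent. One possible alternative, sketched here under assumptions, is to resend the task whenever the server answers with its "offline" status message. The status text matched below is the one produced by relay_message in server.py; the function names is_offline_notice and send_with_retry, and their parameters, are hypothetical.

```python
import time


def is_offline_notice(message, recipient):
    """True if `message` is the server's status reply saying `recipient` is not registered."""
    expected = f"Error: Agent '{recipient}' is offline."
    return message.get("type") == "status" and message.get("content") == expected


def send_with_retry(send, offline_seen, attempts=5, delay=1.0, sleep=time.sleep):
    """Call send() up to `attempts` times, pausing `delay` seconds after each try.

    `offline_seen()` should return True if an offline notice arrived since the
    last send. Returns the number of the attempt that was not rejected, or None
    if every attempt was rejected.
    """
    for attempt in range(1, attempts + 1):
        send()
        sleep(delay)  # leave the server time to answer with an offline status
        if not offline_seen():
            return attempt
    return None
```

In agentA.py, send could wrap agent_a.send_to_agent(...) and offline_seen could read a flag that process_message sets when is_offline_notice(...) returns True. That wiring is left out here.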

The second Agent — AgentB

# agentB.py
import socket
import threading
import json
import time
import math
import os
import logging
from datetime import datetime

HOST = '127.0.0.1'
PORT = 65432
BUFFER_SIZE = 1024
OUTPUT_DIR = "./output"

def setup_logging(name):
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_file = os.path.join(OUTPUT_DIR, f"{name}_{timestamp}.log")
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    file_handler = logging.FileHandler(log_file)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger

class Agent:
    def __init__(self, agent_id, logger):
        self.agent_id = agent_id
        self.logger = logger 
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connected = False
        self.running = True

    def calculate_sum(self, data_list):
        self.logger.info(f"Worker: Starting summation on {len(data_list)} items...")
        time.sleep(1.5) 
        return sum(data_list)

    def connect(self):
        try:
            self.sock.connect((HOST, PORT))
            self.connected = True
            self.logger.info(f"{self.agent_id} connected to server.")
            identity_msg = {"type": "identity", "sender": self.agent_id}
            self.send_message(identity_msg)
            threading.Thread(target=self.listen_for_messages, daemon=True).start()
            return True
        except ConnectionRefusedError:
            self.logger.error(f"Connection failed for {self.agent_id}. Server might be offline.")
            return False
        except Exception as e:
            self.logger.error(f"Connection error for {self.agent_id}: {e}")
            return False

    def send_message(self, message_dict):
        if not self.connected: return
        try:
            data = json.dumps(message_dict) + '\n'
            self.sock.sendall(data.encode('utf-8'))
        except Exception as e:
            self.logger.error(f"Error sending message from {self.agent_id}: {e}")
            self.connected = False

    def send_to_agent(self, recipient_id, content):
        message = {
            "type": "message",
            "sender": self.agent_id,
            "recipient": recipient_id,
            "content": content
        }
        self.logger.info(f"Sending to {recipient_id}: '{content}'")
        self.send_message(message)

    def listen_for_messages(self):
        while self.running and self.connected:
            try:
                data = self.sock.recv(BUFFER_SIZE).decode('utf-8').strip()
                if not data: break 
                for raw_msg in data.split('\n'):
                    if not raw_msg: continue
                    try:
                        message = json.loads(raw_msg)
                        self.process_message(message)
                    except json.JSONDecodeError:
                        self.logger.error(f"Received invalid JSON: {raw_msg}")
            except ConnectionResetError:
                self.logger.info(f"Server disconnected.")
                break
            except Exception as e:
                self.logger.error(f"Listening error: {e}")
                break
        self.connected = False
        self.sock.close()

    def process_message(self, message):
        m_type = message.get("type")

        if m_type == "status":
            self.logger.info(f"STATUS: {message.get('content')}")

        elif m_type == "message":
            sender = message.get("sender", "UNKNOWN")
            content = message.get("content", "No Content")
            self.logger.info(f"RECEIVED from {sender}: '{content}'")

            if content.startswith("TASK: Calculate the sum of the following data points:"):
                self.logger.info("Worker: Received calculation request.")

                try:
                    data_str = content.split("data points: ")[-1]
                    data_list = json.loads(data_str)

                    result = self.calculate_sum(data_list)

                    result_message = f"Result: Calculation complete. Sum Value is {result:.2f}."
                    self.send_to_agent(sender, result_message)

                except (json.JSONDecodeError, IndexError, TypeError) as e:
                    error_msg = f"ERROR: Could not process data for calculation. Reason: {e}"
                    self.logger.error(error_msg)
                    self.send_to_agent(sender, error_msg)
                except Exception as e:
                    self.logger.error(f"Unexpected error in calculation task: {e}")
                    self.send_to_agent(sender, f"ERROR: Internal calculation error: {e}")

    def disconnect(self):
        self.running = False
        if self.connected:
            self.sock.close()
            self.logger.info(f"{self.agent_id} disconnected.")

# --- Main
if __name__ == '__main__':
    agent_b_logger = setup_logging("agentB")
    agent_b = Agent("AgentB", agent_b_logger)

    if agent_b.connect():
        agent_b_logger.info("AgentB connected, awaiting instructions.")
        try:
            while agent_b.connected:
                time.sleep(0.1)
        except KeyboardInterrupt:
            agent_b_logger.info("Manually stopped.")
        finally:
            agent_b.disconnect()
    else:
        agent_b_logger.error("AgentB could not connect. Ensure server.py is running.")
    agent_b_logger.info("AgentB process exiting.")


Putting All Together

Below are the steps to test this basic implementation.

  • Preparation of my Python environment 🧑‍🍳
python3 -m venv venv
source venv/bin/activate

pip install --upgrade pip
  • Running the server in terminal 1 👷‍♂️
python server.py
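  • Running the agents (agentA.py & agentB.py) in terminals 2 & 3 🕵️ (AgentA sends its task 5 seconds after connecting, so AgentB must be registered by then; otherwise the server answers that AgentB is offline, as the first attempt in the server log below shows)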
# terminal 2
python agentA.py   
# terminal 3
python agentB.py
  • The execution log files 📒 (if you’re somewhat familiar with my posts, you know that I’m always interested in logs and outputs 😱)
#server_2025-10-10_14-58-31.log
2025-10-10 14:58:31,501 - INFO - MCP Server started and listening on 127.0.0.1:65432
2025-10-10 14:58:31,502 - INFO - Type 'q' and press Enter to quit cleanly.
2025-10-10 14:58:36,441 - INFO - New connection established with ('127.0.0.1', 64483)
2025-10-10 14:58:36,441 - INFO - RECVD from ('127.0.0.1', 64483): {'type': 'identity', 'sender': 'AgentA'}
2025-10-10 14:58:36,441 - INFO - Agent Registered: AgentA (('127.0.0.1', 64483)). Active agents: ['AgentA']
2025-10-10 14:58:41,445 - INFO - RECVD from AgentA: {'type': 'message', 'sender': 'AgentA', 'recipient': 'AgentB', 'content': 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'}
2025-10-10 14:58:41,445 - INFO - RELAY from AgentA to AgentB: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 14:58:41,445 - ERROR - Recipient 'AgentB' not found. Notifying sender AgentA.
2025-10-10 14:58:59,634 - INFO - New connection established with ('127.0.0.1', 64486)
2025-10-10 14:58:59,634 - INFO - RECVD from ('127.0.0.1', 64486): {'type': 'identity', 'sender': 'AgentB'}
2025-10-10 14:58:59,634 - INFO - Agent Registered: AgentB (('127.0.0.1', 64486)). Active agents: ['AgentA', 'AgentB']
2025-10-10 15:00:15,798 - INFO - Agent Disconnected: AgentA. Active agents: ['AgentB']
2025-10-10 15:00:17,545 - INFO - New connection established with ('127.0.0.1', 64491)
2025-10-10 15:00:17,545 - INFO - RECVD from ('127.0.0.1', 64491): {'type': 'identity', 'sender': 'AgentA'}
2025-10-10 15:00:17,545 - INFO - Agent Registered: AgentA (('127.0.0.1', 64491)). Active agents: ['AgentB', 'AgentA']
2025-10-10 15:00:22,551 - INFO - RECVD from AgentA: {'type': 'message', 'sender': 'AgentA', 'recipient': 'AgentB', 'content': 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'}
2025-10-10 15:00:22,551 - INFO - RELAY from AgentA to AgentB: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 15:00:24,054 - INFO - RECVD from AgentB: {'type': 'message', 'sender': 'AgentB', 'recipient': 'AgentA', 'content': 'Result: Calculation complete. Sum Value is 200.00.'}
2025-10-10 15:00:24,055 - INFO - RELAY from AgentB to AgentA: 'Result: Calculation complete. Sum Value is 200.00.'
2025-10-10 15:00:24,055 - INFO - RECVD from AgentA: {'type': 'message', 'sender': 'AgentA', 'recipient': 'AgentB', 'content': 'Decision: Proceed. Thank you for the calculation.'}
2025-10-10 15:00:24,056 - INFO - RELAY from AgentA to AgentB: 'Decision: Proceed. Thank you for the calculation.'
2025-10-10 15:00:25,111 - INFO - Agent Disconnected: AgentA. Active agents: ['AgentB']
2025-10-10 15:00:33,594 - INFO - Agent Disconnected: AgentB. Active agents: []
2025-10-10 15:00:38,709 - INFO - 'q' received. Initiating graceful shutdown...
2025-10-10 15:00:38,709 - INFO - Server main socket closed.
2025-10-10 15:00:38,709 - INFO - Server process exiting.
#agentA_2025-10-10_15-00-17.log
2025-10-10 15:00:17,544 - INFO - AgentA connected to server.
2025-10-10 15:00:17,545 - INFO - AgentA connected. Stabilizing network for other agents to come online...
2025-10-10 15:00:17,545 - INFO - STATUS: Welcome, AgentA! You are connected.
2025-10-10 15:00:22,550 - INFO - Sending to AgentB: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 15:00:24,055 - INFO - RECEIVED from AgentB: 'Result: Calculation complete. Sum Value is 200.00.' (Current State: AWAITING_CALC_RESULT)
2025-10-10 15:00:24,055 - INFO - *** DECISION MADE ***: Proceed (Sum: 200.0)
2025-10-10 15:00:24,055 - INFO - Sending to AgentB: 'Decision: Proceed. Thank you for the calculation.'
2025-10-10 15:00:24,106 - INFO - Automatic shutdown triggered by workflow completion.
2025-10-10 15:00:25,111 - ERROR - Listening error: [Errno 9] Bad file descriptor
2025-10-10 15:00:25,111 - INFO - AgentA disconnected.
2025-10-10 15:00:25,111 - INFO - AgentA process exiting.
#agentB_2025-10-10_14-58-59.log
2025-10-10 14:58:59,634 - INFO - AgentB connected to server.
2025-10-10 14:58:59,634 - INFO - AgentB connected, awaiting instructions.
2025-10-10 14:58:59,635 - INFO - STATUS: Welcome, AgentB! You are connected.
2025-10-10 15:00:22,551 - INFO - RECEIVED from AgentA: 'TASK: Calculate the sum of the following data points: [10, 25, 40, 50, 75]'
2025-10-10 15:00:22,552 - INFO - Worker: Received calculation request.
2025-10-10 15:00:22,552 - INFO - Worker: Starting summation on 5 items...
2025-10-10 15:00:24,054 - INFO - Sending to AgentA: 'Result: Calculation complete. Sum Value is 200.00.'
2025-10-10 15:00:24,056 - INFO - RECEIVED from AgentA: 'Decision: Proceed. Thank you for the calculation.'
2025-10-10 15:00:33,593 - INFO - Manually stopped.
2025-10-10 15:00:33,594 - ERROR - Listening error: [Errno 9] Bad file descriptor
2025-10-10 15:00:33,594 - INFO - AgentB disconnected.
2025-10-10 15:00:33,594 - INFO - AgentB process exiting.
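
A note on the "[Errno 9] Bad file descriptor" lines in the two agent logs: they most likely appear because disconnect() closes the socket while the listener thread is still blocked in recv(). One possible way to avoid this, sketched here on a local socket pair rather than against the real server, is to call shutdown() first so that the listener sees a normal end-of-stream and closes the socket itself. The listen function and the events list are hypothetical names used only for this demonstration.

```python
import socket
import threading


def listen(sock, events):
    """Read until end-of-stream, recording what happened, then close the socket."""
    try:
        while True:
            data = sock.recv(1024)
            if not data:
                events.append("eof")  # orderly end of stream, nothing to report as an error
                break
            events.append(data)
    except OSError as exc:
        events.append(f"error: {exc}")
    finally:
        sock.close()


# A connected pair of local sockets stands in for the agent/server link
agent_side, server_side = socket.socketpair()
events = []
listener = threading.Thread(target=listen, args=(agent_side, events), daemon=True)
listener.start()

# Instead of closing the socket under the listener's feet, shut it down:
# the pending recv() then returns b"" and the listener thread does the close.
agent_side.shutdown(socket.SHUT_RDWR)
listener.join(timeout=5)
server_side.close()
```

Applied to the agents, this would mean calling self.sock.shutdown(socket.SHUT_RDWR) in disconnect() and leaving the close() to listen_for_messages, which already closes the socket when its loop ends.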

Et voilà 🌟

Bonus 🔷

Having said all of the above… I also tried to implement the same logic with a popular framework: FastAPI! It looks a bit more professional!

pip install fastapi uvicorn websockets httpx pydantic
# fastapi_server.py
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from pydantic import BaseModel
from typing import Dict, Set, Optional
import asyncio
import os
import logging
from datetime import datetime

# --- Configuration & Logging ---
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"fastapi_server_{timestamp}.log")

logger = logging.getLogger("fastapi_server")
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler())

app = FastAPI(title="MCP FastAPI Server")
active_connections: Dict[str, WebSocket] = {}
known_agents: Set[str] = set()

class AgentMessage(BaseModel):
    sender: str
    recipient: str
    content: str
    type: str = "message"

class StatusMessage(BaseModel):
    content: str
    type: str = "status"
    sender: str = "SERVER"

@app.websocket("/ws/{agent_id}")
async def websocket_endpoint(websocket: WebSocket, agent_id: str):
    await websocket.accept()

    active_connections[agent_id] = websocket
    known_agents.add(agent_id)
    logger.info(f"Agent Registered: {agent_id}. Active agents: {list(known_agents)}")

    await websocket.send_json(StatusMessage(content=f"Welcome, {agent_id}! You are connected.").dict())

    try:
        while True:
            # Keep the socket open; incoming text is ignored because agents send via HTTP POST
            await websocket.receive_text()

    except WebSocketDisconnect:
        logger.info(f"Agent Disconnected: {agent_id}. Cleaning up.")
    except Exception as e:
        logger.error(f"Error in WebSocket for {agent_id}: {e}")
    finally:
        # Cleanup
        if agent_id in active_connections:
            del active_connections[agent_id]
        if agent_id in known_agents:
            known_agents.remove(agent_id)
        logger.info(f"Agent {agent_id} disconnected. Active agents: {list(known_agents)}")

@app.post("/send_message", status_code=200)
async def send_message_to_agent(message: AgentMessage):
    logger.info(f"RELAY from {message.sender} to {message.recipient}: '{message.content}' (via HTTP POST)")

    if message.recipient not in known_agents or message.recipient not in active_connections:
        logger.error(f"Recipient '{message.recipient}' is offline.")

        if message.sender in active_connections:
            sender_ws = active_connections[message.sender]
            error_msg = StatusMessage(content=f"Error: Agent '{message.recipient}' is offline.")
            await sender_ws.send_json(error_msg.dict())

        raise HTTPException(status_code=404, detail=f"Recipient '{message.recipient}' is offline.")

    recipient_ws = active_connections[message.recipient]
    try:

        await recipient_ws.send_json(message.dict())
        logger.info(f"Message successfully relayed to {message.recipient}.")
        return {"status": "success", "message": "Message sent."}
    except Exception as e:
        logger.error(f"Error relaying message to {message.recipient}: {e}")
        raise HTTPException(status_code=500, detail="Failed to relay message.")
# To run (omit --log-config: uvicorn expects a path to an existing config file there)
uvicorn fastapi_server:app --host 127.0.0.1 --port 8000
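The routing rule in send_message_to_agent can be tried out without starting uvicorn at all. Here is a minimal sketch of that rule, assuming a hypothetical FakeSocket class that only records what would be sent, and a hypothetical relay function that returns the HTTP status code the server would answer with. Neither name exists in the server code; they are stand-ins for illustration.

```python
import asyncio
from typing import Dict, List


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: records payloads instead of sending."""

    def __init__(self) -> None:
        self.sent: List[dict] = []

    async def send_json(self, payload: dict) -> None:
        self.sent.append(payload)


async def relay(connections: Dict[str, FakeSocket], message: dict) -> int:
    """Apply the server's routing rule; return the status code it would use."""
    recipient = connections.get(message["recipient"])
    if recipient is None:
        # Recipient offline: tell the sender over its own socket, if it has one.
        sender = connections.get(message["sender"])
        if sender is not None:
            await sender.send_json({
                "type": "status",
                "sender": "SERVER",
                "content": f"Error: Agent '{message['recipient']}' is offline.",
            })
        return 404
    await recipient.send_json(message)
    return 200


async def demo() -> tuple:
    connections = {"AgentA": FakeSocket()}
    msg = {"sender": "AgentA", "recipient": "AgentB",
           "content": "hi", "type": "message"}
    first = await relay(connections, msg)   # AgentB has not registered yet
    connections["AgentB"] = FakeSocket()
    second = await relay(connections, msg)  # AgentB is now online
    return first, second, connections


if __name__ == "__main__":
    first, second, _ = asyncio.run(demo())
    print(first, second)
```

The first call models the race condition that the five-second delay in AgentA works around: if AgentA sends before AgentB has registered, the message is rejected and AgentA only receives a status notice.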
# fastapi_agentA.py
import asyncio
import websockets
import httpx
import json
import logging
import os
from datetime import datetime

# --- Configuration & Logging ---
AGENT_ID = "AgentA"
SERVER_WS_URL = "ws://127.0.0.1:8000/ws"
SERVER_HTTP_URL = "http://127.0.0.1:8000/send_message"
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{AGENT_ID}_{timestamp}.log")

logger = logging.getLogger(AGENT_ID)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler())

# --- Agent Class ---
class AgentA:
    def __init__(self):
        self.agent_id = AGENT_ID
        self.ws_connection = None
        self.http_client = httpx.AsyncClient()
        self.running = True
        self.state = "INITIATING"
        self.data_to_process = [10, 25, 40, 50, 75] 

    async def connect(self):
        logger.info(f"{self.agent_id} attempting to connect to WebSocket server...")
        try:
            self.ws_connection = await websockets.connect(f"{SERVER_WS_URL}/{self.agent_id}")
            logger.info(f"{self.agent_id} connected to WebSocket server.")

            # Start a background task to listen for messages
            asyncio.create_task(self.listen_for_messages())
            return True
        except Exception as e:
            logger.error(f"{self.agent_id} failed to connect to WebSocket: {e}")
            return False

    async def listen_for_messages(self):
        try:
            while self.running:
                message_str = await self.ws_connection.recv()
                self.process_message(json.loads(message_str))
        except websockets.exceptions.ConnectionClosedOK:
            logger.info(f"{self.agent_id} WebSocket connection closed gracefully.")
        except Exception as e:
            logger.error(f"{self.agent_id} WebSocket listening error: {e}")
        finally:
            self.running = False
            await self.http_client.aclose()
            logger.info(f"{self.agent_id} stopped listening.")

    async def send_message_to_agent(self, recipient: str, content: str):
        payload = {
            "sender": self.agent_id,
            "recipient": recipient,
            "content": content
        }
        logger.info(f"Sending to {recipient}: '{content}'")
        try:
            response = await self.http_client.post(SERVER_HTTP_URL, json=payload)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            # This catches 4xx and 5xx errors from the server (e.g., 404 for AgentB offline)
            logger.error(f"HTTP error sending message: {e.response.json().get('detail', e)}")
        except Exception as e:
            logger.error(f"Error sending message via HTTP: {e}")

    def process_message(self, message):
        m_type = message.get("type")

        if m_type == "status":
            logger.info(f"STATUS: {message.get('content')}")

        elif m_type == "message":
            sender = message.get("sender", "UNKNOWN")
            content = message.get("content", "No Content")
            logger.info(f"RECEIVED from {sender}: '{content}' (State: {self.state})")

            # State 2: Awaiting the calculation result
            if self.state == "AWAITING_CALC_RESULT" and content.startswith("Result:"):
                try:
                    result_str = content.split("Value is ")[-1].strip('.')
                    final_result = float(result_str)

                    decision = "Proceed" if final_result > 150 else "Abort"

                    logger.info(f"*** DECISION MADE ***: {decision} (Sum: {final_result})")
                    self.state = "TASK_COMPLETE"

                except Exception as e:
                    logger.error(f"ERROR: Could not process result: {e}")
                    self.state = "TASK_FAILED"

# --- Main Execution ---
async def main():
    agent_a = AgentA()

    if await agent_a.connect():
        logger.info("AgentA connected. Stabilizing network for other agents to come online...")

        # FIX FOR RACE CONDITION: Give AgentB time to start and register.
        await asyncio.sleep(5) 

        # Step 1: AgentA sends data and requests processing
        data_str = json.dumps(agent_a.data_to_process)
        agent_a.state = "AWAITING_CALC_RESULT"

        initial_request = f"TASK: Calculate the sum of the following data points: {data_str}"
        await agent_a.send_message_to_agent("AgentB", initial_request)

        # Keep AgentA running until the task is complete
        while agent_a.running and agent_a.state not in ["TASK_COMPLETE", "TASK_FAILED"]:
            await asyncio.sleep(0.1)

        if agent_a.state == "TASK_COMPLETE":
            logger.info("Automatic shutdown triggered by workflow completion.")

    logger.info("AgentA process exiting.")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("AgentA manually stopped.")
# fastapi_agentB.py
import asyncio
import websockets
import httpx
import json
import logging
import os
from datetime import datetime
import time

AGENT_ID = "AgentB"
SERVER_WS_URL = "ws://127.0.0.1:8000/ws"
SERVER_HTTP_URL = "http://127.0.0.1:8000/send_message"
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = os.path.join(OUTPUT_DIR, f"{AGENT_ID}_{timestamp}.log")

logger = logging.getLogger(AGENT_ID)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(logging.StreamHandler())

class AgentB:
    def __init__(self):
        self.agent_id = AGENT_ID
        self.ws_connection = None
        self.http_client = httpx.AsyncClient()
        self.running = True

    def calculate_sum(self, data_list):
        logger.info(f"Worker: Starting summation on {len(data_list)} items...")
        time.sleep(1.5)  # Simulated work; note that this blocks the event loop while it runs
        return sum(data_list)

    async def connect(self):
        logger.info(f"{self.agent_id} attempting to connect to WebSocket server...")
        try:
            self.ws_connection = await websockets.connect(f"{SERVER_WS_URL}/{self.agent_id}")
            logger.info(f"{self.agent_id} connected to WebSocket server.")

            asyncio.create_task(self.listen_for_messages())
            return True
        except Exception as e:
            logger.error(f"{self.agent_id} failed to connect to WebSocket: {e}")
            return False

    async def listen_for_messages(self):
        try:
            while self.running:
                message_str = await self.ws_connection.recv()
                self.process_message(json.loads(message_str))
        except websockets.exceptions.ConnectionClosedOK:
            logger.info(f"{self.agent_id} WebSocket connection closed gracefully.")
        except Exception as e:
            logger.error(f"{self.agent_id} WebSocket listening error: {e}")
        finally:
            self.running = False
            await self.http_client.aclose()
            logger.info(f"{self.agent_id} stopped listening.")

    async def send_message_to_agent(self, recipient: str, content: str):
        payload = {
            "sender": self.agent_id,
            "recipient": recipient,
            "content": content
        }
        logger.info(f"Sending to {recipient}: '{content}'")
        try:
            response = await self.http_client.post(SERVER_HTTP_URL, json=payload)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error sending message: {e.response.json().get('detail', e)}")
        except Exception as e:
            logger.error(f"Error sending message via HTTP: {e}")

    def process_message(self, message):
        m_type = message.get("type")

        if m_type == "status":
            logger.info(f"STATUS: {message.get('content')}")

        elif m_type == "message":
            sender = message.get("sender", "UNKNOWN")
            content = message.get("content", "No Content")
            logger.info(f"RECEIVED from {sender}: '{content}'")

            if content.startswith("TASK: Calculate the sum of the following data points:"):
                logger.info("Worker: Received calculation request.")

                try:
                    data_str = content.split("data points: ")[-1]
                    data_list = json.loads(data_str)

                    result = self.calculate_sum(data_list)

                    result_message = f"Result: Calculation complete. Sum Value is {result:.2f}."
                    asyncio.create_task(self.send_message_to_agent(sender, result_message))

                except Exception as e:
                    error_msg = f"ERROR: Could not process data for calculation. Reason: {e}"
                    logger.error(error_msg)
                    asyncio.create_task(self.send_message_to_agent(sender, error_msg))

# --- Main Execution ---
async def main():
    agent_b = AgentB()
    # Exit right away if the connection fails; otherwise the loop below would never end
    if not await agent_b.connect():
        return

    while agent_b.running:
        await asyncio.sleep(0.1)

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("AgentB manually stopped.")
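One thing to be aware of in AgentB: calculate_sum calls time.sleep, and because it is invoked from the listening coroutine, nothing else in the agent can run during that pause. It is harmless in this demo, but a slower worker would stall message handling. A possible way around it, sketched here under the assumption of Python 3.9 or later, is to hand the blocking call to asyncio.to_thread. The heartbeat task and the handle_task function are hypothetical names added only to show that the loop stays responsive, and the delay is shortened.

```python
import asyncio
import time
from typing import List


def calculate_sum(data_list: List[float]) -> float:
    """Blocking worker with the same shape as AgentB.calculate_sum."""
    time.sleep(0.2)  # simulated work (shortened for this sketch)
    return sum(data_list)


async def heartbeat(ticks: List[float]) -> None:
    """Hypothetical concurrent task: records a timestamp every 50 ms."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())


async def handle_task(data_list: List[float]) -> tuple:
    ticks: List[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # to_thread runs the blocking call in a worker thread,
    # so the event loop remains free to run heartbeat meanwhile.
    result = await asyncio.to_thread(calculate_sum, data_list)
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(handle_task([10, 25, 40, 50, 75])))
```

In the agent itself this would mean making process_message a coroutine (or scheduling a task from it), which is a larger change than this demo needs.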

Conclusion

After everything covered above, a lengthy conclusion on the Model Context Protocol (MCP) feels almost redundant!

To bring these abstract concepts to life, the code above implements a straightforward multi-agent ecosystem: a basic MCP server that handles registration and routing, plus two distinct agents. Through a simple, coordinated task, it shows how AgentA delegates a calculation to AgentB and acts on the result, while the MCP server relays every message and serves as the hub for the agents' collaboration.
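The whole exchange boils down to two text messages and one decision, so it can be replayed without any networking. The sketch below reuses the message strings and the threshold of 150 from the agents; the function names build_task, handle_task and decide are my own shorthand for illustration and do not appear in the agent classes.

```python
import json
from typing import List

TASK_PREFIX = "TASK: Calculate the sum of the following data points: "


def build_task(data: List[float]) -> str:
    """What AgentA sends: the task prefix followed by the data as JSON."""
    return TASK_PREFIX + json.dumps(data)


def handle_task(content: str) -> str:
    """What AgentB does: parse the JSON list, sum it, format the reply."""
    data = json.loads(content.split("data points: ")[-1])
    return f"Result: Calculation complete. Sum Value is {sum(data):.2f}."


def decide(content: str, threshold: float = 150) -> str:
    """What AgentA does with the reply: extract the number and compare."""
    value = float(content.split("Value is ")[-1].strip("."))
    return "Proceed" if value > threshold else "Abort"


if __name__ == "__main__":
    request = build_task([10, 25, 40, 50, 75])
    reply = handle_task(request)
    print(reply)
    print(decide(reply))
```

With the sample data the sum is 200, which is above the threshold, so AgentA logs a "Proceed" decision.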
