TL;DR: I built a Model Context Protocol (MCP) server that integrates Apache Kafka with AI clients, enabling programmatic interaction with Kafka topics, message consumption/production, and schema analysis. This article covers the project architecture, challenges faced, and solutions implemented.
Table of Contents
- Project Overview
- What is the Model Context Protocol (MCP)?
- Architecture and Components
- Key Features
- The Journey: Problems and Solutions
- Technical Deep Dive
- Security Considerations
- Getting Started
- Lessons Learned
Project Overview
The Kafka MCP Server is a Python application that implements the Model Context Protocol for Apache Kafka. It exposes Kafka operations as standardized tools that can be consumed by any MCP-compatible client.
What Can It Do?
- List Topics: Explore all available Kafka topics in your cluster
- Inspect Schemas: Retrieve Avro schemas from Confluent Schema Registry
- Generate Payloads: Automatically create valid test messages based on schemas
- Produce Messages: Publish messages to Kafka topics with proper serialization
- Consume Messages: Read messages from topics with offset control
- Analyze Schemas: Get detailed information about message structure and data types
Who Is This For?
- Data engineers building automation around Kafka clusters
- Teams building event-driven systems who need supporting tooling
- Developers interested in MCP server implementation
- Anyone wanting to integrate Kafka with AI-assisted workflows
- System architects exploring protocol-based integrations
The Tech Stack
- Python 3.10+: Core language
- Confluent Kafka Python: Kafka client library using librdkafka
- Confluent Schema Registry: Managing Avro schemas
- Anthropic MCP SDK: Model Context Protocol implementation
- Pydantic: Configuration management
- Azure Identity: Optional EntraID authentication support
What is the Model Context Protocol (MCP)?
If you're not familiar with MCP, here's a quick overview:
MCP is a standardized protocol that allows applications to expose functionality through a well-defined interface. Instead of applications needing custom integrations with every external system, an MCP server provides a standard set of operations.
How It Works
┌──────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ AI Client │ │ MCP Server │ │ External System│
│ (or any app) │ ◄─────► │ (Your Python │ ◄─────► │ (Kafka, DB, │
│ │ MCP │ application) │ Client │ APIs, etc) │
└──────────────────┘ Protocol └──────────────────┘ Libs └─────────────────┘
An MCP client can:
- Discover available tools from the MCP server
- Call those tools with appropriate parameters
- Receive structured results
- Chain multiple tool calls together
This is powerful because any MCP-compatible application can use your Kafka integration without custom code.
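For a concrete picture, here is a minimal sketch of that flow using the MCP Python SDK's stdio client (the server command and tool arguments are illustrative):

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Launch the MCP server as a subprocess and speak MCP to it over stdin/stdout
server_params = StdioServerParameters(command="python", args=["-m", "kafka_mcp.server"])

async def main():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()        # Discover available tools
            print([tool.name for tool in tools.tools])
            result = await session.call_tool(         # Call one with parameters
                "list_topics", {"topic_filter": "orders"}
            )
            print(result.content)

asyncio.run(main())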
Why MCP Matters
- Standardized Interface: The same protocol works across different clients and applications
- Secure: Credentials stay on your machine; the client never sees raw connection details
- Extensible: Add new tools without changing client code
- Type-Safe: Pydantic validation ensures correct parameters
- Language Agnostic: Clients can be written in any language that supports the protocol
Architecture and Components
The project is organized into modular, well-separated components:
kafka-mcp-server/
├── src/kafka_mcp/
│ ├── __init__.py # Package metadata
│ ├── __main__.py # Entry point
│ ├── config.py # Configuration management
│ ├── auth.py # Authentication setup
│ ├── kafka_client.py # Kafka operations
│ ├── payload_generator.py # Avro payload generation
│ └── server.py # MCP server implementation
├── tests/
│ └── test_payload_generator.py
├── docs/
│ ├── README.md
│ ├── INSTALLATION.md
│ ├── AUTHENTICATION_GUIDE.md
│ ├── EXAMPLES.md
│ └── SECURITY.md
├── examples/
│ ├── client_config_example.json
│ └── example_schemas.json
└── pyproject.toml
Component Responsibilities
config.py - Configuration Management
class Settings(BaseSettings):
    """Loads environment variables using Pydantic"""
    kafka_bootstrap_servers: str
    kafka_sasl_username: str
    kafka_sasl_password: str
    kafka_security_protocol: str
    kafka_ssl_ca_location: str
    schema_registry_url: str
auth.py - Authentication Setup
- Builds Kafka client configuration with SASL/SSL
- Supports: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
- Handles certificate management
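As a rough sketch of what this module does (the function name is hypothetical; the field names follow the Settings model shown later in the article):

def build_kafka_config(settings: Settings) -> dict:
    """Map validated settings onto native librdkafka properties."""
    config = {
        'bootstrap.servers': settings.kafka_bootstrap_servers,
        'security.protocol': settings.kafka_security_protocol,  # e.g. SASL_SSL
        'sasl.mechanism': settings.kafka_sasl_mechanism,         # PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512
        'sasl.username': settings.kafka_sasl_username,
        'sasl.password': settings.kafka_sasl_password,
        'ssl.ca.location': settings.kafka_ssl_ca_location,
    }
    # Optional mutual-TLS material
    if settings.kafka_ssl_certificate_location:
        config['ssl.certificate.location'] = settings.kafka_ssl_certificate_location
    if settings.kafka_ssl_key_location:
        config['ssl.key.location'] = settings.kafka_ssl_key_location
    return config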
kafka_client.py - Kafka Client Manager
- Abstracts librdkafka complexities
- Manages producers, consumers, and admin clients
- Handles serialization/deserialization with Schema Registry
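The serialization path leans on confluent-kafka's Schema Registry integration. A rough sketch, not the project's exact code (topic name, payload, and connection details are illustrative):

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

topic = "orders"
schema_registry = SchemaRegistryClient({'url': 'https://schema-registry.example.com:8081'})

# Fetch the latest value schema (default TopicNameStrategy subject: "<topic>-value")
schema_str = schema_registry.get_latest_version(f"{topic}-value").schema.schema_str

# Serialize a dict to Avro bytes and produce it
serializer = AvroSerializer(schema_registry, schema_str)
producer = Producer({'bootstrap.servers': 'kafka-broker:9092'})
value_bytes = serializer({'order_id': 1}, SerializationContext(topic, MessageField.VALUE))
producer.produce(topic, value=value_bytes)
producer.flush()

# Deserialize consumed bytes back into a dict (the writer schema is resolved via the registry)
deserializer = AvroDeserializer(schema_registry)
record = deserializer(value_bytes, SerializationContext(topic, MessageField.VALUE))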
payload_generator.py - Avro Payload Generation
- Parses Avro schemas
- Generates valid test messages
- Handles complex types: records, arrays, enums, unions
server.py - MCP Server Implementation
- Implements the MCP protocol
- Defines tools that clients can call
- Handles serialization (datetime, Decimal, bytes, sets)
- Coordinates all components
Key Features
1. Topic Management
Tool: list_topics
Input: (optional) topic_filter
Output: List of topic names, partition counts, replica counts
Explore your Kafka cluster and understand available topics.
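Under the hood this maps onto librdkafka's cluster metadata API; roughly:

from confluent_kafka.admin import AdminClient

admin = AdminClient(config)  # the same librdkafka config dict used for producers/consumers
metadata = admin.list_topics(timeout=10)
for name, topic in metadata.topics.items():
    partitions = topic.partitions
    replicas = len(next(iter(partitions.values())).replicas) if partitions else 0
    print(name, len(partitions), replicas)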
2. Schema Inspection
Tool: get_topic_schema
Input: topic_name
Output: Avro schema JSON with field descriptions
Understand the structure of messages without examining raw bytes.
3. Payload Generation
Tool: generate_payload
Input: topic_name, (optional) nested_depth
Output: Valid message in Avro schema format
Quickly create test messages that conform to the schema.
4. Message Production
Tool: produce_message
Input: topic_name, value (JSON), (optional) key (JSON or string)
Output: Partition, offset, timestamp
Publish messages with automatic Avro serialization.
5. Message Consumption
Tool: consume_messages
Input: topic_name, (optional) num_messages, from_beginning
Output: List of messages with full metadata
Read messages with flexible offset control.
6. Schema Analysis
Tool: analyze_schema
Input: topic_name
Output: Field names, types, descriptions, validation rules
Get a human-readable breakdown of schema structure.
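At its core the analysis walks the parsed Avro record. A minimal sketch for flat record schemas (the actual tool reports more detail than this):

def summarize_fields(record_schema: dict) -> list[dict]:
    """Flatten a record's fields into name/type/doc entries."""
    return [
        {
            "name": field["name"],
            "type": field["type"],
            "doc": field.get("doc", ""),
        }
        for field in record_schema.get("fields", [])
    ]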
The Journey: Problems and Solutions
Building this project was a learning experience. Here are the major challenges I encountered and how I solved them.
Problem 1: JAAS Configuration Doesn't Work with librdkafka
The Issue: I initially tried to use Java JAAS configuration syntax (common in Kafka Java clients) with the Python Confluent client.
# ❌ This doesn't work
sasl_jaas_config = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";'
config = {
'sasl.jaas.config': sasl_jaas_config,
...
}
Why: Confluent Kafka Python uses librdkafka (a C library), not the Java client. librdkafka doesn't support JAAS configuration.
The Solution: Use native librdkafka properties:
# ✅ This works
config = {
'sasl.mechanism': 'PLAIN',
'sasl.username': 'my-user',
'sasl.password': 'my-password',
'security.protocol': 'SASL_SSL',
...
}
Lesson: Always check which driver/client your library uses. Different Kafka clients have different configuration approaches.
Problem 2: Certificate Format Mismatches
The Issue: Confluent provided certificates in JKS format (Java KeyStore), but librdkafka expects PEM format.
Error: ssl_ca_location: Unable to open '/path/to/cert.jks' (No such file or directory)
Why: JKS is a Java-specific format. librdkafka is a C library and only understands PEM (OpenSSL) format.
The Solution: Convert certificates using Java keytool and OpenSSL:
# Extract from JKS to PKCS12 (intermediate format)
keytool -importkeystore \
-srckeystore client-cert.jks \
-srcstoretype JKS \
-srcstorepass 'keystorePassword' \
-destkeystore client-cert.p12 \
-deststoretype PKCS12 \
-deststorepass 'keystorePassword'
# Convert PKCS12 to PEM
openssl pkcs12 \
-in client-cert.p12 \
-out client-cert.pem \
-passin pass:'keystorePassword' \
-nodes
# Convert the CA certificate to PEM (add -inform DER if the .cer file is DER-encoded)
openssl x509 \
-in ca-cert.cer \
-out ca-cert.pem
Lesson: Document certificate format conversions. Include examples in your authentication guide.
Problem 3: Schema Registry SSL Certificate Issues
The Issue: When connecting to Confluent Schema Registry over HTTPS, the client couldn't verify the SSL certificate.
Error: HTTPError: 401 Client Error: Unauthorized for url:
https://schema-registry.example.com/subjects
Why: The Schema Registry client wasn't configured with the CA certificate for SSL verification.
The Solution: Pass the CA certificate to the Schema Registry client:
from confluent_kafka.schema_registry import SchemaRegistryClient
schema_registry_config = {
'url': 'https://schema-registry.example.com:8081',
'basic.auth.user.info': f'{username}:{password}',
'ssl.ca.location': '/path/to/ca-cert.pem', # ← Add this
'ssl.certificate.location': '/path/to/client-cert.pem', # Optional
'ssl.key.location': '/path/to/client-key.pem', # Optional
}
client = SchemaRegistryClient(schema_registry_config)
Lesson: Always ensure SSL certificates are available for both the Kafka broker AND the Schema Registry client.
Problem 4: Consumer Offset Management with assign()
The Issue: When using manual partition assignment with assign(), calling seek() would fail with a cryptic error:
KafkaError{code=_STATE,val=-172,str="Failed to seek to offset X: Local: Erroneous state"}
Context: I was using assign() instead of subscribe() to have precise control over which offsets to read:
consumer.assign(topic_partitions) # Manually assign partitions
consumer.seek(TopicPartition(topic, partition, offset)) # ❌ FAILS!
Why: librdkafka requires partitions to be fully initialized before seek() can be called. Checking assignment() alone isn't enough because:
- The partition assignment might be acknowledged
- But librdkafka's internal state machine hasn't finished initializing
- The partition metadata hasn't been fetched yet
The Solution: Initialize partitions properly by:
- Calling get_watermark_offsets() to fetch partition metadata
- Polling multiple times to let librdkafka complete initialization
- Only then calling seek()
# Step 1: Assign partitions
consumer.assign(topic_partitions)
# Step 2: Fetch watermark offsets (initializes metadata)
offset_map = {}
for tp in topic_partitions:
low, high = consumer.get_watermark_offsets(tp, timeout=10.0)
offset_map[tp.partition] = (low, high)
# Step 3: Poll multiple times to initialize state
for _ in range(30):
msg = consumer.poll(timeout=0.05)
if msg is None:
pass # Continue polling
else:
break # Got a message, state is initialized
# Step 4: NOW we can seek safely
for tp in topic_partitions:
low, high = offset_map[tp.partition]
start_offset = max(low, high - num_messages)
consumer.seek(TopicPartition(tp.topic, tp.partition, start_offset))
Lesson: When dealing with low-level client libraries, understand the state machine. Some operations have ordering constraints.
Problem 5: Custom Serialization for Complex Types
The Issue: When producing messages, Avro serialization failed for certain Python types:
# ❌ TypeError: Object of type datetime is not JSON serializable
payload = {
'timestamp': datetime.now(), # Avro needs numeric epoch
'data': b'\x00\x01\x02', # Avro needs base64 string
'amount': Decimal('10.50'), # Avro needs string or float
}
Why: Avro has specific type requirements that don't directly map to Python types. Additionally, the MCP protocol requires JSON serialization for responses.
The Solution: Create a custom JSON encoder:
class CustomJSONEncoder(json.JSONEncoder):
"""Handle non-standard types for JSON serialization"""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, (bytes, bytearray)):
return obj.hex() # or base64.b64encode(obj).decode()
elif isinstance(obj, Decimal):
return str(obj)
elif isinstance(obj, set):
return list(obj)
return super().default(obj)
# Use it in responses
json.dumps(result, cls=CustomJSONEncoder)
Also, handle the opposite direction when accepting payloads:
def deserialize_payload(data: dict, schema: dict) -> dict:
"""Convert string representations back to proper types"""
# Convert ISO strings back to datetime
# Convert hex strings back to bytes
# Convert string decimals back to Decimal
# etc.
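Here is a minimal sketch of what such conversions could look like for a few known fields (the field names, and which fields get converted, are assumptions; the real implementation drives this from the Avro schema):

from datetime import datetime
from decimal import Decimal

def deserialize_payload(data: dict) -> dict:
    """Sketch: turn JSON-friendly strings back into the types Avro serialization expects."""
    result = dict(data)
    if 'timestamp' in result:
        # ISO 8601 string -> epoch milliseconds (Avro timestamp-millis logical type)
        result['timestamp'] = int(datetime.fromisoformat(result['timestamp']).timestamp() * 1000)
    if 'data' in result:
        # Hex string -> raw bytes
        result['data'] = bytes.fromhex(result['data'])
    if 'amount' in result:
        # Decimal string -> Decimal
        result['amount'] = Decimal(result['amount'])
    return result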
Lesson: When building APIs, always think about serialization round-trips. JSON is common but has limitations.
Problem 6: Unique Consumer Groups vs. Offset Control
The Issue: I wanted:
- Fine-grained offset control (using assign())
- No permanent consumer group resources in the cluster
These seemed contradictory because Kafka requires a group.id even when using assign().
The Solution: Use unique consumer group IDs per request:
import uuid
from typing import Optional

def consume_messages(self, topic: str, group_id: Optional[str] = None):
if group_id is None:
# Generate unique ID per request
group_id = f"kafka-mcp-consumer-{uuid.uuid4().hex[:8]}"
consumer_config = {
'group.id': group_id,
'enable.auto.commit': False, # Don't commit offsets
...
}
consumer = Consumer(consumer_config)
consumer.assign(topic_partitions) # Manual assignment
# ... seek and read ...
consumer.close() # Clean up
The unique ID ensures:
- No conflicts between concurrent requests
- No permanent consumer group state (cleaned up after timeout)
- Full offset control via assign() and seek()
Lesson: Sometimes the solution is simpler than you think. Use generated IDs as a clean compromise.
Technical Deep Dive
Avro Schema Handling
Avro is a powerful schema format but requires careful handling. Here's how the project processes schemas:
class AvroPayloadGenerator:
    def generate_payload(self, schema, nested_depth: int = 0):
        """Generate a valid message matching an Avro schema"""
        # Unions appear as lists in Avro JSON ("null or something"): pick the first non-null branch
        if isinstance(schema, list):
            non_null_type = next(t for t in schema if t != 'null')
            return self.generate_payload(non_null_type, nested_depth + 1)
        # Primitive types may appear as bare strings ("int", "long", ...)
        schema_type = schema if isinstance(schema, str) else schema.get('type', '')
        if schema_type == 'record':
            # Handle complex object: generate a value for every field
            return {
                field['name']: self.generate_payload(field['type'], nested_depth + 1)
                for field in schema['fields']
            }
        elif schema_type == 'array':
            # Handle array: generate a single element
            return [self.generate_payload(schema['items'], nested_depth + 1)]
        elif schema_type == 'enum':
            # Handle enumeration: return the first valid symbol
            return schema['symbols'][0]
        elif schema_type == 'bytes':
            return b'test_data'
        elif schema_type == 'int':
            return 42
        elif schema_type == 'long':
            return 9223372036854775807  # Max long value
        # ... etc for string, float, double, boolean
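A quick usage example against the generator above (hypothetical schema):

schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "quantity", "type": "int"},
        {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["NEW", "SHIPPED"]}},
        {"name": "discount", "type": ["null", "int"]},
    ],
}

generator = AvroPayloadGenerator()
print(generator.generate_payload(schema))
# {'order_id': 9223372036854775807, 'quantity': 42, 'status': 'NEW', 'discount': 42}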
MCP Tool Definition
Each Kafka operation is exposed as an MCP tool:
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="list_topics",
description="List all topics in Kafka cluster",
inputSchema={
"type": "object",
"properties": {
"topic_filter": {
"type": "string",
"description": "Optional filter pattern"
}
}
}
),
types.Tool(
name="consume_messages",
description="Consume messages from a Kafka topic",
inputSchema={
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "Topic name"
},
"num_messages": {
"type": "integer",
"description": "Number of messages to consume",
"default": 10
},
"from_beginning": {
"type": "boolean",
"description": "Read from start of topic",
"default": False
}
},
"required": ["topic"]
}
),
# ... more tools ...
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
if name == "list_topics":
result = kafka_client.list_topics(arguments.get("topic_filter"))
return [types.TextContent(type="text", text=json.dumps(result, cls=CustomJSONEncoder))]
elif name == "consume_messages":
result = kafka_client.consume_messages(
topic=arguments["topic"],
num_messages=arguments.get("num_messages", 10),
from_beginning=arguments.get("from_beginning", False)
)
return [types.TextContent(type="text", text=json.dumps(result, cls=CustomJSONEncoder))]
# ... handle other tools ...
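To actually serve these tools, the low-level MCP server is wired to a stdio transport. A sketch following the MCP Python SDK's documented pattern:

import asyncio

from mcp.server.stdio import stdio_server

async def main():
    # The client (IDE, AI app, ...) launches this process and speaks MCP over stdin/stdout
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())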
Configuration with Pydantic
Using Pydantic Settings makes configuration type-safe and environment-aware:
from typing import Optional

from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Kafka connection
kafka_bootstrap_servers: str
kafka_security_protocol: str = "SASL_SSL"
# SASL authentication
kafka_sasl_mechanism: str = "PLAIN"
kafka_sasl_username: str
kafka_sasl_password: str
# SSL/TLS
kafka_ssl_ca_location: str
kafka_ssl_certificate_location: Optional[str] = None
kafka_ssl_key_location: Optional[str] = None
kafka_ssl_key_password: Optional[str] = None
# Schema Registry
schema_registry_url: str
schema_registry_username: Optional[str] = None
schema_registry_password: Optional[str] = None
# Azure EntraID (optional)
azure_tenant_id: Optional[str] = None
azure_client_id: Optional[str] = None
azure_client_secret: Optional[str] = None
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# Env var names match case-insensitively (KAFKA_BOOTSTRAP_SERVERS -> kafka_bootstrap_servers)
case_sensitive = False
# Usage
settings = Settings() # Automatically loads from .env and validates
Security Considerations
Building tools that handle credentials and message data requires careful security practices:
1. Environment Variables
Never hardcode credentials:
# ❌ BAD
config = {
'sasl.username': 'my-user',
'sasl.password': 'my-password-12345'
}
# ✅ GOOD
config = {
'sasl.username': os.getenv('KAFKA_SASL_USERNAME'),
'sasl.password': os.getenv('KAFKA_SASL_PASSWORD')
}
Use .env.example as a template:
# .env.example - NO REAL CREDENTIALS
KAFKA_BOOTSTRAP_SERVERS=kafka-broker:9092
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-password
2. Certificate Management
Keep certificates secure:
# Restrict file permissions
chmod 600 client-cert.pem
chmod 600 client-key.pem
chmod 600 ca-cert.pem
# Add to .gitignore
*.pem
*.key
*.jks
3. Message Handling
Be careful with message content:
def consume_messages(self, topic: str, num_messages: int = 10):
    """Never log entire message payloads in production"""
    messages = []
    while len(messages) < num_messages:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        # Log metadata only
        logger.debug(f"Read message from partition {msg.partition()} offset {msg.offset()}")
        # Don't log: logger.debug(f"Message value: {msg.value()}")
        # This could leak sensitive data
        messages.append({
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'key': msg.key().decode() if msg.key() else None,
            'value': msg.value(),  # In API response only, not logs
        })
    return messages
4. Pre-commit Hooks
The project includes check_secrets.sh to prevent credential leaks:
#!/bin/bash
# Prevent committing sensitive files
if git diff --cached | grep -E "(password|secret|api_key|AKIA)"; then
echo "❌ Sensitive patterns detected in staged changes!"
exit 1
fi
if [ -f ".env" ] && git diff --cached --name-only | grep -q "\.env$"; then
echo "❌ Don't commit .env file!"
exit 1
fi
echo "✅ Pre-commit security check passed"
exit 0
Install it:
cp check_secrets.sh .git/hooks/pre-commit
chmod +x .git/hooks/pre-commit
5. Network Security
When deploying:
- Use TLS for all connections (SASL_SSL, not SASL_PLAINTEXT)
- Verify SSL certificates (don't disable verification)
- Use VPN or private networks if possible
- Rotate credentials regularly
- Monitor access logs
Getting Started
Installation
# Clone the repository
git clone <repo-url>
cd kafka-mcp-server
# Create virtual environment
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -e .
Configuration
# Copy template
cp .env.example .env
# Edit with your credentials
nano .env
Required settings:
# Kafka Broker
KAFKA_BOOTSTRAP_SERVERS=kafka-broker:9092,kafka-broker-2:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-password
# SSL/TLS
KAFKA_SSL_CA_LOCATION=/path/to/ca-cert.pem
# Schema Registry
SCHEMA_REGISTRY_URL=https://schema-registry:8081
SCHEMA_REGISTRY_USERNAME=your-username
SCHEMA_REGISTRY_PASSWORD=your-password
Running the Server
# Start the MCP server
python -m kafka_mcp.server
# The server will start and wait for client connections
Integration with MCP Clients
To integrate with any MCP-compatible client (VS Code, IDE, AI application, etc.):
Option 1: Environment-based Configuration
export KAFKA_BOOTSTRAP_SERVERS="your-broker:9092"
export KAFKA_SASL_USERNAME="your-user"
export KAFKA_SASL_PASSWORD="your-password"
export KAFKA_SSL_CA_LOCATION="/path/to/ca-cert.pem"
export SCHEMA_REGISTRY_URL="https://your-registry:8081"
python -m kafka_mcp.server
Option 2: Configuration File
Add to your client's MCP configuration:
{
"mcpServers": {
"kafka": {
"command": "python",
"args": ["-m", "kafka_mcp.server"],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "your-broker:9092",
"KAFKA_SASL_USERNAME": "your-user",
"KAFKA_SASL_PASSWORD": "your-password",
"KAFKA_SSL_CA_LOCATION": "/path/to/ca-cert.pem",
"SCHEMA_REGISTRY_URL": "https://your-registry:8081"
}
}
}
}
Lessons Learned
1. Low-Level Libraries Have State Machines
When working with librdkafka (or similar low-level libraries), operations often have specific ordering requirements. Always check:
- What state the client needs to be in before an operation
- What operations change that state
- How to verify the state is correct
2. Certificate Formats Matter
Different clients expect different formats:
- Java uses JKS, PKCS12
- OpenSSL/C libraries use PEM
- Always convert to the format your client expects; don't try to force the wrong one
3. Documentation is Gold
Confluent's documentation on certificate conversion and librdkafka configuration saved me hours. Good docs about your library's quirks are invaluable.
4. Separation of Concerns Scales
Having separate modules for config, auth, kafka operations, and payload generation made it easy to add the MCP server on top. Each component can be tested independently.
5. Type Hints and Validation
Using Pydantic for configuration validation caught configuration errors immediately instead of at runtime. Type hints made the code self-documenting.
6. Security Isn't Optional
Hardcoding credentials is tempting for quick prototypes but creates bad habits. Using environment variables and pre-commit hooks from the start was worth it.
7. Error Messages Matter
The cryptic librdkafka error messages (like the _STATE error) were frustrating until I understood they were pointing to real state management issues. Better error messages in documentation would have helped.
Next Steps and Future Improvements
Current Roadmap
- [ ] Message filtering by key patterns
- [ ] Batch consumption with offset tracking
- [ ] Consumer group management tools
- [ ] Topic creation/deletion
- [ ] Schema versioning and migration helpers
- [ ] Performance metrics and monitoring
Contributions Welcome
This project is open source. If you:
- Hit any issues, please report them
- Have ideas for new features, open a discussion
- Want to contribute, check the CONTRIBUTING.md file
Further Learning
If you want to dive deeper:
- Apache Kafka: Read "Kafka: The Definitive Guide" by Neha Narkhede, Gwen Shapira, and Todd Palino
- Model Context Protocol: Check the MCP specification and implementation guides
- librdkafka: The C library documentation for advanced usage
- Avro: Schema evolution and best practices
Conclusion
Building the Kafka MCP Server was an enlightening project that brought together several complex areas:
- Understanding Kafka's architecture and client libraries
- Implementing the MCP protocol
- Handling authentication and certificates
- Managing serialization for complex types
- Ensuring security best practices
The result is a standardized interface that makes working with Kafka accessible to any MCP-compatible application, whether it's an IDE, automation tool, or AI-powered workflow.
Whether you're interested in Kafka integration, MCP development, or protocol-based system design, I hope this project and the lessons learned are helpful.
Feel free to check out the full source code and try it with your Kafka cluster!
Questions or Feedback?
- Found a bug? Open an issue on GitHub
- Have suggestions? Discussions are open
- Want to contribute? Pull requests welcome
- Need help? Check the documentation in the /docs directory
Happy integrating! 🚀
Tags: #Kafka #Python #MCP #MessageQueue #Integration #Protocol #Architecture