<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: jesrzrz</title>
    <description>The latest articles on DEV Community by jesrzrz (@jesrzrz).</description>
    <link>https://dev.to/jesrzrz</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F590518%2F81cf6482-7e97-4411-8215-828ec11cbaed.png</url>
      <title>DEV Community: jesrzrz</title>
      <link>https://dev.to/jesrzrz</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/jesrzrz"/>
    <language>en</language>
    <item>
      <title>Kafka MCP Server: Building a Real-Time Message Processing Integration</title>
      <dc:creator>jesrzrz</dc:creator>
      <pubDate>Wed, 26 Nov 2025 13:10:35 +0000</pubDate>
      <link>https://dev.to/jesrzrz/kafka-mcp-server-building-a-real-time-message-processing-integration-1bi2</link>
      <guid>https://dev.to/jesrzrz/kafka-mcp-server-building-a-real-time-message-processing-integration-1bi2</guid>
      <description>&lt;p&gt;&lt;strong&gt;TL;DR:&lt;/strong&gt; I built a Model Context Protocol (MCP) server that integrates Apache Kafka with AI clients, enabling programmatic interaction with Kafka topics, message consumption/production, and schema analysis. This article covers the project architecture, challenges faced, and solutions implemented.&lt;/p&gt;




&lt;h2&gt;
  
  
  Table of Contents
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Project Overview&lt;/li&gt;
&lt;li&gt;What is the Model Context Protocol (MCP)?&lt;/li&gt;
&lt;li&gt;Architecture and Components&lt;/li&gt;
&lt;li&gt;Key Features&lt;/li&gt;
&lt;li&gt;The Journey: Problems and Solutions&lt;/li&gt;
&lt;li&gt;Technical Deep Dive&lt;/li&gt;
&lt;li&gt;Security Considerations&lt;/li&gt;
&lt;li&gt;Getting Started&lt;/li&gt;
&lt;li&gt;Lessons Learned&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  Project Overview &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;The &lt;strong&gt;Kafka MCP Server&lt;/strong&gt; is a Python application that implements the Model Context Protocol for Apache Kafka. It exposes Kafka operations as standardized tools that can be consumed by any MCP-compatible client.&lt;/p&gt;

&lt;h3&gt;
  
  
  What Can It Do?
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;List Topics&lt;/strong&gt;: Explore all available Kafka topics in your cluster&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Inspect Schemas&lt;/strong&gt;: Retrieve Avro schemas from Confluent Schema Registry&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Generate Payloads&lt;/strong&gt;: Automatically create valid test messages based on schemas&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Produce Messages&lt;/strong&gt;: Publish messages to Kafka topics with proper serialization&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Consume Messages&lt;/strong&gt;: Read messages from topics with offset control&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Analyze Schemas&lt;/strong&gt;: Get detailed information about message structure and data types&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Who Is This For?
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Data engineers building automation around Kafka clusters&lt;/li&gt;
&lt;li&gt;Teams developing event-driven systems and needing tooling&lt;/li&gt;
&lt;li&gt;Developers interested in MCP server implementation&lt;/li&gt;
&lt;li&gt;Anyone wanting to integrate Kafka with AI-assisted workflows&lt;/li&gt;
&lt;li&gt;System architects exploring protocol-based integrations&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  The Tech Stack
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Python 3.10+&lt;/strong&gt;: Core language&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Confluent Kafka Python&lt;/strong&gt;: Kafka client library using librdkafka&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Confluent Schema Registry&lt;/strong&gt;: Managing Avro schemas&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Anthropic MCP SDK&lt;/strong&gt;: Model Context Protocol implementation&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Pydantic&lt;/strong&gt;: Configuration management&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Azure Identity&lt;/strong&gt;: Optional EntraID authentication support&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  What is the Model Context Protocol (MCP)? &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;If you're not familiar with MCP, here's a quick overview:&lt;/p&gt;

&lt;p&gt;MCP is a standardized protocol that allows applications to expose functionality through a well-defined interface. Instead of applications needing custom integrations with every external system, an MCP server provides a standard set of operations.&lt;/p&gt;

&lt;h3&gt;
  
  
  How It Works
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌──────────────────┐         ┌──────────────────┐         ┌─────────────────┐
│  AI Client       │         │  MCP Server      │         │  External System│
│  (or any app)    │ ◄─────► │  (Your Python    │ ◄─────► │  (Kafka, DB,    │
│                  │   MCP   │   application)   │  Client │   APIs, etc)    │
└──────────────────┘ Protocol └──────────────────┘  Libs   └─────────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;An MCP client can:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Discover available tools from the MCP server&lt;/li&gt;
&lt;li&gt;Call those tools with appropriate parameters&lt;/li&gt;
&lt;li&gt;Receive structured results&lt;/li&gt;
&lt;li&gt;Chain multiple tool calls together&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This is powerful because any MCP-compatible application can use your Kafka integration without custom code.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why MCP Matters
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Standardized Interface&lt;/strong&gt;: The same protocol works across different clients and applications&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Secure&lt;/strong&gt;: Credentials stay on your machine; the client never sees raw connection details&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Extensible&lt;/strong&gt;: Add new tools without changing client code&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Type-Safe&lt;/strong&gt;: Pydantic validation ensures correct parameters&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Language Agnostic&lt;/strong&gt;: Clients can be written in any language that supports the protocol&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Architecture and Components &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;The project is organized into modular, well-separated components:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-mcp-server/
├── src/kafka_mcp/
│   ├── __init__.py              # Package metadata
│   ├── __main__.py              # Entry point
│   ├── config.py                # Configuration management
│   ├── auth.py                  # Authentication setup
│   ├── kafka_client.py          # Kafka operations
│   ├── payload_generator.py     # Avro payload generation
│   └── server.py                # MCP server implementation
├── tests/
│   └── test_payload_generator.py
├── docs/
│   ├── README.md
│   ├── INSTALLATION.md
│   ├── AUTHENTICATION_GUIDE.md
│   ├── EXAMPLES.md
│   └── SECURITY.md
├── examples/
│   ├── client_config_example.json
│   └── example_schemas.json
└── pyproject.toml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Component Responsibilities
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;config.py&lt;/strong&gt; - Configuration Management&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Settings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSettings&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Loads environment variables using Pydantic&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;kafka_bootstrap_servers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;sasl_username&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;sasl_password&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;security_protocol&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;ssl_ca_location&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;schema_registry_url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;auth.py&lt;/strong&gt; - Authentication Setup&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Builds Kafka client configuration with SASL/SSL&lt;/li&gt;
&lt;li&gt;Supports: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512&lt;/li&gt;
&lt;li&gt;Handles certificate management&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;kafka_client.py&lt;/strong&gt; - Kafka Client Manager&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Abstracts librdkafka complexities&lt;/li&gt;
&lt;li&gt;Manages producers, consumers, and admin clients&lt;/li&gt;
&lt;li&gt;Handles serialization/deserialization with Schema Registry&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;payload_generator.py&lt;/strong&gt; - Avro Payload Generation&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Parses Avro schemas&lt;/li&gt;
&lt;li&gt;Generates valid test messages&lt;/li&gt;
&lt;li&gt;Handles complex types: records, arrays, enums, unions&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;server.py&lt;/strong&gt; - MCP Server Implementation&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Implements the MCP protocol&lt;/li&gt;
&lt;li&gt;Defines tools that clients can call&lt;/li&gt;
&lt;li&gt;Handles serialization (datetime, Decimal, bytes, sets)&lt;/li&gt;
&lt;li&gt;Coordinates all components&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Key Features &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. Topic Management
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Tool: list_topics
Input: (optional) topic_filter
Output: List of topic names, partition counts, replica counts
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Explore your Kafka cluster and understand available topics.&lt;/p&gt;

&lt;h3&gt;
  
  
  2. Schema Inspection
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Tool: get_topic_schema
Input: topic_name
Output: Avro schema JSON with field descriptions
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Understand the structure of messages without examining raw bytes.&lt;/p&gt;

&lt;h3&gt;
  
  
  3. Payload Generation
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Tool: generate_payload
Input: topic_name, (optional) nested_depth
Output: Valid message in Avro schema format
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Quickly create test messages that conform to the schema.&lt;/p&gt;

&lt;h3&gt;
  
  
  4. Message Production
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Tool: produce_message
Input: topic_name, value (JSON), (optional) key (JSON or string)
Output: Partition, offset, timestamp
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Publish messages with automatic Avro serialization.&lt;/p&gt;

&lt;h3&gt;
  
  
  5. Message Consumption
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Tool: consume_messages
Input: topic_name, (optional) num_messages, from_beginning
Output: List of messages with full metadata
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Read messages with flexible offset control.&lt;/p&gt;

&lt;h3&gt;
  
  
  6. Schema Analysis
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Tool: analyze_schema
Input: topic_name
Output: Field names, types, descriptions, validation rules
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Get a human-readable breakdown of schema structure.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Journey: Problems and Solutions &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;Building this project was a learning experience. Here are the major challenges I encountered and how I solved them.&lt;/p&gt;

&lt;h3&gt;
  
  
  Problem 1: JAAS Configuration Doesn't Work with librdkafka
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;The Issue&lt;/strong&gt;: I initially tried to use Java JAAS configuration syntax (common in Kafka Java clients) with the Python Confluent client.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# ❌ This doesn't work
&lt;/span&gt;&lt;span class="n"&gt;sasl_jaas_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.security.plain.PlainLoginModule required username=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;user&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt; password=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;pass&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;;&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.jaas.config&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;sasl_jaas_config&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Why&lt;/strong&gt;: Confluent Kafka Python uses librdkafka (a C library), not the Java client. librdkafka doesn't support JAAS configuration.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Solution&lt;/strong&gt;: Use native librdkafka properties:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# ✅ This works
&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.mechanism&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;PLAIN&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.username&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;my-user&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.password&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;my-password&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;security.protocol&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;SASL_SSL&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Lesson&lt;/strong&gt;: Always check which driver/client your library uses. Different Kafka clients have different configuration approaches.&lt;/p&gt;




&lt;h3&gt;
  
  
  Problem 2: Certificate Format Mismatches
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;The Issue&lt;/strong&gt;: Confluent provided certificates in JKS format (Java KeyStore), but librdkafka expects PEM format.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Error: ssl_ca_location: Unable to open '/path/to/cert.jks' (No such file or directory)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Why&lt;/strong&gt;: JKS is a Java-specific format. librdkafka is a C library and only understands PEM (OpenSSL) format.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Solution&lt;/strong&gt;: Convert certificates using Java keytool and OpenSSL:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Extract from JKS to PKCS12 (intermediate format)&lt;/span&gt;
keytool &lt;span class="nt"&gt;-importkeystore&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-srckeystore&lt;/span&gt; client-cert.jks &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-srcstoretype&lt;/span&gt; JKS &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-srcstorepass&lt;/span&gt; &lt;span class="s1"&gt;'keystorePassword'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-destkeystore&lt;/span&gt; client-cert.p12 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-deststoretype&lt;/span&gt; PKCS12 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-deststorepass&lt;/span&gt; &lt;span class="s1"&gt;'keystorePassword'&lt;/span&gt;

&lt;span class="c"&gt;# Convert PKCS12 to PEM&lt;/span&gt;
openssl pkcs12 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-in&lt;/span&gt; client-cert.p12 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-out&lt;/span&gt; client-cert.pem &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-passin&lt;/span&gt; pass:&lt;span class="s1"&gt;'keystorePassword'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-nodes&lt;/span&gt;

&lt;span class="c"&gt;# Extract CA certificate&lt;/span&gt;
openssl x509 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-in&lt;/span&gt; ca-cert.cer &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-out&lt;/span&gt; ca-cert.pem
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Lesson&lt;/strong&gt;: Document certificate format conversions. Include examples in your authentication guide.&lt;/p&gt;




&lt;h3&gt;
  
  
  Problem 3: Schema Registry SSL Certificate Issues
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;The Issue&lt;/strong&gt;: When connecting to Confluent Schema Registry over HTTPS, the client couldn't verify the SSL certificate.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Error: HTTPError: 401 Client Error: Unauthorized for url:
https://schema-registry.example.com/subjects
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Why&lt;/strong&gt;: The Schema Registry client wasn't configured with the CA certificate for SSL verification.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Solution&lt;/strong&gt;: Pass the CA certificate to the Schema Registry client:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;confluent_kafka.schema_registry&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SchemaRegistryClient&lt;/span&gt;

&lt;span class="n"&gt;schema_registry_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;url&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;https://schema-registry.example.com:8081&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;basic.auth.user.info&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ssl.ca.location&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/path/to/ca-cert.pem&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# ← Add this
&lt;/span&gt;    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ssl.certificate.location&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/path/to/client-cert.pem&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Optional
&lt;/span&gt;    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ssl.key.location&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/path/to/client-key.pem&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Optional
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SchemaRegistryClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema_registry_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Lesson&lt;/strong&gt;: Always ensure SSL certificates are available for both the Kafka broker AND the Schema Registry client.&lt;/p&gt;




&lt;h3&gt;
  
  
  Problem 4: Consumer Offset Management with &lt;code&gt;assign()&lt;/code&gt;
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;The Issue&lt;/strong&gt;: When using manual partition assignment with &lt;code&gt;assign()&lt;/code&gt;, calling &lt;code&gt;seek()&lt;/code&gt; would fail with a cryptic error:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;KafkaError{code=_STATE,val=-172,str="Failed to seek to offset X: Local: Erroneous state"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Context&lt;/strong&gt;: I was using &lt;code&gt;assign()&lt;/code&gt; instead of &lt;code&gt;subscribe()&lt;/code&gt; to have precise control over which offsets to read:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assign&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic_partitions&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# Manually assign partitions
&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;seek&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;  &lt;span class="c1"&gt;# ❌ FAILS!
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Why&lt;/strong&gt;: librdkafka requires partitions to be fully initialized before &lt;code&gt;seek()&lt;/code&gt; can be called. Checking &lt;code&gt;assignment()&lt;/code&gt; alone isn't enough because:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The partition assignment might be acknowledged&lt;/li&gt;
&lt;li&gt;But librdkafka's internal state machine hasn't finished initializing&lt;/li&gt;
&lt;li&gt;The partition metadata hasn't been fetched yet&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;The Solution&lt;/strong&gt;: Initialize partitions properly by:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Calling &lt;code&gt;get_watermark_offsets()&lt;/code&gt; to fetch partition metadata&lt;/li&gt;
&lt;li&gt;Polling multiple times to let librdkafka complete initialization&lt;/li&gt;
&lt;li&gt;Only then calling &lt;code&gt;seek()&lt;/code&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Step 1: Assign partitions
&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assign&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic_partitions&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Step 2: Fetch watermark offsets (initializes metadata)
&lt;/span&gt;&lt;span class="n"&gt;offset_map&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;tp&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;topic_partitions&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;low&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;high&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_watermark_offsets&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timeout&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;offset_map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;tp&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;low&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;high&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Step 3: Poll multiple times to initialize state
&lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeout&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.05&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;pass&lt;/span&gt;  &lt;span class="c1"&gt;# Continue polling
&lt;/span&gt;    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;  &lt;span class="c1"&gt;# Got a message, state is initialized
&lt;/span&gt;
&lt;span class="c1"&gt;# Step 4: NOW we can seek safely
&lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;tp&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;topic_partitions&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;low&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;high&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;offset_map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;tp&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;start_offset&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;max&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;low&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;high&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;num_messages&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;seek&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TopicPartition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tp&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tp&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;start_offset&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Lesson&lt;/strong&gt;: When dealing with low-level client libraries, understand the state machine. Some operations have ordering constraints.&lt;/p&gt;




&lt;h3&gt;
  
  
  Problem 5: Custom Serialization for Complex Types
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;The Issue&lt;/strong&gt;: When producing messages, Avro serialization failed for certain Python types:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# ❌ TypeError: Object of type datetime is not JSON serializable
&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;  &lt;span class="c1"&gt;# Avro needs numeric epoch
&lt;/span&gt;    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sa"&gt;b&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="se"&gt;\x00\x01\x02&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;      &lt;span class="c1"&gt;# Avro needs base64 string
&lt;/span&gt;    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;amount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;10.50&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;   &lt;span class="c1"&gt;# Avro needs string or float
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Why&lt;/strong&gt;: Avro has specific type requirements that don't directly map to Python types. Additionally, the MCP protocol requires JSON serialization for responses.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Solution&lt;/strong&gt;: Create a custom JSON encoder:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CustomJSONEncoder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;JSONEncoder&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Handle non-standard types for JSON serialization&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;default&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;isinstance&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;isoformat&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="nf"&gt;isinstance&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;bytes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;bytearray&lt;/span&gt;&lt;span class="p"&gt;)):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hex&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# or base64.b64encode(obj).decode()
&lt;/span&gt;        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="nf"&gt;isinstance&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Decimal&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="nf"&gt;isinstance&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;set&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;default&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Use it in responses
&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cls&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;CustomJSONEncoder&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Also, handle the opposite direction when accepting payloads:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;deserialize_payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Convert string representations back to proper types&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# Convert ISO strings back to datetime
&lt;/span&gt;    &lt;span class="c1"&gt;# Convert hex strings back to bytes
&lt;/span&gt;    &lt;span class="c1"&gt;# Convert string decimals back to Decimal
&lt;/span&gt;    &lt;span class="c1"&gt;# etc.
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Lesson&lt;/strong&gt;: When building APIs, always think about serialization round-trips. JSON is common but has limitations.&lt;/p&gt;




&lt;h3&gt;
  
  
  Problem 6: Unique Consumer Groups vs. Offset Control
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;The Issue&lt;/strong&gt;: I wanted:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Fine-grained offset control (use &lt;code&gt;assign()&lt;/code&gt;)&lt;/li&gt;
&lt;li&gt;No permanent consumer group resources in the cluster&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These seemed contradictory because Kafka requires a &lt;code&gt;group.id&lt;/code&gt; even when using &lt;code&gt;assign()&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Solution&lt;/strong&gt;: Use unique consumer group IDs per request:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;uuid&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;consume_messages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;group_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;group_id&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Generate unique ID per request
&lt;/span&gt;        &lt;span class="n"&gt;group_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kafka-mcp-consumer-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;uuid4&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nb"&gt;hex&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="si"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

    &lt;span class="n"&gt;consumer_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;group.id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;group_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;enable.auto.commit&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Don't commit offsets
&lt;/span&gt;        &lt;span class="bp"&gt;...&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assign&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic_partitions&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# Manual assignment
&lt;/span&gt;    &lt;span class="c1"&gt;# ... seek and read ...
&lt;/span&gt;    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Clean up
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The unique ID ensures:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;No conflicts between concurrent requests&lt;/li&gt;
&lt;li&gt;No permanent consumer group state (cleaned up after timeout)&lt;/li&gt;
&lt;li&gt;Full offset control via &lt;code&gt;assign()&lt;/code&gt; and &lt;code&gt;seek()&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Lesson&lt;/strong&gt;: Sometimes the solution is simpler than you think. Use generated IDs as a clean compromise.&lt;/p&gt;




&lt;h2&gt;
  
  
  Technical Deep Dive &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Avro Schema Handling
&lt;/h3&gt;

&lt;p&gt;Avro is a powerful schema format but requires careful handling. Here's how the project processes schemas:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;AvroPayloadGenerator&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;generate_payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;nested_depth&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Generate valid message matching Avro schema&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

        &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;''&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;record&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Handle complex object
&lt;/span&gt;            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;field&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;generate_payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;field&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;nested_depth&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;fields&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;array&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Handle array
&lt;/span&gt;            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;generate_payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;items&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;nested_depth&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;

        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;enum&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Handle enumeration
&lt;/span&gt;            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;symbols&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;  &lt;span class="c1"&gt;# Return first valid value
&lt;/span&gt;
        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;union&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Handle union types (null or something)
&lt;/span&gt;            &lt;span class="n"&gt;non_null_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;null&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;generate_payload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;non_null_type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;nested_depth&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bytes&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="sa"&gt;b&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;test_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;int&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="mi"&gt;42&lt;/span&gt;

        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;schema_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;long&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="mi"&gt;9223372036854775807&lt;/span&gt;  &lt;span class="c1"&gt;# Max long value
&lt;/span&gt;
        &lt;span class="c1"&gt;# ... etc for string, float, double, boolean
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  MCP Tool Definition
&lt;/h3&gt;

&lt;p&gt;Each Kafka operation is exposed as an MCP tool:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@server.list_tools&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;list_tools&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Tool&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Tool&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;list_topics&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;List all topics in Kafka cluster&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;inputSchema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;object&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;properties&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;topic_filter&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;string&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;description&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Optional filter pattern&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
                    &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Tool&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;consume_messages&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Consume messages from a Kafka topic&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;inputSchema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;object&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;properties&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;topic&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;string&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;description&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Topic name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
                    &lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;num_messages&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;integer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;description&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Number of messages to consume&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;default&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;
                    &lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;from_beginning&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;boolean&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;description&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Read from start of topic&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;default&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
                    &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="p"&gt;},&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;required&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;topic&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="c1"&gt;# ... more tools ...
&lt;/span&gt;    &lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="nd"&gt;@server.call_tool&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;call_tool&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;arguments&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TextContent&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;list_topics&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafka_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;list_topics&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;arguments&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;topic_filter&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;TextContent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;text&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;text&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cls&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;CustomJSONEncoder&lt;/span&gt;&lt;span class="p"&gt;))]&lt;/span&gt;

    &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;consume_messages&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafka_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;consume_messages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;arguments&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;topic&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;num_messages&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;arguments&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;num_messages&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;from_beginning&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;arguments&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;from_beginning&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;TextContent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;text&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;text&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cls&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;CustomJSONEncoder&lt;/span&gt;&lt;span class="p"&gt;))]&lt;/span&gt;

    &lt;span class="c1"&gt;# ... handle other tools ...
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Configuration with Pydantic
&lt;/h3&gt;

&lt;p&gt;Using Pydantic Settings makes configuration type-safe and environment-aware:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pydantic_settings&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseSettings&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Settings&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSettings&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Kafka connection
&lt;/span&gt;    &lt;span class="n"&gt;kafka_bootstrap_servers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;kafka_security_protocol&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SASL_SSL&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

    &lt;span class="c1"&gt;# SASL authentication
&lt;/span&gt;    &lt;span class="n"&gt;kafka_sasl_mechanism&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PLAIN&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;kafka_sasl_username&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;kafka_sasl_password&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;

    &lt;span class="c1"&gt;# SSL/TLS
&lt;/span&gt;    &lt;span class="n"&gt;kafka_ssl_ca_location&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;kafka_ssl_certificate_location&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;kafka_ssl_key_location&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;kafka_ssl_key_password&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="c1"&gt;# Schema Registry
&lt;/span&gt;    &lt;span class="n"&gt;schema_registry_url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;schema_registry_username&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;schema_registry_password&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="c1"&gt;# Azure EntraID (optional)
&lt;/span&gt;    &lt;span class="n"&gt;azure_tenant_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;azure_client_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="n"&gt;azure_client_secret&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Config&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;env_file&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;.env&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;env_file_encoding&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="c1"&gt;# Convert KAFKA_BOOTSTRAP_SERVERS to kafka_bootstrap_servers
&lt;/span&gt;        &lt;span class="n"&gt;case_sensitive&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;

&lt;span class="c1"&gt;# Usage
&lt;/span&gt;&lt;span class="n"&gt;settings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Settings&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Automatically loads from .env and validates
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Security Considerations &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;Building tools that handle credentials and message data requires careful security practices:&lt;/p&gt;

&lt;h3&gt;
  
  
  1. Environment Variables
&lt;/h3&gt;

&lt;p&gt;Never hardcode credentials:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# ❌ BAD
&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.username&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;my-user&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.password&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;my-password-12345&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# ✅ GOOD
&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.username&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_SASL_USERNAME&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sasl.password&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_SASL_PASSWORD&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Use &lt;code&gt;.env.example&lt;/code&gt; as a template:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# .env.example - NO REAL CREDENTIALS&lt;/span&gt;
&lt;span class="nv"&gt;KAFKA_BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;kafka-broker:9092
&lt;span class="nv"&gt;KAFKA_SASL_USERNAME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;your-username
&lt;span class="nv"&gt;KAFKA_SASL_PASSWORD&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;your-password
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  2. Certificate Management
&lt;/h3&gt;

&lt;p&gt;Keep certificates secure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Restrict file permissions&lt;/span&gt;
&lt;span class="nb"&gt;chmod &lt;/span&gt;600 client-cert.pem
&lt;span class="nb"&gt;chmod &lt;/span&gt;600 client-key.pem
&lt;span class="nb"&gt;chmod &lt;/span&gt;600 ca-cert.pem

&lt;span class="c"&gt;# Add to .gitignore&lt;/span&gt;
&lt;span class="k"&gt;*&lt;/span&gt;.pem
&lt;span class="k"&gt;*&lt;/span&gt;.key
&lt;span class="k"&gt;*&lt;/span&gt;.jks
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  3. Message Handling
&lt;/h3&gt;

&lt;p&gt;Be careful with message content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;consume_messages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;num_messages&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Never log entire message payloads in production&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="n"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Log metadata only
&lt;/span&gt;        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Read message from partition &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; offset &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Don't log: logger.debug(f"Message value: {msg.value()}")
&lt;/span&gt;        &lt;span class="c1"&gt;# This could leak sensitive data
&lt;/span&gt;
        &lt;span class="n"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;partition&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;offset&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;()[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;key&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;key&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;value&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;  &lt;span class="c1"&gt;# In API response only, not logs
&lt;/span&gt;        &lt;span class="p"&gt;})&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  4. Pre-commit Hooks
&lt;/h3&gt;

&lt;p&gt;The project includes &lt;code&gt;check_secrets.sh&lt;/code&gt; to prevent credential leaks:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;
&lt;span class="c"&gt;# Prevent committing sensitive files&lt;/span&gt;

&lt;span class="k"&gt;if &lt;/span&gt;git diff &lt;span class="nt"&gt;--cached&lt;/span&gt; | &lt;span class="nb"&gt;grep&lt;/span&gt; &lt;span class="nt"&gt;-E&lt;/span&gt; &lt;span class="s2"&gt;"(password|secret|api_key|AKIA)"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;then
    &lt;/span&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"❌ Sensitive patterns detected in staged changes!"&lt;/span&gt;
    &lt;span class="nb"&gt;exit &lt;/span&gt;1
&lt;span class="k"&gt;fi

if&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt; &lt;span class="nt"&gt;-f&lt;/span&gt; &lt;span class="s2"&gt;".env"&lt;/span&gt; &lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; git diff &lt;span class="nt"&gt;--cached&lt;/span&gt; &lt;span class="nt"&gt;--name-only&lt;/span&gt; | &lt;span class="nb"&gt;grep&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\.&lt;/span&gt;&lt;span class="s2"&gt;env$"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;then
    &lt;/span&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"❌ Don't commit .env file!"&lt;/span&gt;
    &lt;span class="nb"&gt;exit &lt;/span&gt;1
&lt;span class="k"&gt;fi

&lt;/span&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"✅ Pre-commit security check passed"&lt;/span&gt;
&lt;span class="nb"&gt;exit &lt;/span&gt;0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Install it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cp &lt;/span&gt;check_secrets.sh .git/hooks/pre-commit
&lt;span class="nb"&gt;chmod&lt;/span&gt; +x .git/hooks/pre-commit
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  5. Network Security
&lt;/h3&gt;

&lt;p&gt;When deploying:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use TLS for all connections (SASL_SSL, not SASL_PLAINTEXT)&lt;/li&gt;
&lt;li&gt;Verify SSL certificates (don't disable verification)&lt;/li&gt;
&lt;li&gt;Use VPN or private networks if possible&lt;/li&gt;
&lt;li&gt;Rotate credentials regularly&lt;/li&gt;
&lt;li&gt;Monitor access logs&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Getting Started &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Installation
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Clone the repository&lt;/span&gt;
git clone &amp;lt;repo-url&amp;gt;
&lt;span class="nb"&gt;cd &lt;/span&gt;kafka-mcp-server

&lt;span class="c"&gt;# Create virtual environment&lt;/span&gt;
python3 &lt;span class="nt"&gt;-m&lt;/span&gt; venv venv
&lt;span class="nb"&gt;source &lt;/span&gt;venv/bin/activate  &lt;span class="c"&gt;# On Windows: venv\Scripts\activate&lt;/span&gt;

&lt;span class="c"&gt;# Install dependencies&lt;/span&gt;
pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nb"&gt;.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Configuration
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Copy template&lt;/span&gt;
&lt;span class="nb"&gt;cp&lt;/span&gt; .env.example .env

&lt;span class="c"&gt;# Edit with your credentials&lt;/span&gt;
nano .env
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Required settings:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Kafka Broker
KAFKA_BOOTSTRAP_SERVERS=kafka-broker:9092,kafka-broker-2:9092
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-password

# SSL/TLS
KAFKA_SSL_CA_LOCATION=/path/to/ca-cert.pem

# Schema Registry
SCHEMA_REGISTRY_URL=https://schema-registry:8081
SCHEMA_REGISTRY_USERNAME=your-username
SCHEMA_REGISTRY_PASSWORD=your-password
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Running the Server
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Start the MCP server&lt;/span&gt;
python &lt;span class="nt"&gt;-m&lt;/span&gt; kafka_mcp.server

&lt;span class="c"&gt;# The server will start and wait for client connections&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Integration with MCP Clients
&lt;/h3&gt;

&lt;p&gt;To integrate with any MCP-compatible client (VS Code, IDE, AI application, etc.):&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Option 1: Environment-based Configuration&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;KAFKA_BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"your-broker:9092"&lt;/span&gt;
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;KAFKA_SASL_USERNAME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"your-user"&lt;/span&gt;
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;KAFKA_SASL_PASSWORD&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"your-password"&lt;/span&gt;
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;KAFKA_SSL_CA_LOCATION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"/path/to/ca-cert.pem"&lt;/span&gt;
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;SCHEMA_REGISTRY_URL&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"https://your-registry:8081"&lt;/span&gt;

python &lt;span class="nt"&gt;-m&lt;/span&gt; kafka_mcp.server
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Option 2: Configuration File&lt;/strong&gt;&lt;br&gt;
Add to your client's MCP configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"mcpServers"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"kafka"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"command"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"python"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"args"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"-m"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"kafka_mcp.server"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"env"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"KAFKA_BOOTSTRAP_SERVERS"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"your-broker:9092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"KAFKA_SASL_USERNAME"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"your-user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"KAFKA_SASL_PASSWORD"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"your-password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"KAFKA_SSL_CA_LOCATION"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"/path/to/ca-cert.pem"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"SCHEMA_REGISTRY_URL"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"https://your-registry:8081"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Lessons Learned &lt;a&gt;&lt;/a&gt;
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. Low-Level Libraries Have State Machines
&lt;/h3&gt;

&lt;p&gt;When working with librdkafka (or similar low-level libraries), operations often have specific ordering requirements. Always check:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What state the client needs to be in before an operation&lt;/li&gt;
&lt;li&gt;What operations change that state&lt;/li&gt;
&lt;li&gt;How to verify the state is correct&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  2. Certificate Formats Matter
&lt;/h3&gt;

&lt;p&gt;Different clients expect different formats:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Java uses JKS, PKCS12&lt;/li&gt;
&lt;li&gt;OpenSSL/C libraries use PEM&lt;/li&gt;
&lt;li&gt;Always convert to the right format, don't try to use the wrong one&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  3. Documentation is Gold
&lt;/h3&gt;

&lt;p&gt;Confluent's documentation on certificate conversion and librdkafka configuration saved me hours. Good docs about your library's quirks are invaluable.&lt;/p&gt;

&lt;h3&gt;
  
  
  4. Separation of Concerns Scales
&lt;/h3&gt;

&lt;p&gt;Having separate modules for config, auth, kafka operations, and payload generation made it easy to add the MCP server on top. Each component can be tested independently.&lt;/p&gt;

&lt;h3&gt;
  
  
  5. Type Hints and Validation
&lt;/h3&gt;

&lt;p&gt;Using Pydantic for configuration validation caught configuration errors immediately instead of at runtime. Type hints made the code self-documenting.&lt;/p&gt;

&lt;h3&gt;
  
  
  6. Security Isn't Optional
&lt;/h3&gt;

&lt;p&gt;Hardcoding credentials is tempting for quick prototypes but creates bad habits. Using environment variables and pre-commit hooks from the start was worth it.&lt;/p&gt;

&lt;h3&gt;
  
  
  7. Error Messages Matter
&lt;/h3&gt;

&lt;p&gt;The cryptic librdkafka error messages (like the &lt;code&gt;_STATE&lt;/code&gt; error) were frustrating until I understood they were pointing to real state management issues. Better error messages in documentation would have helped.&lt;/p&gt;




&lt;h2&gt;
  
  
  Next Steps and Future Improvements
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Current Roadmap
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;[ ] Message filtering by key patterns&lt;/li&gt;
&lt;li&gt;[ ] Batch consumption with offset tracking&lt;/li&gt;
&lt;li&gt;[ ] Consumer group management tools&lt;/li&gt;
&lt;li&gt;[ ] Topic creation/deletion&lt;/li&gt;
&lt;li&gt;[ ] Schema versioning and migration helpers&lt;/li&gt;
&lt;li&gt;[ ] Performance metrics and monitoring&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Contributions Welcome
&lt;/h3&gt;

&lt;p&gt;This project is open source. If you:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Hit any issues, please report them&lt;/li&gt;
&lt;li&gt;Have ideas for new features, open a discussion&lt;/li&gt;
&lt;li&gt;Want to contribute, check the CONTRIBUTING.md file&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Further Learning
&lt;/h3&gt;

&lt;p&gt;If you want to dive deeper:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Apache Kafka&lt;/strong&gt;: Read "Kafka: The Definitive Guide" by Neha Narkhede, Gwen Shapira, and Todd Palino&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Model Context Protocol&lt;/strong&gt;: Check the MCP specification and implementation guides&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;librdkafka&lt;/strong&gt;: The C library documentation for advanced usage&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Avro&lt;/strong&gt;: Schema evolution and best practices&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Building the Kafka MCP Server was an enlightening project that combined several complex systems:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Understanding Kafka's architecture and client libraries&lt;/li&gt;
&lt;li&gt;Implementing the MCP protocol&lt;/li&gt;
&lt;li&gt;Handling authentication and certificates&lt;/li&gt;
&lt;li&gt;Managing serialization for complex types&lt;/li&gt;
&lt;li&gt;Ensuring security best practices&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The result is a standardized interface that makes working with Kafka accessible to any MCP-compatible application, whether it's an IDE, automation tool, or AI-powered workflow.&lt;/p&gt;

&lt;p&gt;Whether you're interested in Kafka integration, MCP development, or protocol-based system design, I hope this project and the lessons learned are helpful.&lt;/p&gt;

&lt;p&gt;Feel free to check out the &lt;a href="https://github.com/jesrzrz/mcp-kafka-client" rel="noopener noreferrer"&gt;full source code&lt;/a&gt; and try it with your Kafka cluster!&lt;/p&gt;




&lt;h2&gt;
  
  
  Questions or Feedback?
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Found a bug?&lt;/strong&gt; Open an issue on GitHub&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Have suggestions?&lt;/strong&gt; Discussions are open&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Want to contribute?&lt;/strong&gt; Pull requests welcome&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Need help?&lt;/strong&gt; Check the documentation in the &lt;code&gt;/docs&lt;/code&gt; directory&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Happy integrating! 🚀&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Tags:&lt;/strong&gt; #Kafka #Python #MCP #MessageQueue #Integration #Protocol #Architecture&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Originally posted:&lt;/strong&gt; &lt;a href="https://dev.to"&gt;dev.to&lt;/a&gt;&lt;/p&gt;

</description>
      <category>mcp</category>
      <category>kafka</category>
      <category>architecture</category>
      <category>automation</category>
    </item>
    <item>
      <title>Kafka Integration with External APIs: Particularities, Patterns, and Anti Patterns</title>
      <dc:creator>jesrzrz</dc:creator>
      <pubDate>Wed, 22 Jan 2025 09:20:41 +0000</pubDate>
      <link>https://dev.to/jesrzrz/kafka-integration-with-external-apis-particularities-patterns-and-anti-patterns-40mp</link>
      <guid>https://dev.to/jesrzrz/kafka-integration-with-external-apis-particularities-patterns-and-anti-patterns-40mp</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Integrating Kafka with external services via REST or gRPC APIs introduces unique challenges. This article explores the particularities of such integrations, best practices (patterns) to enhance resilience and scalability, and common pitfalls (anti patterns) to avoid.&lt;/p&gt;

&lt;p&gt;While this discussion focuses on Kafka, it is essential to understand its role within the broader context of event driven architecture (EDA). Kafka is a leading technology in EDA, enabling real time data flow and processing. However, the principles, challenges, and patterns discussed here can be applied to similar systems.&lt;/p&gt;




&lt;h2&gt;
  
  
  Particularities of Kafka to API Integrations
&lt;/h2&gt;

&lt;p&gt;Kafka to API integrations present unique complexities due to the fundamental differences between event driven systems and APIs' synchronous communication model. Understanding these particularities is essential to design efficient and reliable systems.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Asynchronous vs. Synchronous Paradigms&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Kafka operates on an asynchronous, decoupled architecture, while APIs often follow synchronous, tightly coupled request-response paradigms.&lt;/li&gt;
&lt;li&gt;Bridging these paradigms involves buffering, batching, or introducing mechanisms to manage the differences in data flow.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Message Acknowledgments&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Message acknowledgment in Kafka should occur only after confirming successful API responses.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Rate Limiting&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Many APIs have strict rate limits. Kafka consumers must implement flow control mechanisms to avoid exceeding these limits.&lt;/li&gt;
&lt;li&gt;Dynamic throttling mechanisms can adjust processing rates based on API responses, such as &lt;code&gt;429 Too Many Requests&lt;/code&gt; errors.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Error Handling&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;API failures must be managed through retries, fallback strategies, and dead letter topics to ensure message processing continuity.&lt;/li&gt;
&lt;li&gt;Differentiating between transient and permanent errors is critical for designing effective retry strategies.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Data Transformation&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Kafka messages often require transformation before being sent to external APIs, aligning the data format with the API's schema.&lt;/li&gt;
&lt;li&gt;Schema validation can ensure compatibility and prevent downstream failures.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Monitoring and Observability&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Real time monitoring of API request success rates, latencies, and failures is vital for maintaining operational stability.&lt;/li&gt;
&lt;li&gt;Observability tools integrated with Kafka and the external API provide insights into bottlenecks and errors.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Challenges of Using REST APIs in Real Time Streaming Contexts
&lt;/h2&gt;

&lt;p&gt;REST APIs, while widely used and versatile, pose specific challenges when integrated with Kafka in real time streaming scenarios. Understanding these limitations helps in making informed design choices.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Lack of Native Streaming Support&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST APIs follow a request response model, which is inherently unsuitable for continuous data streaming. This mismatch leads to inefficiencies when dealing with high throughput Kafka topics.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Higher Latency&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST APIs often introduce higher latency due to the overhead of HTTP/1.1 connections and text based serialization formats like JSON or XML. In real time systems, this can result in delays that accumulate over time.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Rate Limiting and Throttling&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Many REST APIs enforce strict rate limits, making it challenging to maintain the high throughput typical of Kafka streams without sophisticated throttling mechanisms.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Stateless Nature&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST APIs are stateless, requiring each request to carry all necessary context. This increases payload sizes and adds overhead in scenarios with frequent interactions.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Error Recovery Complexity&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Handling transient and permanent errors in REST APIs can be complex, requiring robust retry mechanisms and fallback strategies to avoid message loss or duplication.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  How gRPC Alleviates These Challenges
&lt;/h2&gt;

&lt;p&gt;In contrast to REST, gRPC offers several features that make it more suitable for Kafka integrations in real time streaming contexts:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Native Support for Streaming&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;gRPC's bidirectional streaming capabilities allow efficient handling of continuous data flows, aligning well with Kafka's event driven architecture.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Lower Latency&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;gRPC leverages HTTP/2 and binary serialization with Protocol Buffers, resulting in reduced latency and smaller payload sizes compared to REST.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Connection Efficiency&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Persistent connections in HTTP/2 minimize the overhead of repeated handshakes, providing a more efficient communication model for high frequency interactions.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Error Propagation&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;gRPC provides built in mechanisms for propagating detailed error codes, making it easier to implement nuanced retry and recovery strategies.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Streaming Flow Control&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;The protocol supports advanced flow control mechanisms, which help manage data streams effectively without overwhelming the API.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By adopting gRPC, developers can mitigate many of the challenges posed by REST APIs, creating more efficient and scalable integrations with Kafka.&lt;/p&gt;




&lt;h2&gt;
  
  
  REST vs. gRPC: Key Differences and Recommendations
&lt;/h2&gt;

&lt;p&gt;Understanding the distinctions between REST and gRPC APIs is crucial when designing Kafka integrations. Each has its strengths and considerations, and the choice depends on use case requirements.&lt;/p&gt;

&lt;h3&gt;
  
  
  Key Differences
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Communication Protocol&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST uses HTTP/1.1 with text based formats like JSON or XML.&lt;/li&gt;
&lt;li&gt;gRPC leverages HTTP/2 with a binary format (Protocol Buffers), enabling higher efficiency and lower latency.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Data Serialization&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST's text based serialization results in larger payloads and higher processing overhead.&lt;/li&gt;
&lt;li&gt;gRPC's binary serialization minimizes payload size and improves performance.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Streaming Support&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST typically supports only request response.&lt;/li&gt;
&lt;li&gt;gRPC natively supports bidirectional streaming, ideal for real time integrations.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Tooling and Ecosystem&lt;/strong&gt;:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;REST has a mature ecosystem with widespread adoption and support.&lt;/li&gt;
&lt;li&gt;gRPC offers strong tooling but requires additional setup and learning for developers new to Protocol Buffers.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Recommendations for Integration
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;When Using REST APIs&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Prioritize connection pooling to reduce latency.&lt;/li&gt;
&lt;li&gt;Implement retry logic for transient errors, such as &lt;code&gt;5xx&lt;/code&gt; responses.&lt;/li&gt;
&lt;li&gt;Use compression (e.g., Gzip) for large payloads to optimize bandwidth.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;When Using gRPC APIs&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Leverage gRPC's streaming capabilities for real-time data flows.&lt;/li&gt;
&lt;li&gt;Monitor HTTP/2 connection health to avoid unexpected disconnections.&lt;/li&gt;
&lt;li&gt;Ensure Protocol Buffer schemas are versioned and backward-compatible.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;General Best Practices&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use API gateways to centralize and standardize API interactions.&lt;/li&gt;
&lt;li&gt;Implement schema registry tools to validate message payloads before sending them to the API.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Integration Patterns
&lt;/h2&gt;

&lt;p&gt;Designing robust Kafka to API integrations requires adopting well established patterns that ensure resilience, scalability, and fault tolerance. This section introduces essential patterns for success.&lt;/p&gt;

&lt;h3&gt;
  
  
  1. &lt;strong&gt;Retry with Exponential Backoff&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;When API requests fail due to transient issues (e.g., network errors or temporary service unavailability), retries with exponential backoff help mitigate the problem without overwhelming the system.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Implementation Tips&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use a retry topic to manage messages needing retries.&lt;/li&gt;
&lt;li&gt;Gradually increase the retry delay to prevent API overload.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  2. &lt;strong&gt;Circuit Breaker&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;A circuit breaker prevents cascading failures when the external API is unavailable or under stress.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Benefits&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Protects Kafka consumers from long wait times.&lt;/li&gt;
&lt;li&gt;Redirects failed messages to a retry or dead letter topic while the circuit is open.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  3. &lt;strong&gt;Batch Processing&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Batching messages before sending them to the API reduces overhead and improves throughput, especially for APIs that support bulk operations.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Considerations&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Implement batch size limits to balance performance and API constraints.&lt;/li&gt;
&lt;li&gt;Use Kafka's consumer groups to parallelize batch formation.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  4. &lt;strong&gt;Backpressure and Flow Control&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;To align Kafka's high throughput with API rate limits, implement flow control mechanisms:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pause Kafka consumers when API rate limits are approached.&lt;/li&gt;
&lt;li&gt;Use buffers to temporarily hold messages until processing resumes.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  5. &lt;strong&gt;Dead Letter Topics (DLTs)&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Messages that fail after multiple retries should be routed to a DLT for further analysis or manual resolution.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Advantages&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Prevents blocking other messages in the pipeline.&lt;/li&gt;
&lt;li&gt;Provides a mechanism for forensic debugging.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  Anti Patterns
&lt;/h2&gt;

&lt;p&gt;Avoiding common pitfalls in Kafka to API integrations is as important as following best practices. This section highlights anti patterns and their solutions.&lt;/p&gt;

&lt;h3&gt;
  
  
  1. &lt;strong&gt;Blind Retries&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Retrying indefinitely without analyzing failure causes can overwhelm the external API and create a feedback loop.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Solution&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Implement a maximum retry count and route persistent failures to a DLT.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  2. &lt;strong&gt;Ignoring Rate Limits&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Failing to respect API rate limits can lead to throttling, increased latency, or even bans.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Solution&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Monitor API responses for rate limit headers and adjust the consumer rate dynamically.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  3. Overloading with Large Batches
&lt;/h3&gt;

&lt;p&gt;Sending excessively large batches to APIs can lead to timeouts or errors.&lt;/p&gt;

&lt;p&gt;Solution:&lt;/p&gt;

&lt;p&gt;Optimize batch size based on API specifications and performance tests.&lt;/p&gt;

&lt;h3&gt;
  
  
  4. Lack of Circuit Breaker
&lt;/h3&gt;

&lt;p&gt;Without a circuit breaker, repeated API calls during an outage can lead to a complete system breakdown.&lt;/p&gt;

&lt;p&gt;Solution:&lt;/p&gt;

&lt;p&gt;Monitor failure rates and implement a circuit breaker with fallback strategies.&lt;/p&gt;

&lt;h3&gt;
  
  
  5. Immediate Acknowledgment in Kafka
&lt;/h3&gt;

&lt;p&gt;Acknowledging messages in Kafka before confirming successful API processing can lead to data loss or inconsistency.&lt;/p&gt;

&lt;p&gt;Solution:&lt;/p&gt;

&lt;p&gt;Acknowledge Kafka messages only after receiving and validating API responses.&lt;/p&gt;




&lt;p&gt;Integrating Kafka with external APIs demands careful consideration of the asynchronous and synchronous nature of the systems. By leveraging patterns like retries, circuit breakers, and batching, you can design resilient and scalable systems. At the same time, avoiding anti-patterns minimizes failures.&lt;/p&gt;

&lt;p&gt;Whether you're working with REST or gRPC APIs, thoughtful integration strategies will enable robust data pipelines.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>eventdriven</category>
      <category>architecture</category>
      <category>api</category>
    </item>
    <item>
      <title>Why Schema Compatibility Matters</title>
      <dc:creator>jesrzrz</dc:creator>
      <pubDate>Mon, 13 Jan 2025 08:59:07 +0000</pubDate>
      <link>https://dev.to/jesrzrz/why-schema-compatibility-matters-2lkf</link>
      <guid>https://dev.to/jesrzrz/why-schema-compatibility-matters-2lkf</guid>
      <description>&lt;p&gt;When using Avro serialization in Kafka, schemas play a pivotal role in ensuring data consistency and interoperability. However, one often-overlooked aspect of schema management is defining a clear schema compatibility policy. As someone experienced in working with Confluent Kafka, I’ve seen firsthand how this decision can directly influence project outcomes.&lt;/p&gt;

&lt;p&gt;This post explores the importance of schema compatibility, when to pin schema versions, and practical examples to guide your decision-making.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Role of Schema Compatibility
&lt;/h2&gt;

&lt;p&gt;Schema compatibility refers to the rules that govern how schemas evolve over time while ensuring backward and forward compatibility with consumers and producers. In a Kafka environment using Avro, schemas are registered and managed in the Confluent Schema Registry, making compatibility policies a cornerstone of reliable data pipelines.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Types of Schema Compatibility&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Confluent Schema Registry supports the following compatibility policies:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Backward Compatibility: Consumers using the older schema can read data produced with the new schema.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Forward Compatibility: New consumers can read data produced with an older schema.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Full Compatibility: Ensures both backward and forward compatibility.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;None: No compatibility checks are enforced, which can lead to breaking changes.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Why It Matters&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Without a well-defined compatibility policy, schema evolution can introduce breaking changes. For instance:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;A producer introduces a new field, breaking older consumers.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A required field is removed, causing deserialization failures.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Real-World Scenarios: Lessons from Experience
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Case 1: Managing Rapid Evolution in a Distributed System&lt;/strong&gt;&lt;br&gt;
In one scenario, a team opted for NONE compatibility during early development to iterate quickly. However, as the system scaled, unexpected schema changes caused consumer applications to fail. For example, renaming a field led to deserialization errors in downstream applications.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lesson Learned:&lt;/strong&gt; A default BACKWARD compatibility policy would have allowed schema evolution while maintaining compatibility with existing consumers.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Case 2: Pinning a Schema Version for Stable Applications&lt;/strong&gt;&lt;br&gt;
In another example, a team managing sensitive data needed downstream services to operate with a fixed schema version for consistency. They pinned the schema ID in the producer configuration to prevent unintended schema changes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;props.put("value.schema.id", "15"); // Fixed schema version
props.put("auto.register.schemas", "false");
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Lesson Learned:&lt;/strong&gt; Pinning schema versions is crucial when stability and consistency are non-negotiable.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Case 3: Using Latest Schema in Flexible Pipelines&lt;/strong&gt;&lt;br&gt;
Conversely, a team working on a data enrichment pipeline enabled use.latest.version=true to accommodate frequent schema updates. With BACKWARD compatibility, they ensured existing consumers could handle enriched data without disruption.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Lesson Learned:&lt;/strong&gt; Using the latest schema version works well for dynamic pipelines, provided backward compatibility is guaranteed.&lt;/p&gt;

&lt;h2&gt;
  
  
  Best Practices for Managing Schema Versions
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Define Compatibility Early: Always set a compatibility policy in the Schema Registry during project initialization.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Pin Versions for Stability: Use fixed schema IDs for systems where consistency is critical.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Leverage use.latest.version for Agility: For flexible pipelines, opt for the latest schema version but ensure backward compatibility.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Test Schema Changes in Staging: Validate schema evolution in a non-production environment to avoid unexpected issues.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Enable Monitoring: Use tools like Confluent Control Center to monitor schema changes and compatibility violations.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>architecture</category>
      <category>eventdriven</category>
      <category>kafka</category>
      <category>data</category>
    </item>
    <item>
      <title>Reaction to Saga with Springboot &amp; Kstream</title>
      <dc:creator>jesrzrz</dc:creator>
      <pubDate>Wed, 09 Feb 2022 18:25:56 +0000</pubDate>
      <link>https://dev.to/jesrzrz/reaction-to-saga-with-springboot-kstream-2733</link>
      <guid>https://dev.to/jesrzrz/reaction-to-saga-with-springboot-kstream-2733</guid>
      <description>&lt;p&gt;Today an article about building a saga transaction system with springboot and kafka streams reached to me, and of course I felt very curious about it, so I got to work inmediately. The article is &lt;a href="https://github.com/piomin/sample-spring-kafka-microservices/blob/streams-full" rel="noopener noreferrer"&gt;this &lt;/a&gt;&lt;/p&gt;

&lt;p&gt;At first look there exists &lt;em&gt;two modes&lt;/em&gt; : full and semi kafka streams. I´ll look for the difference...&lt;/p&gt;

&lt;p&gt;After taking a look to the readme and the picture, now I see that it seems an event driven arch where microservices publish &amp;amp; consume commands. We have bounded contexts and each domain is independent from the others. I see...&lt;/p&gt;

&lt;p&gt;Now a question: if this is a Saga, how is the rollback made (or published, or executed or whatever) &lt;br&gt;
As far as i know, a &lt;a href="https://microservices.io/patterns/data/saga.html" rel="noopener noreferrer"&gt;Saga transaction&lt;/a&gt; is a step made in a bussines flow, but in the picture I´m seeing a join between 2 streams where 2 different microservices publish the &lt;em&gt;transaction ok&lt;/em&gt; command. What happens when one of them fail? Lets see...&lt;/p&gt;

&lt;p&gt;Going deeper, in the full streams version of the github, the solution considers a ktable, but it's used as a store for the orders, and is the component of this solution from where the controller layer retrieve its data to expose it as an API. Still looking for the rollback system...&lt;/p&gt;

&lt;p&gt;I found it! But I think that it consists in a local mechanism. I´m still thinking that we are losing de distributed rollback transaction where one of the two domains has to emit the rollback event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Edit 1:&lt;/strong&gt; Yesterday I ran out of time&lt;/p&gt;

&lt;p&gt;The magic is done &lt;a href="https://github.com/piomin/sample-spring-kafka-microservices/blob/streams-full/payment-service/src/main/java/pl/piomin/payment/PaymentApp.java" rel="noopener noreferrer"&gt;here &lt;/a&gt;-&amp;gt; &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5fqkxbgvofzkng37a3dy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5fqkxbgvofzkng37a3dy.png" width="695" height="461"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this example what is being done is to check the number of reservations(randomly updated) to publish a ok command or ko to the result stream if there exists enough stock.&lt;/p&gt;

&lt;p&gt;In this example something that took my attention was the way is being replace a common business layer (@&lt;em&gt;repository @service&lt;/em&gt; ) with a materialized state store where the reservartions are being stored.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fiagvoyrd750wpuxym00c.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fiagvoyrd750wpuxym00c.png" width="521" height="435"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here, for each order that is being consumed from the &lt;em&gt;orders&lt;/em&gt; stream, it´s being stored in the state stored grouped by customerId that is the key and processed by the Aggregator service, which is the one who decides what to do with the order: confirm, reject or process, based in the status of the order.&lt;/p&gt;

&lt;p&gt;But now I have another question to the fist one (how is the distributed rollback made?): How do I know the status of the order? It is not shared between all the microservices instances, because as I just see, the order event it´s being consumed from the stream. So let´s see.&lt;/p&gt;

&lt;p&gt;Reaching out to the &lt;a href="https://github.com/piomin/sample-spring-kafka-microservices/blob/streams-full/order-service/src/main/java/pl/piomin/order/service/OrderGeneratorService.java" rel="noopener noreferrer"&gt;place &lt;/a&gt; where the orders are being genereated I confirmmed that everything is ramdomly generated but the status, it´s being hardcoded to 'NEW'.&lt;/p&gt;

&lt;p&gt;So, now we are here:&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Feedb3ovos0telamdb0f3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Feedb3ovos0telamdb0f3.png" width="800" height="261"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I see... Now going to check if&lt;a href="https://github.com/piomin/sample-spring-kafka-microservices/blob/streams-full/stock-service/src/main/java/pl/piomin/stock/StockApp.java" rel="noopener noreferrer"&gt;stock service&lt;/a&gt; does it´s things in the same way, the answer is yes, but slightly different implemented, resulting in something like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmrabc5cf7t38qwbdjgxw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmrabc5cf7t38qwbdjgxw.png" alt="Image description" width="800" height="445"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now that I know more of what I´m seeing I think that this piece of code that exists in the stock service and payment service does the distributed rollback or commit magic:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3jz0vf32oy1eeh9o6l85.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3jz0vf32oy1eeh9o6l85.png" width="664" height="224"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now lets look for the &lt;em&gt;CONFIRMED _&amp;amp; _ROLLBACK&lt;/em&gt; command. First of all the join function:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftfs2ysnp1m3oxa0boi3t.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftfs2ysnp1m3oxa0boi3t.png" alt="Image description" width="574" height="314"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So, definetively is very simple: &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Join between payment stream and stock stream by order_id&lt;/li&gt;
&lt;li&gt;Check the status of both payment event and stock event&lt;/li&gt;
&lt;li&gt;Publish the result command (Confirm,rollbak) into the orders stream (the initial stream)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The business flow ends like this&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqwmdfwu7bng77oa7q0on.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqwmdfwu7bng77oa7q0on.png" alt="Image description" width="800" height="518"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So yes, all distributed transactions are published, processed and consummed in an event driven way. Now I have my answer, and this poc has been analyzed.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;BUT&lt;/strong&gt;&lt;br&gt;
I know that this is a POC, but what happens when something fails? A very clear weakpoint is located in the join between the orders service. As we are working in a distributed async architecture we have to deal with eventual consistency (or consistency that will never happen, you know). In a event streaming ecosystem, reactivity and realtime decissions are common and they are what make this kind of architecture to have sense, so I´m thinking that stock should be updated in realtime.&lt;/p&gt;

&lt;p&gt;So the questions are: &lt;br&gt;
what happens if payment fails? the stock goes inconsistent&lt;br&gt;
what happens if stock fails? the payment is never confirmed neither cancelled. The bank would block client's money for a while.&lt;/p&gt;

&lt;p&gt;This join is never been accomplished:&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft0cr9fvlqpjdvqcrdyof.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft0cr9fvlqpjdvqcrdyof.png" alt="Image description" width="800" height="557"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To forward events that never join, punctuate() can be used to periodically go through a store and emit events that don't have a match and have been in the statestore for a while.&lt;/p&gt;

&lt;p&gt;But there were some &lt;a href="https://issues.apache.org/jira/browse/KAFKA-10847" rel="noopener noreferrer"&gt;petitions &lt;/a&gt; to the kafka team for making it &lt;a href="https://docs.confluent.io/platform/current/streams/javadocs/javadoc/org/apache/kafka/streams/kstream/SlidingWindows.html#ofTimeDifferenceAndGrace-java.time.Duration-java.time.Duration-" rel="noopener noreferrer"&gt;easier&lt;/a&gt;:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd1xki6sr2s7dkhx8x6bi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd1xki6sr2s7dkhx8x6bi.png" alt="Image description" width="800" height="225"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So I will give a try. Stay tunned.&lt;/p&gt;

</description>
      <category>saga</category>
      <category>springboot</category>
      <category>kafka</category>
      <category>kafkastreams</category>
    </item>
  </channel>
</rss>
