Digging into a new universe of data streaming after IBM’s recent acquisition of Confluent!
Introduction
Data is no longer a static asset — it’s a live pulse. Following IBM’s $11B acquisition of Confluent, the bridge between real-time streaming and enterprise AI has never been shorter. I’m diving into this new universe of data-in-motion to see how a ‘Smart Data’ foundation is changing the game for hybrid cloud and agentic AI. 🚀
In the traditional world of data, systems often talk to each other like a game of “telephone” — one service calls another, which calls another, creating a fragile web of dependencies. If one system goes down or slows down, the entire chain breaks. Event streaming changes this paradigm by turning data into a continuous flow of events.
Apache Kafka acts as the “central nervous system” for this flow. Instead of direct connections, services publish events to a central cluster, and any interested service can “tune in” to listen. This decoupling allows businesses to react to customer actions in real-time — whether that’s processing a payment, updating a live dashboard, or triggering a delivery — without the systems ever needing to know each other exists.
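To make the decoupling concrete, here is a toy in-memory publish/subscribe sketch in plain Python (an illustration of the pattern only; real Kafka adds persistence, partitioning, and replication on top):

```python
# Toy publish/subscribe "broker" -- illustrates decoupling, not Kafka itself.
from collections import defaultdict

class ToyBroker:
    def __init__(self):
        self.subscribers = defaultdict(list)  # topic -> list of callbacks

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, event):
        # The producer never references the consumers directly.
        for callback in self.subscribers[topic]:
            callback(event)

broker = ToyBroker()
received = []
broker.subscribe("orders", lambda e: received.append(("shipping", e)))
broker.subscribe("orders", lambda e: received.append(("email", e)))
broker.publish("orders", {"order_id": 1001})
# Both services saw the same event; neither knows the other exists.
```

Adding a third subscriber later requires no change to the publisher, which is exactly the property that makes event streaming resilient.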
Implementation: Building an E-commerce Stream
Starting from two basic code samples I found online — a “producer” and a “consumer” that illustrate Kafka’s core concepts — I asked Bob to build a simple but comprehensive application.
- Simple Producer

```python
from confluent_kafka import Producer
import json

# Configuration for connecting to the Kafka cluster
config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(config)

def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Order sent to {msg.topic()} [{msg.partition()}]")

# Simulate an order
order_data = {
    "order_id": 1001,
    "user": "jane_doe",
    "total": 59.99,
    "items": ["Wireless Mouse", "Keyboard"]
}

# Trigger the send (asynchronous)
producer.produce(
    'orders',
    key="1001",
    value=json.dumps(order_data),
    callback=delivery_report
)

producer.flush()  # Wait for any outstanding messages to be delivered
```
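The produce() call above is asynchronous: the delivery callback only fires once the client gets an acknowledgement, which is why flush() matters. A rough stand-in (a hypothetical FakeProducer, not the confluent_kafka API) shows the lifecycle:

```python
import json

class FakeProducer:
    """Hypothetical stand-in mimicking async delivery reports."""
    def __init__(self):
        self.pending = []

    def produce(self, topic, key, value, callback):
        # Buffered locally; the callback is NOT invoked yet.
        self.pending.append((topic, key, value, callback))

    def flush(self):
        # Deliver everything and fire callbacks, like a broker ack would.
        for topic, key, value, callback in self.pending:
            callback(None, (topic, key, value))  # err=None means success
        self.pending.clear()

log = []
p = FakeProducer()
p.produce("orders", "1001", json.dumps({"order_id": 1001}),
          callback=lambda err, msg: log.append("delivered" if err is None else "failed"))
assert log == []          # nothing acknowledged before flush
p.flush()
assert log == ["delivered"]
```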
- Simple Consumer

```python
from confluent_kafka import Consumer

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'shipping-group',  # Helps Kafka track which orders this group has seen
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(config)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(1.0)  # Check for new messages every 1 second
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"Received Order: {msg.value().decode('utf-8')}")
        print("Action: Initiating packaging and shipping...")
finally:
    consumer.close()
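Kafka itself only moves bytes; the JSON encode on the producer side and decode on the consumer side are the contract between the two. A quick round-trip check (plain Python, no broker needed):

```python
import json

order = {"order_id": 1001, "user": "jane_doe", "total": 59.99,
         "items": ["Wireless Mouse", "Keyboard"]}

wire_bytes = json.dumps(order).encode("utf-8")    # what the producer sends
decoded = json.loads(wire_bytes.decode("utf-8"))  # what the consumer reads
assert decoded == order                            # lossless round trip
```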
From these conceptual samples, Bob produced the application structure that follows.
The Producer: Capturing the Event
The Producer (found in src/producer.py) acts as the storefront. When an order is placed, it creates a JSON-serializable dictionary containing the order ID, user details, and items. It uses the confluent-kafka library to "produce" this message to a topic called orders, using the order_id as a key so that all updates for a specific order land in the same partition. A delivery callback provides immediate feedback on whether the message reached the Kafka broker or a retry is necessary.
"""
Kafka Producer - E-commerce Order Processing System
Simulates an online store sending order events to Kafka
"""
from confluent_kafka import Producer
import json
import time
import random
from datetime import datetime
# Configuration for connecting to the Kafka cluster
config = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'ecommerce-producer'
}
producer = Producer(config)
def delivery_report(err, msg):
"""
Callback function called once for each message produced to indicate delivery result.
Triggered by poll() or flush().
"""
if err is not None:
print(f"❌ Message delivery failed: {err}")
else:
print(f"✅ Order sent to topic '{msg.topic()}' [partition {msg.partition()}] at offset {msg.offset()}")
# Sample product catalog
PRODUCTS = [
{"name": "Wireless Mouse", "price": 29.99},
{"name": "Mechanical Keyboard", "price": 89.99},
{"name": "USB-C Cable", "price": 12.99},
{"name": "Laptop Stand", "price": 45.00},
{"name": "Webcam HD", "price": 79.99},
{"name": "Headphones", "price": 149.99},
{"name": "Monitor 27\"", "price": 299.99},
{"name": "Desk Lamp", "price": 34.99}
]
USERS = ["alice_smith", "bob_jones", "charlie_brown", "diana_prince", "eve_adams"]
def generate_order(order_id):
"""Generate a random order"""
num_items = random.randint(1, 4)
items = random.sample(PRODUCTS, num_items)
order_data = {
"order_id": order_id,
"user": random.choice(USERS),
"timestamp": datetime.now().isoformat(),
"items": [item["name"] for item in items],
"total": round(sum(item["price"] for item in items), 2),
"status": "pending"
}
return order_data
def produce_orders(num_orders=10, delay=2):
"""
Produce multiple orders to Kafka
Args:
num_orders: Number of orders to generate
delay: Delay in seconds between orders
"""
print(f"🚀 Starting E-commerce Order Producer")
print(f"📊 Will produce {num_orders} orders with {delay}s delay between each")
print(f"🔗 Connected to Kafka at: {config['bootstrap.servers']}")
print("-" * 70)
for i in range(1, num_orders + 1):
try:
# Generate order
order_data = generate_order(1000 + i)
# Print order details
print(f"\n📦 Order #{order_data['order_id']}:")
print(f" User: {order_data['user']}")
print(f" Items: {', '.join(order_data['items'])}")
print(f" Total: ${order_data['total']}")
# Produce message to Kafka
producer.produce(
topic='orders',
key=str(order_data['order_id']),
value=json.dumps(order_data),
callback=delivery_report
)
# Trigger delivery reports by polling
producer.poll(0)
# Wait before next order
if i < num_orders:
time.sleep(delay)
except KeyboardInterrupt:
print("\n⚠️ Producer interrupted by user")
break
except Exception as e:
print(f"❌ Error producing message: {e}")
# Wait for all messages to be delivered
print("\n⏳ Flushing remaining messages...")
producer.flush()
print("✅ All orders sent successfully!")
if __name__ == "__main__":
import sys
# Parse command line arguments
num_orders = int(sys.argv[1]) if len(sys.argv) > 1 else 10
delay = float(sys.argv[2]) if len(sys.argv) > 2 else 2
try:
produce_orders(num_orders, delay)
except KeyboardInterrupt:
print("\n👋 Producer stopped")
except Exception as e:
print(f"❌ Fatal error: {e}")
# Made with Bob
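Why keying by order_id keeps an order's events together: Kafka's default partitioner hashes the key bytes (murmur2 in the real clients) modulo the partition count. A simplified sketch using MD5 in place of murmur2 shows the property that matters, namely that a stable hash of an equal key always selects the same partition:

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    # Simplified: real Kafka clients use murmur2, but any stable hash
    # gives the same guarantee -- equal keys map to the same partition.
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = pick_partition("1001", 3)
p2 = pick_partition("1001", 3)
assert p1 == p2        # same key, same partition, every time
assert 0 <= p1 < 3
```

Because a partition is consumed in order by one member of a group, this is what guarantees that updates for order 1001 are processed in sequence.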
The Consumer: Decoupled Processing

The Consumer logic (found in src/consumer.py) demonstrates the power of Consumer Groups. Multiple independent services—Shipping, Email, and Analytics—all subscribe to the same orders topic simultaneously. Because each service belongs to its own group (e.g., shipping-group), Kafka tracks their progress (offsets) individually. This means the Shipping department can process messages at its own pace without affecting the speed of the Email service or the Analytics dashboard.
"""
Kafka Consumer - E-commerce Order Processing System
Simulates different departments (Shipping, Email, Analytics) consuming order events
"""
from confluent_kafka import Consumer, KafkaError
import json
import sys
from datetime import datetime
def create_consumer(group_id, bootstrap_servers='localhost:9092'):
"""
Create and configure a Kafka consumer
Args:
group_id: Consumer group identifier
bootstrap_servers: Kafka broker addresses
Returns:
Configured Consumer instance
"""
config = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest', # Start from beginning if no offset exists
'enable.auto.commit': True,
'auto.commit.interval.ms': 1000,
'session.timeout.ms': 6000,
'client.id': f'{group_id}-client'
}
return Consumer(config)
def process_shipping(order):
"""Process order for shipping department"""
print(f"\n📦 SHIPPING DEPARTMENT")
print(f" Order ID: {order['order_id']}")
print(f" Customer: {order['user']}")
print(f" Items to pack: {', '.join(order['items'])}")
print(f" ✅ Initiating packaging and shipping process...")
def process_email(order):
"""Process order for email service"""
print(f"\n📧 EMAIL SERVICE")
print(f" Order ID: {order['order_id']}")
print(f" Recipient: {order['user']}")
print(f" Total: ${order['total']}")
print(f" ✅ Sending order confirmation email...")
def process_analytics(order):
"""Process order for analytics dashboard"""
print(f"\n📊 ANALYTICS DASHBOARD")
print(f" Order ID: {order['order_id']}")
print(f" Revenue: ${order['total']}")
print(f" Items count: {len(order['items'])}")
print(f" Timestamp: {order['timestamp']}")
print(f" ✅ Updating live sales dashboard...")
# Department processors mapping
PROCESSORS = {
'shipping-group': process_shipping,
'email-group': process_email,
'analytics-group': process_analytics
}
def consume_orders(group_id, topic='orders'):
"""
Consume orders from Kafka topic
Args:
group_id: Consumer group (shipping-group, email-group, analytics-group)
topic: Kafka topic to subscribe to
"""
consumer = create_consumer(group_id)
consumer.subscribe([topic])
processor = PROCESSORS.get(group_id, process_shipping)
department = group_id.replace('-group', '').upper()
print(f"🚀 Starting {department} Consumer")
print(f"👥 Consumer Group: {group_id}")
print(f"📋 Subscribed to topic: {topic}")
print(f"🔗 Connected to Kafka at: localhost:9092")
print("-" * 70)
print("⏳ Waiting for messages... (Press Ctrl+C to stop)\n")
try:
message_count = 0
while True:
# Poll for messages (timeout in seconds)
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print(f"📍 Reached end of partition {msg.partition()}")
else:
print(f"❌ Consumer error: {msg.error()}")
continue
# Process the message
try:
message_count += 1
order_data = json.loads(msg.value().decode('utf-8'))
print(f"\n{'='*70}")
print(f"📨 Message #{message_count} received from partition {msg.partition()} at offset {msg.offset()}")
# Call the appropriate processor
processor(order_data)
print(f"{'='*70}")
except json.JSONDecodeError as e:
print(f"❌ Failed to decode message: {e}")
except Exception as e:
print(f"❌ Error processing message: {e}")
except KeyboardInterrupt:
print(f"\n\n⚠️ Consumer interrupted by user")
print(f"📊 Total messages processed: {message_count}")
finally:
# Close the consumer to commit final offsets
print("🔒 Closing consumer...")
consumer.close()
print("✅ Consumer closed successfully")
if __name__ == "__main__":
# Parse command line arguments
if len(sys.argv) < 2:
print("Usage: python consumer.py <group_id> [topic]")
print("\nAvailable consumer groups:")
print(" - shipping-group : Processes orders for shipping")
print(" - email-group : Sends confirmation emails")
print(" - analytics-group : Updates analytics dashboard")
print("\nExample: python consumer.py shipping-group")
sys.exit(1)
group_id = sys.argv[1]
topic = sys.argv[2] if len(sys.argv) > 2 else 'orders'
if group_id not in PROCESSORS:
print(f"⚠️ Warning: Unknown group_id '{group_id}'. Using default processor.")
try:
consume_orders(group_id, topic)
except Exception as e:
print(f"❌ Fatal error: {e}")
sys.exit(1)
# Made with Bob
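The reason shipping can lag behind email without blocking it is that the broker tracks a separate offset per consumer group. That bookkeeping can be modeled in a few lines of plain Python (a dict per group over an append-only list; not the real broker protocol):

```python
log = [f"order-{i}" for i in range(5)]            # the topic, as an append-only log
offsets = {"shipping-group": 0, "email-group": 0}  # one offset per group

def poll(group, n=1):
    """Return up to n unread messages for this group and advance only its offset."""
    start = offsets[group]
    batch = log[start:start + n]
    offsets[group] += len(batch)
    return batch

poll("email-group", 5)                    # email races ahead
assert offsets["email-group"] == 5
assert offsets["shipping-group"] == 0     # shipping's progress is untouched
assert poll("shipping-group") == ["order-0"]
```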
The Cluster: Ensuring Reliability
The heart of the system is the Kafka broker cluster (defined in docker-compose.yml and [Architecture.md](https://github.com/aairom/kafka-101/blob/main/Docs/Architecture.md)). By running three brokers coordinated by ZooKeeper, the architecture ensures high availability. Each data partition is replicated across at least two brokers; if one broker fails, another automatically takes over as the "leader," so no orders are lost and the system remains online. This fault tolerance is what makes Kafka "production-ready" compared to simple message queues.
```yaml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka-broker-1:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-broker-1
    container_name: kafka-broker-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - kafka-broker-1-data:/var/lib/kafka/data

  kafka-broker-2:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-broker-2
    container_name: kafka-broker-2
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
      - "19093:19093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-2:29093,PLAINTEXT_HOST://localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - kafka-broker-2-data:/var/lib/kafka/data

  kafka-broker-3:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-broker-3
    container_name: kafka-broker-3
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
      - "19094:19094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-3:29094,PLAINTEXT_HOST://localhost:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_JMX_PORT: 9103
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - kafka-broker-3-data:/var/lib/kafka/data

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-broker-1-data:
  kafka-broker-2-data:
  kafka-broker-3-data:

# Made with Bob
```
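The replication settings above (factor 2 across three brokers) mean each partition has one leader and one follower; when a broker dies, a surviving replica is promoted. That failover logic can be modeled in a few lines (a toy sketch that ignores ISR timing and controller elections):

```python
# partition -> ordered replica list; the first alive broker acts as leader
replicas = {0: [1, 2], 1: [2, 3], 2: [3, 1]}
alive = {1, 2, 3}

def leader(partition):
    """Return the first alive replica for a partition, or None if all are down."""
    for broker in replicas[partition]:
        if broker in alive:
            return broker
    return None

assert leader(0) == 1
alive.discard(1)          # broker 1 crashes
assert leader(0) == 2     # the follower takes over; no data is lost
assert leader(1) == 2     # partitions led elsewhere are unaffected
```

With replication factor 2, the cluster survives any single broker failure; losing both replicas of a partition at once would make it unavailable, which is why production clusters usually use factor 3.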
# Made with Bob
The Monitor: Real-Time Observability
To manage this distributed flow, the Monitor Runner (monitor_runner.py) provides a unified view of the entire ecosystem. It orchestrates the lifecycle of the producer and all consumer threads while capturing their standard output into a web-based dashboard. This allows developers to see the "path of a message" in real-time: from the moment the Producer sends Order #1001 to the moment all three consumer groups acknowledge and process it.
"""
Kafka Monitor Web Server
Provides a web interface to monitor producer and consumer outputs in real-time
"""
import asyncio
import json
import subprocess
import threading
from datetime import datetime
from pathlib import Path
from http.server import HTTPServer, SimpleHTTPRequestHandler
import socketserver
class TerminalMonitor:
"""Monitor terminal outputs and store them"""
def __init__(self):
self.outputs = {
'producer': [],
'shipping': [],
'email': [],
'analytics': []
}
self.max_lines = 1000 # Keep last 1000 lines per terminal
def add_output(self, terminal_id, line):
"""Add a line to terminal output"""
timestamp = datetime.now().strftime('%H:%M:%S')
entry = {
'timestamp': timestamp,
'line': line
}
if terminal_id in self.outputs:
self.outputs[terminal_id].append(entry)
# Keep only last max_lines
if len(self.outputs[terminal_id]) > self.max_lines:
self.outputs[terminal_id] = self.outputs[terminal_id][-self.max_lines:]
def get_outputs(self):
"""Get all terminal outputs"""
return self.outputs
def clear_terminal(self, terminal_id):
"""Clear a specific terminal"""
if terminal_id in self.outputs:
self.outputs[terminal_id] = []
def clear_all(self):
"""Clear all terminals"""
for terminal_id in self.outputs:
self.outputs[terminal_id] = []
# Global monitor instance
monitor = TerminalMonitor()
class MonitorRequestHandler(SimpleHTTPRequestHandler):
"""Custom HTTP request handler"""
def do_GET(self):
"""Handle GET requests"""
if self.path == '/':
self.path = '/monitor.html'
elif self.path == '/api/outputs':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(monitor.get_outputs()).encode())
return
elif self.path == '/api/clear':
monitor.clear_all()
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'cleared'}).encode())
return
return SimpleHTTPRequestHandler.do_GET(self)
def log_message(self, format, *args):
"""Suppress default logging"""
pass
def run_command_and_monitor(command, terminal_id, cwd=None):
"""Run a command and monitor its output"""
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
cwd=cwd,
shell=True
)
monitor.add_output(terminal_id, f"🚀 Starting: {command}")
for line in iter(process.stdout.readline, ''):
if line:
monitor.add_output(terminal_id, line.rstrip())
process.wait()
monitor.add_output(terminal_id, f"✅ Process completed with exit code: {process.returncode}")
except Exception as e:
monitor.add_output(terminal_id, f"❌ Error: {str(e)}")
def start_web_server(port=8080):
"""Start the web server"""
handler = MonitorRequestHandler
with socketserver.TCPServer(("", port), handler) as httpd:
print(f"🌐 Web Monitor Server running at http://localhost:{port}")
print(f"📊 Open your browser to view the dashboard")
print(f"Press Ctrl+C to stop")
httpd.serve_forever()
def main():
"""Main function"""
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'server':
# Just run the web server
port = int(sys.argv[2]) if len(sys.argv) > 2 else 8080
start_web_server(port)
else:
print("Usage:")
print(" python3 src/web_monitor.py server [port]")
print("\nExample:")
print(" python3 src/web_monitor.py server 8080")
if __name__ == "__main__":
main()
# Made with Bob
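The monitor caps each pane at max_lines by keeping only the tail of the output list; the trimming trick is plain Python slicing:

```python
max_lines = 1000
buffer = [f"line {i}" for i in range(1500)]

# Keep only the most recent max_lines entries, as TerminalMonitor does.
if len(buffer) > max_lines:
    buffer = buffer[-max_lines:]

assert len(buffer) == 1000
assert buffer[0] == "line 500"    # oldest surviving entry
assert buffer[-1] == "line 1499"  # newest entry
```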
```
┌─────────────────────────────────────────────────────────────┐
│ ⚡ Kafka Cluster Monitor              [Live] [🔄] [🗑️]      │
├──────────────────────────────┬──────────────────────────────┤
│ 📤 Producer                  │ 📦 Shipping Consumer         │
│ [0 lines]                    │ [0 lines]                    │
│                              │                              │
│ [09:30:15] 🚀 Starting...    │ [09:30:17] 🚀 Starting...    │
│ [09:30:16] Order #1001       │ [09:30:18] Received Order    │
│ [09:30:16] ✅ Sent           │ [09:30:18] ✅ Processing     │
│                              │                              │
├──────────────────────────────┼──────────────────────────────┤
│ 📧 Email Consumer            │ 📊 Analytics Consumer        │
│ [0 lines]                    │ [0 lines]                    │
│                              │                              │
│ [09:30:17] 🚀 Starting...    │ [09:30:17] 🚀 Starting...    │
│ [09:30:18] Received Order    │ [09:30:18] Received Order    │
│ [09:30:18] ✅ Email sent     │ [09:30:18] ✅ Updated        │
│                              │                              │
├──────────────────────────────┴──────────────────────────────┤
│ Last updated: 09:30:20                      Auto-scroll: ON │
└─────────────────────────────────────────────────────────────┘
```
Alongside the dashboard, we can also follow the events in the console output.
```
./scripts/run-producer.sh 10 2
🚀 Starting Kafka Producer
=========================
📊 Configuration:
   - Number of orders: 10
   - Delay between orders: 2s

🚀 Starting E-commerce Order Producer
📊 Will produce 10 orders with 2.0s delay between each
🔗 Connected to Kafka at: localhost:9092
----------------------------------------------------------------------

📦 Order #1001:
   User: bob_jones
   Items: USB-C Cable, Mechanical Keyboard, Desk Lamp
   Total: $137.97

📦 Order #1002:
   User: diana_prince
   Items: Wireless Mouse
   Total: $29.99
✅ Order sent to topic 'orders' [partition 0] at offset 0

📦 Order #1003:
   User: bob_jones
   Items: USB-C Cable
   Total: $12.99
✅ Order sent to topic 'orders' [partition 0] at offset 1

📦 Order #1004:
   User: bob_jones
   Items: Mechanical Keyboard, Headphones, Webcam HD
   Total: $319.97
✅ Order sent to topic 'orders' [partition 0] at offset 2

📦 Order #1005:
   User: diana_prince
   Items: Laptop Stand, Monitor 27", Wireless Mouse, Webcam HD
   Total: $454.97
✅ Order sent to topic 'orders' [partition 0] at offset 3

📦 Order #1006:
   User: diana_prince
   Items: Headphones, Laptop Stand
   Total: $194.99
✅ Order sent to topic 'orders' [partition 0] at offset 4

📦 Order #1007:
   User: charlie_brown
   Items: Laptop Stand, Headphones, Wireless Mouse
   Total: $224.98
✅ Order sent to topic 'orders' [partition 0] at offset 5

📦 Order #1008:
   User: charlie_brown
   Items: Laptop Stand
   Total: $45.0
✅ Order sent to topic 'orders' [partition 0] at offset 6

📦 Order #1009:
   User: alice_smith
   Items: Mechanical Keyboard, Desk Lamp
   Total: $124.98
✅ Order sent to topic 'orders' [partition 0] at offset 7

📦 Order #1010:
   User: eve_adams
   Items: Wireless Mouse
   Total: $29.99
✅ Order sent to topic 'orders' [partition 0] at offset 8

⏳ Flushing remaining messages...
✅ Order sent to topic 'orders' [partition 0] at offset 9
✅ All orders sent successfully!
✅ Producer finished
```
```
./scripts/run-consumer.sh shipping-group
🚀 Starting Kafka Consumer
==========================
📊 Configuration:
   - Consumer Group: shipping-group
   - Topic: orders

🚀 Starting SHIPPING Consumer
👥 Consumer Group: shipping-group
📋 Subscribed to topic: orders
🔗 Connected to Kafka at: localhost:9092
----------------------------------------------------------------------
⏳ Waiting for messages... (Press Ctrl+C to stop)

======================================================================
📨 Message #1 received from partition 0 at offset 0

📦 SHIPPING DEPARTMENT
   Order ID: 1001
   Customer: bob_jones
   Items to pack: USB-C Cable, Mechanical Keyboard, Desk Lamp
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #2 received from partition 0 at offset 1

📦 SHIPPING DEPARTMENT
   Order ID: 1002
   Customer: diana_prince
   Items to pack: Wireless Mouse
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #3 received from partition 0 at offset 2

📦 SHIPPING DEPARTMENT
   Order ID: 1003
   Customer: bob_jones
   Items to pack: USB-C Cable
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #4 received from partition 0 at offset 3

📦 SHIPPING DEPARTMENT
   Order ID: 1004
   Customer: bob_jones
   Items to pack: Mechanical Keyboard, Headphones, Webcam HD
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #5 received from partition 0 at offset 4

📦 SHIPPING DEPARTMENT
   Order ID: 1005
   Customer: diana_prince
   Items to pack: Laptop Stand, Monitor 27", Wireless Mouse, Webcam HD
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #6 received from partition 0 at offset 5

📦 SHIPPING DEPARTMENT
   Order ID: 1006
   Customer: diana_prince
   Items to pack: Headphones, Laptop Stand
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #7 received from partition 0 at offset 6

📦 SHIPPING DEPARTMENT
   Order ID: 1007
   Customer: charlie_brown
   Items to pack: Laptop Stand, Headphones, Wireless Mouse
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #8 received from partition 0 at offset 7

📦 SHIPPING DEPARTMENT
   Order ID: 1008
   Customer: charlie_brown
   Items to pack: Laptop Stand
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #9 received from partition 0 at offset 8

📦 SHIPPING DEPARTMENT
   Order ID: 1009
   Customer: alice_smith
   Items to pack: Mechanical Keyboard, Desk Lamp
   ✅ Initiating packaging and shipping process...
======================================================================

======================================================================
📨 Message #10 received from partition 0 at offset 9

📦 SHIPPING DEPARTMENT
   Order ID: 1010
   Customer: eve_adams
   Items to pack: Wireless Mouse
   ✅ Initiating packaging and shipping process...
======================================================================
```
```
./scripts/run-consumer.sh email-group
🚀 Starting Kafka Consumer
==========================
📊 Configuration:
   - Consumer Group: email-group
   - Topic: orders

🚀 Starting EMAIL Consumer
👥 Consumer Group: email-group
📋 Subscribed to topic: orders
🔗 Connected to Kafka at: localhost:9092
----------------------------------------------------------------------
⏳ Waiting for messages... (Press Ctrl+C to stop)

======================================================================
📨 Message #1 received from partition 0 at offset 0

📧 EMAIL SERVICE
   Order ID: 1001
   Recipient: bob_jones
   Total: $137.97
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #2 received from partition 0 at offset 1

📧 EMAIL SERVICE
   Order ID: 1002
   Recipient: diana_prince
   Total: $29.99
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #3 received from partition 0 at offset 2

📧 EMAIL SERVICE
   Order ID: 1003
   Recipient: bob_jones
   Total: $12.99
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #4 received from partition 0 at offset 3

📧 EMAIL SERVICE
   Order ID: 1004
   Recipient: bob_jones
   Total: $319.97
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #5 received from partition 0 at offset 4

📧 EMAIL SERVICE
   Order ID: 1005
   Recipient: diana_prince
   Total: $454.97
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #6 received from partition 0 at offset 5

📧 EMAIL SERVICE
   Order ID: 1006
   Recipient: diana_prince
   Total: $194.99
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #7 received from partition 0 at offset 6

📧 EMAIL SERVICE
   Order ID: 1007
   Recipient: charlie_brown
   Total: $224.98
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #8 received from partition 0 at offset 7

📧 EMAIL SERVICE
   Order ID: 1008
   Recipient: charlie_brown
   Total: $45.0
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #9 received from partition 0 at offset 8

📧 EMAIL SERVICE
   Order ID: 1009
   Recipient: alice_smith
   Total: $124.98
   ✅ Sending order confirmation email...
======================================================================

======================================================================
📨 Message #10 received from partition 0 at offset 9

📧 EMAIL SERVICE
   Order ID: 1010
   Recipient: eve_adams
   Total: $29.99
   ✅ Sending order confirmation email...
======================================================================
```
There we go with our Kafka cluster! That’s a wrap 💯
Conclusion
Streaming matters because data is most valuable the moment it is created. By using a Kafka cluster, we move away from “batch processing” and toward real-time intelligence.
The implementation shown here — with a replicated cluster, a robust producer, and independent consumer groups — provides a blueprint for systems that are not only fast but also incredibly resilient. Whether you are scaling to handle thousands of orders per second or simply trying to decouple your microservices, Kafka provides the reliable foundation needed for a modern, event-driven architecture.
>>> Thanks for reading <<<
Links
- GitHub repository for the code sample: https://github.com/aairom/kafka-101
- Apache Kafka repository: https://github.com/apache/kafka
- Introduction to Apache Kafka: https://www.confluent.io/what-is-apache-kafka/
- Confluent: https://www.confluent.io/
- Confluent Platform: https://www.confluent.io/product/confluent-platform/
- Confluent Cloud: https://www.confluent.io/confluent-cloud/