Apache Kafka is the backbone of event-driven architectures at companies like LinkedIn, Netflix, and Uber. Although Kafka speaks its own binary protocol, Confluent's free REST Proxy and the Python/Node client libraries make it accessible through simple API calls.
Why Use Kafka?
- Stream millions of events per second
- Decouple microservices with publish/subscribe
- Replay events for debugging or rebuilding state
- Process data in real-time with exactly-once semantics
Getting Started with Python
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer: serialize every payload to UTF-8 JSON before it goes on the wire.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
)

# Publish one page-view event to the 'user-events' topic.
page_view = {
    'event': 'page_view',
    'user_id': 'user-123',
    'page': '/products',
    'timestamp': '2026-03-28T10:30:00Z',
}
producer.send('user-events', page_view)
producer.flush()  # drain buffered sends before reporting success
print("Event sent!")
Consumer: Process Events
# Consumer: join the 'analytics-group' and read 'user-events' from the start.
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)

# Blocks forever; each record's value has already been decoded back to a dict.
for record in consumer:
    event = record.value
    print(f"Received: {event['event']} from {event['user_id']} at {event['timestamp']}")
Confluent REST Proxy
No client library needed — use plain HTTP:
# Produce a message (records envelope, v2 JSON wire format)
curl --silent --request POST "http://localhost:8082/topics/user-events" \
--header "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{
"records": [
{"value": {"event": "purchase", "user_id": "user-456", "amount": 49.99}}
]
}'
# Create a consumer instance inside the 'my-group' consumer group
curl --silent --request POST "http://localhost:8082/consumers/my-group" \
--header "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my-consumer", "format": "json", "auto.offset.reset": "earliest"}'
# Subscribe the new instance to one or more topics
curl --silent --request POST "http://localhost:8082/consumers/my-group/instances/my-consumer/subscription" \
--header "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics": ["user-events"]}'
# Poll for records; jq extracts just the decoded message values
curl --silent "http://localhost:8082/consumers/my-group/instances/my-consumer/records" \
--header "Accept: application/vnd.kafka.json.v2+json" | jq '.[] | .value'
Event-Driven Order Processing
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
# Module-level producer reused by the order-processing worker below.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    bootstrap_servers=['localhost:9092'],
)
def process_orders():
    """Consume 'orders' and fan out payments, notifications, or failures.

    Runs forever. Each order that passes inventory validation emits a
    payment request and an email notification; orders that fail validation
    are routed to the 'order-failures' topic instead.
    """
    consumer = KafkaConsumer(
        'orders',
        bootstrap_servers=['localhost:9092'],
        group_id='order-processor',
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    )
    for record in consumer:
        order = record.value
        print(f"Processing order {order['order_id']}...")
        if not validate_inventory(order):
            # Dead-letter the order so a downstream service can inspect it.
            producer.send('order-failures', order)
            producer.flush()
            continue
        # Request payment for the validated order.
        producer.send('payments', {
            'order_id': order['order_id'],
            'amount': order['total'],
            'user_id': order['user_id'],
        })
        # Queue a confirmation email for the customer.
        producer.send('notifications', {
            'user_id': order['user_id'],
            'message': f"Order {order['order_id']} confirmed!",
            'channel': 'email',
        })
        producer.flush()
        print(f"Order {order['order_id']} processed successfully")
def validate_inventory(order):
    """Placeholder inventory check — always approves the order.

    Replace with a real lookup against your inventory database.
    """
    return True
# Run the order processor in the background; daemon=True means the
# interpreter can exit without waiting for the infinite consumer loop.
worker = threading.Thread(target=process_orders, daemon=True)
worker.start()
Real-Time Analytics Pipeline
from collections import Counter
from datetime import datetime, timedelta
import time
def real_time_analytics():
    """Tally 'user-events' by event type and print a summary about once a minute.

    Windows advance lazily: the report is printed by the first message that
    arrives after the window has elapsed, so an idle topic produces no output.
    """
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        group_id='analytics',
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    )
    window_size = 60  # seconds per reporting window
    event_counts = Counter()
    window_start = time.time()
    for record in consumer:
        payload = record.value
        event_counts[payload['event']] += 1
        if time.time() - window_start < window_size:
            continue
        # Window elapsed: report tallies, then reset for the next window.
        print(f"\n=== Analytics Window ({datetime.now().strftime('%H:%M:%S')}) ===")
        for event_type, count in event_counts.most_common():
            print(f" {event_type}: {count} events")
        event_counts.clear()
        window_start = time.time()

real_time_analytics()
Admin API: Topic Management
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create topics from (name, partition-count) pairs, single-replica for local dev.
topic_specs = [('orders', 6), ('payments', 3), ('notifications', 3)]
topics = [
    NewTopic(name=name, num_partitions=parts, replication_factor=1)
    for name, parts in topic_specs
]
admin.create_topics(new_topics=topics)

# List every topic the broker knows about.
print("Topics:", admin.list_topics())

# Get per-topic partition details via a bare consumer's metadata API.
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
for topic in consumer.topics():
    partitions = consumer.partitions_for_topic(topic)
    print(f" {topic}: {len(partitions)} partitions")
Real-World Use Case
A ride-sharing company processes 1M+ events per second through Kafka — GPS updates, ride requests, pricing changes, and driver assignments. Each event flows through multiple consumers: one calculates surge pricing, another updates the map, a third handles billing. Work that would take a monolithic system minutes completes in under 100 ms thanks to Kafka's parallel processing.
What You Can Build
- Event sourcing system with complete audit trail
- Real-time analytics processing millions of events
- Microservices choreography without tight coupling
- Change data capture streaming database changes
- IoT data pipeline handling sensor data at scale
Need custom event streaming solutions? I build data pipelines and distributed systems.
Email me: spinov001@gmail.com
Check out my developer tools: https://apify.com/spinov001
Top comments (0)