DEV Community

Alex Spinov

Apache Kafka Has a Free API: Here's How to Use It for Event Streaming

Apache Kafka is the backbone of event-driven architectures at companies like LinkedIn, Netflix, and Uber. Kafka itself speaks a binary wire protocol, but Confluent's free REST Proxy and the open-source Python and Node.js clients make it accessible through simple API calls.

Why Use Kafka?

  • Stream millions of events per second
  • Decouple microservices with publish/subscribe
  • Replay events for debugging or rebuilding state
  • Process data in real-time with exactly-once semantics
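The publish/subscribe decoupling in the second bullet is easy to see in miniature. This is not Kafka, just a few lines of plain Python illustrating the pattern: producers publish to a named topic and never reference the consumers directly.

```python
from collections import defaultdict

class MiniBus:
    """Toy in-memory pub/sub bus -- illustrates the pattern, not Kafka itself."""
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, event):
        # Producers only name a topic; they don't know who is listening.
        for handler in self._subscribers[topic]:
            handler(event)

bus = MiniBus()
seen = []
bus.subscribe("user-events", seen.append)
bus.publish("user-events", {"event": "page_view", "user_id": "user-123"})
print(seen)  # [{'event': 'page_view', 'user_id': 'user-123'}]
```

Kafka adds durability, partitioning, and replay on top of this idea, but the decoupling itself is exactly this shape.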

Getting Started with Python

from kafka import KafkaProducer, KafkaConsumer
import json

# Producer: send events
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send events
producer.send('user-events', {
    'event': 'page_view',
    'user_id': 'user-123',
    'page': '/products',
    'timestamp': '2026-03-28T10:30:00Z'
})
producer.flush()
print("Event sent!")

Consumer: Process Events

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    print(f"Received: {event['event']} from {event['user_id']} at {event['timestamp']}")
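The loop body can be factored into small pure functions that you can unit-test without a running broker. A sketch (`decode` and `summarize` are hypothetical helper names, not part of kafka-python):

```python
import json

def decode(raw: bytes) -> dict:
    """Mirror of the value_deserializer used above."""
    return json.loads(raw.decode("utf-8"))

def summarize(event: dict) -> str:
    """Format one event the way the consumer loop prints it."""
    return f"Received: {event['event']} from {event['user_id']} at {event['timestamp']}"

raw = b'{"event": "page_view", "user_id": "user-123", "timestamp": "2026-03-28T10:30:00Z"}'
print(summarize(decode(raw)))  # Received: page_view from user-123 at 2026-03-28T10:30:00Z
```

Keeping parsing and formatting out of the consumer loop means the Kafka-facing code stays a thin shell around logic you can test in isolation.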

Confluent REST Proxy

No client library needed — use plain HTTP:

# Produce a message
curl -s -X POST "http://localhost:8082/topics/user-events" \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{
    "records": [
      {"value": {"event": "purchase", "user_id": "user-456", "amount": 49.99}}
    ]
  }'

# Create a consumer
curl -s -X POST "http://localhost:8082/consumers/my-group" \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{"name": "my-consumer", "format": "json", "auto.offset.reset": "earliest"}'

# Subscribe to topics
curl -s -X POST "http://localhost:8082/consumers/my-group/instances/my-consumer/subscription" \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{"topics": ["user-events"]}'

# Consume messages
curl -s "http://localhost:8082/consumers/my-group/instances/my-consumer/records" \
  -H "Accept: application/vnd.kafka.json.v2+json" | jq '.[] | .value'
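Because it's plain HTTP, the same requests can be assembled from any language. Here is a Python sketch that builds the produce call's URL, headers, and body (assuming the default REST Proxy port 8082; `build_produce_request` is a name I made up, and sending the request is left to whichever HTTP client you prefer):

```python
import json

KAFKA_JSON_V2 = "application/vnd.kafka.json.v2+json"

def build_produce_request(base_url: str, topic: str, values: list):
    """Return (url, headers, body) for the REST Proxy v2 produce endpoint."""
    url = f"{base_url}/topics/{topic}"
    headers = {"Content-Type": KAFKA_JSON_V2}
    body = json.dumps({"records": [{"value": v} for v in values]})
    return url, headers, body

url, headers, body = build_produce_request(
    "http://localhost:8082", "user-events",
    [{"event": "purchase", "user_id": "user-456", "amount": 49.99}],
)
print(url)  # http://localhost:8082/topics/user-events
```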

Event-Driven Order Processing

from kafka import KafkaProducer, KafkaConsumer
import json
import threading

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def process_orders():
    consumer = KafkaConsumer(
        'orders',
        bootstrap_servers=['localhost:9092'],
        group_id='order-processor',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    for message in consumer:
        order = message.value
        print(f"Processing order {order['order_id']}...")

        # Validate inventory
        if validate_inventory(order):
            # Charge payment
            producer.send('payments', {
                'order_id': order['order_id'],
                'amount': order['total'],
                'user_id': order['user_id']
            })

            # Send notification
            producer.send('notifications', {
                'user_id': order['user_id'],
                'message': f"Order {order['order_id']} confirmed!",
                'channel': 'email'
            })

            producer.flush()
            print(f"Order {order['order_id']} processed successfully")
        else:
            producer.send('order-failures', order)
            producer.flush()

def validate_inventory(order):
    return True  # Check your inventory DB

# Start processing in background
thread = threading.Thread(target=process_orders, daemon=True)
thread.start()
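The branch that decides which downstream topics receive messages can be isolated from the Kafka I/O, which makes it trivially testable. A sketch (`route_order` is a hypothetical helper, not part of kafka-python):

```python
def route_order(order: dict, in_stock: bool) -> list:
    """Return the (topic, message) pairs to publish for one order."""
    if not in_stock:
        return [("order-failures", order)]
    return [
        ("payments", {"order_id": order["order_id"],
                      "amount": order["total"],
                      "user_id": order["user_id"]}),
        ("notifications", {"user_id": order["user_id"],
                           "message": f"Order {order['order_id']} confirmed!",
                           "channel": "email"}),
    ]

order = {"order_id": "o-1", "total": 49.99, "user_id": "user-456"}
topics = [t for t, _ in route_order(order, in_stock=True)]
print(topics)  # ['payments', 'notifications']
```

The consumer loop then shrinks to: decode, route, and `producer.send` each pair, with all the business rules living in a pure function.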

Real-Time Analytics Pipeline

from kafka import KafkaConsumer
from collections import Counter
from datetime import datetime
import json
import time

def real_time_analytics():
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        group_id='analytics',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    window_size = 60  # 1 minute windows
    event_counts = Counter()
    window_start = time.time()

    for message in consumer:
        event = message.value
        event_counts[event['event']] += 1

        # Report metrics once per window. Note: this check only runs when
        # a message arrives, so an idle topic will not flush the window.
        if time.time() - window_start >= window_size:
            print(f"\n=== Analytics Window ({datetime.now().strftime('%H:%M:%S')}) ===")
            for event_type, count in event_counts.most_common():
                print(f"  {event_type}: {count} events")

            event_counts.clear()
            window_start = time.time()

real_time_analytics()
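The tumbling-window logic itself needs no broker, so it can live in a small class and be tested with synthetic timestamps. A sketch (`TumblingCounter` is my own name; as in the loop above, a window only closes when the next event arrives):

```python
from collections import Counter

class TumblingCounter:
    """Count events per type in fixed, non-overlapping time windows."""
    def __init__(self, window_size: float):
        self.window_size = window_size
        self.counts = Counter()
        self.window_start = None

    def add(self, event_type: str, now: float):
        """Record one event; return the closed window's counts, or None."""
        if self.window_start is None:
            self.window_start = now
        closed = None
        if now - self.window_start >= self.window_size:
            closed = dict(self.counts)
            self.counts = Counter()
            self.window_start = now
        self.counts[event_type] += 1
        return closed

w = TumblingCounter(window_size=60)
w.add("page_view", now=0)
w.add("page_view", now=10)
result = w.add("purchase", now=65)  # crossing the 60s boundary closes the window
print(result)  # {'page_view': 2}
```

For windows that must close even when no events arrive, you would drive this from `consumer.poll()` with a timeout instead of the blocking iterator.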

Admin API: Topic Management

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

from kafka.errors import TopicAlreadyExistsError

# Create topics (skip any that already exist)
topics = [
    NewTopic(name='orders', num_partitions=6, replication_factor=1),
    NewTopic(name='payments', num_partitions=3, replication_factor=1),
    NewTopic(name='notifications', num_partitions=3, replication_factor=1),
]
try:
    admin.create_topics(new_topics=topics)
except TopicAlreadyExistsError:
    pass

# List topics
print("Topics:", admin.list_topics())

# Get topic details
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
for topic in consumer.topics():
    partitions = consumer.partitions_for_topic(topic)
    print(f"  {topic}: {len(partitions)} partitions")
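Partition counts matter because keyed messages are routed by hashing: the same key always lands on the same partition, which preserves per-key ordering. Kafka's default partitioner uses murmur2; the sketch below illustrates only the principle with a stdlib hash, so it will not match Kafka's actual partition assignment:

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stable hash mod partition count; Kafka really uses murmur2 here.
    h = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return h % num_partitions

p = partition_for(b"user-123", num_partitions=6)
print(0 <= p < 6)                                  # True
print(partition_for(b"user-123", 6) == p)          # True: same key, same partition
```

This is also why adding partitions to an existing topic reshuffles key-to-partition mappings.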

Real-World Use Case

A ride-sharing company processes 1M+ events per second through Kafka — GPS updates, ride requests, pricing changes, and driver assignments. Each event flows through multiple consumers: one calculates surge pricing, another updates the map, a third handles billing. What would take a monolith minutes happens in under 100ms with Kafka's parallel processing.

What You Can Build

  • Event sourcing system with complete audit trail
  • Real-time analytics processing millions of events
  • Microservices choreography without tight coupling
  • Change data capture streaming database changes
  • IoT data pipeline handling sensor data at scale
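The first item, event sourcing, reduces to one idea: current state is a fold over the event log, so replaying the log rebuilds state from scratch. A minimal sketch (event shapes and field names are my own, not a Kafka API):

```python
def rebuild_balance(events: list) -> int:
    """Fold an ordered event log into current state."""
    balance = 0
    for e in events:
        if e["type"] == "deposit":
            balance += e["amount"]
        elif e["type"] == "withdrawal":
            balance -= e["amount"]
    return balance

log = [
    {"type": "deposit", "amount": 100},
    {"type": "withdrawal", "amount": 30},
    {"type": "deposit", "amount": 5},
]
print(rebuild_balance(log))  # 75
```

With Kafka, the log is a topic with `auto_offset_reset='earliest'`, and "rebuilding state" is simply re-consuming from offset zero.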

Need custom event streaming solutions? I build data pipelines and distributed systems.

Email me: spinov001@gmail.com
Check out my developer tools: https://apify.com/spinov001
