Apache Pulsar is a cloud-native, multi-tenant messaging and streaming platform. It combines pub-sub messaging, queuing, and streaming in a single system, with built-in geo-replication.
What Is Apache Pulsar?
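Pulsar is an Apache Software Foundation top-level project whose architecture separates serving from storage. Stateless brokers handle producers and consumers, while Apache BookKeeper nodes (bookies) persist the data. Because brokers hold no durable state, the two layers scale independently and topics can move between brokers without copying data. Topics are lightweight and can be created on demand, and messages can be geo-replicated across datacenters.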
Key Features:
- Multi-tenancy with tenants and namespaces
- Geo-replication across clusters
- Tiered storage (offload to S3/GCS)
- Pulsar Functions (serverless processing)
- Schema Registry
- Transactions support
- WebSocket API
- 100K+ topics per cluster
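Multi-tenancy shows up directly in topic names. Every fully qualified topic has the form `{persistent|non-persistent}://tenant/namespace/topic`, which is why the examples below use `persistent://public/default/...`. The sketch below is a hypothetical stdlib-only helper (`parse_topic` and `TopicName` are not part of any Pulsar client) that splits a name into those parts. It also expands short names the way Pulsar clients do, so `orders` becomes `persistent://public/default/orders`. Only the current `tenant/namespace/topic` layout is handled.

```python
from typing import NamedTuple

DOMAINS = ("persistent", "non-persistent")


class TopicName(NamedTuple):
    """Parts of a fully qualified Pulsar topic name (hypothetical helper)."""
    domain: str     # "persistent" or "non-persistent"
    tenant: str     # top-level isolation unit, e.g. a team or customer
    namespace: str  # administrative unit inside a tenant where policies are set
    topic: str      # local topic name

    def __str__(self) -> str:
        return f"{self.domain}://{self.tenant}/{self.namespace}/{self.topic}"


def parse_topic(name: str) -> TopicName:
    """Split a topic name into its parts, expanding short forms:

    "orders"                       -> persistent://public/default/orders
    "my-company/production/orders" -> persistent://my-company/production/orders
    """
    if "://" in name:
        domain, rest = name.split("://", 1)
        parts = rest.split("/")
    else:
        # No domain given: default to persistent, and to public/default for bare names
        domain, parts = "persistent", name.split("/")
        if len(parts) == 1:
            parts = ["public", "default", parts[0]]
    if domain not in DOMAINS:
        raise ValueError(f"unknown topic domain: {domain!r}")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic in {name!r}")
    return TopicName(domain, parts[0], parts[1], parts[2])
```

For example, `parse_topic("orders").tenant` is `"public"`, and `str(parse_topic("orders"))` gives back the fully qualified name.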
Quick Start
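```shell
# Docker
docker run -d --name pulsar -p 6650:6650 -p 8080:8080 \
  apachepulsar/pulsar:latest bin/pulsar standalone

# Install the Python client
pip install pulsar-client

# The pulsar-admin CLI ships inside the image (and in the binary distribution)
docker exec -it pulsar bin/pulsar-admin tenants list
```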
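Standalone mode may take a while to start, and clients that connect too early fail. The sketch below assumes the admin REST API is reachable on port 8080, as mapped in the `docker run` command. It polls `GET /admin/v2/clusters` until the response lists `standalone` or a deadline passes. `wait_for_pulsar` and `admin_is_up` are hypothetical names. The check, sleep, and clock are injectable, so the retry loop can be tested without a broker.

```python
import http.client
import json
import time
import urllib.request
from typing import Callable

# Admin REST endpoint exposed by the docker run command (port 8080)
CLUSTERS_URL = "http://localhost:8080/admin/v2/clusters"


def admin_is_up(url: str = CLUSTERS_URL) -> bool:
    """True once the admin API answers with a cluster list that includes "standalone"."""
    try:
        with urllib.request.urlopen(url, timeout=2) as resp:
            return "standalone" in json.load(resp)
    except (OSError, ValueError, http.client.HTTPException):
        # Port not open yet, connection reset mid-startup, HTTP error, or non-JSON body
        return False


def wait_for_pulsar(
    check: Callable[[], bool] = admin_is_up,
    timeout_s: float = 60.0,
    interval_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call `check` until it returns True or `timeout_s` elapses; return the last result."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

Call `wait_for_pulsar()` before creating a `pulsar.Client`. It returns `False` instead of raising when the deadline passes, so the caller decides whether to abort.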
Pulsar Python Client
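```python
import pulsar

client = pulsar.Client("pulsar://localhost:6650")
topic = "persistent://public/default/orders"

# Producer
producer = client.create_producer(topic)
producer.send(b'{"id": 1, "total": 99.99}')
producer.send(b'{"id": 2, "total": 149.50}')

# Consumer. A new subscription starts at the latest message by default,
# so start from the earliest one to also receive the orders sent above.
consumer = client.subscribe(
    topic,
    "my-subscription",
    consumer_type=pulsar.ConsumerType.Shared,
    initial_position=pulsar.InitialPosition.Earliest,
)

try:
    while True:
        try:
            msg = consumer.receive(timeout_millis=5000)
        except pulsar.Timeout:
            break  # no message for 5 s: stop consuming
        print(f"Received: {msg.data().decode()}")
        consumer.acknowledge(msg)
finally:
    client.close()  # also closes the producer and consumer
```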
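The consumer loop above acknowledges every message it receives. In practice a handler can fail. A failed message should be negatively acknowledged so Pulsar redelivers it after a delay instead of dropping it. The sketch below relies on the consumer's `receive`, `acknowledge`, and `negative_acknowledge` methods. `drain` and `handle_order` are hypothetical names. The timeout exception is passed in as a parameter so the sketch also runs against a fake consumer without a broker. With a real client, pass `pulsar.Timeout`, which recent `pulsar-client` releases provide.

```python
import json
from typing import Callable, Tuple, Type


def handle_order(payload: bytes) -> None:
    """Hypothetical handler: raises on malformed orders so the caller can nack them."""
    order = json.loads(payload)
    if not isinstance(order, dict) or "id" not in order or "total" not in order:
        raise ValueError(f"incomplete order: {order!r}")
    print(f"Order {order['id']}: total {order['total']}")


def drain(
    consumer,
    handler: Callable[[bytes], None],
    timeout_exc: Type[BaseException],
    timeout_millis: int = 5000,
) -> Tuple[int, int]:
    """Consume until no message arrives within `timeout_millis`.

    Successful messages are acknowledged; failed ones are negatively
    acknowledged so Pulsar redelivers them after the consumer's nack delay.
    Returns (acked, nacked).
    """
    acked = nacked = 0
    while True:
        try:
            msg = consumer.receive(timeout_millis=timeout_millis)
        except timeout_exc:
            return acked, nacked
        try:
            handler(msg.data())
        except Exception:
            consumer.negative_acknowledge(msg)
            nacked += 1
        else:
            consumer.acknowledge(msg)
            acked += 1
```

With the real client, `drain(consumer, handle_order, pulsar.Timeout)` can replace the `while True` loop.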
Pulsar Admin REST API
```python
import requests

ADMIN = "http://localhost:8080/admin/v2"

# List tenants
tenants = requests.get(f"{ADMIN}/tenants").json()
print(f"Tenants: {tenants}")

# Create tenant
requests.put(f"{ADMIN}/tenants/my-company", json={
    "allowedClusters": ["standalone"]
})

# Create namespace
requests.put(f"{ADMIN}/namespaces/my-company/production")

# List topics
topics = requests.get(f"{ADMIN}/persistent/public/default").json()
for topic in topics:
    print(f"Topic: {topic}")

# Get topic stats
stats = requests.get(f"{ADMIN}/persistent/public/default/orders/stats").json()
print(f"Published: {stats['msgInCounter']}, Delivered: {stats['msgOutCounter']}")
print(f"Storage: {stats['storageSize']} bytes")

# Create topic with partitions
requests.put(f"{ADMIN}/persistent/public/default/events/partitions", json=6)
```
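The stats endpoint returns more than the counters printed above. Per-subscription backlog (`subscriptions.<name>.msgBacklog`) is usually the number to watch. Below are two hypothetical stdlib-only helpers. `backlog_report` summarizes a dictionary shaped like the `/stats` response. `partition_topics` lists the internal topic names that back a partitioned topic (`<topic>-partition-<n>`). For a partitioned topic such as `events`, aggregated stats come from the `/partitioned-stats` endpoint rather than `/stats`.

```python
from typing import Dict, List


def backlog_report(stats: dict) -> Dict[str, int]:
    """Map each subscription name to its message backlog from a /stats response."""
    return {
        name: sub.get("msgBacklog", 0)
        for name, sub in stats.get("subscriptions", {}).items()
    }


def partition_topics(topic: str, partitions: int) -> List[str]:
    """Internal topic names backing a partitioned topic: <topic>-partition-0 .. N-1."""
    if partitions < 1:
        raise ValueError("a partitioned topic needs at least one partition")
    return [f"{topic}-partition-{i}" for i in range(partitions)]
```

Against a live broker, `backlog_report(requests.get(f"{ADMIN}/persistent/public/default/orders/stats").json())` returns one backlog count per subscription.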
Pulsar Functions
```python
# process_order.py - deployed as a Pulsar Function
import json

from pulsar import Function


class ProcessOrder(Function):
    def process(self, input, context):
        order = json.loads(input)
        order["processed"] = True
        order["total_with_tax"] = round(order["total"] * 1.1, 2)
        # The return value is published to the --output topic. Also calling
        # context.publish() to that same topic would emit every order twice.
        return json.dumps(order)
```

```shell
# Deploy the function (standalone mode runs a function worker)
pulsar-admin functions create \
  --name process-order \
  --py process_order.py \
  --classname process_order.ProcessOrder \
  --inputs persistent://public/default/orders \
  --output persistent://public/default/processed-orders
```
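Because the `Function` base class comes from the `pulsar` package, the transformation is easiest to test when it lives in a plain function. The sketch below moves the business logic into a hypothetical `enrich_order` helper that `ProcessOrder.process` could call. It rounds the 10% tax to cents with `Decimal` rather than binary floats. The JSON field names follow the example above.

```python
import json
from decimal import Decimal, ROUND_HALF_UP
from typing import Union

TAX_MULTIPLIER = Decimal("1.1")  # same 10% tax as the function above
CENT = Decimal("0.01")


def enrich_order(raw: Union[str, bytes]) -> str:
    """Pure transformation: flag the order as processed and add a rounded total with tax."""
    order = json.loads(raw)
    # Go through str() so 99.99 becomes Decimal("99.99"), not its binary approximation
    total = Decimal(str(order["total"]))
    order["processed"] = True
    order["total_with_tax"] = float((total * TAX_MULTIPLIER).quantize(CENT, rounding=ROUND_HALF_UP))
    return json.dumps(order)
```

Inside the function, `process` then reduces to `return enrich_order(input)`, and the helper can be unit-tested without a broker.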
Pulsar vs Kafka
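| Feature | Pulsar | Kafka |
|---|---|---|
| Multi-tenancy | Native (tenants, namespaces) | Not built in (ACLs, quotas, naming conventions) |
| Geo-replication | Built-in | MirrorMaker 2 (separate component) |
| Tiered storage | Built-in | KIP-405 (Kafka 3.6+), needs a remote storage plugin |
| Functions | Built-in (Pulsar Functions) | Kafka Streams (client library) |
| Topic creation | Lightweight; partitions optional | Partition count set at creation |
| Queuing | Native (Shared subscriptions) | Consumer groups (parallelism capped by partition count) |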
Resources
- Pulsar Docs
- Pulsar GitHub (14K+ stars)
- Pulsar Admin API
Need to scrape web data for your messaging pipelines? Check out my web scraping tools on Apify — production-ready actors for Reddit, Google Maps, and more. Questions? Email me at spinov001@gmail.com