DEV Community

Alex Spinov

Apache Pulsar Has a Free API: Multi-Tenant Distributed Messaging Platform

Apache Pulsar is a cloud-native, multi-tenant messaging and streaming platform. It combines pub-sub messaging, queuing, and streaming in one system, with built-in geo-replication and multi-tenancy.

What Is Apache Pulsar?

Pulsar is an Apache top-level project that separates serving (brokers) from storage (Apache BookKeeper). Because the two layers are decoupled, they can be scaled independently, topics are cheap to create, and data can be geo-replicated across datacenters.

Key Features:

  • Multi-tenancy with namespaces
  • Geo-replication across clusters
  • Tiered storage (offload to S3/GCS)
  • Pulsar Functions (serverless processing)
  • Schema Registry
  • Transactions support
  • WebSocket API
  • 100K+ topics per cluster
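
Multi-tenancy shows up directly in topic names, which follow the pattern persistent://tenant/namespace/topic (the examples in this post use the public tenant and the default namespace). Here is a minimal sketch of building and splitting such names. TopicName, parse_topic, and format_topic are hypothetical helpers written for illustration; they are not part of the Pulsar client.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    """Hypothetical helper type, not part of the pulsar package."""
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic(name: str) -> TopicName:
    """Split a fully qualified Pulsar topic name into its parts."""
    domain, sep, rest = name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified topic name: {name!r}")
    parts = rest.split("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic in {name!r}")
    return TopicName(domain, *parts)


def format_topic(t: TopicName) -> str:
    """Rebuild the fully qualified name from its parts."""
    return f"{t.domain}://{t.tenant}/{t.namespace}/{t.topic}"


orders = parse_topic("persistent://public/default/orders")
print(orders.tenant, orders.namespace, orders.topic)
```

Keeping the tenant and namespace explicit like this makes it harder to publish to another team's namespace by accident.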

Quick Start

# Docker
docker run -d --name pulsar -p 6650:6650 -p 8080:8080 \
  apachepulsar/pulsar:latest bin/pulsar standalone

# CLI: pulsar-admin ships inside the container started above
docker exec -it pulsar bin/pulsar-admin tenants list

Pulsar Python Client

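import pulsar

client = pulsar.Client("pulsar://localhost:6650")

# Producer
producer = client.create_producer("persistent://public/default/orders")
producer.send(b'{"id": 1, "total": 99.99}')
producer.send(b'{"id": 2, "total": 149.50}')

# Consumer: start from the earliest message so the two sends above are
# delivered (a new subscription starts at the latest position by default)
consumer = client.subscribe(
    "persistent://public/default/orders",
    "my-subscription",
    consumer_type=pulsar.ConsumerType.Shared,
    initial_position=pulsar.InitialPosition.Earliest,
)

while True:
    try:
        msg = consumer.receive(timeout_millis=5000)
    except pulsar.Timeout:  # pulsar-client 3.x: no message arrived within 5 s
        break
    print(f"Received: {msg.data().decode()}")
    consumer.acknowledge(msg)

client.close()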
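
The producer above sends hand-written JSON bytes, and the consumer decodes them as text. A small sketch of keeping that encoding in one place follows. encode_order and decode_order are hypothetical helper names, and the check for an "id" key is an assumption based on the sample payloads rather than a schema defined anywhere in this post.

```python
import json


def encode_order(order: dict) -> bytes:
    """Serialize an order to the UTF-8 JSON bytes a producer would send."""
    return json.dumps(order, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_order(payload: bytes) -> dict:
    """Parse the bytes returned by msg.data() back into a dict."""
    order = json.loads(payload.decode("utf-8"))
    # Assumption: every order carries an "id", as in the sample messages.
    if not isinstance(order, dict) or "id" not in order:
        raise ValueError("payload is not an order object")
    return order


payload = encode_order({"id": 1, "total": 99.99})
print(payload)
print(decode_order(payload)["total"])
```

With helpers like these, the producer call would become producer.send(encode_order(order)) and the consumer would call decode_order(msg.data()).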

Pulsar Admin REST API

import requests

ADMIN = "http://localhost:8080/admin/v2"

# List tenants
tenants = requests.get(f"{ADMIN}/tenants").json()
print(f"Tenants: {tenants}")

# Create tenant
requests.put(f"{ADMIN}/tenants/my-company", json={
    "allowedClusters": ["standalone"]
})

# Create namespace
requests.put(f"{ADMIN}/namespaces/my-company/production")

# List topics
topics = requests.get(f"{ADMIN}/persistent/public/default").json()
for topic in topics:
    print(f"Topic: {topic}")

# Get topic stats
stats = requests.get(f"{ADMIN}/persistent/public/default/orders/stats").json()
print(f"Published: {stats['msgInCounter']}, Delivered: {stats['msgOutCounter']}")
print(f"Storage: {stats['storageSize']} bytes")

# Create topic with partitions
requests.put(f"{ADMIN}/persistent/public/default/events/partitions", json=6)
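
The stats response is a nested JSON document, so it can help to reduce it to the few numbers you monitor. The sketch below works on a plain dict and needs no broker. summarize_stats is a hypothetical helper, the sample values are made up for illustration, and the subscriptions and msgBacklog keys are assumptions about the response shape that you should verify against your Pulsar version.

```python
def summarize_stats(stats: dict) -> dict:
    """Reduce a topic stats document to counters plus per-subscription backlog."""
    subscriptions = stats.get("subscriptions", {})
    return {
        "published": stats.get("msgInCounter", 0),
        "delivered": stats.get("msgOutCounter", 0),
        "storage_bytes": stats.get("storageSize", 0),
        # Assumed field: number of unacknowledged messages per subscription.
        "backlog": {name: sub.get("msgBacklog", 0) for name, sub in subscriptions.items()},
    }


# Made-up sample in the assumed shape of the /stats response.
sample = {
    "msgInCounter": 2,
    "msgOutCounter": 1,
    "storageSize": 128,
    "subscriptions": {"my-subscription": {"msgBacklog": 1}},
}

summary = summarize_stats(sample)
print(summary)
```

In the script above, this would be called as summarize_stats(stats) right after the stats request.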

Pulsar Functions

# process_order.py - deployed as Pulsar Function
import json

from pulsar import Function

class ProcessOrder(Function):
    def process(self, input, context):
        order = json.loads(input)
        order["processed"] = True
        order["total_with_tax"] = order["total"] * 1.1
        # The return value is published to the function's --output topic,
        # so a separate context.publish() to the same topic would duplicate it
        return json.dumps(order)

# Deploy function (shell)
pulsar-admin functions create \
  --py process_order.py \
  --classname process_order.ProcessOrder \
  --inputs persistent://public/default/orders \
  --output persistent://public/default/processed-orders
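
Before deploying, the transformation can be checked locally without a broker by keeping it in a plain function. This is a sketch under that assumption: add_tax and process are hypothetical names, and the 10% rate simply mirrors the * 1.1 in the function above. Rounding to two decimals is an addition made here to keep float noise out of the output.

```python
import json


def add_tax(order: dict, rate: float = 0.10) -> dict:
    """Return a copy of the order marked processed, with tax applied."""
    result = dict(order)
    result["processed"] = True
    result["total_with_tax"] = round(order["total"] * (1 + rate), 2)
    return result


def process(raw: str) -> str:
    """Same input and output shape as ProcessOrder.process, minus the context."""
    return json.dumps(add_tax(json.loads(raw)))


print(process('{"id": 1, "total": 100.0}'))
```

ProcessOrder.process could then delegate to add_tax, so the deployed class contains only the Pulsar-specific wiring.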

Pulsar vs Kafka

| Feature         | Pulsar   | Kafka            |
|-----------------|----------|------------------|
| Multi-tenancy   | Native   | No               |
| Geo-replication | Built-in | MirrorMaker      |
| Tiered storage  | Built-in | Plugin           |
| Functions       | Built-in | Kafka Streams    |
| Topic creation  | Instant  | Needs partitions |
| Queuing         | Native   | Consumer groups  |
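
The "Queuing: Native" row refers to Shared subscriptions like the one used in the Python client example: consumers attached to the same subscription split the messages between them, while each separate subscription still gets its own full copy. The toy simulation below illustrates the two behaviours without touching Pulsar. Both function names are hypothetical, and the strict round-robin order is an idealization, since real dispatch also depends on which consumers are ready to receive.

```python
from collections import defaultdict
from itertools import cycle


def dispatch_shared(messages, consumers):
    """Queue style: each message goes to exactly one consumer, round-robin."""
    assigned = defaultdict(list)
    for msg, consumer in zip(messages, cycle(consumers)):
        assigned[consumer].append(msg)
    return dict(assigned)


def dispatch_per_subscription(messages, subscriptions):
    """Pub-sub style: every subscription receives its own copy of each message."""
    return {sub: list(messages) for sub in subscriptions}


orders = [1, 2, 3, 4, 5]
print(dispatch_shared(orders, ["worker-a", "worker-b"]))
print(dispatch_per_subscription(orders, ["billing", "analytics"]))
```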
