RabbitMQ's Management API gives you complete control over your message broker via HTTP. You can manage queues, exchanges, bindings, users, and monitor everything — all without touching the CLI.
Why Use the RabbitMQ Management API?
- Monitor queue depths and consumer counts in real-time
- Automate queue creation and exchange bindings
- Build custom monitoring dashboards
- Manage users, permissions, and virtual hosts programmatically
Getting Started
Enable the management plugin and access the API on port 15672:
# Enable management plugin
rabbitmq-plugins enable rabbitmq_management
# Check cluster overview
curl -s -u guest:guest "http://localhost:15672/api/overview" | jq '{rabbitmq_version: .rabbitmq_version, messages: .queue_totals.messages, consumers: .object_totals.consumers}'
# List all queues
curl -s -u guest:guest "http://localhost:15672/api/queues" | jq '.[] | {name: .name, messages: .messages, consumers: .consumers, state: .state}'
Python Client
import requests
import pika
import json
class RabbitMQManager:
def __init__(self, host='localhost', port=15672, user='guest', password='guest'):
self.base_url = f"http://{host}:{port}/api"
self.auth = (user, password)
def get_queues(self, vhost='%2F'):
resp = requests.get(f"{self.base_url}/queues/{vhost}", auth=self.auth)
return resp.json()
def create_queue(self, name, vhost='%2F', durable=True):
resp = requests.put(
f"{self.base_url}/queues/{vhost}/{name}",
json={"durable": durable, "auto_delete": False},
auth=self.auth
)
return resp.status_code == 201 or resp.status_code == 204
def create_exchange(self, name, exchange_type='direct', vhost='%2F'):
resp = requests.put(
f"{self.base_url}/exchanges/{vhost}/{name}",
json={"type": exchange_type, "durable": True},
auth=self.auth
)
return resp.status_code in (201, 204)
def create_binding(self, exchange, queue, routing_key='', vhost='%2F'):
resp = requests.post(
f"{self.base_url}/bindings/{vhost}/e/{exchange}/q/{queue}",
json={"routing_key": routing_key},
auth=self.auth
)
return resp.status_code == 201
def get_queue_messages(self, queue, count=5, vhost='%2F'):
resp = requests.post(
f"{self.base_url}/queues/{vhost}/{queue}/get",
json={"count": count, "ackmode": "ack_requeue_true", "encoding": "auto"},
auth=self.auth
)
return resp.json()
# Usage
mgr = RabbitMQManager()
# Create infrastructure
mgr.create_exchange('orders', 'topic')
mgr.create_queue('order-processing')
mgr.create_queue('order-notifications')
mgr.create_binding('orders', 'order-processing', 'order.created')
mgr.create_binding('orders', 'order-notifications', 'order.*')
print("Message infrastructure created!")
Queue Monitoring Dashboard
def queue_health_report(mgr):
queues = mgr.get_queues()
print(f"{'Queue':30s} {'Messages':>10s} {'Rate':>10s} {'Consumers':>10s} {'Status':>10s}")
print('-' * 75)
for q in sorted(queues, key=lambda x: x.get('messages', 0), reverse=True):
name = q['name']
msgs = q.get('messages', 0)
rate = q.get('message_stats', {}).get('publish_details', {}).get('rate', 0)
consumers = q.get('consumers', 0)
status = 'OK'
if msgs > 10000:
status = 'CRITICAL'
elif msgs > 1000:
status = 'WARNING'
elif consumers == 0 and msgs > 0:
status = 'NO CONSUMER'
print(f"{name:30s} {msgs:>10d} {rate:>10.1f}/s {consumers:>10d} {status:>10s}")
queue_health_report(mgr)
Dead Letter Queue Handler
def setup_dead_letter_queue(mgr, source_queue, dlx_exchange='dlx', dlq_name=None):
dlq_name = dlq_name or f"{source_queue}.dlq"
mgr.create_exchange(dlx_exchange, 'direct')
mgr.create_queue(dlq_name)
mgr.create_binding(dlx_exchange, dlq_name, source_queue)
# Update source queue with DLX
requests.put(
f"{mgr.base_url}/queues/%2F/{source_queue}",
json={
"durable": True,
"arguments": {
"x-dead-letter-exchange": dlx_exchange,
"x-dead-letter-routing-key": source_queue
}
},
auth=mgr.auth
)
print(f"DLQ setup: {source_queue} -> {dlx_exchange} -> {dlq_name}")
setup_dead_letter_queue(mgr, 'order-processing')
Publishing Messages via API
# Publish a message via Management API
curl -s -u guest:guest -X POST "http://localhost:15672/api/exchanges/%2F/orders/publish" \
-H "Content-Type: application/json" \
-d '{
"properties": {"content_type": "application/json"},
"routing_key": "order.created",
"payload": "{\"order_id\": 123, \"total\": 49.99}",
"payload_encoding": "string"
}'
Real-World Use Case
An e-commerce platform processed 50K orders daily through RabbitMQ. Using the Management API, they built an automated scaling system: when queue depth exceeded 5K messages, it spun up additional consumer instances. When depth dropped below 100, it scaled down. This saved 60% on compute costs while maintaining sub-second processing times.
What You Can Build
- Queue monitoring dashboard with alerting on depth/rate
- Auto-scaling system based on queue metrics
- Dead letter queue processor for failed message recovery
- Multi-tenant messaging with automated vhost creation
- Event replay tool re-publishing messages from archives
Need custom messaging solutions? I build event-driven systems and automation tools.
Email me: spinov001@gmail.com
Check out my developer tools: https://apify.com/spinov001
Top comments (0)