DEV Community

Alex Spinov
RabbitMQ Has a Free API: Here's How to Use It for Message Queue Automation

RabbitMQ's Management API gives you complete control over your message broker via HTTP. You can manage queues, exchanges, bindings, users, and monitor everything — all without touching the CLI.

Why Use the RabbitMQ Management API?

  • Monitor queue depths and consumer counts in real-time
  • Automate queue creation and exchange bindings
  • Build custom monitoring dashboards
  • Manage users, permissions, and virtual hosts programmatically

Getting Started

Enable the management plugin and access the API on port 15672:

# Enable management plugin
rabbitmq-plugins enable rabbitmq_management

# Check cluster overview
curl -s -u guest:guest "http://localhost:15672/api/overview" | jq '{rabbitmq_version: .rabbitmq_version, messages: .queue_totals.messages, consumers: .object_totals.consumers}'

# List all queues
curl -s -u guest:guest "http://localhost:15672/api/queues" | jq '.[] | {name: .name, messages: .messages, consumers: .consumers, state: .state}'
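One gotcha before writing any client code: vhost names must be URL-encoded in every API path, which is why the default vhost `/` shows up as `%2F` above. Python's standard library handles this; a minimal sketch:

```python
from urllib.parse import quote

def encode_vhost(vhost):
    """URL-encode a vhost name for Management API paths.
    safe='' is required so that '/' itself gets escaped."""
    return quote(vhost, safe='')

print(encode_vhost('/'))          # → %2F (the default vhost)
print(encode_vhost('orders/eu'))  # → orders%2Feu
```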

Python Client

import requests

class RabbitMQManager:
    def __init__(self, host='localhost', port=15672, user='guest', password='guest'):
        self.base_url = f"http://{host}:{port}/api"
        self.auth = (user, password)

    def get_queues(self, vhost='%2F'):
        # vhost names must be URL-encoded: the default vhost "/" is %2F
        resp = requests.get(f"{self.base_url}/queues/{vhost}", auth=self.auth)
        resp.raise_for_status()
        return resp.json()

    def create_queue(self, name, vhost='%2F', durable=True):
        resp = requests.put(
            f"{self.base_url}/queues/{vhost}/{name}",
            json={"durable": durable, "auto_delete": False},
            auth=self.auth
        )
        # 201 Created for a new queue, 204 No Content if it already exists
        return resp.status_code in (201, 204)

    def create_exchange(self, name, exchange_type='direct', vhost='%2F'):
        resp = requests.put(
            f"{self.base_url}/exchanges/{vhost}/{name}",
            json={"type": exchange_type, "durable": True},
            auth=self.auth
        )
        return resp.status_code in (201, 204)

    def create_binding(self, exchange, queue, routing_key='', vhost='%2F'):
        resp = requests.post(
            f"{self.base_url}/bindings/{vhost}/e/{exchange}/q/{queue}",
            json={"routing_key": routing_key},
            auth=self.auth
        )
        return resp.status_code == 201

    def get_queue_messages(self, queue, count=5, vhost='%2F'):
        resp = requests.post(
            f"{self.base_url}/queues/{vhost}/{queue}/get",
            json={"count": count, "ackmode": "ack_requeue_true", "encoding": "auto"},
            auth=self.auth
        )
        return resp.json()

# Usage
mgr = RabbitMQManager()

# Create infrastructure
mgr.create_exchange('orders', 'topic')
mgr.create_queue('order-processing')
mgr.create_queue('order-notifications')
mgr.create_binding('orders', 'order-processing', 'order.created')
mgr.create_binding('orders', 'order-notifications', 'order.*')

print("Message infrastructure created!")
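The class above only creates things, but the same endpoints handle cleanup: `DELETE /api/queues/{vhost}/{name}` removes a queue, and `DELETE .../contents` purges its messages without deleting it. A sketch of two companion methods (the `base_url`/`auth` fields mirror `RabbitMQManager` and are assumptions about your setup):

```python
import requests

class QueueCleanup:
    """Sketch of delete/purge helpers for the Management API."""
    def __init__(self, base_url='http://localhost:15672/api', auth=('guest', 'guest')):
        self.base_url = base_url
        self.auth = auth

    def queue_url(self, name, vhost='%2F'):
        return f"{self.base_url}/queues/{vhost}/{name}"

    def delete_queue(self, name, vhost='%2F'):
        # 204 No Content on success
        resp = requests.delete(self.queue_url(name, vhost), auth=self.auth)
        return resp.status_code == 204

    def purge_queue(self, name, vhost='%2F'):
        # Drops all ready messages but keeps the queue and its bindings
        resp = requests.delete(f"{self.queue_url(name, vhost)}/contents", auth=self.auth)
        return resp.status_code == 204
```

Purging is handy in test environments; in production you usually want a dead letter queue instead (covered below).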

Queue Monitoring Dashboard

def queue_health_report(mgr):
    queues = mgr.get_queues()

    print(f"{'Queue':30s} {'Messages':>10s} {'Rate':>12s} {'Consumers':>10s} {'Status':>11s}")
    print('-' * 77)

    for q in sorted(queues, key=lambda x: x.get('messages', 0), reverse=True):
        name = q['name']
        msgs = q.get('messages', 0)
        rate = q.get('message_stats', {}).get('publish_details', {}).get('rate', 0)
        consumers = q.get('consumers', 0)

        # Check for stalled queues first: otherwise a backlog with no
        # consumers would be reported as WARNING/CRITICAL instead of the
        # more actionable NO CONSUMER
        status = 'OK'
        if consumers == 0 and msgs > 0:
            status = 'NO CONSUMER'
        elif msgs > 10000:
            status = 'CRITICAL'
        elif msgs > 1000:
            status = 'WARNING'

        print(f"{name:30s} {msgs:>10d} {rate:>10.1f}/s {consumers:>10d} {status:>11s}")

queue_health_report(mgr)
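The same thresholds can feed an alerting hook instead of a printed table. A sketch that turns a `get_queues()` snapshot into alert records — the thresholds are illustrative, tune them for your workload:

```python
def queue_alerts(queues, warn_depth=1000, crit_depth=10000):
    """Return a list of (queue_name, severity) pairs for queues
    needing attention. `queues` is the list of dicts returned by
    GET /api/queues (only 'name', 'messages', 'consumers' are used)."""
    alerts = []
    for q in queues:
        msgs = q.get('messages', 0)
        consumers = q.get('consumers', 0)
        if consumers == 0 and msgs > 0:
            alerts.append((q['name'], 'NO CONSUMER'))
        elif msgs > crit_depth:
            alerts.append((q['name'], 'CRITICAL'))
        elif msgs > warn_depth:
            alerts.append((q['name'], 'WARNING'))
    return alerts

# Works on any snapshot shaped like the API response:
sample = [
    {'name': 'order-processing', 'messages': 12000, 'consumers': 3},
    {'name': 'order-notifications', 'messages': 50, 'consumers': 0},
    {'name': 'audit', 'messages': 0, 'consumers': 1},
]
print(queue_alerts(sample))
# → [('order-processing', 'CRITICAL'), ('order-notifications', 'NO CONSUMER')]
```

Because the function is pure, it is easy to unit-test and to wire into whatever notifier you already use (Slack webhook, PagerDuty, email).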

Dead Letter Queue Handler

def setup_dead_letter_queue(mgr, source_queue, dlx_exchange='dlx', dlq_name=None):
    dlq_name = dlq_name or f"{source_queue}.dlq"

    mgr.create_exchange(dlx_exchange, 'direct')
    mgr.create_queue(dlq_name)
    mgr.create_binding(dlx_exchange, dlq_name, source_queue)

    # NOTE: RabbitMQ does not allow changing arguments on an existing queue;
    # this PUT only succeeds if the queue doesn't exist yet or already has
    # identical arguments. For a live queue, set the DLX via a policy instead.
    resp = requests.put(
        f"{mgr.base_url}/queues/%2F/{source_queue}",
        json={
            "durable": True,
            "arguments": {
                "x-dead-letter-exchange": dlx_exchange,
                "x-dead-letter-routing-key": source_queue
            }
        },
        auth=mgr.auth
    )
    if resp.status_code not in (201, 204):
        print(f"Warning: could not redeclare {source_queue} "
              f"(HTTP {resp.status_code}) - use a policy for existing queues")
    print(f"DLQ setup: {source_queue} -> {dlx_exchange} -> {dlq_name}")

setup_dead_letter_queue(mgr, 'order-processing')

Publishing Messages via API

# Publish a message via Management API
curl -s -u guest:guest -X POST "http://localhost:15672/api/exchanges/%2F/orders/publish" \
  -H "Content-Type: application/json" \
  -d '{
    "properties": {"content_type": "application/json"},
    "routing_key": "order.created",
    "payload": "{\"order_id\": 123, \"total\": 49.99}",
    "payload_encoding": "string"
  }'
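The endpoint answers with `{"routed": true}` when at least one queue received the message. Keep in mind this HTTP publish path is meant for testing and low-volume tooling; for production throughput, use a real AMQP client such as pika. A Python sketch of the same call (the helper names are mine, not part of any library):

```python
import json
import requests

def build_publish_body(routing_key, payload_dict):
    """Request body for POST /api/exchanges/{vhost}/{exchange}/publish."""
    return {
        "properties": {"content_type": "application/json"},
        "routing_key": routing_key,
        "payload": json.dumps(payload_dict),
        "payload_encoding": "string",
    }

def publish(exchange, routing_key, payload_dict,
            base_url='http://localhost:15672/api',
            auth=('guest', 'guest'), vhost='%2F'):
    resp = requests.post(
        f"{base_url}/exchanges/{vhost}/{exchange}/publish",
        json=build_publish_body(routing_key, payload_dict),
        auth=auth,
    )
    resp.raise_for_status()
    # True if the message reached at least one queue
    return resp.json().get('routed', False)
```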

Real-World Use Case

An e-commerce platform processed 50K orders daily through RabbitMQ. Using the Management API, they built an automated scaling system: when queue depth exceeded 5K messages, it spun up additional consumer instances. When depth dropped below 100, it scaled down. This saved 60% on compute costs while maintaining sub-second processing times.
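The core of a system like that boils down to a small pure decision function. A sketch using the thresholds from the story (the gap between them acts as hysteresis, so the system doesn't flap between scaling up and down):

```python
def scaling_decision(queue_depth, current_consumers,
                     scale_up_depth=5000, scale_down_depth=100,
                     min_consumers=1, max_consumers=20):
    """Return the desired consumer count for the observed queue depth.
    Depths between the two thresholds leave the count unchanged."""
    if queue_depth > scale_up_depth and current_consumers < max_consumers:
        return current_consumers + 1
    if queue_depth < scale_down_depth and current_consumers > min_consumers:
        return current_consumers - 1
    return current_consumers

print(scaling_decision(8000, 3))  # → 4 (backlog growing, add a consumer)
print(scaling_decision(50, 3))    # → 2 (nearly drained, scale down)
print(scaling_decision(2000, 3))  # → 3 (inside the hysteresis band)
```

A scheduler would call this every few seconds with the `messages` and `consumers` fields from `GET /api/queues` and reconcile the consumer fleet toward the returned count.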

What You Can Build

  • Queue monitoring dashboard with alerting on depth/rate
  • Auto-scaling system based on queue metrics
  • Dead letter queue processor for failed message recovery
  • Multi-tenant messaging with automated vhost creation
  • Event replay tool re-publishing messages from archives

Need custom messaging solutions? I build event-driven systems and automation tools.

Email me: spinov001@gmail.com
Check out my developer tools: https://apify.com/spinov001
