RabbitMQ is the most widely deployed open-source message broker — handling billions of messages daily for companies like Bloomberg, VMware, and Goldman Sachs. And its API is completely free.
Why RabbitMQ?
- Battle-tested — 15+ years in production at massive scale
- Multi-protocol — AMQP, MQTT, STOMP, HTTP
- Flexible routing — direct, fanout, topic, headers exchanges
- Clustering — built-in high availability and replication
- Management UI — free web dashboard for monitoring
- Plugin ecosystem — 30+ official plugins
Quick Start (Docker)
# Start RabbitMQ with management UI
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# Management UI: http://localhost:15672
# Default credentials: guest/guest
Publisher (Send Messages)
// publisher.js
import amqp from "amqplib";
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
const queue = "orders";
await channel.assertQueue(queue, { durable: true });
// Send an order
const order = {
id: "ORD-001",
customer: "john@example.com",
items: [{ name: "Widget", qty: 3, price: 9.99 }],
total: 29.97,
timestamp: new Date().toISOString(),
};
channel.sendToQueue(queue, Buffer.from(JSON.stringify(order)), {
persistent: true, // Survives broker restart
});
console.log("Order sent:", order.id);
setTimeout(() => {
connection.close();
process.exit(0);
}, 500);
Consumer (Process Messages)
// consumer.js
import amqp from "amqplib";
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
const queue = "orders";
await channel.assertQueue(queue, { durable: true });
// Process one message at a time
channel.prefetch(1);
console.log("Waiting for orders...");
channel.consume(queue, async (msg) => {
const order = JSON.parse(msg.content.toString());
console.log(`Processing order ${order.id}...`);
// Simulate processing (payment, inventory, etc.)
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log(`Order ${order.id} completed. Total: $${order.total}`);
// Acknowledge — removes message from queue
channel.ack(msg);
});
Exchange Patterns
Fanout (Broadcast to All Queues)
// Broadcast a notification to ALL services
const exchange = "notifications";
await channel.assertExchange(exchange, "fanout", { durable: true });
channel.publish(exchange, "", Buffer.from(JSON.stringify({
type: "user.signup",
userId: "U-123",
email: "new@user.com",
})));
// Each service gets its own queue bound to the exchange:
// - Email service: sends welcome email
// - Analytics: tracks signup
// - CRM: creates contact
Topic (Pattern-Based Routing)
const exchange = "events";
await channel.assertExchange(exchange, "topic", { durable: true });
// Publish with routing keys
channel.publish(exchange, "order.created", Buffer.from(JSON.stringify({ orderId: "O-1" })));
channel.publish(exchange, "order.paid", Buffer.from(JSON.stringify({ orderId: "O-1" })));
channel.publish(exchange, "user.signup", Buffer.from(JSON.stringify({ userId: "U-1" })));
// Subscribe to patterns:
// "order.*" → gets order.created AND order.paid
// "*.created" → gets order.created AND user.created
// "#" → gets EVERYTHING
Dead Letter Queue (Handle Failures)
// Setup: main queue with dead letter exchange
await channel.assertExchange("dlx", "direct", { durable: true });
await channel.assertQueue("failed-orders", { durable: true });
await channel.bindQueue("failed-orders", "dlx", "orders");
await channel.assertQueue("orders", {
durable: true,
arguments: {
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "orders",
"x-message-ttl": 60000, // 60 seconds to process
},
});
// If a message is rejected or times out, it goes to "failed-orders"
channel.consume("orders", (msg) => {
try {
const order = JSON.parse(msg.content.toString());
processOrder(order);
channel.ack(msg);
} catch (error) {
// Send to dead letter queue for manual inspection
channel.nack(msg, false, false);
}
});
RPC Pattern (Request-Reply)
// Client: send request and wait for response
async function callRpc(data) {
const { queue: replyQueue } = await channel.assertQueue("", { exclusive: true });
const correlationId = crypto.randomUUID();
return new Promise((resolve) => {
channel.consume(replyQueue, (msg) => {
if (msg.properties.correlationId === correlationId) {
resolve(JSON.parse(msg.content.toString()));
}
}, { noAck: true });
channel.sendToQueue("rpc_queue", Buffer.from(JSON.stringify(data)), {
correlationId,
replyTo: replyQueue,
});
});
}
const result = await callRpc({ action: "calculate_price", items: [1, 2, 3] });
RabbitMQ vs Kafka vs Redis Pub/Sub
| Feature | RabbitMQ | Kafka | Redis Pub/Sub |
|---|---|---|---|
| Model | Message queue | Event log | Pub/Sub |
| Delivery | At-least-once | Exactly-once | At-most-once |
| Ordering | Per queue | Per partition | No guarantee |
| Replay | No (consumed = gone) | Yes (retention period) | No |
| Routing | Flexible exchanges | Topics only | Channels |
| Throughput | 50K msg/s | 1M+ msg/s | 500K msg/s |
| Best for | Task queues, RPC | Event streaming | Real-time notifications |
Need to scrape data from any website and get it in structured JSON? Check out my web scraping tools on Apify — no coding required, results in minutes.
Have a custom data extraction project? Email me at spinov001@gmail.com — I build tailored scraping solutions for businesses.
Top comments (0)