RabbitMQ with Node.js: A Practical Guide for Production Systems
If you've ever tried to add a message queue to your Node.js app and ended up with three tutorials open, each contradicting the other, this post is for you. I'm going to walk through RabbitMQ the way I wish someone had walked me through it — covering the parts that actually matter in production and the gotchas that bite you at 2 AM.
We'll cover exchanges, queues, bindings, acknowledgments, persistence, multiple consumers, naming conventions, dead letter queues, pub/sub, and — critically — what happens when your server restarts.
Table of contents
- What RabbitMQ actually is
- Setup
- The mental model: exchanges, queues, bindings
- Types of exchanges
- Types of queues
- Your first producer and consumer
- Message persistence
- Manual acknowledgments
- Multiple consumers (competing consumers)
- A sane naming convention
- Dead letter queues and reprocessing
- Pub/Sub with fanout
- Handling server restarts
- Production checklist
What RabbitMQ actually is
RabbitMQ is a message broker. Your app sends messages to it, other apps pick those messages up and process them. That's it.
Why bother? Three reasons that actually matter:
- Decoupling — your order API doesn't need to know anything about the email service. It just publishes "order created" and walks away.
- Load leveling — if you get 10,000 orders in a minute, you don't need 10,000 payment processors. The queue absorbs the spike; workers drain it at their own pace.
- Reliability — if the email service is down, messages wait in the queue. Nothing is lost.
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Producer │──────▶│ RabbitMQ │──────▶│ Consumer │
│ (API) │ │ (broker) │ │ (worker) │
└─────────────┘ └──────────────┘ └──────────────┘
publishes stores & processes
messages routes messages
Setup
Use amqplib — it's the battle-tested Node.js client:
npm install amqplib
npm install -D @types/amqplib # if you're on TypeScript
For local dev, run RabbitMQ in Docker:
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
The management UI is at http://localhost:15672 (guest/guest). Bookmark it — you'll live there while debugging.
The mental model: exchanges, queues, bindings
This is the single most important thing to understand. Most RabbitMQ confusion comes from not having this mental model in place.
┌──────────┐
│ Exchange │ ← messages arrive here first
└────┬─────┘
│
┌────────┼────────┐
│ │ │ ← bindings route the message
▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐
│Queue1│ │Queue2│ │Queue3│ ← messages wait here
└──┬───┘ └──┬───┘ └──┬───┘
│ │ │
▼ ▼ ▼
Consumer Consumer Consumer
Three concepts, three jobs:
| Concept | What it does | Analogy |
|---|---|---|
| Exchange | Receives messages from producers and decides where to send them | Post office sorting desk |
| Queue | Stores messages until a consumer picks them up | Mailbox |
| Binding | A rule connecting an exchange to a queue | "Letters addressed to X go to mailbox Y" |
Key insight: Producers never send messages directly to queues. They always go through an exchange. The exchange figures out which queues should receive the message based on bindings.
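One nuance worth knowing: even `channel.sendToQueue(queue, payload)` goes through an exchange, namely the default (nameless) direct exchange, to which every queue is automatically bound by its own name. A quick sketch of the equivalence (the wrapper functions are illustrative, not part of amqplib):

```javascript
// sendToQueue is shorthand for publishing to the default ("") exchange
// with the queue's name as the routing key. These two calls are equivalent.
function sendDirect(channel, queue, payload) {
  return channel.sendToQueue(queue, payload);
}

function sendViaDefaultExchange(channel, queue, payload) {
  return channel.publish('', queue, payload);
}
```

So "producers never send directly to queues" holds even here; the broker is just doing the exchange step for you.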
Types of exchanges
There are four exchange types. You'll use two of them 95% of the time.
1. Direct exchange — "exact match routing"
Routes messages to queues whose binding key exactly equals the message's routing key.
Producer ──[routing_key: "payment"]──▶ Direct Exchange
│
binding="payment" ─┤─ binding="email"
▼
payment queue (receives)
email queue (does not)
Use when: You have clear, distinct job types — payments, emails, notifications — and each goes to a specific worker pool. This is the best default for 90% of use cases.
2. Fanout exchange — "broadcast to everyone"
Ignores routing keys entirely. Every bound queue gets a copy of every message.
Producer ────────────▶ Fanout Exchange
│
┌───────┼────────┐
▼ ▼ ▼
email analytics inventory
queue queue queue
(gets) (gets) (gets)
Use when: Publishing events that multiple services need to react to — "order placed", "user signed up". This is the classic pub/sub pattern.
3. Topic exchange — "pattern matching routing"
Routes based on wildcard patterns. * matches one word, # matches zero or more.
routing key: "order.eu.priority"
│
▼
Topic Exchange
├── binding "order.*.priority" ✓ matches
├── binding "order.eu.*" ✓ matches
├── binding "order.#" ✓ matches
└── binding "order.us.*" ✗ doesn't match
Use when: You need multi-dimensional routing — filter by region, priority, customer tier, etc.
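To build intuition for the wildcard rules, here is a tiny matcher that mimics the semantics described above (* is exactly one word, # is zero or more words). This is an illustration only, not RabbitMQ's actual implementation:

```javascript
// Returns true if a routing key matches a topic binding pattern.
// Words are dot-separated; '*' matches exactly one word and '#' matches
// zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(pi, ki) {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' can consume zero or more of the remaining words
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  }
  return match(0, 0);
}

topicMatches('order.*.priority', 'order.eu.priority'); // true
topicMatches('order.us.*', 'order.eu.priority');       // false
```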
4. Headers exchange — "metadata-based routing"
Routes based on message headers instead of the routing key. Rarely used. If you think you need it, you probably want topic exchange instead.
Quick picker
| Your need | Use |
|---|---|
| Route jobs to specific worker pools | Direct |
| Broadcast events to multiple services | Fanout |
| Filter by multiple criteria | Topic |
| You really know why you need headers | Headers |
Types of queues
RabbitMQ has several queue types. For most apps, you only care about two.
Classic durable queue
The default. Stored on disk when declared with durable: true. Survives broker restarts (if messages are also persistent — more on that below).
await channel.assertQueue('orders.q.process', { durable: true });
Quorum queue
RabbitMQ's modern replicated queue type, built on the Raft consensus algorithm. Replicates across nodes in a cluster. Use this in production if you're running a cluster and need high availability.
await channel.assertQueue('orders.q.process', {
durable: true,
arguments: { 'x-queue-type': 'quorum' },
});
Exclusive / auto-delete queues
- Exclusive: only the declaring connection can use this queue, deleted when that connection closes.
- Auto-delete: deleted after the last consumer disconnects.
Useful for temporary RPC-style reply queues, but don't use these for production work queues — if your app crashes, your messages vanish.
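For the legitimate use case, here is a sketch of declaring a temporary RPC reply queue. Passing '' as the name lets the broker generate a unique one (something like "amq.gen-..."); the helper function is hypothetical, not a library API:

```javascript
// Declare a server-named, exclusive queue for RPC-style request/response.
// It exists only as long as this connection does.
async function createReplyQueue(channel) {
  const { queue } = await channel.assertQueue('', {
    exclusive: true, // only this connection may use it; deleted when it closes
  });
  return queue; // use this as the replyTo property on outgoing requests
}
```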
Stream queue
A log-style append-only queue (like Kafka). For event sourcing or replay use cases. Niche.
Bottom line: Start with classic durable queues. Move to quorum queues when you go multi-node.
Your first producer and consumer
Let's build an order processing system. We'll evolve this throughout the post.
Connection module
Reuse one connection across the app, one channel per concurrent workflow:
// src/rabbitmq/connection.js
import amqp from 'amqplib';
let connection = null;
export async function getConnection() {
if (connection) return connection;
connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
return connection;
}
export async function createChannel() {
const conn = await getConnection();
return conn.createChannel();
}
Producer
// src/producers/order-producer.js
import { createChannel } from '../rabbitmq/connection.js';
const EXCHANGE = 'orders.x.direct';
const QUEUE = 'orders.q.process';
const ROUTING_KEY = 'orders.rk.created';
let channel = null;
async function initChannel() {
if (channel) return channel;
channel = await createChannel();
await channel.assertExchange(EXCHANGE, 'direct', { durable: true });
await channel.assertQueue(QUEUE, { durable: true });
await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY);
return channel;
}
export async function publishOrder(order) {
const ch = await initChannel();
const payload = Buffer.from(JSON.stringify(order));
return ch.publish(EXCHANGE, ROUTING_KEY, payload, {
persistent: true,
contentType: 'application/json',
messageId: order.id,
timestamp: Date.now(),
});
}
Consumer
// src/consumers/order-worker.js
import { createChannel } from '../rabbitmq/connection.js';
const QUEUE = 'orders.q.process';
export async function startOrderWorker() {
const channel = await createChannel();
await channel.assertQueue(QUEUE, { durable: true });
await channel.prefetch(10);
console.log(`[worker] waiting for orders`);
channel.consume(QUEUE, async (msg) => {
if (!msg) return;
try {
const order = JSON.parse(msg.content.toString());
await processOrder(order);
channel.ack(msg);
} catch (err) {
channel.nack(msg, false, !msg.fields.redelivered);
}
}, { noAck: false });
}
That's the whole basic pattern. Now let's make it production-ready.
Message persistence
This is where people lose messages and don't realize it until something breaks. Pay attention.
For a message to survive a broker restart, three things must all be true:
Message survives restart?
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
Exchange durable Queue durable Message persistent
{durable: true} {durable: true} {persistent: true}
│ │ │
└─────────────────┴─────────────────┘
│
ALL THREE REQUIRED
Miss any one of these and your messages vanish on restart:
// ✓ Exchange is durable
await channel.assertExchange('orders.x.direct', 'direct', { durable: true });
// ✓ Queue is durable
await channel.assertQueue('orders.q.process', { durable: true });
// ✓ Message is persistent
await channel.publish(exchange, routingKey, payload, {
persistent: true, // ← writes message to disk
});
A subtle gotcha: persistent: true does not guarantee the message has been written to disk before your publish() call returns. It just marks the message as "should be persisted". For actual confirmation the broker has accepted and stored the message, use a confirm channel:
const channel = await connection.createConfirmChannel();
await new Promise((resolve, reject) => {
channel.publish(exchange, routingKey, payload, { persistent: true }, (err) =>
err ? reject(err) : resolve()
);
});
Confirm channels add latency but give you guarantees. Use them for anything financial or legally significant — order placements, payments, audit logs.
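For batches, confirm channels also offer waitForConfirms(): publish everything, then wait once instead of once per message. A sketch, assuming channel came from createConfirmChannel():

```javascript
// Publish a batch on a confirm channel, then wait for the broker to
// confirm all of it in one go, which is cheaper than one callback
// round trip per message.
async function publishBatch(channel, exchange, routingKey, orders) {
  for (const order of orders) {
    channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(order)), {
      persistent: true,
    });
  }
  await channel.waitForConfirms(); // rejects if the broker nacked anything
}
```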
Manual acknowledgments
Acknowledgments are how a consumer tells RabbitMQ "I've successfully processed this message, you can delete it now."
There are two modes:
- Auto-ack (noAck: true) — RabbitMQ deletes the message the moment it hands it to your consumer. If your app crashes mid-processing, the message is gone. Don't use this in production.
- Manual ack (noAck: false) — RabbitMQ holds on to the message until your consumer explicitly acks it. If your consumer disconnects without acking, the message goes back in the queue for redelivery.
The ack lifecycle
Queue Consumer
│ │
│──── deliver(msg) ───────▶│
│ │ processing...
│ (message marked │
│ "unacked") │ success!
│ │
│◀────── ack(msg) ────────│
│ │
(message deleted)
Three outcomes, three methods
channel.consume(QUEUE, async (msg) => {
if (!msg) return;
try {
await processOrder(JSON.parse(msg.content.toString()));
// 1. Success — remove message from queue
channel.ack(msg);
} catch (err) {
if (isTransientError(err)) {
// 2. Transient failure — put back in queue and try again
channel.nack(msg, false, true); // (msg, allUpTo, requeue)
} else {
// 3. Permanent failure — reject and send to DLQ
channel.nack(msg, false, false);
}
}
}, { noAck: false });
The three parameters of nack(message, allUpTo, requeue):
- allUpTo — if true, nacks all messages up to and including this one. Usually false.
- requeue — if true, the message goes back in the queue. If false, it's dropped (or sent to the DLQ if configured).
Critical warning: Do not requeue indefinitely. If a message can never be processed successfully (a "poison message"), requeuing it forever will pin a worker at 100% CPU processing the same broken message in a loop. Always check msg.fields.redelivered and route to the DLQ after a retry:
const shouldRequeue = !msg.fields.redelivered;
channel.nack(msg, false, shouldRequeue);
Multiple consumers (competing consumers pattern)
This is how you scale. You run N worker processes, they all consume from the same queue, and RabbitMQ distributes messages across them.
┌──────────────┐
│ orders.q. │
│ process │
│ ████████████ │ ← 12 messages waiting
└──────┬───────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐
│ W1 │ │ W2 │ │ W3 │
│ ████ │ │ ████ │ │ ████ │ ← each gets ~4
└──────┘ └──────┘ └──────┘
Requirements for this pattern to work correctly
To run multiple consumers safely, four conditions must all be met:
- One named queue, many consumers — all workers subscribe to the same queue. (If each worker has its own queue, that's pub/sub, not work queue.)
- Manual acknowledgments enabled — { noAck: false } — otherwise a crashed worker loses its in-flight messages.
- Prefetch is set — channel.prefetch(N) — without this, RabbitMQ dumps the whole queue into the first worker that connects. More on this below.
- Messages are idempotent or uniquely tracked — redelivery can happen (network blips, crashed workers), so processing the same order twice must either be safe or detectable.
The prefetch gotcha
By default, RabbitMQ sends every available message to the first consumer that connects. Your "load balancing" across three workers silently becomes "worker 1 gets everything".
prefetch(N) caps how many unacked messages a consumer can hold at once:
await channel.prefetch(10); // max 10 in-flight per consumer
How to pick the number:
- Fast tasks (< 100ms) — higher prefetch (50–100) reduces roundtrip overhead
- Slow tasks (multi-second) — lower prefetch (1–10) distributes work evenly
- Highly variable tasks — prefetch 1 for strict fairness
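The failure mode is easy to see in a toy model of delivering a backlog, where each message goes to the first worker that still has capacity (unacked below its prefetch cap). This is an illustration of why the cap matters, not RabbitMQ's actual scheduling algorithm:

```javascript
// Toy model: deliver a backlog where each message goes to the first worker
// whose unacked count is below its prefetch cap. Nobody acks, so this shows
// the initial distribution only.
function distributeBacklog(messageCount, workerCount, prefetch) {
  const unacked = new Array(workerCount).fill(0);
  for (let m = 0; m < messageCount; m++) {
    const w = unacked.findIndex((n) => n < prefetch);
    if (w === -1) break; // every worker is at its cap; the rest wait in the queue
    unacked[w]++;
  }
  return unacked;
}

distributeBacklog(30, 3, Infinity); // [30, 0, 0]: worker 1 gets everything
distributeBacklog(30, 3, 10);       // [10, 10, 10]: evenly spread
```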
Scaling horizontally
// worker.js — run this file multiple times
import { startOrderWorker } from './src/consumers/order-worker.js';
startOrderWorker();
Then in your deployment:
# PM2
pm2 start worker.js -i 4 # 4 workers
# Kubernetes
kubectl scale deployment order-worker --replicas=4
# Docker Compose
docker compose up --scale worker=4
Each process opens its own connection and consumes from the same queue. RabbitMQ round-robins messages based on prefetch availability.
A sane naming convention
This is where teams either set themselves up for success or create six-months-from-now confusion. Pick a convention and stick to it.
The problem with ad-hoc naming
You end up with things like:
orders.exchange ← describes what it is
orders.process ← describes what it does
order.created ← describes what happened
Three different naming philosophies, one system. When you're staring at orders.process in a log, is that a queue, an exchange, or something else?
A better convention: <domain>.<type>.<purpose>
Embed the resource type directly in the name:
| Resource | Name format | Example |
|---|---|---|
| Exchange | <domain>.x.<type-or-purpose> | orders.x.direct |
| Queue | <domain>.q.<purpose> | orders.q.process |
| DLQ | <domain>.q.dead | orders.q.dead |
| Routing key | <domain>.rk.<event> | orders.rk.created |
Before vs after
BEFORE:
orders.exchange
orders.process
orders.dead
order.created
AFTER:
orders.x.direct ← "this is an exchange"
orders.q.process ← "this is a queue"
orders.q.dead ← "this is the DLQ"
orders.rk.created ← "this is a routing key"
Now when you open the RabbitMQ management UI:
Exchanges:
orders.x.direct
orders.x.fanout
orders.x.dead
Queues:
orders.q.process
orders.q.dead
orders.q.email
orders.q.analytics
You can tell at a glance what each resource is and which domain it belongs to. For a multi-tenant SaaS, add the tenant or service prefix: payroll.orders.q.process, hr.onboarding.q.welcome.
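A tiny helper (hypothetical, not from any library) can bake the convention into code so nobody has to remember the type codes:

```javascript
// Build resource names following <domain>.<type>.<purpose>.
// Type codes: x = exchange, q = queue, rk = routing key.
function names(domain) {
  return {
    exchange: (kind) => `${domain}.x.${kind}`,
    queue: (purpose) => `${domain}.q.${purpose}`,
    routingKey: (event) => `${domain}.rk.${event}`,
  };
}

const orders = names('orders');
orders.exchange('direct');                // 'orders.x.direct'
orders.queue('process');                  // 'orders.q.process'
names('payroll.orders').queue('process'); // 'payroll.orders.q.process'
```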
Applying it in code
const ORDERS = {
exchange: 'orders.x.direct',
queue: 'orders.q.process',
dlq: 'orders.q.dead',
dlx: 'orders.x.dead',
routingKey: 'orders.rk.created',
dlqRoutingKey: 'orders.rk.failed',
};
await channel.assertExchange(ORDERS.exchange, 'direct', { durable: true });
await channel.assertQueue(ORDERS.queue, { durable: true });
await channel.bindQueue(ORDERS.queue, ORDERS.exchange, ORDERS.routingKey);
Reading bindQueue(ORDERS.queue, ORDERS.exchange, ORDERS.routingKey) is now unambiguous — queue, exchange, routing key, in that order.
Dead letter queues and reprocessing
When a message can't be processed — bad data, a bug, an integration timeout — you don't want it going back to the main queue forever. You want it parked somewhere you can inspect and either fix or discard.
That's a dead letter queue (DLQ).
The flow
Main queue Main exchange Dead letter exchange
(orders.q.process) (orders.x.direct) (orders.x.dead)
│ │ │
│ message fails │ │
│ (nack requeue=false │ │
│ or TTL expires │ │
│ or queue length limit) │ │
│ │ │
└──── auto-routed ────────────────────────────────────▶│
│
▼
┌───────────────┐
│ orders.q.dead │
│ ████████ │ ← park here
└───────┬───────┘
│
▼
inspect / fix / replay
Setting it up
Three things to declare:
- The dead letter exchange
- The dead letter queue, bound to that exchange
- The main queue, configured to route failed messages to that exchange
// 1. DLX and DLQ
await channel.assertExchange('orders.x.dead', 'direct', { durable: true });
await channel.assertQueue('orders.q.dead', { durable: true });
await channel.bindQueue('orders.q.dead', 'orders.x.dead', 'orders.rk.failed');
// 2. Main queue with DLX configured via x-arguments
await channel.assertQueue('orders.q.process', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'orders.x.dead',
'x-dead-letter-routing-key': 'orders.rk.failed',
// Optional: messages expire after 5 min and auto-go to DLQ
// 'x-message-ttl': 300000,
},
});
Important: You can't add x-dead-letter-exchange to a queue that was created without it. If the queue already exists with different arguments, assertQueue will throw PRECONDITION_FAILED. Delete the queue first, or create a new one with a different name.
How messages end up in the DLQ
Three triggers route a message to the DLQ automatically:
- Consumer nacks with requeue=false — channel.nack(msg, false, false)
- Message TTL expires — if x-message-ttl is set
- Queue length limit exceeded — if x-max-length is set
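For the third trigger, here is a sketch of a length-bounded main queue (the 10,000 cap is an arbitrary example). With the DLX arguments set, overflow from the head of the queue is dead-lettered instead of silently dropped. As with the DLX arguments above, these must be present when the queue is first declared:

```javascript
// Cap orders.q.process at 10,000 messages. When the cap is hit, the
// oldest message is dropped from the head and dead-lettered.
async function assertBoundedQueue(channel) {
  return channel.assertQueue('orders.q.process', {
    durable: true,
    arguments: {
      'x-max-length': 10000,
      'x-dead-letter-exchange': 'orders.x.dead',
      'x-dead-letter-routing-key': 'orders.rk.failed',
    },
  });
}
```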
In your consumer:
try {
  await processOrder(order);
  channel.ack(msg);
} catch (err) {
  const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
  if (retryCount > 3) {
    // Give up — route to DLQ
    channel.nack(msg, false, false);
  } else {
    // nack with requeue=true would redeliver the original headers unchanged,
    // so x-retry-count would never grow. Republish with the incremented
    // count and ack the original instead.
    channel.publish('orders.x.direct', 'orders.rk.created', msg.content, {
      persistent: true,
      messageId: msg.properties.messageId,
      headers: { ...msg.properties.headers, 'x-retry-count': retryCount },
    });
    channel.ack(msg);
  }
}
Reprocessing from the DLQ
Two approaches:
Manual replay — inspect, fix the root cause, then publish back to the main exchange:
// src/tools/dlq-replay.js
async function replayDLQ() {
const channel = await createChannel();
while (true) {
const msg = await channel.get('orders.q.dead', { noAck: false });
if (!msg) break; // DLQ empty
console.log('Replaying:', msg.properties.messageId);
// Republish to main exchange
channel.publish(
'orders.x.direct',
'orders.rk.created',
msg.content,
{
persistent: true,
messageId: msg.properties.messageId,
headers: { 'x-replayed': true, 'x-replayed-at': Date.now() },
}
);
channel.ack(msg);
}
}
Scheduled retry with delay — use a "retry queue" with TTL that re-routes back to the main queue after a delay:
// Retry queue: holds messages for 60s, then dead-letters back to main
await channel.assertQueue('orders.q.retry', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'orders.x.direct',
'x-dead-letter-routing-key': 'orders.rk.created',
'x-message-ttl': 60000,
},
});
Send failed messages here instead of the DLQ, and they'll auto-return to the main queue after 60 seconds. Great for transient failures (rate-limited APIs, temporary network issues).
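To route a transient failure there, the consumer publishes the message to the retry queue (sendToQueue uses the default exchange) and acks the original, carrying a retry counter in the headers. The x-retry-count header name is this post's convention, not a RabbitMQ built-in:

```javascript
// Park a failed delivery in the retry queue; TTL expiry returns it to the
// main queue. Increment our retry counter so the consumer can eventually
// give up and dead-letter it for real.
function sendToRetry(channel, msg) {
  const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
  channel.sendToQueue('orders.q.retry', msg.content, {
    persistent: true,
    headers: { ...msg.properties.headers, 'x-retry-count': retryCount },
  });
  channel.ack(msg); // the original delivery is done as far as the broker knows
}
```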
Pub/Sub with fanout
Pub/sub has a different shape from a work queue. When an order is placed, the email service, analytics service, and inventory service each need a copy of that event.
Publisher (Order API)
│
│ publish("order placed")
▼
┌─────────────────────┐
│ orders.x.fanout │
└──────────┬──────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
orders.q.email orders.q.analytics orders.q.inventory
│ │ │
▼ ▼ ▼
Email service Analytics service Inventory service
(sends email) (logs metrics) (decrements stock)
The critical rule for pub/sub: each consumer gets its own queue.
If all three services shared one queue, each message would only go to one of them — that's a work queue, not pub/sub. With one queue per consumer bound to the fanout exchange, every consumer gets every message.
Publisher
// src/producers/event-publisher.js
import crypto from 'node:crypto'; // randomUUID is global in Node 19+; import for older versions
import { createChannel } from '../rabbitmq/connection.js';
const EVENTS_EXCHANGE = 'orders.x.fanout';
let channel = null;
async function initChannel() {
if (channel) return channel;
channel = await createChannel();
await channel.assertExchange(EVENTS_EXCHANGE, 'fanout', { durable: true });
return channel;
}
export async function publishOrderPlaced(order) {
const ch = await initChannel();
// Fanout ignores routing key — every bound queue gets the message
ch.publish(EVENTS_EXCHANGE, '', Buffer.from(JSON.stringify(order)), {
persistent: true,
contentType: 'application/json',
type: 'order.placed',
messageId: crypto.randomUUID(),
timestamp: Date.now(),
});
}
Consumer (email service)
// src/consumers/email-service.js
import { createChannel } from '../rabbitmq/connection.js';
const EVENTS_EXCHANGE = 'orders.x.fanout';
const QUEUE = 'orders.q.email'; // each consumer service owns its queue
export async function startEmailService() {
const channel = await createChannel();
await channel.assertExchange(EVENTS_EXCHANGE, 'fanout', { durable: true });
await channel.assertQueue(QUEUE, { durable: true });
await channel.bindQueue(QUEUE, EVENTS_EXCHANGE, '');
await channel.prefetch(5);
channel.consume(QUEUE, async (msg) => {
if (!msg) return;
try {
const order = JSON.parse(msg.content.toString());
await sendOrderConfirmationEmail(order);
channel.ack(msg);
} catch (err) {
channel.nack(msg, false, !msg.fields.redelivered);
}
});
}
Duplicate this file for each consumer service, changing only the queue name:
- orders.q.email — email service
- orders.q.analytics — analytics service
- orders.q.inventory — inventory service
Each service's queue should be durable and named (not auto-delete, not exclusive). That way, if the analytics service goes down for two hours, messages pile up in orders.q.analytics and get processed when it comes back.
Work queue vs pub/sub — the one thing to remember
| Pattern | Setup | Result |
|---|---|---|
| Work queue | Many workers, one queue | Each message processed once |
| Pub/Sub | Many consumers, one queue each | Each message processed N times |
Handling server restarts
This is the section that separates hobby code from production code. Let me walk through every failure mode.
Scenario 1: Your Node.js process restarts
Your in-memory state is gone. The let channel = null variable is reinitialized. When the first publish call comes in, initChannel() runs fresh, creates a new channel, re-asserts the exchange/queue/binding. Everything works.
But: any messages your app had in flight (received but not yet acked) are redelivered to another consumer (or the same consumer after reconnect) because RabbitMQ sees the connection dropped without ack.
This is why idempotency matters. Your processOrder() function should be safe to run twice on the same order. Check for an existing order record before inserting, use upserts, track processed message IDs in Redis — whatever fits your domain.
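A minimal sketch of the "track processed message IDs" approach. The in-memory Set is a stand-in for shared storage (Redis SETNX, a unique-keyed database table); an in-memory set only protects a single process, and the helper name is hypothetical:

```javascript
// Idempotent processing keyed on messageId. Record the id only after
// processing succeeds, so a crash mid-process means a redelivery and a
// retry rather than a lost message.
const processedIds = new Set();

async function handleOnce(msg, processOrder) {
  const id = msg.properties.messageId;
  if (id && processedIds.has(id)) return false; // duplicate delivery; just ack it
  await processOrder(JSON.parse(msg.content.toString()));
  if (id) processedIds.add(id);
  return true;
}
```

In a real multi-worker deployment the store must be shared, since a redelivery can land on a different worker than the original.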
Scenario 2: RabbitMQ broker restarts while your app is running
This is where naive code breaks silently. Your channel variable still holds a reference to the now-dead channel object. Without event listeners, the next publish() call throws or hangs.
What survives a broker restart:
- Durable exchanges ({ durable: true }) ✓
- Durable queues ({ durable: true }) ✓
- Persistent messages in durable queues ({ persistent: true }) ✓
- Bindings between durable exchanges and durable queues ✓
What doesn't:
- Your application's connection and channel objects ✗
- Non-durable anything ✗
- Messages published without persistent: true ✗
Scenario 3: Network blip
Same as broker restart from the app's perspective — the connection drops.
The production-grade reconnection module
Here's what your connection module should actually look like:
// src/rabbitmq/connection.js
import amqp from 'amqplib';
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://localhost';
const RECONNECT_DELAY_MS = 5000;
let connection = null;
let channel = null;
let connecting = null; // prevents thundering-herd reconnects
async function connect() {
try {
console.log('[rabbitmq] connecting...');
connection = await amqp.connect(RABBITMQ_URL);
connection.on('error', (err) => {
console.error('[rabbitmq] connection error:', err.message);
});
connection.on('close', () => {
console.warn('[rabbitmq] connection closed, reconnecting in 5s');
connection = null;
channel = null;
setTimeout(() => { connecting = connect(); }, RECONNECT_DELAY_MS);
});
channel = await connection.createChannel();
channel.on('error', (err) => {
console.error('[rabbitmq] channel error:', err.message);
});
channel.on('close', () => {
console.warn('[rabbitmq] channel closed');
channel = null;
});
// Re-declare topology on every (re)connect
// Safe because assertExchange/assertQueue are idempotent
// for matching durable declarations
await setupTopology(channel);
console.log('[rabbitmq] ready');
return channel;
} catch (err) {
console.error('[rabbitmq] connect failed:', err.message);
connection = null;
channel = null;
await new Promise((r) => setTimeout(r, RECONNECT_DELAY_MS));
return connect();
}
}
async function setupTopology(ch) {
// Main exchange + queue
await ch.assertExchange('orders.x.direct', 'direct', { durable: true });
// DLX setup
await ch.assertExchange('orders.x.dead', 'direct', { durable: true });
await ch.assertQueue('orders.q.dead', { durable: true });
await ch.bindQueue('orders.q.dead', 'orders.x.dead', 'orders.rk.failed');
// Main queue with DLX
await ch.assertQueue('orders.q.process', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'orders.x.dead',
'x-dead-letter-routing-key': 'orders.rk.failed',
},
});
await ch.bindQueue('orders.q.process', 'orders.x.direct', 'orders.rk.created');
// Pub/sub exchange
await ch.assertExchange('orders.x.fanout', 'fanout', { durable: true });
}
export async function getChannel() {
if (channel) return channel;
if (!connecting) connecting = connect();
return connecting;
}
export async function shutdown() {
try {
if (channel) await channel.close();
if (connection) await connection.close();
} catch (err) {
console.error('[rabbitmq] shutdown error:', err.message);
}
}
// Graceful shutdown
process.on('SIGTERM', async () => {
await shutdown();
process.exit(0);
});
Key things this handles:
- Reset state on close events — next call creates a fresh channel
- Auto-reconnect on drop — with a delay to avoid tight loops
- Re-assert topology on every reconnect — idempotent, so safe to run every time
- Single in-flight connect — the connecting promise prevents races
- Graceful shutdown — lets in-flight messages finish on SIGTERM
The easier path: amqp-connection-manager
Everything above is doable yourself, but unless you specifically want to understand the internals, use amqp-connection-manager:
npm install amqp-connection-manager
import amqp from 'amqp-connection-manager';
const connection = amqp.connect(['amqp://localhost']);
const channelWrapper = connection.createChannel({
json: true,
setup: async (channel) => {
// Runs on every (re)connect
await channel.assertExchange('orders.x.direct', 'direct', { durable: true });
await channel.assertQueue('orders.q.process', { durable: true });
await channel.bindQueue('orders.q.process', 'orders.x.direct', 'orders.rk.created');
},
});
// This works even if broker is temporarily down — messages buffer until reconnect
await channelWrapper.publish('orders.x.direct', 'orders.rk.created', order, {
persistent: true,
});
It handles reconnection, topology re-setup, and publish buffering out of the box. For production, this is what I'd reach for.
Production checklist
Before you ship:
Broker setup
- [ ] Running RabbitMQ 3.12+ (or latest LTS)
- [ ] Enabled management plugin for the UI
- [ ] Configured memory and disk alarms
- [ ] Set up monitoring (Prometheus metrics, alerts)
- [ ] Configured a cluster for HA (if uptime matters)
Topology
- [ ] All exchanges declared with durable: true
- [ ] All queues declared with durable: true
- [ ] All messages published with persistent: true
- [ ] Consistent naming convention across all resources
- [ ] DLX and DLQ configured for every main queue
- [ ] Topology re-declaration on reconnect
Consumer side
- [ ] Manual acks (noAck: false)
- [ ] channel.prefetch(N) set to a sensible value
- [ ] msg.fields.redelivered checked before requeuing
- [ ] Idempotent message processing
- [ ] Retry count header or bounded retry logic
- [ ] Errors logged with message ID for debugging
Producer side
- [ ] Confirm channels for critical messages (payments, audit)
- [ ] drain event handled when publish() returns false
- [ ] Message metadata populated — messageId, timestamp, type
Resilience
- [ ] Auto-reconnection logic (or amqp-connection-manager)
- [ ] Graceful shutdown on SIGTERM
- [ ] Connection errors logged but don't crash the process
- [ ] Consumer startup retries if broker is unavailable
Closing thoughts
RabbitMQ rewards getting the fundamentals right. Most production issues I've seen come from the same handful of mistakes:
- Forgetting one of the three persistence flags — and only noticing when a broker restart loses 10,000 messages
- Not setting prefetch — and wondering why adding more workers didn't help
- Not handling connection drops — and finding out in production that your app silently stopped publishing an hour ago
- Ad-hoc naming — and spending 20 minutes figuring out what each resource does when debugging
Get those five things right, adopt a clean naming convention, add a DLQ for every main queue, and you have a system you can reason about at 2 AM.
The full runnable code for everything in this post is worth putting in a small sandbox repo — spin up Docker with rabbitmq:3-management, run a producer, run three workers, kill and restart containers, see what happens. An hour of that is worth ten hours of reading.