BullMQ is a Node.js job queue built on Redis — reliable, fast, and battle-tested. Process background jobs, schedule tasks, and handle retries without external services.
Why BullMQ?
- Redis-backed: Fast and reliable
- Retries: Automatic with exponential backoff
- Rate limiting: Control processing speed
- Priorities: Process important jobs first
- Delayed jobs: Schedule for later
- Repeatable: Cron-like scheduling
- Flows: Job dependencies and parent-child relationships
- Dashboard: Bull Board for monitoring
Install
npm install bullmq
Producer
import { Queue } from 'bullmq';
const emailQueue = new Queue('emails', {
connection: { host: 'localhost', port: 6379 },
});
// Add a job
await emailQueue.add('welcome', {
to: 'user@example.com',
subject: 'Welcome!',
body: 'Thanks for signing up.',
});
// Delayed job (send in 1 hour)
await emailQueue.add('reminder', {
to: 'user@example.com',
subject: 'Complete your profile',
}, { delay: 60 * 60 * 1000 });
// Repeatable job (daily at 9 AM)
await emailQueue.add('daily-digest', {}, {
repeat: { cron: '0 9 * * *' },
});
Worker
import { Worker } from 'bullmq';
const worker = new Worker('emails', async (job) => {
console.log(`Processing ${job.name}: ${job.data.to}`);
switch (job.name) {
case 'welcome':
await sendWelcomeEmail(job.data);
break;
case 'reminder':
await sendReminderEmail(job.data);
break;
case 'daily-digest':
await sendDailyDigest();
break;
}
return { sent: true, timestamp: Date.now() };
}, {
connection: { host: 'localhost', port: 6379 },
concurrency: 5,
});
worker.on('completed', (job) => console.log(`Job ${job.id} completed`));
worker.on('failed', (job, err) => console.error(`Job ${job.id} failed:`, err));
Retries
await emailQueue.add('important-email', data, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
});
Rate Limiting
const worker = new Worker('api-calls', processJob, {
limiter: {
max: 10,
duration: 1000,
},
});
Job Priorities
await queue.add('urgent', data, { priority: 1 }); // Processed first
await queue.add('normal', data, { priority: 5 }); // Processed later
await queue.add('low', data, { priority: 10 }); // Processed last
Flows (Job Dependencies)
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({ connection });
await flowProducer.add({
name: 'send-invoice',
queueName: 'emails',
data: { orderId: '123' },
children: [
{ name: 'generate-pdf', queueName: 'documents', data: { orderId: '123' } },
{ name: 'calculate-tax', queueName: 'billing', data: { orderId: '123' } },
],
});
Parent job runs only after all children complete.
Bull Board (Dashboard)
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [new BullMQAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
Real-World Use Case
An e-commerce site processed orders synchronously — checkout took 8 seconds. After moving email, PDF generation, and inventory updates to BullMQ, checkout dropped to 200ms. Failed payment webhooks automatically retry 3 times before alerting support.
Need to automate data collection? Check out my Apify actors for ready-made scrapers, or email spinov001@gmail.com for custom solutions.
Top comments (0)