In this guide, we’ll walk through how to integrate BullMQ with NestJS using the WorkerHost processor pattern, as recommended in the official NestJS documentation. Along the way, we’ll enhance our job processing system with Bull Board for real-time monitoring and set up a Dead Letter Queue (DLQ) to catch and manage poison jobs. To tie everything together, we'll build a simple authentication system with a signup endpoint that enqueues a welcome email job—demonstrating how background processing can streamline user workflows.
📂 Folder Structure
src/
│── app.module.ts
│── main.ts
│
├── config/
│ └── bull.config.ts
│
├── modules/
│ ├── auth/
│ │ ├── auth.controller.ts
│ │ ├── auth.service.ts
│ │ └── auth.module.ts
│ │
│ └── queue/
│ ├── queue.constants.ts
│ ├── queue.module.ts
│ ├── queue.service.ts # producers
│ └── processors/
│ ├── appointment.processor.ts # WorkerHost
│ └── dlq.processor.ts # WorkerHost (DLQ)
│
└── .env
📦 Install Dependencies
npm install bullmq ioredis class-validator class-transformer
npm install @bull-board/api @bull-board/express
⚙️ Environment Configs (.env)
PORT=3000
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
REDIS_USE_TLS=false
# Queue defaults
QUEUE_DEFAULT_ATTEMPTS=4
🧭 Queue Constants (src/modules/queue/queue.constants.ts)
export enum QueueNames {
APPOINTMENT = 'appointment',
APPOINTMENT_DLQ = 'appointment-dlq', // Dead Letter Queue
}
export const JobNames = {
SCHEDULE_APPOINTMENT: 'schedule-appointment',
APPOINTMENT_DIGEST: 'appointment-digest',
DEAD_LETTER: 'dead-letter',
};
🔧 Bull Config (src/config/bull.config.ts)
// src/config/bull.config.ts
import { ConfigModule, ConfigService } from '@nestjs/config';
export const bullQueueConfig = {
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
const useTls =
configService.get<string>('REDIS_USE_TLS') === 'true' ||
(configService.get<boolean>('REDIS_USE_TLS') as any) === true;
return {
connection: {
host: configService.get<string>('REDIS_HOST'),
port: Number(configService.get<number>('REDIS_PORT')),
password: configService.get<string>('REDIS_PASSWORD') || undefined,
db: Number(configService.get<number>('REDIS_DB') || 0),
...(useTls && { tls: {} }),
},
defaultJobOptions: {
attempts: Number(configService.get<number>('QUEUE_DEFAULT_ATTEMPTS') || 4),
backoff: { type: 'exponential', delay: 3000 },
removeOnComplete: { age: 3600, count: 1000 },
removeOnFail: { age: 24 * 3600, count: 1000 },
},
};
},
};
🏗 App Module (src/app.module.ts)
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { bullQueueConfig } from './config/bull.config';
import { QueueModule } from './modules/queue/queue.module';
import { AuthModule } from './modules/auth/auth.module';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
BullModule.forRootAsync(bullQueueConfig), // <- as requested
QueueModule,
AuthModule,
],
})
export class AppModule {}
📦 Queue Module (src/modules/queue/queue.module.ts)
Registers both the main queue and the DLQ, and wires processors.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { QueueNames } from './queue.constants';
import { QueueService } from './queue.service';
import { AppointmentProcessor } from './processors/appointment.processor';
import { DlqProcessor } from './processors/dlq.processor';
@Module({
imports: [
BullModule.registerQueue({
name: QueueNames.APPOINTMENT,
// per-queue defaults (can override root defaults)
defaultJobOptions: {
attempts: 4,
backoff: { type: 'exponential', delay: 3000 },
removeOnComplete: { age: 1800, count: 500 },
removeOnFail: { age: 24 * 3600, count: 1000 },
},
}),
BullModule.registerQueue({
name: QueueNames.APPOINTMENT_DLQ,
defaultJobOptions: {
removeOnComplete: true,
},
}),
],
providers: [QueueService, AppointmentProcessor, DlqProcessor],
exports: [QueueService],
})
export class QueueModule {}
📨 Producer Service (src/modules/queue/queue.service.ts)
Adds jobs, delayed jobs, and a repeatable digest. (We keep producer isolated here and call it from AuthService.)
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue, JobsOptions } from 'bullmq';
import { JobNames, QueueNames } from './queue.constants';
@Injectable()
export class QueueService {
constructor(
@InjectQueue(QueueNames.APPOINTMENT) private readonly appointmentQueue: Queue,
) {}
async addAppointmentJob(data: { userId: string; email: string; name: string }, opts?: JobsOptions) {
// Idempotency via jobId (prevents duplicates)
const jobId = opts?.jobId ?? `welcome:${data.email}`;
return this.appointmentQueue.add(JobNames.SCHEDULE_APPOINTMENT, data, {
jobId,
attempts: 4, // ensure attempts is set on the job so DLQ logic can read it
backoff: { type: 'exponential', delay: 3000 },
...opts,
});
}
async addDelayedAppointment(data: { userId: string; email: string; name: string }, delayMs: number) {
return this.addAppointmentJob(data, { delay: delayMs });
}
async addRepeatableDigest() {
return this.appointmentQueue.add(JobNames.APPOINTMENT_DIGEST, {}, { repeat: { cron: '0 9 * * *' } });
}
}
⚙️ Appointment Processor (WorkerHost) with DLQ routing
- Implements work in process(job).
- Emits worker events (waiting, active, stalled, paused, resumed, drained).
- On final failure (attempts exhausted), it adds a job to DLQ with a small payload.
// src/modules/queue/processors/appointment.processor.ts
import { Injectable, Logger } from '@nestjs/common';
import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq';
import { Job, Queue } from 'bullmq';
import { JobNames, QueueNames } from '../queue.constants';
@Processor(QueueNames.APPOINTMENT)
@Injectable()
export class AppointmentProcessor extends WorkerHost {
private readonly logger = new Logger(AppointmentProcessor.name);
constructor(
@InjectQueue(QueueNames.APPOINTMENT_DLQ) private readonly dlq: Queue,
) {
super();
}
// Main work
async process(job: Job<{ userId: string; email: string; name: string }>): Promise<any> {
this.logger.log(`Processing job ${job.id} for ${job.data.email}`);
await job.updateProgress(10);
// Simulate sending a welcome email (replace with real provider call)
await simulateSendWelcomeEmail(job.data.email, job.data.name);
await job.updateProgress(100);
this.logger.log(`Completed job ${job.id}`);
return { ok: true };
}
// Worker events (observability)
@OnWorkerEvent('waiting')
onWaiting(jobId: string) {
this.logger.log(`Event: waiting jobId=${jobId}`);
}
@OnWorkerEvent('active')
onActive(job: Job) {
this.logger.log(`Event: active jobId=${job?.id}`);
}
@OnWorkerEvent('stalled')
onStalled(job: Job) {
this.logger.warn(`Event: stalled jobId=${job?.id}`);
}
@OnWorkerEvent('paused')
onPaused() {
this.logger.warn('Event: worker paused');
}
@OnWorkerEvent('resumed')
onResumed() {
this.logger.log('Event: worker resumed');
}
@OnWorkerEvent('drained')
onDrained() {
this.logger.log('Event: worker drained');
}
@OnWorkerEvent('failed')
async onFailed(job: Job | undefined, err: Error) {
const id = job?.id ?? 'unknown';
const attemptsMade = job?.attemptsMade ?? 0;
const maxAttempts = job?.opts?.attempts ?? 1; // we set attempts on each job in the producer
this.logger.error(`Event: failed jobId=${id} attemptsMade=${attemptsMade}/${maxAttempts} err=${err?.message}`);
// **DLQ routing**: only when attempts exhausted (poison job)
if (job && attemptsMade >= maxAttempts) {
await this.routeToDLQ(job, err);
}
}
private async routeToDLQ(job: Job, err: Error) {
try {
await this.dlq.add(JobNames.DEAD_LETTER, {
originalJobId: job.id,
name: job.name,
queue: QueueNames.APPOINTMENT,
data: job.data, // keep small—prefer references/IDs in real apps
reason: err?.message ?? 'unknown',
attemptsMade: job.attemptsMade,
maxAttempts: job.opts.attempts ?? null,
failedAt: new Date().toISOString(),
}, { removeOnComplete: true });
this.logger.warn(`Job ${job.id} moved to DLQ (${QueueNames.APPOINTMENT_DLQ})`);
} catch (dlqErr) {
this.logger.error(`Failed to push job ${job.id} to DLQ: ${dlqErr?.message}`);
}
}
}
// Simulated email sender (10% transient failure to demo retries/backoff)
async function simulateSendWelcomeEmail(email: string, name: string) {
await new Promise((r) => setTimeout(r, 200));
if (!email) throw new Error('Invalid email');
if (Math.random() < 0.1) throw new Error('Transient mail provider error');
return true;
}
🧰 DLQ Processor (WorkerHost)
- Handles jobs placed in DLQ (e.g., persist to DB, alert ops).
// src/modules/queue/processors/dlq.processor.ts
import { Injectable, Logger } from '@nestjs/common';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { JobNames, QueueNames } from '../queue.constants';
@Processor(QueueNames.APPOINTMENT_DLQ)
@Injectable()
export class DlqProcessor extends WorkerHost {
private readonly logger = new Logger(DlqProcessor.name);
async process(job: Job<{
originalJobId: string;
name: string;
queue: string;
data: any;
reason: string;
attemptsMade: number;
maxAttempts: number | null;
failedAt: string;
}>) {
this.logger.warn(
`DLQ: jobId=${job.id} original=${job.data.originalJobId} reason="${job.data.reason}"`,
);
// TODO:
// - Persist to DB for triage
// - Create an incident/alert
// - Optionally implement auto-retry logic back to main queue after inspection
return { handled: true };
}
}
👤 Auth Module — Signup API (Producer outside queue module)
src/modules/auth/auth.controller.ts
import { Body, Controller, Post } from '@nestjs/common';
import { IsEmail, IsString, MinLength } from 'class-validator';
import { AuthService } from './auth.service';
class SignupDto {
@IsEmail()
email: string;
@IsString()
@MinLength(3)
name: string;
@IsString()
@MinLength(6)
password: string;
}
@Controller('auth')
export class AuthController {
constructor(private readonly authService: AuthService) {}
@Post('signup')
async signup(@Body() dto: SignupDto) {
const res = await this.authService.signup(dto);
return { ok: true, queuedJobId: res.jobId };
}
}
src/modules/auth/auth.service.ts
import { Injectable } from '@nestjs/common';
import { QueueService } from '../queue/queue.service';
@Injectable()
export class AuthService {
constructor(private readonly queueService: QueueService) {}
async signup(dto: { email: string; name: string; password: string }) {
// 1) Create user in DB (omitted)
const userId = 'generated-user-id';
// 2) Enqueue welcome/appointment job (idempotent by jobId)
const job = await this.queueService.addAppointmentJob({
userId,
email: dto.email,
name: dto.name,
});
return { jobId: job.id };
}
}
src/modules/auth/auth.module.ts
import { Module } from '@nestjs/common';
import { AuthController } from './auth.controller';
import { AuthService } from './auth.service';
import { QueueModule } from '../queue/queue.module';
@Module({
imports: [QueueModule],
controllers: [AuthController],
providers: [AuthService],
})
export class AuthModule {}
🌐 Bull Board at /bull-board (src/main.ts)
Mounts Bull Board using the queue instances from Nest DI.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { ValidationPipe } from '@nestjs/common';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { getQueueToken } from '@nestjs/bullmq';
import { QueueNames } from './modules/queue/queue.constants';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.useGlobalPipes(new ValidationPipe({ whitelist: true, transform: true }));
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/bull-board');
// Get queues from Nest DI
const appointmentQueue = app.get(getQueueToken(QueueNames.APPOINTMENT));
const dlqQueue = app.get(getQueueToken(QueueNames.APPOINTMENT_DLQ));
createBullBoard({
queues: [new BullMQAdapter(appointmentQueue), new BullMQAdapter(dlqQueue)],
serverAdapter,
});
app.use('/bull-board', serverAdapter.getRouter());
const port = process.env.PORT || 3000;
await app.listen(port);
console.log(`App: http://localhost:${port}`);
console.log(`Bull Board: http://localhost:${port}/bull-board`);
}
bootstrap();
📘 Topics & Best Practices
🔁 Retries & Backoff
- Set
attempts
andbackoff
on each job (or viadefaultJobOptions
). - Throw in
process(job)
to trigger retry. - Exponential backoff is safer for flaky downstream providers.
⏱ Delayed & Repeatable Jobs
- Delay:
queue.add(name, data, { delay: ms })
. - Repeat:
queue.add(name, data, { repeat: { cron: '0 9 * * *' }})
. - Remove a repeatable with
queue.removeRepeatable(...)
when no longer needed.
⚠️ Error Handling (Throw vs Fail, DLQ, Poison Jobs)
- Throw inside
process()
→ failed attempt recorded, retried until attempts exhausted. - On final failure, route to DLQ (see
AppointmentProcessor.onFailed()
), store minimal payload (IDs/refs) and reason. - Poison jobs should end up in DLQ; review and requeue manually (or via an ops flow).
🔐 Idempotency & Dedup
- Use
jobId
likewelcome:${email}
to dedupe enqueues. - Make processing idempotent (e.g., DB flag “email_sent” checked before sending).
🚦 Rate Limiting & Throughput Tuning
- Prefer horizontal scaling (multiple worker processes/instances).
- Keep
concurrency
reasonable to avoid saturating downstream services. - For provider rate limits, add throttling middleware or use Worker-level limiter (if you run custom workers).
📡 Worker/Queue Events
Handled via @OnWorkerEvent
: waiting
, active
, stalled
, paused
, resumed
, progress
, completed
, failed
, drained
.
Use these for metrics, logs, alerts, and DLQ routing.
🧹 Cleanup Policies
- Use
removeOnComplete
/removeOnFail
to avoid Redis bloat. - Periodically clean old jobs (Bull Board or code).
🛑 Graceful Shutdown
- Nest + @nestjs/bullmq gracefully tears down workers.
- Avoid SIGKILL; let in-flight jobs finish.
- If you hold raw Queue instances elsewhere, close them in
onModuleDestroy
.
🔭 Observability
- Bull Board for inspection.
- Log worker events.
- Export metrics (counts, durations, failure ratio) to Prometheus/Grafana.
🔎 Accessing Bull Board
- Start the app:
npm run start:dev
- Visit:
http://localhost:3000/bull-board
- Inspect queues, retry/remove failed jobs, watch job progress.
Top comments (0)