DEV Community

ronak navadia
ronak navadia

Posted on

Level Up Your NestJS App with BullMQ Queues, DLQs & Bull Board

In this guide, we’ll walk through how to integrate BullMQ with NestJS using the WorkerHost processor pattern, as recommended in the official NestJS documentation. Along the way, we’ll enhance our job processing system with Bull Board for real-time monitoring and set up a Dead Letter Queue (DLQ) to catch and manage poison jobs. To tie everything together, we'll build a simple authentication system with a signup endpoint that enqueues a welcome email job—demonstrating how background processing can streamline user workflows.

📂 Folder Structure

src/
│── app.module.ts
│── main.ts
│
├── config/
│   └── bull.config.ts
│
├── modules/
│   ├── auth/
│   │   ├── auth.controller.ts
│   │   ├── auth.service.ts
│   │   └── auth.module.ts
│   │
│   └── queue/
│       ├── queue.constants.ts
│       ├── queue.module.ts
│       ├── queue.service.ts         # producers
│       └── processors/
│           ├── appointment.processor.ts  # WorkerHost
│           └── dlq.processor.ts          # WorkerHost (DLQ)
│
└── .env

Enter fullscreen mode Exit fullscreen mode

📦 Install Dependencies

npm install bullmq ioredis class-validator class-transformer
npm install @bull-board/api @bull-board/express
Enter fullscreen mode Exit fullscreen mode

⚙️ Environment Configs (.env)

PORT=3000

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
REDIS_USE_TLS=false

# Queue defaults
QUEUE_DEFAULT_ATTEMPTS=4
Enter fullscreen mode Exit fullscreen mode

🧭 Queue Constants (src/modules/queue/queue.constants.ts)

export enum QueueNames {
  APPOINTMENT = 'appointment',
  APPOINTMENT_DLQ = 'appointment-dlq', // Dead Letter Queue
}

export const JobNames = {
  SCHEDULE_APPOINTMENT: 'schedule-appointment',
  APPOINTMENT_DIGEST: 'appointment-digest',
  DEAD_LETTER: 'dead-letter',
};
Enter fullscreen mode Exit fullscreen mode

🔧 Bull Config (src/config/bull.config.ts)

// src/config/bull.config.ts
import { ConfigModule, ConfigService } from '@nestjs/config';

export const bullQueueConfig = {
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: async (configService: ConfigService) => {
    const useTls =
      configService.get<string>('REDIS_USE_TLS') === 'true' ||
      (configService.get<boolean>('REDIS_USE_TLS') as any) === true;

    return {
      connection: {
        host: configService.get<string>('REDIS_HOST'),
        port: Number(configService.get<number>('REDIS_PORT')),
        password: configService.get<string>('REDIS_PASSWORD') || undefined,
        db: Number(configService.get<number>('REDIS_DB') || 0),
        ...(useTls && { tls: {} }),
      },
      defaultJobOptions: {
        attempts: Number(configService.get<number>('QUEUE_DEFAULT_ATTEMPTS') || 4),
        backoff: { type: 'exponential', delay: 3000 },
        removeOnComplete: { age: 3600, count: 1000 },
        removeOnFail: { age: 24 * 3600, count: 1000 },
      },
    };
  },
};
Enter fullscreen mode Exit fullscreen mode

🏗 App Module (src/app.module.ts)

import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { bullQueueConfig } from './config/bull.config';
import { QueueModule } from './modules/queue/queue.module';
import { AuthModule } from './modules/auth/auth.module';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    BullModule.forRootAsync(bullQueueConfig), // <- as requested
    QueueModule,
    AuthModule,
  ],
})
export class AppModule {}
Enter fullscreen mode Exit fullscreen mode

📦 Queue Module (src/modules/queue/queue.module.ts)

Registers both the main queue and the DLQ, and wires processors.

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { QueueNames } from './queue.constants';
import { QueueService } from './queue.service';
import { AppointmentProcessor } from './processors/appointment.processor';
import { DlqProcessor } from './processors/dlq.processor';

@Module({
  imports: [
    BullModule.registerQueue({
      name: QueueNames.APPOINTMENT,
      // per-queue defaults (can override root defaults)
      defaultJobOptions: {
        attempts: 4,
        backoff: { type: 'exponential', delay: 3000 },
        removeOnComplete: { age: 1800, count: 500 },
        removeOnFail: { age: 24 * 3600, count: 1000 },
      },
    }),
    BullModule.registerQueue({
      name: QueueNames.APPOINTMENT_DLQ,
      defaultJobOptions: {
        removeOnComplete: true,
      },
    }),
  ],
  providers: [QueueService, AppointmentProcessor, DlqProcessor],
  exports: [QueueService],
})
export class QueueModule {}
Enter fullscreen mode Exit fullscreen mode

📨 Producer Service (src/modules/queue/queue.service.ts)

Adds jobs, delayed jobs, and a repeatable digest. (We keep producer isolated here and call it from AuthService.)

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue, JobsOptions } from 'bullmq';
import { JobNames, QueueNames } from './queue.constants';

@Injectable()
export class QueueService {
  constructor(
    @InjectQueue(QueueNames.APPOINTMENT) private readonly appointmentQueue: Queue,
  ) {}

  async addAppointmentJob(data: { userId: string; email: string; name: string }, opts?: JobsOptions) {
    // Idempotency via jobId (prevents duplicates)
    const jobId = opts?.jobId ?? `welcome:${data.email}`;
    return this.appointmentQueue.add(JobNames.SCHEDULE_APPOINTMENT, data, {
      jobId,
      attempts: 4, // ensure attempts is set on the job so DLQ logic can read it
      backoff: { type: 'exponential', delay: 3000 },
      ...opts,
    });
  }

  async addDelayedAppointment(data: { userId: string; email: string; name: string }, delayMs: number) {
    return this.addAppointmentJob(data, { delay: delayMs });
  }

  async addRepeatableDigest() {
    return this.appointmentQueue.add(JobNames.APPOINTMENT_DIGEST, {}, { repeat: { cron: '0 9 * * *' } });
  }
}
Enter fullscreen mode Exit fullscreen mode

⚙️ Appointment Processor (WorkerHost) with DLQ routing

  • Implements work in process(job).
  • Emits worker events (waiting, active, stalled, paused, resumed, drained).
  • On final failure (attempts exhausted), it adds a job to DLQ with a small payload.
// src/modules/queue/processors/appointment.processor.ts
import { Injectable, Logger } from '@nestjs/common';
import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq';
import { Job, Queue } from 'bullmq';
import { JobNames, QueueNames } from '../queue.constants';

@Processor(QueueNames.APPOINTMENT)
@Injectable()
export class AppointmentProcessor extends WorkerHost {
  private readonly logger = new Logger(AppointmentProcessor.name);

  constructor(
    @InjectQueue(QueueNames.APPOINTMENT_DLQ) private readonly dlq: Queue,
  ) {
    super();
  }

  // Main work
  async process(job: Job<{ userId: string; email: string; name: string }>): Promise<any> {
    this.logger.log(`Processing job ${job.id} for ${job.data.email}`);
    await job.updateProgress(10);

    // Simulate sending a welcome email (replace with real provider call)
    await simulateSendWelcomeEmail(job.data.email, job.data.name);

    await job.updateProgress(100);
    this.logger.log(`Completed job ${job.id}`);
    return { ok: true };
  }

  // Worker events (observability)
  @OnWorkerEvent('waiting')
  onWaiting(jobId: string) {
    this.logger.log(`Event: waiting jobId=${jobId}`);
  }

  @OnWorkerEvent('active')
  onActive(job: Job) {
    this.logger.log(`Event: active jobId=${job?.id}`);
  }

  @OnWorkerEvent('stalled')
  onStalled(job: Job) {
    this.logger.warn(`Event: stalled jobId=${job?.id}`);
  }

  @OnWorkerEvent('paused')
  onPaused() {
    this.logger.warn('Event: worker paused');
  }

  @OnWorkerEvent('resumed')
  onResumed() {
    this.logger.log('Event: worker resumed');
  }

  @OnWorkerEvent('drained')
  onDrained() {
    this.logger.log('Event: worker drained');
  }

  @OnWorkerEvent('failed')
  async onFailed(job: Job | undefined, err: Error) {
    const id = job?.id ?? 'unknown';
    const attemptsMade = job?.attemptsMade ?? 0;
    const maxAttempts = job?.opts?.attempts ?? 1; // we set attempts on each job in the producer

    this.logger.error(`Event: failed jobId=${id} attemptsMade=${attemptsMade}/${maxAttempts} err=${err?.message}`);

    // **DLQ routing**: only when attempts exhausted (poison job)
    if (job && attemptsMade >= maxAttempts) {
      await this.routeToDLQ(job, err);
    }
  }

  private async routeToDLQ(job: Job, err: Error) {
    try {
      await this.dlq.add(JobNames.DEAD_LETTER, {
        originalJobId: job.id,
        name: job.name,
        queue: QueueNames.APPOINTMENT,
        data: job.data,                // keep small—prefer references/IDs in real apps
        reason: err?.message ?? 'unknown',
        attemptsMade: job.attemptsMade,
        maxAttempts: job.opts.attempts ?? null,
        failedAt: new Date().toISOString(),
      }, { removeOnComplete: true });

      this.logger.warn(`Job ${job.id} moved to DLQ (${QueueNames.APPOINTMENT_DLQ})`);
    } catch (dlqErr) {
      this.logger.error(`Failed to push job ${job.id} to DLQ: ${dlqErr?.message}`);
    }
  }
}

// Simulated email sender (10% transient failure to demo retries/backoff)
async function simulateSendWelcomeEmail(email: string, name: string) {
  await new Promise((r) => setTimeout(r, 200));
  if (!email) throw new Error('Invalid email');
  if (Math.random() < 0.1) throw new Error('Transient mail provider error');
  return true;
}
Enter fullscreen mode Exit fullscreen mode

🧰 DLQ Processor (WorkerHost)

  • Handles jobs placed in DLQ (e.g., persist to DB, alert ops).
// src/modules/queue/processors/dlq.processor.ts
import { Injectable, Logger } from '@nestjs/common';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { JobNames, QueueNames } from '../queue.constants';

@Processor(QueueNames.APPOINTMENT_DLQ)
@Injectable()
export class DlqProcessor extends WorkerHost {
  private readonly logger = new Logger(DlqProcessor.name);

  async process(job: Job<{
    originalJobId: string;
    name: string;
    queue: string;
    data: any;
    reason: string;
    attemptsMade: number;
    maxAttempts: number | null;
    failedAt: string;
  }>) {
    this.logger.warn(
      `DLQ: jobId=${job.id} original=${job.data.originalJobId} reason="${job.data.reason}"`,
    );

    // TODO:
    // - Persist to DB for triage
    // - Create an incident/alert
    // - Optionally implement auto-retry logic back to main queue after inspection

    return { handled: true };
  }
}
Enter fullscreen mode Exit fullscreen mode

👤 Auth Module — Signup API (Producer outside queue module)

src/modules/auth/auth.controller.ts

import { Body, Controller, Post } from '@nestjs/common';
import { IsEmail, IsString, MinLength } from 'class-validator';
import { AuthService } from './auth.service';

class SignupDto {
  @IsEmail()
  email: string;

  @IsString()
  @MinLength(3)
  name: string;

  @IsString()
  @MinLength(6)
  password: string;
}

@Controller('auth')
export class AuthController {
  constructor(private readonly authService: AuthService) {}

  @Post('signup')
  async signup(@Body() dto: SignupDto) {
    const res = await this.authService.signup(dto);
    return { ok: true, queuedJobId: res.jobId };
  }
}
Enter fullscreen mode Exit fullscreen mode

src/modules/auth/auth.service.ts

import { Injectable } from '@nestjs/common';
import { QueueService } from '../queue/queue.service';

@Injectable()
export class AuthService {
  constructor(private readonly queueService: QueueService) {}

  async signup(dto: { email: string; name: string; password: string }) {
    // 1) Create user in DB (omitted)
    const userId = 'generated-user-id';

    // 2) Enqueue welcome/appointment job (idempotent by jobId)
    const job = await this.queueService.addAppointmentJob({
      userId,
      email: dto.email,
      name: dto.name,
    });

    return { jobId: job.id };
  }
}
Enter fullscreen mode Exit fullscreen mode

src/modules/auth/auth.module.ts

import { Module } from '@nestjs/common';
import { AuthController } from './auth.controller';
import { AuthService } from './auth.service';
import { QueueModule } from '../queue/queue.module';

@Module({
  imports: [QueueModule],
  controllers: [AuthController],
  providers: [AuthService],
})
export class AuthModule {}
Enter fullscreen mode Exit fullscreen mode

🌐 Bull Board at /bull-board (src/main.ts)

Mounts Bull Board using the queue instances from Nest DI.

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { ValidationPipe } from '@nestjs/common';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { getQueueToken } from '@nestjs/bullmq';
import { QueueNames } from './modules/queue/queue.constants';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.useGlobalPipes(new ValidationPipe({ whitelist: true, transform: true }));

  const serverAdapter = new ExpressAdapter();
  serverAdapter.setBasePath('/bull-board');

  // Get queues from Nest DI
  const appointmentQueue = app.get(getQueueToken(QueueNames.APPOINTMENT));
  const dlqQueue = app.get(getQueueToken(QueueNames.APPOINTMENT_DLQ));

  createBullBoard({
    queues: [new BullMQAdapter(appointmentQueue), new BullMQAdapter(dlqQueue)],
    serverAdapter,
  });

  app.use('/bull-board', serverAdapter.getRouter());

  const port = process.env.PORT || 3000;
  await app.listen(port);
  console.log(`App:        http://localhost:${port}`);
  console.log(`Bull Board: http://localhost:${port}/bull-board`);
}
bootstrap();
Enter fullscreen mode Exit fullscreen mode

📘 Topics & Best Practices

🔁 Retries & Backoff

  • Set attempts and backoff on each job (or via defaultJobOptions).
  • Throw in process(job) to trigger retry.
  • Exponential backoff is safer for flaky downstream providers.

⏱ Delayed & Repeatable Jobs

  • Delay: queue.add(name, data, { delay: ms }).
  • Repeat: queue.add(name, data, { repeat: { cron: '0 9 * * *' }}).
  • Remove a repeatable with queue.removeRepeatable(...) when no longer needed.

⚠️ Error Handling (Throw vs Fail, DLQ, Poison Jobs)

  • Throw inside process() → failed attempt recorded, retried until attempts exhausted.
  • On final failure, route to DLQ (see AppointmentProcessor.onFailed()), store minimal payload (IDs/refs) and reason.
  • Poison jobs should end up in DLQ; review and requeue manually (or via an ops flow).

🔐 Idempotency & Dedup

  • Use jobId like welcome:${email} to dedupe enqueues.
  • Make processing idempotent (e.g., DB flag “email_sent” checked before sending).

🚦 Rate Limiting & Throughput Tuning

  • Prefer horizontal scaling (multiple worker processes/instances).
  • Keep concurrency reasonable to avoid saturating downstream services.
  • For provider rate limits, add throttling middleware or use Worker-level limiter (if you run custom workers).

📡 Worker/Queue Events

Handled via @OnWorkerEvent: waiting, active, stalled, paused, resumed, progress, completed, failed, drained.
Use these for metrics, logs, alerts, and DLQ routing.

🧹 Cleanup Policies

  • Use removeOnComplete / removeOnFail to avoid Redis bloat.
  • Periodically clean old jobs (Bull Board or code).

🛑 Graceful Shutdown

  • Nest + @nestjs/bullmq gracefully tears down workers.
  • Avoid SIGKILL; let in-flight jobs finish.
  • If you hold raw Queue instances elsewhere, close them in onModuleDestroy.

🔭 Observability

  • Bull Board for inspection.
  • Log worker events.
  • Export metrics (counts, durations, failure ratio) to Prometheus/Grafana.

🔎 Accessing Bull Board

  • Start the app: npm run start:dev
  • Visit: http://localhost:3000/bull-board
  • Inspect queues, retry/remove failed jobs, watch job progress.

Top comments (0)