DEV Community

Mukesh Rajbanshi
Mukesh Rajbanshi

Posted on

How to Integrate RabbitMQ (Message Broker) in NestJS – Step-by-Step Guide

Nestjs+Rabbitmq

Overview

This guide provides a complete step-by-step process for integrating RabbitMQ in a NestJS application using Docker and the built-in @nestjs/microservices package.

RabbitMQ is a robust message broker that facilitates asynchronous communication between microservices, promoting loose coupling, scalability, and fault tolerance.

1. 🐳 Setting Up RabbitMQ with Docker

Prerequisites

  • Docker Desktop installed and running
  • Basic knowledge of command line interface

Step 1: Pull and Run RabbitMQ Container

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Enter fullscreen mode Exit fullscreen mode

Note: Port 5672 is used for RabbitMQ messaging, and 15672 is for the web-based management UI.

Step 2: Access RabbitMQ Management Dashboard

Open your browser and navigate to: http://localhost:15672

Login credentials:

  • Username: guest
  • Password: guest

Step 3: Stop/Restart Container (Optional)

# Stop container
docker stop rabbitmq

# Start container
docker start rabbitmq
Enter fullscreen mode Exit fullscreen mode

2. 📦 Installing Required Packages

npm install @nestjs/microservices amqplib
npm install --save-dev @types/amqplib
Enter fullscreen mode Exit fullscreen mode

3. ⚙️ Configuration Setup

Constants Definition

// src/constants.ts
export const FILE_UPLOAD_QUEUE = 'file-upload-queue';
export const EMAIL_SEND_QUEUE = 'email-send-queue';
Enter fullscreen mode Exit fullscreen mode

Environment Configuration

Create a .env file in your project root:

RABBITMQ_URL=amqp://localhost:5672
RABBITMQ_FILE_UPLOAD_QUEUE=file-upload-queue
RABBITMQ_EMAIL_SEND_QUEUE=email-send-queue
Enter fullscreen mode Exit fullscreen mode

Configuration Module

// src/config/rabbitmq.config.ts
import { registerAs } from '@nestjs/config';

export default registerAs('rabbitmq', () => ({
  url: process.env.RABBITMQ_URL || 'amqp://localhost:5672',
  FILE_UPLOAD_QUEUE: process.env.RABBITMQ_FILE_UPLOAD_QUEUE || 'file-upload-queue',
  EMAIL_SEND_QUEUE: process.env.RABBITMQ_EMAIL_SEND_QUEUE || 'email-send-queue',
}));
Enter fullscreen mode Exit fullscreen mode

Main Application Configuration

// src/main.ts
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';
import { AppModule } from './app.module';
import { FILE_UPLOAD_QUEUE, EMAIL_SEND_QUEUE } from './constants';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const configService = app.get(ConfigService);

  const microservices = [
    {
      transport: Transport.RMQ,
      options: {
        urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
        queue: FILE_UPLOAD_QUEUE,
        queueOptions: { durable: true },
      },
    },
    {
      transport: Transport.RMQ,
      options: {
        urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
        queue: EMAIL_SEND_QUEUE,
        queueOptions: { durable: true },
      },
    },
  ];

  microservices.forEach((ms) => app.connectMicroservice(ms));
  await app.startAllMicroservices();

  await app.listen(3000);
}

bootstrap();
Enter fullscreen mode Exit fullscreen mode

4. 📤 Producer Module Setup

Producer Module

// src/producer/producer.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ProducerService } from './producer.service';
import { EMAIL_SEND_QUEUE, FILE_UPLOAD_QUEUE } from '../constants';

@Module({
  imports: [
    ConfigModule,
    ClientsModule.registerAsync([
      {
        name: FILE_UPLOAD_QUEUE,
        imports: [ConfigModule],
        useFactory: (configService: ConfigService) => ({
          transport: Transport.RMQ,
          options: {
            urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
            queue: FILE_UPLOAD_QUEUE,
            queueOptions: { durable: true },
          },
        }),
        inject: [ConfigService],
      },
      {
        name: EMAIL_SEND_QUEUE,
        imports: [ConfigModule],
        useFactory: (configService: ConfigService) => ({
          transport: Transport.RMQ,
          options: {
            urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
            queue: EMAIL_SEND_QUEUE,
            queueOptions: { durable: true },
          },
        }),
        inject: [ConfigService],
      },
    ]),
  ],
  providers: [ProducerService],
  exports: [ProducerService],
})
export class ProducerModule {}
Enter fullscreen mode Exit fullscreen mode

Producer Service

// src/producer/producer.service.ts
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { SendMailDto } from '../mail/dto/sendMail.dto';
import { lastValueFrom } from 'rxjs';
import { EMAIL_SEND_QUEUE, FILE_UPLOAD_QUEUE } from '../constants';

@Injectable()
export class ProducerService {
  private readonly logger = new Logger(ProducerService.name);

  constructor(
    @Inject(FILE_UPLOAD_QUEUE) private readonly fileQueue: ClientProxy,
    @Inject(EMAIL_SEND_QUEUE) private readonly emailQueue: ClientProxy,
  ) {}

  async enqueueFileUpload(payload: {
    originalname: string;
    mimetype: string;
    buffer: string;
  }) {
    try {
      this.logger.log(`Enqueuing file upload: ${payload.originalname}`);
      return await lastValueFrom(this.fileQueue.emit('file-upload', payload));
    } catch (error) {
      this.logger.error('Error enqueuing file upload:', error);
      throw error;
    }
  }

  async enqueueEmail(sendMailDto: SendMailDto) {
    try {
      this.logger.log(`Enqueuing email to: ${sendMailDto.to}`);
      return await lastValueFrom(this.emailQueue.emit('email-send', sendMailDto));
    } catch (error) {
      this.logger.error('Error enqueuing email:', error);
      throw error;
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

5. 📥 Consumer Module Setup

Consumer Controller

// src/consumer/consumer.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { MailService } from '../mail/mail.service';
import { SendMailDto } from '../mail/dto/sendMail.dto';

@Controller()
export class ConsumerController {
  private readonly logger = new Logger(ConsumerController.name);

  constructor(private readonly mailService: MailService) {}

  @EventPattern('email-send')
  async handleEmailSend(@Payload() sendMailDto: SendMailDto) {
    this.logger.log('Processing email send request:', sendMailDto);
    try {
      // Uncomment when ready to implement
      // await this.mailService.sendMail(sendMailDto);
      this.logger.log('Email sent successfully');
    } catch (error) {
      this.logger.error('Error sending email:', error);
      throw error;
    }
  }

  @EventPattern('file-upload')
  async handleFileUpload(
    @Payload()
    data: {
      originalname: string;
      mimetype: string;
      buffer: string;
    },
  ) {
    this.logger.log('Processing file upload:', data.originalname);
    try {
      // Uncomment when ready to implement
      // await this.uploadsService.uploadFile(data);
      this.logger.log('File uploaded successfully');
    } catch (error) {
      this.logger.error('Error uploading file:', error);
      throw error;
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Consumer Module

// src/consumer/consumer.module.ts
import { Module } from '@nestjs/common';
import { ConsumerController } from './consumer.controller';
import { MailModule } from '../mail/mail.module';

@Module({
  imports: [MailModule],
  controllers: [ConsumerController],
})
export class ConsumerModule {}
Enter fullscreen mode Exit fullscreen mode

6. 📧 DTOs and Interfaces

SendMail DTO

// src/mail/dto/sendMail.dto.ts
import { IsEmail, IsString, IsNotEmpty, IsOptional } from 'class-validator';

export class SendMailDto {
  @IsEmail()
  @IsNotEmpty()
  to: string;

  @IsString()
  @IsNotEmpty()
  subject: string;

  @IsString()
  @IsNotEmpty()
  body: string;

  @IsEmail()
  @IsOptional()
  from?: string;
}
Enter fullscreen mode Exit fullscreen mode

File Upload Interface

// src/upload/interfaces/file-upload.interface.ts
export interface FileUploadPayload {
  originalname: string;
  mimetype: string;
  buffer: string;
  size?: number;
  userId?: string;
}
Enter fullscreen mode Exit fullscreen mode

7. 🏗️ App Module Integration

// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ProducerModule } from './producer/producer.module';
import { ConsumerModule } from './consumer/consumer.module';
import { MailModule } from './mail/mail.module';
import rabbitmqConfig from './config/rabbitmq.config';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      load: [rabbitmqConfig],
    }),
    ProducerModule,
    ConsumerModule,
    MailModule,
  ],
})
export class AppModule {}
Enter fullscreen mode Exit fullscreen mode

8. 🚀 Usage Example

File Upload Controller

// src/upload/upload.controller.ts
import { 
  Controller, 
  Post, 
  UploadedFile, 
  UseInterceptors, 
  Body 
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { ProducerService } from '../producer/producer.service';
import { SendMailDto } from '../mail/dto/sendMail.dto';

@Controller('upload')
export class UploadController {
  constructor(private readonly producerService: ProducerService) {}

  @Post('file')
  @UseInterceptors(FileInterceptor('file'))
  async uploadFile(@UploadedFile() file: Express.Multer.File) {
    if (!file) {
      throw new Error('No file provided');
    }

    await this.producerService.enqueueFileUpload({
      originalname: file.originalname,
      mimetype: file.mimetype,
      buffer: file.buffer.toString('base64'),
    });

    return { message: 'File queued for processing' };
  }

  @Post('email')
  async sendEmail(@Body() sendMailDto: SendMailDto) {
    await this.producerService.enqueueEmail(sendMailDto);
    return { message: 'Email queued for sending' };
  }
}
Enter fullscreen mode Exit fullscreen mode

Upload Module

// src/upload/upload.module.ts
import { Module } from '@nestjs/common';
import { UploadController } from './upload.controller';
import { ProducerModule } from '../producer/producer.module';

@Module({
  imports: [ProducerModule],
  controllers: [UploadController],
})
export class UploadModule {}
Enter fullscreen mode Exit fullscreen mode

9. 🔍 Health Check Implementation

// src/health/health.controller.ts
import { Controller, Get } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';

@Controller('health')
export class HealthController {
  constructor(private readonly configService: ConfigService) {}

  @Get('rabbitmq')
  async checkRabbitMQHealth() {
    try {
      const rabbitmqUrl = this.configService.get('rabbitmq.url');
      // Add actual connection test logic here
      return { 
        status: 'healthy', 
        timestamp: new Date().toISOString(),
        url: rabbitmqUrl
      };
    } catch (error) {
      return { 
        status: 'unhealthy', 
        error: error.message,
        timestamp: new Date().toISOString()
      };
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

10. 📝 Additional Notes

Production Considerations

  • Implement proper authentication and authorization for RabbitMQ
  • Set up persistent volume storage for message durability
  • Configure dead letter queues for failed message handling
  • Monitor queue sizes and processing times
  • Implement circuit breakers for fault tolerance
  • Use connection pooling for better performance

Security Best Practices

  • Use environment variables for sensitive configuration
  • Implement proper SSL/TLS certificates for production
  • Set up RabbitMQ user permissions and virtual hosts
  • Monitor and log all message processing activities

Performance Optimization

  • Configure appropriate prefetch counts for consumers
  • Use message acknowledgments properly
  • Implement batch processing for high-volume scenarios
  • Monitor memory usage and connection limits

11. 🔍 Troubleshooting

Common Issues

  1. Connection refused: Ensure RabbitMQ is running on the specified port
  2. Queue not found: Check queue names match between producer and consumer
  3. Message not processed: Verify consumer is properly registered and listening
  4. Authentication failed: Check RabbitMQ credentials and permissions
  5. Memory issues: Monitor queue sizes and implement proper acknowledgments

Debugging Tips

  • Enable debug logging for RabbitMQ connections
  • Use the management UI to monitor queue states
  • Check network connectivity between services
  • Verify environment variables are loaded correctly
  • Monitor application logs for connection errors

Useful Commands

# Check RabbitMQ status
docker exec rabbitmq rabbitmqctl status

# List queues
docker exec rabbitmq rabbitmqctl list_queues

# Check connections
docker exec rabbitmq rabbitmqctl list_connections

# Restart RabbitMQ service
docker restart rabbitmq
Enter fullscreen mode Exit fullscreen mode

Top comments (0)