Overview
This guide provides a complete step-by-step process for integrating RabbitMQ in a NestJS application using Docker and the built-in @nestjs/microservices
package.
RabbitMQ is a robust message broker that facilitates asynchronous communication between microservices, promoting loose coupling, scalability, and fault tolerance.
1. 🐳 Setting Up RabbitMQ with Docker
Prerequisites
- Docker Desktop installed and running
- Basic knowledge of command line interface
Step 1: Pull and Run RabbitMQ Container
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Note: Port 5672 is used for RabbitMQ messaging, and 15672 is for the web-based management UI.
Step 2: Access RabbitMQ Management Dashboard
Open your browser and navigate to: http://localhost:15672
Login credentials:
- Username:
guest
- Password:
guest
Step 3: Stop/Restart Container (Optional)
# Stop container
docker stop rabbitmq
# Start container
docker start rabbitmq
2. 📦 Installing Required Packages
npm install @nestjs/microservices amqplib
npm install --save-dev @types/amqplib
3. ⚙️ Configuration Setup
Constants Definition
// src/constants.ts
export const FILE_UPLOAD_QUEUE = 'file-upload-queue';
export const EMAIL_SEND_QUEUE = 'email-send-queue';
Environment Configuration
Create a .env
file in your project root:
RABBITMQ_URL=amqp://localhost:5672
RABBITMQ_FILE_UPLOAD_QUEUE=file-upload-queue
RABBITMQ_EMAIL_SEND_QUEUE=email-send-queue
Configuration Module
// src/config/rabbitmq.config.ts
import { registerAs } from '@nestjs/config';
export default registerAs('rabbitmq', () => ({
url: process.env.RABBITMQ_URL || 'amqp://localhost:5672',
FILE_UPLOAD_QUEUE: process.env.RABBITMQ_FILE_UPLOAD_QUEUE || 'file-upload-queue',
EMAIL_SEND_QUEUE: process.env.RABBITMQ_EMAIL_SEND_QUEUE || 'email-send-queue',
}));
Main Application Configuration
// src/main.ts
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';
import { AppModule } from './app.module';
import { FILE_UPLOAD_QUEUE, EMAIL_SEND_QUEUE } from './constants';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const configService = app.get(ConfigService);
const microservices = [
{
transport: Transport.RMQ,
options: {
urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
queue: FILE_UPLOAD_QUEUE,
queueOptions: { durable: true },
},
},
{
transport: Transport.RMQ,
options: {
urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
queue: EMAIL_SEND_QUEUE,
queueOptions: { durable: true },
},
},
];
microservices.forEach((ms) => app.connectMicroservice(ms));
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
4. 📤 Producer Module Setup
Producer Module
// src/producer/producer.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ProducerService } from './producer.service';
import { EMAIL_SEND_QUEUE, FILE_UPLOAD_QUEUE } from '../constants';
@Module({
imports: [
ConfigModule,
ClientsModule.registerAsync([
{
name: FILE_UPLOAD_QUEUE,
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
transport: Transport.RMQ,
options: {
urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
queue: FILE_UPLOAD_QUEUE,
queueOptions: { durable: true },
},
}),
inject: [ConfigService],
},
{
name: EMAIL_SEND_QUEUE,
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
transport: Transport.RMQ,
options: {
urls: [configService.get('rabbitmq.url') || 'amqp://localhost:5672'],
queue: EMAIL_SEND_QUEUE,
queueOptions: { durable: true },
},
}),
inject: [ConfigService],
},
]),
],
providers: [ProducerService],
exports: [ProducerService],
})
export class ProducerModule {}
Producer Service
// src/producer/producer.service.ts
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { SendMailDto } from '../mail/dto/sendMail.dto';
import { lastValueFrom } from 'rxjs';
import { EMAIL_SEND_QUEUE, FILE_UPLOAD_QUEUE } from '../constants';
@Injectable()
export class ProducerService {
private readonly logger = new Logger(ProducerService.name);
constructor(
@Inject(FILE_UPLOAD_QUEUE) private readonly fileQueue: ClientProxy,
@Inject(EMAIL_SEND_QUEUE) private readonly emailQueue: ClientProxy,
) {}
async enqueueFileUpload(payload: {
originalname: string;
mimetype: string;
buffer: string;
}) {
try {
this.logger.log(`Enqueuing file upload: ${payload.originalname}`);
return await lastValueFrom(this.fileQueue.emit('file-upload', payload));
} catch (error) {
this.logger.error('Error enqueuing file upload:', error);
throw error;
}
}
async enqueueEmail(sendMailDto: SendMailDto) {
try {
this.logger.log(`Enqueuing email to: ${sendMailDto.to}`);
return await lastValueFrom(this.emailQueue.emit('email-send', sendMailDto));
} catch (error) {
this.logger.error('Error enqueuing email:', error);
throw error;
}
}
}
5. 📥 Consumer Module Setup
Consumer Controller
// src/consumer/consumer.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { MailService } from '../mail/mail.service';
import { SendMailDto } from '../mail/dto/sendMail.dto';
@Controller()
export class ConsumerController {
private readonly logger = new Logger(ConsumerController.name);
constructor(private readonly mailService: MailService) {}
@EventPattern('email-send')
async handleEmailSend(@Payload() sendMailDto: SendMailDto) {
this.logger.log('Processing email send request:', sendMailDto);
try {
// Uncomment when ready to implement
// await this.mailService.sendMail(sendMailDto);
this.logger.log('Email sent successfully');
} catch (error) {
this.logger.error('Error sending email:', error);
throw error;
}
}
@EventPattern('file-upload')
async handleFileUpload(
@Payload()
data: {
originalname: string;
mimetype: string;
buffer: string;
},
) {
this.logger.log('Processing file upload:', data.originalname);
try {
// Uncomment when ready to implement
// await this.uploadsService.uploadFile(data);
this.logger.log('File uploaded successfully');
} catch (error) {
this.logger.error('Error uploading file:', error);
throw error;
}
}
}
Consumer Module
// src/consumer/consumer.module.ts
import { Module } from '@nestjs/common';
import { ConsumerController } from './consumer.controller';
import { MailModule } from '../mail/mail.module';
@Module({
imports: [MailModule],
controllers: [ConsumerController],
})
export class ConsumerModule {}
6. 📧 DTOs and Interfaces
SendMail DTO
// src/mail/dto/sendMail.dto.ts
import { IsEmail, IsString, IsNotEmpty, IsOptional } from 'class-validator';
export class SendMailDto {
@IsEmail()
@IsNotEmpty()
to: string;
@IsString()
@IsNotEmpty()
subject: string;
@IsString()
@IsNotEmpty()
body: string;
@IsEmail()
@IsOptional()
from?: string;
}
File Upload Interface
// src/upload/interfaces/file-upload.interface.ts
export interface FileUploadPayload {
originalname: string;
mimetype: string;
buffer: string;
size?: number;
userId?: string;
}
7. 🏗️ App Module Integration
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ProducerModule } from './producer/producer.module';
import { ConsumerModule } from './consumer/consumer.module';
import { MailModule } from './mail/mail.module';
import rabbitmqConfig from './config/rabbitmq.config';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
load: [rabbitmqConfig],
}),
ProducerModule,
ConsumerModule,
MailModule,
],
})
export class AppModule {}
8. 🚀 Usage Example
File Upload Controller
// src/upload/upload.controller.ts
import {
Controller,
Post,
UploadedFile,
UseInterceptors,
Body
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { ProducerService } from '../producer/producer.service';
import { SendMailDto } from '../mail/dto/sendMail.dto';
@Controller('upload')
export class UploadController {
constructor(private readonly producerService: ProducerService) {}
@Post('file')
@UseInterceptors(FileInterceptor('file'))
async uploadFile(@UploadedFile() file: Express.Multer.File) {
if (!file) {
throw new Error('No file provided');
}
await this.producerService.enqueueFileUpload({
originalname: file.originalname,
mimetype: file.mimetype,
buffer: file.buffer.toString('base64'),
});
return { message: 'File queued for processing' };
}
@Post('email')
async sendEmail(@Body() sendMailDto: SendMailDto) {
await this.producerService.enqueueEmail(sendMailDto);
return { message: 'Email queued for sending' };
}
}
Upload Module
// src/upload/upload.module.ts
import { Module } from '@nestjs/common';
import { UploadController } from './upload.controller';
import { ProducerModule } from '../producer/producer.module';
@Module({
imports: [ProducerModule],
controllers: [UploadController],
})
export class UploadModule {}
9. 🔍 Health Check Implementation
// src/health/health.controller.ts
import { Controller, Get } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
@Controller('health')
export class HealthController {
constructor(private readonly configService: ConfigService) {}
@Get('rabbitmq')
async checkRabbitMQHealth() {
try {
const rabbitmqUrl = this.configService.get('rabbitmq.url');
// Add actual connection test logic here
return {
status: 'healthy',
timestamp: new Date().toISOString(),
url: rabbitmqUrl
};
} catch (error) {
return {
status: 'unhealthy',
error: error.message,
timestamp: new Date().toISOString()
};
}
}
}
10. 📝 Additional Notes
Production Considerations
- Implement proper authentication and authorization for RabbitMQ
- Set up persistent volume storage for message durability
- Configure dead letter queues for failed message handling
- Monitor queue sizes and processing times
- Implement circuit breakers for fault tolerance
- Use connection pooling for better performance
Security Best Practices
- Use environment variables for sensitive configuration
- Implement proper SSL/TLS certificates for production
- Set up RabbitMQ user permissions and virtual hosts
- Monitor and log all message processing activities
Performance Optimization
- Configure appropriate prefetch counts for consumers
- Use message acknowledgments properly
- Implement batch processing for high-volume scenarios
- Monitor memory usage and connection limits
11. 🔍 Troubleshooting
Common Issues
- Connection refused: Ensure RabbitMQ is running on the specified port
- Queue not found: Check queue names match between producer and consumer
- Message not processed: Verify consumer is properly registered and listening
- Authentication failed: Check RabbitMQ credentials and permissions
- Memory issues: Monitor queue sizes and implement proper acknowledgments
Debugging Tips
- Enable debug logging for RabbitMQ connections
- Use the management UI to monitor queue states
- Check network connectivity between services
- Verify environment variables are loaded correctly
- Monitor application logs for connection errors
Useful Commands
# Check RabbitMQ status
docker exec rabbitmq rabbitmqctl status
# List queues
docker exec rabbitmq rabbitmqctl list_queues
# Check connections
docker exec rabbitmq rabbitmqctl list_connections
# Restart RabbitMQ service
docker restart rabbitmq
Top comments (0)