DEV Community

Sebastian Iwanczyszyn

NestJS Distributed systems using service bus with @nestjstools/messaging

In today's fast-paced digital landscape, building scalable and resilient applications is paramount. As systems grow in complexity, efficient communication between services becomes crucial. Enter NestJS, a powerful Node.js framework that leverages TypeScript to help developers create modular and maintainable server-side applications.

A service bus is a messaging infrastructure that facilitates communication between different components or services/microservices in a distributed system. It enables the decoupling of services, allowing them to communicate with each other asynchronously through messages instead of direct calls. This decoupling helps improve applications' scalability, fault tolerance, and maintainability.
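
To make the decoupling concrete, here is a minimal sketch of the idea in plain TypeScript. It is a toy model, not how @nestjstools/messaging is implemented: `SimpleBus`, `subscribe`, and the `user.created` key are names made up for this illustration.

```typescript
// A toy in-memory service bus (hypothetical, for illustration only).
// Handlers subscribe to a routing key; senders dispatch messages
// without knowing which handlers, if any, will receive them.
type Handler<T = unknown> = (message: T) => Promise<void> | void;

class SimpleBus {
  private readonly handlers = new Map<string, Handler[]>();

  // Register a handler for a routing key.
  subscribe<T>(routingKey: string, handler: Handler<T>): void {
    const existing = this.handlers.get(routingKey) ?? [];
    existing.push(handler as Handler);
    this.handlers.set(routingKey, existing);
  }

  // Deliver the message to every handler registered for the key
  // and report how many handlers received it.
  async dispatch<T>(routingKey: string, message: T): Promise<number> {
    const targets = this.handlers.get(routingKey) ?? [];
    for (const handler of targets) {
      await handler(message);
    }
    return targets.length;
  }
}

// Usage: the sender only knows the routing key, not the receiver.
const bus = new SimpleBus();
const received: string[] = [];
bus.subscribe<{ name: string }>('user.created', (event) => {
  received.push(`welcome ${event.name}`);
});
```

Because the sender depends only on the routing key, handlers can be added, removed, or moved to another process without touching the code that dispatches the message.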

[Image: MessageBusFlow]

Instead of wiring and configuring everything manually, what if you could have a single library that simplifies asynchronous and synchronous message handling? That library is @nestjstools/messaging, an all-in-one solution with built-in support for buses, handlers, channels, and consumers. There is no need to build everything from scratch: just focus on your business logic while the library takes care of the rest!

Get Started

npm install -g @nestjs/cli      # install the Nest CLI
nest new messaging_project      # create the project
cd messaging_project
npm i @nestjstools/messaging    # install the library

Define Messaging module

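import { Module } from '@nestjs/common';
import { InMemoryChannelConfig, MessagingModule } from '@nestjstools/messaging';
import { AppController } from './app.controller';

@Module({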
  imports: [
    MessagingModule.forRoot({
      buses: [
        {
          name: 'command.bus',
          channels: ['name-of-channel'],
        },
        {
          name: 'event.bus',
          channels: ['name-of-channel'],
        },
      ],
      channels: [
        new InMemoryChannelConfig({
          name: 'name-of-channel',
        }),
      ],
      debug: true,
    }),
  ],
  controllers: [AppController],
  providers: [
  ],
})
export class AppModule {}

Create message & message handlers

First, define a command and its handler:

export class CreateUser {
  constructor(
    public readonly name: string,
  ) {
  }
}
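

// The two relative paths below are assumed; adjust them to your project layout
import { IMessageBus, IMessageHandler, MessageBus, MessageHandler, RoutingMessage } from '@nestjstools/messaging';
import { CreateUser } from './create-user';
import { UserCreated } from './user-created';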

@MessageHandler('my_app_command.create_user')
export class CreateUserHandler implements IMessageHandler<CreateUser>{
  constructor(
    @MessageBus('event.bus') private readonly eventBus: IMessageBus,
  ) {
  }

  async handle(message: CreateUser): Promise<object | void> {
    // TODO: business logic goes here
    console.log(`Creating user... ${message.name}`);

    // Await the dispatch so that a failure surfaces to the caller
    await this.eventBus.dispatch(new RoutingMessage(new UserCreated(message.name), 'my_app_event.user_created'));
  }
}

Next, define the event and its handler:

export class UserCreated {
  constructor(
    public readonly name: string,
  ) {
  }
}


// The relative path below is assumed; adjust it to your project layout
import { IMessageHandler, MessageHandler } from '@nestjstools/messaging';
import { UserCreated } from './user-created';

@MessageHandler('my_app_event.user_created')
export class SendEmailOnUserCreatedHandler implements IMessageHandler<UserCreated>{
  handle(message: UserCreated): Promise<object | void> {
    // TODO: business logic goes here, e.g. sending the email
    console.log(`Sending email ... for ${message.name}`);

    return Promise.resolve();
  }
}

Register the handlers as providers

@Module({
  imports: [
    MessagingModule.forRoot({
      // ... same configuration as above
    }),
  ],
  controllers: [AppController],
  providers: [
    CreateUserHandler,
    SendEmailOnUserCreatedHandler,
  ],
})
export class AppModule {}

Dispatch the command from the generated controller

import { Controller, Post } from '@nestjs/common';
import { CreateUser } from './application/command/create-user';
import { IMessageBus, MessageBus, RoutingMessage } from '@nestjstools/messaging';

@Controller()
export class AppController {
  constructor(
    // Inject the bus registered under the name 'command.bus'
    @MessageBus('command.bus') private readonly commandBus: IMessageBus,
  ) {}

  @Post()
  async createUser(): Promise<string> {
    await this.commandBus.dispatch(new RoutingMessage(new CreateUser('John'), 'my_app_command.create_user'));

    return 'User created';
  }
}

Send a POST request to the endpoint at localhost:3000, and the output of both handlers should appear in the console:

[Image: Result CLI]
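
If you prefer a script to an HTTP client, the POST request can be sent with a few lines of TypeScript. This is only a sketch: `triggerCreateUser` and `FetchLike` are hypothetical names, and the fetch function is passed in as a parameter so the helper can be tried without a running server.

```typescript
// Minimal shape of the fetch function this helper relies on (an assumption
// made for this sketch; the global `fetch` in Node 18+ satisfies it).
type FetchLike = (
  url: string,
  init: { method: string },
) => Promise<{ status: number; text(): Promise<string> }>;

// Hypothetical helper: sends the POST request that triggers the command
// and returns the response body.
async function triggerCreateUser(baseUrl: string, fetchImpl: FetchLike): Promise<string> {
  const response = await fetchImpl(baseUrl, { method: 'POST' });
  if (response.status >= 400) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  return response.text();
}
```

With the application running on Node 18 or newer, calling `triggerCreateUser('http://localhost:3000', fetch)` should resolve to the string returned by the controller.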

Works! Let’s try integration with RabbitMQ

Install the RabbitMQ extension for the messaging library, @nestjstools/messaging-rabbitmq-extension, and use Docker to set up RabbitMQ locally:

# docker-compose.yaml

services:

  rabbitmq:
    image: rabbitmq:3.11.20-management-alpine
    ports:
      - 5672:5672
      - 15672:15672


Start the broker and install the extension:

docker compose up -d
npm i @nestjstools/messaging-rabbitmq-extension

Redefine your module with a new configuration for RabbitMQ

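// Import paths are assumed for the package versions used in this article; newer releases may differ
import { Module } from '@nestjs/common';
import { AmqpChannelConfig, ExchangeType, MessagingModule } from '@nestjstools/messaging';
import { MessagingRabbitmqExtensionModule } from '@nestjstools/messaging-rabbitmq-extension';

@Module({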
  imports: [
    MessagingRabbitmqExtensionModule,
    MessagingModule.forRoot({
      buses: [
        {
          name: 'command.bus',
          channels: ['async-command'],
        },
        {
          name: 'event.bus',
          channels: ['async-event'],
        },
      ],
      channels: [
        new AmqpChannelConfig({
          name: 'async-command',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_command.exchange',
          bindingKeys: ['my_app_command.#'],
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.command',
          avoidErrorsForNotExistedHandlers: false,
          middlewares: [],
          autoCreate: true,
        }),
        new AmqpChannelConfig({
          name: 'async-event',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_event.exchange',
          bindingKeys: ['my_app_event.#'],
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.event',
          autoCreate: true,
          enableConsumer: true,
          avoidErrorsForNotExistedHandlers: true,
        }),
      ],
      debug: true,
    }),
  ],
  controllers: [AppController],
  providers: [
    CreateUserHandler,
    SendEmailOnUserCreatedHandler,
  ],
})
export class AppModule {}

Now, after sending the request again, you can see the results in the console:

[Image: Result AMQP CLI]

As demonstrated, the consumers are actively processing messages from the RabbitMQ queue, ensuring seamless message handling within the system. The architecture allows for effortless expansion — additional services can be easily defined, and messages can be routed directly to dedicated handlers, enabling a highly scalable and decoupled microservices ecosystem.
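
The binding keys in the configuration above, such as my_app_command.#, follow the AMQP topic-exchange rules: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below illustrates those rules in plain TypeScript. `matchesTopic` is a hypothetical helper written for this article; in practice RabbitMQ performs the matching inside the broker.

```typescript
// Returns true when `routingKey` matches the AMQP topic `pattern`.
// `*` matches exactly one word, `#` matches zero or more words.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const match = (p: string[], k: string[]): boolean => {
    // An exhausted pattern matches only an exhausted key.
    if (p.length === 0) return k.length === 0;
    const [head, ...rest] = p;
    if (head === '#') {
      // `#` either consumes nothing, or consumes one word and stays active.
      return match(rest, k) || (k.length > 0 && match(p, k.slice(1)));
    }
    if (k.length === 0) return false;
    // `*` accepts any single word; otherwise the words must be equal.
    return (head === '*' || head === k[0]) && match(rest, k.slice(1));
  };
  return match(pattern.split('.'), routingKey.split('.'));
}
```

Under these rules, the routing key my_app_command.create_user matches the binding key my_app_command.#, while my_app_event.user_created does not, which is why commands and events end up in separate queues.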

I’m excited to share this library with you and help simplify distributed messaging in NestJS. With everything built in one place, handling asynchronous and synchronous messages has never been easier. I look forward to seeing how it enhances your microservices architecture — happy coding!

An example repository that includes additional components, such as middleware, is available here: Example project
