DEV Community

Lucas Silva
Lucas Silva

Posted on

How to Instantly Track Message Broker Job Status Within the Same Request With NestJS and RabbitMQ

Suppose you have a distributed job system. When you create a job to be processed by workers, you want to return the result status if the job completes within an interval of 500 milliseconds or up to 10 seconds. This can be a challenging task. However, NestJS Microservices offer an easy way to achieve this.

Let's say you have a basic NestJS Microservice setup with an API and a RabbitMQ consumer. The project setup configuration would look like this:

Setting Up NestJS Microservices

// main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrapHttp() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
}

async function bootstrapConsumer() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.RMQ,
      options: {
        urls: ['amqp://localhost:5672'],
        queue: 'cats_queue',
        queueOptions: {
          durable: false,
        },
      },
    },
  );

  await app.listen();
}

if (require.main === module) {
  if (process.env.MODE === 'consumer') {
    bootstrapConsumer();
  } else if (process.env.MODE === 'http') {
    bootstrapHttp();
  } else {
    throw new Error('Invalid mode');
  }
}

Enter fullscreen mode Exit fullscreen mode
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          noAck: true,
          queueOptions: {
            durable: false,
          },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Enter fullscreen mode Exit fullscreen mode

Implementing the Controller

Initially, the controller might look like this:

// app.controller.ts
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy, MessagePattern, Payload } from '@nestjs/microservices';

type EnqueuedJob = {
  status: 'enqueued';
  jobId: string;
};

type ProcessedJob = {
  status: 'processed';
  jobId: string;
};

type JobState = EnqueuedJob | ProcessedJob;

const sleep = async (ms: number) => {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
};

const generateRandomId = (): string => {
  return Math.random().toString(36).substring(7);
};

@Controller()
export class AppController {
  constructor(
    @Inject('JOB_QUEUE_SERVICE') private readonly client: ClientProxy,
  ) {}

  @Get()
  createJob(): JobState {
    const jobId = generateRandomId();

    this.client.send('process_job', { jobId }).subscribe();

    return {
      status: 'enqueued',
      jobId,
    };
  }

  @Get('state/:jobId')
  getJobState(jobId: string): JobState {
    return {
      status: 'processed',
      jobId,
    };
  }

  @MessagePattern('process_job')
  async getNotifications(@Payload() data: { jobId: string }) {
    console.log('Processing job', data.jobId);
    await sleep(10000);
    console.log('Processed job', data.jobId);
  }
}
Enter fullscreen mode Exit fullscreen mode

Running the Project

To run the project in consumer mode:

MODE=consumer yarn start
Enter fullscreen mode Exit fullscreen mode

To run the project in HTTP mode:

    MODE=http yarn start
Enter fullscreen mode Exit fullscreen mode

Now you can create a job by sending a GET request to the / endpoint.

curl http://localhost:3000
Enter fullscreen mode Exit fullscreen mode

With the subscribe function, the producer service awaits the consumer to process the message and reply to the messages auto generated id, not the one we created early. Look:

RabbitMQ Queue Message Props

The message at rabbitmq has a property called reply_to. This property is set on runtime at the producer, waiting for a signal. NestJS then manages to use this unique string to send a message (the consumer functions's serialized return data) back to the producer, using the callback queue which is named by the reply_to property and the correlation_id. The return message is then sent to the producer, and the producer can then handle the return message.

Modifying the Code for Synchronous Job Status Tracking

The createJob method is responsible for creating a job ID and sending it to the queue. The client then needs to poll the /state/ endpoint to check if the job is completed. However, using NestJS Microservices' ClientProxy interface, which is compatible with RxJS, we can handle callbacks more effectively.

Update the message sending code as follows:

this.client
  .send('process_job', { jobId })
  .subscribe({
    next(value) {
      console.log('Processed. Result:', value);
    },
  });

Enter fullscreen mode Exit fullscreen mode

Tracking Job Status Within the Same Request

To track the job status within the same request, set a timeout. If the job is not completed within the timeout, return a status of enqueued. If the job is completed within the timeout, return a status of processed.

Update the createJob method:

import { Subject, lastValueFrom } from 'rxjs';
//...

@Get()
createJob() {
  const jobId = generateRandomId();
  const maxWaitTimeMs = 1000;
  const jobStateSubject = new Subject<JobState>();

  // Set a timeout to return 'enqueued' if the job is not completed within the timeout
  const timeoutId = setTimeout(() => {
    jobStateSubject.next({
      status: 'enqueued',
      jobId,
    });
    jobStateSubject.complete();
  }, maxWaitTimeMs);

  this.client
    .send('process_job', { jobId })
    .subscribe({
      next(value) {
        // Clear the timeout and return 'processed' once the job is completed
        console.log('Received value', value);
        clearTimeout(timeoutId);
        jobStateSubject.next({
          status: 'processed',
          jobId,
        });
        jobStateSubject.complete();
      },
    });

  return lastValueFrom(jobStateSubject);
}
Enter fullscreen mode Exit fullscreen mode

You may need to adjust the timeout value based on your requirements.

Troubleshooting

The Producer Is Not Receiving the Response from the Consumer
Ensure that the consumer function returns a value. undefined is not valid. If you need to return nothing, return an empty object {} or null.

Top comments (0)