DEV Community

Cover image for Message Queue in Node.js
Bhavika Aggarwal
Bhavika Aggarwal

Posted on

Message Queue in Node.js

Hello, in this article , we are going to implement a message queue in node.js using BullMQ library which is built on the top of redis.

We will implement two message queues. One for adding refund task for a particular order. On successful completion of refund task, we will initiate notification task to notify user about refund completion. For notification task, we will use another queue.

Step 1: Setting up Project

Make a new folder "messaging_queue" and initialize it with npm init and add dependencies.

mkdir messaging_queue
Enter fullscreen mode Exit fullscreen mode
cd messaging_queue
Enter fullscreen mode Exit fullscreen mode
npm init
Enter fullscreen mode Exit fullscreen mode
npm i express bullmq -D
Enter fullscreen mode Exit fullscreen mode

Step 2: Implementation of Queue

Firstly, create a refundQueue.js file to write the code for implementing a refundQueue and a function to add refund tasks to refundQueue.

const { Queue } =require("bullmq")

const refundQueue = new Queue("refund-queue",{
    connection:{
        host:"127.0.0.1",
        port:"6379"
    }
})

async function addRefundTask(order){
    const response = await refundQueue.add(`refund-to-${order.id}`,{
        amount:order.amount,
        id:order.id,
        user_id:order.user_id
    })

    console.log("Job added to refundQueue with id: ",response.id)
}

exports.addRefundTask=addRefundTask
Enter fullscreen mode Exit fullscreen mode

Now, create a notificationQueue.js file to write the code for implementing a notificationQueue and a function to add notification tasks to notificationQueue.

const { Queue } =require("bullmq")

const notificationQueue = new Queue("notification-queue",{
    connection:{
        host:"127.0.0.1",
        port:"6379"
    }
})

async function addNotificationTask(user_id,order_id){
    const response = await notificationQueue.add(`notification-to-${user_id}`,{
        order_id:order_id,
        user_id:user_id
    })

    console.log("Job added to notificationQueue with id: ",response.id)
}

exports.addNotificationTask=addNotificationTask

Enter fullscreen mode Exit fullscreen mode

Step 3: Refund and Notification Process

Now, create two files refundProcess.js and notificationProcess.js to write the code for implementation of refund and notification process.

refundProcess.js

const refundComplete=()=>new Promise((res,rej)=>setTimeout(()=>res(),4*1000));

async function refundProcess(id,amount,user_id){
    console.log(`Refund for order ${id} has started`);
    console.log(`Refund Amount: ${amount}`)
    console.log(`User ID: ${user_id}`)
    await refundComplete()
    console.log("Refund Completed Successfully!")
}

exports.refundProcess=refundProcess

Enter fullscreen mode Exit fullscreen mode

notificationProcess.js

const notificationComplete=()=>new Promise((res,rej)=>setTimeout(()=>res(),2*1000));

async function notificationProcess(user_id,order_id){
    console.log(`Notify user ${user_id} about refund for order ${order_id}`)
    await notificationComplete()
    console.log("Notification Sent Successfully!")
}

exports.notificationProcess=notificationProcess

Enter fullscreen mode Exit fullscreen mode

Step 4: Implementation of Worker

Now, create a new file worker.js to implement refundWorker and notificationWorker.

const { Worker } =require("bullmq")
const { refundProcess } =require("./refundProcess")
const { notificationProcess } =require("./notificationProcess")

const refundWorker = new Worker("refund-queue", async(job)=>{
    console.log(`Refund Job ${job.id} started`)
    await refundProcess(job.data.id,job.data.amount,job.data.user_id)
})

const notificationWorker = new Worker("notification-queue", async(job)=>{
    console.log(`Notification Job ${job.id} started`)
    await notificationProcess(job.data.user_id,job.data.order_id)
})

exports.refundWorker= refundWorker
exports.notificationWorker=notificationWorker

Enter fullscreen mode Exit fullscreen mode

Step 5: Express Server

Now, in index.js file , write code for implementation of express server.

const express=require('express')
const { addRefundTask } = require("./refundQueue")
const { refundWorker } = require("./worker")
const { addNotificationTask } = require('./notificationQueue')
const { notificationWorker }=require('./worker')

async function init(){
    const app=express()

    const PORT = 8000;

    const order1={
        id:"order1",
        amount:4000,
        user_id:"user1"
    }

    const order2={
        id:"order2",
        amount:10000,
        user_id:"user2"
    }

    app.listen(PORT,()=>{
        console.log("Server running at port 8000")
    })

    await addRefundTask(order1);
    await addRefundTask(order2)



    refundWorker.on('completed', async(job) => {
        console.log(`Refund Job ${job.id} has completed!`);
        await addNotificationTask(job.data.user_id,job.data.id);
    });

    refundWorker.on('failed', (job, err) => {
        console.log(`Refund Job ${job.id} has failed with ${err.message}`);
    });

    notificationWorker.on('completed', (job) => {
        console.log(`Notification Job ${job.id} has completed!`);
    });

    notificationWorker.on('failed', (job, err) => {
        console.log(`Notification Job ${job.id} has failed with ${err.message}`);
    });
}

init()

Enter fullscreen mode Exit fullscreen mode

Here, we will add two orders to refundQueue.
We are using two event listeners 'completed' and 'failed' for both refundWorker and notificationWorker. On successful completion of refund task, a notification task is added to notificationQueue.

Step 6: Docker setup

We need a redis server running in local computer to run the code of BullMQ. So, we will use docker for that. Ensure docker is installed in your system. And, create a docker-compose.yml file.

version : '3.4'

services:
  redis:
    container_name : redis-server
    image : redis
    ports:
      - 6379:6379
    stdin_open : true

Enter fullscreen mode Exit fullscreen mode

Now, start the redis container using following command

docker compose up -d
Enter fullscreen mode Exit fullscreen mode

Now, we can run our express server

node index.js
Enter fullscreen mode Exit fullscreen mode

Here, is the output

Output Image
Github Repository Link

Thank you for reading the article !

Top comments (1)

Collapse
 
sidharrth profile image
Siv Deploys

As is github code Compile/Run Suggestions:

  • Start script missing. "scripts": { "start": "node index.js", "test": "echo \"Error: no test specified\" && exit 1" },

As is github code Run Results:
BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker and QueueEvents without providing explicitly a connection or connection options is deprecated. This behaviour will be removed in the next major release