Prioritizing Concurrent Requests: Queuing system to handle distributed processes and messages with NodeJS and Bull

Hey devs! How's it going?

My name is Vitor Braggion. I'm a co-founder of ufrilla, where I'm responsible for technology.

A while ago, we had a little technical issue.

For those who don't know, ufrilla is a startup that connects people who want to work as freelancers in the events industry with event producers, in a very simple way. It also helps these producers manage and operate everything, which is a real headache.

The problem was this: The client opens a specific number of positions on our platform, and freelancers show interest in working. Then the manager selects the people they want to work at their event, and the first ones to confirm will actually work. For example: If the company opens 5 Bartender positions, they can select from over 100 interested people, but only the first 5 to confirm will work.

As the number of users started growing significantly (35k+ users at the time), freelancers often confirmed the same position at the same moment, resulting in simultaneous requests. To check whether there are still available positions, we have to count the confirmed people in the database and verify whether the positions have been filled; if there is still availability, we confirm the freelancer's participation. Now imagine several of these requests being processed at the same time, or within a very short interval: multiple requests check availability at the same moment, and every one of them finds an open position. Having all verified availability, they all confirm participation in the position.

For example: we have 1 position available, and 5 people confirm participation simultaneously. We then have 5 different requests to confirm the position, but all of them check the database for availability at the same moment. Since every request found availability, every request confirms participation. In the end, all five freelancers are confirmed for one position, instead of just one person.
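To make the race concrete, here is a minimal sketch of what this check-then-act logic looks like. The model and field names (Position, Confirmation, totalSlots) are hypothetical, just for illustration; the real ufrilla code is not shown in this post.

// HYPOTHETICAL sketch of the vulnerable logic; names are illustrative.
async function confirmFreelancer(positionId, freelancerId) {
  // Check: how many people have already confirmed?
  const position = await Position.findOne({ where: { id: positionId } });
  const confirmed = await Confirmation.count({ where: { positionId } });

  if (confirmed < position.totalSlots) {
    // Act: by the time this line runs, another concurrent request may
    // already have taken the slot that the check above saw as available.
    await Confirmation.create({ positionId, freelancerId });
    return true;
  }
  return false;
}

Nothing between the check and the act stops two requests from both seeing the same open slot, which is exactly the bug described above.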

This problem gave our team a headache, and probably the freelancers as well, since we had to cancel with freelancers who had already planned their entire day (or should have, haha) around working, and we had to do it manually.

The solution I found was to implement a queue on the API endpoint that confirms participation in a position. While one request is being processed, the others wait in line for the current request to finish before being processed, following the queue rule of priority by arrival order (First In, First Out - FIFO).

To make the problem and the solution easier to understand, and applicable in other contexts, I'll create a very simple example. Let's solve the following: we have to retrieve a value from the database, add +1 to it, and save it again. For example, if the number in the database starts at 0 (zero) and the API receives a thousand requests, then at the end the number in the database should be one thousand. But what if these requests are simultaneous? Will the final value be correct?

Let's start by implementing this solution without queues and see what happens. But before that, I'll share the database modeling and the script we'll use to send several simultaneous requests to the API.

Note: I created a NodeJS API to receive and process the requests, with endpoints to 'add +1' with and without a queue. I won't show the API architecture code here, since it's not the focus; I'll show only the key code for our solution. If you want to see the whole thing, I'll provide the GitHub link.

DATABASE MODELING

(image: database modeling diagram)
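The diagram itself isn't reproduced here, but the controller code below uses a Sequelize model called Count with a single integer sum column. Here is a minimal sketch of what that model could look like, assuming Sequelize (the exact definition in the repository may differ):

// Minimal sketch of the Count model, assuming Sequelize.
// The controllers below read and update a single row (id = 1).
const { DataTypes } = require("sequelize");

module.exports = (sequelize) =>
  sequelize.define("Count", {
    id: { type: DataTypes.INTEGER, primaryKey: true, autoIncrement: true },
    sum: { type: DataTypes.INTEGER, allowNull: false, defaultValue: 0 },
  });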

CODE TO SEND MULTIPLE REQUESTS SIMULTANEOUSLY

const axios = require("axios"); // package to send our requests
const host = "http://localhost:3000/api/count";
const endpointWithQueue = `${host}/add-queue`; // endpoint with queue
const endpointWithoutQueue = `${host}/sum`; // endpoint without queue
const nReqs = 500; // number of requests to send
const reqs = []; // array to hold our requests
// Preparing the requests array
for (let i = 0; i < nReqs; i++) {
  reqs.push(axios.post(endpointWithQueue, { sum: 1 })); // change which endpoint you want to test: with or without a queue
}
// Sending all requests simultaneously
Promise.all(reqs).then(
  (_) => console.log("SUCCESS! All requests have been sent."),
  (err) => console.log(err)
);

SOLUTION WITHOUT QUEUE

API endpoint to call the 'sum +1' method:

router.post('/sum', (req, res) => {
    controller.sum(req, res)
});

Method to add +1 to the 'sum' column in the database:

const { Count } = require("./../../config/models");

exports.sum = async (req, res) => {
  let { sum } = req.body;
  this._sum(sum)
    .then((_) => res.sendStatus(200))
    .catch((err) => res.sendStatus(500));
};

// Read the current value, add to it in memory, and write it back.
// This read-modify-write is NOT atomic: between findOne and update,
// another request can read the same stale value.
exports._sum = async (sum) => {
  const myCount = await Count.findOne({ where: { id: 1 } });
  sum = myCount.sum + sum;
  return Count.update({ sum }, { where: { id: 1 } }).then(
    (rows) => {
      console.log(`${myCount.sum} + 1 = ${sum}`);
      return rows;
    },
    (err) => {
      console.log(err);
      throw err;
    }
  );
};

When sending multiple simultaneous requests to this endpoint without a queue, you'll notice that the value in the database is completely different from what we expected. Since we sent 500 simultaneous requests, we expected the value "500" in the database, but the value remained only "1".
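This is a classic lost update: every request reads the current value before any of the others has written its result back. An interleaving of just two requests is enough to show it:

// Request A: findOne -> reads sum = 0
// Request B: findOne -> reads sum = 0  (A hasn't saved yet)
// Request A: update  -> writes sum = 1
// Request B: update  -> writes sum = 1 (overwrites A; A's increment is lost)

With 500 requests racing, almost every increment overwrites another one, which is why the final value stays so low.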

(image: result without a queue)

SOLUTION WITH QUEUE

To implement the solution with a queue, I used a package called 'Bull' (https://github.com/OptimalBits/bull). It's a library that helps with distributed job control, providing some very useful solutions for this type of work, such as background job processing, queues with priorities (FIFO, LIFO, and others), among other features. 'Bull' uses Redis for queue storage, so if your application crashes for any reason, once it's back online, it will continue executing the processes that are in the queue. In our case, we'll use the FIFO (First in, first out) queue solution, meaning priority based on arrival order.

Here's the code for the endpoints and the queue processor:

const { Router } = require("express");
const controller = require("./controller");
const router = new Router();
const Bull = require("bull");

// Bull stores its jobs in Redis; here it connects to the "redis" host
// (e.g. a Docker service) on the default port.
const Queue = new Bull("Queue", { redis: { port: 6379, host: "redis" } });

// Endpoint with queue: just enqueue the job and answer immediately.
// Note that the 200 response means "queued", not "processed".
router.post("/add-queue", (req, res) => {
  Queue.add({ ...req.body });
  return res.sendStatus(200);
});

// Endpoint without queue, for comparison.
router.post("/sum", (req, res) => {
  controller.sum(req, res);
});

// The processor consumes jobs one at a time (Bull's default concurrency
// is 1), so each increment finishes before the next one starts.
Queue.process(async (job) => {
  const { sum } = job.data;
  return controller._sum(sum);
});

exports.router = router;

When we send the 500 simultaneous requests again, we'll notice that this time the value in the database is correct. Because our application organized the requests into a queue, only one request is executed at a time. Checking our API log, we'll see the processing happening in the background:

(image: result with a queue)
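If you want to watch this background processing in your own logs, Bull emits lifecycle events you can subscribe to. A small sketch using Bull's documented local events:

// Optional: log the queue lifecycle to watch jobs drain in the background.
Queue.on("completed", (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});
Queue.on("failed", (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});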

GitHub: https://github.com/VitorBrangioni/http-requests-queue

This is the solution I found for this problem, and I hope it helps you. You just need to adapt it to the problem you're facing.

Give me feedback on what you think of this solution, whether it helped or not. But regardless, I did it from the heart!! 🙂

And of course... What can we improve on this? Do you know of a better solution? If so, share it with us and together we will get better. Because nothing is better than sharing knowledge 😉

Best regards and let's code,
Vitor Braggion.

Top comments (1)

hidden_dude (aminmansuri):

Another way to accomplish the same thing is to do it with SQL like so:

UPDATE table SET sum = sum + 1

If it requires several statements, you can use the Serializable transaction isolation level, which achieves the same thing your queue does. But realize that in both cases, your Queue and Serializable, you are severely reducing parallelism.
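For the counter example in this post, that atomic UPDATE maps directly onto Sequelize's increment helper, so the database applies the addition itself and no read-modify-write race is possible. A sketch, assuming the same Count model as above:

// Atomic alternative for the counter: the database computes sum = sum + 1,
// so concurrent requests can't lose each other's updates.
exports._sum = (sum) => Count.increment("sum", { by: sum, where: { id: 1 } });

This keeps full parallelism for a simple counter; the queue approach remains useful when a confirmation involves several dependent steps that must run one at a time.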