Francisco Mendes

How to use ZeroMQ Pipeline Pattern in Node.js

Overview

I bet many of us have thought about decoupling a backend and splitting it into microservices. Let's say you have a monolithic backend and then decide to add something like file processing, and you'd rather have a microservice whose sole function is to process files.

Now assume you want to process several files simultaneously instead of one at a time. In that case, it would be ideal to distribute the work among several microservices dedicated exclusively to processing files.

To distribute the work among the different applications we need an intermediary, and the most popular solution is a message broker. However, not every project needs something that heavyweight, and it is in these specific cases (smaller applications) that I like to use ZeroMQ.

If you don't know ZeroMQ, that's okay; it's a technology that isn't widely discussed in the community. If you want to know more about ZeroMQ, I recommend reading this article, which will give you a better introduction than I can.

Today's example

The idea of today's example is to create a simple application (the server) that sends multiple messages to one or more other applications (the workers), which are responsible only for logging those messages.

Let's code

As you may have already understood, we are going to have two backends. One of them we will call the server, which will be our message sender. The other will be the worker, which will be our small microservice.

First and foremost, let's install our dependencies:

npm install zeromq --save

Now we can start working on our server, but first I have to explain the pattern we are going to use today.

The Pipeline pattern, also known as Push/Pull, lets you distribute tasks evenly among several workers arranged in a pipeline: the push socket hands each message to exactly one of the connected pull sockets, in round-robin fashion.

Now that you have a general idea, we can start by importing our client and configuring it:

// @/server.js
const zmq = require("zeromq");

const sock = new zmq.Push();

const main = async () => {
  try {
    await sock.bind("tcp://*:7777");
    // ...
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

Then let's create a for loop to send a total of one hundred messages to our worker(s). First we log which message is being sent, then we send that same message using the sock.send() function.

But before sending the message we have to convert the job number to a string. Finally, to space out the messages, let's add a delay of five hundred milliseconds between sends.

// @/server.js
const zmq = require("zeromq");

const sock = new zmq.Push();

const main = async () => {
  try {
    await sock.bind("tcp://*:7777");
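    // Send 100 jobs, one every 500 ms, to whichever workers are connected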
    for (let job = 1; job <= 100; job++) {
      console.log(`Sending Job ${job}`);
      await sock.send(String(job));
      await new Promise((resolve) => setTimeout(resolve, 500));
    }
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

Now we can start working on our worker. First let's import our client and configure it:

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Pull();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7777");
    // ...
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

In our worker we are going to add a for await...of loop over the socket, so the worker never stops running and keeps receiving messages from our server. Finally, we log each message the worker receives.

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Pull();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7777");
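    // Keep pulling messages from the server for as long as the worker runs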
    for await (const [msg] of sock) {
      console.log(`Received Job ${msg.toString()}`);
    }
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

The way we are going to test our project is very simple: we open three terminal windows, one to start the server and the other two to run our workers. Like this:

terminal logs
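If you want to try it yourself, and assuming the files are named server.js and worker.js as in the comments at the top of each snippet, the commands would look something like this:

# terminal 1 - start the server
node server.js

# terminals 2 and 3 - start a worker in each
node worker.js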

As you can see from the gif, the workers were started at different times, but the tasks were still evenly distributed between them.

Likewise, when I stopped one of the workers and then started it again, the tasks were once again evenly distributed without any problem.

Conclusion

As always, I hope you found it interesting. If you noticed any errors in this article, please mention them in the comments. 🧑🏻‍💻

Hope you have a great day! 🤩

Top comments (2)

ImTheDeveloper

I guess since file processing is seen as a long-running task, we may elect to run a queue here, which could then have multiple workers pulling from it, rather than a pub/sub model with balanced consumption.

I use a similar pattern to this pipeline when processing requests to an external API since durability is not required and the request/response time is very short.

Francisco Mendes

I fully agree because I would take a similar approach, but in this article I wanted to show that we don't need to know about queues, brokers, among other things. We simply install a client and start using it and progressively learn the rest. 🥳