DEV Community

Francisco Mendes

How to use ZeroMQ Pub/Sub Pattern in Node.js

Overview

Pub/Sub is a messaging pattern in which the publisher does not send a message (payload) to a specific receiver. Instead, publishers send messages to named channels, and receivers subscribe to one or more channels to consume those messages.
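
To make the idea concrete, here is a minimal in-process sketch of the pattern, with no networking and no ZeroMQ involved. The `PubSub` class and its `subscribe`/`publish` methods are hypothetical names invented for this illustration. The only point is that the publisher names a channel and never a receiver.

```javascript
// Minimal in-process illustration of Pub/Sub (hypothetical names, not ZeroMQ).
class PubSub {
  constructor() {
    // channel name -> set of subscriber callbacks
    this.channels = new Map();
  }

  subscribe(channel, handler) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set());
    }
    this.channels.get(channel).add(handler);
  }

  // Deliver the payload to every subscriber of the channel and
  // return how many subscribers received it.
  publish(channel, payload) {
    const handlers = this.channels.get(channel);
    if (!handlers) {
      return 0; // nobody is listening, so the message is dropped
    }
    for (const handler of handlers) {
      handler(payload);
    }
    return handlers.size;
  }
}

const pubsub = new PubSub();
const received = [];
pubsub.subscribe("emails", (payload) => received.push(payload));
pubsub.publish("emails", { to: "reader@example.com" });
pubsub.publish("sms", { to: "nobody" }); // no subscribers on this channel
console.log(received);
```

Only the message published to the `emails` channel ends up in `received`, because nothing subscribed to `sms`.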

Imagine that you have a monolithic backend and want to add a new feature to it, such as sending emails. Instead of making this backend responsible for sending the emails, you can turn it into a publisher that posts the email data to a channel, to be consumed by another backend (the receiver) that does the actual sending (newsletters, for example).

Today's example

The implementation of this process is quite simple, so in today's example I decided to create a simple API that receives the body of a request and publishes it to a specific channel, where a receiver consumes it and logs it.

Let's code

As you may have already understood, we are going to have two backends. One of the backends we will call a server, which will be our message sender. The other backend will be the worker, which will be our small microservice.

First and foremost, let's install our dependencies:

npm install fastify zeromq --save

Now let's create a simple API:

// @/server.js
const Fastify = require("fastify");

const app = Fastify();

app.post("/", (request, reply) => {
  return reply.send({ ...request.body });
});

const main = async () => {
  try {
    await app.listen({ port: 3000 });
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

Now we can import zeromq and create a ZeroMQ socket of the Publisher type. The socket accepts connections on an address that we define. Binding to that address is asynchronous, so it has to be awaited when the application starts. Like this:

// @/server.js
const Fastify = require("fastify");
const zmq = require("zeromq");

const app = Fastify();
const sock = new zmq.Publisher();

app.post("/", async (request, reply) => {
  return reply.send({ ...request.body });
});

const main = async () => {
  try {
    await sock.bind("tcp://*:7890");
    await app.listen({ port: 3000 });
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

Now, to send the data from the request body, we use the sock.send() function. We pass it a single argument: an array with two elements.

The first element is the channel to which we want to post the message, and the second element is the message itself. We will send the data from the request body as our message, but first we have to serialize the object to a JSON string.

// @/server.js
const Fastify = require("fastify");
const zmq = require("zeromq");

const app = Fastify();
const sock = new zmq.Publisher();

app.post("/", async (request, reply) => {
  await sock.send(["dev.to", JSON.stringify({ ...request.body })]);
  return reply.send("Sent to the subscriber/worker.");
});

const main = async () => {
  try {
    await sock.bind("tcp://*:7890");
    await app.listen({ port: 3000 });
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
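
The two-element array is a multipart message: the first frame carries the topic and the second carries the payload. A small pair of helpers can keep that convention in one place. `encodeMessage` and `decodeMessage` are hypothetical names for this sketch, not part of the zeromq package. The decode side works on Buffers because that is how the frames arrive on the receiving end.

```javascript
// Hypothetical helpers (not part of zeromq) for the [topic, payload] convention.

// Build the array that would be passed to sock.send().
function encodeMessage(topic, data) {
  return [topic, JSON.stringify(data)];
}

// Turn the received frames back into a topic string and an object.
function decodeMessage([topicFrame, payloadFrame]) {
  return {
    topic: topicFrame.toString(),
    data: JSON.parse(payloadFrame.toString()),
  };
}

// Round trip, simulating the wire by converting each frame to a Buffer.
const frames = encodeMessage("dev.to", { title: "Hello" }).map((frame) =>
  Buffer.from(frame)
);
const decoded = decodeMessage(frames);
console.log(decoded.topic, decoded.data);
```

With helpers like these, the route handler could call `sock.send(encodeMessage("dev.to", request.body))`, and the topic name would live in a single place.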

Now we can start on our worker. Let's import zeromq and create a ZeroMQ socket of the Subscriber type, then connect it to the address that we defined before.

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Subscriber();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7890");
    // ...
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();

Now that the socket is created and connected, we can subscribe to our channel to receive messages from it.

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Subscriber();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7890");
    sock.subscribe("dev.to");
    // ...
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
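
One detail worth knowing: a ZeroMQ subscription is a prefix filter on the first frame of the message rather than an exact channel name, so subscribing to `dev.to` also matches a topic such as `dev.to/comments`. The function below only mimics that rule in plain JavaScript to show which topics would get through. `matchesSubscription` is a hypothetical helper; the real filtering is done by the library.

```javascript
// Plain-JavaScript imitation of ZeroMQ's prefix-based topic filter.
// matchesSubscription is a hypothetical helper for illustration only.
function matchesSubscription(topic, subscriptions) {
  // An empty prefix ("") matches every topic; no subscriptions match nothing.
  return subscriptions.some((prefix) => topic.startsWith(prefix));
}

const subscriptions = ["dev.to"];
console.log(matchesSubscription("dev.to", subscriptions));          // true
console.log(matchesSubscription("dev.to/comments", subscriptions)); // true
console.log(matchesSubscription("medium", subscriptions));          // false
```

If you need exact channels, one option is to pick topic names where none is a prefix of another.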

Next, let's create a for await...of loop so we can log each of the messages published to that channel. From each message we want two things: the topic (the channel the message came from) and the message itself. Let's not forget to parse the JSON string back into an object.

// @/worker.js
const zmq = require("zeromq");

const sock = new zmq.Subscriber();

const main = async () => {
  try {
    sock.connect("tcp://localhost:7890");
    sock.subscribe("dev.to");
    for await (const [topic, msg] of sock) {
      console.log("Received message from " + topic + " channel and this is the content:");
      console.log(JSON.parse(msg));
    }
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};
main();
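
If the `for await...of` syntax is new to you: the subscriber socket is an async iterable, and each iteration yields one message as an array of frames, which the loop destructures into `topic` and `msg`. The sketch below reproduces that shape with an async generator standing in for the socket, so it runs without ZeroMQ. `fakeSocket` and `consume` are hypothetical names. It also wraps `JSON.parse` in a `try`/`catch`, which is an addition not present in the worker above, so that one malformed payload does not stop the loop.

```javascript
// Stand-in for the subscriber socket: an async generator that yields
// [topic, payload] frame pairs as Buffers. Hypothetical, for illustration.
async function* fakeSocket() {
  yield [Buffer.from("dev.to"), Buffer.from('{"title":"Hello"}')];
  yield [Buffer.from("dev.to"), Buffer.from("not json")];
  yield [Buffer.from("dev.to"), Buffer.from('{"title":"World"}')];
}

// Same loop shape as the worker, plus guarded parsing.
async function consume(source) {
  const parsed = [];
  for await (const [topic, msg] of source) {
    try {
      parsed.push({ topic: topic.toString(), data: JSON.parse(msg.toString()) });
    } catch (err) {
      // Log and continue instead of letting one bad message end the loop.
      console.error("Skipping malformed message on " + topic + ": " + err.message);
    }
  }
  return parsed;
}

consume(fakeSocket()).then((parsed) => console.log(parsed));
```

The loop suspends at each `await` while it waits for the next message, so other work in the process can run in between.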

Now, when testing our API with a tool such as Postman, you can send a JSON object in the request body with whatever properties you want.
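
If you would rather test from a script, the request can be sketched as below. This assumes Node.js 18 or newer (for the global `fetch`) and the server from this article listening on port 3000. `buildRequest` and `sendToServer` are hypothetical names, and the payload fields are made up for the example.

```javascript
// Assumes Node.js 18+ (global fetch) and the article's server on port 3000.
// buildRequest and sendToServer are hypothetical helper names.
const SERVER_URL = "http://localhost:3000/";

// Build the fetch options for a JSON POST.
function buildRequest(payload) {
  return {
    method: "POST",
    // Without this header the body would not be treated as JSON.
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  };
}

// Send the payload and resolve with the server's plain-text reply.
async function sendToServer(payload) {
  const response = await fetch(SERVER_URL, buildRequest(payload));
  return response.text();
}

// Call this with both the server and the worker running:
// sendToServer({ title: "Hello", tags: ["zeromq"] }).then(console.log);
```

The call is left commented out so the snippet can be loaded without a running server.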

[Screenshot: testing the API]

Then you should have something similar to this on your terminal:

[Screenshot: terminal logs]

Conclusion

As always, I hope you found it interesting. If you noticed any errors in this article, please mention them in the comments. 🧑🏻‍💻

Hope you have a great day! 🥸 ✌️

Top comments (6)

Kos-M

I'm not sure it is good practice to block the loop with that for.
I saw that in the zeromq examples too.
I would prefer events, for example socket.on('msg', (msg, topic) => {}).
Does anyone know if events are possible there?
Some examples/patterns use events and others use these loops.

Francisco Mendes

Here is a list of events that you can use with the package used in this article and that is recommended by the zeromq documentation. -> bit.ly/3mzMXFH

Even though we're blocking the event loop with the for loop, I don't think we're compromising our application too much because each message is handled quickly. But yes, I would also prefer to use events.

Kos-M

Yeah, I know, this is an example/demo and the code should run OK. I had in mind integrating it into a more complex system when I wrote the comment above :)
By the way, I'm trying to create a small lib/system to pass work from a master to workers with message queues, to achieve distributed computing. It's under development and quite buggy at the moment, but you can take a look if you want :)
github.com/queue-xec

Francisco Mendes • Edited

I understood. I would only use ZeroMQ in personal projects and very small applications.

Otherwise I would use Redis in most cases and in more advanced projects I would use RabbitMQ.

Murtaza Nathani

Wow.. this was explained amazingly..

Can you explain how does this work or maybe a link to mdn resources

for await ( [ ...] of sock )

Francisco Mendes • Edited

Thanks so much for the feedback! 😊

In the for loop I'm destructuring each message that comes from the socket: every iteration yields an array, and I'm just picking out the elements I want.

The For Of Loop -> bit.ly/3tLULaF
Array Destructuring -> bit.ly/3EnYXCf