In a previous article, I talked about how to run background tasks/jobs in Node.js (with the new worker_threads module in particular). But what happens if you are reaching the limits of the machine your Node.js instance is running on? Then you need to either move to a bigger machine (known as scaling vertically) or scale horizontally. Scaling vertically always has a limit, so at some point, you’ll need to scale horizontally.
But how? If your app is, for example, a web server that needs to send responses almost immediately, then you need something like a load balancer. In contrast, if your app needs to do work but it is not required to be done immediately, then you can spread the work to “worker” nodes and distribute it using queues.
Some use cases include generating daily reports, recalculating things for users on a daily basis (e.g. recommendations), processing things a user has uploaded (e.g. a large CSV file), and importing data when a user migrates to a service or signs in.
A distributed queue is like storage of job descriptions that contain enough information to do the job, or enough information to figure out all of the things required to do the job. For example:
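A job description can be as small as the following sketch. The field names (`type`, `data`, `fileUrl`, `userId`) and the URL are hypothetical and not tied to any particular queue's API. The only real constraint is that the job can be serialized, because it travels over the network.

```javascript
// A hypothetical job description: small, serializable, and self-explanatory.
const job = {
  type: 'process-csv', // tells the worker which handler to run
  data: {
    fileUrl: 'https://storage.example.com/uploads/report.csv', // placeholder URL
    userId: 42, // who uploaded the file
    createdAt: '2019-01-01T00:00:00.000Z',
  },
};

// Queues move jobs across the network, so the job must survive serialization.
const wire = JSON.stringify(job);
const received = JSON.parse(wire);
```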
Usually, the main app (or any part of a more complex system) puts jobs into the queue. Other apps running on different machines are connected to the queue and receive those jobs. These consumers are able to process the job with the information received, or at least they are able to figure out all of the information they need and obtain it. This simple architecture has important benefits:
- Your app is now divided into two logical pieces that can be distributed across different machines
- You can scale from one to many workers without touching any code and without disrupting the execution of the main app. The queue takes care of sending jobs to the workers over the network and, in most implementations, of delivering each job to only one worker
Note: Each vendor has its own jargon for queues (topics, channels), jobs (tasks, messages) and workers (consumers).
You might be thinking that you can implement this architecture yourself with your existing database and without adding complexity to the system. You can create a “jobs” table with two columns, an “id” primary key column and a “data” column with all of the job information. The main app just writes to the table and every X seconds the workers read from it to pick up the next job to be executed. In order to prevent other workers from reading the same job, you perform the read in a transaction that also deletes the job from the table.
Voilà! Problem solved, right? Well, first of all, you are querying and waiting every X seconds. That’s not ideal, but could be okay in basic use cases. More importantly, what happens if the worker crashes while processing the job? The job was already deleted when it was pulled from the table, and we cannot recover it. This (along with other things) is nicely solved by the libraries and services built for this purpose, so you don’t have to reinvent the wheel.
One great thing about queue systems is how they handle error scenarios. When a worker receives a job, the job is not deleted from the queue. Instead, it is “locked”, or made invisible to the rest of the workers, until one of two things happens: either the worker deletes it after the work is done, or a timeout that you can configure expires. So, if a worker crashes, the timeout expires and the job goes back to the queue to be consumed by another worker. When everything is fine, the worker just deletes the job once the data is processed.
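To make the locking behaviour concrete, here is a minimal in-memory sketch of a visibility timeout. The `VisibilityQueue` class and its method names are hypothetical, and the current time is passed in explicitly instead of being read from a clock, so the sequence of events is easy to follow. Real queue services implement this on the server side.

```javascript
// In-memory sketch of a queue with a visibility timeout (not a real client).
class VisibilityQueue {
  constructor(visibilityTimeoutMs) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.jobs = new Map(); // id -> { id, data, invisibleUntil }
    this.nextId = 1;
  }

  send(data) {
    const id = this.nextId++;
    this.jobs.set(id, { id, data, invisibleUntil: 0 });
    return id;
  }

  // Returns the first visible job and hides it from other workers.
  receive(now) {
    for (const job of this.jobs.values()) {
      if (job.invisibleUntil <= now) {
        job.invisibleUntil = now + this.visibilityTimeoutMs;
        return { id: job.id, data: job.data };
      }
    }
    return null; // nothing is visible right now
  }

  // Called by the worker once the job has been processed successfully.
  delete(id) {
    return this.jobs.delete(id);
  }
}

const queue = new VisibilityQueue(30000);
queue.send({ task: 'resize-image' });

const first = queue.receive(0);       // worker A gets the job, then crashes
const hidden = queue.receive(1000);   // worker B sees nothing: the job is locked
const retried = queue.receive(30000); // timeout expired, worker B gets the job
queue.delete(retried.id);             // worker B finishes and deletes it
const empty = queue.receive(60000);   // nothing left to deliver
```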
That is great if the problem was in the worker (the machine was shut down, ran out of resources, etc.), but what if the problem is in the code that processes the jobs, and every time the queue sends the job to a worker, the worker crashes?
Then we are in an infinite loop of failures, right? Nope. Distributed queues usually have a configuration option to set a maximum number of retries. What happens when that maximum is reached depends on the queue and on how you configure it. A typical choice is to move those jobs to a “failure queue” for manual inspection, or to have them consumed by workers that just report the errors.
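The retry limit can be sketched in a few lines. This is an illustration with plain arrays standing in for the queues, not how any specific service implements it; `deliver`, `maxAttempts` and the `attempts` counter are names made up for the example.

```javascript
// Sketch: count deliveries and move a job to a failure queue after too many attempts.
function deliver(job, handler, mainQueue, failureQueue, maxAttempts) {
  job.attempts += 1;
  try {
    handler(job.data);
    return 'done';
  } catch (err) {
    if (job.attempts >= maxAttempts) {
      // Give up and keep the job around for manual inspection.
      failureQueue.push({ ...job, lastError: err.message });
      return 'failed';
    }
    mainQueue.push(job); // back to the queue for another attempt
    return 'retry';
  }
}

const mainQueue = [{ data: { userId: 1 }, attempts: 0 }];
const failureQueue = [];
const alwaysCrashes = () => {
  throw new Error('bug in processor');
};

const outcomes = [];
while (mainQueue.length > 0) {
  const job = mainQueue.shift();
  outcomes.push(deliver(job, alwaysCrashes, mainQueue, failureQueue, 3));
}
```

With a handler that always throws, the loop stops after three attempts and the job ends up in `failureQueue` together with the last error message.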
Distributed queue implementations are not only great at handling these errors; they also use different mechanisms to send jobs to workers as soon as possible. Some implementations use sockets, others use HTTP long polling, and others might use other mechanisms. This is an implementation detail, but I want to highlight that it is not trivial to implement, so you are better off using existing, battle-tested implementations rather than writing your own.
Many times I find myself wondering what to put in the job data. The answer depends on your use case, but it always boils down to two principles:
- Don’t put too much. The amount of data you can put in the job data is limited. Check the queuing system you are using for more information. Usually, it’s big enough that we won’t reach the limit, but sometimes we are tempted to put too much. For example, if you need to process a big CSV file, you cannot put it in the queue. You’ll need to upload it first to a storage service and then create a job with a URL to the file and additional information you need such as the user that uploaded it, etc.
- Don’t put too little. If you have immutable data (e.g. a createdAt date) or data that rarely changes (e.g. usernames) you can put it in your job data. The job should be processed in a matter of seconds or minutes, so it is usually OK to include data that might change, like a username, as long as it is not critical for it to be up to date to the second. This way you can save queries to the database, or avoid them completely. However, if there’s information that affects how the data is processed, you should query it inside the job processor.
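As a sketch of both principles, the function below builds a job that carries a reference to the uploaded file instead of its contents, plus a few values that rarely or never change. The `buildCsvJob` helper, the field names and the 64 KB limit are assumptions made for the example; check the documentation of your queue for its real limit.

```javascript
// Hypothetical limit; the real value depends on the queuing system.
const MAX_JOB_BYTES = 64 * 1024;

function buildCsvJob(upload) {
  const job = {
    type: 'import-csv',
    data: {
      fileUrl: upload.fileUrl,       // a reference to the file, not its contents
      userId: upload.userId,
      userName: upload.userName,     // rarely changes, saves a query in the worker
      uploadedAt: upload.uploadedAt, // immutable
    },
  };

  // Fail early in the producer instead of letting the queue reject the job.
  const size = Buffer.byteLength(JSON.stringify(job), 'utf8');
  if (size > MAX_JOB_BYTES) {
    throw new Error(`Job is ${size} bytes, over the ${MAX_JOB_BYTES} byte limit`);
  }
  return job;
}
```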
If you need to process big sets of data, divide them into smaller pieces. If you have to process a big CSV file, first, divide it into chunks of a certain number of rows and create a job per chunk. There are a few benefits of doing it this way:
- The data will be processed faster because it can be processed in parallel
- You make better use of your resources. It’s better to have N workers doing smaller jobs than having one worker doing heavy processing while the rest are idle or underused
- It’s also faster and more efficient to retry a small job that has failed as opposed to a big job that has failed
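Splitting the work could look like the sketch below. It assumes the file has already been uploaded to a storage service and that the total number of rows is known; each job only carries the file URL and a row range. `createChunkJobs` and the field names are hypothetical.

```javascript
// Create one job per chunk of rows. endRow is exclusive.
function createChunkJobs(fileUrl, totalRows, chunkSize) {
  const jobs = [];
  for (let start = 0; start < totalRows; start += chunkSize) {
    jobs.push({
      type: 'process-csv-chunk',
      data: {
        fileUrl,
        startRow: start,
        endRow: Math.min(start + chunkSize, totalRows),
      },
    });
  }
  return jobs;
}

// 2500 rows in chunks of 1000 rows produce three jobs:
// rows 0-999, rows 1000-1999 and rows 2000-2499.
const jobs = createChunkJobs('https://storage.example.com/uploads/big.csv', 2500, 1000);
```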
If you need an aggregated result from all of those small chunks you can put all of the intermediate results in a database, and when they are all done you can trigger a new job in another queue that aggregates the result. This is map/reduce in essence. “Map” is the step that divides a large job into smaller jobs and then “reduce” is the step that aggregates the result of those smaller jobs.
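The aggregation step can be sketched as follows. A `Map` stands in for the database and an array stands in for the second queue; both, as well as the function names, are assumptions for the example. Results are keyed by chunk index so that a chunk processed twice is not counted twice.

```javascript
const partialResults = new Map(); // importId -> Map(chunkIndex -> partial result)
const aggregateQueue = [];        // stands in for the second queue

// Called by a worker when it finishes one chunk.
function onChunkDone(importId, chunkIndex, totalChunks, partial) {
  const results = partialResults.get(importId) || new Map();
  const isNew = !results.has(chunkIndex);
  results.set(chunkIndex, partial);
  partialResults.set(importId, results);

  // Trigger the aggregation job exactly once, when the last chunk arrives.
  if (isNew && results.size === totalChunks) {
    aggregateQueue.push({ type: 'aggregate', data: { importId } });
  }
}

// The "reduce" step: combine all partial results into one value.
function aggregate(job) {
  const results = partialResults.get(job.data.importId);
  return [...results.values()].reduce((total, value) => total + value, 0);
}

onChunkDone('import-1', 0, 3, 10);
onChunkDone('import-1', 1, 3, 20);
onChunkDone('import-1', 1, 3, 20); // duplicate delivery, ignored
onChunkDone('import-1', 2, 3, 12);
const total = aggregate(aggregateQueue[0]); // 10 + 20 + 12 = 42
```

In a real system with several workers, the "are all chunks done?" check needs to be atomic in the database; the sketch ignores concurrency.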
If you cannot divide your data beforehand you should still do the processing in small jobs. For example, if you need to use an external API that uses cursors for paginating results, calculating all of the cursors beforehand is impractical. You can process one page of results per job and once the job is processed you get the cursor to the next page and you create a new job with that cursor, so the next job will process the next page and so on.
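The chaining of jobs through cursors can be sketched like this. The `pages` object is a made-up stand-in for an external paginated API, and an array stands in for the queue.

```javascript
// Hypothetical paginated API: each page carries the cursor of the next one.
const pages = {
  start: { items: [1, 2], nextCursor: 'p2' },
  p2: { items: [3, 4], nextCursor: 'p3' },
  p3: { items: [5], nextCursor: null },
};
const fetchPage = (cursor) => pages[cursor];

const queue = [];
const processed = [];

// Each job handles one page and, if there is more data, enqueues the next job.
function processPageJob(job) {
  const page = fetchPage(job.data.cursor);
  processed.push(...page.items);
  if (page.nextCursor !== null) {
    queue.push({ type: 'sync-page', data: { cursor: page.nextCursor } });
  }
}

// Only the first job is created up front; the rest are created as we go.
queue.push({ type: 'sync-page', data: { cursor: 'start' } });
let jobsRun = 0;
while (queue.length > 0) {
  processPageJob(queue.shift());
  jobsRun += 1;
}
```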
Another interesting feature of distributed queues is that you can usually delay jobs. There’s normally a limit on this so you cannot delay a job for two years, but there are some use cases where this is useful. Some examples include:
- You want to send a welcome email to a user that signed up, not immediately but at a later time. Just create a delayed job that sends the email
- When processing a job you hit a rate limit from an API. You will probably be told when the rate limit ends, so you can put the job back in the queue, delayed by that specific amount of time
- In general, if you want to trigger something at a specific time in the future, such as a scheduled backup, a notification, a reminder, etc.
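Conceptually, a delayed job is just a job that only becomes available after a given moment. The sketch below is an in-memory illustration with a hypothetical `DelayedQueue` class; time is passed in explicitly, and for brevity a received job is removed right away instead of being locked.

```javascript
class DelayedQueue {
  constructor() {
    this.jobs = [];
  }

  // delayMs = 0 means the job is available immediately.
  send(data, now, delayMs = 0) {
    this.jobs.push({ data, availableAt: now + delayMs });
  }

  // Returns the first job whose delay has elapsed, or null if there is none.
  receive(now) {
    const index = this.jobs.findIndex((job) => job.availableAt <= now);
    if (index === -1) return null;
    return this.jobs.splice(index, 1)[0].data;
  }
}

const queue = new DelayedQueue();
const oneDayMs = 24 * 60 * 60 * 1000;

// Welcome email, to be sent one day after sign-up.
queue.send({ type: 'welcome-email', userId: 7 }, 0, oneDayMs);

// A worker hit a rate limit that resets in 60 seconds: requeue with that delay.
const retryAfterSeconds = 60;
queue.send({ type: 'sync-contacts', userId: 7 }, 0, retryAfterSeconds * 1000);

const tooEarly = queue.receive(1000);     // nothing is available yet
const afterLimit = queue.receive(60000);  // the rate-limited job is back
const nextDay = queue.receive(oneDayMs);  // now the welcome email is due
```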
Most queue implementations do not guarantee the order of execution of the jobs, so don’t rely on that. However, they usually implement some way of prioritizing some jobs over others. This depends highly on the implementation, so take a look at the docs of the system you are using to see how you can achieve it if you need to.
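As a rough illustration of prioritization, the sketch below always hands out the job with the highest priority first. The `PriorityQueue` class and the convention that a higher number means higher priority are assumptions for the example; real systems differ, and some use separate queues per priority instead.

```javascript
class PriorityQueue {
  constructor() {
    this.jobs = [];
    this.sequence = 0;
  }

  send(data, priority = 0) {
    this.jobs.push({ data, priority, sequence: this.sequence++ });
    // Higher priority first; within the same priority, older jobs first.
    this.jobs.sort((a, b) => b.priority - a.priority || a.sequence - b.sequence);
  }

  receive() {
    const job = this.jobs.shift();
    return job ? job.data : null;
  }
}

const queue = new PriorityQueue();
queue.send({ type: 'daily-report' });            // default priority 0
queue.send({ type: 'password-reset-email' }, 10); // urgent
queue.send({ type: 'recommendations' });         // default priority 0

const order = [queue.receive().type, queue.receive().type, queue.receive().type];
```

In this single-process sketch jobs with the same priority come out in insertion order, but a distributed queue may not preserve that, so the advice above still applies.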
Let’s look at some examples. Even though all queuing systems have similar features there’s not a common API for them, so we are going to see a few different examples.
Kue is a nice library developed by Automattic (the company behind WordPress) that implements a queuing system on top of Redis. Redis is an in-memory database that can be persisted to disk and is often already being used in your application for things like session storage. For this reason, choosing this library can be a no-brainer. Besides, even if you are not using Redis yet, there are a few cloud providers that allow you to spin up a managed Redis server easily (e.g. Heroku or AWS). Finally, another benefit of using Kue is that your stack is 100% open source, so you avoid vendor lock-in.
If you need to handle a lot of work and you still want an open source solution, then I would choose RabbitMQ. I haven’t chosen it for the examples in this article because Redis is usually easier to set up and more common. However, RabbitMQ has been designed specifically for these use cases, so it is better suited to them by design.
Let’s see how to create and consume jobs using kue.
Create the queue and put a job on it:
Consume jobs from the queue:
Microsoft Azure offers two queue services. There’s a great comparison here. I’ve chosen to use Service Bus because it guarantees that a job is delivered to at most one worker.
Let’s see how to create and consume jobs using Service Bus.
With Microsoft Azure we can create the queue programmatically with the createTopicIfNotExists method. Once it is created, we can start sending messages:
Some implementations, like this one, require you to create a subscription. Check out the Azure docs for more information on this topic:
The Amazon distributed queue service is called Simple Queue Service (SQS). It can be used directly, but it can also be combined with other AWS services to build interesting workflows. For example, you can configure an S3 bucket to automatically send jobs to an SQS queue when a new file (object) is stored. This can be useful for processing files easily (videos, images, CSVs, etc.).
Let’s see how we can programmatically add and consume jobs on a queue.
Create the queue and put a job on it:
Consume jobs from the queue:
Check the Node.js docs on SQS for more information.
Google Cloud, like Azure, also requires you to create subscriptions (see the docs for more information). In fact, you need to create the subscription before sending messages to the topic/queue, or they will not be available.
The documentation suggests creating both the topic and the subscription from the command line:
gcloud pubsub topics create queue_name
gcloud pubsub subscriptions create subscription_name --topic queue_name
You can also create them programmatically, but for now let’s see how to insert and consume jobs assuming that we have already created the queue (topic) and the subscription.
Create the queue and put a job on it:
Google Cloud Pub/Sub guarantees that a message/job is delivered at least once for every subscription, but the message could be delivered more than once (as always, check the documentation for more information):
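Because a message can arrive more than once, it helps to make the job processor idempotent. The following is a minimal, library-independent sketch: it assumes every message has a unique `id` and uses a `Set` in place of the database table or cache where a real worker would record processed ids. The `handleOnce` helper is hypothetical.

```javascript
const processedIds = new Set(); // stands in for a database table or cache

// Runs the handler only the first time a given message id is seen.
function handleOnce(message, handler) {
  if (processedIds.has(message.id)) {
    return false; // duplicate delivery: skip the work
  }
  handler(message.data);
  processedIds.add(message.id); // record only after the work has succeeded
  return true;
}

let emailsSent = 0;
const sendEmail = () => {
  emailsSent += 1;
};

const message = { id: 'msg-1', data: { userId: 7 } };
const firstDelivery = handleOnce(message, sendEmail);  // does the work
const secondDelivery = handleOnce(message, sendEmail); // skipped as a duplicate
```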
Distributed queues are a great way of scaling your application for a few reasons:
- They allow you to divide your application into logical pieces that can be scaled individually and gracefully
- They have solid mechanisms to handle errors gracefully
- They provide other interesting features such as delayed jobs and prioritization
- There are many services with similar functionalities and also open source libraries that you can use without worrying about vendor lock-in