DEV Community

Alec Brunelle

Posted on • Updated on • Originally published at Medium

Using Bull.js to manage job queues in a Node.js micro-service stack


Switching from the ol' single monolith to a micro-service oriented stack brings new problems. The simple job processor of the past doesn't fit the new architecture. We found that Bull, a Node.js package, covered most of our needs, but it needed tweaks to work in this new world. Because the module is open source, I knew the improvements we made could be contributed back to the main repo.

Goals

Let's say we want to do some specialized work, such as scanning an image to extract text. This is a situation where a job queue comes in handy: the work is done in the background, away from the user-facing interface.

  • Get image from user
  • Queue job with image attached
  • Job gets worked on
  • Job results are sent back to app database
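Seen from the producer's side, the first two steps can be sketched as follows. This is a minimal sketch, assuming a Bull-style queue object whose `add(data, opts)` method returns a promise for the created job. The helper name `enqueueImageScan` and the `imageUrl` field are hypothetical and not from the original setup.

```javascript
// Hypothetical helper: queue an image for background text extraction.
// `queue` is assumed to be a Bull-style queue exposing add(data, opts).
async function enqueueImageScan(queue, imageUrl) {
  if (typeof imageUrl !== 'string' || imageUrl.length === 0) {
    throw new TypeError('imageUrl must be a non-empty string')
  }
  // Store a reference to the image, not the image bytes themselves,
  // so the job payload kept in Redis stays small.
  const job = await queue.add({ imageUrl }, { attempts: 3 })
  return job.id
}
```

Taking the queue as an argument keeps the helper independent of where the queue and its Redis connection are created, which also makes it easy to exercise with a stub.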

Two popular packages in the wild that could help with the aforementioned work are DelayedJob and Celery. They let you manage jobs backed by a fast key-value store like Redis. Both assume that the job processing and the job queue live in the same service. If one service performs the task (e.g. the image processor) and another service acts as the job queue, we cannot use these traditional constructs.

This (Diagram 1)
single-service-with-jobs

versus

This (Diagram 2)
multiple-services-job-queue

A Solution

My coworkers and I found ourselves in this situation and, when searching for answers, found that Bull might suffice. Keeping it 2018, this Node.js package is lightning fast, built to work with Redis, and has an active community. It didn't quite fit our needs at first because it processed jobs in the same app as the queue mechanism (see diagram 1). This is fine for traditional apps, but for our setup we needed to manage jobs across systems (see diagram 2). We needed to make this work asynchronously, where the worker may not live in the same repo or service as the one running Bull itself.

We need to think about how we want to manage a job's life cycle. Good thing someone recently contributed a diagram to the project's GitHub.

Bull's Job Lifecycle Diagram
bull-lifecycle-diagram

Bull has a simple way to define the processing logic (refer to diagram 1), i.e. what a job does while it is in the active queue:

queue.process(async (job) => {
  // Bull calls this handler for each job that becomes active
  return doWork(job.data)
})

This way, whenever a job came into the waiting queue, Bull knew how to process it and move it to the completed queue. With this setup Bull manages all the state transitions on its own; we needed to switch to manual. You may be thinking, "to work in this new fashion, how about we just don't define this process method?" We tried this, and it worked! Forward into the weeds we go.


After digging into the code more, we found that Bull defines its state transition methods on two simple objects, Job and Queue.

Our research showed that the methods for doing manual state transitions were private, meaning the authors didn't write them to be used publicly. This makes sense, as Bull was never designed to do what we wanted to do with it. What would we need to do to make them public? After some more digging, we found someone else trying to do the same thing as us.

The issue can be found here.

Just using the private functions as-is would have been fine, but we are professional developers.

I would recommend that you write a few unit tests specifically for testing the code using the private functions... - @manast

The maintainer had a great suggestion: write unit tests for the private functions. The next best thing would be to at least document the functions so that the community understands them, which strengthens their viability for public use. And that's what we did.

Open Source Bonus

For the actual pattern we described at the beginning (diagram 2), an addition was made to the reference docs to make it a viable pattern. Making this a known pattern encourages usage of the feature and may lead other users to find issues when using it in production. TypeScript types were also available, so we updated those as well. After using it for some time (processing approx. 500k jobs), we found a bug and were able to easily fix it using our extended knowledge of the package. Talk about bringing a third-class feature to first class!
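To give a feel for what moving a job through its states by hand looks like, here is a minimal sketch. It assumes the manual-fetch methods as I understand them from Bull's reference docs: `queue.getNextJob()`, `job.moveToCompleted(returnValue, ignoreLock)` and `job.moveToFailed(errorInfo, ignoreLock)`. Check the exact names and arguments against the version of Bull you are running. The wrapper `workOneJob` and the `doWork` callback are hypothetical names.

```javascript
// Pull one job from a Bull queue and move it through its states by hand.
// No queue.process() handler is defined anywhere.
// Assumed Bull methods: queue.getNextJob(), job.moveToCompleted(value, ignoreLock),
// job.moveToFailed(errorInfo, ignoreLock).
async function workOneJob(queue, doWork) {
  const job = await queue.getNextJob() // waiting -> active
  if (!job) return null // nothing is waiting right now

  try {
    const result = await doWork(job.data)
    // The second argument is ignoreLock: this caller is not a worker
    // registered through queue.process().
    await job.moveToCompleted(result, true) // active -> completed
    return { id: job.id, state: 'completed' }
  } catch (err) {
    await job.moveToFailed({ message: err.message }, true) // active -> failed
    return { id: job.id, state: 'failed' }
  }
}
```

In the setup from diagram 2, the call to `doWork` would be replaced by handing the job to another service and waiting for its answer. Bull's docs also note that a manually fetched job still holds a time-limited lock, so long-running work may need the lock extended.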

I am very happy with the outcome of the project: not only did we satisfy our requirements, we also made open source contributions. This led to us understanding the package's internals and being able to easily add features for our use case. Having an active maintainer on the project who knew the ins and outs also made the entire process run smoothly.

Top comments (7)

Jeremy Forsythe

We're doing something similar with microservices and workers processing jobs coming out of the queue. Our current architecture has one "producer" worker adding jobs to the queue and a "consumer" worker using the .process() method to grab the job when it's ready. Both of these workers are horizontally scalable. The "consumer" worker determines what type of job it is and hands it off to one of several workers (by calling a GRPC service) based on the job data.

Is there something in particular you ran into that made you decide to manually move jobs through the queue instead of letting Bull handle it? So far we haven't run into any issues and I hadn't anticipated any since we can easily add more consumer workers to handle any increased load, but I'd be interested in knowing what problems you ran into that led you to manually polling for jobs instead of using the built-in mechanisms.

Alec Brunelle

Is there something in particular you ran into that made you decide to manually move jobs through the queue instead of letting Bull handle it?

For us, we had the need for workers (consumers in your instance) to be in any language or framework. That reason and also the need for horizontal scalability like you mentioned.

Producers for us could be anyone as the Bull Queue was hooked up to Kafka topics for different job types. As messages came into the topics, the queue pulled them and inserted the jobs into the queues.

Consumers pulled jobs from the Bull queue through an HTTP REST API which lived in the Bull Queue service.
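The code we wrote is not public, so the following is only a rough sketch of what such an HTTP layer could look like, not the original implementation. The routes are hypothetical. `queue` is assumed to expose Bull's `getNextJob()` and `getJob(id)`, and the jobs are assumed to expose `moveToCompleted` and `moveToFailed`. The handler is written as a plain function so it can be wired into Node's `http` module or any web framework.

```javascript
// Hypothetical routes, not the original service's API:
//   POST /jobs/next          -> hand the next waiting job to a consumer
//   POST /jobs/:id/complete  -> consumer reports a result
//   POST /jobs/:id/fail      -> consumer reports an error
async function handleJobRequest(queue, method, path, body = {}) {
  if (method !== 'POST') return { status: 405, body: { error: 'method not allowed' } }

  if (path === '/jobs/next') {
    const job = await queue.getNextJob()
    return job
      ? { status: 200, body: { id: job.id, data: job.data } }
      : { status: 204, body: null } // nothing waiting
  }

  const match = /^\/jobs\/([^/]+)\/(complete|fail)$/.exec(path)
  if (!match) return { status: 404, body: { error: 'not found' } }

  const job = await queue.getJob(match[1])
  if (!job) return { status: 404, body: { error: 'unknown job' } }

  if (match[2] === 'complete') {
    await job.moveToCompleted(body.result, true)
  } else {
    await job.moveToFailed({ message: String(body.error) }, true)
  }
  return { status: 200, body: { id: job.id } }
}
```

With a layer like this, a consumer only needs to speak HTTP and JSON, which is what allows workers to be written in any language or framework.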

This was a while ago, at a different company from the one I am at currently, but I wish I had made the Kafka and HTTP REST code we wrote around Bull open source.

Mister Singh

Hi @aleccool213. I am new to Bull and was hoping you could help me understand this.

Couldn't the worker just be run in a separate process on a different server? As long as it can connect to the same Redis instance and knows the name of the queue, it will receive the jobs and can then process them.

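// worker.js

const Bull = require('bull') // the npm package name is lowercase
// In a Redis URL the password follows a colon: redis://:password@host:port
const myFirstQueue = new Bull('my-first-queue', 'redis://:mypassword@myredis.server.com:1234')
// '*' registers this handler for jobs of any name
myFirstQueue.process('*', (job, done) => { done() })

// node worker.js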

Is this not what is being shown in diagram 2, or are you accomplishing something different which I am not following?

Thanks

Alec Brunelle

Good question. "Micro-services" is a bloated term, so I did assume a bit. When I talk about services, I mean they are completely separated, with nothing shared, including Redis instances. The pro of this approach is that any of these services (except the job queue, of course) can be written in any programming language. There are more pros to this approach, but for those I recommend you research micro-service architecture yourself 👍

Harsh Makwana

How can I remove or update a delayed job in BullMQ? There is an option like drain, but it deletes all the delayed jobs the queue contains. Is there any option for removing or updating individual jobs in the queue?

Dr. Pepper

Then you can easily wrap bull in an API for use with external systems.

It would be cool for people who're just starting off with microservices to see an example of this.

Alec Brunelle

I agree. Unfortunately, it would take some effort on my end to reproduce this. I implemented it at a company I used to work at, and we didn't make that code open source (it totally should have been).