Hugo Di Francesco

Originally published at codewithhugo.com

Bring Redux to your queue logic: an Express setup with ES6 and bull queue

There always comes a point in a web application's life where an operation is best served in the background. This is where queues come in.

There are a few queuing solutions in Node, e.g. Kue, RSMQ, Bee Queue and bull. None of them is overwhelmingly dominant.
The issue with Kue, RSMQ and Bee Queue was their use of a done callback as the recommended API.

Bull (https://github.com/OptimalBits/bull) is a premium Queue package for handling jobs and messages in NodeJS. It's backed by Redis and is pretty feature-rich. Most of all, it offers a Promise-based processing API, which means processors can be written with async/await.
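
To make the difference between the two styles concrete, here is a minimal sketch that uses no queue library at all. buildGreeting, callbackProcessor and promiseProcessor are hypothetical names invented for this illustration, and the jobs are plain objects shaped like { data }.

```javascript
// Hypothetical unit of work shared by both styles below.
function buildGreeting(data) {
  if (!data.name) throw new Error('name is required');
  return `Hello, ${data.name}`;
}

// Callback style: the processor has to call done() exactly once
// on every code path, including the failure path.
function callbackProcessor(job, done) {
  let greeting;
  try {
    greeting = buildGreeting(job.data);
  } catch (error) {
    return done(error);
  }
  return done(null, greeting);
}

// Promise style: returning a value resolves the job,
// throwing rejects it. There is no done() to forget.
async function promiseProcessor(job) {
  return buildGreeting(job.data);
}
```

With the Promise style, error handling falls out of ordinary try/catch and await, which is the main reason it reads more cleanly in a processor.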

We'll walk through an application that sends webhooks with a given payload to a set of URLs.

You can find the full code content at https://github.com/HugoDF/express-bull-es6.

This was sent out on the Code with Hugo newsletter last Monday.
Subscribe to get the latest posts right in your inbox (before anyone else).

An Express application with Redis and a worker 🏃‍♀️

We'll start with a Node/Redis/Express setup using docker-compose (a full walkthrough can be found at
https://codewithhugo.com/setting-up-express-and-redis-with-docker-compose/).
The application will be written using ES modules (by using the esm package).

To begin we'll use the following docker-compose.yml:

version: '2'
services:
    app:
        build: .
        container_name: my-app
        environment:
            - NODE_ENV=development
            - PORT=3000
            - REDIS_URL=redis://my-cache
        command: "sh -c 'npm i && npm run dev'"
        volumes:
            - .:/var/www/app
        links:
            - redis
        ports:
            - "3000:3000"

    worker:
        build: .
        container_name: my-worker
        environment:
            - NODE_ENV=development
            - PORT=3000
            - REDIS_URL=redis://my-cache
        command: "sh -c 'npm i && npm run worker:dev'"
        volumes:
            - .:/var/www/app
        links:
            - redis

    redis:
        image: redis
        container_name: my-cache
        expose:
            - "6379"

We'll also need a package.json as follows:

{
  "name": "express-bull-es6",
  "version": "1.0.0",
  "description": "An Express setup with Redis, bull and ES6",
  "main": "server.js",
  "scripts": {
    "start": "node -r esm server.js",
    "dev": "nodemon -r esm server.js",
    "worker": "node -r esm worker.js",
    "worker:dev": "nodemon -r esm worker.js"
  },
  "author": "Hugo Di Francesco",
  "license": "MIT",
  "dependencies": {
    "esm": "^3.0.67",
    "express": "^4.16.3",
    "nodemon": "^1.18.1"
  }
}

A server.js:


import express from 'express';

const app = express();

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

And a worker.js:

console.log('Worker doing nothing');

Running the following at the command line should get us some output (after a bit if the dependencies need to install):

$ docker-compose up

Eventually:

my-worker | [nodemon] 1.18.1
my-worker | [nodemon] to restart at any time, enter `rs`
my-worker | [nodemon] watching: *.*
my-worker | [nodemon] starting `node -r esm worker.js`
my-app    | [nodemon] 1.18.1
my-app    | [nodemon] to restart at any time, enter `rs`
my-app    | [nodemon] watching: *.*
my-app    | [nodemon] starting `node -r esm server.js`
my-worker | Worker doing nothing
my-app    | Server listening on port 3000

Setting up bull 🐮

Next, we'll want to add bull to set up some queues.
We'll also set up bull-arena as a web UI to monitor these queues.

First install bull and bull-arena:

npm i --save bull bull-arena

Let's create some queues in a queues.js file:

import Queue from 'bull';

export const NOTIFY_URL = 'NOTIFY_URL';

export const queues = {
  [NOTIFY_URL]: new Queue(
    NOTIFY_URL,
    process.env.REDIS_URL
  )
};

And update server.js to include the bull-arena UI and import the NOTIFY_URL queue.

import url from 'url';

import express from 'express';
import Arena from 'bull-arena';

import { queues, NOTIFY_URL } from './queues';

const app = express();


function getRedisConfig(redisUrl) {
  const redisConfig = url.parse(redisUrl);
  return {
    host: redisConfig.hostname || 'localhost',
    port: Number(redisConfig.port || 6379),
    database: (redisConfig.pathname || '/0').substr(1) || '0',
    password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined
  };
}

app.use('/', Arena(
  {
    queues: [
      {
        name: NOTIFY_URL,
        hostId: 'Worker',
        redis: getRedisConfig(process.env.REDIS_URL)
      }
    ]
  },
  {
    basePath: '/arena',
    disableListen: true
  }
));

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

On save we'll be able to open up http://localhost:3000/arena and see the following:

Screenshot of Arena in browser
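
A note on getRedisConfig: it turns the single REDIS_URL string into the separate host, port, database and password fields passed to Arena in the listing above. That listing uses url.parse, which the Node documentation marks as a legacy API. The following is a sketch of a roughly equivalent version built on the global URL class instead. It is an alternative I'm assuming behaves the same for the URLs used in this post, not the code from the repository, and the second example URL is made up for illustration.

```javascript
// Sketch: same output shape as getRedisConfig above, using the WHATWG URL class.
function getRedisConfig(redisUrl) {
  const parsed = new URL(redisUrl);
  return {
    host: parsed.hostname || 'localhost',
    port: Number(parsed.port || 6379),
    // pathname is '' or '/<db>'; fall back to database 0 when it is missing
    database: parsed.pathname.slice(1) || '0',
    // URL keeps the password percent-encoded, so decode it before use
    password: parsed.password ? decodeURIComponent(parsed.password) : undefined
  };
}

// The docker-compose value from this post:
const local = getRedisConfig('redis://my-cache');
// → { host: 'my-cache', port: 6379, database: '0', password: undefined }

// A hypothetical hosted Redis URL with a password, port and database index:
const hosted = getRedisConfig('redis://:s3cret@cache.example.com:6380/2');
// → { host: 'cache.example.com', port: 6380, database: '2', password: 's3cret' }
```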

Persisting webhook data with Redis

Accepting payloads and forwarding them on

The shape of our API will be the following:
A POST /webhooks endpoint that will accept a JSON POST body with a payload and a urls array, which will respond to the following request:

curl -X POST \
  http://localhost:3000/webhooks \
  -H 'Content-Type: application/json' \
  -d '{
    "payload": {
        "hello": "world"
    },
    "urls": [
        "http://localhost:3000/example",
        "http://localhost:3000/example"
    ]
}'

A POST /webhooks/notify endpoint that will accept a JSON POST body with an id field, which will respond to a request like the following:

curl -X POST \
  http://localhost:3000/webhooks/notify \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "e5d9f99f-9641-4c0a-b2ca-3b0036c2a9b3"
}'

We'll also have a POST /example endpoint to check that our webhooks are actually being triggered.

This means we'll need body-parser:

npm install --save body-parser

server.js will look like the following:

import url from 'url';
import express from 'express';
import bodyParser from 'body-parser';
import Arena from 'bull-arena';

import { queues, NOTIFY_URL } from './queues';

const app = express();

app.use(bodyParser.json());

app.post('/webhooks', (req, res, next) => {
  const { payload, urls } = req.body;
  res.json({
    payload,
    urls
  });
});

app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  res.sendStatus(200);
});

app.post('/example', (req, res) => {
  console.log(`Hit example with ${JSON.stringify(req.body)}`);
  return res.sendStatus(200);
});

function getRedisConfig(redisUrl) {
  const redisConfig = url.parse(redisUrl);
  return {
    host: redisConfig.hostname || 'localhost',
    port: Number(redisConfig.port || 6379),
    database: (redisConfig.pathname || '/0').substr(1) || '0',
    password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined
  };
}

app.use('/', Arena(
  {
    queues: [
      {
        name: NOTIFY_URL,
        hostId: 'Worker',
        redis: getRedisConfig(process.env.REDIS_URL)
      }
    ]
  },
  {
    basePath: '/arena',
    disableListen: true
  }
));

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});
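
The handlers above trust req.body completely. As a possible extension (not part of the original code), here is a small sketch of a validator for the POST /webhooks body described earlier. validateWebhookBody and isHttpUrl are hypothetical helpers, and the exact rules (non-empty urls array, http or https only) are assumptions.

```javascript
// Returns true only for absolute http(s) URLs given as strings.
function isHttpUrl(value) {
  if (typeof value !== 'string') return false;
  try {
    const { protocol } = new URL(value);
    return protocol === 'http:' || protocol === 'https:';
  } catch (error) {
    return false; // new URL() throws on strings it cannot parse
  }
}

// Returns a list of human-readable problems; an empty list means the body is usable.
function validateWebhookBody(body) {
  const errors = [];
  const { payload, urls } = body || {};
  if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
    errors.push('payload must be a JSON object');
  }
  if (!Array.isArray(urls) || urls.length === 0) {
    errors.push('urls must be a non-empty array');
  } else if (!urls.every(isHttpUrl)) {
    errors.push('every url must be an absolute http(s) URL');
  }
  return errors;
}

// Possible use inside the POST /webhooks handler:
//   const errors = validateWebhookBody(req.body);
//   if (errors.length > 0) return res.status(400).json({ errors });
```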

Persisting webhook data to Redis 💾

We'll use ioredis (a Redis client for Node), since bull already uses it under the hood:

npm install --save ioredis

To generate unique identifiers we'll also install the uuid package:

npm install --save uuid

A new module, db.js, looks like the following:

import Redis from 'ioredis';
import { v4 as uuidV4 } from 'uuid';

const redis = new Redis(process.env.REDIS_URL);

const WEBHOOK_PREFIX = 'webhook:';
const PAYLOAD_PREFIX = `${WEBHOOK_PREFIX}payload:`;
const URLS_PREFIX = `${WEBHOOK_PREFIX}urls:`;

const makePayloadKey = id => `${PAYLOAD_PREFIX}${id}`;
const makeUrlsKey = id => `${URLS_PREFIX}${id}`;

async function setWebhook(payload, urls) {
  const id = uuidV4();
  const transaction = redis.multi()
    .hmset(makePayloadKey(id), payload)
    .lpush(makeUrlsKey(id), urls)
  await transaction.exec();
  return id;
}

async function getWebhook(id) {
  const transaction = redis.multi()
    .hgetall(makePayloadKey(id))
    .lrange(makeUrlsKey(id), 0, -1);
  const [[_, payload], [__, urls]] = await transaction.exec();
  return {
    payload,
    urls
  };
}

export const db = {
  setWebhook,
  getWebhook
};

Payloads and URLs are modelled as webhook:payload:<some-uuid> and webhook:urls:<some-uuid> respectively.

Payloads are Redis hashes (since the payload is a JSON object), and URLs are Redis lists (since we're dealing with a list of strings).
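
One caveat with this modelling: Redis hash field values are flat strings, and hgetall hands every value back as a string. A flat payload like { "hello": "world" } round-trips fine, but numbers come back as strings and a nested object would most likely not survive as-is. A minimal sketch of one way around that is to JSON-encode each field before writing and decode it after reading. encodePayload and decodePayload are hypothetical helpers, not part of db.js.

```javascript
// Turn every field value into a JSON string so it can live in a Redis hash.
function encodePayload(payload) {
  return Object.fromEntries(
    Object.entries(payload).map(([field, value]) => [field, JSON.stringify(value)])
  );
}

// Reverse of encodePayload: parse every hash field back into its original type.
function decodePayload(hash) {
  return Object.fromEntries(
    Object.entries(hash).map(([field, value]) => [field, JSON.parse(value)])
  );
}

const stored = encodePayload({ hello: 'world', meta: { attempts: 3 } });
// stored.hello is the string '"world"', stored.meta is the string '{"attempts":3}'
const restored = decodePayload(stored);
// restored.meta.attempts is the number 3 again
```

In db.js this would mean calling encodePayload before hmset and decodePayload on the hgetall reply.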

We run into an issue whereby we want to make sure we're setting/getting the payload and urls at the same time, hence the use of multi().

multi allows us to build transactions (operations that should be executed atomically).
At this scale (no traffic 😄), considering we only ever add (never update) and that we use UUIDs, we could just as well have not used transactions,
but we'll be good engineers and go ahead and use them anyway.

The more involved lines:

const transaction = redis.multi()
  .hgetall(makePayloadKey(id))
  .lrange(makeUrlsKey(id), 0, -1);
const [[_, payload], [__, urls]] = await transaction.exec();

Warrant an explanation:

  1. hgetall gets all the key-value pairs in the hash.
  2. lrange gets values from the list; when used with 0 as start and -1 as end, it gets the whole list.
  3. const output = await multi().op1().op2().exec()

    • Sets output to an array of return values from op1 and op2.
    • In other words, output = [ [ errorOp1, replyOp1 ], [ errorOp2, replyOp2 ] ].
    • To reflect this shape, we destructure each pair, ignore the errors (not good practice) and keep only the replies.
    • A better solution would be to do:
    const [[errPayload, payload], [errUrls, urls]] = await transaction.exec();
    if (errPayload) {
      throw errPayload;
    }
    if (errUrls) {
      throw errUrls;
    }
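
The same error check can be written once for any number of operations. The sketch below assumes only the [error, reply] pair shape described above; unwrapReplies is a hypothetical helper, not something ioredis provides.

```javascript
// Takes the array returned by exec(), i.e. [[error, reply], ...],
// rethrows the first error it finds and otherwise returns just the replies.
function unwrapReplies(results) {
  return results.map(([error, reply]) => {
    if (error) {
      throw error;
    }
    return reply;
  });
}

// Possible use in getWebhook:
//   const [payload, urls] = unwrapReplies(await transaction.exec());
```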
    

Saving POST data using the new db module

server.js now looks like the following:

import url from 'url';
import express from 'express';
import bodyParser from 'body-parser';
import Arena from 'bull-arena';

import { db } from './db';
import { queues, NOTIFY_URL } from './queues';

const app = express();

app.use(bodyParser.json());

app.post('/webhooks', async (req, res, next) => {
  const { payload, urls } = req.body;
  try {
    const id = await db.setWebhook(payload, urls);
    return res.json({
      id
    });
  } catch (error) {
    next(error);
  }
});

app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  try {
    const { payload, urls } = await db.getWebhook(id);
    return res.sendStatus(200);
  } catch (error) {
    next(error);
  }
});

app.post('/example', (req, res) => {
  console.log(`Hit example with ${JSON.stringify(req.body)}`);
  return res.sendStatus(200);
});

function getRedisConfig(redisUrl) {
  const redisConfig = url.parse(redisUrl);
  return {
    host: redisConfig.hostname || 'localhost',
    port: Number(redisConfig.port || 6379),
    database: (redisConfig.pathname || '/0').substr(1) || '0',
    password: redisConfig.auth ? redisConfig.auth.split(':')[1] : undefined
  };
}

app.use('/', Arena(
  {
    queues: [
      {
        name: NOTIFY_URL,
        hostId: 'Worker',
        redis: getRedisConfig(process.env.REDIS_URL)
      }
    ]
  },
  {
    basePath: '/arena',
    disableListen: true
  }
));

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`)
});

The main updates are:

app.post('/webhooks', async (req, res, next) => {
  const { payload, urls } = req.body;
  try {
    const id = await db.setWebhook(payload, urls);
    return res.json({
      id
    });
  } catch (error) {
    next(error);
  }
});

and:

app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  try {
    const { payload, urls } = await db.getWebhook(id);
    return res.sendStatus(200);
  } catch (error) {
    next(error);
  }
});

You'll notice that the POST /webhooks/notify handler still doesn't actually notify anything or anyone 🙈.

Queuing jobs 🏭

To queue jobs, we use the queue.add method and pass it what we want to appear in job.data:

queues[NOTIFY_URL].add({
  payload,
  url,
  id
});

We want to send a request to each URL independently (that's sort of the point of the whole queue setup) which means we want:

app.post('/webhooks/notify', async (req, res, next) => {
  const { id } = req.body;
  try {
    const { payload, urls } = await db.getWebhook(id);
    urls.forEach(url => {
      queues[NOTIFY_URL].add({
        payload,
        url,
        id
      });
    });
    return res.sendStatus(200);
  } catch (error) {
    next(error);
  }
});

Where the notable change is:

urls.forEach(url => {
  queues[NOTIFY_URL].add({
    payload,
    url,
    id
  });
});
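
One thing the forEach version glosses over is that queue.add returns a promise, so a failed add (for example if Redis is unreachable) never reaches the handler's catch block. A possible variation, sketched below, collects the promises and waits for all of them. enqueueNotifications is a hypothetical helper that works with any object exposing an add method that returns a promise.

```javascript
// Adds one job per URL and resolves once every add() has succeeded.
// If any add() rejects, the returned promise rejects too.
function enqueueNotifications(queue, { id, payload, urls }) {
  return Promise.all(
    urls.map(url => queue.add({ payload, url, id }))
  );
}

// Possible use inside the POST /webhooks/notify handler:
//   await enqueueNotifications(queues[NOTIFY_URL], { id, payload, urls });
//   return res.sendStatus(200);
```

The trade-off is that the response now waits for Redis to acknowledge every job, which is usually what we want before answering 200.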

Now that we've done this, if we create a new webhook:

curl -X POST \
  http://localhost:3000/webhooks \
  -H 'Content-Type: application/json' \
  -d '{
        "payload": {
                "hello": "world"
        },
        "urls": [
                "http://localhost:3000/example",
                "http://localhost:3000/example"
        ]
}'
{"id":"5fc395bf-ca2f-4654-a7ac-52f6890d0deb"}

Make sure to copy the id from the response ({"id":"5fc395bf-ca2f-4654-a7ac-52f6890d0deb"} here) into the following command:

curl -X POST \
  http://localhost:3000/webhooks/notify \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "5fc395bf-ca2f-4654-a7ac-52f6890d0deb"
}'
OK

The jobs have been added to the queue, as we can check by opening bull-arena UI at http://localhost:3000/arena/Worker/NOTIFY_URL/waiting:

Screenshot of waiting jobs on the NOTIFY_URL queue

By clicking on one of the __default__ jobs, we can see the payload, url and id are being passed in correctly:

Data content of job in queue

Processing jobs ⚙️

We now want to actually process the queued jobs, i.e. ping some URLs with some data.

To do that let's bring in axios as an HTTP client:

npm install --save axios

Create a processors.js file:

import { NOTIFY_URL } from './queues';
import axios from 'axios';

export const processorInitialisers = {
  [NOTIFY_URL]: db => job => {
    console.log(`Posting to ${job.data.url}`);
    return axios.post(job.data.url, job.data.payload);
  }
}

For some context, the reason we've gone with a db => job => Promise type signature, even though we don't currently need the DB, is
to illustrate how I would pass the database or any other dependencies into the processorInitialiser.
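
One practical benefit of this shape is that a processor can be exercised without Redis or a network. The sketch below goes one step further than processors.js and injects the HTTP client as well, which is an assumption made for this illustration. makeNotifyProcessor and fakeHttp are hypothetical names.

```javascript
// Same idea as the NOTIFY_URL initialiser, but every dependency comes in as an argument.
const makeNotifyProcessor = ({ http }) => job =>
  http.post(job.data.url, job.data.payload);

// A stand-in for axios that records calls instead of sending requests.
const calls = [];
const fakeHttp = {
  post: (url, body) => {
    calls.push({ url, body });
    return Promise.resolve({ status: 200 });
  }
};

// Run the processor against a plain object shaped like a bull job.
const processor = makeNotifyProcessor({ http: fakeHttp });
processor({
  data: { url: 'http://localhost:3000/example', payload: { hello: 'world' } }
});
// calls now holds one entry with the url and payload from the job
```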

Some other processor initialiser could look like the following:

const myOtherProcessorInitialiser = db => async job => {
  const webhook = await db.getWebhook(job.data.id);
  return Promise.all(
    webhook.urls.map(
      url => axios.post(url, webhook.payload)
    )
  );
};

To finish off, we need to actually hook up the processors to the queue. That's done using queue.process, so in worker.js we will now have:

import { queues } from './queues';
import { processorInitialisers } from './processors';
import { db } from './db';

Object.entries(queues).forEach(([queueName, queue]) => {
  console.log(`Worker listening to '${queueName}' queue`);
  queue.process(processorInitialisers[queueName](db));
});
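
As written, adding a queue to queues.js without a matching entry in processors.js fails with a fairly opaque "is not a function" error. A slightly more defensive wiring could look like the sketch below. attachProcessors is a hypothetical helper, and it relies only on each queue exposing a process method, as used above.

```javascript
// Wires every queue to its processor and fails loudly, by name,
// when a queue has no processor initialiser registered.
function attachProcessors(queues, processorInitialisers, db) {
  Object.entries(queues).forEach(([queueName, queue]) => {
    const initialiser = processorInitialisers[queueName];
    if (typeof initialiser !== 'function') {
      throw new Error(`No processor initialiser registered for '${queueName}' queue`);
    }
    console.log(`Worker listening to '${queueName}' queue`);
    queue.process(initialiser(db));
  });
}

// Possible use in worker.js:
//   attachProcessors(queues, processorInitialisers, db);
```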

We can test that the webhooks work by creating one that points to http://localhost:3000/example, triggering it using /webhooks/notify and checking the logs, which should show something like:

my-worker | Posting to http://localhost:3000/example
my-app    | Hit example with {"hello":"world"}
my-worker | Posting to http://localhost:3000/example
my-app    | Hit example with {"hello":"world"}

Some other stuff to do before you ship this 🚢

We should really not be exposing the bull-arena UI to the public, so if you plan on using this setup in a hosted environment, either guard it with a check such as:

if (process.env.NODE_ENV !== 'production') {
  // Bull arena logic
}

Or add HTTP basic auth to it using a middleware of some sort.
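
For the basic auth route, here is a minimal dependency-free sketch of such a middleware. basicAuth is a hypothetical helper, the ARENA_USER and ARENA_PASSWORD environment variable names are assumptions, and a real deployment would probably prefer a maintained package and a constant-time comparison over the plain string comparison used here.

```javascript
// Returns an Express-style middleware that only lets a request through
// when it carries the expected HTTP basic auth credentials.
function basicAuth(expectedUser, expectedPassword) {
  return (req, res, next) => {
    const header = req.headers.authorization || '';
    const [scheme, encoded] = header.split(' ');
    if (scheme === 'Basic' && encoded) {
      // Credentials arrive as base64("user:password")
      const decoded = Buffer.from(encoded, 'base64').toString('utf8');
      const separator = decoded.indexOf(':');
      const user = decoded.slice(0, separator);
      const password = decoded.slice(separator + 1);
      if (separator !== -1 && user === expectedUser && password === expectedPassword) {
        return next();
      }
    }
    // Ask the client (usually a browser) to prompt for credentials
    res.set('WWW-Authenticate', 'Basic realm="arena"');
    return res.status(401).send('Authentication required');
  };
}

// Possible use in server.js, registered before the Arena middleware:
//   app.use('/arena', basicAuth(process.env.ARENA_USER, process.env.ARENA_PASSWORD));
```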

You can read a more in-depth write-up about using Docker Compose, Redis and Node/Express at https://codewithhugo.com/setting-up-express-and-redis-with-docker-compose/.

For more information about using esm, see: https://codewithhugo.com/es6-by-example-a-module/cli-to-wait-for-postgres-in-docker-compose/.

