Ranjan Purbey
Background Jobs in SvelteKit with BullMQ

Is your SvelteKit app struggling with tasks like sending emails, resizing images, or processing data? With BullMQ, you can offload these heavy jobs to the background and keep your app lightning-fast. In this post, we’ll show you how to set it up and tackle real-world tasks like a pro. Let’s dive in!

tl;dr Set up BullMQ workers in hooks.server.js. Check out the example

What is BullMQ?

BullMQ is a Node.js library for creating and managing job queues with Redis. It helps you run time-consuming tasks in the background efficiently. With built-in features like retries, job scheduling, and concurrency control, BullMQ makes handling complex workflows in your app simple and reliable.
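
For example, retries, delays, and concurrency are plain options on the queue and worker. The snippet below is only a sketch of those options (the queue name, connection details, and the sendEmail helper are made up for illustration):

import { Queue, Worker } from "bullmq";

// Illustrative connection options; point these at your own Redis instance
const connection = { host: "localhost", port: 6379 };

const emailQueue = new Queue("emails", { connection });

// Retry a failed job up to 3 times with exponential backoff,
// and delay its first attempt by 5 seconds
await emailQueue.add(
  "welcome-email",
  { to: "user@example.com" },
  { attempts: 3, backoff: { type: "exponential", delay: 1000 }, delay: 5000 }
);

// Process up to 5 jobs from this queue concurrently
// (sendEmail is a hypothetical helper, not part of BullMQ)
new Worker("emails", async (job) => sendEmail(job.data), {
  connection,
  concurrency: 5,
});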

Step 1: Install dependencies

First, install ioredis (a Redis client for Node.js) and bullmq:

pnpm i -D ioredis bullmq

Even though you can add jobs to a BullMQ queue from a serverless environment like Vercel, the workers must run on a traditional, long-lived Node.js server. Hence, replace adapter-auto with adapter-node:

pnpm rm @sveltejs/adapter-auto && pnpm i -D @sveltejs/adapter-node

Don't forget to update your Svelte config (svelte.config.js) with the newly installed node adapter.
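
For reference, the relevant part of the config then looks roughly like this (any existing options, such as preprocessors, stay as they are):

// svelte.config.js

import adapter from "@sveltejs/adapter-node";

/** @type {import('@sveltejs/kit').Config} */
const config = {
  kit: {
    adapter: adapter(),
  },
};

export default config;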

Step 2: Setup Job Queue and Processor

Next, let's set up a BullMQ job queue and its processor. Create a .js file in the src/lib/server/ directory:

// src/lib/server/background-jobs.js

import { REDIS_URL } from "$env/static/private";
import { Queue, Worker } from "bullmq";
import IORedis from "ioredis";

const Q_NAME = "q";

// Queue used by the rest of the app to enqueue background jobs
export const jobsQueue = new Queue(Q_NAME, {
  connection: new IORedis(REDIS_URL),
});

// Pause for t * 100 milliseconds
const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 100));

export const setupBullMQProcessor = () => {
  new Worker(
    Q_NAME,
    async (job) => {
      // Simulate a long-running task: step from 0 to 100, reporting progress
      for (let i = 0; i <= 100; i++) {
        await sleep(Math.random());
        await job.updateProgress(i);
        await job.log(`Processing job at interval ${i}`);

        // Small random chance of failure, to demonstrate error handling
        if (Math.random() * 200 < 1) throw new Error(`Random error at ${i}`);
      }

      return `This is the return value of job (${job.id})`;
    },
    // https://docs.bullmq.io/bull/patterns/persistent-connections#maxretriesperrequest
    { connection: new IORedis(REDIS_URL, { maxRetriesPerRequest: null }) }
  );
};


Here, we've also created a utility function that instantiates a BullMQ worker to listen for and process jobs in the queue Q_NAME.

We need to call this function in our hooks.server.js file—either at the top level or within the init hook.

// src/hooks.server.js

// ...
import { building } from "$app/environment";
import { setupBullMQProcessor } from "$lib/server/background-jobs";
// ...
if (!building) {
  setupBullMQProcessor();
}
// ...

The !building check skips setting up the worker (and in turn a Redis connection) during the build, speeding up the process.
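
If you prefer the init hook mentioned above, a minimal sketch (assuming a SvelteKit version recent enough to support it) looks like this:

// src/hooks.server.js (alternative using the init hook)

import { building } from "$app/environment";
import { setupBullMQProcessor } from "$lib/server/background-jobs";

// init runs once when the server starts up
export const init = async () => {
  if (!building) setupBullMQProcessor();
};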

🎉 BullMQ is now ready to be used in our SvelteKit app 🎉

Demo time

To test the setup, let's create a POST endpoint that enqueues a job.

// src/routes/+server.ts

import { jobsQueue } from "$lib/server/background-jobs";

export const POST = async () => {
  const { id: jobId } = await jobsQueue.add("job", {});

  /*
  The following code streams the job's progress to the client.
  If you don't need to update the client with the progress, you can skip it.
  You could also use WebSockets or polling instead (a polling sketch follows
  this code block).
  */
  const stream = new ReadableStream({
    async pull(controller) {
      const job = await jobsQueue.getJob(jobId);
      controller.enqueue(
        JSON.stringify(
          job.failedReason
            ? { error: job.failedReason }
            : job.returnvalue
            ? { data: job.returnvalue }
            : { progress: job.progress }
        )
      );
      controller.enqueue("\n");

      if (job.finishedOn) {
        controller.close();
      }

      // wait for 1 second before sending the next status update
      await new Promise((r) => setTimeout(r, 1e3));
    },
  });

  return new Response(stream, {
    headers: { "content-type": "text/plain" },
  });
};
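
If you would rather poll than stream, a separate status endpoint could look like the sketch below (the /jobs/[id] route is an illustrative choice, not part of the example repo):

// src/routes/jobs/[id]/+server.js (hypothetical polling endpoint)

import { json } from "@sveltejs/kit";
import { jobsQueue } from "$lib/server/background-jobs";

export const GET = async ({ params }) => {
  const job = await jobsQueue.getJob(params.id);
  if (!job) return json({ error: "Job not found" }, { status: 404 });

  // Mirror the status shape used by the streaming endpoint above
  return json(
    job.failedReason
      ? { error: job.failedReason }
      : job.returnvalue
      ? { data: job.returnvalue }
      : { progress: job.progress }
  );
};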

And on the frontend, let's add a button to trigger the above endpoint and subsequently show the job's status:

<!-- src/routes/+page.svelte -->

<script>
  let result = $state();

  $inspect(result);

  const handleClick = async () => {
    const response = await fetch("/", { method: "post" });
    const reader = response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      result = JSON.parse(new TextDecoder().decode(value));
    }

    setTimeout(() => (result = undefined), 3e3);
  };
</script>

{#if result?.error}
  <div class="error">{result.error}</div>
{:else if result?.data}
  <div class="success">{result.data}</div>
{:else if result?.progress !== undefined}
  <label>
    {result.progress === 100 ? "Done" : "Processing..."}
    <progress value={result.progress} max="100"></progress>
    {result.progress}%
  </label>
{:else}
  <button onclick={handleClick}>Schedule Background Job</button>
{/if}

<style>
  .error {
    color: red;
  }
  .success {
    color: darkgreen;
  }
</style>

Here is the output:

[Demo GIF: background jobs demo using SvelteKit and BullMQ]


Bonus 🎁

You can also mount a bull-board dashboard in your SvelteKit app for easy monitoring of background jobs.

Install bull-board dependencies

pnpm i -D @bull-board/api @bull-board/hono @hono/node-server hono

and modify your hooks.server.js:

// src/hooks.server.js

import { building } from "$app/environment";
import { jobsQueue, setupBullMQProcessor } from "$lib/server/background-jobs";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { HonoAdapter } from "@bull-board/hono";
import { serveStatic } from "@hono/node-server/serve-static";
import { Hono } from "hono";

if (!building) {
  setupBullMQProcessor();
}

const bullboard = (() => {
  const serverAdapter = new HonoAdapter(serveStatic);

  createBullBoard({
    queues: [new BullMQAdapter(jobsQueue)],
    serverAdapter,
  });
  const app = new Hono({ strict: false });
  const basePath = "/jobs";
  serverAdapter.setBasePath(basePath);
  app.route(basePath, serverAdapter.registerPlugin());

  return app;
})();

export const handle = async ({ event, resolve }) => {
  if (event.url.pathname.match(/^\/jobs($|\/)/)) {
    return bullboard.fetch(event.request);
  }

  return resolve(event);
};


Then visit <YOUR_SERVER_URL>/jobs to see the bull-board dashboard.

[Screenshot: bull-board dashboard]

Full example:

BullMQ with SvelteKit

Simple example to set up background jobs in a SvelteKit app using BullMQ


Usage

  1. Clone this repo

  2. Install dependencies

    pnpm i
  3. Add REDIS_URL in your .env file (an example value is shown after these steps).

  4. Start the dev server

    pnpm dev

    or build and preview

    pnpm build && pnpm preview
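
For step 3, the value depends on where your Redis server runs; for a local instance on the default port it would typically be:

# .env
REDIS_URL=redis://localhost:6379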





Top comments (3)

R David L Campbell

I cannot get this working. All I see in the console logs when running pnpm dev is:

Error: connect EPERM /
    at PipeConnectWrap.afterConnect [as oncomplete] (node:net:1636:16) {
  errno: -4048,
  code: 'EPERM',
  syscall: 'connect',
  address: '/'
}

My guess is I don't know what to use for the .env REDIS_URL. I tried the localhost that spun up with pnpm dev (localhost:5176)

R David L Campbell

Set up a Vercel Redis - all working now

Ranjan Purbey

Glad you got it working. You can also run a Redis server locally in a separate terminal.
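
For example, with Docker installed, something like this starts a local Redis on the default port (which matches REDIS_URL=redis://localhost:6379):

docker run --rm -p 6379:6379 redis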