Bamidele Ajibola

How to Add a Robust Background Job Queue to Your Project with Upstash and BullMQ

As your application grows, you'll inevitably encounter tasks that shouldn't make your users wait. Things like sending emails, processing images, or scheduling reminders are best handled in the background. This is where a job queue comes in.

A queue allows you to offload time-consuming tasks to a separate process, ensuring your application remains fast and responsive. In this tutorial, we'll walk through, step-by-step, how to integrate a professional-grade queue system using the power of Redis, Upstash, and BullMQ.

What You'll Build

You will learn how to set up a system that can:

  1. Define custom background jobs (like sending reminders).
  2. Add those jobs to a queue from anywhere in your application.
  3. Process those jobs reliably with a dedicated worker.

Let's get started!

Our Project Structure

First, let's understand the file structure we'll be creating inside our src/lib/ directory. This organization keeps our queue logic neat and tidy.

src/
└── lib/
    └── queue/
        ├── providers/
        │   └── upstash-queue.service.ts  # Our queue implementation
        ├── tasks/
        │   ├── reminder.ts               # Logic for a reminder task
        │   ├── send-scheduled-message.ts # Logic for another task
        │   └── index.ts                  # Exports all tasks
        ├── queue.interface.ts            # Defines the "rules" for our queue
        ├── worker.ts                     # The background process that runs tasks
        └── add-test-tasks.ts             # A script to add jobs for testing

Step 1: Define the Blueprint (The Interface)

A good programmer defines a clear contract before building. Our queue.interface.ts file does exactly that. It tells us what any queue service we create must be able to do.

File: src/lib/queue/queue.interface.ts

export interface BaseJobData {
  type: string; // A unique name for each job type
}

export interface QueueJob<TData extends BaseJobData = BaseJobData> {
  id?: string;
  data: TData;
  options?: {
    delay?: number; // Delay in milliseconds
    cron?: string;  // For recurring jobs
  };
}

export interface IQueueService {
  addJob<TData extends BaseJobData>(queueName: string, job: QueueJob<TData>): Promise<string>;
  processJob<TData extends BaseJobData>(
    queueName: string,
    handler: (job: { id?: string; data: TData; log: (message: string) => Promise<void> }) => Promise<unknown>,
    concurrency?: number
  ): Promise<void>;
  removeJob(queueName: string, jobId: string): Promise<void>;
  close(): Promise<void>;
}

Here, we've defined that any IQueueService must have methods to addJob, processJob, removeJob, and close the connection. This makes our code predictable and easy to work with.
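
One practical payoff of coding against an interface is that tests can swap in a stand-in that never touches Redis. The block below is a minimal sketch of that idea, not part of the tutorial's project: InMemoryQueueService is a hypothetical class, the interfaces are repeated in trimmed form so the block runs on its own, and the delay and cron options are accepted but ignored.

```typescript
// Trimmed copies of the tutorial's interfaces so this sketch is self-contained.
interface BaseJobData {
  type: string;
}

interface QueueJob<TData extends BaseJobData = BaseJobData> {
  id?: string;
  data: TData;
  options?: { delay?: number; cron?: string }; // Ignored by this stand-in
}

type JobHandler<TData extends BaseJobData> = (job: {
  id?: string;
  data: TData;
  log: (message: string) => Promise<void>;
}) => Promise<unknown>;

interface IQueueService {
  addJob<TData extends BaseJobData>(queueName: string, job: QueueJob<TData>): Promise<string>;
  processJob<TData extends BaseJobData>(queueName: string, handler: JobHandler<TData>): Promise<void>;
  removeJob(queueName: string, jobId: string): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical in-memory stand-in: jobs live in arrays, nothing talks to Redis.
class InMemoryQueueService implements IQueueService {
  private pending = new Map<string, { id: string; data: BaseJobData }[]>();
  private nextId = 1;
  readonly logs: string[] = [];

  async addJob<TData extends BaseJobData>(queueName: string, job: QueueJob<TData>): Promise<string> {
    const id = job.id ?? String(this.nextId++);
    const jobs = this.pending.get(queueName) ?? [];
    jobs.push({ id, data: job.data });
    this.pending.set(queueName, jobs);
    return id;
  }

  // Unlike a real worker, this drains the jobs already queued and then returns.
  async processJob<TData extends BaseJobData>(queueName: string, handler: JobHandler<TData>): Promise<void> {
    const jobs = this.pending.get(queueName) ?? [];
    while (jobs.length > 0) {
      const job = jobs.shift()!;
      await handler({
        id: job.id,
        data: job.data as TData,
        log: async (message) => {
          this.logs.push(message);
        },
      });
    }
  }

  async removeJob(queueName: string, jobId: string): Promise<void> {
    const jobs = this.pending.get(queueName) ?? [];
    this.pending.set(queueName, jobs.filter((job) => job.id !== jobId));
  }

  async close(): Promise<void> {
    this.pending.clear();
  }
}
```

In a unit test, a task can then be exercised by awaiting addJob followed by processJob on the stand-in and inspecting its logs array.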

Step 2: Build the Engine (The Queue Service)

Now, we implement the interface. We're using BullMQ, a powerful and popular queue library for Node.js that uses Redis. Our implementation will live in upstash-queue.service.ts.

File: src/lib/queue/providers/upstash-queue.service.ts
(This is a larger file, so we'll focus on the key parts. The full code is in your project.)

import { Queue, Worker, Job, ConnectionOptions, JobsOptions } from "bullmq";
import { IQueueService, BaseJobData } from "@/lib/queue/queue.interface";

export class UpstashQueueService implements IQueueService {
  private queues: Map<string, Queue>;
  private workers: Map<string, Worker>;
  private connectionOptions: ConnectionOptions;

  constructor(redisUrl: string) {
    // ... connection logic ...
  }

  // Method to add a job
  async addJob<TData extends BaseJobData>(
    queueName: string,
    job: { name: string; data: TData; options?: JobsOptions }
  ): Promise<string> {
    const queue = this.getQueue(queueName);
    const result = await queue.add(job.name, job.data, job.options);
    return result.id!;
  }

  // Method to start processing jobs
  async processJob<TData extends BaseJobData>(
    queueName: string,
    handler: (job: { id?: string; data: TData; log: (message: string) => Promise<void> }) => Promise<unknown>,
    // ...
  ): Promise<void> {
    // ... worker setup and logic ...
  }

  // ... other methods like removeJob and close ...
}

This class handles all the low-level communication with Redis. It knows how to create queues, add jobs to them, and set up workers.
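
The constructor's connection logic is elided above, so here is a rough sketch of what such logic might involve. It is not the author's actual code. The hypothetical parseRedisUrl helper turns a Redis URL into a plain options object using field names that ioredis (the client BullMQ is built on) accepts. It assumes that a rediss:// scheme means TLS, which is how Upstash URLs are usually written, and it sets maxRetriesPerRequest to null, which BullMQ expects on worker connections.

```typescript
// Hypothetical shape: a subset of the connection fields ioredis understands.
interface RedisConnectionSketch {
  host: string;
  port: number;
  username?: string;
  password?: string;
  tls?: Record<string, never>; // Empty object = enable TLS with default settings
  maxRetriesPerRequest: null;
}

function parseRedisUrl(redisUrl: string): RedisConnectionSketch {
  const url = new URL(redisUrl);
  if (url.protocol !== "redis:" && url.protocol !== "rediss:") {
    throw new Error(`Unsupported Redis URL scheme: ${url.protocol}`);
  }

  const options: RedisConnectionSketch = {
    host: url.hostname,
    port: url.port ? Number(url.port) : 6379, // 6379 is the default Redis port
    maxRetriesPerRequest: null,
  };

  // Credentials are percent-encoded inside URLs, so decode them.
  if (url.username) options.username = decodeURIComponent(url.username);
  if (url.password) options.password = decodeURIComponent(url.password);
  if (url.protocol === "rediss:") options.tls = {};

  return options;
}
```

The result could then be stored in a field such as connectionOptions and handed to each Queue and Worker the service creates.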

Step 3: Create the To-Do List (The Tasks)

A task is the actual work you want to do in the background. Let's look at our reminder task.

File: src/lib/queue/tasks/reminder.ts

import { BaseJobData } from "../queue.interface";

// 1. A unique name for this task
export const REMINDER_TASK_NAME = "user.reminder";

// 2. The specific data this task needs (with type safety!)
export interface ReminderJobData extends BaseJobData {
  type: typeof REMINDER_TASK_NAME;
  userId: string;
  phoneNumber?: string;
  email?: string;
  message: string;
  reminderTime: Date; // Note: job data is stored as JSON, so the worker receives this as an ISO string
}

// 3. The function that does the work
export const reminderTask = async (job: { data: ReminderJobData; log: (message: string) => Promise<void> }) => {
  const { userId, message, reminderTime, phoneNumber, email } = job.data;

  await job.log(`Sending reminder to user ${userId}: "${message}" at ${reminderTime}`);

  // In a real application, you would add your email/SMS sending logic here.
  // For now, we simulate work with a delay.
  await new Promise((resolve) => setTimeout(resolve, 1000));

  await job.log("Reminder sent successfully.");
};
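
One thing to watch with a Date field such as reminderTime: BullMQ stores job data as JSON, so a Date placed on the queue comes back in the worker as an ISO string. The sketch below illustrates the round trip. JSON.parse(JSON.stringify(...)) stands in for the trip through Redis, and reviveReminderTime is a hypothetical helper, not something the tutorial's code defines.

```typescript
interface ReminderPayload {
  userId: string;
  message: string;
  reminderTime: Date;
}

// What the worker actually receives after JSON serialization.
type StoredReminderPayload = Omit<ReminderPayload, "reminderTime"> & { reminderTime: string };

// Stand-in for the queue: serialize on the way in, parse on the way out.
function roundTrip(payload: ReminderPayload): StoredReminderPayload {
  return JSON.parse(JSON.stringify(payload)) as StoredReminderPayload;
}

// Hypothetical helper: turn the ISO string back into a Date inside the task.
function reviveReminderTime(stored: StoredReminderPayload): ReminderPayload {
  const reminderTime = new Date(stored.reminderTime);
  if (Number.isNaN(reminderTime.getTime())) {
    throw new Error(`Invalid reminderTime: ${stored.reminderTime}`);
  }
  return { ...stored, reminderTime };
}

const originalPayload: ReminderPayload = {
  userId: "user-123",
  message: "Don't forget the meeting!",
  reminderTime: new Date("2030-01-15T09:00:00.000Z"),
};
const storedPayload = roundTrip(originalPayload);
const revivedPayload = reviveReminderTime(storedPayload);

console.log(typeof storedPayload.reminderTime); // "string"
console.log(revivedPayload.reminderTime.getTime() === originalPayload.reminderTime.getTime()); // true
```

Calling a helper like this at the top of a task keeps date arithmetic working even though the declared type says Date.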

To keep things organized, we export all our tasks from a single index.ts file.

File: src/lib/queue/tasks/index.ts

export * from "./reminder";
export * from "./send-scheduled-message";

Step 4: Hire the Doer (The Worker)

The worker is a dedicated script whose only job is to listen for new tasks on the queue and execute them.

File: src/lib/queue/worker.ts

import { UpstashQueueService } from "./providers/upstash-queue.service";
import * as tasks from './tasks';
import { BaseJobData } from "./queue.interface";

const QUEUE_NAME = "default";
const service = new UpstashQueueService(process.env.REDIS_URL || "redis://localhost:6379");

// A map to connect task names to their functions
const taskHandlers = {
    [tasks.REMINDER_TASK_NAME]: tasks.reminderTask,
    [tasks.SEND_SCHEDULED_MESSAGE_TASK_NAME]: tasks.sendScheduledMessageTask,
};

type TaskName = keyof typeof taskHandlers;

const main = async () => {
    console.log("Starting worker...");

    // Tell the service to start processing jobs from the queue
    await service.processJob(QUEUE_NAME, async (job) => {
        const jobData = job.data as BaseJobData & { type: TaskName };
        const handler = taskHandlers[jobData.type]; // Find the right function

        if (handler) {
            await handler(job as any); // Run the task
        } else {
            throw new Error(`No handler found for task type: ${jobData.type}`);
        }
    });

    console.log(`Worker listening to queue: ${QUEUE_NAME}`);
}

main().catch(console.error);
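
The lookup taskHandlers[jobData.type] works for known task names, but a plain object also answers for inherited keys such as "toString", so a malformed job type could slip past the if (handler) check. A hedged sketch of a stricter lookup follows. It uses a Map and a hypothetical resolveHandler function, and the task names and handlers are placeholders rather than the tutorial's real tasks.

```typescript
type TaskJob = { data: { type: string }; log: (message: string) => Promise<void> };
type TaskHandler = (job: TaskJob) => Promise<void>;

// A Map only contains the keys we put in, unlike an object literal,
// which also inherits keys such as "toString" from Object.prototype.
const handlerRegistry = new Map<string, TaskHandler>([
  ["user.reminder", async (job) => { await job.log("reminder handled"); }],
  ["message.scheduled", async (job) => { await job.log("message handled"); }],
]);

// Returns the registered handler or throws, so unknown types fail loudly.
function resolveHandler(type: string): TaskHandler {
  const handler = handlerRegistry.get(type);
  if (!handler) {
    throw new Error(`No handler found for task type: ${type}`);
  }
  return handler;
}

// Looks up the handler for the job's type and runs it.
async function dispatchJob(job: TaskJob): Promise<void> {
  await resolveHandler(job.data.type)(job);
}
```

Throwing for unknown types matters because a thrown error marks the job as failed, which is easier to notice than a job that silently completes without doing anything.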

Step 5: Putting it all Together (Running and Testing)

Now for the fun part! Let's see it in action.

Prerequisites

  1. Node.js: Make sure you have Node.js installed.
  2. Redis: You need a running Redis instance. You can run one locally using Docker or sign up for a free database on Upstash.
  3. Dependencies: Install the necessary packages.

    npm install bullmq
    npm install -D ts-node typescript # For running TypeScript files directly
    

1. Set up your Environment

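Create a .env file in your project's root and add your Redis connection URL. For Upstash, copy the URL from your database's dashboard; it typically starts with rediss:// (TLS) and includes a password. Note that ts-node does not read .env files on its own, so either load the file with a package such as dotenv or export REDIS_URL in your shell before running the scripts.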

REDIS_URL="redis://your_redis_host:port"
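
A missing REDIS_URL tends to surface late: the worker quietly falls back to localhost, and a script that uses process.env.REDIS_URL! passes undefined along. Failing fast with a clear message can save some debugging. A minimal sketch, assuming a hypothetical requireEnv helper that takes the environment map as a parameter so it is easy to test:

```typescript
// Hypothetical helper: read a required variable or fail with a clear message.
function requireEnv(name: string, env: Record<string, string | undefined>): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Example with an inline map; in the worker you would pass process.env instead.
const exampleRedisUrl = requireEnv("REDIS_URL", { REDIS_URL: "redis://localhost:6379" });
console.log(exampleRedisUrl); // redis://localhost:6379
```

In the scripts from this tutorial, the call would look like requireEnv("REDIS_URL", process.env) in place of the fallback or the non-null assertion.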

2. Create a Test Script

We'll use a simple script to add a few jobs to our queue.

File: src/lib/queue/add-test-tasks.ts

import { UpstashQueueService } from "./providers/upstash-queue.service";
import { REMINDER_TASK_NAME, ReminderJobData } from "./tasks/reminder";
import { SEND_SCHEDULED_MESSAGE_TASK_NAME, SendScheduledMessageJobData } from "./tasks/send-scheduled-message";

const addTestTasks = async () => {
    const queueService = new UpstashQueueService(process.env.REDIS_URL!);
    console.log("Adding test tasks to the queue...");

    // Add a reminder job
    const reminderData: ReminderJobData = {
        type: REMINDER_TASK_NAME,
        userId: "user-123",
        message: "Don't forget the meeting!",
        reminderTime: new Date(),
        email: "test@example.com"
    };
    await queueService.addJob("default", { name: REMINDER_TASK_NAME, data: reminderData });

    console.log("Test tasks added!");
    await queueService.close();
};

addTestTasks().catch((error) => {
    console.error(error);
    process.exit(1);
});
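
The test job above runs immediately. To schedule a reminder for a specific moment, the delay option from Step 1 (expressed in milliseconds) needs a target time converted into a duration. A small sketch of that conversion, using a hypothetical delayUntil helper whose now parameter is injectable for testing:

```typescript
// Hypothetical helper: milliseconds from `now` until `target`, never negative.
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

const referenceTime = new Date("2030-01-15T09:00:00.000Z");
const tenMinutesLater = new Date("2030-01-15T09:10:00.000Z");

console.log(delayUntil(tenMinutesLater, referenceTime)); // 600000
console.log(delayUntil(referenceTime, tenMinutesLater)); // 0 (a time in the past means no delay)
```

With the tutorial's service, the result would presumably go into the job's options, along the lines of addJob("default", { name: REMINDER_TASK_NAME, data: reminderData, options: { delay: delayUntil(reminderData.reminderTime) } }).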

3. Run the System

You'll need two separate terminal windows.

In Terminal 1, start the worker:

npx ts-node src/lib/queue/worker.ts

You should see Starting worker... followed by Worker listening to queue: default. It's now waiting for jobs.
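
The worker keeps running until the process is stopped. Since the interface already includes a close() method, one option is to call it when the process receives Ctrl+C or a termination signal. The sketch below is an assumption about how that could be wired up, not something the tutorial's worker does; createShutdown is a hypothetical helper that makes sure close() runs only once even if several signals arrive.

```typescript
interface Closable {
  close(): Promise<void>;
}

// Hypothetical helper: returns a function that closes the service once
// and hands back the same promise on every later call.
function createShutdown(service: Closable): () => Promise<void> {
  let closing: Promise<void> | undefined;
  return () => {
    if (!closing) {
      closing = service.close();
    }
    return closing;
  };
}

// In worker.ts this might be wired up as follows (assumption, not from the tutorial):
//   const shutdown = createShutdown(service);
//   process.on("SIGINT", () => shutdown().then(() => process.exit(0)));
//   process.on("SIGTERM", () => shutdown().then(() => process.exit(0)));
```

Reusing one promise avoids closing the Redis connection twice when a user presses Ctrl+C repeatedly.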

In Terminal 2, add some jobs:

npx ts-node src/lib/queue/add-test-tasks.ts

You'll see Adding test tasks to the queue... followed by Test tasks added!

Now, look back at Terminal 1. You will see the worker spring to life, logging messages as it processes the reminder task you just added!


Congratulations! You have successfully integrated a powerful, reliable, and scalable background job system into your application. You can now adapt this pattern to handle any long-running task your project requires.
