How to Add a Robust Background Job Queue to Your Project with Upstash and BullMQ
As your application grows, you'll inevitably encounter tasks that shouldn't make your users wait. Things like sending emails, processing images, or scheduling reminders are best handled in the background. This is where a job queue comes in.
A queue allows you to offload time-consuming tasks to a separate process, ensuring your application remains fast and responsive. In this tutorial, we'll walk through, step-by-step, how to integrate a professional-grade queue system using the power of Redis, Upstash, and BullMQ.
What You'll Build
You will learn how to set up a system that can:
- Define custom background jobs (like sending reminders).
- Add those jobs to a queue from anywhere in your application.
- Process those jobs reliably with a dedicated worker.
Let's get started!
Our Project Structure
First, let's understand the file structure we'll be creating inside our src/lib/
directory. This organization keeps our queue logic neat and tidy.
src/
└── lib/
└── queue/
├── providers/
│ └── upstash-queue.service.ts # Our queue implementation
├── tasks/
│ ├── reminder.ts # Logic for a reminder task
│ ├── send-scheduled-message.ts # Logic for another task
│ └── index.ts # Exports all tasks
├── queue.interface.ts # Defines the "rules" for our queue
├── worker.ts # The background process that runs tasks
└── add-test-tasks.ts # A script to add jobs for testing
Step 1: Define the Blueprint (The Interface)
A good programmer defines a clear contract before building. Our queue.interface.ts
file does exactly that. It tells us what any queue service we create must be able to do.
File: src/lib/queue/queue.interface.ts
`export interface BaseJobData {
type: string; // A unique name for each job type
}
export interface QueueJob<TData extends BaseJobData = BaseJobData> {
id?: string;
data: TData;
options?: {
delay?: number; // Delay in milliseconds
cron?: string; // For recurring jobs
};
}
export interface IQueueService {
addJob<TData extends BaseJobData>(queueName: string, job: QueueJob<TData>): Promise<string>;
processJob<TData extends BaseJobData>(
queueName: string,
handler: (job: { id?: string; data: TData; log: (message: string) => Promise<void> }) => Promise<unknown>,
concurrency?: number
): Promise<void>;
removeJob(queueName: string, jobId: string): Promise<void>;
close(): Promise<void>;
}`
Here, we've defined that any IQueueService
must have methods to addJob
, processJob
, removeJob
, and close
the connection. This makes our code predictable and easy to work with.
Step 2: Build the Engine (The Queue Service)
Now, we implement the interface. We're using BullMQ, a powerful and popular queue library for Node.js that uses Redis. Our implementation will live in upstash-queue.service.ts
.
File: src/lib/queue/providers/upstash-queue.service.ts
(This is a larger file, so we'll focus on the key parts. The full code is in your project.)
`
import { Queue, Worker, Job } from "bullmq";
import { IQueueService, BaseJobData } from "@/lib/queue/queue.interface";
export class UpstashQueueService implements IQueueService {
private queues: Map<string, Queue>;
private workers: Map<string, Worker>;
private connectionOptions: ConnectionOptions;
constructor(redisUrl: string) {
// ... connection logic ...
}
// Method to add a job
async addJob<TData extends BaseJobData>(
queueName: string,
job: { name: string; data: TData; options?: JobsOptions }
): Promise<string> {
const queue = this.getQueue(queueName);
const result = await queue.add(job.name, job.data, job.options);
return result.id!;
}
// Method to start processing jobs
async processJob<TData extends BaseJobData>(
queueName: string,
handler: (job: { id?: string; data: TData; log: (message: string) => Promise<void> }) => Promise<unknown>,
// ...
): Promise<void> {
// ... worker setup and logic ...
}
// ... other methods like removeJob and close ...
}
`
This class handles all the low-level communication with Redis. It knows how to create queues, add jobs to them, and set up workers.
Step 3: Create the To-Do List (The Tasks)
A task is the actual work you want to do in the background. Let's look at our reminder task.
File: src/lib/queue/tasks/reminder.ts
import { BaseJobData } from "../queue.interface";
// 1. A unique name for this task
export const REMINDER_TASK_NAME = "user.reminder";
// 2. The specific data this task needs (with type safety!)
export interface ReminderJobData extends BaseJobData {
type: typeof REMINDER_TASK_NAME;
userId: string;
phoneNumber?: string;
email?: string;
message: string;
reminderTime: Date;
}
// 3. The function that does the work
export const reminderTask = async (job: { data: ReminderJobData; log: (message: string) => Promise<void> }) => {
const { userId, message, reminderTime, phoneNumber, email } = job.data;
await job.log(`Sending reminder to user ${userId}: "${message}" at ${reminderTime}`);
// In a real application, you would add your email/SMS sending logic here.
// For now, we simulate work with a delay.
await new Promise((resolve) => setTimeout(resolve, 1000));
await job.log("Reminder sent successfully.");
};
To keep things organized, we export all our tasks from a single index.ts
file.
File: src/lib/queue/tasks/index.ts
export * from "./reminder";
export * from "./send-scheduled-message";
Step 4: Hire the Doer (The Worker)
The worker is a dedicated script whose only job is to listen for new tasks on the queue and execute them.
File: src/lib/queue/worker.ts
import { UpstashQueueService } from "./providers/upstash-queue.service";
import * as tasks from './tasks';
import { BaseJobData } from "./queue.interface";
const QUEUE_NAME = "default";
const service = new UpstashQueueService(process.env.REDIS_URL || "redis://localhost:6379");
// A map to connect task names to their functions
const taskHandlers = {
[tasks.REMINDER_TASK_NAME]: tasks.reminderTask,
[tasks.SEND_SCHEDULED_MESSAGE_TASK_NAME]: tasks.sendScheduledMessageTask,
};
type TaskName = keyof typeof taskHandlers;
const main = async () => {
console.log("Starting worker...");
// Tell the service to start processing jobs from the queue
await service.processJob(QUEUE_NAME, async (job) => {
const jobData = job.data as BaseJobData & { type: TaskName };
const handler = taskHandlers[jobData.type]; // Find the right function
if (handler) {
await handler(job as any); // Run the task
} else {
throw new Error(`No handler found for task type: ${jobData.type}`);
}
});
console.log(`Worker listening to queue: ${QUEUE_NAME}`);
}
main().catch(console.error);
Step 5: Putting it all Together (Running and Testing)
Now for the fun part! Let's see it in action.
Prerequisites
- Node.js: Make sure you have Node.js installed.
- Redis: You need a running Redis instance. You can run one locally using Docker or sign up for a free database on Upstash.
-
Dependencies: Install the necessary packages.
npm install bullmq npm install -D ts-node # For running TypeScript files directly
1. Set up your Environment
Create a .env
file in your project's root and add your Redis connection URL.
REDIS_URL="redis://your_redis_host:port"
2. Create a Test Script
We'll use a simple script to add a few jobs to our queue.
File: src/lib/queue/add-test-tasks.ts
import { UpstashQueueService } from "./providers/upstash-queue.service";
import { REMINDER_TASK_NAME, ReminderJobData } from "./tasks/reminder";
import { SEND_SCHEDULED_MESSAGE_TASK_NAME, SendScheduledMessageJobData } from "./tasks/send-scheduled-message";
const addTestTasks = async () => {
const queueService = new UpstashQueueService(process.env.REDIS_URL!);
console.log("Adding test tasks to the queue...");
// Add a reminder job
const reminderData: ReminderJobData = {
type: REMINDER_TASK_NAME,
userId: "user-123",
message: "Don't forget the meeting!",
reminderTime: new Date(),
email: "test@example.com"
};
await queueService.addJob("default", { name: REMINDER_TASK_NAME, data: reminderData });
console.log("Test tasks added!");
await queueService.close();
};
addTestTasks();
3. Run the System
You'll need two separate terminal windows.
In Terminal 1, start the worker:
npx ts-node src/lib/queue/worker.ts
You should see Starting worker...
followed by Worker listening to queue: default
. It's now waiting for jobs.
In Terminal 2, add some jobs:
npx ts-node src/lib/queue/add-test-tasks.ts
You'll see Adding test tasks to the queue...
.
Now, look back at Terminal 1. You will see the worker spring to life, logging messages as it processes the reminder task you just added!
Congratulations! You have successfully integrated a powerful, reliable, and scalable background job system into your application. You can now adapt this pattern to handle any long-running task your project requires.
Top comments (0)