DEV Community

Alexander Voll for Codesphere Inc.

Posted on • Originally published at codesphere.com on

Building an Email Marketing Engine Part 5: Onboarding Automation Sequence with BullMQ

Image description

Introduction

Welcome back to our "Building an Email Marketing Engine" series where we use Node.js, Express.js, SQLite, SendGrid API and now BullMQ to create an email marketing engine to bypass pricey email marketing providers.

In our previous parts, we went about setting up the application, integrate it with SendGrid, creating a simple SQLite DB to store our contacts, create a login system using Auth0 and created Endpoints to add contacts to our db using external services.

In this part of our series, we will work on the trickiest part so far: automating email sending with custom delayed emails. We will do this by expanding the capabilities of our SQLite DB and implementing BullMQ for scheduling and handling emails.

As always, you can check the GitHub Repo for the latest status of our development.

Let's get to it!

Creating necessary tables

To accomodate automation, I firstly had to adjust our existing database structure. The goal was to create 3 new tables that would house automation tasks, sequences and templates for those sequences.

The logic here is as follows:

Automation Sequences

Automation Sequences are the highest level of our automation system. They store the name and the description of a sequence and are defined by the user.

Sequence Task Templates

Sequence Task Templates house the basic task information needed for a specific template. For an onboarding flow that might be 3 different emails being sent to the user for 3 days each after signing up to a service.

Automation Tasks

Automation Tasks are the actual tasks that are being scheduled. They extend the template with a few more columns, including the status, the due time or a task set id. The task set id is particularly important, since it makes tasks identifyable to a specific launched sequence. Staying in the example of an onbaording flow, each user would have 3 tasks (emails to be sent) and all of them would have the same set id.

Building an Email Marketing Engine Part 5: Onboarding Automation Sequence with BullMQ

Handling Storage of Tasks in SQLite

After those new tables were created, I started creating the logic for storing tasks in SQLite. Keep in mind that sequence templates and their according tasks are created by the user. We will implement this in a nice way when working on the frontend.

There are two main functions that handle the automation in the SQLite db are:

  1. createTasksFromSeqId
  2. sendTasks

Both of those make use of a number of helper functions I wrote beforehand. Most of them are very simple and do things like getting a specific task from the automation_tasks table or creating a unique id for task sets.

async function getSequenceTemplateBySeqId(sequenceId) {
  const db = await getDatabaseConnection();

  const sql = `SELECT * FROM sequence_tasks_template WHERE sequence_id = ?`;

  return new Promise((resolve, reject) => {
    db.all(sql, [sequenceId], (err, rows) => {
      if (err) {
        reject(err);
      } else {
        resolve(rows);
      }
    });
  });
}
Enter fullscreen mode Exit fullscreen mode

To see all helper functions, take a look at the GitHub Repo, where you will find all of them in the utils.js file.

The createTasksFromSeqId function

This is an async function takes in two parameters: sequenceId and contactId.

The function generates a unique taskSetId , then tries to fetch a sequence template using the sequenceId. If successful, it sorts the sequence template based on the order_position property.

Next, it iterates through each task in the sequence template. For the first task, it sets the dueTime to the current date and the status to "pending". For subsequent tasks, it sets the dueTime to null and the status to "unscheduled".

Then, it calls a function called createAutomationTask passing in various parameters including the taskSetId, dueTime, and status for each task.

Finally, the function returns the taskSetId.

async function createTasksFromSeqId(sequenceId, contactId) {
    const taskSetId = generateTaskSetId();

    let sequenceTemplate;
    try {
        sequenceTemplate = await getSequenceTemplateBySeqId(sequenceId);
        sequenceTemplate.sort((a, b) => a.order_position - b.order_position);
    } catch (error) {
        console.error("Error fetching sequence template:", error);
    }

    for (let i = 0; i < sequenceTemplate.length; i++) {
        const task = sequenceTemplate[i];
        let dueTime;
        let status;

        if (i === 0) {
            // The first task
            dueTime = new Date(); // Set to "now"
            status = "pending";
        } else {
            dueTime = null; // Or whatever placeholder you want to use to indicate it's not scheduled yet
            status = "unscheduled";
        }

        await createAutomationTask(
            task.sequence_id,
            contactId,
            task.subgroup_id,
            task.template_id,
            task.order_position,
            task.delay,
            taskSetId,
            dueTime,
            status
        );
    }

    return taskSetId;
}
Enter fullscreen mode Exit fullscreen mode

The sendTasks function

This is where the actual automation happens and tasks are being scheduled and sent to the BullMQ queue.

The sendTasks function is an async function too, that sends tasks to a specified task set. It is used to be called after the createTasksFromSeqId function, using the returned taskSetId.

It retrieves scheduled tasks for the given task set ID, sorts them based on their order position, and then iterates over each task.

For each task, it checks if the task's status is "pending" and if its due time has passed. If so, it retrieves the contact data for the task's associated contact ID and sends an email to the contact using a specified SendGrid template ID.

It then marks the task as completed in a SQL database. If there is a next task that is unscheduled, it schedules the next task based on the completed task's information and a delay. Finally, it breaks the loop after processing the first task that meets the conditions. If any errors occur during the process, they are logged to the console.

async function sendTasks(taskSetId) {
  try {
    const scheduledTasks = await getScheduledTasksBySetId(taskSetId);
    scheduledTasks.sort((a, b) => a.order_position - b.order_position);

    for (let i = 0; i < scheduledTasks.length; i++) {
      const currentTask = scheduledTasks[i];

      if (currentTask.status === "pending" && currentTask.due_time <= Date.now()) {
        const contactData = await getContactById(currentTask.contact_id);

        try {
          console.log("Sending email to:", contactData[0].email);
          await sendSingleEmail(contactData[0].email, currentTask.template_id);
          await completeTask(currentTask.id, Date.now());
          console.log("Task completed in SQL db");
        } catch (error) {
          console.error("Error sending email:", error);
          continue;
        }

        const nextTask = scheduledTasks[i + 1];
        if (nextTask && nextTask.status === "unscheduled") {
          const completedTask = await getTaskById(currentTask.id);
          const updatedTask = await scheduleNextTask(completedTask, nextTask);
          const scheduledTaskData = {
            sequence_id: updatedTask.sequence_id,
            contact_id: updatedTask.contact_id,
            subgroup_id: updatedTask.subgroup_id,
            template_id: updatedTask.template_id,
            order_position: updatedTask.order_position,
            delay: updatedTask.delay,
            task_set_id: updatedTask.task_set_id,
            dueTime: updatedTask.due_time,
            status: updatedTask.status
          };

          const delay = nextTask.delay * 1000;
          await scheduleBullTask(scheduledTaskData, delay);
        }

        break;
      }
    }
  } catch (error) {
    console.error("Error fetching scheduled tasks:", error);
  }
}
Enter fullscreen mode Exit fullscreen mode

As you can see, if a nextTask that is unscheduled is found, its status is then set to "pending" by the scheduleNextTask helper function and its being scheduled and added to the bull queue using scheduleBullTask .

async function scheduleNextTask(lastCompletedTask, currentTask) {
  if (!lastCompletedTask || !lastCompletedTask.completed_at) {
    return { dueTime: new Date().toISOString(), status: "pending" };
  }

  const lastCompletedAtTimestamp = parseInt(lastCompletedTask.completed_at, 10);
  const lastCompletedAt = new Date(lastCompletedAtTimestamp);
  const delayDuration = currentTask.delay * 1000;
  const dueTime = new Date(lastCompletedAt.getTime() + delayDuration);
  const status = "pending";

  const db = await getDatabaseConnection();

  const sql = `UPDATE automation_tasks SET due_time = ?, status = ? WHERE id = ?`;

  return new Promise((resolve, reject) => {
    db.run(sql, [dueTime.getTime(), status, currentTask.id], (err) => {
      if (err) {
        reject(err);
      } else {
        // Fetch the updated row data
        const fetchSql = `SELECT * FROM automation_tasks WHERE id = ?`;
        db.get(fetchSql, [currentTask.id], (fetchErr, row) => {
          if (fetchErr) {
            reject(fetchErr);
          } else {
            resolve(row);
          }
        });
      }
    });
  });
}
Enter fullscreen mode Exit fullscreen mode

Using BullMQ to automate sending of scheduled tasks

BullMQ is an advanced Node.js queue and job service solution built on top of the Redis database. With BullMQ, you can manage jobs in various states (waiting, active, delayed, etc.), set job priorities, schedule recurrent jobs, and handle delayed jobs.

This is incredibly useful for our case since it's meant for big application and very easy to handle.

A Bull queue essentially consists of 3 different parts:

  1. Producers: They produce a specific job and add it to the queue
  2. Job Queue: This is where the actual jobs are saved, it is housed in a Redis database
  3. Consumers/Workers: They take care of handling the jobs in the job queue

Building an Email Marketing Engine Part 5: Onboarding Automation Sequence with BullMQ

Our setup looks like this:

Initialization and Producer

We initialize the Queue and give it a name. Then, we define our producer like a regular asynchronous function called scheduleBullTask that schedules a new task to be executed later. It adds a job to a task queue with some task data and a specified delay.

The delay is defined in the sendTasks when scheduleBullTask is called. It is retreived from the SQLite database. This is the reason why (for now) we use SQLite for storing the tasks while using BullMQs Redit database to store scheduled tasks. We want to make sure that tasks are only added t0 the job queue once they've been executed. This might change in the future though.

const { Queue } = require('bullmq');

//initialize new Queue
const taskQueue = new Queue("taskQueue", {
    connection: {
        host: "127.0.0.1",
        port: 6379
    }
});

//Producer: New Task is scheduled
async function scheduleBullTask(taskData, delay) {
    await taskQueue.add('myJobName', {
        sequence_id: taskData.sequence_id,
        contact_id: taskData.contact_id,
        subgroup_id: taskData.subgroup_id,
        template_id: taskData.template_id,
        order_position: taskData.order_position,
        delay: taskData.delay,
        task_set_id: taskData.task_set_id,
        dueTime: taskData.dueTime,
        status: taskData.status
    },
        {
            delay: delay
        });
}

module.exports = {
    scheduleBullTask
}
Enter fullscreen mode Exit fullscreen mode

Worker

Our worker defines, that once a task is ready to be executed, it accesses the job data (which houses the task information from our SQLite DB) and executed the sendTasks function which then sends the task and schedules the next one (if it exists), in turn creating a new job queue entry. This happens until all tasks are set to "completed" in the SQLite database, which is when no more jobs are added to the queue.

const { Worker } = require('bullmq');

const { sendTasks } = require("../automationJobs")

//Initialize Worker for handling
const taskProcessorWorker = new Worker("taskQueue", async (job) => {
    const taskData = job.data;

    await sendTasks(taskData.task_set_id);

    console.log("sent email for task with task_set_id:", taskData.task_set_id);

    //mark the job as completed
    return {
        id: taskData.id,
        sequence_id: taskData.sequence_id,
        status: "completed"
    };
}, {
    connection: {
        host: "127.0.0.1",
        port: 6379
    }
})

taskProcessorWorker.on('completed', (job) => {
    console.log(`Job ${job.id} Completed`);
})

taskProcessorWorker.on('failed', (job, err) => {
    console.error(`Job ${job.id} failed with error ${err.message}`);
});
Enter fullscreen mode Exit fullscreen mode

Creating an endpoint for automation

To make use of our newly created automation flow, I created a new endpoint which allows for external services to send a request, add a user to the database and start the automation flow.

For this, I made use of the addContact function we created in our last installment with some slight tweaks.

apiRouter.post("/addContactOnboarding", apiKeyMiddleware, async (req, res) => {
  try {
      const contactId = await addContact(req);
      console.log("contactId",contactId);
      const taskSetId = await createTasksFromSeqId(1, contactId);
      sendTasks(taskSetId);
      res.json({ message: `Onboarding flow started with taskSetId: ${taskSetId}` });
  } catch (error) {
      console.error(error);
      res.status(500).json({ error: error.message });
  }
});
Enter fullscreen mode Exit fullscreen mode

This allows for us to send a POST request to the defined endpoint with body that includes some information about the new contact and then automatically start the onboarding flow.

{
  "contact": {
    "sendgrid_contact_id": "",
    "email": "max@mustermann.com",
    "first_name": "Max",
    "last_name": "Mustermann"
  },
  "subgroupIds": [1, 2]
}
Enter fullscreen mode Exit fullscreen mode

Wrapping up

In this installment, we've successfully navigated the complexities of automating email sending using SQLite and BullMQ. By restructuring our database and leveraging BullMQ's capabilities, we've built a robust system that schedules and sends custom delayed emails efficiently. With the addition of a new endpoint, integrating external services and automating the onboarding flow is now seamless. As we continue to refine and enhance our email marketing engine, we will iterate on this, optimizing it even further.

Also, we're coming closer towards creating a frontend for our app!

As always, you can take a look at the latest status of the Email Marketing Engine in our GitHub Repo. To host the application yourself, just use our magic Codesphere link to get you started in only a few clicks!

Top comments (0)