DEV Community

Cover image for How to Build Scalable Automated Workflows like HubSpot using AWS Lambda, SQS, Node.js, and MongoDB
Bhaskar Sawant
Bhaskar Sawant

Posted on

How to Build Scalable Automated Workflows like HubSpot using AWS Lambda, SQS, Node.js, and MongoDB

Creating automated workflows similar to HubSpot’s powerful system involves leveraging advanced cloud technologies. This guide will walk you through the process of developing such workflows using AWS Lambda, Amazon SQS, Node.js, and MongoDB.

Understanding Automated Workflows

Automated workflows are sequences of actions triggered by specific events. They streamline repetitive tasks, send notifications, update records, and more, all without manual intervention. By integrating AWS Lambda, SQS, Node.js, and MongoDB, you can build robust workflows. This tutorial provides a step-by-step approach to creating these workflows.

What You Will Learn

  • Automated Workflow Architecture: Gain insights into the inner workings of automated workflows.

  • Understanding API Server: Learn how workflows operate through code examples and explanations.

  • Configure AWS CLI: Install and configure the AWS CLI to manage AWS resources from the command line.

  • Deploy with Serverless Framework: Create and deploy AWS Lambda functions and other resources using the Serverless Framework.

  • Integrate API with AWS SQS: Send messages to AWS SQS from your API server for reliable workflows.

  • Deploy API Server on AWS EC2: Deploy your Node.js API server to AWS EC2 for high availability and performance.

  • Scalability: Understand how to scale your workflows and API server to handle increased traffic and demand.

Prerequisites

  • An AWS account.

  • Basic knowledge of Node.js and MongoDB.

  • Access to a MongoDB Atlas account or a local MongoDB setup.

Automated Workflow Architecture.

Image description

Let’s illustrate this with an example where a workflow sends a message to a user upon a form submission. Here’s the step-by-step process:

  1. Trigger Event: When a form submission occurs, it acts as a trigger.

  2. Send Message to SQS: Add a piece of code in the form submission function to push a message to an SQS queue named TriggerQueue on every new form submission.

  3. Invoke Lambda Function: TriggerQueue triggers a Lambda function upon receiving a message.

  4. Check for Workflow in DB: The Lambda function checks if any workflow exists in the database that matches the configuration provided in the SQS message.

  5. Fetch Workflow Actions: If a matching workflow is found, it fetches the set of actions to perform with the provided data.

  6. Send Actions to Second SQS Queue: The Lambda function sends the actions to another SQS queue named ActionsQueue along with the trigger data.

  7. Invoke Second Lambda Function: ActionsQueue triggers a second Lambda function upon receiving a message.

  8. Perform Actions: The second Lambda function takes the action data and calls the respective APIs to perform the actions, such as sending a message to the user.

This setup ensures that your workflows are reliable and can handle various tasks efficiently.

Image description

Understanding API Server.

You can find the source code for the project here.

Now, let’s dive into the Model and Controller to understand how the API server functions.

Workflow Model

In a workflow, we have one Trigger and one or more Actions. Below is the schema definition for the workflow and trigger

// models/workflow.ts

const triggerSchema = new Schema<Trigger>({
  type: { type: String, required: true },
  app: { type: String, required: true },
  metaData: { type: String , required: false },
  data: { type: Schema.Types.Mixed, required: false } // Mixed type for dynamic data
});

const workflowSchema = new Schema<Workflow>({
  workflowName: { type: String, required: true },
  trigger: { type: triggerSchema, required: true },
  actions: { type: [triggerSchema], required: true },
  created_at: { type: Date, default: Date.now },
  updated_at: { type: Date, default: Date.now },
});

// Create and export the model
const WorkflowModel = mongoose.model<Workflow>('Workflow', workflowSchema);

export default WorkflowModel;
Enter fullscreen mode Exit fullscreen mode

In this model:

  • Trigger: Contains properties such as type, app, optional metaData, and data.

  • Workflow: Extends the Mongoose Document interface, including properties like workflowName, trigger, actions, created_at, and updated_at.

For example, if our trigger is a form submission, the app will be 'forms' and the type will be 'new_submission'. The metaData helps to identify specific workflows, like so:

{
 "app": "forms",
 "type": "new_submission",
 "metadata":"{_id:someUniqueId}"
}
Enter fullscreen mode Exit fullscreen mode

Creating a Form Submission

Here’s how you can handle a form submission and trigger workflows:

export const createFormSubmission = async (req: Request<{}, {}, FormSubmissionBody>, res: Response): Promise<void> => {
  try {
    const { formId, customerName, customerEmail, customerPhone } = req.body;
    const newForm = new FormSubmission({ formId, customerName, customerEmail, customerPhone });
    await newForm.save();

    // Prepare the data to be sent to the workflow
    let workflowData = {
      app: 'forms',
      type: 'new_submission',
      metaData: JSON.stringify({ _id: formId }),
      data: newForm,
    }

    let params: params = {
      MessageBody: JSON.stringify(workflowData),
      QueueUrl: process.env.AWS_SQS_URL as string,
    };

    // Send the message
    sqs.sendMessage(params, (err, data) => {
      if (err) {
        console.log('Error', err);
      } else {
        console.log('Success', data.MessageId);
      }
    });
    res.status(201).json(newForm);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
};
Enter fullscreen mode Exit fullscreen mode

In this controller:

  • A new form submission is created and saved to the database.

  • Workflow data is prepared, including the form submission details.

  • The workflow data is sent to an AWS SQS queue.

  • Any workflows matching the trigger configuration (e.g., app: 'forms', type: 'new_submission', metaData: { _id: someUniqueId}) will be identified and the corresponding actions will be sent to another queue to execute.

Image description

Configure AWS CLI.

To use the Serverless Framework, we first need to configure the AWS CLI. Follow these steps to create an IAM user and configure the AWS CLI.

Create an IAM User

  1. Sign in to AWS Console: Log in to your AWS Management Console.

  2. Search for IAM: In the search bar, type “IAM” and select the IAM service from the results.

  3. Navigate to Users: Click on the “Users” option in the IAM dashboard.

  4. Create a New User:

  • Click on “Add User.”

  • Enter a username.

  • Click “Next.”

  1. Set Permissions:
  • Select “Attach policies directly.”

  • In the “Permission policies” section, select “AdministratorAccess.”

  • Click “Next” and then “Create User.”

  1. Create Access Key:
  • Click on the newly created user.

  • Go to the “Security credentials” tab.

  • Click on “Create access key.”

  • Select “Command Line Interface (CLI)” as the use case.

  • Check the acknowledgment checkbox.

  • Click “Next.”

  1. Add Description and Create Key:
  • Add a description if desired.

  • Click “Create Access Key.”

  1. Store Access Keys:
  • Copy the Access Key ID and Secret Access Key.

  • Store them securely, as the Secret Access Key will not be shown again.

Configure AWS CLI

  1. Install AWS CLI: If you haven’t already, download and install AWS CLI or brew install awscli .

  2. Open Terminal/Command Prompt: Open your terminal (Linux/Mac) or command prompt (Windows).

  3. Run AWS Configure Command:

aws configure
Enter fullscreen mode Exit fullscreen mode

4. Enter Credentials: You’ll be prompted to enter your credentials.

  • AWS Access Key ID: Enter your Access Key ID.

  • AWS Secret Access Key: Enter your Secret Access Key.

  • Default region name: Enter your preferred AWS region (e.g., us-east-1).

  • Default output format: Enter your preferred output format (e.g., json).

5. Verify Configuration:

  • Run the following command to verify the configuration:
aws sts get-caller-identity
Enter fullscreen mode Exit fullscreen mode

This should return details about your IAM user, confirming that the AWS CLI is configured correctly.

Image description

Deploy with Serverless Framework.

You can find the source code for the project here.

What is the Serverless Framework?

The Serverless Framework is an open-source tool that simplifies the process of building and deploying serverless applications. It abstracts away the complexities of cloud infrastructure, enabling developers to focus on writing application code. Supporting various cloud providers such as AWS, Azure, and Google Cloud, the Serverless Framework offers a consistent experience for deploying functions, managing resources, and scaling applications effortlessly.

In this guide, we will develop Lambda functions and deploy resources using the Serverless Framework. We will cover topics such as database calls from Lambda, using MongoDB, and pushing messages from one Lambda queue worker to another queue.

  1. Install Serverless Framework: First, install the Serverless Framework globally on your machine.
npm install -g serverless
Enter fullscreen mode Exit fullscreen mode

2. Create a serverless service: Navigate to your project directory and create a new Serverless service.

  • Enter serverless command in your terminal, and select AWS / Node.js / Simple Function option.

  • Now name your project, select the Create A New App option and give an app name, and this will create a serverless service.

3. Create Workers/handlers

  • TriggerQueue Handler: This handler will pull messages from the trigger queue, find workflows with the same trigger configuration provided in the message, and push the respective set of actions into the actions queue along with the trigger data.
'use strict';

const { MongoClient } = require('mongodb');
const AWS = require('aws-sdk');
const sqs = new AWS.SQS();

const MONGODB_URI = process.env.MONGODB_URI;
const MONGODB_DB = process.env.MONGODB_DB;
const OUTPUT_QUEUE_URL = process.env.OUTPUT_QUEUE_URL;

let cachedDb = null;

async function connectToDatabase(uri) {
  if (cachedDb) {
    return cachedDb;
  }

  const client = await MongoClient.connect(uri, {
    useNewUrlParser: true,
    useUnifiedTopology: true,
  });

  const db = client.db(MONGODB_DB);
  console.log('Connected to MongoDB');

  cachedDb = db;
  return db;
}

const buildQuery = (type, app, metaData) => {
  const query = { 'trigger.type': type };
  if (app) {
    query['trigger.app'] = app;
  }
  if (metaData) {
    query['trigger.metaData'] = metaData;
  }
  return query;
};

module.exports.triggerHandler = async (event) => {
  // Validate environment variables
  if (!MONGODB_URI || !MONGODB_DB || !OUTPUT_QUEUE_URL) {
    console.error('Missing environment variables');
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: 'Internal Server Error',
      }),
    };
  }

  let db;
  try {
    db = await connectToDatabase(MONGODB_URI);
  } catch (error) {
    console.error('Error connecting to MongoDB:', error);
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: 'Error connecting to MongoDB',
      }),
    };
  }

  for (const record of event.Records) {
    const { body } = record;
    console.log(`Received message: ${body}`);

    let parsedBody;
    try {
      parsedBody = JSON.parse(body);
    } catch (error) {
      console.error('Error parsing message body:', error);
      continue; // Skip to the next message
    }

    const { type, app, metaData, data } = parsedBody;

    let result;
    try {
      const collection = db.collection('workflows');
      const query = buildQuery(type, app, metaData);
      result = await collection.find(query).toArray();
    } catch (error) {
      console.error('Error querying MongoDB:', error);
      continue; // Skip to the next message
    }

    for (const workflow of result) {
      const MessageBody = {
        actions: workflow.actions,
        data: data,
      };
      console.log('Message to send:', MessageBody);

      const params = {
        QueueUrl: OUTPUT_QUEUE_URL,
        MessageBody: JSON.stringify(MessageBody),
      };

      try {
        await sqs.sendMessage(params).promise();
        console.log('Message sent to output queue');
      } catch (error) {
        console.error('Error sending message to output queue:', error);
      }
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Messages processed and sent successfully',
    }),
  };
};
Enter fullscreen mode Exit fullscreen mode
  • ActionsQueue Handler: This handler pulls messages from the actions queue and calls the respective APIs to perform the workflow actions.
'use strict';
const axios = require('axios');


const API_SERVER_URL = process.env.API_SERVER_URL;


const handleMessage = async (type, data) => {
  try {
    switch (type) {
      case 'send_message':
        const messageBody = {
          from: 'workflow',
          message: JSON.stringify(data),
        };
        await axios.post(`${API_SERVER_URL}/api/v1/send_message`, messageBody);
        console.log('Message sent successfully');
        break;
      default:
        console.log('Unknown type:', type);
    }
  } catch (error) {
    console.error('Error handling message:', error);
  }
};

const performAction = async (action, data) => {
  const { type, app } = action;
  switch (app) {
    case 'messaging':
      await handleMessage(type, data);
      break;
    default:
      console.log('Unknown app:', app);
  }
};

module.exports.actionsHandler = async (event) => {
  for (const record of event.Records) {
    const { body } = record;

    let parsedBody;
    try {
      parsedBody = JSON.parse(body);
    } catch (error) {
      console.error('Error parsing message body:', error);
      continue; // Skip to the next record
    }

    const { actions, data } = parsedBody;
    for (const action of actions) {
      console.log('Performing action:', action, data);
      try {
        await performAction(action, data);
        console.log('Action performed successfully');
      } catch (error) {
        console.error('Error performing action:', error);
      }
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Messages processed and sent successfully',
    }),
  };
};
Enter fullscreen mode Exit fullscreen mode

4. Deploy the Serverless Service: The serverless.yml configuration file outlines the setup for deploying a serverless application using the Serverless Framework on AWS.

Here's a breakdown in simple terms:

  1. Basic Information:
  • Organization: bhaskarsawant

  • Application: trigger

  • Service: trigger

  1. Plugins:
  • serverless-offline: Allows running the service locally for development and testing.
  1. Provider Configuration:
  • Provider: AWS

  • Runtime: Node.js version 20.x

  • Region: ap-south-1 (AWS region)

  1. IAM Role Statements
  • Permissions: The service is allowed to:

  • Receive messages from SQS

  • Delete messages from SQS

  • Get SQS queue attributes

  • Send messages to SQS

  1. Resources:
  • TriggerQueue ARN (Amazon Resource Name)

  • ActionsQueue ARN

  1. Environment Variables:
  • OUTPUT_QUEUE_URL: Reference to the ActionsQueue

  • MONGODB_URI: MongoDB connection URL (from environment variables)

  • MONGODB_DB: MongoDB database name (from environment variables)

  • API_SERVER_URL: API server URL (from environment variables)

  1. Functions:
  • sqsHandler:

  • Handler: handler.triggerHandler

  • Trigger: SQS queue TriggerQueue with a batch size of 1

  • actionsHandler:

  • Handler: actionsHandler.actionsHandler

  • Trigger: SQS queue ActionsQueue with a batch size of 1

  1. Resources
  • TriggerQueue:

  • Type: AWS SQS Queue

  • Name: trigger_queue

  • ActionsQueue:

  • Type: AWS SQS Queue

  • Name: actions_queue

org: bhaskarsawant
app: trigger
service: trigger

plugins:
  - serverless-offline

provider:
  name: aws
  runtime: nodejs20.x
  region: ap-south-1
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:ReceiveMessage"
        - "sqs:DeleteMessage"
        - "sqs:GetQueueAttributes"
        - "sqs:SendMessage"
      Resource:
        - !GetAtt TriggerQueue.Arn
        - !GetAtt ActionsQueue.Arn
  environment:
    OUTPUT_QUEUE_URL:
      Ref: ActionsQueue
    MONGODB_URI: ${env:DB_URL}
    MONGODB_DB: ${env:DB_NAME}
    API_SERVER_URL: ${env:API_URL}

functions:
  sqsHandler:
    handler: handler.triggerHandler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - TriggerQueue
              - Arn
          batchSize: 1
  actionsHandler:
    handler: actionsHandler.actionsHandler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - ActionsQueue
              - Arn
          batchSize: 1

resources:
  Resources:
    TriggerQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "trigger_queue"
    ActionsQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "actions_queue"
Enter fullscreen mode Exit fullscreen mode

This configuration sets up two AWS Lambda functions that are triggered by messages from two SQS queues (trigger_queue and actions_queue). It also defines necessary permissions and environment variables.

Now to deploy the service on AWS, enter the following command in your terminal.

serverless deploy --region ap-south-1
Enter fullscreen mode Exit fullscreen mode

Image description

Integrate with AWS SQS.

You can find the source code for the project here.

In this section, we will integrate our API with AWS SQS by creating a helper file for AWS configuration. This file will handle the setup for connecting to AWS SQS.

What is AWS SQS?

AWS Simple Queue Service (SQS) is a fully managed message queuing service that allows you to decouple and scale microservices, distributed systems, and serverless applications. It enables you to send, store, and receive messages between software components.

Steps to Integrate AWS SQS

Create AWS Helper File

We need to create a new file for AWS configuration. This helper file will contain the configuration for AWS SQS.

  1. Create the file: In your project directory, create a new file named awsHelper.ts.

  2. Add AWS SQS Configuration: Add the following code to awsHelper.ts to configure AWS SQS:

// Import AWS SDK
import AWS from 'aws-sdk';

// Configure AWS SQS
export const sqs = new AWS.SQS({
  accessKeyId: process.env.AWS_ACCESS_KEY_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  region: process.env.AWS_REGION,
  apiVersion: process.env.AWS_API_VERSION,
});
Enter fullscreen mode Exit fullscreen mode

You can find these credentials in the Configure AWS CLI section.

Use the AWS Helper in Your Handlers

In your handlers where you need to interact with AWS SQS, import and use the sqs instance from the AWS helper file.

// formController.ts

import { sqs } from './awsHelper';

let params = {
  MessageBody: JSON.stringify(workflowData),
  QueueUrl: process.env.AWS_SQS_URL as string,
};

// Send the message
sqs.sendMessage(params, (err, data) => {
  if (err) {
    console.log('Error', err);
  } else {
    console.log('Success', data.MessageId);
  }
});
Enter fullscreen mode Exit fullscreen mode

After deploying and creating the AWS resources in the Deploy with Serverless Framework section,

  1. Log in to your AWS Management Console.

  2. Navigate to SQS: Search for “SQS” in the search bar and select the SQS service.

  3. Find the TriggerQueue:

  • Look for the TriggerQueue in the list of queues.

  • Click on the TriggerQueue to view its details.

  • Find and copy the URL of the TriggerQueue.

  1. Update Environment Configuration:
  • Update your environment configuration with the URL of the TriggerQueue you just copied as AWS_SQS_URL.

  • This URL will be used in your application to send messages to the queue.

  1. Test the Integration:
  • Test by trying to send a message to the queue by hitting your API endpoint using Postman.

Image description

Deploy API Server on AWS EC2.

You can find the source code for the project here.

After thoroughly testing the API server and lambda functions, the next step is to deploy your API server on an AWS EC2 instance. Follow these steps to ensure a smooth deployment process.

  1. Launch an EC2 Instance
  • Log in to the AWS Management Console.

  • Navigate to the EC2 dashboard and click on “Launch Instance.”

  • Give your instance a name and select the operating system (Ubuntu).

  • Choose an instance type, such as t2.micro (free tier) or t2.small.

  • Create a new key pair, which you’ll use to access the instance.

  • Under network settings, select to allow traffic from HTTPS and HTTP.

  • Review and launch the instance.

  1. Expose the port 8080
  • On the EC2 dashboard, select your instance.

  • Click on the security group and edit the inbound rules and add a rule to allow traffic on port 8080.

Image description

Image description

  1. Connect to Your EC2 Instance
  • Open your terminal and set the correct permissions for your SSH key:
chmod 700 Your_SSH_Key.pem
Enter fullscreen mode Exit fullscreen mode
  • SSH into the machine using the EC2 instance URL:
ssh -i Your_SSH_Key.pem ubuntu@your-ec2-instance-url
Enter fullscreen mode Exit fullscreen mode
  1. Setup the project
  • Clone the Git repository:
git clone https://github.com/bhaskarcsawant/hubspot_workflows
Enter fullscreen mode Exit fullscreen mode
  • Install Node.js:
// install nvm
curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.32.0/install.sh | bash

// activate nvm
. ~/.nvm/nvm.sh

// install nodejs
nvm install 20
Enter fullscreen mode Exit fullscreen mode
  • Install project dependencies:
cd your-project

npm install
Enter fullscreen mode Exit fullscreen mode
  • Add environment variables:
// create a .env file
vim .env

// add following variables to the .env file and save and exit
MONGODB_URI=your_mongodb_uri
AWS_SQS_URL=your_trigger_sqs_queue_url
AWS_ACCESS_KEY_ID=your_aws_access_key_id
AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key
AWS_REGION=your_aws_region
AWS_API_VERSION=latest
Enter fullscreen mode Exit fullscreen mode
  • Start the server:
npm start
Enter fullscreen mode Exit fullscreen mode
  • Visit your server in a browser at:
http://your_ec2_domain:8080 

// Eg. http://ec2-54-81-243-55.compute-1.amazonaws.com:8080/
Enter fullscreen mode Exit fullscreen mode
  • After successfully setting up the server update the API URL in the lambda function’s environment variable and deploy the updated serverless service using the following command in your terminal.
serverless deploy --region ap-south-1
Enter fullscreen mode Exit fullscreen mode
  1. Test Your Workflow
  • Create a form submission using Postman.

  • Verify that a message containing your form submission is created inside the messages MongoDB collection.

Let’s see this in action. 😄

This is one possible architecture that allows you to build an automated workflow system. Using this architecture, you can create different types of triggers and actions, so that with a good frontend, your users can set up automated workflows without any coding.

Image description

Scalability.

Given the architecture is scalable, several improvements and additions can enhance its efficiency and performance. Let’s explore these enhancements and how to implement them effectively.

Connection Pooling

  • Problem: In the triggerHandler Lambda function, each invocation creates a new database connection, which can overwhelm the database at scale.

  • Solution:

  • Using EC2 Instances as Queue Workers: Instead of using Lambda functions, you can set up an EC2 instance as a queue worker. This allows you to use the same database connection across multiple requests, which can be scaled horizontally.

  • Using BullMQ: Another approach is using a service like BullMQ, which provides queue workers out of the box and processes queue messages asynchronously.

Autoscaling groups

  • Problem: As the load increases, manual scaling can be inefficient and may lead to downtime.

  • Solution: Implement autoscaling groups to automatically adjust the number of EC2 instances based on demand. This ensures that your application can handle varying loads efficiently.

Additional Points for Scalability

  1. Load Balancer: Use an Elastic Load Balancer (ELB) to distribute incoming traffic across multiple EC2 instances. This ensures high availability and fault tolerance.

  2. Caching: Implement caching mechanisms (like Redis or Memcached) to store frequently accessed data. This reduces the load on your database and improves response times.

  3. Database Scaling: Use Amazon RDS with read replicas to distribute the read load across multiple instances.

  4. Monitoring and Logging: Implement robust monitoring and logging using AWS CloudWatch or NewRelic. This helps you track the performance of your application and quickly identify and resolve issues.

If you find other ways to enhance scalability or encounter any issues during implementation, please feel free to comment or reach out to me. Your feedback will help everyone and is greatly appreciated. I’m always open to suggestions and eager to help you! Additionally, feel free to contribute to the codebase regarding any enhancements. Your contributions are valuable and welcomed! 😊

Follow Me for More Insights

If you found this guide helpful and want to learn more about web development, Software Engineering, and other exciting tech topics, follow me here on Medium. I regularly share tutorials, insights, and tips to help you become a better developer. Let’s connect and grow together in this journey of continuous learning!

Connect with Me

Stay tuned for more articles, and feel free to reach out if you have any questions or suggestions for future topics.

Cheers, thanks for reading! 😊

Happy coding! 🚀

This article was originally published on Medium. It is also available on Hashnode and Dev.to .

Top comments (0)