Creating automated workflows similar to HubSpot’s powerful system involves leveraging advanced cloud technologies. This guide will walk you through the process of developing such workflows using AWS Lambda, Amazon SQS, Node.js, and MongoDB.
Understanding Automated Workflows
Automated workflows are sequences of actions triggered by specific events. They streamline repetitive tasks, send notifications, update records, and more, all without manual intervention. By integrating AWS Lambda, SQS, Node.js, and MongoDB, you can build robust workflows. This tutorial provides a step-by-step approach to creating these workflows.
What You Will Learn
Automated Workflow Architecture: Gain insights into the inner workings of automated workflows.
Understanding API Server: Learn how workflows operate through code examples and explanations.
Configure AWS CLI: Install and configure the AWS CLI to manage AWS resources from the command line.
Deploy with Serverless Framework: Create and deploy AWS Lambda functions and other resources using the Serverless Framework.
Integrate API with AWS SQS: Send messages to AWS SQS from your API server for reliable workflows.
Deploy API Server on AWS EC2: Deploy your Node.js API server to AWS EC2 for high availability and performance.
Scalability: Understand how to scale your workflows and API server to handle increased traffic and demand.
Prerequisites
An AWS account.
Basic knowledge of Node.js and MongoDB.
Access to a MongoDB Atlas account or a local MongoDB setup.
Automated Workflow Architecture.
Let’s illustrate this with an example where a workflow sends a message to a user upon a form submission. Here’s the step-by-step process:
1. Trigger Event: A form submission occurs and acts as the trigger.
2. Send Message to SQS: The form submission function pushes a message to an SQS queue named `TriggerQueue` on every new form submission.
3. Invoke Lambda Function: `TriggerQueue` triggers a Lambda function upon receiving a message.
4. Check for Workflow in DB: The Lambda function checks whether any workflow in the database matches the configuration provided in the SQS message.
5. Fetch Workflow Actions: If a matching workflow is found, the function fetches the set of actions to perform with the provided data.
6. Send Actions to Second SQS Queue: The Lambda function sends the actions, along with the trigger data, to another SQS queue named `ActionsQueue`.
7. Invoke Second Lambda Function: `ActionsQueue` triggers a second Lambda function upon receiving a message.
8. Perform Actions: The second Lambda function takes the action data and calls the respective APIs to perform the actions, such as sending a message to the user.
This setup ensures that your workflows are reliable and can handle various tasks efficiently.
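To make the data flow concrete, here is a minimal in-memory sketch of steps 4 to 6. The interface names (`TriggerMessage`, `StoredWorkflow`, `ActionsMessage`) and the `fanOut` function are illustrative assumptions, not part of the project; the real system performs the matching with a MongoDB query and moves the messages through SQS.

```typescript
// Shapes assumed from the message fields used in this article.
interface TriggerMessage {
  app: string;
  type: string;
  metaData?: string;
  data?: unknown;
}

interface WorkflowAction {
  app: string;
  type: string;
}

interface StoredWorkflow {
  workflowName: string;
  trigger: { app: string; type: string; metaData?: string };
  actions: WorkflowAction[];
}

interface ActionsMessage {
  actions: WorkflowAction[];
  data: unknown;
}

// Steps 4-6: keep the workflows whose trigger matches the incoming message,
// then build one ActionsQueue message per matching workflow.
function fanOut(message: TriggerMessage, workflows: StoredWorkflow[]): ActionsMessage[] {
  return workflows
    .filter(
      (w) =>
        w.trigger.type === message.type &&
        w.trigger.app === message.app &&
        (!message.metaData || w.trigger.metaData === message.metaData),
    )
    .map((w) => ({ actions: w.actions, data: message.data }));
}

// Example: one stored workflow that reacts to submissions of a specific form.
const storedWorkflows: StoredWorkflow[] = [
  {
    workflowName: 'Welcome message',
    trigger: { app: 'forms', type: 'new_submission', metaData: '{"_id":"form-1"}' },
    actions: [{ app: 'messaging', type: 'send_message' }],
  },
];

const outgoing = fanOut(
  { app: 'forms', type: 'new_submission', metaData: '{"_id":"form-1"}', data: { customerName: 'Ada' } },
  storedWorkflows,
);
console.log(outgoing.length); // 1
```

A submission for a different form produces an empty array, so nothing is pushed to the second queue.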
Understanding API Server.
You can find the source code for the project here.
Now, let’s dive into the Model and Controller to understand how the API server functions.
Workflow Model
In a workflow, we have one Trigger and one or more Actions. Below is the schema definition for the workflow and the trigger:
// models/workflow.ts
import mongoose, { Schema } from 'mongoose';
// The Trigger and Workflow interfaces (not shown here) describe the two schemas below.

const triggerSchema = new Schema<Trigger>({
  type: { type: String, required: true },
  app: { type: String, required: true },
  metaData: { type: String, required: false },
  data: { type: Schema.Types.Mixed, required: false }, // Mixed type for dynamic data
});

// Actions reuse the trigger schema: each action also has a type, an app, and optional data
const workflowSchema = new Schema<Workflow>({
  workflowName: { type: String, required: true },
  trigger: { type: triggerSchema, required: true },
  actions: { type: [triggerSchema], required: true },
  created_at: { type: Date, default: Date.now },
  updated_at: { type: Date, default: Date.now },
});

// Create and export the model
const WorkflowModel = mongoose.model<Workflow>('Workflow', workflowSchema);
export default WorkflowModel;
In this model:
Trigger: Contains properties such as `type`, `app`, optional `metaData`, and `data`.
Workflow: Extends the Mongoose `Document` interface, including properties like `workflowName`, `trigger`, `actions`, `created_at`, and `updated_at`.
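The schema refers to `Trigger` and `Workflow` interfaces that are not shown. The following is a hedged sketch of what they might look like, inferred only from the schema fields; in the project, `Workflow` extends Mongoose's `Document`, which is omitted here to keep the example dependency-free. The `isTrigger` guard is a hypothetical helper that mirrors the schema's required fields.

```typescript
// Hypothetical shapes inferred from the schema fields; the project's real
// interfaces may differ.
interface Trigger {
  type: string;
  app: string;
  metaData?: string;
  data?: unknown;
}

interface Workflow {
  workflowName: string;
  trigger: Trigger;
  actions: Trigger[]; // actions reuse the trigger shape, as in the schema
  created_at?: Date;
  updated_at?: Date;
}

// Runtime check mirroring the required fields of triggerSchema.
function isTrigger(value: unknown): value is Trigger {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.type === 'string' &&
    typeof candidate.app === 'string' &&
    (candidate.metaData === undefined || typeof candidate.metaData === 'string')
  );
}

const exampleWorkflow: Workflow = {
  workflowName: 'Welcome message',
  trigger: { app: 'forms', type: 'new_submission' },
  actions: [{ app: 'messaging', type: 'send_message' }],
};
console.log(isTrigger(exampleWorkflow.trigger)); // true
```

A guard like this can be useful in the Lambda handlers, where message bodies arrive as untyped JSON.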
For example, if our trigger is a form submission, the `app` will be 'forms' and the `type` will be 'new_submission'. The `metaData` field is a JSON string that helps to identify specific workflows, like so:
{
  "app": "forms",
  "type": "new_submission",
  "metaData": "{\"_id\":\"someUniqueId\"}"
}
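Because `metaData` is stored as a string and compared by exact equality, the string saved on a workflow must match the one the API sends character for character. One way to keep both sides consistent is to serialize with a fixed key order. The `canonicalMetaData` helper below is a hypothetical sketch for flat objects, not something the project defines.

```typescript
// Hypothetical helper: serialize a flat metaData object with sorted keys so the
// string stored on a workflow and the string sent by the API compare equal.
function canonicalMetaData(meta: Record<string, string | number | boolean>): string {
  const parts = Object.keys(meta)
    .sort()
    .map((key) => JSON.stringify(key) + ':' + JSON.stringify(meta[key]));
  return '{' + parts.join(',') + '}';
}

console.log(canonicalMetaData({ _id: 'someUniqueId' })); // {"_id":"someUniqueId"}
console.log(canonicalMetaData({ b: 1, a: 2 })); // {"a":2,"b":1}
```

For a single-key object the result is identical to `JSON.stringify`, so it stays compatible with strings produced that way.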
Creating a Form Submission
Here’s how you can handle a form submission and trigger workflows:
// formController.ts
import { Request, Response } from 'express';
import { sqs } from './awsHelper';
// FormSubmission (model), FormSubmissionBody and params (types) come from the project source.

export const createFormSubmission = async (req: Request<{}, {}, FormSubmissionBody>, res: Response): Promise<void> => {
  try {
    const { formId, customerName, customerEmail, customerPhone } = req.body;
    const newForm = new FormSubmission({ formId, customerName, customerEmail, customerPhone });
    await newForm.save();

    // Prepare the data to be sent to the workflow
    const workflowData = {
      app: 'forms',
      type: 'new_submission',
      metaData: JSON.stringify({ _id: formId }),
      data: newForm,
    };
    const params: params = {
      MessageBody: JSON.stringify(workflowData),
      QueueUrl: process.env.AWS_SQS_URL as string,
    };

    // Send the message; a failure is logged but does not fail the request
    sqs.sendMessage(params, (err, data) => {
      if (err) {
        console.log('Error', err);
      } else {
        console.log('Success', data.MessageId);
      }
    });

    res.status(201).json(newForm);
  } catch (error) {
    console.error('Error creating form submission:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
};
In this controller:
A new form submission is created and saved to the database.
Workflow data is prepared, including the form submission details.
The workflow data is sent to an AWS SQS queue.
Any workflows matching the trigger configuration (e.g., `app: 'forms', type: 'new_submission', metaData: '{"_id":"someUniqueId"}'`) will be identified by the first Lambda function, and the corresponding actions will be sent to another queue for execution.
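The controller responds with 201 without waiting for the SQS call, so a failed send only shows up in the logs. If you would rather know the outcome before responding, a sketch along these lines may help. The `QueueClient` interface, `buildSendParams`, and `enqueueWorkflow` are assumptions introduced for illustration; the AWS SDK v2 `sqs` client is assumed to fit this shape because `sendMessage(params).promise()` is the same call the Lambda handler in this article uses.

```typescript
// Minimal shapes; the real AWS SDK v2 client is assumed to be structurally compatible.
interface SendParams {
  QueueUrl: string;
  MessageBody: string;
}

interface QueueClient {
  sendMessage(params: SendParams): { promise(): Promise<{ MessageId?: string }> };
}

// Builds the SQS parameters and fails fast when the queue URL is missing.
function buildSendParams(queueUrl: string | undefined, workflowData: unknown): SendParams {
  if (!queueUrl) {
    throw new Error('AWS_SQS_URL is not set');
  }
  return { QueueUrl: queueUrl, MessageBody: JSON.stringify(workflowData) };
}

// Resolves with the message ID once SQS has accepted the message,
// and rejects if the send fails.
async function enqueueWorkflow(
  client: QueueClient,
  queueUrl: string | undefined,
  workflowData: unknown,
): Promise<string | undefined> {
  const result = await client.sendMessage(buildSendParams(queueUrl, workflowData)).promise();
  return result.MessageId;
}
```

In the controller, this would be called as `await enqueueWorkflow(sqs, process.env.AWS_SQS_URL, workflowData)` before `res.status(201)`, letting the surrounding `try`/`catch` decide how to respond when the send fails.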
Configure AWS CLI.
To use the Serverless Framework, we first need to configure the AWS CLI. Follow these steps to create an IAM user and configure the AWS CLI.
Create an IAM User
1. Sign in to AWS Console: Log in to your AWS Management Console.
2. Search for IAM: In the search bar, type “IAM” and select the IAM service from the results.
3. Navigate to Users: Click on the “Users” option in the IAM dashboard.
4. Create a New User:
   - Click on “Add User.”
   - Enter a username.
   - Click “Next.”
5. Set Permissions:
   - Select “Attach policies directly.”
   - In the “Permission policies” section, select “AdministratorAccess.” This is convenient for a tutorial; for production, attach a narrower policy.
   - Click “Next” and then “Create User.”
6. Create Access Key:
   - Click on the newly created user.
   - Go to the “Security credentials” tab.
   - Click on “Create access key.”
   - Select “Command Line Interface (CLI)” as the use case.
   - Check the acknowledgment checkbox.
   - Click “Next.”
7. Add Description and Create Key:
   - Add a description if desired.
   - Click “Create Access Key.”
8. Store Access Keys:
   - Copy the Access Key ID and Secret Access Key.
   - Store them securely, as the Secret Access Key will not be shown again.
Configure AWS CLI
1. Install AWS CLI: If you haven’t already, download and install the AWS CLI, or install it with Homebrew:
brew install awscli
2. Open Terminal/Command Prompt: Open your terminal (Linux/Mac) or command prompt (Windows).
3. Run AWS Configure Command:
aws configure
4. Enter Credentials: You’ll be prompted to enter your credentials.
   - AWS Access Key ID: Enter your Access Key ID.
   - AWS Secret Access Key: Enter your Secret Access Key.
   - Default region name: Enter your preferred AWS region (e.g., `us-east-1`).
   - Default output format: Enter your preferred output format (e.g., `json`).
5. Verify Configuration: Run the following command to verify the configuration:
aws sts get-caller-identity
This should return details about your IAM user, confirming that the AWS CLI is configured correctly.
Deploy with Serverless Framework.
You can find the source code for the project here.
What is the Serverless Framework?
The Serverless Framework is an open-source tool that simplifies the process of building and deploying serverless applications. It abstracts away the complexities of cloud infrastructure, enabling developers to focus on writing application code. Supporting various cloud providers such as AWS, Azure, and Google Cloud, the Serverless Framework offers a consistent experience for deploying functions, managing resources, and scaling applications effortlessly.
In this guide, we will develop Lambda functions and deploy resources using the Serverless Framework. We will cover topics such as database calls from Lambda, using MongoDB, and pushing messages from one Lambda queue worker to another queue.
1. Install Serverless Framework: First, install the Serverless Framework globally on your machine.
npm install -g serverless
2. Create a serverless service: Navigate to your project directory and create a new Serverless service. Enter the `serverless` command in your terminal and select the AWS / Node.js / Simple Function option. Then name your project, select the Create A New App option, and give an app name. This creates the serverless service.
3. Create Workers/handlers
- TriggerQueue Handler: This handler will pull messages from the trigger queue, find workflows with the same trigger configuration provided in the message, and push the respective set of actions into the actions queue along with the trigger data.
// handler.js
'use strict';

const { MongoClient } = require('mongodb');
// The nodejs20.x Lambda runtime bundles only AWS SDK v3, so 'aws-sdk' (v2)
// must be listed in package.json and deployed with the function.
const AWS = require('aws-sdk');

const sqs = new AWS.SQS();

const MONGODB_URI = process.env.MONGODB_URI;
const MONGODB_DB = process.env.MONGODB_DB;
const OUTPUT_QUEUE_URL = process.env.OUTPUT_QUEUE_URL;

// Cached across warm invocations of the same execution environment
let cachedDb = null;

async function connectToDatabase(uri) {
  if (cachedDb) {
    return cachedDb;
  }
  // useNewUrlParser and useUnifiedTopology have no effect since driver v4, so they are omitted
  const client = await MongoClient.connect(uri);
  const db = client.db(MONGODB_DB);
  console.log('Connected to MongoDB');
  cachedDb = db;
  return db;
}

// Build the MongoDB filter that matches workflows by their trigger configuration
const buildQuery = (type, app, metaData) => {
  const query = { 'trigger.type': type };
  if (app) {
    query['trigger.app'] = app;
  }
  if (metaData) {
    query['trigger.metaData'] = metaData;
  }
  return query;
};

module.exports.triggerHandler = async (event) => {
  // Validate environment variables
  if (!MONGODB_URI || !MONGODB_DB || !OUTPUT_QUEUE_URL) {
    console.error('Missing environment variables');
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: 'Internal Server Error',
      }),
    };
  }

  let db;
  try {
    db = await connectToDatabase(MONGODB_URI);
  } catch (error) {
    console.error('Error connecting to MongoDB:', error);
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: 'Error connecting to MongoDB',
      }),
    };
  }

  for (const record of event.Records) {
    const { body } = record;
    console.log(`Received message: ${body}`);

    let parsedBody;
    try {
      parsedBody = JSON.parse(body);
    } catch (error) {
      console.error('Error parsing message body:', error);
      continue; // Skip to the next message
    }

    const { type, app, metaData, data } = parsedBody;

    let result;
    try {
      const collection = db.collection('workflows');
      const query = buildQuery(type, app, metaData);
      result = await collection.find(query).toArray();
    } catch (error) {
      console.error('Error querying MongoDB:', error);
      continue; // Skip to the next message
    }

    // Forward each matching workflow's actions, together with the trigger data
    for (const workflow of result) {
      const MessageBody = {
        actions: workflow.actions,
        data: data,
      };
      console.log('Message to send:', MessageBody);

      const params = {
        QueueUrl: OUTPUT_QUEUE_URL,
        MessageBody: JSON.stringify(MessageBody),
      };
      try {
        await sqs.sendMessage(params).promise();
        console.log('Message sent to output queue');
      } catch (error) {
        console.error('Error sending message to output queue:', error);
      }
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Messages processed and sent successfully',
    }),
  };
};
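One thing to be aware of: the handler logs errors and moves on, so Lambda considers the batch successful and the failed messages are deleted from the queue. If you want failed messages to be retried instead, SQS event sources support partial batch responses. The sketch below shows the general shape under a few assumptions: `processBatch` and `processRecord` are hypothetical names, and the feature has to be enabled on the event source mapping (in the Serverless Framework this is, to my knowledge, the `functionResponseType: ReportBatchItemFailures` setting on the `sqs` event).

```typescript
// Minimal shapes for an SQS-triggered Lambda event record and the partial batch response.
interface SqsRecord {
  messageId: string;
  body: string;
}

interface SqsBatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Runs processRecord for every record and reports the IDs of the records that threw,
// so that only those messages become visible in the queue again.
async function processBatch(
  records: SqsRecord[],
  processRecord: (record: SqsRecord) => Promise<void>,
): Promise<SqsBatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of records) {
    try {
      await processRecord(record);
    } catch (error) {
      console.error('Failed to process message', record.messageId, error);
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}
```

A handler would then `return processBatch(event.Records, ...)` instead of a `statusCode` object. With the batch size of 1 used in this guide, simply rethrowing the error from the handler has a similar effect, since a failed invocation returns its whole batch to the queue.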
- ActionsQueue Handler: This handler pulls messages from the actions queue and calls the respective APIs to perform the workflow actions.
// actionsHandler.js
'use strict';

const axios = require('axios');

const API_SERVER_URL = process.env.API_SERVER_URL;

// Handle actions that belong to the 'messaging' app
const handleMessage = async (type, data) => {
  try {
    switch (type) {
      case 'send_message': {
        const messageBody = {
          from: 'workflow',
          message: JSON.stringify(data),
        };
        await axios.post(`${API_SERVER_URL}/api/v1/send_message`, messageBody);
        console.log('Message sent successfully');
        break;
      }
      default:
        console.log('Unknown type:', type);
    }
  } catch (error) {
    console.error('Error handling message:', error);
    throw error; // Rethrow so the caller does not report the action as successful
  }
};

// Route an action to the handler for its app
const performAction = async (action, data) => {
  const { type, app } = action;
  switch (app) {
    case 'messaging':
      await handleMessage(type, data);
      break;
    default:
      console.log('Unknown app:', app);
  }
};

module.exports.actionsHandler = async (event) => {
  for (const record of event.Records) {
    const { body } = record;

    let parsedBody;
    try {
      parsedBody = JSON.parse(body);
    } catch (error) {
      console.error('Error parsing message body:', error);
      continue; // Skip to the next record
    }

    const { actions, data } = parsedBody;
    for (const action of actions) {
      console.log('Performing action:', action, data);
      try {
        await performAction(action, data);
        console.log('Action performed successfully');
      } catch (error) {
        console.error('Error performing action:', error);
      }
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Messages processed and sent successfully',
    }),
  };
};
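As the number of apps and action types grows, the nested `switch` statements can become hard to maintain. One possible alternative is a lookup table keyed by app and type. This is only a sketch: `registerAction`, `resolveAction`, and the registry itself are hypothetical names, and the registered handler is a placeholder rather than the real API call.

```typescript
type ActionHandler = (data: unknown) => Promise<void>;

// Registry keyed by "app:type". Entries are added with registerAction.
const actionRegistry = new Map<string, ActionHandler>();

function registerAction(app: string, type: string, handler: ActionHandler): void {
  actionRegistry.set(`${app}:${type}`, handler);
}

// Returns the handler for an action, or undefined when none is registered.
function resolveAction(app: string, type: string): ActionHandler | undefined {
  return actionRegistry.get(`${app}:${type}`);
}

// Placeholder registration: the article's handler POSTs to the API server here.
registerAction('messaging', 'send_message', async (data) => {
  console.log('Would send message with', JSON.stringify(data));
});

const resolved = resolveAction('messaging', 'send_message');
console.log(resolved !== undefined); // true
console.log(resolveAction('crm', 'update_contact') !== undefined); // false
```

Adding a new action then means registering one more handler, without touching the dispatch code.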
4. Deploy the Serverless Service: The `serverless.yml` configuration file outlines the setup for deploying a serverless application using the Serverless Framework on AWS.
Here's a breakdown in simple terms:
- Basic Information:
  - Organization: `bhaskarsawant`
  - Application: `trigger`
  - Service: `trigger`
- Plugins:
  - serverless-offline: Allows running the service locally for development and testing.
- Provider Configuration:
  - Provider: AWS
  - Runtime: Node.js version 20.x
  - Region: ap-south-1 (AWS region)
- IAM Role Statements:
  - Permissions: The service is allowed to receive messages from SQS, delete messages from SQS, get SQS queue attributes, and send messages to SQS.
  - Resources: The `TriggerQueue` ARN (Amazon Resource Name) and the `ActionsQueue` ARN.
- Environment Variables:
  - OUTPUT_QUEUE_URL: Reference to the `ActionsQueue`
  - MONGODB_URI: MongoDB connection URL (from the `DB_URL` environment variable)
  - MONGODB_DB: MongoDB database name (from the `DB_NAME` environment variable)
  - API_SERVER_URL: API server URL (from the `API_URL` environment variable)
- Functions:
  - sqsHandler: Handler `handler.triggerHandler`, triggered by the SQS queue `TriggerQueue` with a batch size of 1
  - actionsHandler: Handler `actionsHandler.actionsHandler`, triggered by the SQS queue `ActionsQueue` with a batch size of 1
- Resources:
  - TriggerQueue: An AWS SQS queue named `trigger_queue`
  - ActionsQueue: An AWS SQS queue named `actions_queue`
# serverless.yml
org: bhaskarsawant
app: trigger
service: trigger

plugins:
  - serverless-offline

provider:
  name: aws
  runtime: nodejs20.x
  region: ap-south-1
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:ReceiveMessage"
        - "sqs:DeleteMessage"
        - "sqs:GetQueueAttributes"
        - "sqs:SendMessage"
      Resource:
        - !GetAtt TriggerQueue.Arn
        - !GetAtt ActionsQueue.Arn
  environment:
    OUTPUT_QUEUE_URL:
      Ref: ActionsQueue
    MONGODB_URI: ${env:DB_URL}
    MONGODB_DB: ${env:DB_NAME}
    API_SERVER_URL: ${env:API_URL}

functions:
  sqsHandler:
    handler: handler.triggerHandler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - TriggerQueue
              - Arn
          batchSize: 1
  actionsHandler:
    handler: actionsHandler.actionsHandler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - ActionsQueue
              - Arn
          batchSize: 1

resources:
  Resources:
    TriggerQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "trigger_queue"
    ActionsQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "actions_queue"
This configuration sets up two AWS Lambda functions that are triggered by messages from two SQS queues (`trigger_queue` and `actions_queue`). It also defines the necessary permissions and environment variables.
Now, to deploy the service on AWS, enter the following command in your terminal:
serverless deploy --region ap-south-1
Integrate with AWS SQS.
You can find the source code for the project here.
In this section, we will integrate our API with AWS SQS by creating a helper file for AWS configuration. This file will handle the setup for connecting to AWS SQS.
What is AWS SQS?
AWS Simple Queue Service (SQS) is a fully managed message queuing service that allows you to decouple and scale microservices, distributed systems, and serverless applications. It enables you to send, store, and receive messages between software components.
Steps to Integrate AWS SQS
Create AWS Helper File
We need to create a new file for AWS configuration. This helper file will contain the configuration for AWS SQS.
Create the file: In your project directory, create a new file named `awsHelper.ts`.
Add AWS SQS Configuration: Add the following code to `awsHelper.ts` to configure AWS SQS:
// Import AWS SDK
import AWS from 'aws-sdk';

// Configure AWS SQS
export const sqs = new AWS.SQS({
  accessKeyId: process.env.AWS_ACCESS_KEY_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  region: process.env.AWS_REGION,
  apiVersion: process.env.AWS_API_VERSION,
});
You can find these credentials in the Configure AWS CLI section.
Use the AWS Helper in Your Handlers
In your handlers where you need to interact with AWS SQS, import and use the `sqs` instance from the AWS helper file.
// formController.ts
import { sqs } from './awsHelper';

let params = {
  MessageBody: JSON.stringify(workflowData),
  QueueUrl: process.env.AWS_SQS_URL as string,
};

// Send the message
sqs.sendMessage(params, (err, data) => {
  if (err) {
    console.log('Error', err);
  } else {
    console.log('Success', data.MessageId);
  }
});
After deploying and creating the AWS resources in the Deploy with Serverless Framework section, connect the API server to the queue:
1. Log in to your AWS Management Console.
2. Navigate to SQS: Search for “SQS” in the search bar and select the SQS service.
3. Find the TriggerQueue: Look for the trigger queue in the list of queues (it appears under the name `trigger_queue` set in serverless.yml), click on it to view its details, and copy its URL.
4. Update Environment Configuration: Set the URL you just copied as `AWS_SQS_URL` in your environment configuration. Your application uses this URL to send messages to the queue.
5. Test the Integration: Send a message to the queue by hitting your API endpoint using Postman.
Deploy API Server on AWS EC2.
You can find the source code for the project here.
After thoroughly testing the API server and lambda functions, the next step is to deploy your API server on an AWS EC2 instance. Follow these steps to ensure a smooth deployment process.
- Launch an EC2 Instance
Log in to the AWS Management Console.
Navigate to the EC2 dashboard and click on “Launch Instance.”
Give your instance a name and select the operating system (Ubuntu).
Choose an instance type, such as t2.micro (free tier) or t2.small.
Create a new key pair, which you’ll use to access the instance.
Under network settings, select to allow traffic from HTTPS and HTTP.
Review and launch the instance.
- Expose port 8080
On the EC2 dashboard, select your instance.
Click on the security group, edit the inbound rules, and add a rule to allow traffic on port 8080.
- Connect to Your EC2 Instance
- Open your terminal and restrict the permissions on your SSH key so that only you can read it:
chmod 400 Your_SSH_Key.pem
- SSH into the machine using the EC2 instance URL:
ssh -i Your_SSH_Key.pem ubuntu@your-ec2-instance-url
- Setup the project
- Clone the Git repository:
git clone https://github.com/bhaskarcsawant/hubspot_workflows
- Install Node.js:
# install nvm
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
# activate nvm
. ~/.nvm/nvm.sh
# install Node.js
nvm install 20
- Install project dependencies:
cd your-project
npm install
- Add environment variables:
# create a .env file
vim .env
# add the following variables to the .env file, then save and exit
MONGODB_URI=your_mongodb_uri
AWS_SQS_URL=your_trigger_sqs_queue_url
AWS_ACCESS_KEY_ID=your_aws_access_key_id
AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key
AWS_REGION=your_aws_region
AWS_API_VERSION=latest
- Start the server:
npm start
- Visit your server in a browser at:
http://your_ec2_domain:8080
(e.g., http://ec2-54-81-243-55.compute-1.amazonaws.com:8080/)
- After successfully setting up the server, update the API URL used by the Lambda function (the `API_URL` environment variable that serverless.yml maps to `API_SERVER_URL`) and redeploy the serverless service using the following command in your terminal:
serverless deploy --region ap-south-1
- Test Your Workflow
Create a form submission using Postman.
Verify that a message containing your form submission is created inside the `messages` MongoDB collection.
Let’s see this in action. 😄
This is one possible architecture that allows you to build an automated workflow system. Using this architecture, you can create different types of triggers and actions, so that with a good frontend, your users can set up automated workflows without any coding.
Scalability.
Although this architecture is designed to scale, several improvements and additions can enhance its efficiency and performance. Let’s explore these enhancements and how to implement them effectively.
Connection Pooling
Problem: The `triggerHandler` Lambda function caches its database connection only within a single execution environment. Each concurrent environment opens its own connection, so a burst of traffic can overwhelm the database at scale.
Solution:
Using EC2 Instances as Queue Workers: Instead of using Lambda functions, you can set up EC2 instances as queue workers. A long-running worker reuses the same database connection pool across many messages, and the workers can be scaled horizontally.
Using BullMQ: Another approach is a library like BullMQ, a Redis-backed queue for Node.js that provides queue workers out of the box and processes jobs asynchronously.
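Whichever worker model you choose, the goal is one shared connection pool per process. A minimal sketch of that idea is a lazy singleton that caches the connection promise, so that concurrent callers share a single connection attempt. `lazySingleton` and `getDb` are hypothetical names, and the factory below is a stand-in for the real `MongoClient.connect(...)` call. Capping the driver's pool with the `maxPoolSize` connection option is another common lever.

```typescript
// Hypothetical helper: wraps an async factory so it runs at most once per process
// (or per Lambda execution environment) and every caller shares the result.
function lazySingleton<T>(factory: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;
  return () => {
    if (!cached) {
      cached = factory().catch((error) => {
        cached = undefined; // allow a retry after a failed connection attempt
        throw error;
      });
    }
    return cached;
  };
}

// Usage with a stand-in factory that counts how often it "connects".
let connectCalls = 0;
const getDb = lazySingleton(async () => {
  connectCalls += 1; // stand-in for MongoClient.connect(uri)
  return { name: 'workflows' }; // stand-in for client.db(name)
});

Promise.all([getDb(), getDb()]).then(([first, second]) => {
  console.log(first === second, connectCalls); // true 1
});
```

Caching the promise rather than the resolved value means two callers that arrive before the first connection completes still share one connection.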
Autoscaling groups
Problem: As the load increases, manual scaling can be inefficient and may lead to downtime.
Solution: Implement autoscaling groups to automatically adjust the number of EC2 instances based on demand. This ensures that your application can handle varying loads efficiently.
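For queue workers, a common scaling signal is the backlog per instance: how many messages are waiting relative to how many each worker can clear within an acceptable delay. The sketch below works through that arithmetic. `desiredWorkerCount` and its input fields are hypothetical names; in practice the queue depth would come from the queue's `ApproximateNumberOfMessages` attribute and the result would feed a scaling policy rather than be applied directly.

```typescript
interface ScalingInput {
  visibleMessages: number; // e.g. the queue's ApproximateNumberOfMessages attribute
  avgProcessingSeconds: number; // measured time to process one message
  acceptableLatencySeconds: number; // longest queueing delay you are willing to accept
  minInstances: number;
  maxInstances: number;
}

// Number of workers needed so that each one's share of the backlog
// can be processed within the acceptable latency, clamped to the group limits.
function desiredWorkerCount(input: ScalingInput): number {
  if (input.avgProcessingSeconds <= 0 || input.acceptableLatencySeconds <= 0) {
    throw new Error('processing time and acceptable latency must be positive');
  }
  const acceptableBacklogPerInstance = input.acceptableLatencySeconds / input.avgProcessingSeconds;
  const needed = Math.ceil(input.visibleMessages / acceptableBacklogPerInstance);
  return Math.min(input.maxInstances, Math.max(input.minInstances, needed));
}

// 300 waiting messages, 0.5 s per message, 10 s acceptable delay:
// each worker can absorb 20 messages, so 15 workers are needed.
console.log(
  desiredWorkerCount({
    visibleMessages: 300,
    avgProcessingSeconds: 0.5,
    acceptableLatencySeconds: 10,
    minInstances: 1,
    maxInstances: 20,
  }),
); // 15
```

The minimum keeps at least one worker running when the queue is empty, and the maximum caps cost during spikes.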
Additional Points for Scalability
Load Balancer: Use an Elastic Load Balancer (ELB) to distribute incoming traffic across multiple EC2 instances. This ensures high availability and fault tolerance.
Caching: Implement caching mechanisms (like Redis or Memcached) to store frequently accessed data. This reduces the load on your database and improves response times.
Database Scaling: Distribute the read load across multiple instances. With MongoDB, this means reading from replica set secondaries (MongoDB Atlas clusters are replica sets); with a relational database, Amazon RDS read replicas play the same role.
Monitoring and Logging: Implement robust monitoring and logging using Amazon CloudWatch or New Relic. This helps you track the performance of your application and quickly identify and resolve issues.
If you find other ways to enhance scalability or encounter any issues during implementation, please feel free to comment or reach out to me. Your feedback will help everyone and is greatly appreciated. I’m always open to suggestions and eager to help you! Additionally, feel free to contribute to the codebase regarding any enhancements. Your contributions are valuable and welcomed! 😊
Follow Me for More Insights
If you found this guide helpful and want to learn more about web development, Software Engineering, and other exciting tech topics, follow me here on Medium. I regularly share tutorials, insights, and tips to help you become a better developer. Let’s connect and grow together in this journey of continuous learning!
Connect with Me
Twitter: https://twitter.com/Bhaskar_Sawant_
Stay tuned for more articles, and feel free to reach out if you have any questions or suggestions for future topics.
Cheers, thanks for reading! 😊
Happy coding! 🚀
This article was originally published on Medium. It is also available on Hashnode and Dev.to.