This post is about creating an AWS Step Functions workflow that runs an ETL job on the daily COVID-19 case count, and about deploying the infrastructure with Terraform.
Contents
- Project Overview
- Architecture
- Step Functions Workflow
  - What is AWS Step Functions?
  - Creating Step Functions Workflow
- Deploying Infrastructure with Terraform
- Conclusion
1. Project Overview
This project is the second part of the series #CloudGuruChallenge – Event-Driven Python on AWS. Here we deploy an AWS Step Functions workflow along with the other components required for error handling. The workflow will process an ETL job on the daily COVID-19 case count, which will be demonstrated in the next and final part of this series. Note: appropriate IAM permissions must be configured for the code below to work.
To automate the process, Terraform is used for IaC (Infrastructure as Code) and AWS CodePipeline is used for CI/CD. Setting up Terraform and CodePipeline is discussed in detail in Part 1 of the series.
2. Architecture
The above architecture works as follows:
- An Amazon EventBridge rule triggers the AWS Step Functions workflow once daily at 1 PM.
- On success, the Step Functions workflow sends an email to the owner through Amazon SNS.
- On failure, a second EventBridge rule captures the failed execution event and sends it to SNS, SQS and CloudWatch Logs. The SNS topic notifies the owner of the failure by email.
- Another EventBridge rule triggers a Lambda function once daily at 7 AM. This function retrieves the failed events from SQS and then starts the Step Functions workflow with an input containing all the failed dates.
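The core of the 7 AM retry Lambda can be sketched as a pure Python function, kept free of AWS calls so it runs offline. This is a minimal sketch under stated assumptions, not the author's implementation: it assumes each SQS message body is the raw EventBridge event forwarded by the failure rule (no input transformer), that the "failed date" is the UTC day on which the failed execution started (taken from the event's `detail.startDate`, in epoch milliseconds), and that the retry input uses a hypothetical `failed_dates` key.

```python
import json
from datetime import datetime, timezone


def failed_dates_from_messages(bodies):
    """Return sorted, de-duplicated ISO dates of failed executions.

    Assumption: each body is the raw EventBridge event JSON that the
    failure rule forwarded to SQS.
    """
    dates = set()
    for body in bodies:
        detail = json.loads(body).get("detail", {})
        if detail.get("status") != "FAILED":
            continue  # ignore anything that is not a failed execution
        # startDate is epoch milliseconds; read it as a UTC calendar day
        started = datetime.fromtimestamp(detail["startDate"] / 1000, tz=timezone.utc)
        dates.add(started.date().isoformat())
    return sorted(dates)


def build_retry_input(bodies):
    """Build the execution input string (hypothetical "failed_dates" schema)."""
    return json.dumps({"failed_dates": failed_dates_from_messages(bodies)})


if __name__ == "__main__":
    sample = [
        json.dumps({"detail": {"status": "FAILED", "startDate": 1609506000000}}),
        json.dumps({"detail": {"status": "FAILED", "startDate": 1609509600000}}),
        json.dumps({"detail": {"status": "FAILED", "startDate": 1609592400000}}),
        json.dumps({"detail": {"status": "SUCCEEDED", "startDate": 1609678800000}}),
    ]
    print(build_retry_input(sample))
    # prints {"failed_dates": ["2021-01-01", "2021-01-02"]}
```

In a real Lambda, this logic would sit between boto3's `sqs.receive_message` and `stepfunctions.start_execution` calls, and the messages would be deleted with `sqs.delete_message` only after the execution has started, so a failed retry does not lose the dates.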
3. Step Functions Workflow
What is AWS Step Functions?
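AWS Step Functions is a serverless orchestration service. A workflow is defined as a state machine in the Amazon States Language, and each state can invoke another AWS service (here, Lambda functions and SNS), with data passed from state to state in JSON format.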
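How the JSON moves between states is controlled per state by `ResultPath`. The sketch below mimics three of its forms: `"$"` (the default, where the task result replaces the state input), a top-level `"$.field"` such as the `"$.result"` used throughout this workflow (the result is added to the input under that key), and JSON `null` (the result is discarded). It is an illustration only, not Step Functions' own implementation, and it deliberately rejects nested paths.

```python
import copy


def apply_result_path(state_input, task_result, result_path="$"):
    """Combine a state's input with its task result, as ResultPath does.

    Sketch only: supports "$", top-level "$.field" paths and None (JSON null).
    """
    if result_path is None:
        return copy.deepcopy(state_input)  # null: keep the input, drop the result
    if result_path == "$":
        return copy.deepcopy(task_result)  # default: result replaces the input
    field = result_path[2:]
    if result_path.startswith("$.") and field and "." not in field:
        output = copy.deepcopy(state_input)
        output[field] = copy.deepcopy(task_result)  # add or overwrite one key
        return output
    raise ValueError(f"unsupported ResultPath in this sketch: {result_path}")


if __name__ == "__main__":
    state = {"date": "2021-01-01"}
    print(apply_result_path(state, {"status": "Update"}, "$.result"))
    # prints {'date': '2021-01-01', 'result': {'status': 'Update'}}
```

This is why the Choice state in the definition can read `$.result.status`: the output of the status task is stored under `result` while the original input fields are preserved.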
Creating Step Functions Workflow
The above workflow works as follows:
- Each rectangular block represents a Lambda function task, except for CheckETLStatus and Notify, which are a Choice state and an SNS task respectively.
- The workflow starts by retrieving the ETL status, which is either InitialLoad or Update.
- If the status is InitialLoad, all the data is retrieved from the source as a single batch.
- If the status is Update, only one row of data is retrieved from the source (the case count for the current day).
- The retrieved data is then transformed and loaded by the Transform and Load tasks respectively.
- Finally, an email is sent to the owner confirming that the ETL job succeeded.
The Amazon States Language definition for this workflow is given below.
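Before deploying, the control flow described above can be dry-run locally. The sketch below replaces each Lambda with a plain Python callable (the stub tasks are hypothetical) and walks the happy path: GetETLStatus, the CheckETLStatus choice with its InitialLoad default, the chosen load task, Transform, Load and finally Notify. Each task's return value is stored under `result`, mirroring `"ResultPath": "$.result"`. Retries, timeouts and the SNS call itself are not modelled.

```python
def run_workflow(tasks, event):
    """Simulate the happy path of the ETL state machine with local stubs.

    tasks: dict mapping state names to callables standing in for Lambdas.
    Returns the list of visited states and the final state data.
    """
    state = dict(event)
    path = ["GetETLStatus"]
    state["result"] = tasks["GetETLStatus"](state)

    # CheckETLStatus: "Update" goes to Update; anything else hits the
    # Default (InitialLoad). A missing status raises KeyError in this sketch.
    path.append("CheckETLStatus")
    branch = "Update" if state["result"]["status"] == "Update" else "InitialLoad"

    for name in (branch, "Transform", "Load"):
        path.append(name)
        state["result"] = tasks[name](state)  # mirrors ResultPath "$.result"

    path.append("Notify")  # in AWS this publishes the whole state ("$") to SNS
    return path, state


if __name__ == "__main__":
    stub_tasks = {
        "GetETLStatus": lambda s: {"status": "Update"},
        "InitialLoad": lambda s: {"rows": [1, 2, 3]},
        "Update": lambda s: {"rows": [1]},
        "Transform": lambda s: {"rows": s["result"]["rows"]},
        "Load": lambda s: {"loaded": len(s["result"]["rows"])},
    }
    print(run_workflow(stub_tasks, {}))
```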
```json
{
  "StartAt": "GetETLStatus",
  "States": {
    "GetETLStatus": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "CheckETLStatus",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "CheckETLStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.result.status",
          "StringEquals": "InitialLoad",
          "Next": "InitialLoad"
        },
        {
          "Variable": "$.result.status",
          "StringEquals": "Update",
          "Next": "Update"
        }
      ],
      "Default": "InitialLoad"
    },
    "InitialLoad": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Transform",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Update": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Transform",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Transform": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Load",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Load": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result",
      "Next": "Notify"
    },
    "Notify": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "YOUR-TOPIC-ARN",
        "Message.$": "$"
      },
      "End": true
    }
  }
}
```
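Because the definition is written by hand and its state names are repeated in `Next`, `Choices` and `Default`, a quick structural check can catch typos before deploying. The sketch below is not a full Amazon States Language validator (the Step Functions console and API perform the authoritative validation); it only checks that `StartAt` and every transition target exist, that non-Choice states other than Succeed/Fail have `Next` or `End`, and that no `YOUR-...` placeholder is left in.

```python
import json


def check_definition(definition):
    """Return a list of structural problems found in an ASL definition dict."""
    states = definition["States"]
    problems = []
    if definition["StartAt"] not in states:
        problems.append(f"StartAt points to missing state {definition['StartAt']!r}")

    for name, state in states.items():
        targets = []
        if state["Type"] == "Choice":
            targets += [choice["Next"] for choice in state.get("Choices", [])]
            if "Default" in state:
                targets.append(state["Default"])
        elif "Next" in state:
            targets.append(state["Next"])
        elif not state.get("End", False) and state["Type"] not in ("Succeed", "Fail"):
            problems.append(f"{name} has neither Next nor End")
        for target in targets:
            if target not in states:
                problems.append(f"{name} transitions to missing state {target!r}")

    # Remind the user to replace placeholders such as YOUR-LAMBDA-ARN
    if "YOUR-" in json.dumps(definition):
        problems.append("placeholder values (YOUR-...) remain")
    return problems
```

For example, `check_definition(json.load(open("definition.json")))` (the file name is hypothetical) would report only the placeholder reminder for the definition above until the real ARNs are filled in.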
4. Deploying Infrastructure with Terraform
The code to deploy the above infrastructure with Terraform is shown below. For details on setting up Terraform and CodePipeline, see Part 1 of the series. I have omitted the code for the two scheduled EventBridge rules (1 PM and 7 AM), as I will cover them in the next part of this series.
state_machine.tf
```hcl
resource "aws_sfn_state_machine" "sfn_state_machine" {
  name     = "YOUR-STATE-MACHINE-NAME"
  role_arn = "YOUR-ROLE-ARN"

  definition = <<EOF
{
  "StartAt": "GetETLStatus",
  "States": {
    "GetETLStatus": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "CheckETLStatus",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "CheckETLStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.result.status",
          "StringEquals": "InitialLoad",
          "Next": "InitialLoad"
        },
        {
          "Variable": "$.result.status",
          "StringEquals": "Update",
          "Next": "Update"
        }
      ],
      "Default": "InitialLoad"
    },
    "InitialLoad": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Transform",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Update": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Transform",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Transform": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "Next": "Load",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result"
    },
    "Load": {
      "Type": "Task",
      "Resource": "YOUR-LAMBDA-ARN",
      "TimeoutSeconds": 3,
      "ResultPath": "$.result",
      "Next": "Notify"
    },
    "Notify": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "YOUR-SNS-TOPIC-ARN",
        "Message.$": "$"
      },
      "End": true
    }
  }
}
EOF

  depends_on = [
    aws_sns_topic.ETLJobStatus,
    aws_lambda_function.state_machine_lambdas
  ]
}
```
sqs.tf
```hcl
resource "aws_sqs_queue" "Events_DLQ" {
  name = "Events_DLQ_T"
}
```
sns.tf
```hcl
resource "aws_sns_topic" "ETLJobStatus" {
  name = "ETLJobStatus_T"
}

resource "aws_sns_topic" "ETLErrorMessages" {
  name = "ETLErrorMessages_T"
}

resource "aws_sns_topic_subscription" "ETLJobStatus_target" {
  topic_arn = aws_sns_topic.ETLJobStatus.arn
  protocol  = "email"
  endpoint  = "YOUR-EMAIL-ID"
  depends_on = [
    aws_sns_topic.ETLJobStatus
  ]
}

resource "aws_sns_topic_subscription" "ETLErrorMessages_target" {
  topic_arn = aws_sns_topic.ETLErrorMessages.arn
  protocol  = "email"
  endpoint  = "YOUR-EMAIL-ID"
  depends_on = [
    aws_sns_topic.ETLErrorMessages
  ]
}
```
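One of the "appropriate permissions" mentioned at the start is easy to miss: EventBridge can only deliver to the `Events_DLQ` queue and the `ETLErrorMessages` topic if their resource policies allow the `events.amazonaws.com` service principal to send to them. The sketch below builds such a policy document, restricted with an `aws:SourceArn` condition to the failure rule; the ARNs in the example are hypothetical placeholders, and whether you need this depends on how permissions are already set up in your account.

```python
import json


def eventbridge_target_policy(action, resource_arn, rule_arn):
    """Resource policy letting one EventBridge rule deliver to a queue or topic.

    action: "sqs:SendMessage" for an SQS queue, "sns:Publish" for an SNS topic.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowEventBridgeRule",
                "Effect": "Allow",
                "Principal": {"Service": "events.amazonaws.com"},
                "Action": action,
                "Resource": resource_arn,
                # Only the failure rule may deliver to this target
                "Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARNs for illustration only
    rule = "arn:aws:events:us-east-1:123456789012:rule/state_machine_events_failed_t"
    queue = "arn:aws:sqs:us-east-1:123456789012:Events_DLQ_T"
    print(json.dumps(eventbridge_target_policy("sqs:SendMessage", queue, rule), indent=2))
```

In Terraform, the same document could be attached with the `aws_sqs_queue_policy` and `aws_sns_topic_policy` resources, building the JSON with `jsonencode` and the real resource ARNs instead of hard-coded strings.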
cloudwatch.tf
```hcl
#EventBridge Events
resource "aws_cloudwatch_event_rule" "state_machine_events_failed" {
  name          = "state_machine_events_failed_t"
  description   = "This event is triggered when the state machine fails."
  event_pattern = <<EOF
{
  "source": ["aws.states"],
  "detail-type": ["Step Functions Execution Status Change"],
  "detail": {
    "status": ["FAILED"],
    "stateMachineArn": ["${aws_sfn_state_machine.sfn_state_machine.arn}"]
  }
}
EOF
  depends_on = [
    aws_sfn_state_machine.sfn_state_machine
  ]
}

#EventBridge Event Targets
resource "aws_cloudwatch_event_target" "sns" {
  rule      = aws_cloudwatch_event_rule.state_machine_events_failed.name
  target_id = "SendToSNS"
  arn       = aws_sns_topic.ETLErrorMessages.arn
  depends_on = [
    aws_cloudwatch_event_rule.state_machine_events_failed,
    aws_sns_topic.ETLErrorMessages
  ]
}

resource "aws_cloudwatch_event_target" "sqs" {
  rule      = aws_cloudwatch_event_rule.state_machine_events_failed.name
  target_id = "SendToSQS"
  arn       = aws_sqs_queue.Events_DLQ.arn
  depends_on = [
    aws_cloudwatch_event_rule.state_machine_events_failed,
    aws_sqs_queue.Events_DLQ
  ]
}

resource "aws_cloudwatch_event_target" "cloudwatch_logs" {
  rule      = aws_cloudwatch_event_rule.state_machine_events_failed.name
  target_id = "SendToCloudwatchLogs"
  arn       = aws_cloudwatch_log_group.log_group.arn
  depends_on = [
    aws_cloudwatch_event_rule.state_machine_events_failed,
    aws_cloudwatch_log_group.log_group
  ]
}

#Cloudwatch Log Group
resource "aws_cloudwatch_log_group" "log_group" {
  name = "state_machine_events_failed_t"
}
```
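The failure rule's `event_pattern` matches an event only when every field listed in the pattern is present and its value is one of the listed values, recursing into nested objects such as `detail`. The sketch below implements just that exact-match subset (EventBridge itself also supports prefix, numeric, exists and other operators, which are not modelled) to show which Step Functions events reach SNS, SQS and CloudWatch Logs. The state machine ARN is a hypothetical placeholder.

```python
def matches(pattern, event):
    """Simplified EventBridge matching: exact values and nested objects only."""
    for key, expected in pattern.items():
        if key not in event:
            return False  # every field in the pattern must be present
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False  # value must be one of the listed alternatives
    return True


# Same shape as the rule above, with a hypothetical state machine ARN
FAILED_PATTERN = {
    "source": ["aws.states"],
    "detail-type": ["Step Functions Execution Status Change"],
    "detail": {
        "status": ["FAILED"],
        "stateMachineArn": ["arn:aws:states:us-east-1:123456789012:stateMachine:etl"],
    },
}
```

Fields that the pattern does not mention (for example `detail.name` or `detail.input`) are ignored, so every failed execution of this one state machine matches, while succeeded executions and other state machines do not.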
5. Conclusion
In this post, we built a Step Functions workflow for an ETL job and deployed it with Terraform. AWS Glue might be a better fit for this kind of ETL job, but I wanted to explore Step Functions, so I adapted the challenge to accommodate it. In the next and final part of this series, I'll combine everything to complete the #CloudGuruChallenge – Event-Driven Python on AWS: running the ETL job on daily COVID-19 cases and displaying the results in Amazon QuickSight.