Beginners new to AWS CDK, please go through the previous articles in this series one by one.
If you missed my previous article, you can find it at the links below.
Original previous post at Dev Post
Reposted previous post at dev to @aravindvcyber
In this article, let us refactor our previous event rule, which sent messages to a queue that triggered the lambda directly, into a new rule that invokes a state machine, which will, in turn, invoke our lambda as a step in the workflow.
Benefits achieved in this approach
This approach has several benefits, as follows.
AWS Step Functions lets you coordinate multiple AWS services into serverless workflows so you can build and update apps quickly.
We can replace the direct invocation of our lambda by SQS with an indirect invocation via a state machine.
Using a state machine, we can do a lot of transformation and conditional checks, and we can orchestrate the workflow by adding several steps, each identified as a distinct chunk of the workflow.
This eventually moves most of the business flow logic out of the lambda and into the statemachine definition, making our lambdas/processors more generalized so that they can be shared among various other tasks as well.
The statemachine also provides a lot of metrics, logs, and a visual reference to the actual point of failure, which can be hard to trace inside a traditional monolithic lambda.
New construct for state machine
Let us start by creating a new file constructs/sfn-simple.ts
We will start by importing the common modules along with stepfunctions and stepfunctions_tasks as follows.
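import { Duration, RemovalPolicy } from "aws-cdk-lib";
import { Effect, Policy, PolicyStatement } from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import { LogGroup, RetentionDays } from "aws-cdk-lib/aws-logs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import { LogLevel } from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Construct } from "constructs";
Besides the stepfunctions modules, this construct also needs Duration, RemovalPolicy, and the IAM and logs classes used further below. Those are already discussed in our other articles, so they are listed here without further explanation.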
Let us add a new props interface sfnProps to pass the required input information from the stack where this construct is used.
export interface sfnProps {
  triggerFunction: lambda.Function;
  timeout: Duration;
}
In the above block of code, triggerFunction will be our backend lambda, which we will write later once we are done with the statemachine definition. timeout limits the maximum total time taken by a statemachine execution.
Construct skeleton
Let us create a skeleton construct as shown below.
export class simpleSfnConstruct extends Construct {
  public readonly sfnMachine: sfn.StateMachine;
  constructor(scope: Construct, id: string, props: sfnProps) {
    super(scope, id);
  }
}
You can also see the read-only property sfnMachine, which the stack will use to refer to the statemachine created inside the constructor.
const { triggerFunction, timeout } = props;
This is the usual destructuring of our props inside the constructor.
Lambda payload with taskToken
const sfnTaskPayload = sfn.TaskInput.fromObject({
  "MyTaskToken": sfn.JsonPath.taskToken,
  "Record": {
    "messageId.$": "$.id",
    "createdAt.$": "$.time",
    "event.$": "States.StringToJson($.detail.message)"
  }
});
Here sfnTaskPayload defines the payload that is passed as a parameter to the step that invokes the lambda and waits for its completion.
The important properties to note here are the following.
MyTaskToken gets the sfn.JsonPath.taskToken from the context data, and the step then pauses and waits. Later the lambda reports its result via SendTaskSuccess or SendTaskFailure, using this MyTaskToken as the reference. This generates the output without polling the lambda again and again to check its status, thereby saving us some state transitions.
In the Record section, you can see that, compared to our previous article, we have offloaded part of the message-level data extraction from the processor code base to the step function itself. This involves using the JSON path syntax. This kind of transformation and data extraction before actually invoking the compute gives us fine-grained control over our workflow logic and is very useful for the statemachine designer and maintainer.
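To make the payload mapping more concrete, here is a minimal local sketch in plain TypeScript (no AWS calls) of what the lambda would receive for a sample event. The sample event values, the `buildLambdaInput` helper, and the placeholder token are assumptions made up for illustration; in a real execution Step Functions resolves the JSON paths and supplies the task token itself.

```typescript
// Hypothetical shape of the EventBridge event that reaches the state machine.
interface IncomingEvent {
  id: string;
  time: string;
  detail: { message: string }; // message is assumed to be a JSON string
}

// Shape of the payload that the lambda receives from the Record Message step.
interface LambdaInput {
  MyTaskToken: string;
  Record: { messageId: string; createdAt: string; event: unknown };
}

// Local stand-in for the path resolution that Step Functions performs.
function buildLambdaInput(event: IncomingEvent, taskToken: string): LambdaInput {
  return {
    MyTaskToken: taskToken, // sfn.JsonPath.taskToken
    Record: {
      messageId: event.id, // "messageId.$": "$.id"
      createdAt: event.time, // "createdAt.$": "$.time"
      event: JSON.parse(event.detail.message), // States.StringToJson($.detail.message)
    },
  };
}

// Sample values below are made up for illustration.
const sampleEvent: IncomingEvent = {
  id: "cdd51245-987a-b3c7-eecf-6d6d63046073",
  time: "2022-04-16T10:00:00Z",
  detail: { message: JSON.stringify({ greeting: "hello" }) },
};

const lambdaInput = buildLambdaInput(sampleEvent, "example-task-token");
console.log(JSON.stringify(lambdaInput, null, 2));
```

The point to notice is that the lambda gets an already parsed event object under Record, so the processor does not need to dig into the raw event envelope.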
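const recordMsg = new tasks.LambdaInvoke(this, "Record Message", {
  lambdaFunction: triggerFunction,
  timeout: Duration.minutes(1),
  comment: "Record message in dynamo",
  // Pause this step until the lambda calls back with the task token.
  integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
  inputPath: "$",
  payload: sfnTaskPayload,
  // Keep the callback output and lift its statusCode for the choice step.
  resultSelector: {
    "Payload.$": "$",
    "StatusCode.$": "$.statusCode"
  },
  // Store the selected result under recordResult in the state output.
  resultPath: "$.recordResult",
});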
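As a rough illustration of how resultSelector and resultPath shape the data that leaves this step, the sketch below mimics them locally. The helper `applyRecordMessageResult` and the sample values are hypothetical; the real reshaping is done by Step Functions, where the task result is the output that the lambda passes to SendTaskSuccess.

```typescript
// Result that the lambda reports via SendTaskSuccess (sample values).
interface TaskOutput {
  statusCode: number;
  putStatus: { messageId: string };
}

// Mimics resultSelector { "Payload.$": "$", "StatusCode.$": "$.statusCode" }
// followed by resultPath "$.recordResult".
function applyRecordMessageResult<T extends object>(stateInput: T, taskOutput: TaskOutput) {
  const selected = { Payload: taskOutput, StatusCode: taskOutput.statusCode };
  // The original state input is kept and the selected result is added next to it.
  return { ...stateInput, recordResult: selected };
}

const stateInput = { id: "msg-1", time: "2022-04-16T10:00:00Z" };
const taskOutput: TaskOutput = { statusCode: 200, putStatus: { messageId: "msg-1" } };

const stateOutput = applyRecordMessageResult(stateInput, taskOutput);
console.log(JSON.stringify(stateOutput, null, 2));
```

Under these assumptions the next step sees both the original event fields and a recordResult object carrying StatusCode and Payload.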
Final status steps
These are formal success and failure end states, which we create in our statemachine to better visualize what has happened during the workflow execution.
const jobFailed = new sfn.Fail(this, "Job Failed", {
  comment: "Job Failed"
});
const jobSucceed = new sfn.Succeed(this, "Job Succeed", {
  comment: "Job Succeed"
});
Choice step for branching the workflow
The choice step is a visual and functional reference that makes a decision based on the job status returned by the lambda invocation in the previous step. Here we choose the next step using the data from the previous step's output.
const checkStatus = new sfn.Choice(this, "Check Status?", {
  inputPath: "$.recordResult"
})
  .when(sfn.Condition.numberEquals("$.StatusCode", 500), jobFailed)
  .when(sfn.Condition.numberEquals("$.StatusCode", 200), jobSucceed)
  .otherwise(jobFailed);
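The branching rules above can be read as a small decision function. The sketch below is only a local model of those rules, with a hypothetical helper name `pickNextState`; the actual evaluation happens inside Step Functions.

```typescript
type NextState = "Job Succeed" | "Job Failed";

// Mirrors the Choice rules: 500 -> Job Failed, 200 -> Job Succeed, anything else -> Job Failed.
function pickNextState(recordResult: { StatusCode: number }): NextState {
  if (recordResult.StatusCode === 500) return "Job Failed";
  if (recordResult.StatusCode === 200) return "Job Succeed";
  return "Job Failed"; // corresponds to .otherwise(jobFailed)
}

console.log(pickNextState({ StatusCode: 200 }));
console.log(pickNextState({ StatusCode: 404 }));
```

Only a status code of exactly 200 reaches the success state in this model, which matches the rules as written in the choice step.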
Stepfunction chaining into statemachine definition
You can see from the code below that a statemachine definition is nothing but a chain of the steps we defined earlier. Every step is connected to the next one as a simple chain, though the chain can also contain branching steps such as the choice step.
const sfnDef = recordMsg.next(checkStatus);
Statemachine log group
A new log group is created to contain the logs received from the statemachine execution as shown below. This will be used in the statemachine implementation part.
const sfnLog = new LogGroup(this, "sfnLog", {
  logGroupName: "sfnLogGroup",
  removalPolicy: RemovalPolicy.DESTROY,
  retention: RetentionDays.ONE_WEEK
});
Statemachine specification
With that, we can now define the statemachine properties as shown below. They include the sfnDef and sfnLog created earlier.
const stateMachine = new sfn.StateMachine(this, "msgStateMachine", {
  definition: sfnDef,
  timeout: timeout,
  logs: {
    destination: sfnLog,
    includeExecutionData: true,
    level: LogLevel.ALL
  }
});
Granting invoke lambda to statemachine
Since the lambda is a separate resource, we have to explicitly grant the invoke privilege to the statemachine execution role by adding a new IAM policy, sfnLambdaInvokePolicy, as shown below. This is what the Record Message step in the workflow above relies on.
const sfnLambdaInvokePolicy = new Policy(this, 'sfnLambdaInvokePolicy');
sfnLambdaInvokePolicy.addStatements(
  new PolicyStatement({
    actions: [
      "lambda:InvokeFunction"
    ],
    effect: Effect.ALLOW,
    resources: [`${triggerFunction.functionArn}:$LATEST`],
    sid: "sfnLambdaInvokePolicy"
  })
);
stateMachine.role.attachInlinePolicy(sfnLambdaInvokePolicy);
Granting lambda execution role for sending status update
Since we are not going to poll the lambda again and again to find its status, we expect the lambda to call back the statemachine with the job completion result. We have already sent the token as part of the payload to the lambda, which will then post a message back to the statemachine giving the status as success or failure based on the scenario. Until then, the statemachine stays paused in its current step.
The privileges below, in the form of an IAM policy statement, are assigned to the processor lambda execution role to achieve this.
const lambdaSfnStatusUpdatePolicy = new Policy(this, 'lambdaSfnStatusUpdatePolicy');
lambdaSfnStatusUpdatePolicy.addStatements(
  new PolicyStatement({
    actions: [
      "states:SendTaskSuccess",
      "states:SendTaskFailure",
    ],
    effect: Effect.ALLOW,
    resources: ['*'],
    sid: "lambdaSfnStatusUpdatePolicy"
  })
);
triggerFunction.role?.attachInlinePolicy(lambdaSfnStatusUpdatePolicy);
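The callback contract described above can be sketched locally without touching AWS. In the sketch below, the `TaskReporter` interface is a hypothetical stand-in for the Step Functions client, and `runWithCallback`, the error name, and the tokens are made up for illustration; it is a model of the idea, not the lambda used in this article.

```typescript
// Hypothetical minimal interface standing in for the Step Functions client.
interface TaskReporter {
  sendTaskSuccess(input: { taskToken: string; output: string }): Promise<void>;
  sendTaskFailure(input: { taskToken: string; error: string; cause: string }): Promise<void>;
}

// Runs the work, then reports success or failure once using the task token.
async function runWithCallback(
  taskToken: string,
  work: () => Promise<unknown>,
  reporter: TaskReporter
): Promise<void> {
  let result: unknown;
  try {
    result = await work();
  } catch (err) {
    // The paused step resumes as failed once this call is accepted.
    await reporter.sendTaskFailure({
      taskToken,
      error: "RecordFailed", // hypothetical error name
      cause: String(err),
    });
    return;
  }
  // The paused step resumes with this output once this call is accepted.
  await reporter.sendTaskSuccess({
    taskToken,
    output: JSON.stringify({ statusCode: 200, result }),
  });
}

// In-memory reporter that only records which callback was used.
const calls: string[] = [];
const fakeReporter: TaskReporter = {
  sendTaskSuccess: async (input) => { calls.push(`success:${input.taskToken}`); },
  sendTaskFailure: async (input) => { calls.push(`failure:${input.taskToken}`); },
};

runWithCallback("token-1", async () => "saved", fakeReporter).then(() => console.log(calls));
```

Passing the reporter in as a parameter is a design choice of this sketch; it keeps the success and failure paths easy to exercise with an in-memory fake.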
Setting the sfnMachine readonly property
This is required to access the statemachine object from the stack where the construct is used and to use it for further integration.
stateMachine.applyRemovalPolicy(RemovalPolicy.DESTROY);
this.sfnMachine = stateMachine;
New lambda to record the message into the dynamodb
Let us create a new file under lambda/message-recorder.ts. In this lambda, we only implement saving the message to the specified dynamodb table, plus the logic to send the status callback for the success and failure scenarios with the right output message.
import { PutItemInput } from "aws-sdk/clients/dynamodb";
import { DynamoDB, StepFunctions } from "aws-sdk";
const sfn = new StepFunctions({ apiVersion: "2016-11-23" });
exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  const msg = event.Record;
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${crt_time}` },
      event: { S: JSON.stringify(msg.event) },
    },
    ReturnConsumedCapacity: "TOTAL",
  };
  try {
    result = await dynamo.putItem(putData).promise();
  } catch (err) {
    const sendFailure: StepFunctions.SendTaskFailureInput = {
      error: JSON.stringify(err),
      cause: JSON.stringify({
        statusCode: 500,
        headers: { "Content-Type": "text/json" },
        putStatus: {
          messageId: msg.messageId,
          ProcessorResult: err,
        },
      }),
      taskToken: event.MyTaskToken,
    };
    // Use .promise() without a callback so the request is sent once and actually awaited.
    await sfn
      .sendTaskFailure(sendFailure)
      .promise()
      .then((data) => console.log(data))
      .catch((e) => console.log(e, e.stack));
    return sendFailure;
  }
  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: msg.messageId,
        ProcessorResult: result,
      },
    }),
    taskToken: event.MyTaskToken,
  };
  // Same pattern as above: a single awaited request, with the outcome logged.
  await sfn
    .sendTaskSuccess(sendSuccess)
    .promise()
    .then((data) => console.log(data))
    .catch((e) => console.log(e, e.stack));
  return sendSuccess;
};
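The item that the handler above writes can be looked at in isolation. The following is a minimal sketch with a hypothetical helper `toMessageItem` and made-up sample values; it only reproduces the attribute mapping and does not call DynamoDB.

```typescript
// Shape of the Record object that the state machine passes to the lambda.
interface MessageRecord {
  messageId: string;
  createdAt: string; // ISO 8601 timestamp taken from the event's time field
  event: unknown;
}

// Builds the item in DynamoDB attribute-value form, following the handler's mapping.
function toMessageItem(record: MessageRecord) {
  const createdAtMs = new Date(record.createdAt).getTime(); // epoch milliseconds
  return {
    messageId: { S: record.messageId },
    createdAt: { N: `${createdAtMs}` }, // N values are sent to DynamoDB as strings
    event: { S: JSON.stringify(record.event) },
  };
}

// Sample record, made up for illustration.
const item = toMessageItem({
  messageId: "msg-1",
  createdAt: "2022-04-16T00:00:00Z",
  event: { greeting: "hello" },
});
console.log(item);
```

Storing createdAt as a number of milliseconds keeps it sortable, while the event body is kept as a JSON string.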
Defining the lambda inside the stack
Here we will be using the above code asset to define the lambda resource inside our CDK stack.
const messageRecorder = new lambda.Function(this, "MessageRecorderHandler", {
  runtime: lambda.Runtime.NODEJS_14_X,
  code: lambda.Code.fromAsset("lambda"),
  handler: "message-recorder.processor",
  logRetention: logs.RetentionDays.ONE_MONTH,
  environment: {
    MESSAGES_TABLE_NAME: envParams.messages.tableName || "",
  },
});
messageRecorder.applyRemovalPolicy(RemovalPolicy.DESTROY);
Implementing the new sfn construct inside our stack
Importing the construct library created earlier.
import { simpleSfnConstruct } from "../constructs/sfn-simple";
Passing the required params and getting an instance object reference by instantiating the construct.
const sfnMachine = new simpleSfnConstruct(this, 'sfnMachine', {
  timeout: Duration.seconds(30),
  triggerFunction: messageRecorder
});
Event Target to statemachine
const sfnRole = new Role(this, 'Role', {
  assumedBy: new ServicePrincipal('events.amazonaws.com'),
});
const sfnCommonEventTarget = new eventTargets.SfnStateMachine(sfnMachine.sfnMachine, {
  deadLetterQueue: commonEventProcessorQueueDLQ.queue,
  retryAttempts: 3,
  input: RuleTargetInput.fromEventPath("$"),
  role: sfnRole
});
New event rule for the event target
In this event rule, we use the new bus commonBus and the same eventPattern to forward the events to the statemachine defined above.
const sfnEventRule = new Rule(this, `sfnCommonEventProcessorRule`, {
  eventBus: commonBus,
  eventPattern: { source: [`com.devpost.commonevent`] },
  targets: [sfnCommonEventTarget],
  ruleName: "sfnCommonEventProcessorRule",
  enabled: true
});
sfnEventRule.applyRemovalPolicy(RemovalPolicy.DESTROY);
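To see which events this rule forwards, here is a deliberately simplified sketch of matching on the source field alone. The helper `matchesSource` and the second sample source are hypothetical; real EventBridge patterns support more fields and operators than this exact-match model.

```typescript
// Simplified stand-in for an event pattern that only looks at the source field.
interface SourcePattern {
  source: string[];
}

// Returns true when the event source is one of the values listed in the pattern.
function matchesSource(pattern: SourcePattern, event: { source: string }): boolean {
  return pattern.source.indexOf(event.source) !== -1;
}

const rulePattern: SourcePattern = { source: ["com.devpost.commonevent"] };

console.log(matchesSource(rulePattern, { source: "com.devpost.commonevent" }));
console.log(matchesSource(rulePattern, { source: "com.example.other" }));
```

Only events whose source is listed in the pattern are delivered to the statemachine target under this model.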
Testing with postman
Here I perform a test by sending a message to the API endpoint, as shown below, once the solution is deployed to my AWS environment.
Querying the dynamodb table with the messageId is done as follows.
SELECT * FROM "MessagesTable" where messageId = 'cdd51245-987a-b3c7-eecf-6d6d63046073'
We can find the message in the dynamodb now.
Inspecting from AWS console
You could now check the workflow progress and execution logs from the AWS console.
Find with the event messageId
Thus we have defined a new statemachine and reconfigured our existing event bus with a new rule, which delivers messages to the statemachine that we have built in this article.
We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.
AWS CDK 101 - StateMachine and StepFunctions replacing our SQS based lambda trigger
Aravind V (@Aravind_V7), April 16, 2022