SNS to Step Functions via EventBridge with CDK
I can't believe I'm saying this out loud, but I've got some legacy Serverless code deployed in many of my projects. What do I mean by that? I've got a lot of code that looks like this: SNS receives a message, SQS has a subscription to that topic, and then a Lambda is connected that does something with it. I have a few filters in the SNS subscription, but a great deal of code that transforms the data in the Lambda before doing something with it. Even if that means using a Step Function that is triggered by the Lambda, there's still a lot of "plumbing" code that just seems super wasteful. And let's be real, you can't just "start over" or "replace" when you are in production with customers. You have to find ways to work with what you have by evolving your architecture.
Enter new Serverless. And enter not having to waste Lambda cycles on transforms and filtering, but doing that work in AWS managed runtimes like EventBridge Pipes. I wrote a previous article on streaming DynamoDB to Pipes, so after you are done with this read, head over there to see how to use Pipes to connect up your DDB stream.
The Architecture
Quick walkthrough of what is above
- Message is still published to SNS
- SQS has the subscription to SNS (this stays the same)
- Connect EventBridge Pipes to the SQS
- Pipes allow a Filter to remove unnecessary messages
- Pipes allow a Transform to shape the data as needed
- An EventBus is the target for the Pipe
- The EventBus can have targets that trigger downstream code/services
Breaking down the Architecture
Let's walk through each step of the flow via code and images. I'm going to be using CDK with TypeScript to demonstrate how to stand up this sample solution.
Standing up the SNS and SQS
// assumes CDK v2 imports along these lines
import { Topic } from "aws-cdk-lib/aws-sns";
import { SqsSubscription } from "aws-cdk-lib/aws-sns-subscriptions";
import { Queue } from "aws-cdk-lib/aws-sqs";

// creating the SNS Topic
this._topic = new Topic(scope, "SampleTopic", {
  topicName: "sample-topic",
  displayName: "Sample Topic",
});
// creating the SQS Queue
this._queue = new Queue(scope, "SampleQueue", {
  queueName: `sample-queue`,
});
// add the subscription
this._topic.addSubscription(
  new SqsSubscription(this._queue, {
    // SUPER IMPORTANT -- rawMessageDelivery
    rawMessageDelivery: true,
  })
);
The above does a few simple things.
- Creates the Topic
- Creates the Queue
- Adds the subscription. VERY IMPORTANT: make sure to include rawMessageDelivery. This tells SNS to send SQS just the message body and not all of the other SNS details. It keeps the JSON clean too so it's not escaped
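To make that concrete, without rawMessageDelivery the SQS body is the full SNS notification envelope, with your payload escaped inside the Message field. Roughly this shape (values are illustrative):
{
  "Type": "Notification",
  "MessageId": "a1b2c3d4-...",
  "TopicArn": "arn:aws:sns:us-west-2:123456789012:sample-topic",
  "Message": "{\"eventType\":\"SampleEvent\",\"field1\":\"Sample Field 1\"}",
  "Timestamp": "2023-01-01T00:00:00.000Z"
}
With rawMessageDelivery: true, the SQS body is just that inner JSON, unescaped, which is exactly what the Pipe filter and transform further down depend on.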
Sample Topic
Sample Queue
Sample Subscription
Now let's move on to the EventBridge Pipe that will listen to the queue.
Standing up the Pipe
First off, if you want to read more from AWS about the Pipes feature, feel free to check out the docs here.
Now to build the pipe. Let's again dive into CDK
import * as pipes from "aws-cdk-lib/aws-pipes";

// Create the role
const pipeRole = this.pipeRole(
  scope,
  this.sourcePolicy(props.queue),
  this.targetPolicy(props.bus)
);
// Create the pipe (CfnPipe is the L1 construct for EventBridge Pipes)
const pipe = new pipes.CfnPipe(this, "Pipe", {
  name: "SampleEvent-Pipe",
  roleArn: pipeRole.roleArn,
  source: props.queue.queueArn,
  target: props.bus.eventBusArn,
  sourceParameters: this.sourceParameters(),
  targetParameters: this.targetParameters(),
});
This is what the constructor code of the Construct looks like. I am building up the Role that handles
- Source Policy (the queue)
- Target Policy (the bus)
Then I'm adding in the Source and Target parameters. Let's explore each of these further.
Source Policy
Sets up permission to read, delete and get queue attributes
import { Effect, PolicyDocument, PolicyStatement } from "aws-cdk-lib/aws-iam";
import { IQueue } from "aws-cdk-lib/aws-sqs";

sourcePolicy = (queue: IQueue): PolicyDocument => {
  return new PolicyDocument({
    statements: [
      new PolicyStatement({
        // the Pipe needs to receive, delete, and inspect messages on the source queue
        resources: [queue.queueArn],
        actions: [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
        ],
        effect: Effect.ALLOW,
      }),
    ],
  });
};
Target Policy
Sets up permission to be able to put events on the EventBus
import { IEventBus } from "aws-cdk-lib/aws-events";

targetPolicy = (bus: IEventBus): PolicyDocument => {
  return new PolicyDocument({
    statements: [
      new PolicyStatement({
        // the Pipe only needs to put events onto the target bus
        resources: [bus.eventBusArn],
        actions: ["events:PutEvents"],
        effect: Effect.ALLOW,
      }),
    ],
  });
};
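For completeness, the pipeRole helper isn't shown in the fragments above. A minimal sketch might look like the below; the construct id and the inline policy keys are my own, but the important part is that EventBridge Pipes assumes the role via the pipes.amazonaws.com service principal:

import { PolicyDocument, Role, ServicePrincipal } from "aws-cdk-lib/aws-iam";
import { Construct } from "constructs";

pipeRole = (
  scope: Construct,
  sourcePolicy: PolicyDocument,
  targetPolicy: PolicyDocument
): Role => {
  return new Role(scope, "PipeRole", {
    // Pipes assumes this role to read from the source and write to the target
    assumedBy: new ServicePrincipal("pipes.amazonaws.com"),
    inlinePolicies: {
      sourcePolicy: sourcePolicy,
      targetPolicy: targetPolicy,
    },
  });
};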
Source Parameters
This creates rules around how the Pipe reads from SQS, in addition to adding a Filter so that we don't get noise in our Pipe.
sourceParameters = () => {
  return {
    sqsQueueParameters: {
      batchSize: 1,
    },
    filterCriteria: {
      filters: [
        {
          pattern: `
          {
            "body": {
              "eventType": ["SampleEvent"]
            }
          }`,
        },
      ],
    },
  };
};
Notice how I'm using the field body to start my filter. That's because EventBridge Pipes automatically pulls in the message structure from the Queue Message. To read more about how all this works, this doc was extremely helpful for me.
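To make that concrete, the record the Pipe evaluates looks roughly like an SQS message record with the JSON body parsed, which is why paths like $.body.eventType resolve. Fields are elided and values are illustrative:
{
  "messageId": "059f36b4-...",
  "receiptHandle": "...",
  "body": {
    "eventType": "SampleEvent",
    "field1": "Sample Field 1"
  },
  "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:sample-queue"
}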
Target Parameters
This creates rules around how the Pipe posts data into its target. You also get the ability to Transform the data in a way that makes sense. I'm actually dropping some fields from the message to simplify it a little bit, in addition to removing the body root field element so that the message looks like what I expect.
targetParameters = () => {
  return {
    eventBridgeEventBusParameters: {
      detailType: "SampleEventTriggered",
      source: "com.binaryheap.sample-source",
    },
    inputTemplate: `
    {
      "metaBody": {
        "correlationId": <$.messageId>
      },
      "messageBody": {
        "field1": <$.body.field1>,
        "field2": <$.body.field2>,
        "field3": <$.body.field3>
      }
    }`,
  };
};
A thing to note about Source and Target parameters is that they are specific to the service that you are reading from and targeting. That's why the target in this example looks like an EventBridge target, because that's where this is going.
And finally, here is the sample event that we are working with
{
"eventType": "SampleEvent",
"field1": "Sample Field 1",
"field2": "Sample Field 2",
"field3": "Sample Field 3"
}
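Run that sample event through the inputTemplate above and the detail that lands on the bus should look roughly like this, with correlationId coming from the SQS messageId:
{
  "metaBody": {
    "correlationId": "059f36b4-..."
  },
  "messageBody": {
    "field1": "Sample Field 1",
    "field2": "Sample Field 2",
    "field3": "Sample Field 3"
  }
}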
With all that put together, let's take a look at what the deployed resources look like in the AWS Console.
The Pipe Source
The Pipe Filter
The Pipe Transformer
Transformer Tool
I want to take a quick pause to highlight something that is super helpful. First off, I think being a good builder for the cloud means taking advantage of all that's at your disposal. Just because I'm a CLI/terminal/VSCode type of guy doesn't mean I never use the AWS Console. That would be silly. And I'd be missing out on the wonderful power of this transformer tool.
When you edit the Pipe and view the Transforms section of the target you get this
What this gives you is the ability to plug in a sample event, code up your transformer, and then see live updates in the Output section to see how well your transform works. You can then take that and plug it into your CDK code like I've done above in the TargetParameters.
EventBridge Bus and the Rule
To keep moving through the architecture, the output of the Pipe is targeting an EventBridge Bus. For this sample, I've created a Custom Bus. You can surely use the default but I tend to prefer Custom Buses to isolate boundaries and then mesh them together with rules. There are pros and cons to either which are outside of the scope of this article.
this._bus = new events.EventBus(scope, "EventBus", {
  eventBusName: `sample-event-bus`,
});
The Bus is super simple. Give it a name and off we go.
And now for the Rule
import { Rule } from "aws-cdk-lib/aws-events";
import { SfnStateMachine } from "aws-cdk-lib/aws-events-targets";
import { Role, ServicePrincipal } from "aws-cdk-lib/aws-iam";
import { Queue } from "aws-cdk-lib/aws-sqs";

const rule = new Rule(this, "SampleEvent-Rule", {
  eventPattern: {
    detailType: ["SampleEventTriggered"],
  },
  ruleName: "sample-event-triggered-rule",
  eventBus: props.bus,
});
const dlq = new Queue(this, "SameEventTriggered-DLQ");
const role = new Role(this, "SameEventTriggered-Role", {
  assumedBy: new ServicePrincipal("events.amazonaws.com"),
});
rule.addTarget(
  new SfnStateMachine(props.stateMachine, {
    // input is omitted here, so the target receives the full matched event
    deadLetterQueue: dlq,
    role: role,
  })
);
A few little pieces to break down with this code.
First, creating the rule. You must give it an eventPattern, which should look familiar from the above code. I'm simply filtering on the root-level field detailType and looking for the event we published on the Pipe.
Second, create a Role that can be used by the target for triggering the State Machine.
Third, the Dead Letter Queue. If something goes wrong calling the State Machine, the message drops into that queue.
And lastly, the target. There are many targets you can create, but in this case, I'm using a StateMachine target that has the
- Rule input (left at the default, which passes along the full matched event)
- Dead Letter Queue
- Role used
EventBridge Rule
When the infra gets deployed, the rule will look like this in the AWS Console
State Machine Build
And for the last piece of this journey, we need a State Machine to execute. I like connecting up to a State Machine because most of the time these days when I'm building async and Event Driven Architecture workflows, a State Machine makes a ton of sense.
For this example, the State Machine is basic. It just has a Succeed Task. Since this isn't a Step Functions article, there's no meat in there. But again, you can extend this as much as your needs require.
import { Duration } from "aws-cdk-lib";
import { Role, ServicePrincipal } from "aws-cdk-lib/aws-iam";
import * as logs from "aws-cdk-lib/aws-logs";
import * as stepfunctions from "aws-cdk-lib/aws-stepfunctions";
import { LogLevel } from "aws-cdk-lib/aws-stepfunctions";
import { Construct } from "constructs";

finalizeStateMachine = (scope: Construct) => {
  const logGroup = new logs.LogGroup(this, "CloudwatchLogs", {
    logGroupName: "/aws/vendedlogs/states/sample-state-machine",
  });
  const role = new Role(this, "StateMachineRole", {
    assumedBy: new ServicePrincipal(`states.us-west-2.amazonaws.com`),
  });
  const flow = this.buildStateMachine(scope);
  this._stateMachine = new stepfunctions.StateMachine(this, "StateMachine", {
    role: role,
    stateMachineName: "SampleStateMachine",
    definition: flow,
    stateMachineType: stepfunctions.StateMachineType.EXPRESS,
    timeout: Duration.seconds(30),
    logs: {
      level: LogLevel.ALL,
      destination: logGroup,
      includeExecutionData: true,
    },
  });
};
Walking through the above code
- Build up the CloudWatch LogGroup
- Create the StateMachine Role
- Build the StateMachine
import { Succeed } from "aws-cdk-lib/aws-stepfunctions";

buildStateMachine = (scope: Construct): stepfunctions.IChainable => {
  return new Succeed(scope, "DefaultSucceed");
};
Again, the build is basic. But you can add all the IChainables you want to make the Workflow that you require.
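As a sketch of what extending it could look like, here's a hypothetical version that slips a Pass state in front of the Succeed. The PrepareMessage name and resultPath are made up for illustration:

import * as stepfunctions from "aws-cdk-lib/aws-stepfunctions";
import { Pass, Succeed } from "aws-cdk-lib/aws-stepfunctions";
import { Construct } from "constructs";

buildStateMachine = (scope: Construct): stepfunctions.IChainable => {
  // hypothetical step that reshapes the payload before finishing
  const prepare = new Pass(scope, "PrepareMessage", {
    resultPath: "$.prepared",
  });
  return prepare.next(new Succeed(scope, "DefaultSucceed"));
};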
State Machine Definition
The Execution
If you are following along with the GitHub repo, now's the time you can start executing the workflow.
To deploy
make deploy-local
Grab a very quick beverage as it shouldn't take more than a couple of minutes and then head on over to the SNS Console
Take the sample input file located at test/sample-message.json and publish it as a new message on the topic.
Now quickly ... and I mean quickly, run on over to the Step Functions Console (quick tip: have it open in a tab).
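If you'd rather skip the console clicking, publishing from the CLI should work too; the topic ARN below is a placeholder for whatever your deploy created:

aws sns publish \
  --topic-arn arn:aws:sns:us-west-2:123456789012:sample-topic \
  --message file://test/sample-message.json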
You should see a successful execution and then you can drill in to see this output
Notice how the EventBridge-style format is carried over and we've removed any notion of this having come from SNS or SQS. Pretty cool, isn't it? And to think we didn't use a Lambda. The code we wrote was configuration and IaC code in TypeScript to build a highly scalable and reliable cloud workload.
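For reference, the execution input is essentially a standard EventBridge envelope. It should look roughly like this; ids, account, and timestamps will vary:
{
  "version": "0",
  "id": "...",
  "detail-type": "SampleEventTriggered",
  "source": "com.binaryheap.sample-source",
  "account": "123456789012",
  "time": "2023-01-01T00:00:00Z",
  "region": "us-west-2",
  "resources": [],
  "detail": {
    "metaBody": {
      "correlationId": "059f36b4-..."
    },
    "messageBody": {
      "field1": "Sample Field 1",
      "field2": "Sample Field 2",
      "field3": "Sample Field 3"
    }
  }
}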
Wrapping Up
A few points I want to highlight about this architectural pattern:
- This is a Serverless and Event Driven style architecture that will scale, be reliable, and will deal with failure should bad things happen. That does not mean it's not complex, but with the parts being broken up as they are, problems are easy to find and easy to fix in isolation
- I mentioned rawMessageDelivery on the SNS subscription. Do not skip this.
- Cost/Load: by using Filters in the Pipes, you will not pay for unused cycles and your downstream code will not have to deal with filtering or working with data it doesn't desire. This will naturally help with load so you aren't wasting cycles.
There is a lot of code above and it's mostly in fragments, so if you want to pull the whole repo down and use it as a template, or just see how it looks in your environment, feel free to check it out.
This was a lengthy and detailed article, so thank you for reading and I hope you found it helpful. I can personally say that I'm using this pattern for connecting up these legacy Serverless components in my apps and it's paid some really great dividends. I know it will for you too!