Amazon EventBridge Pipes helps you to create point-to-point integrations between event producers and consumers with optional transform, filter and enrich steps.
EventBridge Pipes reduces the amount of integration code you need to write and maintain when building event-driven applications.
Benefits:
- Write less integration code
- Save costs with filtering and built-in integrations
- Source events in real time
- Reduce operational load
Features/capabilities:
- Filter events
- Batch events
- Order events
- High concurrency
- Advanced enrichment
Pipes vs Event Buses
- Pipes connect a single publisher to a single consumer, whereas Event Buses connect many publishers to many consumers
Example
Let's say you want to pass data from SQS to AWS Step Functions and modify it along the way. Normally you would need a Lambda function in between to change the data and then pass it on to Step Functions. With EventBridge Pipes you don't need that function; the pipe itself can transform the data.
- Source: EventBridge Pipes receives event data from a variety of sources, applies optional filters and enrichment to that data, and sends it to a target. If a source enforces order to the events sent to pipes, that order is maintained throughout the entire process to the target.
- Filtering: EventBridge Pipes can filter a given source's events and then process only a subset of those events.
- Target: After the event data has been filtered and enriched, you can send it to a specific target, such as an Amazon Kinesis stream or an Amazon CloudWatch log group.
Note: We are going to add the enrichment part in the second part of this example
- Filtering:
```json
{
  "body": {
    "orderType": [{
      "prefix": "NEW_ORDER"
    }]
  }
}
```
The filter checks the payload coming from our queue: in our case, if `orderType` starts with `NEW_ORDER`, the data is passed on to the target.
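To make the prefix rule concrete, here is a minimal local sketch of how such a pattern could be evaluated. It is not the real EventBridge matching engine: `matchesPattern` is a hypothetical helper that only handles nested objects, exact values and `prefix` conditions, and it assumes the SQS body has already been parsed into an object and that the filter is keyed on the same field name the test message uses (`orderType`).

```javascript
// Simplified local approximation of an EventBridge "prefix" filter.
// Assumption: the SQS body has already been parsed from JSON into an object.
function matchesPattern(event, pattern) {
  return Object.entries(pattern).every(([key, rule]) => {
    const value = event?.[key];
    if (Array.isArray(rule)) {
      // Leaf rule: the field matches if ANY listed condition matches.
      return rule.some((condition) => {
        if (condition !== null && typeof condition === "object" && "prefix" in condition) {
          return typeof value === "string" && value.startsWith(condition.prefix);
        }
        return value === condition; // exact-value match
      });
    }
    // Nested object: descend into the matching field of the event.
    return value !== null && typeof value === "object" && matchesPattern(value, rule);
  });
}

const pattern = { body: { orderType: [{ prefix: "NEW_ORDER" }] } };

const accepted = matchesPattern({ body: { orderType: "NEW_ORDER", orderId: "O-123456" } }, pattern);
// "CANCEL_ORDER" and "O-999" are made-up values for illustration only.
const rejected = matchesPattern({ body: { orderType: "CANCEL_ORDER", orderId: "O-999" } }, pattern);

console.log(accepted, rejected); // true false
```

Events that do not match are simply dropped by the pipe before they reach the enrichment or the target.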
- Target Input Transformer: you can transform the data after it's enriched and before it's sent to the target. In our case we take the values from the body and convert the camelCase keys to snake_case.
```json
{
  "order_type": "<$.body.orderType>",
  "order_id": "<$.body.orderId>",
  "customer_id": "<$.body.customerId>"
}
```
- Output (that we configure / want to have at the end):
```json
{
  "order_type": "",
  "order_id": "",
  "customer_id": ""
}
```
![Image description](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/hnpv54odcvej86owudbo.png)
Now let's test our pipe by sending the following payload to our SQS queue:
```json
{
  "orderType": "NEW_ORDER",
  "orderId": "O-123456",
  "amount": 25,
  "customerId": "123"
}
```
Inside our Step Functions execution we can see the data that was passed in, which matches the output we configured in the pipe.
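As a rough illustration of what the target should receive, the sketch below applies the transformer template to the test payload locally. `applyTemplate` is a hypothetical stand-in written for this post, not the Pipes implementation; it only understands whole-value `<$.path>` placeholders, and falling back to an empty string for a missing path is a choice of this sketch.

```javascript
// Simplified stand-in for the Pipes input transformer: replaces each
// "<$.path.to.field>" placeholder with the value found at that path.
function applyTemplate(template, event) {
  const resolve = (path) =>
    path.split(".").reduce((node, key) => (node == null ? undefined : node[key]), event);

  return Object.fromEntries(
    Object.entries(template).map(([key, value]) => {
      const match = /^<\$\.(.+)>$/.exec(value);
      // Sketch-only choice: a path that does not resolve becomes "".
      return [key, match ? (resolve(match[1]) ?? "") : value];
    })
  );
}

const template = {
  order_type: "<$.body.orderType>",
  order_id: "<$.body.orderId>",
  customer_id: "<$.body.customerId>",
};

// The test message, assumed to be already parsed under "body".
const event = {
  body: { orderType: "NEW_ORDER", orderId: "O-123456", amount: 25, customerId: "123" },
};

const transformed = applyTemplate(template, event);
console.log(JSON.stringify(transformed));
// {"order_type":"NEW_ORDER","order_id":"O-123456","customer_id":"123"}
```

Note that `amount` is not referenced by the template, so it never reaches the target.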
Imagine how powerful this is: first we filter our data, then we reshape it into the structure we want, and finally we pass it to a target.
Now let's add enrichment to our pipe
- Enrichment: With the enrichment step of EventBridge Pipes, you can enhance the data from the source before sending it to the target.
In order to keep our Lambda function simple, we will only log out the event, and then pass it back to the target:
```javascript
export const handler = async (event) => {
  console.log('this is from lambda');
  console.log('event =>', event);
  return event;
};
```
EventBridge Pipes passes the enrichment responses directly to the configured target. This includes array responses for targets that support batches. You can also use your enrichment as a filter and pass fewer events than were received from the source. If you don't want to invoke the target, return an empty response, such as `""`, `{}`, or `[]`.
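To show what "enrichment as a filter" could look like, here is a minimal sketch. The rule itself (keep only orders with a positive `amount`) and the helper name `keepValidOrders` are made up for illustration, and the code assumes each record carries the SQS message under `body`, either as a JSON string or as an already-parsed object.

```javascript
// Hypothetical rule for illustration: only orders with a positive amount continue.
function keepValidOrders(events) {
  return events.filter((record) => {
    // The body may arrive as a JSON string or as an already-parsed object.
    const body = typeof record.body === "string" ? JSON.parse(record.body) : record.body;
    return typeof body?.amount === "number" && body.amount > 0;
  });
}

// Enrichment handler: Pipes passes whatever is returned on to the target.
// Returning [] means the target is not invoked for this batch.
const handler = async (events) => {
  const kept = keepValidOrders(events);
  console.log(`received ${events.length}, forwarding ${kept.length}`);
  return kept;
};
// In an ES module Lambda, expose it with: export { handler };

// Local check with two sample records (the second one is invented for the example).
const sample = [
  { body: JSON.stringify({ orderType: "NEW_ORDER", orderId: "O-123456", amount: 25, customerId: "123" }) },
  { body: JSON.stringify({ orderType: "NEW_ORDER", orderId: "O-000000", amount: 0, customerId: "456" }) },
];
console.log(keepValidOrders(sample).length); // 1
```

Keeping the filtering logic in a plain function makes it easy to test without deploying the Lambda.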
Before testing, make sure your EventBridge Pipes role has the right permissions to invoke the Lambda function, that is `lambda:InvokeFunction` on the enrichment function (in my case I added the AWSLambdaExecute permission).
Now let's send the same message to SQS again, and you should see the CloudWatch logs produced by the Lambda function that we added.
That's it! Now we can filter, convert the input and enrich our data in a much easier way.
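For reference, the whole pipe from this walkthrough could be described as a single set of request parameters. The sketch below only builds and prints a plain object; the field names follow the CreatePipe API as I understand it, so please verify them against the API reference, and every ARN and the pipe name are hypothetical placeholders.

```javascript
// All ARNs and the pipe name below are hypothetical placeholders.
const filterPattern = { body: { orderType: [{ prefix: "NEW_ORDER" }] } };

const inputTemplate = {
  order_type: "<$.body.orderType>",
  order_id: "<$.body.orderId>",
  customer_id: "<$.body.customerId>",
};

const createPipeParams = {
  Name: "orders-pipe",
  RoleArn: "arn:aws:iam::123456789012:role/orders-pipe-role",
  // Source: the SQS queue the messages come from.
  Source: "arn:aws:sqs:us-east-1:123456789012:orders-queue",
  SourceParameters: {
    // The filter pattern is passed as a JSON string.
    FilterCriteria: { Filters: [{ Pattern: JSON.stringify(filterPattern) }] },
  },
  // Enrichment: the Lambda function that logs and returns the event.
  Enrichment: "arn:aws:lambda:us-east-1:123456789012:function:log-order",
  // Target: the Step Functions state machine.
  Target: "arn:aws:states:us-east-1:123456789012:stateMachine:process-order",
  TargetParameters: {
    // The input transformer template is also passed as a string.
    InputTemplate: JSON.stringify(inputTemplate),
    StepFunctionStateMachineParameters: { InvocationType: "FIRE_AND_FORGET" },
  },
};

console.log(JSON.stringify(createPipeParams, null, 2));
```

If you use the AWS SDK for JavaScript v3, an object like this is what you would hand to `CreatePipeCommand` from `@aws-sdk/client-pipes`; I have left the actual call out so the sketch runs without credentials.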
Conclusion:
EventBridge Pipes is a very useful tool: it simplifies your architecture and replaces the old way of filtering and enriching data with a much more maintainable and organized one.
Top comments (4)
Excellent explanation and demo of the service, thanks!
It's my pleasure
How do you use the input at the target if the target is ECS with a docker image?
I've been trying to figure it out without any luck :(
If the target is an ECS container, then one option is to set the pipe's target to SQS, and from SQS you can write some simple code that triggers the container or makes an API call.