DEV Community

awedis for AWS Community Builders

About Amazon EventBridge Pipes

Amazon EventBridge Pipes helps you to create point-to-point integrations between event producers and consumers with optional transform, filter and enrich steps.

EventBridge Pipes reduces the amount of integration code you need to write and maintain when building event-driven applications.

Benefits:

  • Write less integration code
  • Save costs with filtering and built-in integrations
  • Source events in real time
  • Reduce operational load

Features/capabilities:

  • Filter events
  • Batch events
  • Order events
  • High concurrency
  • Advanced enrichment

Pipes vs Event Buses

  • Pipes connect a single publisher to a single consumer, whereas Event Buses connect many publishers to many consumers

Example

Let's say you want to pass data from SQS to AWS Step Functions, and you want to modify the data before passing it on. Normally we would need a Lambda function in between to change the data and then pass it to Step Functions. With EventBridge Pipes we don't need this: we can change the data inside the pipe itself.

Components:

  • Source: EventBridge Pipes receives event data from a variety of sources, applies optional filters and enrichment to that data, and sends it to a target. If a source enforces order to the events sent to pipes, that order is maintained throughout the entire process to the target.
  • Filtering: EventBridge Pipes can filter a given source's events and then process only a subset of those events.
  • Target: After the event data has been filtered and enriched, you can send it to a specific target, such as an Amazon Kinesis stream or an Amazon CloudWatch log group.

📋 Note: We are going to add the enrichment step in the second part of this example.

  • Filtering:
{
  "body": {
    "orderType": [{
      "prefix": "NEW_ORDER"
    }]
  }
}

The filter checks the payload coming from our queue: in our case, if orderType starts with NEW_ORDER the event is passed on to the target, otherwise the pipe discards it.
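
To make the prefix rule concrete, here is a minimal sketch that mimics the check locally. It is not how EventBridge evaluates patterns internally: `matchesPrefixFilter` is a hypothetical helper that only understands prefix rules, and it assumes the SQS message body is a JSON string with an `orderType` field, as in the test payload used later in this post.

```javascript
// Hypothetical local stand-in for a Pipes prefix filter on an SQS message.
// Supports only patterns shaped like { body: { field: [{ prefix: "..." }] } }.
function matchesPrefixFilter(pattern, sqsRecord) {
  let body;
  try {
    // SQS delivers the message body as a string, so parse it first.
    body = JSON.parse(sqsRecord.body);
  } catch {
    return false; // a non-JSON body cannot match a field-level pattern
  }
  if (body === null || typeof body !== 'object') return false;

  // Every field in the pattern must match; rules for one field are alternatives.
  return Object.entries(pattern.body).every(([field, rules]) =>
    rules.some(
      (rule) =>
        typeof body[field] === 'string' && body[field].startsWith(rule.prefix)
    )
  );
}

const pattern = { body: { orderType: [{ prefix: 'NEW_ORDER' }] } };
const accepted = {
  body: JSON.stringify({ orderType: 'NEW_ORDER', orderId: 'O-123456' }),
};
const rejected = {
  body: JSON.stringify({ orderType: 'CANCEL_ORDER', orderId: 'O-123457' }),
};

console.log(matchesPrefixFilter(pattern, accepted)); // true
console.log(matchesPrefixFilter(pattern, rejected)); // false
```

Because the rule is a prefix match, a value such as NEW_ORDER_EXPRESS would also pass the filter.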

  • Target Input Transformer: you can transform the data after it's enriched and before it's sent to the target. In our case we take the values from the body and convert the keys from camel case to snake case.
{
  "order_type": "<$.body.orderType>",
  "order_id": "<$.body.orderId>",
  "customer_id": "<$.body.customerId>"
}
  • Output (that we configure / want to have at the end):
{
  "order_type": "",
  "order_id": "",
  "customer_id": ""
}
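
If you prefer to define the pipe outside the console, the filter and transformer shown above map onto a CreatePipe API request roughly as sketched below. The field names follow the CreatePipe request shape; the pipe name, every ARN, and the `buildPipeRequest` helper are hypothetical placeholders, and the filter field is assumed to be `orderType` to match the test payload.

```javascript
// Sketch of a CreatePipe request body. The name and all ARNs passed in are
// hypothetical placeholders; replace them with your own resources.
function buildPipeRequest({ name, roleArn, queueArn, stateMachineArn }) {
  return {
    Name: name,
    RoleArn: roleArn,
    Source: queueArn,
    SourceParameters: {
      FilterCriteria: {
        // Each filter pattern is passed as a JSON string.
        Filters: [
          {
            Pattern: JSON.stringify({
              body: { orderType: [{ prefix: 'NEW_ORDER' }] },
            }),
          },
        ],
      },
    },
    Target: stateMachineArn,
    TargetParameters: {
      // The input template is also a string; the <$.path> placeholders are
      // resolved by the pipe for each event.
      InputTemplate: JSON.stringify({
        order_type: '<$.body.orderType>',
        order_id: '<$.body.orderId>',
        customer_id: '<$.body.customerId>',
      }),
    },
  };
}

const request = buildPipeRequest({
  name: 'orders-pipe', // hypothetical
  roleArn: 'arn:aws:iam::123456789012:role/orders-pipe-role', // hypothetical
  queueArn: 'arn:aws:sqs:us-east-1:123456789012:orders-queue', // hypothetical
  stateMachineArn:
    'arn:aws:states:us-east-1:123456789012:stateMachine:orders', // hypothetical
});

console.log(JSON.stringify(request, null, 2));
```

With the AWS SDK for JavaScript v3, an object like this would be passed to `CreatePipeCommand` from `@aws-sdk/client-pipes`. The SDK call is left out here so the sketch runs without credentials.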

Now let's test our pipe by sending the following payload to our SQS queue:

{
  "orderType": "NEW_ORDER",
  "orderId": "O-123456",
  "amount": 25,
  "customerId": "123"
}

Inside our Step Functions execution we can see the data that was passed in, which matches the output we configured in the pipe.

Imagine how powerful this is: first we filter our data, then we reshape it into the structure we want, and finally we pass it to a target.
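
To see what the input transformer does to the sample payload, here is a rough local imitation. `applyTemplate` is a hypothetical helper, not part of any AWS library; it only handles simple `<$.a.b>` dot paths and assumes the SQS body has already been parsed into an object, which is how the template paths address it.

```javascript
// Hypothetical helper: resolves "<$.a.b>" placeholders against an event object.
function applyTemplate(template, event) {
  // Walk the dot path one key at a time, stopping early on missing nodes.
  const resolve = (path) =>
    path
      .split('.')
      .reduce((node, key) => (node == null ? undefined : node[key]), event);

  return Object.fromEntries(
    Object.entries(template).map(([key, value]) => {
      const match = /^<\$\.(.+)>$/.exec(value);
      // Values that are not placeholders are left untouched.
      return [key, match ? resolve(match[1]) : value];
    })
  );
}

const template = {
  order_type: '<$.body.orderType>',
  order_id: '<$.body.orderId>',
  customer_id: '<$.body.customerId>',
};

// The test payload from this post, wrapped the way the template expects it.
const event = {
  body: {
    orderType: 'NEW_ORDER',
    orderId: 'O-123456',
    amount: 25,
    customerId: '123',
  },
};

console.log(applyTemplate(template, event));
// { order_type: 'NEW_ORDER', order_id: 'O-123456', customer_id: '123' }
```

Note that amount does not appear in the result, because the template never references it.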

Now let's add enrichment to our pipe 🙂

  • Enrichment: With the enrichment step of EventBridge Pipes, you can enhance the data from the source before sending it to the target.

To keep our Lambda function simple, we will only log the event and then return it unchanged, so it is passed on to the target:

export const handler = async(event) => {
    console.log('this is from lambda');
    console.log('event =>', event);
    return event;
};

EventBridge Pipes passes the enrichment responses directly to the configured target. This includes array responses for targets that support batches. You can also use your enrichment as a filter and pass fewer events than were received from the source. If you don't want to invoke the target, return an empty response, such as "", {}, or [].
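
As a sketch of the "enrichment as a filter" idea, the code below drops some records and adds a field to the rest. It assumes the pipe invokes the function with an array of SQS records whose body is a JSON string, and that orders without a positive `amount` should be skipped. The `enrichBatch` helper and the `isLargeOrder` field are made up for illustration; adapt them to your own payload.

```javascript
// Hypothetical enrichment logic that also acts as a filter.
function enrichBatch(records) {
  const enriched = [];
  for (const record of records) {
    let body;
    try {
      // Accept either a JSON string (raw SQS record) or an already parsed body.
      body =
        typeof record.body === 'string' ? JSON.parse(record.body) : record.body;
    } catch {
      continue; // skip records whose body is not valid JSON
    }
    if (!body || typeof body.amount !== 'number' || body.amount <= 0) {
      continue; // skipped records are not returned, so they never reach the target
    }
    // "isLargeOrder" is an invented field; the threshold of 100 is arbitrary.
    enriched.push({
      ...record,
      body: { ...body, isLargeOrder: body.amount >= 100 },
    });
  }
  // An empty array tells the pipe not to invoke the target for this batch.
  return enriched;
}

// Lambda entry point; in a deployed ES module this would be exported.
const handler = async (event) =>
  enrichBatch(Array.isArray(event) ? event : [event]);

const sampleBatch = [
  { body: JSON.stringify({ orderType: 'NEW_ORDER', orderId: 'O-123456', amount: 25, customerId: '123' }) },
  { body: JSON.stringify({ orderType: 'NEW_ORDER', orderId: 'O-123457', amount: 0, customerId: '456' }) },
];

console.log(enrichBatch(sampleBatch).length); // 1
```

Keeping the logic in a plain function makes it easy to test without deploying anything; the async handler is only a thin wrapper around it.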

Before testing, make sure your EventBridge Pipes execution role has permission to invoke the Lambda function, which requires the lambda:InvokeFunction action (in my case I attached the AWSLambdaExecute managed policy).

Now let's send the same message to SQS. You should see CloudWatch logs produced by the Lambda function that we added.

That's it, now we can filter, convert the input and enrich our data in a much easier way 😀

Conclusion:

EventBridge Pipes is a very useful tool: it simplifies your architecture and replaces your old way of filtering and enriching data with a much more maintainable and organized one.

Latest comments (4)

RoniFinTech

How do you use the input at the target if the target is ECS with a docker image?
I've been trying to figure it out without any luck :(

awedis

If the consumer is an ECS container, you can make SQS the target of the pipe, and then write some simple code that reads from SQS and triggers the container or makes an API call.

John McCracken

Excellent explanation and demo of the service, thanks!

awedis

It's my pleasure 🙂