
Optimize your Serverless architectures with event filtering

More often than not, when working in a Serverless ecosystem with AWS services such as SQS, SNS, Kinesis, etc., we unintentionally fail to leverage the filtering capabilities of those services, which can reduce boilerplate code within your lambda functions and also contribute to performance improvements.

Why do this?

Reduced logic: The most obvious benefit is reduced code in your lambda functions.

Cost: The second most beneficial aspect is cost. If your source is sending a huge catalog of event types and you are interested in processing only a fraction of them, filtering the rest out before your function is ever invoked makes event filtering more of a necessity than an added feature.

In this article, we will briefly look at each of the AWS services that allow message filtering and see how to quickly implement it for your application’s use case. Of course, it’s essential to understand the incoming payload structure in advance for the filtering to work effectively; otherwise you risk discarding messages you are actually interested in without ever invoking the function.

All lambda filtering patterns for supported AWS services allow the same kinds of checks, such as string checks (null, equals, exists, begins with) and numeric checks (equals, ranges); see the AWS documentation for the full grammar. The examples in this article filter the following sample payload (sourced from the AWS documentation):

{
   "meta":{
      "triggerEventLog":{
         "id":"55b8826e35adc2ba-471f5164b8bc6221"
      }
   },
   "temperature":{
      "type":"celsius"
   },
   "version":"1.11.0"
}
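For reference, a filter pattern that would accept this payload could combine several of those checks. A hypothetical illustration is below; when attached to a specific event source, the pattern sits under that source's wrapper key (body, value, or data), as shown in the sections that follow:

{
   "meta":{
      "triggerEventLog":{
         "id":[ { "exists": true } ]
      }
   },
   "temperature":{
      "type":[ "celsius" ]
   },
   "version":[ { "prefix": "1." } ]
}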

Note that the syntax provided (to be added under your specific function) is for the Serverless framework, but things should work in a similar way for most IaC frameworks.

SQS:

SQS accepts plain text and JSON based filtering patterns. Notice that filterPatterns is an array, so you can add at most 5 patterns per event source (as per the AWS docs).

Side note: If you are looking to understand on a deeper level how to process messages from an SQS queue at a massive scale, do give this piece a read!

    events:
      - sqs:
          arn: arn:aws:sqs:us-east-1:xxxx:filterTest
          filterPatterns:
            - body: {"temperature":{"type": ["celsius"]}}
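Since a record only needs to satisfy one of the patterns to be passed on to the function, additional event types can be allowed simply by appending patterns; a sketch extending the example above (the fahrenheit value is just an illustration):

    events:
      - sqs:
          arn: arn:aws:sqs:us-east-1:xxxx:filterTest
          filterPatterns:
            - body: {"temperature":{"type": ["celsius"]}}
            - body: {"temperature":{"type": ["fahrenheit"]}}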

Kafka:

If your application uses Kafka (Amazon MSK) as a message broker, event filtering is possible with Kafka topics too. The syntax and behaviour are quite similar to SQS; the variation here is that the “value” key is used instead of “body”, because the Kafka message in the incoming payload is present within the value key.

    events:
      - msk:
          arn: arn:aws:kafka:us-east-1:xxx:cluster/MyCluster/xxx
          topic: myTopic
          filterPatterns:
            - value:
                temperature:
                  type: ["celsius"]
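To see why the filter targets value, the event a Kafka-backed function receives looks roughly like the snippet below, where value is the base64-encoded message (truncated here); the other fields are shown for context only:

{
   "eventSource":"aws:kafka",
   "records":{
      "myTopic-0":[
         {
            "topic":"myTopic",
            "partition":0,
            "offset":15,
            "value":"eyJ0ZW1wZXJhdHVyZSI6..."
         }
      ]
   }
}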

Kinesis streams:

Filtering Kinesis data streams operates in the same fashion as SQS/MSK topics, the variation being the “data” key.

The amazing thing about filtering Kinesis/MSK events is that although the incoming payload is always base64 encoded, AWS internally decodes it and performs the filtering for you.

    events:
      - stream: 
          arn: arn:aws:kinesis:us-east-1:xxx:stream/filterTest
          filterPatterns:
           - data:
              temperature: 
                type: ["celsius"]
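The decoded data can be matched with the other check types as well; for example, a numeric range check on a hypothetical reading field inside the payload could look like this:

    events:
      - stream:
          arn: arn:aws:kinesis:us-east-1:xxx:stream/filterTest
          filterPatterns:
           - data:
               temperature:
                 reading: [{"numeric": [">=", 0, "<", 100]}]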

DynamoDB Streams:

Any change to a record within a DynamoDB table will send an event to the stream (when enabled). If your application is interested in processing only insert operations, then an event filter can definitely reduce the number of invocations of your function.

Since the lambda event that comes from a DynamoDB stream invocation has a predefined structure, logically, the filtering needs to conform to that structure.

Sample event structure for reference

{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}

The filter you see below has been set up to look for insert operations where the temperature key has a value starting with “celsius”, e.g. celsius#100.

    events:
      - stream:
          type: dynamodb
          arn: stream_arn
          filterPatterns:
            - eventName: [INSERT] # INSERT | MODIFY | REMOVE
              dynamodb:
                Keys: # NewImage | OldImage | Keys
                  temperature: # name of the field
                    S: [{"prefix": "celsius"}] # S for String, N for Number, etc.
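Filtering isn’t limited to the keys; the same pattern syntax works against NewImage (or OldImage) as well. For instance, to process only inserts for Florida stores from the sample event above, a sketch could look like this:

    events:
      - stream:
          type: dynamodb
          arn: stream_arn
          filterPatterns:
            - eventName: [INSERT]
              dynamodb:
                NewImage:
                  state:
                    S: ["FL"]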

SNS:

Event filtering with SNS works a little differently from SQS and the other services above.

Every subscriber to an SNS topic can have its own filtering policy based on

  • MessageBody
  • MessageAttributes

In contrast to event filtering with other AWS services, SNS doesn’t allow nested event filtering, i.e. if you wish to filter on a JSON payload that is nested within the object(s), SNS will not be able to filter such messages. However, the base filter concepts such as numeric (equals/range) and string (null/equals/begins with/exists/not exists) checks still work just like with other event sources.

    events:
      - sns:
          arn: arn:aws:sns:us-east-1:xxx:filterTest
          filterPolicyScope: MessageBody
          filterPolicy:
            version: 
              - "1.11.0"

API Gateway:

Request validation is a great feature of API Gateway that can be used to reject payloads that do not conform to an API’s specification. With this setup, a lot of the elementary validation can be offloaded to the API Gateway level. It not only filters out non-conformant payloads but also acts as a security barrier against malformed or malicious requests.

The specification of your API schema can be set up this way (the price and type properties at the end are additional examples of possible checks, such as numeric ranges and enums):

{
    "definitions": {},
    "$schema": "http://json-schema.org/draft-04/schema#",
    "type": "object",
    "title": "Filtering temperature sensor events",
    "required": ["temperature","version"],
    "properties": {
      "version": {
        "type": "string",
        "pattern": "^\d{1}\.\d{2}\.\d{1}$"
      },
      "temperature":{
        "type": "object",
        "properties": {
          "type": {
            "type":"string",
            "minLength": 1,
            "maxLength": 60
          }
        }
      },
      "price": { "type": "number", "minimum": 25, "maximum": 500 },
      "type":  { "type": "string", "enum": ["sensor1", "sensor2", "sensor3"] }
    }
}

Syntax for serverless.yml

    events:
      - http:
          path: /temperature
          method: post   
          request:
            schema:
              application/json: ${file(valid_request.json)}
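With this in place, API Gateway returns a 400 for non-conforming requests before the lambda is ever invoked; for example, a request body like the one below would be rejected because the required version field is missing:

{
   "temperature":{
      "type":"celsius"
   }
}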

Going a step ahead in filtering:

EventBridge Pipes

Pipes is a feature recently introduced by AWS as part of EventBridge that allows filtering and enriching a payload and then delivering it to a destination of your choice.

To provide an overview of the four stages of a pipe, i.e. source, filtering, enrichment, and target (filtering & enrichment are optional):

  • Sources can be: SQS, DynamoDB streams, MSK, self-managed Kafka, Amazon MQ.
  • The same filtering rules as above apply here too.
  • If your filtered payload requires enrichment by calling external sources, that can be achieved by using a lambda function, API Gateway, any external API, or a Step Functions workflow.
  • The target list is quite extensive. It includes all the serverless components you can think of and more!

This is a great tool to streamline data processing for Serverless workloads. For instance, if the data source is the same but the event types are varied, you could create a dedicated “pipe” for each event type that requires a different filter and enrichment process before it reaches its destination.
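The Serverless framework has no dedicated event syntax for Pipes, so one option is to declare the pipe as raw CloudFormation under the resources block. A minimal sketch, assuming an SQS source, a hypothetical enrichTemperature lambda for enrichment, an EventBridge bus as the target, and a PipeRole you would define separately:

resources:
  Resources:
    TemperaturePipe:
      Type: AWS::Pipes::Pipe
      Properties:
        Name: temperature-pipe
        RoleArn:
          Fn::GetAtt: [PipeRole, Arn] # IAM role (not shown) that can read the queue, invoke the enrichment lambda, and put events on the bus
        Source: arn:aws:sqs:us-east-1:xxxx:filterTest
        SourceParameters:
          FilterCriteria:
            Filters:
              - Pattern: '{"body":{"temperature":{"type":["celsius"]}}}'
        Enrichment: arn:aws:lambda:us-east-1:xxxx:function:enrichTemperature
        Target: arn:aws:events:us-east-1:xxxx:event-bus/default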

Conclusion:

With the above items, we’ve covered all the event sources in AWS that support event/request filtering.

Something to keep in mind: a queue usually has only one consumer, so you may not have the flexibility of multiple filtering patterns across consumers. However, event sources like Kinesis, MSK, and DynamoDB streams can have multiple consumers listening to the same stream, and each of those consumers can have independent filter patterns depending on the specific event types it is interested in processing.

It’s definitely worth setting up event filtering where possible if you are looking to fine-tune performance and costs in your Serverless architecture.
