Davide de Paolis

Filter Lambda Events from DynamoDB Stream (with CDK)

In one of our projects we have a setup so that whenever something changes in our DynamoDB table, a Lambda gets triggered by the DynamoDB Stream.

That Lambda then checks what kind of change happened, parses the data in the inserted/edited/removed row and reacts accordingly.

Filter events within Lambda

This is relatively simple and quite handy for decoupling our system from changes occurring in the database. Unfortunately, it sometimes requires quite a lot of logic (in the form of if/else or switch statements) to determine whether we are interested in what happened in the database, and to forward execution to different modules (or even different Lambdas).

Imagine that in your table you are adding, removing and editing user information like UserId, UserName, Address, Status.
And imagine you want to "detect" when a specific status is entered in any row of your database.

You don't care if a specific UserId is added to or removed from the table, nor do you need to do anything if the name or address is edited.

You just need some additional functionality whenever the Status of the user changes to, for example, Banned or Deactivated. On top of that, there might be dozens of statuses for which you don't have to do anything.

With the current approach you would have to specify this logic in the Lambda handler, and most of the time the Lambda would be invoked only to return early because the filtering logic does not apply.

Enter Event Source Mappings Filters / Lambda Event Filtering

On the 26th of November 2021 AWS announced Filtering Event Sources for Lambda functions, which greatly simplifies and improves this behaviour (and also lets you reduce costs, thanks to fewer invocations).

Why?
Because the filter logic you currently have inside your handler is defined directly in your Stack definition!

At every change in the DynamoDB Stream (the same applies to Kinesis and SQS) the filter is evaluated, but the Lambda is invoked ONLY when there is a match.

How awesome is that?
A lot! We were super excited about it, until we realised that this functionality was not available in AWS CDK, which we use to describe our stack.

[Update 19.9.22]
The functionality was finally added to the CDK (https://github.com/aws/aws-cdk/pull/21917), but I will keep the CloudFormation tips since they might be useful for similar missing features in the future.

Thanks to the comments in the issue linked above (and to this blog post) I found out about Escape Hatches and how you can use CloudFormation to extend and override functionality that is not directly available in CDK constructs.

Of course I have used CloudFormation CustomResources in the past (especially when using the Serverless Framework), but I never really liked them, nor was I confident in using them.

Although a bit hacky, in the end it is not difficult at all.

  • Create your mapping
  • grab its instance as a CfnEventSourceMapping
  • override its FilterCriteria property
      const cfnSourceMapping = sourceMapping.node.defaultChild as CfnEventSourceMapping

      cfnSourceMapping.addPropertyOverride('FilterCriteria', {
          Filters: [
              {
                  Pattern: '...', // your filter pattern here, as a JSON string
              },
          ],
      })

Every mapping can have multiple filters, each with a specific pattern based on one or more properties and different conditions.
You can find more info about it here, but this is a simple example:

   "Filters": [
        {
            "Pattern": "{ \"dynamodb\": { \"OldImage\": { \"status\": { \"S\": [ \"banned\" ] } } } }"
        }
    ]

Something worth noticing, which is not so clear from the docs, is that the pattern has to be a JSON string, but you don't have to write it by hand with escapes and quotes, which is very error-prone.
You can just wrap your object definition in JSON.stringify.

    Filters: [
        {
            Pattern: JSON.stringify({
                dynamodb: {
                    OldImage: {
                        status: {S: ['banned']},
                    },
                },
                eventName: ['REMOVE'],
            }),
        },
    ]

Much cleaner!

In this example we expect our Lambda to be executed only when a row whose user status was banned has been deleted from our DynamoDB table.
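If you end up with more than one pattern, a tiny helper can keep the stringify step in one place. This is only a sketch: `toFilterCriteria` and the `FilterCriteria` interface are names I made up for illustration, they are not part of the CDK. The object it returns has the shape that is passed to addPropertyOverride('FilterCriteria', ...).

```typescript
// Shape of the FilterCriteria property on the event source mapping:
// a list of filters, each holding its pattern as a JSON string.
interface FilterCriteria {
  Filters: { Pattern: string }[];
}

// Hypothetical helper: accepts plain objects and serialises each one
// into a Pattern string, so no manual escaping is needed.
function toFilterCriteria(...patterns: object[]): FilterCriteria {
  return {
    Filters: patterns.map((pattern) => ({ Pattern: JSON.stringify(pattern) })),
  };
}

const criteria = toFilterCriteria({
  dynamodb: { OldImage: { status: { S: ["banned"] } } },
  eventName: ["REMOVE"],
});

console.log(criteria.Filters[0].Pattern);
// {"dynamodb":{"OldImage":{"status":{"S":["banned"]}}},"eventName":["REMOVE"]}
```

Passing several objects produces several filters, and a record only needs to match one of them to trigger the Lambda.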


Of course, even though some limitations apply, you can mix and match different patterns and operators.

Something you might want to pay attention to, though, is the structure of the data you are filtering on.

Depending on the stream view type you configure (NEW_AND_OLD_IMAGES, KEYS_ONLY, NEW_IMAGE or OLD_IMAGE), you might not get the data you need for your filters at all, and you have to write the filter criteria against the right object (this is also true for the old approach, where you filter inside the handler).

In case of a REMOVE event, we have to filter based on the OldImage data, not the NewImage property.

{
    "Records": [
        {
            "eventID": "d5fb1185ad5fb1182505d1ce20",
            "eventName": "REMOVE",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "REGION",
            "dynamodb": {
                "ApproximateCreationDateTime": 1642582888,
                "Keys": {
                    "userId": {
                        "S": "abcdef1234"
                    }
                },
                "OldImage": {
                    "requestTime": {
                        "N": "1642581514233"
                    },
                    "name": {
                        "S": "john doe"
                    },
                    "locale": {
                        "S": "en"
                    },
                    "userId": {
                        "S": "abcdef1234"
                    },
                    "status": {
                        "S": "banned"
                    }
                },
                "SequenceNumber": "4381400000000060039193684",
                "SizeBytes": 206,
                "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "userIdentity": {
                "principalId": "dynamodb.amazonaws.com",
                "type": "Service"
            },
            "eventSourceARN": "arn:aws:dynamodb:REGION:ACCOUNT:table/TABLENAME/stream/TIMESTAMP"
        }
    ]
}
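To make the structure above a bit more concrete, here is a small sketch of how a handler could read the status out of such a record. The types are trimmed-down local definitions that I wrote for this example (only the fields used here), and `relevantImage` / `statusOf` are hypothetical helper names.

```typescript
// Minimal local types, covering only what this example needs.
interface AttributeValue {
  S?: string;
  N?: string;
  BOOL?: boolean;
}
type Image = Record<string, AttributeValue>;

interface StreamRecord {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: { OldImage?: Image; NewImage?: Image };
}

// REMOVE records only carry OldImage, INSERT records only carry NewImage.
// With a KEYS_ONLY stream neither image is present and this returns undefined.
function relevantImage(record: StreamRecord): Image | undefined {
  return record.eventName === "REMOVE"
    ? record.dynamodb.OldImage
    : record.dynamodb.NewImage;
}

// Values use DynamoDB's typed JSON, so a string attribute lives under `S`.
function statusOf(record: StreamRecord): string | undefined {
  return relevantImage(record)?.status?.S;
}

const removed: StreamRecord = {
  eventName: "REMOVE",
  dynamodb: { OldImage: { status: { S: "banned" } } },
};

console.log(statusOf(removed)); // banned
```

The same nesting (image, attribute name, type key) is what a filter pattern has to mirror.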


Show me the Code!!

So here is the old implementation with the filtering logic within the lambda handler:

/// stack.ts

import {StartingPosition} from 'aws-cdk-lib/aws-lambda'
import {DynamoEventSource} from 'aws-cdk-lib/aws-lambda-event-sources'

const sourceMapping = new DynamoEventSource(myTable, {
    startingPosition: StartingPosition.TRIM_HORIZON,
    batchSize: 5,
    bisectBatchOnError: true,
    retryAttempts: 10,
})
processStreamLambda.addEventSource(sourceMapping)

myTable.grantStreamRead(processStreamLambda)

//// in processStreamLambda handler.ts

const {Records} = event
Records.forEach(record => {
    console.log(record)
    const {eventName, dynamodb} = record
    // stream records use DynamoDB's typed JSON, so the string value sits under `S`
    if (eventName === 'REMOVE' && dynamodb?.OldImage?.status?.S === 'banned') {
        console.log(`[${eventName}] - the user was removed after being banned [${JSON.stringify(dynamodb?.OldImage)}]`)
    } else {
        console.log(`[${eventName}] - something else I am not interested in`)
    }
})


While the following shows the changes in the stack with the CloudFormation Override of the source mapping and the filter patterns:

import {CfnEventSourceMapping, EventSourceMapping, StartingPosition} from 'aws-cdk-lib/aws-lambda'

const sourceMapping = new EventSourceMapping(this, 'dynamoTableEventSourceMapping', {
    startingPosition: StartingPosition.TRIM_HORIZON,
    batchSize: 5,
    target: processStreamLambda,
    eventSourceArn: myTable.tableStreamArn,
    bisectBatchOnError: true,
    retryAttempts: 10,
})
const cfnSourceMapping = sourceMapping.node.defaultChild as CfnEventSourceMapping

cfnSourceMapping.addPropertyOverride('FilterCriteria', {
    Filters: [
        {
            Pattern:
                JSON.stringify({
                    // Only capture REMOVE events whose status is "banned"
                    dynamodb: {
                        OldImage: {
                            status: {S: ['banned']},
                        },
                    },
                    eventName: ['REMOVE'],
                }),
        },
    ],
})

//// in processStreamLambda handler.ts

const {Records} = event
Records.forEach(record => {
    console.log(record)
    const {eventName, dynamodb} = record

    // look Ma! no logic!!

    console.log(`[${eventName}] - the user was removed after being banned [${JSON.stringify(dynamodb?.OldImage)}]`)
})

You might think it's not worth it and that the if/else in the Lambda is absolutely fine, and I somewhat agree.

But the logic in the handler could become complex, and the changes to the DB might be in the order of hundreds of thousands while the real logic of the Lambda is executed only a few times.

That is a waste of resources.

With filter patterns you save executions and costs, and you keep the code nice, clean and decoupled.

Imagine that you have another requirement where you need to do something when an item with some specific properties is added to the table.
Your original Lambda handler would not have a single responsibility anymore, because it would need an additional condition in the if/else and would then have to forward the action for the REMOVE and the action for the INSERT.

With filters in the stack you can have a separate Lambda react to the DynamoDB Stream only when its specific filter matches.
No changes whatsoever in the old Lambda.

// Only capture INSERT events where errorMsg was populated

Filters: [
    {
        Pattern:
            JSON.stringify({
                dynamodb: {
                    NewImage: {
                        errorMsg: {
                            S: [
                                {"anything-but": ""}
                            ]
                        }
                    },
                },
                eventName: ['INSERT'],
            }),
    },
]


You can have a look at all the possible operators you can use, and start playing around with them.

Filter Operators

Keep in mind that for DynamoDB you always have to consider the type of the property you are filtering on. Here, for example, we want only the rows that contain an 'errorMsg' column which is not empty (notice the nested object and the property S, for String).

What I don't quite like

Less testable and debuggable

The fact that you can decouple and describe the matches at the stack level is awesome, but saying that you don't have to deal with the logic anymore is not true.
You simply remove it from the Lambda and move it to the stack.
The conditions and logic are still there: more organized and more decoupled, but a bit less visible, and above all far less testable and debuggable than the logic in your Lambda.

So far I have not found a way to test or debug the filters. While trying this feature out I made a mistake in a property name / object structure, and my Lambda was not triggered no matter how many changes I applied to my DynamoDB table.
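One thing that could help catch this kind of typo is checking the pattern against a sample record in a unit test before deploying. The sketch below is my own rough approximation of the matching rules (exact values, `exists`, `prefix` and `anything-but` only); it is not the code AWS runs, and all type and function names are made up for this example.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type Rule =
  | string | number | boolean | null
  | { exists: boolean }
  | { prefix: string }
  | { "anything-but": string | number | (string | number)[] };
// A pattern mirrors the record: nested objects, with a list of rules at each leaf.
type Pattern = { [key: string]: Rule[] | Pattern };

function isObject(value: Json | undefined): value is { [key: string]: Json } {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Checks one rule from a leaf list against the value found in the record.
function matchesRule(rule: Rule, value: Json | undefined): boolean {
  if (rule !== null && typeof rule === "object") {
    if ("exists" in rule) return rule.exists === (value !== undefined);
    if ("prefix" in rule) return typeof value === "string" && value.indexOf(rule.prefix) === 0;
    if ("anything-but" in rule) {
      const excluded = ([] as (string | number)[]).concat(rule["anything-but"]);
      return (typeof value === "string" || typeof value === "number") && excluded.indexOf(value) === -1;
    }
    return false;
  }
  return value === rule; // plain value: exact match
}

// Every key in the pattern must match; within a leaf list any rule may match.
function matchesPattern(pattern: Pattern, record: Json | undefined): boolean {
  return Object.keys(pattern).every((key) => {
    const expected = pattern[key];
    const actual = isObject(record) ? record[key] : undefined;
    if (Array.isArray(expected)) {
      return expected.some((rule) => matchesRule(rule, actual));
    }
    return matchesPattern(expected, actual);
  });
}

const bannedRemoval: Pattern = {
  eventName: ["REMOVE"],
  dynamodb: { OldImage: { status: { S: ["banned"] } } },
};
const sampleRecord: Json = {
  eventName: "REMOVE",
  dynamodb: { OldImage: { status: { S: "banned" } } },
};

console.log(matchesPattern(bannedRemoval, sampleRecord)); // true
console.log(matchesPattern(bannedRemoval, { eventName: "INSERT" })); // false
```

A misspelled property in the pattern (say `Status` instead of `status`) makes the first check return false, which is exactly the mistake that cost me time. It does not replace testing against the real service, since the real rules may differ in details.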

Failing deployments

Often when you need to adjust the logic of the filters, your deployment fails because there is already a source mapping defined.

Resource handler returned message: "The event source arn (" arn:aws:dynamodb:REGION:ACCOUNT:table/TABLENAME/stream/TIMESTAMP") and function ("lambda-process-stream ") provided mapping already exists. Please update or delete the existing mapping with UUID 4bee4f11-68f0-4538-bd1b-fb87c8887a7c

Either you edit or remove the mapping directly from the CLI, or you need to redeploy without the mapping and then again with the new one.

aws lambda delete-event-source-mapping --uuid 4bee4f11-68f0-4538-bd1b-fb87c8887a7c

Awkward and error-prone.

Some events might get lost.

Events coming from SQS or DynamoDB Streams go through these automagic filters before they reach your code.

If they don't match, they are discarded.
I know that the same happens when you ignore them in your Lambda logic, but at least there you have some visibility of lots of events flowing in and no real execution happening. You could set up alarms and monitoring, or start debugging with CloudWatch Logs Insights to understand the data in the event and the logic in your Lambda.
With filters in the EventSourceMapping I haven't yet found a way to test and debug what is going on behind the scenes. You might be losing important events because a filter was added improperly, and it could take a while to notice and react.

Recap

There are some drawbacks and a couple of pitfalls, but in general I find this functionality very handy.

It is also relatively recent, so I expect AWS or the community to come up with more improvements and tips to make the developer experience better (if you know some already, leave a comment).
I guess we will introduce filters gradually to some of our Lambdas and see how it goes. Will keep you posted!



Photo by Sam 🐷 on Unsplash

Top comments (3)

Wesley Cheek • Edited

Great! Hope sometime soon there will be an easier way to do this.

By the way, for anyone hoping to filter for TTL deletions in DynamoDB, you can use this filter pattern:

{
    "Filters": [
        {
            "Pattern": json.dumps(
                {
                    "userIdentity": {
                        "type": ["Service"],
                        "principalId": ["dynamodb.amazonaws.com"],
                    }
                }
            )
        }
    ]
}
 
Davide de Paolis

this is awesome. thanx

Davide de Paolis

Functionality has been finally added to the CDK! no need for hacky cloud formation tips!
github.com/aws/aws-cdk/pull/21917