Simple lambda
The most common way to create a Lambda is to install the AWS Toolkit, which adds AWS project templates. After that, choose the Lambda template with a DynamoDB trigger, and that is all you need. But what about infrastructure?
public void FunctionHandler(DynamoDBEvent dynamoEvent)
{
    foreach (var record in dynamoEvent.Records)
    {
        // TODO: Add business logic processing the record.Dynamodb
    }
}
CDK
Infrastructure should live together with the code because it has the same lifetime. It should also be easy to reproduce in other environments. CDK is an infrastructure-as-code (IaC) tool that solves both problems, and we can write the infrastructure in our lovely C# language.
public class DynamoDbLambda : Stack
{
    internal DynamoDbLambda(Construct scope, string id, IStackProps props)
        : base(scope, id, props)
    {
        var dynamoDbTable = new Table(this, "testTable", new TableProps
        {
            BillingMode = BillingMode.PAY_PER_REQUEST,
            PartitionKey = new Attribute { Name = "Pk", Type = AttributeType.STRING },
            SortKey = new Attribute { Name = "Sk", Type = AttributeType.STRING },
            // DynamoEventSource requires a stream on the table.
            // The view type here is an assumption; pick the one your handler needs.
            Stream = StreamViewType.NEW_AND_OLD_IMAGES,
        });
        var dynamoDbLambda = new Function(this, "dynamoDbLambda", new FunctionProps
        {
            Runtime = Runtime.DOTNET_6,
            MemorySize = 256,
            Handler = "Serverless.DynamoDbLambda::Serverless.DynamoDbLambda.LambdaHandler::Handle",
            Code = Code.FromAsset("Serverless.DynamoDbLambda/", new AssetOptions
            {
                // buildOption holds the bundling options; its definition is not shown here.
                Bundling = buildOption
            }),
        });
        // Invoke the function with records from the table's stream, oldest first.
        dynamoDbLambda.AddEventSource(new DynamoEventSource(dynamoDbTable, new DynamoEventSourceProps
        {
            StartingPosition = StartingPosition.TRIM_HORIZON,
        }));
    }
}
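To make the "reproducible in other environments" point concrete, here is a minimal sketch of a CDK entry point that instantiates the stack once per environment. It assumes the Amazon.CDK.Lib package and that the DynamoDbLambda class above lives in the same assembly and namespace (its constructor is internal). The stage names "dev" and "prod" are hypothetical, and the account and region are read from the standard CDK_DEFAULT_ACCOUNT and CDK_DEFAULT_REGION environment variables.

```csharp
using Amazon.CDK;

// Program.cs of the CDK app (sketch, not the article's actual entry point).
var app = new App();

// Hypothetical stage names: each one becomes an independent copy of the stack.
foreach (var stage in new[] { "dev", "prod" })
{
    new DynamoDbLambda(app, $"DynamoDbLambda-{stage}", new StackProps
    {
        Env = new Amazon.CDK.Environment
        {
            // Set by the CDK CLI from the active AWS profile.
            Account = System.Environment.GetEnvironmentVariable("CDK_DEFAULT_ACCOUNT"),
            Region = System.Environment.GetEnvironmentVariable("CDK_DEFAULT_REGION"),
        }
    });
}

// Produces the CloudFormation templates for all stacks defined above.
app.Synth();
```

With this layout, a single stage can be deployed with cdk deploy DynamoDbLambda-dev.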
Since we are using the DynamoDB single-table design, the stream delivers records for every logical table stored in it, but we only want to listen to one particular table...
Filters
A Lambda event source can filter the DynamoDB stream. The CDK syntax is a little bit weird, but it is just a representation of the JSON filter pattern. The Filters property goes into DynamoEventSourceProps.
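Filters = new[]
{
    FilterCriteria.Filter(new Dictionary<string, object> {
        ["dynamodb"] = new Dictionary<string, object>
        {
            ["Keys"] = new Dictionary<string, object>
            {
                ["Pk"] = new Dictionary<string, object>
                {
                    // A prefix match is an object inside the array: [{ "prefix": "MyTableName" }].
                    // A plain string array would match the exact values instead.
                    ["S"] = new[] { new Dictionary<string, string> { ["prefix"] = "MyTableName" } }
                }
            }
        }
    })
},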
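To see that the nested dictionaries really are "just JSON", the sketch below builds the same structure without any CDK dependency and serializes it with System.Text.Json. The class and method names (FilterPatternDemo, BuildPattern, ToJson) are made up for this illustration; the prefix form follows the Lambda event filtering syntax, where a prefix match is written as an object inside the array of allowed values.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Illustration only: a CDK-free copy of the filter structure.
public static class FilterPatternDemo
{
    // Builds the nested dictionaries that would be passed to FilterCriteria.Filter.
    public static Dictionary<string, object> BuildPattern(string pkPrefix) =>
        new Dictionary<string, object>
        {
            ["dynamodb"] = new Dictionary<string, object>
            {
                ["Keys"] = new Dictionary<string, object>
                {
                    ["Pk"] = new Dictionary<string, object>
                    {
                        // Prefix rule: an object inside the array of allowed values.
                        ["S"] = new[] { new Dictionary<string, string> { ["prefix"] = pkPrefix } }
                    }
                }
            }
        };

    // Serializes the structure to the compact JSON form of the filter pattern.
    public static string ToJson(string pkPrefix) =>
        JsonSerializer.Serialize(BuildPattern(pkPrefix));
}
```

Calling FilterPatternDemo.ToJson("MyTableName") returns {"dynamodb":{"Keys":{"Pk":{"S":[{"prefix":"MyTableName"}]}}}}, which is the shape of the pattern that ends up on the event source mapping.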
All good, with one thing left: handling failed records.
Fails and retries
There's no easy way to handle failed records, because failures can have many causes:
- Bugs in lambda code
- Unavailable services
- Db structure changed
We can retry and, if the retries are exhausted, send information about the failed batch to an SQS dead-letter queue.
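var deadLetterQueue = new Queue(this, "deadLetterQueue");
dynamoDbLambda.AddEventSource(
    new DynamoEventSource(dynamoDbTable, new DynamoEventSourceProps
    {
        StartingPosition = StartingPosition.TRIM_HORIZON,
        // After the retries are exhausted, details of the failed batch go to the queue.
        OnFailure = new SqsDlq(deadLetterQueue),
        RetryAttempts = 5,
    }));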
So far so good, but how to work with partial batch failure?
Batch processing
Instead of returning void from the Lambda handler, we can report the failed items, and Lambda will handle the partial failure. Add the option ReportBatchItemFailures = true to the CDK event source and modify the Lambda itself.
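public async Task<StreamsEventResponse> Handle(DynamoDBEvent events)
{
    // Start processing every record; each record gets its own task.
    var tasks = events.Records
        .Select(new Executor().Execute)
        .ToList();
    var failedItems = new List<BatchItemFailure>();
    try
    {
        await Task.WhenAll(tasks);
    }
    catch
    {
        // At least one task failed: collect the records whose tasks did not succeed.
        failedItems = tasks
            .Select((task, index) => new { task, index })
            .Where(x => !x.task.IsCompletedSuccessfully)
            .Select(x => new BatchItemFailure
            {
                // For DynamoDB streams, Lambda expects the record's sequence number here.
                ItemIdentifier = events.Records[x.index].Dynamodb.SequenceNumber
            })
            .ToList();
    }
    // An empty list tells Lambda that the whole batch succeeded.
    return new StreamsEventResponse {BatchItemFailures = failedItems};
}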
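On the CDK side, ReportBatchItemFailures sits next to the other event source properties. The sketch below combines it with the retry and dead-letter settings in one hypothetical helper (StreamEventSource.Attach is not part of the original code). It assumes the Amazon.CDK.Lib package and that the function, table, and queue are created elsewhere in the stack.

```csharp
using Amazon.CDK.AWS.DynamoDB;
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Lambda.EventSources;
using Amazon.CDK.AWS.SQS;

// Hypothetical helper that wires a table's stream to a function.
public static class StreamEventSource
{
    public static void Attach(Function function, ITable table, IQueue deadLetterQueue)
    {
        function.AddEventSource(new DynamoEventSource(table, new DynamoEventSourceProps
        {
            // Read the stream from the oldest available record.
            StartingPosition = StartingPosition.TRIM_HORIZON,
            // Retry a failing batch up to five times, then hand it to the queue.
            RetryAttempts = 5,
            OnFailure = new SqsDlq(deadLetterQueue),
            // Let the handler report which records failed instead of failing the whole batch.
            ReportBatchItemFailures = true,
        }));
    }
}
```

Keeping all of the options in a single AddEventSource call avoids registering two event source mappings for the same stream.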
And finally, the source code is in the GitHub repository, on the add-dynamo-db-lambda branch.