Introduction
I was on Twitter last month and came across the tweet below.
This announcement was pretty big in my opinion.
Having the ability to trigger AWS Lambda functions from Amazon MSK records allows us to utilize many benefits of Serverless computing.
I had a bit of spare time on my hands so thought I'd have a little dig into this new feature.
Want to follow along?
Alrighty, so before we do anything constructive, the repository for all the bits and bobs I'm showing you below can be found here.
Be wary: AWS is fun, but that fun comes at a cost. Following along will cost you some dough, so keep an eye on the Billing & Cost Management Dashboard, people!
This post assumes you know your way around AWS and understand how Kafka works.
Setting up the Infrastructure
Before writing our Lambda functions we need some infrastructure; I'm using CloudFormation in this example. AWS has kindly provided a template that provides a basic architecture: a 3-node Kafka Cluster, a VPC, a Subnet setup, and an EC2 bastion.
I've tweaked this ever so slightly so that we can communicate with the broker using TLS_PLAINTEXT instead of TLS, I've modified the MSK Security Group to allow MSK to communicate with itself, and I'm running 2 Brokers instead of 3.
You will also need an EC2 Key Pair so go ahead into the EC2 Console and create one, making sure you download the Key and keep it somewhere safe and out of harm's way.
With those two pieces of pivotal information (your IP address and the Key Pair), we can go ahead and run the script.
➜ deploy-msk-infrastructure.sh IP_ADDRESS KEY_PAIR
➜ deploy-msk-infrastructure.sh 1.2.3.4/32 demo-keypair.pem
It takes around 20–30 minutes for everything to be up, running, and functional.
Once we are cooking with gas we should be able to SSH into our Bastion, create a Topic, and produce a record to it.
➜ chmod 400 demo-keypair.pem
➜ ssh -i "demo-keypair.pem" ec2-user@ec2-1-2-3-4.eu-west-2.compute.amazonaws.com
__| __|_ )
_| ( / Amazon Linux 2 AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-2/
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-topics.sh --create --topic example \
--bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --partitions 1 --replication-factor 1
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --topic example --broker-list b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092
>Hello!
And then consume it.
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-console-consumer.sh --topic example --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --from-beginning
Hello!
Alrighty, we have verified that our Cluster is alive and kicking. Now we can go ahead and create a Lambda function to verify the connectivity.
Simple Lambda Function
Before we get into the code, let's just go over how a Lambda function is actually invoked by Amazon MSK.
A Lambda function is responsible for handling our records, but there is another process that runs before it: an Event Source Mapping.
An Event Source Mapping is a Lambda process that reads data from a source system and invokes a function with the data it has read.
The Event Source Mapping that we create in the two examples below is responsible for reading the records from the Topics and invoking the functions.
The Event Source Mapping can be created from the AWS Lambda Console, but it can also be created from a CloudFormation Template, which is what we'll be doing in this example.
Right, let's create this basic Lambda function, and when I say basic, I mean basic!
Think of this Lambda as verifying that it can communicate with Kafka and all the permissions are set up correctly.
package com.danieljameskay.main

import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.RequestHandler
import com.danieljameskay.models.MSKEvent
import com.google.gson.Gson
import com.google.gson.GsonBuilder

class App(var gson: Gson = GsonBuilder().setPrettyPrinting().create()) : RequestHandler<MSKEvent, String> {

    // Invoked by the Event Source Mapping with the batch of Kafka records.
    override fun handleRequest(event: MSKEvent, context: Context): String {
        val logger = context.logger
        // Log the whole event as pretty-printed JSON so we can inspect it in CloudWatch.
        logger.log(gson.toJson(event))
        return "OK"
    }
}
Quickly running through the code: the JSON event is mapped into an MSKEvent object and its contents are logged out.
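The MSKEvent class itself lives in the repository. To give you a feel for its shape without opening the repo, a minimal sketch based on the payloads shown later in this post could look like the following (the nested record class name and the default values are my assumption, not the exact code from the repo):

package com.danieljameskay.models

// Minimal sketch of the event model. The idea is that Gson maps the JSON
// payload delivered by the MSK event source straight onto these fields.
data class MSKRecord(
    val topic: String = "",
    val partition: Int = 0,
    val offset: Long = 0,
    val timestamp: Long = 0,
    val timestampType: String = "",
    val key: String = "",   // base64 encoded
    val value: String = ""  // base64 encoded
)

data class MSKEvent(
    // Keys look like "topic-partition", e.g. "example-0".
    val records: Map<String, List<MSKRecord>> = emptyMap()
)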
Following the link here, we have to create a Role with some extra permissions and configure the Event Source for the Lambda function, as shown in the CloudFormation Template below.
AWSTemplateFormatVersion: '2010-09-09'
Description: "Basic Lambda Function to verify MSK connectivity."
Parameters:
  EventSourceArn:
    Description: ARN of the Event Source, Kafka in this case
    Type: String
  TopicName:
    Description: Topic for the Lambda to consume from
    Type: String
  S3Bucket:
    Description: S3 Bucket for the JAR
    Type: String
  S3Key:
    Description: JAR Name
    Type: String
  Handler:
    Description: Application Handler
    Type: String
Resources:
  BasicKafkaConsumerLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: BasicKafkaConsumerLambdaRole
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: 2012-10-17
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
      Path: /
  BasicKafkaConsumerLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: BasicKafkaConsumerLambdaFunction
      Description: BasicKafkaConsumerLambdaFunction
      Runtime: java11
      Code:
        S3Bucket: !Ref S3Bucket
        S3Key: !Ref S3Key
      Handler: !Ref Handler
      MemorySize: 128
      Timeout: 10
      Role:
        Fn::GetAtt:
          - BasicKafkaConsumerLambdaRole
          - Arn
  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !Ref EventSourceArn
      FunctionName: !Ref BasicKafkaConsumerLambdaFunction
      StartingPosition: LATEST
      Topics:
        - !Ref TopicName
With the Lambda, Role, and Event Source Mapping configured in CloudFormation, we can go ahead and run the script.
➜ ./deploy-lambda.sh UBER_JAR_LOCATION \
S3_BUCKET_TO_UPLOAD_TOO \
STACK_NAME \
EVENT_SOURCE_ARN \
TOPIC_NAME \
S3_BUCKET_WHICH_CONTAINS_JAR \
JAR_NAME \
LAMBDA_HANDLER
➜ ./deploy-lambda.sh ./basic-function-all.jar \
s3://my-bucket/basic-function-all.jar \
Basic-Lambda-Stack \
arn:aws:kafka:eu-west-2:111111111111:cluster/MSKCluster/f0705c96-e239-4f74-a0f7-f82031a2fc65-4 \
example \
my-bucket \
basic-function-all.jar \
com.danieljameskay.main.App::handleRequest
This script is extremely basic: it uses Gradle to build the app, creates a JAR, uploads it to S3, and finally creates the CloudFormation stack. It should take a couple of minutes for everything to be deployed.
As we did before, connect back to the Bastion and produce some basic test records.
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --topic example --broker-list b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --property "parse.key=true" --property "key.separator=:"
> 1:Hello!
> 2:Hey!
Then we can head to the Terminal and query the CloudWatch Logs.
➜ aws logs get-log-events --log-group-name /aws/lambda/BasicKafkaConsumerLambdaFunction --log-stream-name '2020/09/20/[$LATEST]8dd5e9881b7f456897d62d39b654e36d'
We can see the records printed out in the logs. Below is a prettier version, with the escaping removed and the values restored, which is easier to read.
{
  "records": {
    "example-0": [
      {
        "topic": "example",
        "partition": 0,
        "offset": 0,
        "timestamp": 1600598451721,
        "timestampType": "CREATE_TIME",
        "key": "MQ==",
        "value": "SGVsbG8h"
      }
    ]
  }
}
and
{
  "records": {
    "example-1": [
      {
        "topic": "example",
        "partition": 1,
        "offset": 0,
        "timestamp": 1600598468528,
        "timestampType": "CREATE_TIME",
        "key": "Mg==",
        "value": "SGV5IQ=="
      }
    ]
  }
}
So we can see the records have been grouped into arrays keyed by the Topic and Partition they came from. We also get some metadata, such as the timestamp, and the key and value of each record are base64 encoded.
The custom class I created, which the function handler uses to map the JSON into an MSKEvent object, drops the EventSource and EventSourceARN fields in this example, and if a key isn't provided a zero will be visible in the payload.
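If you want the original strings back rather than the base64, a quick decode does the trick. Here's a minimal sketch, assuming the MSKRecord shape sketched earlier (the helper names are mine):

import java.util.Base64

// Decode a base64 encoded key or value back into a plain string.
fun decode(encoded: String): String = String(Base64.getDecoder().decode(encoded))

// Iterate every record in the event and log the decoded contents.
// Note: with the custom mapping described above, a record without a key
// carries "0", which isn't valid base64, so guard for it.
fun logDecoded(event: MSKEvent, log: (String) -> Unit) {
    event.records.forEach { (topicPartition, records) ->
        records.forEach { record ->
            val key = if (record.key == "0") "(no key)" else decode(record.key)
            log("$topicPartition offset=${record.offset} key=$key value=${decode(record.value)}")
        }
    }
}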
Below is an example of what the payload would look like if our function consumed multiple records from different partitions.
{
  "records": {
    "example-1": [
      {
        "topic": "example",
        "partition": 1,
        "offset": 0,
        "timestamp": 1.599337456523E12,
        "timestampType": "CREATE_TIME",
        "key": "0",
        "value": "cWVxd2Vxd2U="
      }
    ],
    "example-2": [
      {
        "topic": "example",
        "partition": 2,
        "offset": 0,
        "timestamp": 1.599337455636E12,
        "timestampType": "CREATE_TIME",
        "key": "0",
        "value": "cXdlcXdl"
      }
    ]
  }
}
Consumer Groups
A quick note on Consumer Groups. Multiple Consumers can work as part of a Consumer Group to increase parallelism. This allows the Partitions in the Topic to be divided between the available Consumers for consumption.
When the Consumer Group is registered with MSK, it is given the same name as the Event Source Mapping's UUID. If you aren't sure what the UUID of your Event Source Mapping is, follow the instructions in the first link at the bottom of this post.
➜ aws lambda get-event-source-mapping --uuid 02d6c44a-46a7-41fe-8dd0-0f8ab903fee2
{
  "UUID": "02d6c44a-46a7-41fe-8dd0-0f8ab903fee2",
  "BatchSize": 100,
  "EventSourceArn": "arn:aws:kafka:eu-west-2:111111111111:cluster/MSKCluster/f0705c96-e239-4f74-a0f7-f82031a2fc65-4",
  "FunctionArn": "arn:aws:lambda:eu-west-2:111111111111:function:BasicKafkaConsumerLambdaFunction",
  "LastModified": "2020-09-20T11:18:49.525000+01:00",
  "LastProcessingResult": "OK",
  "State": "Enabled",
  "StateTransitionReason": "USER_INITIATED",
  "Topics": [
    "example"
  ]
}
Then cross-reference this using the Kafka Consumer Group CLI.
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-consumer-groups.sh --list --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092
02d6c44a-46a7-41fe-8dd0-0f8ab903fee2
So we can see that the UUID of our Event Source Mapping does match the name of the Consumer Group. The command below displays the current offset and lag for each Partition, which is useful for understanding how our Consumer Group is performing.
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-consumer-groups.sh --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --describe --group 02d6c44a-46a7-41fe-8dd0-0f8ab903fee2
Integrating SQS
Now we're going to step it up a notch.
We're going to insert some real-world data into Kafka then trigger a Lambda function to send that data into an SQS queue.
Why SQS you might ask?
SQS was the very first AWS service that launched back in November 2004 and to this very day, it is still an extremely popular service used to build queue-based solutions.
One of my favorite Microservice patterns is the Anti-Corruption Layer Pattern. It allows us to provide a layer between two systems that need to be integrated where the semantics are different.
We're going to build a tiny ACL in this example. This means mapping the event generated by MSK into a smaller data structure, serializing the data to JSON, and sending it into an SQS queue.
I have a small application running which receives updates every 20 seconds from the Poloniex 24 Hour Exchange Volume API. It's sending the updates it receives on to MSK. The data received has the below structure.
{
  "USDC": "2811608.201",
  "PAX": "100.623",
  "DAI": "202.840",
  "USDJ": "14553.678",
  "USDT": "36356839.873",
  "TRX": "73155426.619",
  "BUSD": "7581.134",
  "BTC": "559.254",
  "BNB": "1717.156",
  "ETH": "70.466",
  "XMR": "0.000"
}
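The job of the ACL is to strip away the Kafka metadata and carry only this volume data forward. The actual model lives in the repository; a minimal sketch of what an ExchangeVolumeUpdate could look like is below (the field names and package are my assumption):

package com.danieljameskay.sqs.integration.models

// Hypothetical slimmed-down structure we forward to SQS: just the
// currency -> 24hr volume map plus the timestamp of the Kafka record.
data class ExchangeVolumeUpdate(
    val timestamp: Long,
    val volumes: Map<String, String>
)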
This application is running on the Bastion EC2 instance just for the purpose of this demo. Before we do anything, we have to create a Topic for records to be sent to.
[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-topics.sh --create --topic example.poloniex.24hr.exchange.volume --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --partitions 3 --replication-factor 2
As we did earlier in this article, we'll need to deploy our new Lambda function and a CloudFormation Template for our SQS resource.
First, we'll deploy our SQS Queue.
➜ ./SQS-Integration/deploy-infrastructure STACK_NAME NAME_OF_QUEUE
The CloudFormation Template for SQS is really simple; it just requires the name of the Queue we want to create.
AWSTemplateFormatVersion: "2010-09-09"
Description: "Creates an SQS queue for the Poloniex Exchange Volume Lambda to send records to."
Parameters:
  QueueName:
    Description: Name of the Queue
    Type: String
Resources:
  SQSQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Ref QueueName
The application is pretty straightforward: a record is produced to Kafka, the Lambda is triggered, the JSON event is mapped into an MSKEvent object, the records inside the event are looped over, and each one is mapped into a basic ExchangeVolumeUpdate object and sent to SQS.
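The real handler is in the repository, but to make that flow concrete, here's a hedged sketch of what the mapping-and-send step could look like. The queue URL environment variable, the model shapes from the earlier sketches, and the use of the AWS SDK v2 SQS client are all my assumptions rather than the exact code from the repo.

package com.danieljameskay.sqs.integration.main

import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.RequestHandler
import com.danieljameskay.models.MSKEvent
import com.danieljameskay.sqs.integration.models.ExchangeVolumeUpdate
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import java.util.Base64

class App(
    private val gson: Gson = Gson(),
    private val sqs: SqsClient = SqsClient.create(),
    // Assumption: the queue URL is handed to the function via an environment variable.
    private val queueUrl: String = System.getenv("QUEUE_URL")
) : RequestHandler<MSKEvent, String> {

    override fun handleRequest(event: MSKEvent, context: Context): String {
        val mapType = object : TypeToken<Map<String, String>>() {}.type

        // Loop over every record in every topic-partition group.
        event.records.values.flatten().forEach { record ->
            // The record value is the base64 encoded Poloniex volume JSON.
            val json = String(Base64.getDecoder().decode(record.value))
            val volumes: Map<String, String> = gson.fromJson(json, mapType)

            // Map it into the smaller ExchangeVolumeUpdate structure and send it on to SQS as JSON.
            val update = ExchangeVolumeUpdate(record.timestamp, volumes)
            sqs.sendMessage(
                SendMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .messageBody(gson.toJson(update))
                    .build()
            )
        }
        return "OK"
    }
}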
The CloudFormation template is pretty similar to the earlier one we used for the basic Lambda function.
AWSTemplateFormatVersion: '2010-09-09'
Description: "Poloniex Exchange Volume Lambda Function"
Parameters:
  EventSourceArn:
    Description: ARN of the Event Source, Kafka in this case
    Type: String
  TopicName:
    Description: Topic for the Lambda to consume from
    Type: String
  S3Bucket:
    Description: S3 Bucket for the JAR
    Type: String
  S3Key:
    Description: JAR Name
    Type: String
  Handler:
    Description: Application Handler
    Type: String
Resources:
  PoloniexExchangeVolumeLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: PoloniexExchangeVolumeLambdaRole
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: 2012-10-17
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess
      Path: /
  PoloniexExchangeVolumeLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: PoloniexExchangeVolumeLambdaFunction
      Description: PoloniexExchangeVolumeLambdaFunction
      Runtime: java11
      Code:
        S3Bucket: !Ref S3Bucket
        S3Key: !Ref S3Key
      Handler: !Ref Handler
      MemorySize: 256
      Timeout: 10
      Role:
        Fn::GetAtt:
          - PoloniexExchangeVolumeLambdaRole
          - Arn
  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !Ref EventSourceArn
      FunctionName: !Ref PoloniexExchangeVolumeLambdaFunction
      StartingPosition: LATEST
      Topics:
        - !Ref TopicName
Then we'll deploy our Lambda.
➜ ./deploy-lambda.sh UBER_JAR_LOCATION \
S3_BUCKET_TO_UPLOAD_TOO \
STACK_NAME \
EVENT_SOURCE_ARN \
TOPIC_NAME \
S3_BUCKET_WHICH_CONTAINS_JAR \
JAR_NAME \
LAMBDA_HANDLER
➜ ./deploy-lambda.sh ./sqs-integration-all.jar \
s3://my-bucket/sqs-integration-all.jar \
MSK-SQS-Lambda-Stack \
arn:aws:kafka:eu-west-2:111111111111:cluster/MSKCluster/f0705c96-e239-4f74-a0f7-f82031a2fc65-4 \
example.poloniex.24hr.exchange.volume \
my-bucket \
sqs-integration-all.jar \
com.danieljameskay.sqs.integration.main.App::handleRequest
This script behaves in the same way as the one used in the basic example above: it uses Gradle to build the app, creates a JAR, uploads it to S3, and finally creates the CloudFormation stack. It should take a couple of minutes for everything to be deployed.
The application is running and sending records to MSK, so as soon as our Lambda is deployed we should be able to use the SQS web interface to verify that records are arriving in SQS with no issues.
We can also view the CloudWatch graphs to gain insight into how our function is behaving.
Happy days! Everything is working as it should!
Wrap Up
So there we are: we've run through how to invoke some basic Lambda Functions from MSK and utilized CloudFormation to create the infrastructure.
I hope everyone who has read this post has enjoyed it and if anyone has any questions drop me a comment or a tweet!
Cover photo by Pieter van de Sande on Unsplash
Useful links:
https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html
https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
Stay safe!
Cheers
DK
Top comments (3)
What happens if the lambda triggered by MSK fails for whatever reason? Is there an automatic retry mechanism (which is something you CAN define /parametrize for other event source triggers, such as DynamoDB streams)? Or is it ignored?
Nice article. You mentioned consumer groups/consumers. In my testing I am seeing that AWS is invoking only one instance of the Lambda and does not parallelize. Looks like a limitation. Any thoughts on that?
I thought the same, but it did seem to add concurrency at a rate of 1 every 15 minutes. I dumped 5 million records into a Kafka topic with 10 partitions and it is taking forever to process them (still going). This is the issue I find with lambda and serverless, the lack of control. You can provision the lambdas, but then you might as well run a container or ec2 instance.