DEV Community

Cover image for Triggering Lambda Functions From Amazon MSK
Danny Kay
Danny Kay

Posted on • Edited on

Triggering Lambda Functions From Amazon MSK

Introduction

I was on Twitter last month and came across the below tweet.

This announcement was pretty big in my opinion.

Having the ability to trigger AWS Lambda functions from Amazon MSK records allows us to utilize many benefits of Serverless computing.

I had a bit of spare time on my hands so thought I'd have a little dig into this new feature.

Want to follow along?

Alrighty, so before we do anything constructive the repository for all the bits and bobs I'm showing you below can be found here.

Be wary, AWS is fun but that fun comes at a cost. Following this will cost you some dough so keep an eye on the Billing & Cost Management Dashboard people!

This post assumes you know your way around AWS and understand how Kafka works.

Setting up the Infrastructure

Before writing our Lambda functions we need some infrastructure, I'm using CloudFormation in this example. AWS has kindly provided a template that provides a basic architecture of a 3 node Kafka Cluster, VPC, Subnet setup, and an EC2 bastion.

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-cluster.html#aws-resource-msk-cluster--examples

I've tweaked this ever so slightly so that we can communicate with the broker using TLS_PLAINTEXT instead of TLS, I've modified the MSK Security group to allow MSK to communicate with itself and I'm running 2 Brokers instead of 3.

You will also need an EC2 Key Pair so go ahead into the EC2 Console and create one, making sure you download the Key and keep it somewhere safe and out of harm's way.

With the two pieces of pivotal information, we can go ahead and run the script.

➜ deploy-msk-infrastructure.sh IP_ADDRESS KEY_PAIR

➜ deploy-msk-infrastructure.sh 1.2.3.4/32 demo-keypair.pem
Enter fullscreen mode Exit fullscreen mode

It takes around 20–30 minutes for everything to be up, running, and functional. 

Once we are cooking with gas we should be able to SSH into our Bastion, create a Topic, and produce a record to it.

➜ chmod 400 demo-keypair.pem

➜ ssh -i "demo-keypair.pem" ec2-user@ec2-1-2-3-4.eu-west-2.compute.amazonaws.com


__|  __|_  )
       _|  (     /   Amazon Linux 2 AMI
      ___|\___|___|
https://aws.amazon.com/amazon-linux-2/


[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-topics.sh --create --topic example \
--bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --partitions 1 --replication-factor 1

[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --topic example --broker-list b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092

>Hello!

Enter fullscreen mode Exit fullscreen mode

And then consume it.

[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-console-consumer.sh --topic example --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --from-beginning

Hello!

Enter fullscreen mode Exit fullscreen mode

Alrighty, we have verified that our Cluster is alive and kicking. Now we can go ahead and create a Lambda function to verify the connectivity.

Simple Lambda Function

Before we get into the code lets just go over how a Lambda function is actually invoked by Amazon MSK.

A Lambda function is responsible for handling our records. But there is another process that happens before this which is an Event Source Mapping.

An Event Source Mapping is a Lambda process that reads data from a source system and invokes a function with the data it has received from the source system.

The Event Source Mapping that we create in the two examples below is responsible for reading the records from the Topics and invoking the functions.

The Event Source Mapping can be created from the AWS Lambda Console but it can also be created from a CloudFormation Template which we'll be doing in this example.

Right let's create this basic Lambda function, and when I say basic, I mean basic!

Think of this Lambda as verifying that it can communicate with Kafka and all the permissions are set up correctly.

package com.danieljameskay.main

import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.RequestHandler
import com.danieljameskay.models.MSKEvent
import com.google.gson.Gson
import com.google.gson.GsonBuilder

class App(var gson: Gson = GsonBuilder().setPrettyPrinting().create()) : RequestHandler<MSKEvent, String> {
    override fun handleRequest(event: MSKEvent, context: Context): String {
        val logger = context.logger
        logger.log(gson.toJson(event))
        return "OK"
    }
}
Enter fullscreen mode Exit fullscreen mode

Quickly running through the code. The JSON Event is Mapped into an MSKEvent object and the contents are logged out.

Following the link here, we have to create a Role with some extra permissions and configure the Event Source for the Lambda function as shown below in the below CloudFormation Template.

AWSTemplateFormatVersion: '2010-09-09'
Description: "Basic Lambda Function to verify MSK connectivity."
Parameters:
  EventSourceArn:
    Description: ARN of the Event Source, Kafka in this case
    Type: String
  TopicName:
    Description: Topic for the Lambda to consume from
    Type: String
  S3Bucket:
    Description: S3 Bucket for the JAR
    Type: String
  S3Key:
    Description: JAR Name
    Type: String
  Handler:
    Description: Application Handler
    Type: String
Resources:
  BasicKafkaConsumerLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: BasicKafkaConsumerLambdaRole
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: 2012-10-17
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
      Path: /

  BasicKafkaConsumerLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Name: BasicKafkaConsumerLambdaFunction
      Description: BasicKafkaConsumerLambdaFunction
      Runtime: java11
      Code:
        S3Bucket: !Ref S3Bucket
        S3Key: !Ref S3Key
      Handler: !Ref Handler
      MemorySize: 128
      Timeout: 10
      Role:
        Fn::GetAtt:
          - BasicKafkaConsumerLambdaRole
          - Arn

  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !Ref EventSourceArn
      FunctionName: !Ref BasicKafkaConsumerLambdaFunction
      StartingPosition : LATEST
      Topics:
        - !Ref TopicName

Enter fullscreen mode Exit fullscreen mode

With the Lambda, Role, and Event Source Mapping configured in CloudFormation, we can go ahead and run the script.

➜ ./deploy-lambda.sh UBER_JAR_LOCATION \
S3_BUCKET_TO_UPLOAD_TOO \
STACK_NAME \
EVENT_SOURCE_ARN \
TOPIC_NAME \
S3_BUCKET_WHICH_CONTAINS_JAR \
JAR_NAME \
LAMBDA_HANDLER 

➜ ./deploy-lambda.sh ./basic-function-all.jar \
s3://my-bucket/basic-function-all.jar \
Basic-Lambda-Stack \     
arn:aws:kafka:eu-west-2:111111111111:cluster/MSKCluster/f0705c96-e239-4f74-a0f7-f82031a2fc65-4 \
example \
my-bucket \
basic-function-all.jar \
com.danieljameskay.main.App::handleRequest
Enter fullscreen mode Exit fullscreen mode

This script is extremely basic, it uses Gradle to build the app, creates a JAR, uploads it to S3, and finally creates the CloudFormation stack. It should take a couple of minutes for everything to be deployed.

As we did before, connect back to the Bastion and produce some basic test records.

[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --topic example --broker-list b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --property "parse.key=true" --property "key.separator=:"
> 1:Hello!
> 2:Hey!
Enter fullscreen mode Exit fullscreen mode

Then we can head to the Terminal and query the CloudWatch Logs.

➜ aws logs get-log-events --log-group-name /aws/lambda/BasicKafkaConsumerLambdaFunction --log-stream-name '2020/09/20/[$LATEST]8dd5e9881b7f456897d62d39b654e36d'
Enter fullscreen mode Exit fullscreen mode

We can see in the logs the records printed out. Below is a prettier version that has been escaped, values restored and is easier to read.

{
  "records": {
    "example-0": [
      {
        "topic": "example",
        "partition": 0,
        "offset": 0,
        "timestamp": 1600598451721,
        "timestampType": "CREATE_TIME",
        "key": "MQ==",
        "value": "SGVsbG8h"
      }
    ]
  }
}
Enter fullscreen mode Exit fullscreen mode

and

{
  "records": {
    "example-1": [
      {
        "topic": "example",
        "partition": 1,
        "offset": 0,
        "timestamp": 1600598468528,
        "timestampType": "CREATE_TIME",
        "key": "Mg==",
        "value": "SGV5IQ=="
      }
    ]
  }
}
Enter fullscreen mode Exit fullscreen mode

So we can see the records have been grouped into an array based on the partition they came from. We also get some metadata such as the timestamp and the value of the record has been encoded in base64.

The custom class I created which is used by the function handler to map the JSON into an MSKEvent object in this example drops the EventSource and EventSourceARN fields and if a Key isn't provided a zero will be visible in the payload.

Below is an example of what the payload would look like if our function consumed multiple records from different partitions.

{
  "records": {
    "example-1": [
      {
        "topic": "example",
        "partition": 1,
        "offset": 0,
        "timestamp": 1599337456523E12,
        "timestampType": "CREATE_TIME",
        "key": "0",
        "value": "cWVxd2Vxd2U="
      }
    ],
    "example-2": [
      {
        "topic": "example",
        "partition": 2,
        "offset": 0,
        "timestamp": 1599337455636E12,
        "timestampType": "CREATE_TIME",
        "key": "0",
        "value": "cXdlcXdl"
      }
    ]
  }
}
Enter fullscreen mode Exit fullscreen mode

Consumer Groups

A quick note on Consumer Groups. Multiple Consumers can work as part of a Consumer Group to increase parallelism. This allows the Partitions in the Topic to be divided between the available Consumers for consumption.

When the Consumer Group is registered with MSK, the group has the same name as the event source mapping UUID. If you aren't sure what the UUID is of your Event Source Mapping follow the instructions in the first link at the bottom of this post.

➜ aws lambda get-event-source-mapping --uuid 02d6c44a-46a7-41fe-8dd0-0f8ab903fee2
{
    "UUID": "02d6c44a-46a7-41fe-8dd0-0f8ab903fee2",
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:kafka:eu-west-2:111111111111:cluster/MSKCluster/f0705c96-e239-4f74-a0f7-f82031a2fc65-4",
    "FunctionArn": "arn:aws:lambda:eu-west-2:111111111111:function:BasicKafkaConsumerLambdaFunction",
    "LastModified": "2020-09-20T11:18:49.525000+01:00",
    "LastProcessingResult": "OK",
    "State": "Enabled",
    "StateTransitionReason": "USER_INITIATED",
    "Topics": [
        "example"
    ]
}
Enter fullscreen mode Exit fullscreen mode

Then cross-reference this using the Kafka Consumer Group CLI.

[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-consumer-groups.sh  --list  --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092

02d6c44a-46a7-41fe-8dd0-0f8ab903fee2
Enter fullscreen mode Exit fullscreen mode

So we can see that the UUID of our Event Source Mapping does match the name of the Consumer Group. The last command will display the current offset and lag for each Partition which is useful to understand how our Consumer Group is performing.

[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-consumer-groups.sh --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --describe --group 02d6c44a-46a7-41fe-8dd0-0f8ab903fee2
Enter fullscreen mode Exit fullscreen mode

Integrating SQS

Now we're going to step it up a notch.

We're going to insert some real-world data into Kafka then trigger a Lambda function to send that data into an SQS queue.

Why SQS you might ask?

SQS was the very first AWS service that launched back in November 2004 and to this very day, it is still an extremely popular service used to build queue-based solutions.

One of my favorite Microservice patterns is the Anti-Corruption Layer Pattern. It allows us to provide a layer between two systems that need to be integrated where the semantics are different.

We're going to build a tiny ACL in this example. This means mapping the event generated by MSK into a smaller data structure, serializing the data to JSON, and sending it into an SQS queue.

I have a small application running which is receiving updates every 20 seconds from the Poloniex 24 Hour Exchange Volume API. Its sending the updates it receives onto MSK. The data received has the below structure.

{
  "USDC": "2811608.201",
  "PAX": "100.623",
  "DAI": "202.840",
  "USDJ": "14553.678",
  "USDT": "36356839.873",
  "TRX": "73155426.619",
  "BUSD": "7581.134",
  "BTC": "559.254",
  "BNB": "1717.156",
  "ETH": "70.466",
  "XMR": "0.000"
}
Enter fullscreen mode Exit fullscreen mode

This application is running on the Bastion EC2 instance just for the purpose of this demo. Before we do anything, we have to create a Topic for records to be sent to.

[ec2-user@ip-1-2-3-4 ~]➜ ./kafka/kafka_2.12-2.2.1/bin/kafka-topics.sh --create --topic example.poloniex.24hr.exchange.volume --bootstrap-server b-1.mskcluster.vxbd2z.c4.kafka.eu-west-2.amazonaws.com:9092 --partitions 3 --replication-factor 2
Enter fullscreen mode Exit fullscreen mode

As we did earlier in this article, we'll need to deploy our new Lambda function and some Cloudformation Templates for our SQS resource. 

First, we'll deploy our SQS Queue.

➜ ./SQS-Integration/deploy-infrastructure STACK_NAME NAME_OF_QUEUE
Enter fullscreen mode Exit fullscreen mode

The CloudFormation Template for SQS is really simple, it just requires the name of the Queue we want to create.

AWSTemplateFormatVersion: "2010-09-09"
Description: "Creates an SQS queue for the Poloniex Exchange Volume Lambda to send records to."
Parameters:
  QueueName:
    Description: Name of the Queue
    Type: String
Resources: 
  SQSQueue: 
    Type: AWS::SQS::Queue
    Properties: 
      QueueName: !Ref QueueName
Enter fullscreen mode Exit fullscreen mode

The Application is pretty straight forward. A record is produced to Kafka, Lambda is triggered, the JSON event is mapped into an MSKEvent object, the records inside the event are looped over and mapped into a basic ExchangeVolumeUpdate object and sent to SQS.

The CloudFormation template is pretty similar to the earlier one we used for the basic Lambda function.

AWSTemplateFormatVersion: '2010-09-09'
Description: "Poloniex Exchange Volume Lambda Function"
Parameters:
  EventSourceArn:
    Description: ARN of the Event Source, Kafka in this case
    Type: String
  TopicName:
    Description: Topic for the Lambda to consume from
    Type: String
  S3Bucket:
    Description: S3 Bucket for the JAR
    Type: String
  S3Key:
    Description: JAR Name
    Type: String
  Handler:
    Description: Application Handler
    Type: String

Resources:
  PoloniexExchangeVolumeLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: PoloniexExchangeVolumeLambdaRole
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: 2012-10-17
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess
      Path: /

  PoloniexExchangeVolumeLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: PoloniexExchangeVolumeLambdaFunction
      Description: PoloniexExchangeVolumeLambdaFunction
      Runtime: java11
      Code:
        S3Bucket: !Ref S3Bucket
        S3Key: !Ref S3Key
      Handler: !Ref Handler
      MemorySize: 256
      Timeout: 10
      Role:
        Fn::GetAtt:
          - PoloniexExchangeVolumeLambdaRole
          - Arn

  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !Ref EventSourceArn
      FunctionName: !Ref PoloniexExchangeVolumeLambdaFunction
      StartingPosition : LATEST
      Topics:
        - !Ref TopicName
Enter fullscreen mode Exit fullscreen mode

Then we'll deploy our Lambda.

➜ ./deploy-lambda.sh UBER_JAR_LOCATION \
S3_BUCKET_TO_UPLOAD_TOO \
STACK_NAME \
EVENT_SOURCE_ARN \
TOPIC_NAME \
S3_BUCKET_WHICH_CONTAINS_JAR \
JAR_NAME \
LAMBDA_HANDLER

➜ ./deploy-lambda.sh ./sqs-integration-all.jar \
s3://my-bucket/sqs-integration-all.jar \
MSK-SQS-Lambda-Stack \     
arn:aws:kafka:eu-west-2:111111111111:cluster/MSKCluster/f0705c96-e239-4f74-a0f7-f82031a2fc65-4 \
example.poloniex.24hr.exchange.volume \
my-bucket \
sqs-integration-all.jar \
com.danieljameskay.sqs.integration.main.App::handleRequest
Enter fullscreen mode Exit fullscreen mode

This script behaves in the same way as the one used in the basic example above, it uses Gradle to build the app, creates a JAR, uploads it to S3, and finally creates the CloudFormation stack. It should take a couple of minutes for everything to be deployed.

The application is running and sending records to MSK so as soon as our Lambda is deployed we should be able to use the SQS web interface to verify records are being sent to SQS with no issues.

Alt Text

We can also view the CloudWatch graphs to gain insight into how our function is behaving.

Alt Text

Happy days! Everything is working as it should!

Wrap Up

So there we are, we've run through how to invoke some basic Lambda Functions from MSK and utilized CloudFormation to create the infrastructure.

I hope everyone who has read this post has enjoyed it and if anyone has any questions drop me a comment or a tweet!

Cover photo by Pieter van de Sande on Unsplash

Useful links:

https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html
https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

Stay safe!

Cheers
DK

Top comments (3)

Collapse
 
cristigeo profile image
Cristigeo

What happens if the lambda triggered by MSK fails for whatever reason? Is there an automatic retry mechanism (which is something you CAN define /parametrize for other event source triggers, such as DynamoDB streams)? Or is it ignored?

Collapse
 
krishwin profile image
krishwin

Nice article. you mentioned about consumer group/consumer. in my testing i am seeing that AWS is invoking only one instance of the lambda and does not parallelize .looks like a limitation. any thoughts on that?

Collapse
 
chriscamplejohn profile image
Chris Camplejohn • Edited

I thought the same, but it did seem to add concurrency at a rate of 1 every 15 minutes. I dumped 5 million records into a Kafka topic with 10 partitions and it is taking forever to process them (still going). This is the issue I find with lambda and serverless, the lack of control. You can provision the lambdas, but then you might as well run a container or ec2 instance. Concurrency