Abhishek Gupta for AWS

Originally published at abhishek1987.Medium

Auto-scaling DynamoDB Streams applications on Kubernetes

This blog post demonstrates how to auto-scale your DynamoDB Streams consumer applications on Kubernetes. You will work with a Java application that uses the DynamoDB Streams Kinesis adapter library to consume change data events from a DynamoDB table. It will be deployed to an Amazon EKS cluster and will be scaled automatically using KEDA.

The application includes an implementation of com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor that processes data from the DynamoDB stream and replicates it to another (target) DynamoDB table; the replication is only an example workload. We will use the AWS CLI to write data to the source table, which produces events on the DynamoDB stream, and observe the application scaling.

The code is available in this GitHub repository: https://github.com/abhirockzz/dynamodb-streams-keda-autoscale

Introduction

Amazon DynamoDB is a fully managed database service that provides fast and predictable performance with seamless scalability. With DynamoDB Streams, you can leverage Change Data Capture (CDC) to get notified about changes to DynamoDB table data in real-time. This makes it possible to easily build applications that react to changes in the underlying database without the need for complex polling or querying.

DynamoDB offers two streaming models for change data capture:

  • Kinesis Data Streams for DynamoDB
  • DynamoDB Streams

With Kinesis Data Streams, you can capture item-level modifications in any DynamoDB table and replicate them to a Kinesis data stream. DynamoDB Streams, on the other hand, captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours.

We will make use of the native DynamoDB Streams capability. Even with DynamoDB Streams, there are multiple options for consuming the change data events.

Our application will leverage DynamoDB Streams along with the Kinesis Client Library (KCL) adapter library 1.x to consume change data events from a DynamoDB table.

Horizontal scalability with Kinesis Client Library

The Kinesis Client Library ensures that for every shard there is a record processor running and processing that shard. KCL helps take care of many of the complex tasks associated with distributed computing and scalability. It connects to the data stream, enumerates the shards within the data stream, and uses leases to coordinate shard associations with its consumer applications.

A record processor is instantiated for every shard it manages. KCL pulls data records from the data stream, pushes the records to the corresponding record processor and checkpoints processed records. More importantly, it balances shard-worker associations (leases) when the worker instance count changes or when the data stream is re-sharded (shards are split or merged). This means that you are able to scale your DynamoDB Streams application by simply adding more instances since KCL will automatically balance the shards across the instances.
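
To make the rebalancing concrete, here is a toy model in shell. It only sketches the outcome (an even spread of leases across workers) and is not KCL's actual lease-taking algorithm; the function name assign_leases and the shard-N / worker-N labels are made up for illustration.

```shell
# Toy model: spread <shards> leases across <workers> as evenly as possible.
# Illustrative only; this is not how KCL itself takes and renews leases.
assign_leases() (
  shards=$1
  workers=$2
  i=0
  while [ "$i" -lt "$shards" ]; do
    echo "shard-$i -> worker-$((i % workers))"
    i=$((i + 1))
  done
)

# With one worker, it owns all four leases:
#   assign_leases 4 1
# After a second worker joins, each ends up with two:
#   assign_leases 4 2
```

The point is that adding an instance changes only the worker count; the shard-to-worker mapping is recomputed without any change to the application code.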

But, you still need a way to scale your applications when the load increases. Of course, you could do it manually or build a custom solution to get this done.

This is where KEDA comes in.

What is KEDA?

KEDA is a Kubernetes-based event-driven autoscaling component that can monitor event sources like DynamoDB Streams and scale the underlying Deployments (and Pods) based on the number of events needing to be processed. It's built on top of native Kubernetes primitives such as the Horizontal Pod Autoscaler and can be added to any Kubernetes cluster. Here is a high-level overview of its key components (you can refer to the KEDA documentation for a deep-dive):


From KEDA documentation - https://keda.sh/docs/2.8/concepts/

  1. The keda-operator-metrics-apiserver component in KEDA acts as a Kubernetes metrics server that exposes metrics for the Horizontal Pod Autoscaler.
  2. A KEDA Scaler integrates with an external system (such as Redis) to fetch these metrics (e.g. length of a List) and drive auto-scaling of any container in Kubernetes based on the number of events needing to be processed.
  3. The role of the keda-operator component is to activate and deactivate Deployments, i.e. scale to and from zero.

You will see the DynamoDB Streams scaler in action that scales based on the shard count of a DynamoDB Stream.

Now let's move on to the practical part of this tutorial.

Prerequisites

In addition to an AWS account, you will need to have the AWS CLI, kubectl and Docker installed. The steps below also use eksctl, jq and Maven (with a JDK) to build the Java application.

Set up an EKS cluster and create DynamoDB tables

There are a variety of ways in which you can create an Amazon EKS cluster. I prefer using the eksctl CLI because of the convenience it offers. Creating an EKS cluster using eksctl can be as easy as this:

eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>

For details, refer to the Getting started with Amazon EKS – eksctl.

Create a DynamoDB table with streams enabled to persist application data and access the change data feed. You can use the AWS CLI to create a table with the following command:

aws dynamodb create-table \
    --table-name users \
    --attribute-definitions AttributeName=email,AttributeType=S \
    --key-schema AttributeName=email,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

We will need to create another table that will serve as a replica for the first table.

aws dynamodb create-table \
    --table-name users_replica \
    --attribute-definitions AttributeName=email,AttributeType=S \
    --key-schema AttributeName=email,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST

Clone this GitHub repository and change to the right directory:

git clone https://github.com/abhirockzz/dynamodb-streams-keda-autoscale
cd dynamodb-streams-keda-autoscale

Ok let's get started!

Set up and configure KEDA on EKS

For the purposes of this tutorial, you will use YAML files to deploy KEDA. But you could also use Helm charts.

Install KEDA:

# update version 2.8.2 if required

kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml

Verify the installation:

# check Custom Resource Definitions
kubectl get crd

# check KEDA Deployments
kubectl get deployment -n keda

# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda

Configure IAM Roles

The KEDA operator as well as the DynamoDB streams consumer application need to invoke AWS APIs. Since both will run as Deployments in EKS, we will use IAM Roles for Service Accounts (IRSA) to provide the necessary permissions.

In our particular scenario:

  • The KEDA operator needs to be able to get information about the DynamoDB table and stream.
  • The application (the KCL 1.x library, to be specific) needs to interact with Kinesis and DynamoDB, so it needs a number of IAM permissions.

Configure IRSA for KEDA operator

Set your AWS Account ID and OIDC Identity provider as environment variables:

ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)

#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1

OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
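
IRSA relies on an IAM OIDC identity provider being registered for the cluster. If the cluster was created without one (for example, without eksctl's --with-oidc flag), the role assumption in the later steps will likely fail. Here is a minimal sketch of how to register it, assuming eksctl is installed and the EKS_CLUSTER_NAME and AWS_REGION variables are set as above; the helper names oidc_host and ensure_oidc_provider are hypothetical.

```shell
# Strip the scheme from the issuer URL (same result as the sed expression above).
oidc_host() {
  echo "${1#https://}"
}

# Register the cluster's OIDC issuer as an IAM identity provider.
# If the provider is already associated, this should be a no-op.
ensure_oidc_provider() {
  eksctl utils associate-iam-oidc-provider \
    --cluster "$EKS_CLUSTER_NAME" --region "$AWS_REGION" --approve
}

# usage:
#   ensure_oidc_provider
#   aws iam list-open-id-connect-providers   # the issuer should now be listed
```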

Create a JSON file with Trusted Entities for the role:

read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
          "${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
        }
      }
    }
  ]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json

Now, create the IAM role and attach the policy (take a look at policy_dynamodb_streams_keda.json file for details):

export ROLE_NAME=keda-operator-dynamodb-streams-role

aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for DynamoDB streams KEDA scaler on EKS"

aws iam create-policy --policy-name keda-dynamodb-streams-policy --policy-document file://policy_dynamodb_streams_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-dynamodb-streams-policy

Associate the IAM role and Service Account:

kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}

# verify the annotation 
kubectl describe serviceaccount/keda-operator -n keda

You will need to restart the KEDA operator Deployment for this to take effect:

kubectl rollout restart deployment.apps/keda-operator -n keda

# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"

# expected output

AWS_STS_REGIONAL_ENDPOINTS:   regional
AWS_DEFAULT_REGION:           us-east-1
AWS_REGION:                   us-east-1
AWS_ROLE_ARN:                 arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-dynamodb-streams-role
AWS_WEB_IDENTITY_TOKEN_FILE:  /var/run/secrets/eks.amazonaws.com/serviceaccount/token

Configure IRSA for the DynamoDB Streams consumer application

Start by creating a Kubernetes Service Account:

kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: dynamodb-streams-consumer-app-sa
EOF

Create a JSON file with Trusted Entities for the role:

read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
          "${OIDC_PROVIDER}:sub": "system:serviceaccount:default:dynamodb-streams-consumer-app-sa"
        }
      }
    }
  ]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust.json
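
The two trust policies in this post differ only in the namespace and Service Account name used in the sub condition. As a sketch, a small shell function can render either one; the name render_trust_policy is hypothetical and is not part of the repository.

```shell
# Render an IRSA trust policy for one Service Account.
# Arguments: <account id> <OIDC provider host/path> <namespace> <service account>
render_trust_policy() {
  cat <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::$1:oidc-provider/$2"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "$2:aud": "sts.amazonaws.com",
          "$2:sub": "system:serviceaccount:$3:$4"
        }
      }
    }
  ]
}
EOF
}

# usage:
#   render_trust_policy "$ACCOUNT_ID" "$OIDC_PROVIDER" keda keda-operator > trust_keda.json
#   render_trust_policy "$ACCOUNT_ID" "$OIDC_PROVIDER" default dynamodb-streams-consumer-app-sa > trust.json
```

Keeping the namespace and Service Account as explicit arguments makes it harder for the sub condition to drift from the Service Account that the Deployment actually uses.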

Now, create the IAM role and attach the policy. Update the policy.json file and enter the region and AWS account details.

export ROLE_NAME=dynamodb-streams-consumer-app-role

aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for DynamoDB Streams consumer app on EKS"

aws iam create-policy --policy-name dynamodb-streams-consumer-app-policy --policy-document file://policy.json

aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/dynamodb-streams-consumer-app-policy

Associate the IAM role and Service Account:

kubectl annotate serviceaccount -n default dynamodb-streams-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}

# verify the annotation
kubectl describe serviceaccount/dynamodb-streams-consumer-app-sa

The core infrastructure is now ready. Let's prepare and deploy the consumer application.

Deploy DynamoDB Streams consumer application to EKS

First, build the Docker image and push it to ECR (you can refer to the Dockerfile for details).

Build and push the Docker image to ECR

# create runnable JAR file
mvn clean compile assembly\:single

# build docker image
docker build -t dynamodb-streams-consumer-app .

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)

# log in to ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com

# create a private ECR repo
aws ecr create-repository --repository-name dynamodb-streams-consumer-app --region us-east-1

# tag and push the image
docker tag dynamodb-streams-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest

Deploy the consumer application

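Update consumer.yaml to include the Docker image you just pushed to ECR and the ARN of the DynamoDB stream for the source table. Also check that serviceAccountName matches the Service Account created earlier (dynamodb-streams-consumer-app-sa). The rest of the manifest remains the same.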

To retrieve the ARN for the stream, run the following command:

aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn'

The consumer.yaml Deployment manifest looks like this:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: dynamodb-streams-kcl-consumer-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dynamodb-streams-kcl-consumer-app
  template:
    metadata:
      labels:
        app: dynamodb-streams-kcl-consumer-app
    spec:
      serviceAccountName: dynamodb-streams-consumer-app-sa
      containers:
        - name: dynamodb-streams-kcl-consumer-app
          image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
          imagePullPolicy: Always
          env:
            - name: TARGET_TABLE_NAME
              value: users_replica
            - name: APPLICATION_NAME
              value: dynamodb-streams-kcl-app-demo
            - name: SOURCE_TABLE_STREAM_ARN
              value: <enter ARN>
            - name: AWS_REGION
              value: us-east-1
            - name: INSTANCE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

Create the Deployment:

kubectl apply -f consumer.yaml

# verify Pod transition to Running state
kubectl get pods -w

DynamoDB Streams consumer app autoscaling in action with KEDA

Now that you've deployed the consumer application, the KCL adapter library should jump into action. The first thing it will do is create a "control table" in DynamoDB; its name is the same as the application name (which in this case is dynamodb-streams-kcl-app-demo).

It might take a few minutes for the initial co-ordination to happen and the table to get created. You can check the logs of the consumer application to see the progress.

kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})

Once the lease allocation is complete, check the items in the control table and note the leaseOwner attribute:

aws dynamodb scan --table-name dynamodb-streams-kcl-app-demo

Add data to the DynamoDB table

Now that you've deployed the consumer application, let's add data to the source DynamoDB table (users).
You can use the producer.sh script for this.

export TABLE_NAME=users
./producer.sh

Check consumer logs to see the messages being processed:

kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})

Check the target table (users_replica) to confirm that the DynamoDB streams consumer application has indeed replicated the data.

aws dynamodb scan --table-name users_replica

Notice the value of the processed_by attribute? It's the same as the name of the consumer application Pod. This will make it easier for us to verify the end-to-end autoscaling process.

Create the KEDA scaler

Use the scaler definition:

kubectl apply -f keda-dynamodb-streams-scaler.yaml

Here is the ScaledObject definition. Notice that it's targeting the dynamodb-streams-kcl-consumer-app Deployment (the one we just created) and the shardCount is set to 2:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name:  aws-dynamodb-streams-scaledobject
spec:
  scaleTargetRef:
    name: dynamodb-streams-kcl-consumer-app
  triggers:
  - type: aws-dynamodb-streams
    metadata:
      awsRegion: us-east-1
      tableName: users
      shardCount: "2"
      identityOwner: "operator"

Note on shardCount attribute:

We are using a shardCount value of 2. This is very important to note, since the application uses the DynamoDB Streams Kinesis adapter library with KCL 1.x, which supports "Up to 2 simultaneous consumers per shard." This means that you cannot have more than two consumer application instances processing the same DynamoDB stream shard.

However, this KEDA scaler configuration will ensure that there is one Pod for every two shards. For example, if there are four shards, the application will be scaled out to two Pods. If there are six shards, there will be three Pods, and so on. Of course, you can choose to have one Pod for every shard by setting the shardCount to 1.
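
The Pod count described above can be sketched as a ceiling division. This assumes the Horizontal Pod Autoscaler rounds the ratio of shards to shardCount up, and it ignores the HPA tolerance band and any minimum or maximum replica limits. The function name desired_pods is made up for illustration.

```shell
# $1 = number of shards in the stream, $2 = shardCount from the ScaledObject
desired_pods() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

desired_pods 4 2   # prints 2
desired_pods 6 2   # prints 3
desired_pods 5 2   # prints 3 (the odd shard still needs a Pod)
```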

To keep track of the number of shards in the DynamoDB stream, you can run the following command:

aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards | length'

These commands use the jq utility to parse the JSON output.

If you want the shard details:

aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards'

Verify DynamoDB streams consumer application auto-scaling

We started off with one Pod of our application. But, thanks to KEDA, we should now see additional Pods coming up automatically to match the processing requirements of the consumer application.

To confirm, check the number of Pods:

kubectl get pods -l=app=dynamodb-streams-kcl-consumer-app

Most likely, you will see four shards in the DynamoDB stream and two Pods. This can change (increase/decrease) depending on the rate at which data is produced to the DynamoDB table.
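
To see what KEDA is doing behind the scenes, you can also inspect the ScaledObject and the Horizontal Pod Autoscaler it manages. This is a small sketch that assumes KEDA's default naming of keda-hpa-<ScaledObject name> for the HPA; the helper names keda_hpa_name and show_scaling_state are hypothetical.

```shell
# Assumption: KEDA names the HPA it creates "keda-hpa-<ScaledObject name>".
keda_hpa_name() {
  echo "keda-hpa-$1"
}

# Show the ScaledObject and the HPA that KEDA manages for it.
show_scaling_state() {
  kubectl get scaledobject "$1"
  kubectl get hpa "$(keda_hpa_name "$1")"
}

# usage:
#   show_scaling_state aws-dynamodb-streams-scaledobject
```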

Just like before, validate the data in the DynamoDB target table (users_replica) and note the processed_by attribute. Since we have scaled out to additional Pods, the value should now vary across records, since each Pod processes a subset of the messages from the DynamoDB change stream.

Also, make sure to check the dynamodb-streams-kcl-app-demo control table in DynamoDB. You should see an update to the leaseOwner attribute, reflecting the fact that there are now two Pods consuming from the DynamoDB stream.
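
A quick way to summarize lease ownership is to count leases per owner. The sketch below assumes leaseOwner is stored as a string attribute in the control table; the helper name count_by_owner is made up for illustration. Leases that currently have no owner may show up as None in the text output.

```shell
# Count how many leases (shards) each owner holds.
# Reads whitespace-separated owner names on stdin, prints "<owner> <count>".
count_by_owner() {
  tr -s '[:space:]' '\n' | sed '/^$/d' | sort | uniq -c | awk '{print $2, $1}'
}

# usage:
#   aws dynamodb scan --table-name dynamodb-streams-kcl-app-demo \
#     --query 'Items[].leaseOwner.S' --output text | count_by_owner
```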

Once you have verified the end to end solution, you can clean up the resources to avoid incurring any additional charges.

Delete resources

Delete the EKS cluster and DynamoDB tables.

eksctl delete cluster --name <enter cluster name>
aws dynamodb delete-table --table-name users
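aws dynamodb delete-table --table-name users_replica
# the KCL control (lease) table created by the consumer application
aws dynamodb delete-table --table-name dynamodb-streams-kcl-app-demo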

Conclusion

Use cases you should experiment with

  • Scale further up - How can you make DynamoDB Streams increase its number of shards? What happens to the number of consumer instance Pods?
  • Scale down - What happens when the number of shards in the DynamoDB stream decreases?

In this post, we demonstrated how to use KEDA and DynamoDB Streams and combine two powerful techniques (Change data capture and Auto-scaling) to build scalable, event-driven systems that can adapt based on the data processing needs of your application.
