DEV Community

Emiliano Roberti
Emiliano Roberti

Posted on

Understanding Amazon Kinesis Data Streams: Indexing, Replay, and Consumer Flexibility

Amazon Kinesis Data Streams (KDS) is a powerful platform for real-time data streaming and processing. It allows applications to ingest, process, and analyse high-throughput data in real time. One of its key strengths lies in indexing, event replay, and the ability to support multiple consumers at varying speeds. Let’s explore these features and their implications.


How Indexes Work in Kinesis Data Streams

In Kinesis, every record ingested into the stream is assigned a sequence number. This sequence number acts as the index for the record and serves the following purposes:

  • Record Order: Within a shard, records are strictly ordered based on their sequence numbers.
  • Replay and Retrieval: The sequence number allows applications to replay specific records or ranges of records.
  • Shard Iterator: Consumers use the sequence number to request records starting from a specific point.

Indexing Characteristics

  1. Shard-Level Granularity: Each shard in a stream maintains its own set of sequence numbers. Sequence numbers are unique within a shard but not across the stream.
  2. Durability: Records are retained in the stream for a configurable retention period (default: 24 hours, maximum: 7 days). During this period, indexes enable replay.
  3. Starting Position: Consumers can begin reading from:
    • TRIM_HORIZON: Start from the oldest record available.
    • LATEST: Start with the most recent record.
    • Specific Sequence Number: Start from an exact point in the stream.

Replay Events with Kinesis

One of Kinesis's standout features is the ability to replay events. This enables use cases like:

  • Debugging or reprocessing historical data.
  • Feeding data into new consumers without data loss.

How Replay Works

  1. Consumers use a Shard Iterator to specify the starting point of record retrieval. Example: Replay records from a sequence number 49659891250263205584801854121677320731210653829945098258.
  2. Kinesis ensures that records are delivered in order for each shard, maintaining consistency during replay.

Supporting Multiple Consumers at Different Speeds

Kinesis streams are designed to support multiple consumers reading data from the same stream. Here’s how it works:

Key Features

  1. Independent Progress:
    • Each consumer can maintain its own shard iterator, allowing independent progress.
    • Fast consumers can process records quickly, while slower consumers operate at their own pace.

Events from Kinesis

  1. Enhanced Fan-Out (EFO):

    • With EFO, consumers receive their own throughput pipe, avoiding contention between multiple consumers.
    • Default throughput: 2 MB/s per shard for all consumers.
    • With EFO: Each consumer gets an additional 2 MB/s.
  2. Checkpointing:

    • Checkpointing ensures that each consumer keeps track of the last record processed.
    • This allows consumers to resume processing from where they left off, even after interruptions.

Kinesis Architecture

Architecture Components

Left Side: Data Sources

  1. Mobile: Data generated by mobile devices, such as user interactions, telemetry, or logs.
  2. Desktop: Similar data generated from desktop applications.
  3. Game Servers: Game server activities, player interactions, or system metrics.

These data sources produce real-time events and stream them into Amazon Kinesis.


Centre: Amazon Kinesis

  • Amazon Kinesis: A managed service that collects, processes, and analyses real-time streaming data. It consists of shards, which are the basic units for parallel data processing. Each shard processes a portion of the incoming data.
    • Shard A, B, C, D: Multiple shards provide scalability and allow parallel data ingestion and processing. The number of shards typically depends on the data volume and processing requirements.

Right Side: AWS Components

  1. AWS Lambda:

    • Event-driven compute service that automatically processes incoming data from the Kinesis stream.
    • It applies transformations, enrichments, or routing logic to the data.
    • Lambda also triggers notifications via SNS (Simple Notification Service) for downstream systems or users.
  2. Amazon DynamoDB:

    • A NoSQL database used to store processed or aggregated data from Kinesis.
    • Suitable for scenarios like maintaining state, querying game statistics, or storing metadata.
  3. SNS (Simple Notification Service):

    • Publishes notifications triggered by AWS Lambda. These notifications can be sent to users, admins, or other applications for alerting or further actions.
  4. Amazon Kinesis Data Analytics:

    • Analyses streaming data in real-time using SQL-like queries.
    • Provides insights such as trends, anomalies, or metrics without requiring the data to be stored.

Use Cases

This architecture is particularly suited for:

  • Event driven platforms that require real-time monitoring.
  • Gaming platforms that require real-time scores and gaming events.
  • Applications providing live dashboards or insights.
  • Monitoring systems that need to alert on specific events or thresholds.

Amazon Kinesis Data Streams cab provide a flexible solution for real-time data processing. By using indexing, replay capabilities, and consumer independence, you can design systems that are scalable, resilient, and tailored to various workloads.

I have also included my repository on creating records to be sent to aws kinesis stream and a target lambda to extract the records ready to be purposed for business logic

Full code here

https://github.com/EmiRoberti77/cdk_kinesis_stream_sample

How the code works with CDK, Kinesis and Lambda

This document provides an overview of setting up a Kinesis Data Stream with AWS CDK and integrating it with a Lambda function for processing records. The examples include:

  • Sending records to the Kinesis Data Stream.
  • Creating an AWS CDK stack for the stream and Lambda integration.
  • Writing a Lambda function to process the records.

Project Setup

To create and manage the infrastructure using AWS CDK, follow these steps:

Install AWS CDK

Ensure you have the AWS CDK installed globally:

npm install -g aws-cdk
Enter fullscreen mode Exit fullscreen mode

Initialise the CDK Project

Run the following command to create a new CDK project:

cdk init app --language=typescript
Enter fullscreen mode Exit fullscreen mode

This will create a basic file structure for your CDK project.

File Structure

Below is the file structure for this example, including where the Lambda functions are stored:

project-root/
├── bin/
│   └── cdk-kinesis.ts           # Entry point for the CDK app
├── lib/
│   └── cdk-kinesis-stack.ts     # Stack definition for Kinesis and Lambda
├── src/
│   └── lambdas/
│       └── lambda_kinesis_target/
│           └── handler.ts       # Lambda handler for processing Kinesis records
├── node_modules/                # Node.js dependencies
├── package.json                 # Project dependencies
├── cdk.json                     # CDK project configuration
├── tsconfig.json                # TypeScript configuration
└── README.md                    # Documentation for the project
Enter fullscreen mode Exit fullscreen mode

Sending Records to Kinesis Data Stream

The following TypeScript code demonstrates how to send records to a Kinesis Data Stream. The record includes a random name, timestamp, and message, all of which are JSON-encoded.

import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
const kinesisClient = new KinesisClient({
  region: "us-east-1",
});

async function addRecordToKinesis(
  streamName: string,
  data: string,
  partitionKey: string
): Promise<any> {
  try {
    const encodedData = Buffer.from(data);
    const putRecord = new PutRecordCommand({
      StreamName: streamName,
      Data: encodedData,
      PartitionKey: partitionKey,
    });

    const response = await kinesisClient.send(putRecord);
    console.log(response);
    return response;
  } catch (err: any) {
    console.log(err.message);
    return undefined;
  }
}

const random = Math.floor(Math.random() * 1000);
const now = new Date().toISOString();
const streamName = "emi-kinesis-stream";
const data = JSON.stringify({
  name: "emi" + random,
  timeStamp: now,
  message: "kinesis message",
});
const partitionKey = "PartitionKey1";

addRecordToKinesis(streamName, data, partitionKey).catch((err) =>
  console.log(err)
);
Enter fullscreen mode Exit fullscreen mode

AWS CDK Stack for Kinesis and Lambda Integration

The following AWS CDK code sets up a Kinesis Data Stream and a Lambda function. It integrates the Lambda function with the Kinesis stream using an event source mapping.

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import { Stream, StreamMode } from "aws-cdk-lib/aws-kinesis";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import { Runtime, StartingPosition } from "aws-cdk-lib/aws-lambda";
import { Effect, PolicyStatement } from "aws-cdk-lib/aws-iam";
import { KinesisEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import * as path from "path";

export class CdkKinesisStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const stream = new Stream(this, "emi_cdk_stream", {
      streamName: "emi-kinesis-stream",
      streamMode: StreamMode.ON_DEMAND,
    });

    const targetKinesisLambda = new NodejsFunction(
      this,
      "emi_cdk_kinesis_target_lambda",
      {
        functionName: "emi_cdk_kinesis_target_lambda",
        runtime: Runtime.NODEJS_18_X,
        handler: "handler",
        entry: path.join(
          __dirname,
          "..",
          "src",
          "lambdas",
          "lambda_kinesis_target",
          "handler.ts"
        ),
      }
    );

    stream.grantReadWrite(targetKinesisLambda);

    targetKinesisLambda.addToRolePolicy(
      new PolicyStatement({
        actions: ["*"],
        resources: ["*"],
        effect: Effect.ALLOW,
      })
    );

    targetKinesisLambda.addEventSource(
      new KinesisEventSource(stream, {
        startingPosition: StartingPosition.LATEST,
        batchSize: 100,
        enabled: true,
      })
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Lambda Function for Processing Kinesis Records

The Lambda function below processes records from the Kinesis Data Stream. It decodes the Base64-encoded data, parses it as JSON, and logs the payload.

import { KinesisStreamEvent, KinesisStreamRecord } from "aws-lambda";

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    const { partitionKey, sequenceNumber } = record.kinesis;
    console.log(
      `[partitionKey=${partitionKey}]:[sequenceNumber=${sequenceNumber}]`
    );
    const payload = Buffer.from(record.kinesis.data, "base64").toString(
      "utf-8"
    );
    try {
      const decodedPayload = JSON.parse(payload);
      console.log(decodedPayload);
    } catch (err) {
      console.error(err);
    }
  }
};
Enter fullscreen mode Exit fullscreen mode

AWS Console

lambda output

Summary

This setup demonstrates how to:

  1. Send data to a Kinesis Data Stream.
  2. Use AWS CDK to create and configure the stream and a Lambda function.
  3. Process the Kinesis stream records in a Lambda function.

With this architecture, you can handle real-time streaming data and integrate it into your applications effectively.

Reinvent your career. Join DEV.

It takes one minute and is worth it for your career.

Get started

Top comments (0)

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

👋 Kindness is contagious

Immerse yourself in a wealth of knowledge with this piece, supported by the inclusive DEV Community—every developer, no matter where they are in their journey, is invited to contribute to our collective wisdom.

A simple “thank you” goes a long way—express your gratitude below in the comments!

Gathering insights enriches our journey on DEV and fortifies our community ties. Did you find this article valuable? Taking a moment to thank the author can have a significant impact.

Okay