Abdelrahman Ahmed

From AWS Kinesis to Apache Kafka: Building a Real-Time Streaming Bridge

Introduction:

In one of my recent projects, I needed to replicate data from an AWS Kinesis stream into an Apache Kafka topic. The goal was to allow downstream systems to consume the same events from Kafka that were originally being produced to Kinesis.

The core question here is: How can we replicate data from an AWS Kinesis stream to a Kafka topic in real time, with minimal latency and high reliability?

I explored different solutions, including Kafka Connect, MirrorMaker, and other integration tools. However, most of them came with complex configurations and limitations that didn’t fit our infrastructure setup.

So, I decided to build a custom Kinesis-to-Kafka bridge that would be flexible, lightweight, highly configurable, and a good fit for our infrastructure setup.

If you’re new to Amazon Kinesis, here’s a quick overview of it (feel free to skip this section if you’re already familiar):

Amazon Kinesis is a group of streaming services within Amazon Web Services (AWS) created to stream real-time data at scale. It contains different services like:

  • Amazon Kinesis Data Streams
  • Amazon Kinesis Data Firehose
  • Amazon Kinesis Video Streams
  • Amazon Managed Service for Apache Flink

In this article, we will focus on Kinesis Data Streams, which is a serverless (fully managed by AWS) streaming service designed to handle high-throughput, low-latency streaming use-cases like events, logs, and clickstreams.

If you’re also new to Apache Kafka, here’s a quick overview (feel free to skip this section if you’re already familiar):

Apache Kafka is a distributed event streaming platform that handles large volumes of real-time data. It provides durability, scalability, and strong ordering guarantees within each partition. It is commonly used to build data pipelines and streaming applications at different scales, and it is widely adopted in event-driven architectures.

The Replicator Schema

The replicator consists of three core stages, as shown in the following diagram:

┌──────────────────────────┐
│     Kinesis Consumer     │  ← Consumes records from AWS Kinesis stream
└────────────┬─────────────┘
             │
             ▼
┌──────────────────────────┐
│     Data Processing      │  ← Applies business logic to transform/validate/parse the records
└────────────┬─────────────┘
             │
             ▼
┌──────────────────────────┐
│    Producing to Kafka    │  ← Produces processed events to Kafka topic/topics
└──────────────────────────┘

Kinesis Consumer: Consuming data from a Kinesis stream requires more than just reading records. Kinesis doesn’t track consumer progress for you the way Kafka tracks committed offsets, so each consumer is responsible for recording its own position (the sequence number of the last processed record in each shard). This is typically done by storing checkpoint data in a DynamoDB table, often referred to as a lease table.

This approach, called leasing, allows multiple consumers to coordinate access to shards (the basic unit of a Kinesis stream) and avoid processing the same records twice. You can implement this logic manually, which is very complex, or use a ready-made library such as the Amazon Kinesis Client Library (KCL), which handles lease management, DynamoDB table creation, and checkpoint tracking out of the box.

In my case, I used KCL for Python (KCLPY) to consume from Kinesis, which let me avoid re-implementing the low-level coordination and state management logic.

Data Processing: This stage is where your core business logic lives. After data is fetched from Kinesis but before it’s forwarded to Kafka, you may need to:

  • Add any required metadata to records
  • Validate or filter the records
  • Transform the records to meet Kafka consumer expectations

If no transformation is needed, the data can be streamed directly to Kafka.
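
As a rough illustration of this stage, the sketch below validates, filters, enriches, and re-serializes a single record. It assumes the Kinesis payload is UTF-8 JSON; the function name transform_record, the event_type filter rule, and the source metadata block are hypothetical and not taken from the actual project.

```python
import json
from typing import Optional


def transform_record(data: bytes, partition_key: str, sequence_number: str,
                     stream_name: str = "example-stream") -> Optional[bytes]:
    """Return the bytes to publish to Kafka, or None if the record should be skipped."""
    # Validate: drop payloads that are not UTF-8 JSON objects.
    try:
        event = json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(event, dict):
        return None

    # Filter: hypothetical rule, skip events that carry no "event_type" field.
    if "event_type" not in event:
        return None

    # Enrich: attach provenance metadata so Kafka consumers can trace the origin.
    event["source"] = {
        "stream": stream_name,
        "partition_key": partition_key,
        "sequence_number": sequence_number,
    }

    # Transform: re-serialize as compact JSON bytes for the Kafka producer.
    return json.dumps(event, separators=(",", ":"), sort_keys=True).encode("utf-8")
```

Returning None for rejected records keeps the decision to skip (or to route to a dead-letter topic) in the caller rather than inside the transformation itself.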

Producing to Kafka: This is the final stage, in which records are published to the designated Kafka topic.

At the implementation level, this is typically done with one of the Kafka client libraries. In my case, I used the confluent_kafka client library for Python.

Once this stage completes, the records are available in Kafka, which allows downstream systems to consume the data in real time.

Tech Stack and Project Structure

This project was developed using Python, kclpy, confluent_kafka, and Docker.

Below is an overview of the project structure:

kinesis-to-kafka-replicator/
├── kinesis_replicator/
│   ├── __init__.py
│   └── record_processor.py
│
├── config/
│   ├── kcl.properties.template
│   └── replicator_configs.template
│
├── amazon_kclpy/
│   └── jars/
│
├── run.sh
├── Dockerfile
├── requirements.txt
└── README.md

Configuration Breakdown

The configuration is separated into two parts:

  • KCLPY Configuration: Located in kcl.properties.template and contains all the required settings for the Kinesis client library (KCL), such as streamName, initialPositionInStream, and the AWS authentication parameters. A more detailed list can be found here.

  • Replicator Configuration: Located in replicator_configs.template and includes the Kafka-specific settings, such as bootstrap_servers, kafka_topic, and client_id, along with any global parameters required for tuning the replicator, such as sleep_seconds.
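
To make the second part more concrete, here is a minimal sketch of how the rendered replicator configuration could be turned into a producer configuration. It assumes the rendered file is JSON with flat keys such as bootstrap_servers and kafka_topic; the function name build_producer_config, the key mapping, and the choice of required keys are my assumptions for illustration. The dotted names on the right-hand side are standard librdkafka properties.

```python
import json

# Hypothetical mapping from the replicator's JSON keys to librdkafka property names.
_KEY_MAP = {
    "bootstrap_servers": "bootstrap.servers",
    "client_id": "client.id",
    "security_protocol": "security.protocol",
    "sasl_mechanism": "sasl.mechanism",
    "sasl_username": "sasl.username",
    "sasl_password": "sasl.password",
}
# Assumption: these two settings are the minimum needed to produce anything.
_REQUIRED = ("bootstrap_servers", "kafka_topic")


def build_producer_config(raw_json):
    """Parse the rendered config and return (producer_config, topic)."""
    settings = json.loads(raw_json)
    missing = [key for key in _REQUIRED if not settings.get(key)]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    # Only forward keys that are present; tuning values such as sleep_seconds
    # belong to the replicator itself and are not passed to the Kafka client.
    producer_config = {
        target: settings[source]
        for source, target in _KEY_MAP.items()
        if settings.get(source)
    }
    return producer_config, settings["kafka_topic"]
```

The returned dictionary could then be handed to confluent_kafka.Producer(producer_config), which accepts a plain dict of configuration properties.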

All Python dependencies, such as boto3, confluent_kafka, and amazon_kclpy, are listed in requirements.txt.

Core Logic: record_processor.py

This is the heart of the replicator's codebase.

The RecordProcessor class is responsible for:

  • Consuming/Receiving the records from the Kinesis stream
  • Applying any business logic or transformations
  • Publishing the resulting data to Kafka
  • Handling retries, checkpointing, and logging

A more detailed sample can be found here

The following is a simplified breakdown of what’s happening:

➤ Processing Kinesis Records

# Example for KCL process_records method with leasing functionality (checkpointing)

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)  # Business logic is implemented in this method
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            # Checkpoints every N seconds
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))
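
The method above calls two helpers that are not shown: should_update_sequence and checkpoint. The sketch below is one plausible shape for them, not the project's actual code. The CheckpointError class is a stand-in for the error type raised by the KCL bindings, the SequenceTracker class name is hypothetical, and the decision to retry only on throttling is a simplification I chose for brevity.

```python
import time


class CheckpointError(Exception):
    """Stand-in (assumption) for the checkpoint error raised by the KCL bindings."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


class SequenceTracker:
    """Hypothetical holder for the helpers used by process_records."""

    def __init__(self, checkpoint_retries=5, sleep_seconds=5):
        self._largest_seq = (None, None)
        self._checkpoint_retries = checkpoint_retries
        self._sleep_seconds = sleep_seconds

    def should_update_sequence(self, seq, sub_seq):
        """Return True if (seq, sub_seq) is beyond the largest pair seen so far."""
        if self._largest_seq == (None, None):
            return True
        # Treat a missing sub-sequence number as 0 so the tuples stay comparable.
        current = (self._largest_seq[0], self._largest_seq[1] or 0)
        return (seq, sub_seq or 0) > current

    def checkpoint(self, checkpointer, seq=None, sub_seq=None):
        """Try to checkpoint, retrying on throttling. Returns True on success."""
        for attempt in range(self._checkpoint_retries):
            try:
                checkpointer.checkpoint(seq, sub_seq)
                return True
            except CheckpointError as error:
                # Anything other than throttling (for example a shutdown in
                # progress) is treated as non-retryable in this sketch.
                if error.value != "ThrottlingException":
                    return False
            if attempt < self._checkpoint_retries - 1:
                time.sleep(self._sleep_seconds)
        return False
```

Comparing (sequence, sub-sequence) pairs as tuples keeps the ordering logic in one expression, and returning a boolean from checkpoint lets the caller log or alert when progress could not be saved.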

➤ Producing to Kafka

# A code snippet from the process_record method mentioned above that shows how to produce to Kafka

            try:
                self.kafka_producer.produce(
                    topic=self.kafka_topic,
                    value=data,
                    key=partition_key,
                    callback=callback
                )
            except BufferError:
                logger.warning(f"Kafka producer queue full, draining and retrying for record {sequence_number}")
                self.kafka_producer.poll(1.0)
                self.kafka_producer.produce(
                    topic=self.kafka_topic,
                    value=data,
                    key=partition_key,
                    callback=callback
                )

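This retries the produce call once if the producer's local queue is full, which covers temporary backpressure. Note that produce() is asynchronous, so the actual delivery result is reported later through the callback.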
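
The callback passed to produce() is where delivery success or failure becomes visible. The sketch below shows one way such a callback could be written; the DeliveryTracker class and its attribute names are hypothetical. It relies only on the fact that confluent_kafka invokes delivery callbacks as callback(err, msg) during poll() or flush().

```python
import logging

logger = logging.getLogger("replicator")


class DeliveryTracker:
    """Collects delivery results so the caller can decide whether to checkpoint."""

    def __init__(self):
        self.delivered = 0
        self.failed = []

    def __call__(self, err, msg):
        # err is None on success; otherwise it describes why delivery failed.
        if err is not None:
            self.failed.append((msg.key(), str(err)))
            logger.error("Delivery failed for key %r: %s", msg.key(), err)
        else:
            self.delivered += 1

    def all_delivered(self):
        """True if no delivery failure has been reported so far."""
        return not self.failed
```

A tracker instance can be passed as callback=tracker. Calling the producer's flush() before checkpointing and checking tracker.all_delivered() is one way to avoid advancing the Kinesis checkpoint past records that never reached Kafka.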

amazon_kclpy/jars/: This folder contains the Java dependencies used by KCLPY. These are required to enable the bridge between Python and the underlying KCL lease coordination.
Jar files can be generated by following the instructions written here.

Deployment and Startup

To ensure portability and seamless deployment, the replicator is containerized using Docker. Here is a breakdown of how to build and run it.

Dockerfile: Defines how to build a container image for the replicator that includes the required Python environment, libraries, and the Kinesis Client Library (KCL) dependencies.

➤ Sample Dockerfile

FROM python:3.9-slim

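# Install system dependencies: a Java runtime for the KCL daemon and
# gettext-base, which provides the envsubst command used by run.sh.
# default-jre-headless is used because openjdk-11-jre is not packaged on newer Debian bases.
RUN apt-get update && apt-get install -y default-jre-headless gettext-base curl unzip && \
    rm -rf /var/lib/apt/lists/*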

# Set workdir and copy code
WORKDIR /app
COPY . /app

# Install Python requirements
RUN pip install --no-cache-dir -r requirements.txt

# Set entrypoint
ENTRYPOINT ["./run.sh"]

run.sh: The run.sh script is the container entrypoint. It first renders the configuration templates into the final configuration files, then starts the KCL MultiLangDaemon with that configuration.

The run.sh script includes:

  • Template rendering for the config files using the envsubst command
  • Launching the KCL Python daemon with the provided configuration

➤ Sample run.sh

#!/bin/bash
set -e

# Rendering the configuration templates
envsubst < config/replicator_configs.template > config/replicator_configs.json
envsubst < config/kcl.properties.template > config/kcl.properties

# Start KCL daemon
java -cp "amazon_kclpy/jars/*" software.amazon.kinesis.multilang.MultiLangDaemon config/kcl.properties
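
One thing to be aware of with envsubst is that it replaces unset variables with empty strings without complaining, which can produce a config file that looks valid but is missing values. As an optional alternative, the rendering step could be done in Python so that a missing variable fails loudly. This is only a sketch: the render_template function is hypothetical, and it assumes the templates use the same $VAR or ${VAR} placeholder syntax.

```python
import os
import string


def render_template(template_text, env=None):
    """Substitute $VAR / ${VAR} placeholders, failing on any unset variable."""
    env = os.environ if env is None else env
    try:
        return string.Template(template_text).substitute(env)
    except KeyError as missing:
        # string.Template raises KeyError with the placeholder name as its argument.
        raise ValueError(
            "environment variable {name} is not set".format(name=missing.args[0])
        ) from None


if __name__ == "__main__":
    # Example with an explicit mapping instead of the real environment.
    sample = "streamName = ${KCL_STREAM_NAME}\ninitialPositionInStream = $KCL_INITIAL_POSITION\n"
    print(render_template(sample, {
        "KCL_STREAM_NAME": "kafka-kinesis-replicator-test",
        "KCL_INITIAL_POSITION": "LATEST",
    }))
```

Failing at render time means a misconfigured container stops immediately instead of starting the daemon with an empty stream name.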

Build and Run

Pre-requisites:

  • Make sure that your Kinesis stream has data
  • Make sure that your Kafka topic exists and is ready to receive data
  • The number of shards in the stream should match the number of partitions in the Kafka topic to help maintain ordering (both systems guarantee order only within a single shard or partition, not globally)
  • The KCL requires some DynamoDB tables to store leases and checkpoints; you can pre-create them or give the KCL permission to create them through the AWS IAM role it uses.

1) Build the Docker image

# Go to the Dockerfile path and run the following command

docker build -t kinesis-to-kafka-replicator .

2) Prepare the required environment variables

# Example for KCL configuration
KCL_STREAM_NAME=kafka-kinesis-replicator-test
KCL_APPLICATION_NAME=replicator-app-name
KCL_REGION=eu-west-1
KCL_INITIAL_POSITION=LATEST
KCL_LOG_LEVEL=DEBUG
KCL_AWS_CREDENTIALS_PROVIDER=DefaultCredentialsProvider

# Example for Kafka and global configuration
SLEEP_SECONDS=5
CHECKPOINT_RETRIES=5
CHECKPOINT_FREQ_SECONDS=30
KAFKA_TOPIC=kinesis_kafka_replication_test
KAFKA_BOOTSTRAP_SERVERS=bootstrap-server:9094
KAFKA_SASL_USERNAME=kafka-user
KAFKA_SASL_PASSWORD=kafka-password
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
KAFKA_CLIENT_ID=replicator-client
NUMBER_OF_SHARDS=4
AWS_ACCESS_KEY_ID=example-key-id
AWS_SECRET_ACCESS_KEY=example-secret-key
AWS_SESSION_TOKEN=example-token

Note: Save the environment variables above into a kinesis_kafka_replicator.env file; the docker run command in the next step passes this file to the container.
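
Before starting the container, it can be handy to check that the env file actually defines everything the templates expect. The sketch below parses the simple VAR=VALUE format used above and reports missing names. The helper names and the REQUIRED tuple are my assumptions; adjust the list to whatever your templates reference.

```python
# Assumption: the minimum set of variables the replicator cannot run without.
REQUIRED = (
    "KCL_STREAM_NAME",
    "KCL_APPLICATION_NAME",
    "KCL_REGION",
    "KAFKA_TOPIC",
    "KAFKA_BOOTSTRAP_SERVERS",
)


def parse_env_file(text):
    """Parse VAR=VALUE lines, ignoring blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, separator, value = line.partition("=")
        if separator:
            values[name.strip()] = value
    return values


def missing_variables(text, required=REQUIRED):
    """Return the required names that are absent or empty in the env file text."""
    values = parse_env_file(text)
    return [name for name in required if not values.get(name)]


if __name__ == "__main__":
    with open("kinesis_kafka_replicator.env") as handle:
        missing = missing_variables(handle.read())
    if missing:
        raise SystemExit("Missing variables: " + ", ".join(missing))
    print("Env file looks complete")
```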

3) Run the container

docker run --env-file kinesis_kafka_replicator.env -it kinesis-to-kafka-replicator

Verify the container is in a running state.

Verification and Observability

Once the replicator container is up and running, we can observe some logs to make sure that the app inside the container is working properly.

1) The application started successfully: The highlighted log indicates that the replicator started successfully, and it is processing the correct stream.

[Screenshot: replicator started successfully]

2) Monitor the DynamoDB (DDB) table creation: The following images confirm that the replicator recognizes the pre-created tables and does not create them again.

[Screenshot: DynamoDB table 1]

[Screenshot: DynamoDB table 2]

3) KCL leader election: The replicator elected a worker leader and assigned it to lead the consumption from a shard.

[Screenshot: leader election]

4) KCL Kinesis consumer: The replicator started to consume the records from the Kinesis stream and push them to Kafka.

[Screenshot: Kinesis consumer]

5) Observing the Kafka topic: Looking at the topic in Kafka UI, we can see the messages starting to show up.

[Screenshot: Kafka topic in Kafka UI]

All of this confirms that our replicator is working properly and can mirror the data between the two platforms successfully.

Conclusion

This approach enabled real-time streaming between Kinesis and Kafka without over-engineering. In this guide, we built a lightweight replicator using Python, KCL, and Kafka libraries to bridge data between the two systems in real time, with a focus on reliability and configurability.
