Manasseh

Event-Driven Architecture with DynamoDB, Kinesis Data Streams, Amazon Data Firehose, Lambda, and S3

Event-driven architectures have become increasingly popular in modern application design, making it possible to build scalable and efficient systems that react to data changes in real time.
In this article, I will explain how to build an event-driven pipeline using Amazon DynamoDB, Kinesis Data Streams, Amazon Data Firehose, AWS Lambda, and Amazon S3.

Architecture

Overview of the Pipeline

The goal of this pipeline is to react to changes in a DynamoDB table, process the data, and store it in either S3 or trigger a Lambda function for further processing. Here's a high-level look at the flow:

  1. DynamoDB – the source of the data.
  2. Kinesis Data Streams – streams the change events from DynamoDB.
  3. Amazon Data Firehose – transforms and routes the streaming data.
  4. AWS Lambda (optional) – processes data for real-time events or complex transformations.
  5. S3 – stores the processed data in scalable, durable storage.

Part 1: Setting up the environment

Create a DynamoDB table

Amazon DynamoDB is a serverless, NoSQL, fully managed database with single-digit millisecond performance at any scale. It is well-suited for high-performance applications that require consistent and fast data access, such as financial services, gaming, and streaming applications.
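
If you prefer to script this step instead of using the console, here is a minimal boto3 sketch. The table name orders and the orderId partition key are assumptions for illustration; use whatever fits your own data model.

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')

# Hypothetical table: 'orders' with a simple string partition key.
dynamodb.create_table(
    TableName='orders',
    AttributeDefinitions=[{'AttributeName': 'orderId', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'orderId', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'  # on-demand capacity, nothing to provision
)

# Wait until the table is ready before moving on.
dynamodb.get_waiter('table_exists').wait(TableName='orders')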

Set up Amazon Kinesis Data Streams

Amazon Kinesis Data Streams is a fully managed, serverless streaming data service that makes it easy to elastically ingest and store logs, events, clickstreams, and other forms of streaming data in real time.
Kinesis Data Streams is particularly helpful when you need to handle massive volumes of streaming data. It allows multiple consumers to read from the stream simultaneously, making it possible to apply multiple transformations or route the data to different destinations.
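
To wire the two services together, you create the stream and then enable it as the Kinesis streaming destination on the DynamoDB table. A minimal boto3 sketch, assuming the orders table from the previous step and a hypothetical stream name orders-stream:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')
dynamodb = boto3.client('dynamodb', region_name='us-east-1')

# Create the stream in on-demand mode so there is no shard sizing to manage.
kinesis.create_stream(
    StreamName='orders-stream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)
kinesis.get_waiter('stream_exists').wait(StreamName='orders-stream')

stream_arn = kinesis.describe_stream_summary(
    StreamName='orders-stream'
)['StreamDescriptionSummary']['StreamARN']

# Send the table's change events to the stream.
dynamodb.enable_kinesis_streaming_destination(
    TableName='orders',
    StreamArn=stream_arn
)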

Set up Amazon Data Firehose
Source - Amazon Kinesis Data Streams
Destination - Amazon S3


Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, Amazon OpenSearch Serverless, Splunk, Apache Iceberg Tables, and any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers, including Datadog, Dynatrace, LogicMonitor, MongoDB, New Relic, Coralogix, and Elastic.

In the Data Firehose console, choose Amazon Kinesis Data Streams as the source and
Amazon S3 as the destination.

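The same setup can be scripted. A minimal boto3 sketch, assuming the hypothetical orders-stream from earlier, a placeholder S3 bucket, and an IAM role that allows Firehose to read from the stream and write to the bucket (the ARNs below are placeholders):

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.create_delivery_stream(
    DeliveryStreamName='orders-firehose',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/orders-stream',
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role'
    },
    ExtendedS3DestinationConfiguration={
        'BucketARN': 'arn:aws:s3:::my-orders-bucket',
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        # Buffer up to 5 MB or 60 seconds before writing an object to S3.
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 60}
    }
)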

Create an S3 bucket


Amazon S3 is a highly scalable and durable object storage service. S3 allows you to store structured, semi-structured, and unstructured data at virtually unlimited scale.
S3 is ideal for long-term data storage or batch processing use cases. Data in S3 can then be used for analytics, reporting, or further batch processing via services like AWS Glue or Amazon Athena.
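
Creating the bucket itself is a one-liner; the bucket name below is a placeholder (bucket names are globally unique):

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

s3.create_bucket(Bucket='my-orders-bucket')
# Outside us-east-1, a location constraint is required, e.g.:
# s3.create_bucket(
#     Bucket='my-orders-bucket',
#     CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'}
# )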

Part 2: Insert and manage the data

Insert data into the DynamoDB table.


Four items have been inserted into the table.
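
The items were added through the console, but the same can be done programmatically. A small sketch using the hypothetical orders table; the attributes are made up for illustration:

import boto3

table = boto3.resource('dynamodb', region_name='us-east-1').Table('orders')

items = [
    {'orderId': '1001', 'customer': 'Alice', 'amount': 25},
    {'orderId': '1002', 'customer': 'Bob', 'amount': 40},
    {'orderId': '1003', 'customer': 'Carol', 'amount': 15},
    {'orderId': '1004', 'customer': 'Dan', 'amount': 60}
]

# batch_writer handles batching and retries for us.
with table.batch_writer() as batch:
    for item in items:
        batch.put_item(Item=item)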

Create a Lambda function using the blueprint option.
Choose the "Process records sent to an Amazon Data Firehose stream" blueprint with the Python 3.10 runtime.


Lambda is a serverless compute service that triggers based on events. In this case, it processes each data batch from Firehose and can perform more complex transformations or trigger alerts/notifications.

After you have created the function, go to the configuration settings and change the timeout from the default 3 seconds to 3 minutes.
This is useful since the function may involve longer processing tasks, such as dealing with large data sets.
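
If you prefer to do this outside the console, the timeout can also be changed with boto3; the function name below is a placeholder:

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Raise the timeout from the 3-second default to 3 minutes (180 seconds).
lambda_client.update_function_configuration(
    FunctionName='firehose-transform',
    Timeout=180
)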


The function handles the incoming event, which contains records from Firehose. Each record has a recordId and data. The function processes each record and returns the result back to Firehose for further downstream operations.

Lambda code

import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        # Firehose delivers each record's data base64-encoded
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here
        print(f"the actual data is : {payload}")

        # 'Ok' tells Firehose the record was transformed successfully;
        # the data must be re-encoded to base64 before it is returned
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}


In the Amazon Data Firehose configuration tab, there is a Transform and convert records option.
Select the Lambda function we created earlier. This function is part of a real-time processing pipeline where you can make on-the-fly changes to the data before it is permanently stored or consumed by other services.
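
The same association can be scripted. A hedged boto3 sketch, assuming the hypothetical orders-firehose delivery stream and a Lambda function named firehose-transform (the ARN is a placeholder):

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

# Firehose needs the current version and destination IDs for an update.
stream = firehose.describe_delivery_stream(
    DeliveryStreamName='orders-firehose'
)['DeliveryStreamDescription']

firehose.update_destination(
    DeliveryStreamName='orders-firehose',
    CurrentDeliveryStreamVersionId=stream['VersionId'],
    DestinationId=stream['Destinations'][0]['DestinationId'],
    ExtendedS3DestinationUpdate={
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:firehose-transform'
                }]
            }]
        }
    }
)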


Insert more records into the DynamoDB table.


After Lambda processes the data, it is delivered to the S3 bucket via Firehose. S3 acts as the final storage layer in this pipeline.


All items initially created in the DynamoDB table are now stored as objects (files) in the S3 bucket.
These stored files can be used for data analysis, archiving, or further processing, such as feeding into machine learning models, querying with Amazon Athena, or visualizing with Amazon QuickSight.

Summary:

This architecture demonstrates how AWS services can work together in an event-driven, serverless pipeline. The flow begins with changes in DynamoDB, which are streamed in real time through Kinesis Data Streams and Firehose, processed by Lambda if needed, and finally stored in S3.
