Bhaskar Singh

AWS Firehose Delivery Stream with Dynamic Partitioning

What is Kinesis Firehose Delivery Stream?

Kinesis Data Firehose is an extract, transform, and load (ETL) service that reliably captures, transforms, and delivers streaming data to data lakes, data stores, and analytics services. A delivery stream is the resource we create in Firehose to do this.

One of the most common use cases is streaming data to Amazon S3 and converting it to a format suitable for analysis, without having to build your own ETL pipeline.

Before we begin, let's go over the prerequisites:

  • CDK - The code examples are written for the AWS CDK. See the AWS CDK documentation to read more.
  • TypeScript - The example code is written in TypeScript, but the same code can be written in any other language supported by the CDK.

Design

Design Diagram

We will have a set of producers that emit events, which can come from users, processes, or other systems. The events flow through the Firehose delivery stream and are eventually stored in an S3 bucket. From there, we can use data lake services and analytics tools on the data.

Implementation

To implement the design, we follow these steps:

1. Create Log Group

We first create a log group to store error logs for the case where the Firehose delivery stream is not able to deliver data to the destination, in this case the S3 bucket. Creating the log group is not mandatory, but it helps us triage issues later if a failure occurs.

import { LogGroup, LogStream, RetentionDays } from 'aws-cdk-lib/aws-logs';

const firehoseLogGroupName = 'FirehoseLogGroup';
const firehoseLogStreamName = 'FirehoseLogStream';

// Log group that receives the Firehose delivery error logs.
const firehoseLogGroup = new LogGroup(this, firehoseLogGroupName, {
    logGroupName: firehoseLogGroupName,
    retention: RetentionDays.ONE_YEAR,
});

// Log stream inside the group; the delivery stream writes its errors here.
const firehoseLogStream = new LogStream(this, firehoseLogStreamName, {
    logGroup: firehoseLogGroup,
    logStreamName: firehoseLogStreamName,
});

2. Create S3 Bucket to store the data from Firehose

As per the design, we are using the S3 bucket as the destination for the firehose delivery stream. Currently, dynamic partitioning of data is only supported for delivery to S3.

import { Bucket } from 'aws-cdk-lib/aws-s3';

// S3 bucket names must be lowercase and globally unique.
const firehoseS3BucketName = 'firehose-destination-bucket';
const firehoseBucket = new Bucket(this, firehoseS3BucketName, {
    bucketName: firehoseS3BucketName,
});

3. Create Firehose IAM Role

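We create an IAM role that Firehose assumes to write data to the S3 bucket and to write error logs to the log group created earlier.

import { Role, ServicePrincipal, PolicyStatement, Effect } from 'aws-cdk-lib/aws-iam';

const firehoseRoleName = 'FirehoseRole';
const firehoseRole = new Role(this, firehoseRoleName, {
    roleName: firehoseRoleName,
    // The Firehose service must be allowed to assume the role.
    assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
});

// Allow Firehose to write error logs to CloudWatch Logs.
firehoseRole.addToPolicy(
    new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ['logs:PutLogEvents'],
        resources: ['*'],
    })
);

// Allow Firehose to write the delivered objects to the destination bucket.
firehoseBucket.grantReadWrite(firehoseRole);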


4. Create Firehose Delivery Stream with Dynamic Partitioning

We can now create the Firehose delivery stream. I am using CDK version 2 because CDK version 1 is going to reach end of life soon; check the AWS CDK documentation for the latest update. However, the higher-level Firehose construct for CDK 2 is still in preview and is not stable, so we will use the generic CloudFormation-level construct, CfnDeliveryStream.

One important point to note here: dynamic partitioning can only be enabled when the delivery stream is created. It cannot be enabled on an existing delivery stream. See the Firehose documentation for more.

Let's assume the input to Firehose is JSON with the following fields:

{
  "name": string,
  "department": string,
  "status": string,
  "access": string,
  "lastAccessed": long
}
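As a minimal sketch, a producer could describe this shape with a TypeScript type and check events before sending them. The names AccessEvent and isAccessEvent are hypothetical and not part of any AWS library. The check matters because name and department are used as partition keys later; as far as I know, records from which the keys cannot be extracted end up under the error prefix instead of the partitioned path.

```typescript
// Hypothetical type for the example event; the field names come from the JSON above.
interface AccessEvent {
  name: string;
  department: string;
  status: string;
  access: string;
  lastAccessed: number; // "long" in the schema above
}

// Runtime check a producer could run before putting an event on the stream.
function isAccessEvent(value: unknown): value is AccessEvent {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const v = value as Record<string, unknown>;
  return (
    typeof v['name'] === 'string' &&
    typeof v['department'] === 'string' &&
    typeof v['status'] === 'string' &&
    typeof v['access'] === 'string' &&
    typeof v['lastAccessed'] === 'number' &&
    Number.isInteger(v['lastAccessed'])
  );
}
```

A producer would call isAccessEvent on each candidate event and only send those that pass.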

Based on this JSON, we will define the partitions in the S3 bucket.

import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';

const firehoseDataStreamName = 'ExampleFirehoseStream';
const firehoseDataStream = new CfnDeliveryStream(this, firehoseDataStreamName, {
    deliveryStreamName: firehoseDataStreamName,
    deliveryStreamType: 'DirectPut',
    extendedS3DestinationConfiguration: {
        bucketArn: firehoseBucket.bucketArn,
        bufferingHints: { sizeInMBs: 64, intervalInSeconds: 60 },
        compressionFormat: 'GZIP',
        roleArn: firehoseRole.roleArn,
        prefix: 'FirehoseEvent/name=!{partitionKeyFromQuery:name}/department=!{partitionKeyFromQuery:department}/',
        cloudWatchLoggingOptions: {
            enabled: true,
            logGroupName: firehoseLogGroup.logGroupName,
            logStreamName: firehoseLogStream.logStreamName,
        },
        errorOutputPrefix: 'Error/',
        dynamicPartitioningConfiguration: {
            enabled: true,
            retryOptions: {
                durationInSeconds: 300,
            },
        },
        // processingConfiguration is a property of the extended S3 destination.
        processingConfiguration: {
            enabled: true,
            processors: [
                {
                    // Extracts the partition keys from each JSON record.
                    type: 'MetadataExtraction',
                    parameters: [
                        {
                            parameterName: 'MetadataExtractionQuery',
                            parameterValue: '{ name: .name, department: .department }',
                        },
                        {
                            parameterName: 'JsonParsingEngine',
                            parameterValue: 'JQ-1.6',
                        },
                    ],
                },
                {
                    // Appends a newline after each record in the delivered object.
                    type: 'AppendDelimiterToRecord',
                    parameters: [
                        {
                            parameterName: 'Delimiter',
                            parameterValue: '\\n',
                        },
                    ],
                },
            ],
        },
    },
});

Now that we have seen how the Firehose delivery stream is created using the CDK, let's go through the key fields from the above code snippet:

  • deliveryStreamType - We can select the type of stream. This can be one of the following values:
    • DirectPut: Other systems or processes will act as a source that can directly put the event.
    • KinesisStreamAsSource: The delivery stream uses a Kinesis data stream as a source.
  • extendedS3DestinationConfiguration - When using S3 as a destination, we can use S3DestinationConfiguration, which offers only a limited set of options. Because we are using dynamic partitioning, we have to use extendedS3DestinationConfiguration, which adds the options needed for advanced delivery stream features.
  • bufferingHints - Configures when Firehose publishes buffered data to the destination. sizeInMBs sets the buffer size after which Firehose delivers the data, and intervalInSeconds sets the time period after which it delivers the data. If both are set, whichever limit is reached first triggers delivery to the destination.
  • compressionFormat - Selects the compression format. If it is not set, the default is UNCOMPRESSED. The accepted values are GZIP, HADOOP_SNAPPY, Snappy, UNCOMPRESSED, and ZIP; choose one according to your use case.
  • dynamicPartitioningConfiguration - Enables dynamic partitioning. It also lets us specify the retry behaviour when Firehose is not able to deliver data to S3: durationInSeconds sets the period during which Firehose keeps retrying the delivery.
  • errorOutputPrefix - In case of an unexpected error, the Firehose delivery stream can deliver the data to S3 under a separate prefix. Here we specify the prefix under which Firehose puts the failed events. In this example, failed events get the prefix Error/.
  • processingConfiguration - Configures data processing for a Kinesis Data Firehose delivery stream.
    • processors - List of processors. The processor types include AppendDelimiterToRecord, Lambda, MetadataExtraction, and RecordDeAggregation.
      • MetadataExtraction: Extracts metadata from each received event. This is very useful when we want to partition the events according to the event data. It takes a JsonParsingEngine parameter and a query that is evaluated against the incoming JSON events. In the above code snippet, I have used name and department from the incoming events as the partition keys.
      • AppendDelimiterToRecord: Adds a delimiter to each event when delivering the data to the destination. Here I am adding \n as the delimiter, but we can use other characters as well, such as commas.
  • prefix - Sets the S3 key prefix for the delivered data. I have used:
    'FirehoseEvent/name=!{partitionKeyFromQuery:name}/department=!{partitionKeyFromQuery:department}/'
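The "whichever limit is hit first" rule described for bufferingHints in the list above can be sketched as a small function. This is only an illustration of the rule, not how Firehose is implemented; BufferingHints mirrors the two fields from the snippet, while shouldFlush and its parameters are hypothetical names, and treating one MB as 1024 * 1024 bytes is an assumption.

```typescript
// Mirrors the bufferingHints fields used in the delivery stream snippet.
interface BufferingHints {
  sizeInMBs: number;
  intervalInSeconds: number;
}

// Returns true once either limit is reached, whichever comes first.
// Assumption: one MB is counted as 1024 * 1024 bytes.
function shouldFlush(
  hints: BufferingHints,
  bufferedBytes: number,
  secondsSinceLastFlush: number,
): boolean {
  const sizeLimitBytes = hints.sizeInMBs * 1024 * 1024;
  return (
    bufferedBytes >= sizeLimitBytes ||
    secondsSinceLastFlush >= hints.intervalInSeconds
  );
}

const exampleHints: BufferingHints = { sizeInMBs: 64, intervalInSeconds: 60 };
const tenMB = 10 * 1024 * 1024;
const sixtyFourMB = 64 * 1024 * 1024;

console.log(shouldFlush(exampleHints, tenMB, 5)); // false: neither limit reached
console.log(shouldFlush(exampleHints, tenMB, 60)); // true: interval reached first
console.log(shouldFlush(exampleHints, sixtyFourMB, 5)); // true: size reached first
```

With the values from this post, a quiet stream is delivered roughly once a minute, while a busy one is delivered as soon as 64 MB have been buffered.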

If an incoming event is as follows:

{
  "name": "tom",
  "department": "IT",
  "status": "working",
  "access": "present",
  "lastAccessed": 1687801710
}

Then the event would be put to S3 with the prefix: FirehoseEvent/name=tom/department=IT/
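To make the substitution concrete, here is a rough local simulation of what happens to the example event: the partition keys are pulled out of the record and then inserted into the prefix template. It does not call Firehose or run jq; extractPartitionKeys and resolvePrefix are hypothetical helper names that only mimic the behaviour for flat, top-level fields.

```typescript
type PartitionKeys = Record<string, string>;

// Mimics the MetadataExtractionQuery '{ name: .name, department: .department }'
// for top-level fields only. This is a simplification, not a jq implementation.
function extractPartitionKeys(record: string, fields: string[]): PartitionKeys {
  const parsed = JSON.parse(record) as Record<string, unknown>;
  const keys: PartitionKeys = {};
  for (const field of fields) {
    const value = parsed[field];
    if (value === undefined || value === null) {
      throw new Error(`Missing partition key: ${field}`);
    }
    keys[field] = String(value);
  }
  return keys;
}

// Replaces each !{partitionKeyFromQuery:<key>} placeholder with the extracted value.
function resolvePrefix(template: string, keys: PartitionKeys): string {
  return template.replace(
    /!\{partitionKeyFromQuery:([^}]+)\}/g,
    (_match: string, key: string) => {
      const value = keys[key];
      if (value === undefined) {
        throw new Error(`No value for partition key: ${key}`);
      }
      return value;
    },
  );
}

const sampleRecord = JSON.stringify({
  name: 'tom',
  department: 'IT',
  status: 'working',
  access: 'present',
  lastAccessed: 1687801710,
});
const sampleKeys = extractPartitionKeys(sampleRecord, ['name', 'department']);
const resolvedPrefix = resolvePrefix(
  'FirehoseEvent/name=!{partitionKeyFromQuery:name}/department=!{partitionKeyFromQuery:department}/',
  sampleKeys,
);
console.log(resolvedPrefix); // FirehoseEvent/name=tom/department=IT/
```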

In some cases, we might require the date or time in the prefix as well, and then we can use the built-in timestamp namespace. Example: FirehoseEvent/name=!{partitionKeyFromQuery:name}/department=!{partitionKeyFromQuery:department}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/date=!{timestamp:dd}/

If the event is received on 29-06-2023, then the prefix would be: FirehoseEvent/name=tom/department=IT/year=2023/month=06/date=29/
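The timestamp placeholders can be simulated in the same spirit. The sketch below only understands the three patterns used in this post (yyyy, MM, dd) and assumes the timestamp is evaluated in UTC; expandTimestamps is a hypothetical helper, not a Firehose API.

```typescript
// Expands !{timestamp:<pattern>} placeholders for the patterns used in this post.
// Assumption: the arrival time is evaluated in UTC.
function expandTimestamps(template: string, arrival: Date): string {
  const parts: Record<string, string> = {
    yyyy: String(arrival.getUTCFullYear()),
    MM: String(arrival.getUTCMonth() + 1).padStart(2, '0'),
    dd: String(arrival.getUTCDate()).padStart(2, '0'),
  };
  return template.replace(
    /!\{timestamp:([^}]+)\}/g,
    (_match: string, pattern: string) => {
      const value = parts[pattern];
      if (value === undefined) {
        throw new Error(`Pattern not supported by this sketch: ${pattern}`);
      }
      return value;
    },
  );
}

const expandedPrefix = expandTimestamps(
  'FirehoseEvent/name=tom/department=IT/year=!{timestamp:yyyy}/month=!{timestamp:MM}/date=!{timestamp:dd}/',
  new Date(Date.UTC(2023, 5, 29)), // 29 June 2023; months are zero-based
);
console.log(expandedPrefix); // FirehoseEvent/name=tom/department=IT/year=2023/month=06/date=29/
```

Because of the UTC assumption, an event that arrives shortly after local midnight may land in the previous day's partition.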

Conclusion

In this post, I have shared how to create a Firehose delivery stream with dynamic partitioning to deliver data to an S3 bucket.

If you like the post, then please share it, and for any questions, leave them in the comment section.
