DEV Community

Cover image for Firehose and Iceberg Tables
Evan
Evan

Posted on

Firehose and Iceberg Tables

One of the most underrated combinations in AWS is Kinesis Firehose, Iceberg tables, and Athena. This trio makes querying and reporting on extremely large datasets both easy and cost-effective.

Implementing this solution is super simple. Below is the CloudFormation template to create the S3 bucket, Iceberg table, and Kinesis Firehose delivery.

AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for S3 Iceberg table with Kinesis Firehose direct put - v3'

Parameters:
  S3BucketName:
    Type: String
    Description: Name of the S3 bucket for Iceberg data and metadata (must be unique globally)
    Default: 'bakers-orders-tc-dl-dev'
  GlueDatabaseName: 
    Type: String
    Description: Name of the database inside of glue
    Default: 'orders'
  GlueTableName: 
    Type: String
    Description: Name of the table inside of glue
    Default: 'bakers_orders'
  FirehoseStreamName:
    Type: String
    Description: Name of the Kinesis Firehose for direct Iceberg table ingestion
    Default: 'kinises-orders-firehose' 

Resources:
  IcebergDataBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref S3BucketName
      # Additional bucket properties like CORS, Lifecycle rules, etc. can be added here.


  IcebergDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      DatabaseInput:
        Name: !Ref GlueDatabaseName
        # Optional: Specify a default S3 location for the database
        LocationUri: !Sub 's3://${IcebergDataBucket}/databases/${GlueDatabaseName}/'

  IcebergFirehoseDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
      Policies:
        - PolicyName: FirehoseIcebergPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 's3:*'
                Resource:
                  - !GetAtt IcebergDataBucket.Arn
                  - !Sub '${IcebergDataBucket.Arn}/*'
              - Effect: Allow
                Action:
                  - 'glue:GetTable'
                  - 'glue:GetTableVersion'
                  - 'glue:GetTableVersions'
                  - 'glue:UpdateTable'
                  - 'glue:GetDatabase'
                Resource:
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabaseName}'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDatabaseName}/*'

  KinesisFirehoseIceberg:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn: OrdersIcebergTable
    Properties:
      DeliveryStreamName: !Ref FirehoseStreamName
      DeliveryStreamType: DirectPut
      IcebergDestinationConfiguration:
        RoleARN: !GetAtt IcebergFirehoseDeliveryRole.Arn
        CatalogConfiguration:
          CatalogArn: !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
        S3Configuration:
          RoleARN: !GetAtt IcebergFirehoseDeliveryRole.Arn
          BucketARN: !GetAtt IcebergDataBucket.Arn
          Prefix: !Sub '${GlueDatabaseName}.db/${GlueTableName}/'
          ErrorOutputPrefix: 'errors/'
          BufferingHints:
            SizeInMBs: 128
            IntervalInSeconds: 60
          CompressionFormat: GZIP
        DestinationTableConfigurationList:
          - DestinationDatabaseName: !Ref GlueDatabaseName
            DestinationTableName: !Ref GlueTableName
            S3ErrorOutputPrefix: 'errors/'
            UniqueKeys:
              - 'order_id'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub '/aws/kinesisfirehose/${FirehoseStreamName}'
          LogStreamName: 'IcebergDelivery'

  OrdersIcebergTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      DatabaseName: !Ref IcebergDatabase
      OpenTableFormatInput:
        IcebergInput:
          MetadataOperation: CREATE
          Version: '2'
      TableInput:
        Name: !Ref GlueTableName
        TableType: 'EXTERNAL_TABLE'
        StorageDescriptor:
          Columns:
            - Name: 'order_id'
              Type: 'bigint'
            - Name: 'order_price'
              Type: 'double'
            - Name: 'item_name'
              Type: 'string'
            - Name: 'quantity'
              Type: 'int'
            - Name: 'vendor_name'
              Type: 'string'
            - Name: 'vendor_id'
              Type: 'int'
          Location: !Sub 's3://${IcebergDataBucket}/${GlueDatabaseName}.db/${GlueTableName}'
Outputs:
  IcebergS3Bucket:
    Description: "S3 Bucket for Iceberg Table Data"
    Value: !Ref IcebergDataBucket
  GlueDatabaseName:
    Description: "AWS Glue Database Name"
    Value: !Ref IcebergDatabase
  GlueTableName:
    Description: "AWS Glue Table Name"
    Value: !Ref OrdersIcebergTable
  FirehoseStreamName:
    Description: "Kinesis Firehose Stream Name for direct Iceberg ingestion"
    Value: !Ref KinesisFirehoseIceberg
  FirehoseStreamArn:
    Description: "Kinesis Firehose Stream ARN"
    Value: !GetAtt KinesisFirehoseIceberg.Arn


Enter fullscreen mode Exit fullscreen mode

Here's the code to generate 1,000 orders. This will help us simulate real data flowing from an application to a data lake. In a real-world scenario, this could be code in your application that sends actual orders to Firehose for delivery to the Iceberg table for reporting and analytics.

import boto3
import json
import random
import uuid
from datetime import datetime

# Initialize Firehose client
firehose = boto3.client('firehose', region_name='us-east-1')

DELIVERY_STREAM_NAME = 'baking_orders'

# Sample data for generating realistic records
ITEM_NAMES = [
    'Chocolate Cake', 'Vanilla Cupcake', 'Croissant', 'Baguette',
    'Sourdough Bread', 'Cinnamon Roll', 'Blueberry Muffin', 'Apple Pie',
    'Cheesecake', 'Brownie', 'Danish Pastry', 'Eclair', 'Macaron',
    'Pretzel', 'Donut', 'Scone', 'Banana Bread', 'Pumpkin Pie'
]

VENDOR_NAMES = [
    'Sweet Delights Bakery', 'Golden Crust', 'The Flour Shop',
    'Artisan Breads Co', 'Sugar & Spice', 'Daily Fresh Bakery',
    'The Rolling Pin', 'Heavenly Bakes', 'Sunrise Pastries',
    'The Bread Basket'
]

def generate_record(order_id):
    """Generate a single order record."""
    vendor_id = random.randint(1000, 9999)
    return {
        'order_id': order_id,
        'order_price': round(random.uniform(5.99, 150.99), 2),
        'item_name': random.choice(ITEM_NAMES),
        'quantity': random.randint(1, 20),
        'vendor_name': random.choice(VENDOR_NAMES),
        'vendor_id': vendor_id
    }

def put_records_to_firehose(records):
    """Send records to Firehose in batches of 500 (Firehose limit)."""
    batch_size = 500
    total_sent = 0

    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]

        # Format records for Firehose (each record needs newline delimiter)
        firehose_records = [
            {'Data': json.dumps(record) + '\n'}
            for record in batch
        ]

        response = firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM_NAME,
            Records=firehose_records
        )

        failed_count = response.get('FailedPutCount', 0)
        total_sent += len(batch) - failed_count

        print(f'Batch {i // batch_size + 1}: Sent {len(batch)} records, Failed: {failed_count}')

    return total_sent

def main():
    print(f'Generating 1,000 order records...')

    # Generate unique order_ids using a starting point + sequential
    base_order_id = int(datetime.now().timestamp() * 1000)
    records = [generate_record(base_order_id + i) for i in range(1000)]

    print(f'Sending records to Firehose stream: {DELIVERY_STREAM_NAME}')
    total_sent = put_records_to_firehose(records)

    print(f'\nComplete! Successfully sent {total_sent} records to Firehose.')
    print(f'Sample record: {json.dumps(records[0], indent=2)}')

if __name__ == '__main__':
    main()

Enter fullscreen mode Exit fullscreen mode

Once the orders have been processed and sent to the Iceberg table we can use the power of Amazon Athena to query the data and Amazon Quicksight to create reports and dashboards for business intelligence capabilities.

Here you can see us query the Iceberg table using Amazon Athena

We can query pretty much like we would with normal SQL syntax. Here you can see we query by orders over $100.00

Apache Iceberg scales to very large datasets even millions of rows. Keep in mind though, that Athena charges you for data scanned, so avoid running SELECT * queries on tables with millions of rows or your Athena bill could become quite expensive.

If your goal is to set up a robust BI or ML platform I highly recommend this approach. It's pretty cheap compared to most other solutions and fairly easy to setup.

Top comments (0)