One of the most underrated combinations in AWS is Kinesis Firehose, Iceberg tables, and Athena. This trio makes querying and reporting on extremely large datasets both easy and cost-effective.
Implementing this solution is straightforward. Below is the CloudFormation template that creates the S3 bucket, Glue database, Iceberg table, IAM role, and Kinesis Firehose delivery stream.
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for S3 Iceberg table with Kinesis Firehose direct put - v3'

Parameters:
  S3BucketName:
    Type: String
    Description: Name of the S3 bucket for Iceberg data and metadata (must be globally unique)
    Default: 'bakers-orders-tc-dl-dev'
  GlueDatabaseName:
    Type: String
    Description: Name of the Glue database
    Default: 'orders'
  GlueTableName:
    Type: String
    Description: Name of the Glue table
    Default: 'bakers_orders'
  FirehoseStreamName:
    Type: String
    Description: Name of the Kinesis Firehose stream for direct Iceberg table ingestion
    Default: 'kinesis-orders-firehose'
Resources:
  IcebergDataBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref S3BucketName
      # Additional bucket properties like CORS, Lifecycle rules, etc. can be added here.

  IcebergDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      DatabaseInput:
        Name: !Ref GlueDatabaseName
        # Optional: Specify a default S3 location for the database
        LocationUri: !Sub 's3://${IcebergDataBucket}/databases/${GlueDatabaseName}/'

  IcebergFirehoseDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
      Policies:
        - PolicyName: FirehoseIcebergPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 's3:*'
                Resource:
                  - !GetAtt IcebergDataBucket.Arn
                  - !Sub '${IcebergDataBucket.Arn}/*'
              - Effect: Allow
                Action:
                  - 'glue:GetTable'
                  - 'glue:GetTableVersion'
                  - 'glue:GetTableVersions'
                  - 'glue:UpdateTable'
                  - 'glue:GetDatabase'
                Resource:
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabaseName}'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDatabaseName}/*'

  KinesisFirehoseIceberg:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn: OrdersIcebergTable
    Properties:
      DeliveryStreamName: !Ref FirehoseStreamName
      DeliveryStreamType: DirectPut
      IcebergDestinationConfiguration:
        RoleARN: !GetAtt IcebergFirehoseDeliveryRole.Arn
        CatalogConfiguration:
          CatalogArn: !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
        S3Configuration:
          RoleARN: !GetAtt IcebergFirehoseDeliveryRole.Arn
          BucketARN: !GetAtt IcebergDataBucket.Arn
          Prefix: !Sub '${GlueDatabaseName}.db/${GlueTableName}/'
          ErrorOutputPrefix: 'errors/'
          BufferingHints:
            SizeInMBs: 128
            IntervalInSeconds: 60
          CompressionFormat: GZIP
        DestinationTableConfigurationList:
          - DestinationDatabaseName: !Ref GlueDatabaseName
            DestinationTableName: !Ref GlueTableName
            S3ErrorOutputPrefix: 'errors/'
            UniqueKeys:
              - 'order_id'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub '/aws/kinesisfirehose/${FirehoseStreamName}'
          LogStreamName: 'IcebergDelivery'

  OrdersIcebergTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      DatabaseName: !Ref IcebergDatabase
      OpenTableFormatInput:
        IcebergInput:
          MetadataOperation: CREATE
          Version: '2'
      TableInput:
        Name: !Ref GlueTableName
        TableType: 'EXTERNAL_TABLE'
        StorageDescriptor:
          Columns:
            - Name: 'order_id'
              Type: 'bigint'
            - Name: 'order_price'
              Type: 'double'
            - Name: 'item_name'
              Type: 'string'
            - Name: 'quantity'
              Type: 'int'
            - Name: 'vendor_name'
              Type: 'string'
            - Name: 'vendor_id'
              Type: 'int'
          Location: !Sub 's3://${IcebergDataBucket}/${GlueDatabaseName}.db/${GlueTableName}'

Outputs:
  IcebergS3Bucket:
    Description: "S3 Bucket for Iceberg Table Data"
    Value: !Ref IcebergDataBucket
  GlueDatabaseName:
    Description: "AWS Glue Database Name"
    Value: !Ref IcebergDatabase
  GlueTableName:
    Description: "AWS Glue Table Name"
    Value: !Ref OrdersIcebergTable
  FirehoseStreamName:
    Description: "Kinesis Firehose Stream Name for direct Iceberg ingestion"
    Value: !Ref KinesisFirehoseIceberg
  FirehoseStreamArn:
    Description: "Kinesis Firehose Stream ARN"
    Value: !GetAtt KinesisFirehoseIceberg.Arn
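Deploy the template however you normally would (console, CLI, or SDK). Below is a minimal sketch using boto3, assuming the template is saved locally as iceberg-firehose.yaml; the file name and stack name are placeholders, so adjust them for your environment.

import boto3

cloudformation = boto3.client('cloudformation', region_name='us-east-1')

# Assumes the template above is saved as 'iceberg-firehose.yaml' (placeholder file name)
with open('iceberg-firehose.yaml') as f:
    template_body = f.read()

# CAPABILITY_IAM is required because the template creates an IAM role
cloudformation.create_stack(
    StackName='bakers-orders-datalake',  # placeholder stack name
    TemplateBody=template_body,
    Capabilities=['CAPABILITY_IAM']
)

# Wait for the bucket, Glue database/table, IAM role, and Firehose stream to finish creating
waiter = cloudformation.get_waiter('stack_create_complete')
waiter.wait(StackName='bakers-orders-datalake')
print('Stack created')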
Here's the code to generate 1,000 orders. This will help us simulate real data flowing from an application to a data lake. In a real-world scenario, this could be code in your application that sends actual orders to Firehose for delivery to the Iceberg table for reporting and analytics.
import boto3
import json
import random
from datetime import datetime

# Initialize Firehose client
firehose = boto3.client('firehose', region_name='us-east-1')

# Must match the Firehose stream created by the CloudFormation template
DELIVERY_STREAM_NAME = 'kinesis-orders-firehose'
# Sample data for generating realistic records
ITEM_NAMES = [
    'Chocolate Cake', 'Vanilla Cupcake', 'Croissant', 'Baguette',
    'Sourdough Bread', 'Cinnamon Roll', 'Blueberry Muffin', 'Apple Pie',
    'Cheesecake', 'Brownie', 'Danish Pastry', 'Eclair', 'Macaron',
    'Pretzel', 'Donut', 'Scone', 'Banana Bread', 'Pumpkin Pie'
]

VENDOR_NAMES = [
    'Sweet Delights Bakery', 'Golden Crust', 'The Flour Shop',
    'Artisan Breads Co', 'Sugar & Spice', 'Daily Fresh Bakery',
    'The Rolling Pin', 'Heavenly Bakes', 'Sunrise Pastries',
    'The Bread Basket'
]


def generate_record(order_id):
    """Generate a single order record."""
    vendor_id = random.randint(1000, 9999)
    return {
        'order_id': order_id,
        'order_price': round(random.uniform(5.99, 150.99), 2),
        'item_name': random.choice(ITEM_NAMES),
        'quantity': random.randint(1, 20),
        'vendor_name': random.choice(VENDOR_NAMES),
        'vendor_id': vendor_id
    }


def put_records_to_firehose(records):
    """Send records to Firehose in batches of 500 (Firehose limit)."""
    batch_size = 500
    total_sent = 0
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        # Format records for Firehose (each record needs a newline delimiter)
        firehose_records = [
            {'Data': json.dumps(record) + '\n'}
            for record in batch
        ]
        response = firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM_NAME,
            Records=firehose_records
        )
        failed_count = response.get('FailedPutCount', 0)
        total_sent += len(batch) - failed_count
        print(f'Batch {i // batch_size + 1}: Sent {len(batch)} records, Failed: {failed_count}')
    return total_sent


def main():
    print('Generating 1,000 order records...')
    # Generate unique order_ids using a timestamp starting point + sequential offset
    base_order_id = int(datetime.now().timestamp() * 1000)
    records = [generate_record(base_order_id + i) for i in range(1000)]

    print(f'Sending records to Firehose stream: {DELIVERY_STREAM_NAME}')
    total_sent = put_records_to_firehose(records)

    print(f'\nComplete! Successfully sent {total_sent} records to Firehose.')
    print(f'Sample record: {json.dumps(records[0], indent=2)}')


if __name__ == '__main__':
    main()
Once the orders have been processed and landed in the Iceberg table, we can use Amazon Athena to query the data and Amazon QuickSight to build reports and dashboards for business intelligence.
With the data in place, we can query the Iceberg table from Amazon Athena using regular SQL syntax. For example, we can pull back every order over $100.00.
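Here is a minimal sketch of running that query through boto3, assuming Athena is allowed to write query results to an athena-results/ prefix in the same bucket; the output location is a placeholder, so point it wherever your Athena workgroup stores results.

import time
import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Orders over $100.00, against the database and table created by the template
QUERY = """
SELECT order_id, item_name, quantity, order_price, vendor_name
FROM bakers_orders
WHERE order_price > 100.00
ORDER BY order_price DESC
"""

execution = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={'Database': 'orders'},
    ResultConfiguration={'OutputLocation': 's3://bakers-orders-tc-dl-dev/athena-results/'}  # placeholder
)
query_id = execution['QueryExecutionId']

# Poll until the query finishes
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

# The first row of the result set is the column headers
results = athena.get_query_results(QueryExecutionId=query_id)
for row in results['ResultSet']['Rows'][1:]:
    print([col.get('VarCharValue') for col in row['Data']])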
Apache Iceberg scales to very large datasets, from millions of rows on up. Keep in mind, though, that Athena charges for the amount of data scanned, so avoid running SELECT * queries against large tables; select only the columns you need and filter where you can, or your Athena bill could become quite expensive.
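For example, an aggregate that touches only the columns it needs scans far less data than a SELECT * over the whole table, since the Iceberg data files are columnar. A quick sketch, reusing the same Athena client and placeholder output location as above:

# Revenue per vendor: only the vendor_name and order_price columns need to be read
SUMMARY_QUERY = """
SELECT vendor_name,
       COUNT(*) AS order_count,
       SUM(order_price) AS total_revenue
FROM bakers_orders
GROUP BY vendor_name
ORDER BY total_revenue DESC
"""

athena.start_query_execution(
    QueryString=SUMMARY_QUERY,
    QueryExecutionContext={'Database': 'orders'},
    ResultConfiguration={'OutputLocation': 's3://bakers-orders-tc-dl-dev/athena-results/'}  # placeholder
)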
If your goal is to set up a robust BI or ML platform, I highly recommend this approach. It's inexpensive compared to most other solutions and fairly easy to set up.

