Original Japanese article: AWSでの効率的なデータ処理を考える~データコンパクション~ (Thinking About Efficient Data Processing on AWS: Data Compaction)
Introduction
I'm Aki, an AWS Community Builder (@jitepengin).
In my previous article,
Designing a Cost-Efficient Parallel Data Pipeline on AWS Using Lambda and SQS,
I introduced a pattern where a large file is split into chunks using AWS Lambda and processed in parallel through SQS.
In this article, we look at the opposite scenario: when a large number of small event files—such as data from IoT devices or application logs—are continuously generated and uploaded.
For these use cases, we explore the Compactor Pattern.
What Is the Small-File Problem in Data Lakes?
When building a data lake, we often need to ingest huge numbers of small files, especially from IoT devices or application logs.
Over time, this can negatively impact performance in several ways:
- A massive number of tiny files accumulate in S3, increasing load on table storage and metadata management
- Query performance in Athena / AWS Glue degrades when scanning many small files
- Frequent snapshot updates in Iceberg/Delta increase costs and contention
By the way, in my previous article I implemented a backoff mechanism, but tuning it against the number of conflicts was painful… (that implementation needs a revisit!)
What Is the Compactor Pattern?
The Compactor Pattern is an approach that periodically merges many small files in a data lake into fewer large files.
By consolidating files, we can reduce query overhead, metadata pressure, and performance bottlenecks.
Typical Flow
1. Scheduled or Trigger-Based Execution: run compaction periodically (e.g., every hour or day) or when a threshold number of files is reached.
2. Small File Detection: scan S3 or the Iceberg/Delta manifests to detect small files.
3. Merge (Compaction): use AWS Glue (or similar) to merge the files and rewrite them as larger Parquet files.
4. Cleanup: remove old small files and unused snapshots (garbage collection).
Pre-Compaction: Compact Before Writing to the Data Lake
In this pattern, incoming small files are buffered (temporary storage, queue, etc.), compacted, and only then written into the data lake.
Think of it as cleaning up at the entrance.
Pros
- Optimized file structure from the start
- Reduces load and snapshot contention in the data lake
- Simpler Iceberg/Delta table management
Cons
- Higher latency (buffering required)
- Reduced real-time characteristics
- If compaction fails before the write completes, there is a risk of data loss → retry design is important
Post-Compaction: Compact After Writing to the Data Lake
In this pattern, small files are written directly into Iceberg/Delta, and compaction is performed later by a separate job.
Think of it as cleaning up at the exit.
Pros
- Lowest write latency
- Friendly for real-time ingestion
- Lower write-failure risk (files written in small chunks)
Cons
- Small files temporarily accumulate, degrading performance
- Snapshot/transaction conflicts may increase in Iceberg/Delta
Implementing Pre-Compaction
Architecture
This pattern consists of two major components:
- Ingest (File Registration)
  - Small files uploaded from IoT devices or services → stored in S3
  - S3 Event triggers Lambda
  - Lambda registers metadata (URI, size, status=PENDING) into DynamoDB (see the table sketch after this list)
- Compaction
  - Triggered when file count or total size exceeds a threshold
  - Lambda merges files (DuckDB in this sample)
  - Writes merged Parquet to S3 / Iceberg
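For reference, here is a minimal sketch of the DynamoDB table the sample code assumes. The table name, and on-demand billing, are assumptions for illustration; the only hard requirement from the Lambdas is a string partition key named ChunkId and a table name matching the CHUNK_TABLE environment variable.

# create_chunk_table.py (hypothetical helper, not part of the original setup)
import boto3

dynamodb = boto3.client('dynamodb')

# Table name must match the CHUNK_TABLE environment variable used by the Lambdas
dynamodb.create_table(
    TableName='chunk-table',
    AttributeDefinitions=[{'AttributeName': 'ChunkId', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'ChunkId', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'  # on-demand billing avoids capacity planning for bursty ingest
)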
Sample Code: Ingest
# ingest.py
import os
import json
import time
import uuid
import boto3
from urllib.parse import unquote_plus

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['CHUNK_TABLE'])

def lambda_handler(event, context):
    for rec in event['Records']:
        bucket = rec['s3']['bucket']['name']
        key = unquote_plus(rec['s3']['object']['key'])
        size = rec['s3']['object']['size']
        uri = f's3://{bucket}/{key}'

        # Register the uploaded object as a pending chunk for later compaction
        table.put_item(
            Item={
                'ChunkId': str(uuid.uuid4()),
                'Uri': uri,
                'SizeBytes': size,
                'Status': 'PENDING',
                'Timestamp': int(time.time())  # registration time (epoch seconds)
            }
        )
    return {'statusCode': 200, 'body': json.dumps({'message': 'Registered'})}
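The ingest Lambda above is driven by S3 event notifications. As a rough sketch of that wiring (the bucket name, Lambda ARN, and the .parquet suffix filter are placeholder assumptions), the notification can be configured like this:

# configure_s3_notification.py (hypothetical; bucket name and Lambda ARN are placeholders)
import boto3

s3 = boto3.client('s3')

s3.put_bucket_notification_configuration(
    Bucket='my-ingest-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [
            {
                'LambdaFunctionArn': 'arn:aws:lambda:ap-northeast-1:111111111111:function:ingest',
                'Events': ['s3:ObjectCreated:*'],
                # Only register Parquet objects; adjust the filter to your file naming
                'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.parquet'}]}}
            }
        ]
    }
)

Note that the ingest Lambda also needs a resource-based policy that allows s3.amazonaws.com to invoke it.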
Sample Code: Compaction
# compaction.py
import os
import boto3
import duckdb
import time
from datetime import datetime
from pyiceberg.catalog.glue import GlueCatalog
# Environment variables
TABLE_NAME = os.environ['CHUNK_TABLE']
TARGET_TOTAL_SIZE = int(os.environ.get('TARGET_TOTAL_SIZE', 100 * 1024 * 1024)) # Default 100MB
ICEBERG_CATALOG_NAME = os.environ.get('ICEBERG_CATALOG_NAME', 'my_catalog')
ICEBERG_NAMESPACE = os.environ.get('ICEBERG_NAMESPACE', 'icebergdb')
ICEBERG_TABLE_NAME = os.environ.get('ICEBERG_TABLE_NAME', 'yellow_tripdata')
# AWS clients
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
def lambda_handler(event, context):
    # Collect pending files until the accumulated size reaches the target
    items = get_pending_items()

    selected_items = []
    accumulated_size = 0
    for item in items:
        item_size = item['SizeBytes']
        if accumulated_size + item_size > TARGET_TOTAL_SIZE:
            break
        selected_items.append(item)
        accumulated_size += item_size

    if not selected_items:
        return {'message': 'Below threshold, skipping processing', 'count': 0, 'size': 0}

    uris = [item['Uri'] for item in selected_items]
    print(f"Executing merge process {uris}")

    arrow_table = merge_parquet_in_memory(uris)
    print(f"arrow_table {arrow_table}")

    append_to_iceberg(arrow_table)
    mark_done([item['ChunkId'] for item in selected_items])

    return {'message': 'Compaction completed', 'merged_rows': arrow_table.num_rows}

def get_pending_items():
    # Note: Scan reads the whole table; at larger scale, a GSI on Status queried instead would be more efficient
    table = dynamodb.Table(TABLE_NAME)
    resp = table.scan(
        FilterExpression="#st = :pending",
        ExpressionAttributeNames={'#st': 'Status'},
        ExpressionAttributeValues={':pending': 'PENDING'}
    )
    return resp.get('Items', [])

def merge_parquet_in_memory(uris):
    con = duckdb.connect(database=':memory:')
    con.execute("SET home_directory='/tmp'")  # Lambda only allows writes under /tmp
    con.execute("INSTALL httpfs;")
    con.execute("LOAD httpfs;")
    # Read and merge the Parquet files into a single Arrow table
    # (depending on the DuckDB version, S3 credentials may need to be configured explicitly)
    arrow_table = con.read_parquet(uris, union_by_name=True).arrow()
    return arrow_table

def append_to_iceberg(arrow_table, retries=5):
    catalog = GlueCatalog(region_name="ap-northeast-1", name=ICEBERG_CATALOG_NAME)
    delay = 10
    for attempt in range(retries):
        try:
            table = catalog.load_table(f"{ICEBERG_NAMESPACE}.{ICEBERG_TABLE_NAME}")
            table.refresh()
            current_snapshot = table.current_snapshot()
            snapshot_id = current_snapshot.snapshot_id if current_snapshot else "None"
            print(f"Attempt {attempt + 1}: Using snapshot ID {snapshot_id}")

            table.append(arrow_table)
            print("Data has been appended to the Iceberg table.")
            return
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            # Retry with exponential backoff only on snapshot/commit conflicts
            if "Cannot commit" in str(e) or "branch main has changed" in str(e):
                if attempt < retries - 1:
                    delay *= 2
                    print(f"Retrying after {delay} seconds.")
                    time.sleep(delay)
                else:
                    print("Maximum retry attempts reached. Aborting process.")
                    raise
            else:
                raise

def mark_done(ids):
    table = dynamodb.Table(TABLE_NAME)
    for cid in ids:
        table.update_item(
            Key={'ChunkId': cid},
            UpdateExpression="SET #st = :c",
            ExpressionAttributeNames={'#st': 'Status'},
            ExpressionAttributeValues={':c': 'COMPACTED'}
        )
Results
Uploaded Files
Registered Data in DynamoDB
Files on Iceberg
Points to Consider
- Backoff tuning: Iceberg snapshot conflicts happen frequently, so the retry/backoff strategy must be tuned for your environment.
- File size control: the optimal Iceberg file size is typically 128 MB–1 GB.
- EventBridge trigger frequency (see the scheduling sketch after this list): too slow → loss of freshness; too fast → wasted invocations and duplicate-compaction risk.
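As a starting point for the trigger, a scheduled EventBridge rule can invoke the compaction Lambda at a fixed rate. This is a minimal sketch; the rule name, rate, account ID, and function ARN are placeholder assumptions:

# schedule_compaction.py (hypothetical; names and ARNs are placeholders)
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

FUNCTION_ARN = 'arn:aws:lambda:ap-northeast-1:111111111111:function:compaction'

# Create (or update) a scheduled rule; tune the rate against freshness vs. invocation cost
rule = events.put_rule(
    Name='compaction-schedule',
    ScheduleExpression='rate(15 minutes)',
    State='ENABLED'
)

# Point the rule at the compaction Lambda
events.put_targets(
    Rule='compaction-schedule',
    Targets=[{'Id': 'compaction-lambda', 'Arn': FUNCTION_ARN}]
)

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName='compaction',
    StatementId='allow-eventbridge-compaction',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)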
Implementing Post-Compaction
This is a much simpler setup.
Architecture
AWS recommends using Athena to run OPTIMIZE and VACUUM operations:
https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html
Sample Code: Post-Compaction Lambda (Optimize and Vacuum)
This Lambda executes OPTIMIZE and VACUUM commands on the Iceberg table via Athena.
import boto3

athena = boto3.client('athena', region_name='ap-northeast-1')
TEMP_OUTPUT = 's3://20250421testresult/'

def lambda_handler(event, context):
    # OPTIMIZE rewrites small files using bin packing; VACUUM expires snapshots and removes orphan files
    queries = [
        "OPTIMIZE icebergdb.yellow_tripdata REWRITE DATA USING BIN_PACK",
        "VACUUM icebergdb.yellow_tripdata"
    ]
    for query in queries:
        # start_query_execution is asynchronous; the queries are only submitted here
        response = athena.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': 'icebergdb'},
            ResultConfiguration={'OutputLocation': TEMP_OUTPUT}
        )
        print(f"Started Athena query: {response['QueryExecutionId']}")
Results
Before Execution
After Execution
As you can see, the table has been successfully optimized.
Automatic Compaction with AWS Glue (Post-Compaction)
Iceberg tables registered in the Glue Data Catalog can use the Table Optimizer, which supports:
- Automatic compaction
- Snapshot retention
- Orphan file cleanup
Docs:
https://docs.aws.amazon.com/glue/latest/dg/table-optimizers.html
https://docs.aws.amazon.com/glue/latest/dg/compaction-management.html
Configuration
Make sure to enable all three options: compaction, snapshot retention, and orphan file cleanup.
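The same settings can also be applied programmatically through the Glue CreateTableOptimizer API. The following is only a sketch; the account ID, role ARN, and exact parameter shapes are assumptions that should be verified against the boto3 Glue documentation for your SDK version.

# enable_table_optimizer.py (a sketch; account ID, role ARN, and names are placeholders)
import boto3

glue = boto3.client('glue')

glue.create_table_optimizer(
    CatalogId='111111111111',   # AWS account ID that owns the Data Catalog
    DatabaseName='icebergdb',
    TableName='yellow_tripdata',
    Type='compaction',          # snapshot retention and orphan file deletion use their own optimizer types
    TableOptimizerConfiguration={
        'roleArn': 'arn:aws:iam::111111111111:role/GlueTableOptimizerRole',
        'enabled': True
    }
)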

Notes
- Charged per DPU → cost increases with fragmentation
- Only available in supported regions
Use Cases for Each Compaction Approach
Lambda-Based Compaction (Pre-Compaction)
Use Cases
- When Glue auto-compaction cannot be used, such as with Delta Lake.
- When you want to implement compaction logic tailored to complex business requirements.
- When you want to leverage existing serverless infrastructure like Lambda/Step Functions for flexible configurations.
Pros
- Flexible logic implementation: Can freely customize file selection criteria and merge procedures.
- Multi-format support: Works with Iceberg, Delta Lake, and other formats.
- Cost control: Lambda runs only when needed, avoiding DPU billing.
Cons
- High implementation and operational cost: Requires building and managing Lambda, DynamoDB, EventBridge, etc.
- Increased monitoring effort: Custom metrics and failure detection logic must be implemented and maintained.
- Scalability considerations: Be mindful of performance bottlenecks with large datasets.
Lambda-Based Compaction (Post-Compaction)
Use Cases
- When Glue auto-compaction cannot be used, such as with Delta Lake.
- When you want to automate periodic file consolidation while keeping operational overhead low.
Pros
- Low implementation effort: Only need to run queries via Athena from Lambda.
Cons
- Scalability considerations: Performance bottlenecks may appear with large datasets.
AWS Glue Auto-Compaction (Post-Compaction)
Use Cases
- When managing Apache Iceberg tables centrally via the Glue Catalog.
- When you want to automate periodic file consolidation while minimizing operational overhead.
- When you prefer to rely on standard features without custom compaction logic, suitable for medium to large-scale data lakes.
Pros
- Minimal implementation effort: Enable via Glue console or CLI.
- Simplified management: Monitor via CloudWatch metrics and the Glue console.
- Native support: Supports compaction, snapshot retention, and orphan file deletion.
Cons
- Glue DPU billing: Charged per minute, costs may increase depending on frequency.
- Limitations: Automatic processing is limited to Iceberg.
- Trigger flexibility: For fine-grained or dynamic triggers, additional design is required.
Conclusion
In this article, we discussed data compaction—an important consideration when operating a data lake.
In a data lake, files of various sizes are ingested at different times. When a large number of small files accumulate, processing efficiency can degrade.
By performing compaction as introduced in this article, you can maintain an environment that allows for efficient data processing. There are several approaches available, so you should choose the configuration that best fits your requirements and the current state of your system.
I hope this article serves as a useful reference for designing an optimal architecture for your data lake.