Original Japanese article: "Thinking About Efficient Data Processing on AWS: A Parallel Data Pipeline Using AWS Lambda and SQS"
Introduction
I'm Aki, an AWS Community Builder (@jitepengin).
In my previous articles (listed below), I introduced how to build lightweight ETL workflows on serverless compute.
This time, I'll continue with another Lambda-based data processing pattern: handling large datasets efficiently by splitting them into chunks and processing them in parallel.
- Lightweight ETL with AWS Glue Python Shell, DuckDB, and PyIceberg
- Lightweight ETL with AWS Lambda, DuckDB, and PyIceberg
- Lightweight ETL with AWS Lambda, DuckDB, and delta-rs
As you probably know, dealing with large datasets efficiently is one of the most important challenges in data engineering.
When working with AWS Lambda, handling large files often becomes difficult due to memory limits and maximum execution time.
Even loading a large dataset itself can be expensive—and performing transformations on top of that easily exceeds Lambda’s limits.
In this article, I introduce an architecture that combines AWS Lambda and SQS to split large files into chunks and process them in parallel, enabling efficient and scalable data pipelines.
Architecture Overview
The architecture looks like this:

The processing flow is:
- A file uploaded to S3 triggers an SQS notification.
- A Lambda function is invoked, which splits the file into appropriate chunk sizes and stores the chunks back into S3.
- Metadata for each chunk is sent to another SQS queue (example message shapes for both queues are sketched after this list).
- A separate Lambda function consumes messages from that SQS queue and performs ETL (extract, transform, load) on each chunk.
- The processed data is finally loaded into a data lake stored in Iceberg format.
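For reference, here is a minimal sketch of the two message shapes involved, based only on the fields the sample code reads; the bucket names and object keys are placeholders.

# Sketch of the event the chunk-producer Lambda receives: an SQS record whose body
# is a standard S3 event notification (only the fields used by the code are shown).
producer_event = {
    "Records": [{
        "body": '{"Records": [{"s3": {"bucket": {"name": "your-input-bucket"}, "object": {"key": "yellow_tripdata.csv"}}}]}'
    }]
}

# Sketch of the chunk-metadata message the producer sends to the second queue
# and the ETL consumer Lambda later receives.
chunk_message = {
    "s3_bucket": "your-target-bucket",
    "s3_object_key": "chunks/yellow_tripdata_part0.parquet"
}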
The Lambda functions use the following libraries:
- DuckDB – for fast, in-memory SQL processing
- PyArrow – for efficient columnar data handling and conversion
- PyIceberg – for accessing and updating Iceberg tables using AWS Glue Catalog
Benefits of This Architecture
This approach offers several advantages:
1. Improved scalability through parallel processing
By using SQS, Lambda can process a large file in parallel by splitting it into chunks.
The heavier the per-chunk processing becomes, the more benefit parallel execution provides (a sketch of the queue-to-Lambda configuration follows this list).
2. Avoiding Lambda’s memory and timeout limits
Chunking enables you to process large datasets without exceeding Lambda’s resource limits.
With the help of DuckDB, the processing becomes even more efficient.
3. Built-in retry mechanisms using SQS
One major advantage of SQS is its native retry capability.
When processing chunked data, errors may cause partial failures or missing results.
Retries and DLQ (dead-letter queue) handling become critical for reliability.
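As a rough illustration of how this parallelism can be controlled, the sketch below wires the chunk-metadata queue to the consumer Lambda with boto3; the queue ARN, function name, and concurrency limit are placeholders, and the right values depend on your workload.

import boto3

lambda_client = boto3.client("lambda")

# Minimal sketch: connect the chunk-metadata SQS queue to the ETL consumer Lambda.
# BatchSize=1 matches the sample consumer, which handles one chunk per invocation;
# MaximumConcurrency caps how many chunks are processed in parallel.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:ap-northeast-1:123456789012:chunk-queue",  # placeholder ARN
    FunctionName="etl-consumer",                                           # placeholder name
    BatchSize=1,
    ScalingConfig={"MaximumConcurrency": 10},                              # placeholder limit
)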
Points to Consider
1. Ordering guarantees
While SQS FIFO queues exist, the combination of parallel chunk processing and asynchronous Lambda execution means you cannot guarantee data order when writing to the final table.
If strict sequencing is required, consider using Step Functions or another orchestrator.
2. Error handling and retry strategies
Although not detailed in this article, setting up a DLQ is highly recommended.
A proper visibility timeout also helps prevent duplicate processing; a queue-configuration sketch follows below.
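As a minimal sketch of these two settings, assuming the queues already exist (the queue URL, DLQ ARN, and timeout value are placeholders):

import json
import boto3

sqs = boto3.client("sqs")

# Sketch: attach a dead-letter queue and set the visibility timeout on the chunk queue.
# The visibility timeout should comfortably exceed the consumer Lambda's timeout so a
# message is not redelivered while its chunk is still being processed.
sqs.set_queue_attributes(
    QueueUrl="https://sqs.ap-northeast-1.amazonaws.com/123456789012/chunk-queue",  # placeholder
    Attributes={
        "VisibilityTimeout": "900",  # seconds; placeholder value
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:ap-northeast-1:123456789012:chunk-dlq",  # placeholder
            "maxReceiveCount": "3",
        }),
    },
)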
Test Dataset
For this demo, I used the NYC Yellow Taxi Trip dataset from Kaggle (CSV format).
Since it is CSV, I also added a step to convert it into Parquet for better compression and processing efficiency.
Dataset link:
https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data
Packaging for Lambda
I reused the previous Lambda setup and packaged dependencies using Lambda Layers:
mkdir python
pip install -t python --platform manylinux2014_x86_64 --only-binary=:all: pyiceberg[glue,duckdb]
Installing pyiceberg[glue,duckdb] automatically includes the necessary Glue extensions, DuckDB, and PyArrow.
However, the resulting package exceeds Lambda's 250 MB limit on the unzipped deployment package (including layers), so I removed boto3 manually; it is already provided by the Lambda Python runtime.
Using a container image may be a cleaner solution overall.
Sample Code
File Chunk Producer (Lambda)
import duckdb
import pyarrow.parquet as pq
import boto3
import json
import os
import pyarrow as pa
s3_client = boto3.client("s3")
sqs_client = boto3.client("sqs")
S3_BUCKET_NAME = "your target S3 bucket"
SQS_QUEUE_URL = "your SQS queue for sending chunk metadata"
def lambda_handler(event, context):
    try:
        print(f"event: {event}")

        # Retrieve S3 event information from the SQS message
        event_body = json.loads(event['Records'][0]['body'])
        s3_bucket = event_body['Records'][0]['s3']['bucket']['name']
        s3_object_key = event_body['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        print(f"Processing file: {s3_input_path}")

        # Create a base filename without the extension
        base_filename = s3_object_key.rsplit(".", 1)[0]

        # Connect to DuckDB in-memory database
        duckdb_connection = duckdb.connect(database=":memory:")
        duckdb_connection.execute("SET home_directory='/tmp'")  # Set Lambda's temporary directory

        # Install and load the httpfs extension
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # Load the CSV file using read_csv_auto
        query = f"SELECT * FROM read_csv_auto('{s3_input_path}')"
        arrow_table = duckdb_connection.execute(query).fetch_arrow_table()

        # Convert all column names to lowercase
        new_column_names = [name.lower() for name in arrow_table.column_names]
        arrow_table = arrow_table.rename_columns(new_column_names)
        print(f"Schema after conversion: {arrow_table.schema}")

        # Get chunk size (default: 100MB)
        chunk_size_mb = int(os.environ.get("CHUNK_SIZE_MB", 100))
        chunk_size_bytes = chunk_size_mb * 1024 * 1024

        # Retrieve total data size and row count
        total_bytes = arrow_table.nbytes
        total_rows = arrow_table.num_rows

        # Calculate number of rows per chunk (minimum 1 row)
        chunk_rows = max(1, total_rows * chunk_size_bytes // total_bytes)

        # Create chunks based on row count, save to S3, and send SQS notifications
        for i in range(0, total_rows, chunk_rows):
            chunk = arrow_table.slice(i, chunk_rows)
            output_file = f"/tmp/{base_filename}_part{i // chunk_rows}.parquet"
            pq.write_table(chunk, output_file)
            print(f"Saved chunk: {output_file}")

            s3_chunk_key = f"chunks/{base_filename}_part{i // chunk_rows}.parquet"
            s3_client.upload_file(output_file, S3_BUCKET_NAME, s3_chunk_key)

            message_body = json.dumps({
                "s3_bucket": S3_BUCKET_NAME,
                "s3_object_key": s3_chunk_key
            })
            sqs_client.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=message_body)
            print(f"Chunk saved to S3 and message sent to SQS: {s3_chunk_key}")

    except Exception as e:
        print(f"Error processing file: {e}")
        raise  # Re-raise so SQS retries the message and can route it to a DLQ
ETL Consumer (Lambda)
import duckdb
import pyarrow as pa
import boto3
import json
import time
from pyiceberg.catalog.glue import GlueCatalog # Access Iceberg tables using GlueCatalog
def lambda_handler(event, context):
    try:
        # 1. Receive SQS message
        # Process a single message (assumes the event source mapping uses BatchSize=1)
        message_body = json.loads(event['Records'][0]['body'])
        s3_bucket = message_body['s3_bucket']
        s3_object_key = message_body['s3_object_key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        print(f"Processing file from SQS message: {s3_input_path}")

        # 2. Connect to DuckDB and configure home directory
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'")  # Use Lambda's temporary directory

        # Install and load httpfs extension
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # 3. Load data from S3 with DuckDB
        # Apply any required filtering or cleansing here.
        query = f"""
            SELECT * FROM read_parquet('{s3_input_path}')
        """

        # Execute the SQL and fetch results as a PyArrow table
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
        print(f"Number of rows retrieved: {result_arrow_table.num_rows}")
        print(f"Data schema: {result_arrow_table.schema}")

        # 4. Configure Glue Catalog (access Iceberg tables)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="your_schema", name="my_catalog")

        # 5. Append data to Iceberg table (with retry logic)
        retry_append(catalog, "your_schema", "your_table", result_arrow_table)

    except Exception as e:
        print(f"An error occurred: {e}")
        raise  # Re-raise so SQS retries the message and can route it to a DLQ


# Retry logic when commit conflicts occur
def retry_append(catalog, namespace, table_name, result_arrow_table, retries=5):
    delay = 5  # Initial delay in seconds
    for attempt in range(retries):
        try:
            # Load the latest Iceberg table
            iceberg_table = catalog.load_table(f"{namespace}.{table_name}")
            iceberg_table.refresh()  # Explicitly refresh table metadata

            current_snapshot = iceberg_table.current_snapshot()
            snapshot_id = current_snapshot.snapshot_id if current_snapshot else "None"
            print(f"Attempt {attempt + 1}: Using snapshot ID {snapshot_id}")

            # Append data
            iceberg_table.append(result_arrow_table)
            print("Data successfully appended to the Iceberg table.")
            return
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            # Retry only on commit conflicts; re-raise anything else immediately
            if "Cannot commit" in str(e) or "branch main has changed" in str(e):
                if attempt < retries - 1:
                    delay *= 2  # Exponential backoff
                    print(f"Retrying in {delay} seconds with the latest snapshot...")
                    time.sleep(delay)
                else:
                    print("Max retries reached. Failing the operation.")
                    raise
            else:
                raise
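If you later raise the batch size above one message per invocation, one option (not used in this article) is SQS partial batch responses, where the handler reports only the failed messages so successful ones are not reprocessed. A rough sketch, assuming ReportBatchItemFailures is enabled on the event source mapping and that the per-chunk work is factored into a hypothetical process_chunk helper:

import json

# Rough sketch of a batch-aware handler using SQS partial batch responses.
# Requires FunctionResponseTypes=["ReportBatchItemFailures"] on the event source mapping.
def lambda_handler_batch(event, context):
    failures = []
    for record in event["Records"]:
        try:
            process_chunk(json.loads(record["body"]))  # hypothetical per-chunk helper
        except Exception as e:
            print(f"Failed to process message {record['messageId']}: {e}")
            failures.append({"itemIdentifier": record["messageId"]})
    # Only the messages listed here are returned to the queue for retry.
    return {"batchItemFailures": failures}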
Execution Results
Before processing:
After processing:
In this example, the file was split into 19 chunks:
And all 19 chunks were written into Iceberg:
Areas for Improvement
Retry interval tuning
The exponential backoff used for Iceberg commit retries is hard-coded and on the long side (the delay doubles with every attempt).
These values should ideally be configurable via environment variables; a sketch of one way to do this follows.
Since retry behavior depends heavily on Iceberg's commit-conflict frequency for your tables, design this part carefully.
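One possible shape for this, sketched with two hypothetical environment variables (ICEBERG_RETRY_COUNT and ICEBERG_RETRY_INITIAL_DELAY) and a little jitter to spread out commit retries from parallel invocations:

import os
import random

# Sketch: retry settings read from (hypothetical) environment variables so they can be
# tuned without redeploying; the defaults match the hard-coded values in retry_append.
ICEBERG_RETRY_COUNT = int(os.environ.get("ICEBERG_RETRY_COUNT", "5"))
ICEBERG_RETRY_INITIAL_DELAY = float(os.environ.get("ICEBERG_RETRY_INITIAL_DELAY", "5"))

def backoff_delays(retries=ICEBERG_RETRY_COUNT, initial_delay=ICEBERG_RETRY_INITIAL_DELAY):
    """Yield exponentially growing delays with jitter, for use between commit attempts."""
    delay = initial_delay
    for _ in range(retries - 1):
        yield delay + random.uniform(0, delay / 2)
        delay *= 2

retry_append could then call time.sleep(next(delays)) between attempts instead of managing the delay inline.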
Chunking strategy
This implementation splits the data before the CSV-to-Parquet conversion; depending on the dataset, converting to Parquet first and splitting afterwards may be more efficient.
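As a rough idea of that alternative, the sketch below streams an already-converted Parquet file in record batches with PyArrow instead of materializing the whole table; the region, bucket, key, and batch size are placeholders.

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as pafs

# Sketch: split an already-converted Parquet file into chunk files without loading
# the whole table into memory. Region, bucket, key, and batch size are placeholders.
fs = pafs.S3FileSystem(region="ap-northeast-1")
with fs.open_input_file("your-target-bucket/input/yellow_tripdata.parquet") as f:
    parquet_file = pq.ParquetFile(f)
    for i, batch in enumerate(parquet_file.iter_batches(batch_size=500_000)):
        chunk_table = pa.Table.from_batches([batch])
        pq.write_table(chunk_table, f"/tmp/yellow_tripdata_part{i}.parquet")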
Conclusion
In this article, we built a parallel data pipeline using AWS Lambda, SQS, DuckDB, and PyIceberg.
Although ETL in AWS is commonly done with Glue or EMR, their cost may not be suitable for all workloads.
Lambda is a cost-efficient option, but it comes with strict memory and execution time limits.
This architecture helps overcome those limitations and enables efficient, scalable, and cost-effective ETL processing on AWS.
However, if you require strict workflow orchestration or more robust error handling, Step Functions or Glue Workflows may be a better fit.
Remember: choose the right tool based on your workload requirements—not just cost.
I hope this article helps you when designing Lambda-based data pipelines!