DEV Community

Aki for AWS Community Builders

Posted on

Designing a Cost-Efficient Parallel Data Pipeline on AWS Using Lambda and SQS

Original Japanese article: AWSでの効率的なデータ処理を考える~AWS LambdaとSQSを活用した並列データパイプライン~

Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

In my previous articles, I introduced how to build lightweight ETL workflows using AWS Lambda.
This time, I’ll continue with another Lambda-based data processing pattern—handling large datasets efficiently by splitting them into chunks and processing them in parallel.

As you probably know, dealing with large datasets efficiently is one of the most important challenges in data engineering.
When working with AWS Lambda, handling large files often becomes difficult due to memory limits and maximum execution time.

Even loading a large dataset itself can be expensive—and performing transformations on top of that easily exceeds Lambda’s limits.

In this article, I introduce an architecture that combines AWS Lambda and SQS to split large files into chunks and process them in parallel, enabling efficient and scalable data pipelines.

Architecture Overview

The architecture looks like this:

The processing flow is:

  1. A file uploaded to S3 triggers an SQS notification.
  2. A Lambda function is invoked, which splits the file into appropriate chunk sizes and stores the chunks back into S3.
  3. Metadata for each chunk is sent to another SQS queue.
  4. A separate Lambda function consumes messages from that SQS queue and performs ETL (extract, transform, load) on each chunk.
  5. The processed data is finally loaded into a data lake stored in Iceberg format.

The Lambda functions use the following libraries:

  • DuckDB – for fast, in-memory SQL processing
  • PyArrow – for efficient columnar data handling and conversion
  • PyIceberg – for accessing and updating Iceberg tables using AWS Glue Catalog

Benefits of This Architecture

This approach offers several advantages:

1. Improved scalability through parallel processing

By using SQS, Lambda can process a large file in parallel by splitting it into chunks.
The heavier the processing inside Lambda becomes, the more effective parallelism becomes.

2. Avoiding Lambda’s memory and timeout limits

Chunking enables you to process large datasets without exceeding Lambda’s resource limits.
With the help of DuckDB, the processing becomes even more efficient.

3. Built-in retry mechanisms using SQS

One major advantage of SQS is its native retry capability.
When processing chunked data, errors may cause partial failures or missing results.
Retries and DLQ (dead-letter queue) handling become critical for reliability.

Points to Consider

1. Ordering guarantees

While SQS FIFO queues exist, the combination of parallel chunk processing and asynchronous Lambda execution means you cannot guarantee data order when writing to the final table.
If strict sequencing is required, consider using Step Functions or another orchestrator.

2. Error handling and retry strategies

Although not detailed in this article, setting up a DLQ is highly recommended.
A proper visibility timeout helps prevent duplicate processing.

Test Dataset

For this demo, I used the NYC Yellow Taxi Trip dataset from Kaggle (CSV format).
Since it is CSV, I also added a step to convert it into Parquet for better compression and processing efficiency.

Dataset link:
https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data

Packaging for Lambda

I reused the previous Lambda setup and packaged dependencies using Lambda Layers:

mkdir python 
pip install -t python --platform manylinux2014_x86_64 --only-binary=:all: pyiceberg[glue,duckdb]
Enter fullscreen mode Exit fullscreen mode

Installing pyiceberg[glue,duckdb] automatically includes the necessary Glue extensions, DuckDB, and PyArrow.
However, the package size exceeds the 250MB Lambda Layer limit, so I removed boto3 manually.

Using a container image may be a cleaner solution overall.

Sample Code

File Chunk Producer (Lambda)

import duckdb
import pyarrow.parquet as pq
import boto3
import json
import os
import pyarrow as pa

s3_client = boto3.client("s3")
sqs_client = boto3.client("sqs")

S3_BUCKET_NAME = "your target S3 bucket"
SQS_QUEUE_URL = "your SQS queue for sending chunk metadata"

def lambda_handler(event, context):
    try:
        print(f"event: {event}")
        # Retrieve S3 event information from the SQS message
        event_body = json.loads(event['Records'][0]['body'])
        s3_bucket = event_body['Records'][0]['s3']['bucket']['name']
        s3_object_key = event_body['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        print(f"Processing file: {s3_input_path}")

        # Create a base filename without the extension
        base_filename = s3_object_key.rsplit(".", 1)[0]

        # Connect to DuckDB in-memory database
        duckdb_connection = duckdb.connect(database=":memory:")
        duckdb_connection.execute("SET home_directory='/tmp'")  # Set Lambda's temporary directory

        # Install and load the httpfs extension
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # Load the CSV file using read_csv_auto
        query = f"SELECT * FROM read_csv_auto('{s3_input_path}')"
        arrow_table = duckdb_connection.execute(query).fetch_arrow_table()

        # Convert all column names to lowercase
        new_column_names = [name.lower() for name in arrow_table.column_names]
        arrow_table = arrow_table.rename_columns(new_column_names)
        print(f"Schema after conversion: {arrow_table.schema}")

        # Get chunk size (default: 100MB)
        chunk_size_mb = int(os.environ.get("CHUNK_SIZE_MB", 100))
        chunk_size_bytes = chunk_size_mb * 1024 * 1024

        # Retrieve total data size and row count
        total_bytes = arrow_table.nbytes
        total_rows = arrow_table.num_rows

        # Calculate number of rows per chunk (minimum 1 row)
        chunk_rows = max(1, total_rows * chunk_size_bytes // total_bytes)

        # Create chunks based on row count, save to S3, and send SQS notifications
        for i in range(0, total_rows, chunk_rows):
            chunk = arrow_table.slice(i, chunk_rows)
            output_file = f"/tmp/{base_filename}_part{i // chunk_rows}.parquet"

            pq.write_table(chunk, output_file)
            print(f"Saved chunk: {output_file}")

            s3_chunk_key = f"chunks/{base_filename}_part{i // chunk_rows}.parquet"
            s3_client.upload_file(output_file, S3_BUCKET_NAME, s3_chunk_key)

            message_body = json.dumps({
                "s3_bucket": S3_BUCKET_NAME,
                "s3_object_key": s3_chunk_key
            })
            sqs_client.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=message_body)
            print(f"Chunk saved to S3 and message sent to SQS: {s3_chunk_key}")

    except Exception as e:
        print(f"Error processing file: {e}")

Enter fullscreen mode Exit fullscreen mode

ETL Consumer (Lambda)

import duckdb
import pyarrow as pa
import boto3
import json
import time
from pyiceberg.catalog.glue import GlueCatalog  # Access Iceberg tables using GlueCatalog

def lambda_handler(event, context):
    try:
        # 1. Receive SQS message
        message_body = json.loads(event['Records'][0]['body'])  # Process a single message
        s3_bucket = message_body['s3_bucket']
        s3_object_key = message_body['s3_object_key']

        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"

        print(f"Processing file from SQS message: {s3_input_path}")

        # 2. Connect to DuckDB and configure home directory
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'")  # Use Lambda's temporary directory

        # Install and load httpfs extension
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # 3. Load data from S3 with DuckDB  
        # Apply any required filtering or cleansing here.
        query = f"""
            SELECT * FROM read_parquet('{s3_input_path}')
        """
        # Execute the SQL and fetch results as a PyArrow table
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()

        print(f"Number of rows retrieved: {result_arrow_table.num_rows}")
        print(f"Data schema: {result_arrow_table.schema}")

        # 4. Configure Glue Catalog (access Iceberg tables)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="your_schema", name="my_catalog")

        # 5. Append data to Iceberg table (with retry logic)
        retry_append(catalog, "your_schema", "your_table", result_arrow_table)

    except Exception as e:
        print(f"An error occurred: {e}")

# Retry logic when commit conflicts occur
def retry_append(catalog, namespace, table_name, result_arrow_table, retries=5):
    delay = 5  # Initial delay in seconds

    for attempt in range(retries):
        try:
            # Load the latest Iceberg table
            iceberg_table = catalog.load_table(f"{namespace}.{table_name}")
            iceberg_table.refresh()  # Explicitly refresh table metadata

            current_snapshot = iceberg_table.current_snapshot()
            snapshot_id = current_snapshot.snapshot_id if current_snapshot else "None"
            print(f"Attempt {attempt + 1}: Using snapshot ID {snapshot_id}")

            # Append data
            iceberg_table.append(result_arrow_table)
            print("Data successfully appended to the Iceberg table.")
            return
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if "Cannot commit" in str(e) or "branch main has changed" in str(e):
                if attempt < retries - 1:
                    delay *= 2  # Exponential backoff
                    print(f"Retrying in {delay} seconds with the latest snapshot...")
                    time.sleep(delay)
                else:
                    print("Max retries reached. Failing the operation.")
                    raise

Enter fullscreen mode Exit fullscreen mode

Execution Results

Before processing:

After processing:

In this example, the file was split into 19 chunks:

And all 19 chunks were written into Iceberg:

Areas for Improvement

Retry interval tuning

The exponential backoff for Iceberg commit retries is set a bit high.
This should ideally be configurable via environment variables.
Since retry logic depends heavily on Iceberg’s behavior and table conflict frequency, design this part carefully.

Chunking strategy

This implementation splits CSV before converting to Parquet, but splitting after conversion could be more optimal depending on the dataset.

Conclusion

In this article, we built a parallel data pipeline using AWS Lambda, SQS, DuckDB, and PyIceberg.
Although ETL in AWS is commonly done with Glue or EMR, their cost may not be suitable for all workloads.

Lambda is a cost-efficient option, but it comes with strict memory and execution time limits.

This architecture helps overcome those limitations and enables efficient, scalable, and cost-effective ETL processing on AWS.

However, if you require strict workflow orchestration or more robust error handling, Step Functions or Glue Workflows may be a better fit.
Remember: choose the right tool based on your workload requirements—not just cost.

I hope this article helps you when designing Lambda-based data pipelines!


Top comments (0)