
Aki for AWS Community Builders


Boosting Lightweight ETL on AWS Lambda & Glue Python Shell with DuckDB and Apache Arrow Dataset

Original Japanese article: AWS Lambda/Glue Python Shell×DuckDBの軽量ETLをApache Arrow Datasetで高速化してみた

Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

In my previous articles, I introduced lightweight ETL using AWS Lambda and Glue Python Shell.
In the process, I found that DuckDB's performance was not as high as expected:

Does Increasing AWS Lambda Memory to 10GB Really Make It Faster? (AWS Lambda chDB/DuckDB PyIceberg Benchmark)
AWS Lambda and AWS Glue Python Shell in the Context of Lightweight ETL

In this article, I will cover what became the bottleneck for DuckDB and how using Apache Arrow Dataset can improve performance, along with the trade-offs observed.


Recap of Previous Articles


Using NYC taxi data, we compared performance on the same file:
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Test files:

  • January 2024 Yellow Taxi Trip Records (2,964,624 records, 48MB)
  • Full-year 2024 Yellow Taxi Trip Records (41,169,720 records, 807MB)

Lambda measurements were taken with memory configurations of 1024MB, 2048MB, and the maximum 3008MB (without quota increase).
Glue Python Shell tests were performed with DPU settings of 1/16 and 1.

Since memory usage cannot be directly compared, we focus only on execution time.

48MB File (1 Month)

| Execution Platform | Resource Setting | chDB Time (s) | DuckDB Time (s) |
|---|---|---|---|
| Glue Python Shell | 1/16 DPU | 46.000 | 40.000 |
| Glue Python Shell | 1 DPU | 39.000 | 34.000 |
| AWS Lambda | 1024 MB | 5.092 | 5.163 |
| AWS Lambda | 2048 MB | 3.873 | 4.265 |
| AWS Lambda | 3008 MB | 3.370 | 4.061 |

807MB File (1 Year)

| Execution Platform | Resource Setting | chDB Time (s) | DuckDB Time (s) |
|---|---|---|---|
| Glue Python Shell | 1/16 DPU | OutOfMemory | OutOfMemory |
| Glue Python Shell | 1 DPU | 51.0 | 212.0 |
| AWS Lambda | 1024 MB | OutOfMemory | OutOfMemory |
| AWS Lambda | 2048 MB | OutOfMemory | OutOfMemory |
| AWS Lambda | 3008 MB | 27.171 | 187.332 |

What Caused the DuckDB Bottleneck?

When loading Parquet directly into DuckDB, the flow is typically:

S3
↓
DuckDB read_parquet
↓
Filter / Query

Key points:

  1. S3 Scan: Reading the entire dataset involves heavy network I/O — this can take most of the time.
  2. Parquet Decode: Decoding inside DuckDB adds CPU load.
  3. Query Processing: For simple filters like WHERE VendorID = 1, query time is minimal.

Even if the query itself is light, S3 scanning becomes the bottleneck, lowering DuckDB’s standalone performance.
Measurements showed that in Glue Python Shell, of the total 210 seconds, 176 seconds (~83%) were spent in S3 Scan + Parquet Decode.
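A per-stage breakdown like this can be obtained by wrapping each stage in a timer. The following is a minimal sketch using only the standard library; the `timed` and `share` helpers and the stage names are illustrative assumptions, not the instrumentation used for the measurement above, and the two workloads are stand-ins for the real S3 read and SQL query.

```python
import time
from contextlib import contextmanager

# Hypothetical helper state: wall-clock seconds accumulated per named stage.
timings = {}

@contextmanager
def timed(stage):
    """Add the elapsed time of the wrapped block to timings[stage]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[stage] = timings.get(stage, 0.0) + time.perf_counter() - start

def share(stage):
    """Fraction of the total measured time spent in one stage."""
    total = sum(timings.values())
    return timings[stage] / total if total else 0.0

# Stand-in workloads; in the real job these would be the S3 read and the SQL query.
with timed("scan_and_decode"):
    data = [i * 2 for i in range(200_000)]
with timed("query"):
    matched = sum(1 for v in data if v % 4 == 0)

for stage in timings:
    print(f"{stage}: {timings[stage]:.4f}s ({share(stage):.0%})")
```

Measuring each stage separately makes it clear whether tuning the query or tuning the read path is the better investment.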

Solution: Use Apache Arrow Dataset to separate reading from querying and improve performance.


What is Apache Arrow Dataset?

https://arrow.apache.org/docs/python/dataset.html

Apache Arrow Dataset (the pyarrow.dataset module in Python) is an API for efficiently reading tabular data such as Parquet or CSV files into the columnar Arrow in-memory format.

Features:

  • Fast Parquet reading
  • Efficient decode operations
  • Parallelized S3 reads
  • Filter/Projection pushdown to reduce I/O

By leveraging these features, the S3 Scan + Parquet Decode bottleneck can be greatly reduced.


Architecture Overview

AWS Lambda

The architecture is the same as in the previous articles.

Glue Python Shell

Also the same as before.


Sample Code (AWS Lambda)

import duckdb
import pyarrow.dataset as ds
import pyarrow.fs as fs
import boto3
from urllib.parse import unquote_plus
from pyiceberg.catalog.glue import GlueCatalog

def lambda_handler(event, context):
    try:

        # DuckDB setup in Lambda
        duckdb_connection = duckdb.connect(database=':memory:')

        # Retrieve S3 path from event (object keys in S3 event notifications are URL-encoded)
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        s3_input_path = f"{s3_bucket}/{s3_object_key}"

        print(f"S3 input path: {s3_input_path}")

        # Read Parquet from S3 using Arrow Dataset
        # Use boto3 session to get temporary credentials
        session = boto3.Session()
        credentials = session.get_credentials().get_frozen_credentials()

        s3 = fs.S3FileSystem(
            region="ap-northeast-1",
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            session_token=credentials.token
        )

        # Load dataset with Arrow Dataset
        dataset = ds.dataset(
            s3_input_path,
            filesystem=s3,
            format="parquet"
        )

        # Convert dataset to Arrow Table (in-memory)
        arrow_table = dataset.to_table()
        print(f"Number of rows retrieved: {arrow_table.num_rows}")
        print(f"Schema: {arrow_table.schema}")

        # DuckDB processing (SQL query)
        # Use DuckDB from_arrow to run SQL on Arrow 
        rel = duckdb_connection.from_arrow(arrow_table)
        result_arrow_table = duckdb_connection.execute(
            """
            SELECT * 
            FROM rel
            WHERE VendorID = 1
            """
        ).fetch_arrow_table()

        # Configure Glue Catalog (to access Iceberg table)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # Adjust to your environment.

        # Load the table
        namespace = "icebergdb"  # Adjust to your environment.
        table_name = "yellow_tripdata"  # Adjust to your environment.
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Append data to the Iceberg table in bulk
        iceberg_table.append(result_arrow_table)

        print("Data has been appended to S3 in Iceberg format.")

    except Exception as e:
        print(f"An error occurred: {e}")

Note: This article focuses on the differences in operation, so version updates or conflict handling in Iceberg tables are omitted.

Key points:

  • Arrow Dataset separates S3 reading from DuckDB querying
  • Light filters can be applied in Arrow Dataset alone for speed

Sample Code (Glue Python Shell)

import boto3
import sys
import os
import pyarrow.dataset as ds
import pyarrow.fs as fs
from awsglue.utils import getResolvedOptions


def get_job_parameters():
    args = getResolvedOptions(sys.argv, ['s3_input'])
    s3_path = args['s3_input']
    print(f"object: {s3_path}")
    return s3_path


def setup_duckdb_environment():
    try:
        duckdb_dir = '/tmp/.duckdb'
        os.environ['HOME'] = '/tmp'
        os.makedirs(duckdb_dir, exist_ok=True)
        print(f"DuckDB environment setup completed: {duckdb_dir}")
        return True
    except Exception as e:
        print(f"DuckDB environment setup error: {e}")
        return False


def read_parquet_with_arrow_dataset(s3_input):

    print("Reading with Arrow Dataset...")

    session = boto3.Session()
    credentials = session.get_credentials().get_frozen_credentials()

    s3 = fs.S3FileSystem(
        region="ap-northeast-1",
        access_key=credentials.access_key,
        secret_key=credentials.secret_key,
        session_token=credentials.token
    )

    path = s3_input.replace("s3://", "")

    dataset = ds.dataset(
        path,
        filesystem=s3,
        format="parquet"
    )

    table = dataset.to_table()

    print(f"Arrow read rows: {table.num_rows}")

    return table


def process_with_duckdb(arrow_table):

    import duckdb

    con = duckdb.connect(":memory:")

    try:

        # Create the relation on the same connection that runs the query
        rel = con.from_arrow(arrow_table)

        result = con.execute("""
            SELECT *
            FROM rel
            WHERE VendorID = 1
        """).arrow()

        print(f"DuckDB filtered rows: {result.num_rows}")

        return result

    finally:
        con.close()


def write_iceberg_table(arrow_table):

    try:

        print("Writing started...")

        from pyiceberg.catalog import load_catalog

        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/",
            "region": "ap-northeast-1"
        }

        catalog = load_catalog("glue_catalog", **catalog_config)

        table_identifier = "icebergdb.yellow_tripdata"

        table = catalog.load_table(table_identifier)

        print(f"Target data to write: {arrow_table.num_rows:,} rows")

        table.append(arrow_table)

        return True

    except Exception as e:

        print(f"Writing error: {e}")

        import traceback
        traceback.print_exc()

        return False


def main():

    if not setup_duckdb_environment():
        print("Failed to set up DuckDB environment")
        return

    try:

        s3_input = get_job_parameters()

        # Arrow Dataset read
        arrow_tbl = read_parquet_with_arrow_dataset(s3_input)

        # DuckDB SQL filter
        result_tbl = process_with_duckdb(arrow_tbl)

        # Iceberg write
        if write_iceberg_table(result_tbl):
            print("Writing fully successful!")
        else:
            print("Writing failed")

    except Exception as e:

        print(f"Main error: {e}")

        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Note: This article focuses on the differences in operation, so version updates or conflict handling in Iceberg tables are omitted.

Key points:

  • Glue Python Shell can run the same ETL with the same structure as the Lambda version
  • Responsibilities separated: Arrow Dataset for reading, DuckDB for SQL query
  • Lightweight filters can be processed efficiently

Benchmarking

Using the same dataset and queries as in previous articles:

AWS Lambda

48MB File (1 Month) memory=3008MB

| Method | Time (ms) | Memory (MB) |
|---|---|---|
| chDB | 3,369.78 | 1115 |
| DuckDB | 4,061.33 | 524 |
| DuckDB × Arrow Dataset | 3,591.84 | 928 |

807MB File (1 Year) memory=10240MB

| Method | Time (ms) | Memory (MB) |
|---|---|---|
| chDB | 22,839.18 | 3490 |
| DuckDB | 189,678.02 | 2788 |
| DuckDB × Arrow Dataset | 15,220.6 | 8086 |

Glue Python Shell (1 DPU)

48MB File (1 Month)

| Method | Time (s) |
|---|---|
| chDB | 39 |
| DuckDB | 34 |
| DuckDB × Arrow Dataset | 32 |

807MB File (1 Year)

| Method | Time (s) |
|---|---|
| chDB | 51 |
| DuckDB | 212 |
| DuckDB × Arrow Dataset | 44 |

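As a result, on both AWS Lambda and Glue Python Shell, DuckDB × Arrow Dataset was far faster than DuckDB alone on the 807MB file (about 15 s vs. 190 s on Lambda, 44 s vs. 212 s on Glue Python Shell) and also faster than chDB. On the 48MB file the gain was small, and chDB remained slightly faster on Lambda.
In other words, addressing the S3 scan and Parquet decoding bottlenecks seems to be the key to improving DuckDB processing.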

However, in the case of Lambda, large file sizes can lead to high memory usage, potentially exceeding the memory limits.
This means that careful consideration of where and how to use this approach is necessary.


Memory Consideration with Arrow Dataset

dataset = ds.dataset(
    s3_input_path,
    filesystem=s3,
    format="parquet"
)

arrow_table = dataset.to_table()

In this process, dataset.to_table() materializes the entire dataset in memory as an Arrow Table.
Arrow Tables use a columnar in-memory format, which is very fast, but in this case, loading the entire file at once can result in high memory usage.

For example, in the benchmark above, processing the 807MB Parquet file on Lambda used 8086 MB of memory, roughly ten times the compressed file size.
While to_table() is convenient, it is important to be aware that it can significantly increase memory consumption depending on the processing.

Lambda can also use /tmp for disk-backed processing, but processing in memory is much faster.
The practical constraint is therefore the function's memory limit: expanding a large file into an Arrow Table can consume it quickly.

For simple queries, one approach is to iterate over row groups instead of materializing the entire table in memory, processing small chunks at a time.
This method can potentially keep memory usage within a few hundred MBs.


Trade-Offs

Using Arrow Dataset significantly improves the speed of S3 reads and Parquet decoding.
However, expanding the dataset all at once with to_table() increases memory usage, which may hit Lambda’s memory limits.

  • Pros: Decoding and I/O are faster, resulting in improved performance.
  • Cons: Materializing the entire file consumes a lot of memory, and for large files, Lambda may run into OutOfMemory errors.

Therefore, it is important to design your ETL with a balance between performance and memory usage in mind.
For small files or when Lambda has sufficient memory, loading the full dataset at once is fine.
For larger files, consider chunked processing by row group or pushdown filters to keep memory usage under control.


Pushdown with Arrow Dataset

Using Arrow Dataset’s filter and projection pushdown, you can load only the columns and row groups you need from S3.

Here’s how you can apply it:

arrow_table = dataset.to_table(
    columns=["VendorID", "tpep_pickup_datetime"],
    filter=ds.field("VendorID") == 1
)
  • Only the requested columns, and only the row groups whose statistics can match the filter, are read (this is crucial for large datasets).
  • Reduces network I/O from S3.
  • Can further shorten processing time.
  • Arrow Dataset is optimal for lightweight reads and simple filters; complex queries should still be handled in DuckDB.

Experimenting in Lambda

You can integrate pushdown into your existing Lambda code. For example:

import duckdb
import pyarrow.dataset as ds
import pyarrow.fs as fs
import boto3
from urllib.parse import unquote_plus
from pyiceberg.catalog.glue import GlueCatalog

def lambda_handler(event, context):
    try:

        # DuckDB setup in Lambda
        duckdb_connection = duckdb.connect(database=':memory:')

        # Retrieve S3 path from event (object keys in S3 event notifications are URL-encoded)
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        s3_input_path = f"{s3_bucket}/{s3_object_key}"

        print(f"S3 input path: {s3_input_path}")

        # Read Parquet from S3 using Arrow Dataset
        # Use boto3 session to get temporary credentials
        session = boto3.Session()
        credentials = session.get_credentials().get_frozen_credentials()

        s3 = fs.S3FileSystem(
            region="ap-northeast-1",
            access_key=credentials.access_key,
            secret_key=credentials.secret_key,
            session_token=credentials.token
        )

        # Load dataset with Arrow Dataset
        dataset = ds.dataset(
            s3_input_path,
            filesystem=s3,
            format="parquet"
        )

        # Convert dataset to Arrow Table (in-memory), pushing the VendorID filter down into the scan
        arrow_table = dataset.to_table(filter=ds.field("VendorID") == 1)
        print(f"Number of rows retrieved: {arrow_table.num_rows}")
        print(f"Schema: {arrow_table.schema}")

        # DuckDB processing (SQL query)
        # The VendorID filter was already applied by Arrow, so DuckDB only passes the rows through here
        rel = duckdb_connection.from_arrow(arrow_table)
        result_arrow_table = duckdb_connection.execute(
            """
            SELECT * 
            FROM rel
            """
        ).fetch_arrow_table()

        # Configure Glue Catalog (to access Iceberg table)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # Adjust to your environment.

        # Load the table
        namespace = "icebergdb"  # Adjust to your environment.
        table_name = "yellow_tripdata"  # Adjust to your environment.
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Append data to the Iceberg table in bulk
        iceberg_table.append(result_arrow_table)

        print("Data has been appended to S3 in Iceberg format.")

    except Exception as e:
        print(f"An error occurred: {e}")

Using pushdown, memory usage was reduced significantly:

| Method | Time (ms) | Memory (MB) |
|---|---|---|
| DuckDB × Arrow Dataset | 15,220.6 | 8086 |
| DuckDB × Arrow Dataset (Pushdown) | 15,238.14 | 3458 |

Memory usage dropped from 8086 MB to 3458 MB, while execution time was essentially unchanged.
This two-step approach, filtering out unnecessary data with Arrow Dataset before passing it to DuckDB, proves effective for large datasets.
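For reference, the relative change can be worked out directly from the two rows of the table above (Lambda, 807MB file). The variable names are only for illustration.

```python
# Figures taken from the benchmark table above (Lambda, 807MB file).
baseline_ms, pushdown_ms = 15_220.6, 15_238.14
baseline_mb, pushdown_mb = 8086, 3458

memory_saved_pct = (baseline_mb - pushdown_mb) / baseline_mb * 100
time_change_pct = (pushdown_ms - baseline_ms) / baseline_ms * 100

print(f"Memory: {baseline_mb} MB -> {pushdown_mb} MB ({memory_saved_pct:.1f}% lower)")
print(f"Time  : {time_change_pct:+.2f}% change")
```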


Insights

  • DuckDB alone is slow due to S3 Scan + Parquet Decode
  • DuckDB shines with complex queries (JOIN, GROUP BY, WINDOW)
  • Pushdown is key in Arrow Dataset
  • Separating responsibilities (Arrow Dataset + DuckDB) enables efficient ETL in Lambda/Glue
  • chDB offers balanced memory and speed out-of-the-box

Responsibility-Separated Architecture

S3 Parquet (raw data)
      │
      ▼
Arrow Dataset → row group scan + simple filter
      │
      ▼
DuckDB → SQL query (JOIN, GROUP BY, Window functions)
      │
      ▼
PyIceberg → Iceberg table write

Conclusion

In this article, we explored performance improvements for a lightweight ETL built with AWS Lambda / Glue Python Shell × DuckDB × PyIceberg.

For lightweight ETL, especially on AWS Lambda, processing time is a critical factor. By using Apache Arrow Dataset, we were able to significantly improve performance by offloading S3 reading and Parquet decoding before running queries in DuckDB.

However, there are trade-offs. Expanding an entire dataset into memory with to_table() can lead to high memory usage, which may exceed Lambda’s limits for large files. Therefore, careful responsibility separation and chunked processing (e.g., row group iteration or pushdown filters) are important considerations.

With the architecture presented here, even large files can be processed quickly in a lightweight ETL on AWS. While complex queries may still face performance limitations, this approach provides a practical and efficient option for real-time or near-real-time ETL in a Lakehouse environment using Apache Iceberg.

We hope this article serves as a reference for those exploring lightweight data processing and ETL patterns on Iceberg tables.
