DEV Community

Aki for AWS Community Builders

Posted on

Lightweight ETL with AWS Glue Python Shell, chDB, and PyIceberg (Compared with DuckDB)

Original Japanese article: AWS Glue Python Shell×chDB×PyIcebergで実現する軽量ETLとDuckDB比較

Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

In my previous article, I implemented a lightweight ETL pipeline using AWS Lambda, chDB, and PyIceberg.

Lightweight ETL with AWS Lambda, chDB, and PyIceberg (Compared with DuckDB)

In this article, I’ll introduce a Glue Python Shell version of that architecture, implementing ETL that writes data into Apache Iceberg using chDB.

AWS Glue Python Shell has different characteristics compared to Lambda. By leveraging those strengths, we can design ETL workloads that go beyond Lambda’s limitations.
As in the previous article, I’ll also run the same benchmark comparison against DuckDB to highlight the characteristics of each engine.


What is AWS Glue Python Shell?

AWS Glue Python Shell is a Glue execution environment that runs Python scripts without Apache Spark. Compared with Lambda’s 15-minute limit, it supports much longer batch jobs—the default maximum is 48 hours—so it’s suitable for longer-running ETL tasks. It’s serverless, so you don’t manage infrastructure and you pay for what you use.

Compared to Glue Spark or EMR, Glue Python Shell is often cheaper for lightweight ETL and data transformations. However, one downside versus Lambda is fewer native trigger options: Glue Python Shell does not directly support S3 object-created triggers or EventBridge triggers in the same way Lambda does. Typical approaches are to invoke Glue Python Shell via Lambda or Glue Workflows. Despite constraints, when used well it can significantly expand your processing options.


Architecture Used in This Article

The architecture is intentionally simple (and identical to the DuckDB version).

Here’s the architecture covered in this post: when a file is uploaded to S3, a Lambda function is triggered. The Lambda only starts a Glue Python Shell job and passes the S3 path; the actual ETL runs inside the Glue Python Shell and writes Iceberg-format data back to S3. This lets you process workloads that exceed Lambda’s 15-minute limit.

Key Libraries Used

  • chDB: A database engine that enables fast in-memory SQL query execution.
  • PyArrow: Supports high-speed data conversion and transfer using the Arrow format.
  • PyIceberg: Allows access and manipulation of Iceberg tables via AWS Glue Catalog.

Note: Another way to invoke Glue Python Shell is via EventBridge → Glue Workflow → Glue Python Shell.
I’ll cover the differences and benefits of these approaches in a future article.


What is chDB?

https://clickhouse.com/chdb

chDB is an embedded engine version of ClickHouse that can be used as a library.
Its main features:

  • Contains the ClickHouse execution engine
  • High-performance C++ native implementation
  • Execute SQL directly from Python
  • Can run serverless in a single process

Like DuckDB, it’s positioned as an “embedded analytical engine.”


Additional Python modules path

Set the Additional Python modules path to include:

pyiceberg[glue]==0.9.1,pydantic==2.5.0,pydantic-core==2.14.1,annotated-types==0.6.0,chdb==0.13.0,pyarrow==14.0.1,typing-extensions==4.8.0
Enter fullscreen mode Exit fullscreen mode

Sample Code

Lambda trigger function

A simple Lambda that receives an S3 event and starts the Glue job:

import boto3

def lambda_handler(event, context):
    glue = boto3.client("glue")

    s3_bucket = event['Records'][0]['s3']['bucket']['name']
    s3_object_key = event['Records'][0]['s3']['object']['key']

    s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"

    response = glue.start_job_run(
        JobName="chdb shell",
        Arguments={
            "--s3_input": s3_input_path
        }
    )
    print(f"Glue Job started: {response['JobRunId']}")
    return response
Enter fullscreen mode Exit fullscreen mode

AWS Glue Python Shell ETL Logic

The Glue Python Shell job is started from Lambda with the S3 input path; the job uses DuckDB to read the file and PyIceberg to write into an Iceberg table. In this example the DuckDB query filters VendorID = 1 to demonstrate SQL-style cleansing/filters.

This example uses GlueCatalog access. You can also use the Glue Iceberg REST catalog approach (REST Catalog) depending on your requirements—see the link below for differences.

Reference about catalog access modes:
PyIceberg on AWS Lambda: Comparing GlueCatalog and REST Catalog Access Methods

import boto3
import sys
import os
from awsglue.utils import getResolvedOptions


def get_job_parameters():
    try:
        required_args = ['s3_input']
        args = getResolvedOptions(sys.argv, required_args)

        s3_file_path = args['s3_input']
        print(f"s3_input: {s3_file_path}")

        return s3_file_path

    except Exception as e:
        print(f"parameters error: {e}")
        raise


def _to_pyarrow_table(result):
    """
    Compatibility helper to extract a pyarrow.Table from a chDB query_result.
    """
    import chdb

    if hasattr(chdb, "to_arrowTable"):
        return chdb.to_arrowTable(result)

    if hasattr(result, "to_pyarrow"):
        return result.to_pyarrow()
    if hasattr(result, "to_arrow"):
        return result.to_arrow()

    raise RuntimeError(
        "Cannot convert chdb query_result to pyarrow.Table. "
        f"Available attributes: {sorted(dir(result))[:200]}"
    )


def normalize_arrow_for_iceberg(table):
    """
    Normalize Arrow schema for Iceberg compatibility.
    - timestamptz -> timestamp
    - binary -> string
    """
    import pyarrow as pa

    new_fields = []
    new_columns = []

    for field, column in zip(table.schema, table.columns):
        # timestamp with timezone -> timestamp
        if pa.types.is_timestamp(field.type) and field.type.tz is not None:
            new_type = pa.timestamp(field.type.unit)
            new_fields.append(pa.field(field.name, new_type, field.nullable))
            new_columns.append(column.cast(new_type))
        else:
            new_fields.append(field)
            new_columns.append(column)

    new_schema = pa.schema(new_fields)
    return pa.Table.from_arrays(new_columns, schema=new_schema)


def read_parquet_with_chdb(s3_input):
    """
    Read Parquet file from S3 using chDB.
    """
    import chdb

    if s3_input.startswith("s3://"):
        bucket, key = s3_input.replace("s3://", "").split("/", 1)
        s3_url = f"https://{bucket}.s3.ap-northeast-1.amazonaws.com/{key}"
    else:
        s3_url = s3_input

    print(f"Reading data from S3: {s3_url}")

    query = f"""
        SELECT *
        FROM s3('{s3_url}', 'Parquet')
        WHERE VendorID = 1
    """

    result = chdb.query(query, "Arrow")
    arrow_table = _to_pyarrow_table(result)

    print("Original schema:")
    print(arrow_table.schema)

    # Normalize schema for Iceberg compatibility
    arrow_table = normalize_arrow_for_iceberg(arrow_table)

    print("Normalized schema:")
    print(arrow_table.schema)
    print(f"Rows: {arrow_table.num_rows:,}")

    return arrow_table


def write_iceberg_table(arrow_table):
    """
    Write Arrow table to Iceberg table using PyIceberg.
    """
    try:
        print("Writing started...")

        from pyiceberg.catalog import load_catalog

        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/",  # Adjust to your environment.
            "region": "ap-northeast-1",
        }

        catalog = load_catalog("glue_catalog", **catalog_config)
        table_identifier = "icebergdb.yellow_tripdata"

        table = catalog.load_table(table_identifier)

        print(f"Target data to write: {arrow_table.num_rows:,} rows")
        table.append(arrow_table)

        return True

    except Exception as e:
        print(f"Writing error: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    try:
        import chdb
        import pyiceberg

        # Read input parameter
        s3_input = get_job_parameters()

        # Read data with chDB
        arrow_tbl = read_parquet_with_chdb(s3_input)
        print(f"Data read success: {arrow_tbl.num_rows:,} rows")

        # Write to Iceberg table
        if write_iceberg_table(arrow_tbl):
            print("\nWriting fully successful!")
        else:
            print("Writing failed")

    except Exception as e:
        print(f"Main error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Enter fullscreen mode Exit fullscreen mode

Note: This article focuses on the differences in operation, so version updates or conflict handling in Iceberg tables are omitted.


Execution Result

We successfully registered data into an Iceberg table (OTF) using this approach.


Comparison with DuckDB

Let’s compare this implementation with the DuckDB-based approach introduced earlier.

Lightweight ETL with AWS Glue Python Shell, DuckDB, and PyIceberg

DuckDB Sample Code

import boto3
import sys
import os
from awsglue.utils import getResolvedOptions


def get_job_parameters():
    # get job parameters
    try:
        required_args = ['s3_input']

        args = getResolvedOptions(sys.argv, required_args)

        s3_file_path = args['s3_input']
        print(f"object: {s3_file_path}")

        return s3_file_path

    except Exception as e:
        print(f"parameters error: {e}")
        raise

def setup_duckdb_environment():
    # Properly set up the DuckDB environment
    try:
        home_dir = '/tmp'
        duckdb_dir = '/tmp/.duckdb'
        extensions_dir = '/tmp/.duckdb/extensions'

        os.environ['HOME'] = home_dir
        os.environ['DUCKDB_HOME'] = duckdb_dir
        os.environ['DUCKDB_CONFIG_PATH'] = duckdb_dir
        os.environ['DUCKDB_EXTENSION_DIRECTORY'] = extensions_dir

        os.makedirs(duckdb_dir, exist_ok=True)
        os.makedirs(extensions_dir, exist_ok=True)
        os.chmod(duckdb_dir, 0o755)
        os.chmod(extensions_dir, 0o755)

        print(f"DuckDB environment setup completed: {duckdb_dir}")
        return True

    except Exception as e:
        print(f"DuckDB environment setup error: {e}")
        return False

def read_parquet_with_duckdb(s3_input):
    # Read Parquet file from S3 using DuckDB
    import duckdb

    con = duckdb.connect(':memory:')

    try:
        con.execute("SET extension_directory='/tmp/.duckdb/extensions';")
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")

        session = boto3.Session()
        credentials = session.get_credentials()

        con.execute(f"SET s3_region='ap-northeast-1';")
        con.execute(f"SET s3_access_key_id='{credentials.access_key}';")
        con.execute(f"SET s3_secret_access_key='{credentials.secret_key}';")

        if credentials.token:
            con.execute(f"SET s3_session_token='{credentials.token}';")

        print(f"Reading data from S3: {s3_input}")
        sql = f"SELECT * FROM read_parquet('{s3_input}') WHERE VendorID = 1"
        res = con.execute(sql)

        return res.arrow()

    except Exception as e:
        print(f"DuckDB error: {e}")
        raise
    finally:
        con.close()

def write_iceberg_table(arrow_table):
    # Write to an Iceberg table
    try:
        print("Writing started...")

        from pyiceberg.catalog import load_catalog
        from pyiceberg.schema import Schema
        from pyiceberg.types import NestedField, StringType, IntegerType, DoubleType, TimestampType

        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/", # Adjust to your environment.
            "region": "ap-northeast-1"
        }

        catalog = load_catalog("glue_catalog", **catalog_config)

        table_identifier = "icebergdb.yellow_tripdata"

        table = catalog.load_table(table_identifier)
        print(f"Target data to write: {arrow_table.num_rows:,} rows")

        # Write Arrow table to Iceberg table
        table.append(arrow_table)


        return True

    except Exception as e:
        print(f"Writing error: {e}")
        import traceback
        traceback.print_exc()
        return False

def main():  
    # Set up DuckDB environment
    if not setup_duckdb_environment():
        print("Failed to set up DuckDB environment")
        return

    try:
        import pyiceberg

        # Read data
        s3_input = get_job_parameters()

        arrow_tbl = read_parquet_with_duckdb(s3_input)
        print(f"Data read success: {arrow_tbl.shape}")

        # Write to Iceberg table
        if write_iceberg_table(arrow_tbl):
            print("\nWriting fully successful!")

        else:
            print("Writing failed")

    except Exception as e:
        print(f"Main error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

Note: This article focuses on the differences in operation, so version updates or conflict handling in Iceberg tables are omitted.


Benchmark

We used NYC taxi data for testing, comparing the same file with different memory allocations.

Source: NYC Taxi Trip Record Data

Test files:

  • January 2024 Yellow Taxi Trip Records (2,964,624 records, 48 MB)
  • Full 2024 dataset (41,169,720 records, 807 MB)

We test with DPU = 1/16 and 1.


Benchmark Results

January File (48MB)

DPU chDB Time (s) chDB DPU hours DuckDB Time (s) DuckDB DPU hours
1/16 46 <0.01 40 <0.01
1 39 0.02 34 0.01

Full Year File (807MB)

DPU chDB Time (s) chDB DPU hours DuckDB Time (s) DuckDB DPU hours
1/16 OutOfMemory OutOfMemory
1 51 0.02 212 0.06

Glue Python Shell ran out of memory due to insufficient buffer for Parquet → Arrow → Iceberg conversion.


Observations

  • For small datasets (~48MB), both chDB and DuckDB are fully practical. The performance difference is negligible.
  • With 1 DPU and sufficient memory, chDB outperforms DuckDB in both execution time and cost efficiency. This makes chDB particularly suitable for processing large files in a single run.
  • In lightweight ETL using Glue Python Shell, DPU sizing based on file size is the most critical design factor.

Conclusion

In this article, we implemented an ETL pipeline that writes data into Apache Iceberg using AWS Glue Python Shell and chDB.

When using 1 DPU and processing large files, chDB demonstrated superior performance and cost efficiency compared to DuckDB.
That said, behavior may vary with more complex queries, so benchmarking under real workloads is always recommended.

By leveraging Glue Python Shell’s long execution time, we can efficiently process workloads that would otherwise time out in Lambda.

Combining chDB or DuckDB with PyIceberg enables highly efficient, serverless ETL pipelines with Iceberg integration.
Since Iceberg has become a de facto standard for Open Table Formats (OTF), this approach significantly expands the practical use cases of lakehouse architectures.

I hope this article helps anyone considering lightweight ETL or near–real-time processing on Iceberg tables.

Top comments (0)