DEV Community

Aki for AWS Community Builders

Lightweight ETL with AWS Lambda, chDB, and PyIceberg (Compared with DuckDB)

Original Japanese article: AWS Lambda×chDB×PyIcebergで実現する軽量ETLとDuckDB比較

Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

In my previous articles, I’ve explored lightweight ETL implementations.

Lightweight ETL using AWS Lambda is widely used in practical scenarios.
When using Lambda, the choice of query engine significantly affects performance and stability, especially given the limited memory and execution time.

So far, I’ve used DuckDB for processing, but in this article, I’ll focus on chDB, the embedded version of ClickHouse, to implement a lightweight ETL.
We’ll also compare it with DuckDB under the same conditions to explore the characteristics of each engine.

Architecture

The architecture is very simple (the same setup as my DuckDB article).
A Lambda function is triggered by S3 file uploads and writes the data to S3 in Iceberg format.
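As a rough illustration of the trigger side, the sketch below pulls the bucket and key out of each record of an S3 event notification. The helper name `iter_s3_objects` and the sample values are hypothetical. The one detail worth noting is that object keys arrive URL-encoded in the event (a space becomes `+`), so they are decoded with `urllib.parse.unquote_plus` before use.

```python
from urllib.parse import unquote_plus


def iter_s3_objects(event):
    """Yield (bucket, key) for every record in an S3 event notification.

    Hypothetical helper, not part of the article's sample code.
    """
    for record in event.get("Records", []):
        s3_info = record["s3"]
        bucket = s3_info["bucket"]["name"]
        # Keys are URL-encoded in the event payload, so decode them.
        key = unquote_plus(s3_info["object"]["key"])
        yield bucket, key


# Minimal event containing only the fields used above (values are made up).
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "raw/yellow+tripdata_2024-01.parquet"},
            }
        }
    ]
}

objects = list(iter_s3_objects(sample_event))
print(objects)
```

Iterating over all records, rather than reading only `Records[0]`, is a design choice for safety in case a notification ever carries more than one record.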

The key libraries used in Lambda:

  • chDB: A database engine that enables fast in-memory SQL query execution.
  • PyArrow: Supports high-speed data conversion and transfer using the Arrow format.
  • PyIceberg: Allows access and manipulation of Iceberg tables via the AWS Glue Data Catalog.

What is chDB?

chDB is an embedded build of the ClickHouse engine that you use as a library.
Its main features:

  • Contains the ClickHouse execution engine
  • High-performance native C++ implementation
  • Executes SQL directly from Python
  • Runs inside the calling process, with no separate server to manage

Like DuckDB, it’s positioned as an “embedded analytical engine.”


Packaging for Lambda

Since the dependencies are large, I used a container image.

Dockerfile

FROM public.ecr.aws/lambda/python:3.12

WORKDIR /var/task

COPY requirements.txt .
RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

COPY lambda_function.py .

CMD ["lambda_function.lambda_handler"]

requirements.txt

chdb
pyiceberg[glue]
pyarrow

Sample Code

import chdb
import pyarrow as pa
from pyiceberg.catalog.glue import GlueCatalog


def _to_pyarrow_table(result):
    """
    Compatibility helper to extract a pyarrow.Table from a chDB query_result.
    """
    if hasattr(chdb, "to_arrowTable"):
        return chdb.to_arrowTable(result)

    if hasattr(result, "to_pyarrow"):
        return result.to_pyarrow()
    if hasattr(result, "to_arrow"):
        return result.to_arrow()

    raise RuntimeError(
        "Cannot convert chdb query_result to pyarrow.Table. "
        f"Available attributes: {sorted(dir(result))[:200]}"
    )


def normalize_arrow_for_iceberg(table: pa.Table) -> pa.Table:
    """
    Normalize Arrow types that Iceberg does not accept
    (mainly timezone-aware timestamps).
    """
    new_fields = []
    new_columns = []

    for field, column in zip(table.schema, table.columns):
        if pa.types.is_timestamp(field.type) and field.type.tz is not None:
            # Remove timezone information (values remain in UTC)
            new_type = pa.timestamp(field.type.unit)
            new_fields.append(pa.field(field.name, new_type, field.nullable))
            new_columns.append(column.cast(new_type))
        else:
            new_fields.append(field)
            new_columns.append(column)

    new_schema = pa.schema(new_fields)
    return pa.Table.from_arrays(new_columns, schema=new_schema)


def lambda_handler(event, context):
    try:
        # Extract S3 bucket and object key from the event
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']

        # Build S3 HTTPS URL
        s3_url = (
            f"https://{s3_bucket}."
            f"s3.ap-northeast-1.amazonaws.com/"
            f"{s3_object_key}"
        )

        print(f"s3_url: {s3_url}")

        # Query Parquet data on S3 using chDB
        query = f"""
            SELECT *
            FROM s3('{s3_url}', 'Parquet')
            WHERE VendorID = 1
        """

        # Execute chDB query with Arrow output
        result = chdb.query(query, "Arrow")

        # Convert chDB result to pyarrow.Table
        arrow_table = _to_pyarrow_table(result)
        print(f"Original schema: {arrow_table.schema}")

        # Normalize schema for Iceberg compatibility
        arrow_table = normalize_arrow_for_iceberg(arrow_table)
        print(f"Normalized schema: {arrow_table.schema}")
        print(f"Rows: {arrow_table.num_rows}")

        # Initialize Iceberg Glue Catalog
        catalog = GlueCatalog(
            name="my_catalog",
            database="icebergdb",
            region_name="ap-northeast-1",
        )

        # Load Iceberg table
        iceberg_table = catalog.load_table("icebergdb.yellow_tripdata")

        # Append data to Iceberg table
        iceberg_table.append(arrow_table)

        print("Data appended to Iceberg table.")

    except Exception as e:
        print("Exception:", e)
        raise

Note: This article focuses on the differences in operation, so version updates or conflict handling in Iceberg tables are omitted.
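The chDB sample above interpolates the object key into the HTTPS URL as-is and hard-codes the region. A minimal sketch of a more defensive URL builder follows, assuming the key has already been decoded from the event. The helper name `build_s3_https_url` and the bucket and key values are hypothetical. Percent-encoding the key also turns a stray single quote into `%27`, which keeps it from closing the SQL string literal early.

```python
from urllib.parse import quote


def build_s3_https_url(bucket, key, region="ap-northeast-1"):
    """Return a virtual-hosted-style HTTPS URL for an S3 object.

    Hypothetical helper; `key` is expected to be the decoded object key.
    """
    # Percent-encode the key but keep "/" so the prefix structure survives.
    encoded_key = quote(key, safe="/")
    return f"https://{bucket}.s3.{region}.amazonaws.com/{encoded_key}"


url = build_s3_https_url("example-bucket", "raw/yellow tripdata 2024-01.parquet")
print(url)
```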

Execution Result

We successfully appended data to an Iceberg table!
The sample applies only a simple filter (VendorID = 1) and no validation; additional processing can be added inside the Lambda function.

Comparison with DuckDB

DuckDB is another popular engine for lightweight ETL.
For details, see my DuckDB article.

DuckDB Dockerfile

FROM public.ecr.aws/lambda/python:3.12

WORKDIR /var/task

COPY requirements.txt .
RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

COPY lambda_function.py .

CMD ["lambda_function.lambda_handler"]

DuckDB requirements.txt

duckdb
pyiceberg[glue]
pyarrow

DuckDB Sample Code

import duckdb
import pyarrow as pa
from pyiceberg.catalog.glue import GlueCatalog  

def lambda_handler(event, context):
    try:
        # Connect to DuckDB and set the home directory
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'") 

        # Install and load the httpfs extension
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # Load data from S3 using DuckDB
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']

        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"

        print(f"s3_input_path: {s3_input_path}")

        query = f"""
            SELECT * FROM read_parquet('{s3_input_path}') WHERE VendorID = 1
        """
        # Execute SQL and retrieve results as a PyArrow Table
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()

        print(f"Number of rows retrieved: {result_arrow_table.num_rows}")
        print(f"Data schema: {result_arrow_table.schema}")

        # Configure Glue Catalog (to access Iceberg table)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # Adjust to your environment.

        # Load the table
        namespace = "icebergdb"  # Adjust to your environment.
        table_name = "yellow_tripdata"  # Adjust to your environment.
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Append data to the Iceberg table in bulk
        iceberg_table.append(result_arrow_table) 

        print("Data has been appended to S3 in Iceberg format.")

    except Exception as e:
        print(f"An error occurred: {e}")
        # Re-raise so the Lambda invocation is reported as failed
        raise

Note: Version updates and conflict handling are omitted here as well.
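For readers who do need conflict handling, one common pattern is to retry the append with exponential backoff. The sketch below is a generic illustration of that pattern, not the article's implementation: `append_with_retry` is a hypothetical helper, and the demo uses a stand-in function instead of a real Iceberg table. In real use, `append_fn` would refresh the table and call `append()`, and `retry_on` would be narrowed to the commit-conflict exception of your PyIceberg version (for example `CommitFailedException` from `pyiceberg.exceptions`, if your version provides it).

```python
import time


def append_with_retry(append_fn, retry_on=(Exception,), max_attempts=3,
                      base_delay=0.5, sleep=time.sleep):
    """Call append_fn(), retrying with exponential backoff on failure.

    Hypothetical helper; the defaults are illustrative, not tuned values.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return append_fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # Give up so Lambda reports the failure.
            # Wait 0.5 s, 1 s, 2 s, ... between attempts.
            sleep(base_delay * 2 ** (attempt - 1))


# Demo with a stand-in that fails twice before succeeding.
calls = {"count": 0}


def flaky_append():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("simulated commit conflict")
    return "committed"


delays = []  # Record the requested delays instead of actually sleeping.
result = append_with_retry(flaky_append, retry_on=(RuntimeError,),
                           sleep=delays.append)
print(result, delays)
```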

Benchmark

We used NYC taxi data for testing, comparing the same file with different memory allocations.

Source: NYC Taxi Trip Record Data

Test files:

  • January 2024 Yellow Taxi Trip Records (2,964,624 records, 48 MB)
  • Full 2024 dataset (41,169,720 records, 807 MB)

Memory settings: 1024 MB, 2048 MB, and 3008 MB (maximum without quota increase).

January File (48 MB)

Allocated memory (MB) | chDB time (ms) | chDB memory (MB) | DuckDB time (ms) | DuckDB memory (MB)
1024                  | 5,092.17       | 1018             | 5,163.48         | 512
2048                  | 3,872.62       | 1132             | 4,264.52         | 538
3008                  | 3,369.78       | 1115             | 4,061.33         | 524

Full Year File (807 MB)

Allocated memory (MB) | chDB time (ms) | chDB memory (MB) | DuckDB time (ms) | DuckDB memory (MB)
1024                  | OutOfMemory    | -                | OutOfMemory      | -
2048                  | OutOfMemory    | -                | OutOfMemory      | -
3008                  | 27,170.76      | 3001             | 187,331.51       | 2732

At 1024 MB and 2048 MB, both engines ran out of memory on the full-year file: there was not enough headroom for the Parquet → Arrow → Iceberg conversion.
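To make the gap between the engines easier to read, the short script below computes the DuckDB-to-chDB time ratio for each successful run. The timings are copied from the benchmark tables above; only the dictionary layout and variable names are my own.

```python
# Times in milliseconds as (chDB, DuckDB), copied from the benchmark tables.
timings_ms = {
    ("January", 1024): (5092.17, 5163.48),
    ("January", 2048): (3872.62, 4264.52),
    ("January", 3008): (3369.78, 4061.33),
    ("Full year", 3008): (27170.76, 187331.51),
}

ratios = {}
for (dataset, memory_mb), (chdb_ms, duckdb_ms) in timings_ms.items():
    # A ratio above 1 means chDB finished faster than DuckDB.
    ratios[(dataset, memory_mb)] = duckdb_ms / chdb_ms
    print(f"{dataset:9s} {memory_mb:>4d} MB  "
          f"DuckDB/chDB = {ratios[(dataset, memory_mb)]:.2f}x")
```

On the January file the two engines are close (ratios of roughly 1.0 to 1.2), while on the full-year file chDB is close to seven times faster.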

Observations

  • chDB: Fast, but peak memory is high (about 1 GB even for the 48 MB file), so allocate enough Lambda memory.
  • DuckDB: Memory-efficient (roughly half of chDB’s usage on the 48 MB file), but much slower on the large dataset (about 187 s versus 27 s for the full-year file).
  • Lightweight ETL: Processing one file per Lambda execution is the most practical design.
  • Key point: On Lambda, consider data size × memory allocation × engine characteristics for efficient ETL.
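One way to act on the "data size × memory allocation" point is a pre-check that skips or reroutes files that are unlikely to fit. The sketch below is a crude heuristic under stated assumptions: `fits_in_memory` is a hypothetical helper, and the default `expansion_factor` of 3.7 is derived only from the single full-year chDB run (3001 MB peak for an 807 MB file), so it is not a measured constant. The object size in bytes is available in the S3 event record under `s3.object.size`.

```python
def fits_in_memory(object_size_bytes, memory_limit_mb, expansion_factor=3.7):
    """Crude check: is the estimated peak memory within the Lambda allocation?

    expansion_factor is an assumption (peak MB per MB of Parquet), loosely
    based on one benchmark run. It ignores fixed engine overhead and should
    be re-measured for your own data and engine.
    """
    size_mb = object_size_bytes / (1024 * 1024)
    return size_mb * expansion_factor <= memory_limit_mb


full_year_bytes = 807 * 1024 * 1024  # Size of the full-year test file.
print(fits_in_memory(full_year_bytes, 2048))
print(fits_in_memory(full_year_bytes, 3008))
```

With this factor the check agrees with the benchmark for the full-year file (rejected at 2048 MB, accepted at 3008 MB), but a single data point is a thin basis, so treat it as a starting point for your own measurements.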

Conclusion

We implemented ETL for Iceberg tables using AWS Lambda and chDB.
Like DuckDB, chDB is a strong candidate for lightweight ETL.

  • Memory usage is high, but execution is fast, making it suitable when speed is critical.
  • For complex queries, performance may vary, so testing is recommended.

Both engines have different strengths, but using Lambda enables real-time ETL triggered by S3 events, allowing a lightweight and simple data processing flow.
Integrating with Iceberg also enables scalable Lakehouse architectures.

Although both chDB and DuckDB are still evolving, understanding their characteristics and using them according to the use case allows for building simple yet extensible data pipelines.

I hope this article helps those exploring lightweight ETL or real-time processing for Iceberg tables.
