DEV Community

Aki for AWS Community Builders

Lightweight ETL on AWS Lambda Using DuckDB and Snowflake Connector

Original Japanese article: AWS Lambda × DuckDB × Snowflake ConnectorによるETLの実装

Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

In my previous article, I introduced how to connect to Snowflake from AWS Lambda using Key Pair authentication.

Securely Implementing Snowflake AWS Lambda Integration with Key Pair Authentication + Secrets Manager

This time, I would like to try the event-driven data ingestion approach that I introduced in the previous article.

In this article, I will implement an event-driven ETL pipeline that uses DuckDB on AWS Lambda to perform lightweight transformations on Parquet files stored in Amazon S3 and then load the processed data into Snowflake.

In addition, during the implementation process, I encountered an interesting limitation where write_pandas fails when writing to a Catalog-Linked Database. I will also summarize the root cause and the workaround.


Why Snowpipe Is Not Enough

Snowpipe is a very convenient feature for automatic data ingestion.

However, it has limitations when it comes to data transformation and complex filtering.

In other words, when you need preprocessing, filtering, or the integration of multiple events, you need to choose another approach.

In such cases, AWS Lambda becomes a strong option due to its high flexibility.


Architecture

  1. A Parquet file is uploaded to S3
  2. Lambda is triggered by the S3 event
  3. DuckDB reads the data and performs the required transformations
  4. snowflake.connector writes the data into Snowflake
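
Steps 1 and 2 hinge on the shape of the S3 event that Lambda receives. The following is a minimal sketch of turning that event into `s3://` paths that DuckDB can read. The helper name `s3_uris_from_event`, the bucket name, and the object key are hypothetical and only serve as an illustration. Object keys in S3 event notifications are URL-encoded, so the sketch decodes them first.

```python
from typing import Any, Dict, List
from urllib.parse import unquote_plus


def s3_uris_from_event(event: Dict[str, Any]) -> List[str]:
    """Return one s3:// URI per record in an S3 event notification."""
    uris = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded (a space is sent as "+"), so decode them.
        key = unquote_plus(record["s3"]["object"]["key"])
        uris.append(f"s3://{bucket}/{key}")
    return uris


# Hypothetical event, trimmed to the fields used above.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "raw/yellow+tripdata_2024-01.parquet"},
            }
        }
    ]
}

print(s3_uris_from_event(sample_event))
```

Looping over every record, rather than reading only the first one, keeps the function correct if a single invocation ever carries more than one record.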

The two key libraries used in this implementation are shown below.


DuckDB

DuckDB is an embedded database engine designed for OLAP (Online Analytical Processing).

Because DuckDB is extremely lightweight and supports in-memory processing, it can run efficiently even in a simple execution environment such as AWS Lambda.

Its columnar, vectorized execution engine makes it well suited to batch workloads such as data analytics and ETL processing.

In addition, it enables SQL-based filtering and lightweight data transformations, allowing for intuitive implementations.

https://duckdb.org/


Snowflake Connector

Snowflake Connector for Python is a library that provides an interface for connecting to Snowflake and executing all standard operations.

With this library, you can run Snowflake operations from runtime environments such as Lambda.

https://docs.snowflake.com/en/developer-guide/python-connector/python-connector


Sample Code

In the sample code below, WHERE VendorID = 1 is added as an ETL filter.

By performing filtering and data transformation inside Lambda, highly flexible preprocessing becomes possible.

import duckdb
import boto3
import json
import snowflake.connector
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from snowflake.connector.pandas_tools import write_pandas

SECRET_ID = "snowflake-keypair"


def get_secret():
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=SECRET_ID)
    return json.loads(response["SecretString"])

def lambda_handler(event, context):
    conn = None
    duckdb_connection = None

    try:
        duckdb_connection = duckdb.connect(database=":memory:")
        duckdb_connection.execute("SET home_directory='/tmp'")
        duckdb_connection.execute("INSTALL httpfs")
        duckdb_connection.execute("LOAD httpfs")

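        # S3 event keys are URL-encoded (e.g. a space arrives as "+"), so decode before use.
        from urllib.parse import unquote_plus

        s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
        s3_object_key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])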

        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        print(f"S3 input path: {s3_input_path}")


        query = f"""
            SELECT *
            FROM read_parquet('{s3_input_path}')
            WHERE VendorID = 1
        """

        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()

        print(f"DuckDB filtered rows: {result_arrow_table.num_rows}")

        secret = get_secret()

        private_key_obj = serialization.load_pem_private_key(
            secret["privateKey"].encode("utf-8"),
            password=None,
            backend=default_backend()
        )

        conn = snowflake.connector.connect(
            user=secret["user"],
            account=secret["account"],
            private_key=private_key_obj,
            role=secret.get("role"),
            warehouse=secret.get("warehouse"),
            database=secret.get("database"),
            schema=secret.get("schema")
        )

        cur = conn.cursor()
        cur.execute(f"USE DATABASE {secret['database']}")
        cur.execute(f"USE SCHEMA {secret['schema']}")
        cur.close()

        import pandas as pd

        df = result_arrow_table.to_pandas()

        df["tpep_pickup_datetime"] = pd.to_datetime(
            df["tpep_pickup_datetime"],
            unit="us"
        ).dt.strftime("%Y-%m-%d %H:%M:%S")

        df["tpep_dropoff_datetime"] = pd.to_datetime(
            df["tpep_dropoff_datetime"],
            unit="us"
        ).dt.strftime("%Y-%m-%d %H:%M:%S")

        success, nchunks, nrows, _ = write_pandas(
            conn,
            df,
            table_name="YELLOW_TRIPDATA"
        )

        print(
            f"Snowflake write success={success}, "
            f"rows={nrows}, chunks={nchunks}"
        )

        return {
            "statusCode": 200,
            "body": (
                f"Processed {result_arrow_table.num_rows} rows "
                f"and wrote {nrows} rows to Snowflake."
            )
        }

    except Exception as e:
        print(f"An error occurred: {e}")
        import traceback
        traceback.print_exc()

        return {
            "statusCode": 500,
            "body": str(e)
        }

    finally:
        if conn:
            conn.close()

        if duckdb_connection:
            duckdb_connection.close()

Execution Result

Running the function wrote the filtered rows to the YELLOW_TRIPDATA table successfully.


Switching the Destination to a Catalog-Linked Database

What happens if we try writing to a table in a Catalog-Linked Database (Iceberg), which I introduced in a previous article?

Let’s test it.

AWS Snowflake Lakehouse: 2 Practical Apache Iceberg Integration Patterns


A Write Error Occurs

When attempting to write to the Catalog-Linked Database, the following error occurred:

{
  "statusCode": 500,
  "body": "093678 (0A000): SQL Compilation Error: This operation is not supported in a catalog-linked database."
}

Why Writing to a Catalog-Linked Database Fails

This happens because of the interaction between write_pandas in the Snowflake Connector and the constraints of a Catalog-Linked Database.

Internally, write_pandas creates a temporary stage.

https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#id13

"Writes a pandas DataFrame to a table in a Snowflake database. To write the data to the table, the function saves the data to Parquet files, uses the PUT command to upload these files to a temporary stage, and uses the COPY INTO command to copy the data from the files to the table. You can use some of the function parameters to control how the PUT and COPY INTO statements are executed."

However, stage creation is not supported in a Catalog-Linked Database.

https://docs.snowflake.com/en/user-guide/tables-iceberg-catalog-linked-database#considerations-for-using-a-catalog-linked-database-for-iceberg-tables

"You can create schemas, externally managed Iceberg tables, or database roles in a catalog-linked database. Creating other Snowflake objects isn't currently supported."

This conflict causes write_pandas to fail with the error:

This operation is not supported in a catalog-linked database.

More specifically, the temporary stage created internally falls under the category of “other Snowflake objects,” so the error occurs at the point where CREATE TEMPORARY STAGE is executed.
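
Because the handler catches every exception in the same way, this particular failure is easy to miss in the logs. One possible way to surface it is to match on the message text quoted above. This is a minimal sketch: the helper names `is_catalog_linked_restriction` and `explain_write_error` are hypothetical, and matching on message text is an assumption made for illustration, not an API of the Snowflake Connector.

```python
# Fragment of the error message quoted in this article, lower-cased for matching.
CATALOG_LINKED_MESSAGE = "not supported in a catalog-linked database"


def is_catalog_linked_restriction(error: BaseException) -> bool:
    """Return True if the error text matches the catalog-linked restriction."""
    return CATALOG_LINKED_MESSAGE in str(error).lower()


def explain_write_error(error: BaseException) -> str:
    """Build a log message that points at the likely cause of a failed write."""
    if is_catalog_linked_restriction(error):
        return (
            "write_pandas needs a temporary stage, which cannot be created "
            f"in a catalog-linked database: {error}"
        )
    return f"Snowflake write failed: {error}"


example = Exception(
    "093678 (0A000): SQL Compilation Error: "
    "This operation is not supported in a catalog-linked database."
)
print(explain_write_error(example))
```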

That said, there is a workaround.


How to Write to a Catalog-Linked Database

A relatively simple approach is to use an INSERT statement directly.

Here is an example implementation:

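        # The cursor opened earlier was already closed, so open a new one.
        cur = conn.cursor()
        try:
            # Build the statement once; the column names come from the DataFrame.
            columns = ", ".join(df.columns)
            placeholders = ", ".join(["%s"] * len(df.columns))
            sql = f"INSERT INTO {secret['schema']}.YELLOW_TRIPDATA ({columns}) VALUES ({placeholders})"
            for i in range(0, len(df), 1000):  # insert in batches of 1,000 rows
                chunk = df.iloc[i:i+1000]
                cur.executemany(sql, chunk.values.tolist())
        finally:
            cur.close()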
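
The loop above interpolates column names into the SQL text and passes DataFrame values straight to executemany. The sketch below shows one way to harden both steps using only the standard library. The helper names `build_insert_sql` and `batched` are hypothetical. It assumes that table and column names are plain unquoted identifiers and that missing values (NaN) should be stored as NULL; neither assumption comes from the original implementation.

```python
import math
import re
from typing import Any, Iterator, List, Sequence

# Unquoted identifier: starts with a letter or underscore,
# followed by letters, digits, underscores, or dollar signs.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*$")


def build_insert_sql(table: str, columns: Sequence[str]) -> str:
    """Build a parameterized INSERT after validating every identifier."""
    for name in [*table.split("."), *columns]:
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"


def _nan_to_none(row: Sequence[Any]) -> List[Any]:
    """Replace float NaN with None so it is bound as NULL (assumed behavior)."""
    return [None if isinstance(v, float) and math.isnan(v) else v for v in row]


def batched(rows: Sequence[Sequence[Any]], size: int) -> Iterator[List[List[Any]]]:
    """Yield cleaned rows in lists of at most `size` rows."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield [_nan_to_none(row) for row in rows[start:start + size]]


sql = build_insert_sql("PUBLIC.YELLOW_TRIPDATA", ["VendorID", "fare_amount"])
batches = list(batched([[1, float("nan")], [2, 3.5], [3, 4.0]], 2))
print(sql)
print(batches)
```

In the handler, this would be used by building `sql` once and then calling `cur.executemany(sql, batch)` for each batch from `batched(df.values.tolist(), 1000)`.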

Another option is to create a stage in a different database, one that is not catalog-linked, and execute the INSERT through that route.


Conclusion

In this article, I implemented a lightweight event-driven ETL pipeline triggered by S3 events using AWS Lambda, DuckDB, and the Snowflake Connector.

By using DuckDB inside Lambda, I was able to perform SQL-based filtering and lightweight transformations directly on Parquet files stored in S3, and successfully load the processed results into Snowflake.

In addition, I confirmed an important limitation: when using write_pandas against a Catalog-Linked Database (Iceberg), the write fails because the connector internally creates a temporary stage.

Although there are some constraints, combining DuckDB and the Snowflake Connector enables the construction of a low-cost and flexible data processing pipeline for Snowflake.

The key point is to understand how Snowflake manages the Iceberg table you are writing to: whether it is a Snowflake-managed Iceberg table or one connected through a mechanism such as a Catalog-Linked Database.

In any case, the combination of Snowflake and Iceberg is an extremely powerful option for building a Lakehouse architecture.

I hope this article will be helpful for those considering lightweight data processing and real-time ETL pipelines with AWS and Snowflake when working with Iceberg tables.




