Original Japanese article: Implementing ETL with AWS Lambda × DuckDB × Snowflake Connector
Introduction
I'm Aki, an AWS Community Builder (@jitepengin).
In my previous article, I introduced how to connect to Snowflake from AWS Lambda using Key Pair authentication.
This time, I would like to try the event-driven data ingestion approach that I introduced in the previous article.
In this article, I will implement an event-driven ETL pipeline that uses DuckDB on AWS Lambda to perform lightweight transformations on Parquet files stored in Amazon S3 and then load the processed data into Snowflake.
In addition, during the implementation process, I encountered an interesting limitation where write_pandas fails when writing to a Catalog-Linked Database. I will also summarize the root cause and the workaround.
Why Snowpipe Is Not Enough
Snowpipe is a very convenient feature for automatic data ingestion.
However, it has limitations when it comes to data transformation and complex filtering.
When you need preprocessing, filtering, or the integration of multiple events, you need to choose another approach.
In such cases, AWS Lambda becomes a strong option due to its high flexibility.
Architecture
- A Parquet file is uploaded to S3
- Lambda is triggered by the S3 event
- DuckDB reads the data and performs the required transformations
- snowflake.connector writes the data into Snowflake
The two key libraries used in this implementation are shown below.
DuckDB
DuckDB is an embedded database engine designed for OLAP (Online Analytical Processing).
Because DuckDB is extremely lightweight and supports in-memory processing, it can run efficiently even in a simple execution environment such as AWS Lambda.
It is said to provide particularly strong performance for batch workloads such as data analytics and ETL processing.
In addition, it enables SQL-based filtering and lightweight data transformations, allowing for intuitive implementations.
Snowflake Connector
Snowflake Connector for Python is a library that provides an interface for connecting to Snowflake and executing all standard operations.
By using this library, it becomes possible to operate Snowflake from runtime environments such as Lambda.
https://docs.snowflake.com/en/developer-guide/python-connector/python-connector
Sample Code
In the sample code below, WHERE VendorID = 1 is added as an ETL filter.
By performing filtering and data transformation inside Lambda, highly flexible preprocessing becomes possible.
import duckdb
import boto3
import json
import snowflake.connector
import pandas as pd
import time
import random
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from snowflake.connector.pandas_tools import write_pandas

SECRET_ID = "snowflake-keypair"

def get_secret():
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=SECRET_ID)
    return json.loads(response["SecretString"])

def lambda_handler(event, context):
    conn = None
    duckdb_connection = None
    try:
        # In-memory DuckDB; Lambda only allows writes under /tmp
        duckdb_connection = duckdb.connect(database=":memory:")
        duckdb_connection.execute("SET home_directory='/tmp'")
        duckdb_connection.execute("INSTALL httpfs")
        duckdb_connection.execute("LOAD httpfs")

        # Resolve the uploaded object from the S3 event
        s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
        s3_object_key = event["Records"][0]["s3"]["object"]["key"]
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        print(f"S3 input path: {s3_input_path}")

        # ETL filter: keep only rows with VendorID = 1
        query = f"""
            SELECT *
            FROM read_parquet('{s3_input_path}')
            WHERE VendorID = 1
        """
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
        print(f"DuckDB filtered rows: {result_arrow_table.num_rows}")

        # Key pair authentication with the private key stored in Secrets Manager
        secret = get_secret()
        private_key_obj = serialization.load_pem_private_key(
            secret["privateKey"].encode("utf-8"),
            password=None,
            backend=default_backend()
        )

        conn = snowflake.connector.connect(
            user=secret["user"],
            account=secret["account"],
            private_key=private_key_obj,
            role=secret.get("role"),
            warehouse=secret.get("warehouse"),
            database=secret.get("database"),
            schema=secret.get("schema")
        )

        cur = conn.cursor()
        cur.execute(f"USE DATABASE {secret['database']}")
        cur.execute(f"USE SCHEMA {secret['schema']}")
        cur.close()

        # Convert microsecond timestamps to strings Snowflake can ingest
        df = result_arrow_table.to_pandas()
        df["tpep_pickup_datetime"] = pd.to_datetime(
            df["tpep_pickup_datetime"], unit="us"
        ).dt.strftime("%Y-%m-%d %H:%M:%S")
        df["tpep_dropoff_datetime"] = pd.to_datetime(
            df["tpep_dropoff_datetime"], unit="us"
        ).dt.strftime("%Y-%m-%d %H:%M:%S")

        success, nchunks, nrows, _ = write_pandas(
            conn,
            df,
            table_name="YELLOW_TRIPDATA"
        )
        print(
            f"Snowflake write success={success}, "
            f"rows={nrows}, chunks={nchunks}"
        )

        return {
            "statusCode": 200,
            "body": (
                f"Processed {result_arrow_table.num_rows} rows "
                f"and wrote {nrows} rows to Snowflake."
            )
        }
    except Exception as e:
        import traceback
        print(f"An error occurred: {e}")
        traceback.print_exc()
        return {
            "statusCode": 500,
            "body": str(e)
        }
    finally:
        if conn:
            conn.close()
        if duckdb_connection:
            duckdb_connection.close()
Execution Result
The data was successfully written to the Snowflake table.
Switching the Destination to a Catalog-Linked Database
As introduced in a previous article, what happens if we try writing to a table configured with a Catalog-Linked Database (Iceberg)?
Let’s test it.
AWS Snowflake Lakehouse: 2 Practical Apache Iceberg Integration Patterns
A Write Error Occurs
When attempting to write to the Catalog-Linked Database, the following error occurred:
{
"statusCode": 500,
"body": "093678 (0A000): SQL Compilation Error: This operation is not supported in a catalog-linked database."
}
Why Writing to a Catalog-Linked Database Fails
This happens because of the interaction between write_pandas in the Snowflake Connector and the constraints of a Catalog-Linked Database.
Internally, write_pandas creates a temporary stage.
https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#id13
Writes a pandas DataFrame to a table in a Snowflake database.
To write the data to the table, the function saves the data to Parquet files, uses the PUT command to upload these files to a temporary stage, and uses the COPY INTO <table> command to copy the data from the files to the table.
In other words, write_pandas always stages the data with PUT and loads it with COPY INTO, and that staging operation appears to be exactly what a Catalog-Linked Database rejects, producing the compilation error above.
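Since write_pandas always goes through PUT and COPY INTO, one possible workaround (my own sketch, not prescribed by Snowflake; the table name is the one used in the sample, the helper name is hypothetical) is to bypass the stage entirely and insert the DataFrame rows with a parameterized INSERT via cursor.executemany. This trades load throughput for compatibility:

```python
import pandas as pd

def build_insert(df: pd.DataFrame, table_name: str):
    """Build a parameterized INSERT statement and row tuples for executemany()."""
    cols = ", ".join(df.columns)
    placeholders = ", ".join(["%s"] * len(df.columns))
    sql = f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})"
    rows = [tuple(r) for r in df.itertuples(index=False, name=None)]
    return sql, rows

# Usage against a live connection (conn as in the sample code):
# sql, rows = build_insert(df, "YELLOW_TRIPDATA")
# cur = conn.cursor()
# cur.executemany(sql, rows)
```

Because this issues ordinary DML instead of a stage-based bulk load, it avoids the unsupported temporary-stage operation, though for large batches the row-by-row binding will be noticeably slower than write_pandas.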