When working with large volumes of financial data, querying efficiently and loading the results into a data warehouse like Snowflake is crucial. This article walks through how an analyst can handle millions of records stored as Parquet files in AWS S3 and export processed data to Snowflake.
The Problem
The task is to generate daily metrics (like total transaction volume, active customers, and average balances) from 3 TB of Parquet data. The data is partitioned by transaction_date in S3, but older partitions have inconsistent column names. The results must then be loaded into Snowflake for further analysis.
The Approach
Efficiently Query the Data
Instead of scanning the entire dataset, you only read the last 30 days of data by using partition pruning. This saves both time and cost.
Handle Schema Evolution
As the schema has changed over time (e.g., different column names for balance), you use SQL functions like COALESCE to handle missing or differently named columns, ensuring consistency.
Aggregate Metrics
You aggregate data by region to calculate total transaction volume, count active customers, and find the average account balance.
Load Data into Snowflake
After processing the data, you use Snowflake’s COPY INTO method for efficient, large-scale ingestion, moving the results from a CSV file into your warehouse.
Why This Works
Partition pruning ensures that only the relevant data is queried, making it fast and cost-efficient.
Schema handling with functions like COALESCE allows for seamless integration across different data partitions.
Snowflake’s optimized loading mechanisms allow for fast and reliable data transfer.
This approach makes working with large, partitioned datasets in cloud storage manageable, while ensuring efficient data processing and loading into Snowflake.
The Solution in PySQL
Read the last 30 days of Parquet using partition pruning:
import duckdb
import datetime
end = datetime.date.today()
start = end - datetime.timedelta(days=30)
con = duckdb.connect()
df = con.execute(f"""
SELECT *
FROM read_parquet('s3://bank-lake/transactions/transaction_date>= {start} AND transaction_date <= {end}/*.parquet')
""").fetchdf()
Aggregate metrics, handling schema differences:
result = con.execute("""
SELECT
region,
SUM(transaction_amount) AS total_tx,
COUNT(DISTINCT customer_id) AS active_customers,
AVG(COALESCE(account_balance, acct_balance)) AS avg_balance
FROM df
GROUP BY region
""").fetchdf()
Load the results into Snowflake:
import snowflake.connector
result.to_csv("daily.csv", index=False)
conn = snowflake.connector.connect(
user='YOUR_USER',
password='YOUR_PASSWORD',
account='YOUR_ACCOUNT'
)
conn.cursor().execute("""
PUT file://daily.csv @%DAILY_REGION_METRICS;
COPY INTO DAILY_REGION_METRICS
FROM @%DAILY_REGION_METRICS
FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='"');
""")
Top comments (0)