In Part 1, we built a Lambda-based crawler that stores smartphone pricing data in Parquet format on S3. Now we’ll take the next step: transform this data and load it into Snowflake, so your Data Science team can use it to build a pricing index.
📚 Table of Contents
- Pipeline Goal
- Glue Architecture
- ETL Job Example (PySpark)
- Partitioning Strategy
- Snowflake Table Structure
- Next: Send Data to the Data Science Team
📌 Pipeline Goal
We want to:
- Load raw Parquet data from S3 (output from our Lambda crawler)
- Add useful metadata (like ingestion date)
- Normalize fields where needed
- Store this transformed data in Snowflake for modeling
🔧 Glue Architecture
- Input: S3 folder with Parquet files (e.g. s3://your-bucket/price-index/smartphones/)
- Transformation: PySpark via an AWS Glue job
- Output: Snowflake table PRICING.ECOM_PRICE_INDEX
Glue jobs can be triggered manually, on a schedule, or by S3 events.
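As a sketch of the scheduled option, a daily trigger can be attached to the job with boto3; the trigger name, cron expression, region, and job name below are placeholder assumptions:
import boto3
# Create a daily schedule for the Glue job (names, region, and schedule are placeholders)
glue = boto3.client("glue", region_name="eu-west-1")
glue.create_trigger(
    Name="daily-price-index-trigger",                   # hypothetical trigger name
    Type="SCHEDULED",
    Schedule="cron(0 6 * * ? *)",                       # every day at 06:00 UTC
    Actions=[{"JobName": "price-index-to-snowflake"}],  # hypothetical Glue job name
    StartOnCreation=True,
)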
💻 ETL Job Example (PySpark)
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_date
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# Load data
df = spark.read.parquet("s3://your-bucket/price-index/smartphones/")
# Add partitioning info
df = df.withColumn("ingestion_date", current_date())
# Clean column names (optional)
df = df.select(
    col("name").alias("product_name"),
    col("price").alias("product_price"),
    col("timestamp").alias("scraped_at"),
    col("ingestion_date")
)
# Convert to DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "df")
# Write to Snowflake (credentials are shown inline for brevity;
# prefer a Glue connection or AWS Secrets Manager in production)
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="snowflake",
    connection_options={
        "dbtable": "ECOM_PRICE_INDEX",
        "database": "ANALYTICS",
        "warehouse": "COMPUTE_WH",
        "schema": "PRICING",
        "sfURL": "your_account.snowflakecomputing.com",
        "sfUser": "your_user",
        "sfPassword": "your_password",
        "sfRole": "SYSADMIN"
    }
)
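Hardcoding credentials is fine for a quick test, but a more robust pattern is to fetch them at runtime from AWS Secrets Manager. A minimal sketch, assuming a secret named snowflake/etl_user that stores a JSON document with user and password keys:
import json
import boto3
# Fetch Snowflake credentials at runtime (secret name and JSON keys are assumptions)
secret = boto3.client("secretsmanager").get_secret_value(SecretId="snowflake/etl_user")
creds = json.loads(secret["SecretString"])
connection_options = {
    "dbtable": "ECOM_PRICE_INDEX",
    "database": "ANALYTICS",
    "warehouse": "COMPUTE_WH",
    "schema": "PRICING",
    "sfURL": "your_account.snowflakecomputing.com",
    "sfUser": creds["user"],
    "sfPassword": creds["password"],
    "sfRole": "SYSADMIN"
}
Pass this dictionary as connection_options in the write_dynamic_frame call above; the Glue job's IAM role also needs secretsmanager:GetSecretValue on that secret.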
🧱 Partitioning Strategy
To optimize downstream analysis, partition your table or storage by:
- ingestion_date
- optionally brand or retailer, if available
This makes it easier to retrieve the latest data for modeling or BI.
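For example, if you also persist the transformed data back to S3 alongside the Snowflake load (the output path here is a placeholder), a partitioned Parquet write keeps each ingestion date in its own folder:
# Write the transformed DataFrame back to S3, one folder per ingestion_date
# (bucket and prefix are placeholders)
(
    df.write
      .mode("append")
      .partitionBy("ingestion_date")
      .parquet("s3://your-bucket/price-index/smartphones_transformed/")
)
On the Snowflake side, tables are not partitioned in the Hive sense; for large tables, the closest equivalent is a clustering key on ingestion_date.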
🧊 Snowflake Table Structure
CREATE TABLE PRICING.ECOM_PRICE_INDEX (
    product_name STRING,
    product_price FLOAT,
    scraped_at TIMESTAMP,
    ingestion_date DATE
);
🎯 Next: Send Data to the Data Science Team
Once the data is in Snowflake:
- Your DS team can query the latest snapshot or do delta comparisons
- You can expose a secure view for them:
CREATE OR REPLACE VIEW PRICING.VW_LATEST_PRICES AS
SELECT *
FROM PRICING.ECOM_PRICE_INDEX
QUALIFY ROW_NUMBER() OVER (PARTITION BY product_name ORDER BY scraped_at DESC) = 1;
This gives them the latest known price per product, ready for time-series modeling, anomaly detection, and building the pricing index.
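As an illustration of how they might pull the view into Python with the snowflake-connector-python package (account and credential values below are placeholders):
import snowflake.connector
# Placeholder connection details; substitute your account, user, and auth method
conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    schema="PRICING",
)
cur = conn.cursor()
cur.execute("SELECT product_name, product_price, scraped_at FROM VW_LATEST_PRICES")
rows = cur.fetchall()  # list of tuples, ready to load into a pandas DataFrame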