In Part 1, we built a Lambda-based crawler that stores smartphone pricing data in Parquet format on S3. Now, we’ll take the next step: load and transform this data into Snowflake, so your Data Science team can use it to create a pricing index.
📚 Table of Contents
- Pipeline Goal
- Glue Architecture
- ETL Job Example (PySpark)
- Partitioning Strategy
- Snowflake Table Structure
- Next: Send Data to the Data Science Team
📌 Pipeline Goal
We want to:
- Load raw Parquet data from S3 (output from our Lambda crawler)
- Add useful metadata (like ingestion date)
- Normalize fields where needed
- Store this transformed data in Snowflake for modeling
🔧 Glue Architecture
- Input: S3 folder with Parquet files (e.g. s3://your-bucket/price-index/smartphones/)
- Transformation: PySpark via a Glue job
- Output: Snowflake table PRICING.ECOM_PRICE_INDEX
Glue jobs can be triggered manually, on a schedule, or by S3 events.
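For scheduled runs, one option is to create a Glue trigger with boto3. Below is a minimal sketch assuming the job is named price-index-to-snowflake (a hypothetical name) and should run daily at 06:00 UTC:

import boto3

glue = boto3.client("glue")

# Run the Glue job every day at 06:00 UTC (job and trigger names are placeholders).
glue.create_trigger(
    Name="price-index-daily",
    Type="SCHEDULED",
    Schedule="cron(0 6 * * ? *)",
    Actions=[{"JobName": "price-index-to-snowflake"}],
    StartOnCreation=True,
)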
💻 ETL Job Example (PySpark)
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_date
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# Load data
df = spark.read.parquet("s3://your-bucket/price-index/smartphones/")
# Add partitioning info
df = df.withColumn("ingestion_date", current_date())
# Clean column names (optional)
df = df.select(
    col("name").alias("product_name"),
    col("price").alias("product_price"),
    col("timestamp").alias("scraped_at"),
    col("ingestion_date")
)
# Convert to DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "df")
# Write to Snowflake
# (credentials are hardcoded here for brevity; in production, prefer a Glue
#  connection or AWS Secrets Manager, as sketched below)
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="snowflake",
    connection_options={
        "dbtable": "ECOM_PRICE_INDEX",
        "sfDatabase": "ANALYTICS",
        "sfWarehouse": "COMPUTE_WH",
        "sfSchema": "PRICING",
        "sfURL": "your_account.snowflakecomputing.com",
        "sfUser": "your_user",
        "sfPassword": "your_password",
        "sfRole": "SYSADMIN"
    }
)
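In production you'd normally keep credentials out of the script, for example in AWS Secrets Manager. Here is a minimal sketch assuming a secret named snowflake/etl that stores the user and password as JSON (the secret name and keys are assumptions); the resulting dictionary can replace the hardcoded connection_options above:

import json
import boto3

# Fetch Snowflake credentials at runtime instead of hardcoding them.
secrets = boto3.client("secretsmanager")
secret = json.loads(
    secrets.get_secret_value(SecretId="snowflake/etl")["SecretString"]
)

connection_options = {
    "dbtable": "ECOM_PRICE_INDEX",
    "sfDatabase": "ANALYTICS",
    "sfWarehouse": "COMPUTE_WH",
    "sfSchema": "PRICING",
    "sfURL": "your_account.snowflakecomputing.com",
    "sfUser": secret["username"],      # assumed JSON key
    "sfPassword": secret["password"],  # assumed JSON key
    "sfRole": "SYSADMIN",
}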
🧱 Partitioning Strategy
To optimize downstream analysis, partition your table or storage by:
- ingestion_date
- optionally brand or retailer, if available
This makes it easier to retrieve the latest data for modeling or BI.
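If you also keep a transformed copy of the data on S3, PySpark can write it partitioned by ingestion_date. A minimal sketch, reusing the df from the job above (the output path is an assumption):

# Write a partitioned Parquet copy alongside the Snowflake load.
(
    df.write
      .mode("append")
      .partitionBy("ingestion_date")
      .parquet("s3://your-bucket/price-index/smartphones-transformed/")
)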
🧊 Snowflake Table Structure
CREATE TABLE PRICING.ECOM_PRICE_INDEX (
    product_name STRING,
    product_price FLOAT,
    scraped_at TIMESTAMP,
    ingestion_date DATE
);
🎯 Next: Send Data to the Data Science Team
Once the data is in Snowflake:
- Your DS team can query the latest snapshot or do delta comparisons
- You can expose a secure view for them:
CREATE OR REPLACE SECURE VIEW PRICING.VW_LATEST_PRICES AS
SELECT *
FROM PRICING.ECOM_PRICE_INDEX
QUALIFY ROW_NUMBER() OVER (PARTITION BY product_name ORDER BY scraped_at DESC) = 1;
This gives them the latest known price per product, ready for time-series modeling, anomaly detection, and building the pricing index.
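As an illustration, here is a minimal sketch of how the DS team could pull the latest prices with the Snowflake Python connector (snowflake-connector-python); the account and credentials are placeholders:

import snowflake.connector

# Placeholder credentials; in practice prefer key-pair auth or SSO.
conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    schema="PRICING",
)

cur = conn.cursor()
cur.execute("SELECT product_name, product_price, scraped_at FROM VW_LATEST_PRICES")
latest_prices = cur.fetchall()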