❄️ Part 2: ETL to Snowflake with AWS Glue — Enabling Price Index Modeling

In Part 1, we built a Lambda-based crawler that stores smartphone pricing data in Parquet format on S3. Now, we’ll take the next step: load and transform this data into Snowflake, so your Data Science team can use it to create a pricing index.


📚 Table of Contents

  1. Pipeline Goal
  2. Glue Architecture
  3. ETL Job Example (PySpark)
  4. Partitioning Strategy
  5. Snowflake Table Structure
  6. Next: Send Data to the Data Science Team

📌 Pipeline Goal

We want to:

  • Load raw Parquet data from S3 (output from our Lambda crawler)
  • Add useful metadata (like ingestion date)
  • Normalize fields where needed
  • Store this transformed data in Snowflake for modeling

🔧 Glue Architecture

  • Input: S3 folder with Parquet files (e.g. s3://your-bucket/price-index/smartphones/)
  • Transformation: PySpark via Glue Job
  • Output: Snowflake table PRICING.ECOM_PRICE_INDEX

Glue jobs can be triggered manually, on a schedule, or by S3 events.
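
For example, an on-demand run can be started from a small script or from a Lambda that fires when new Parquet files land in S3. A minimal sketch with boto3 (the job name price-index-etl and the --input_path argument are placeholders for your own setup):

import boto3

glue = boto3.client("glue")

# Start the ETL job on demand, e.g. from a Lambda triggered by new S3 objects.
response = glue.start_job_run(
    JobName="price-index-etl",
    Arguments={"--input_path": "s3://your-bucket/price-index/smartphones/"}
)
print("Started Glue job run:", response["JobRunId"])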


💻 ETL Job Example (PySpark)

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_date
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Load data
df = spark.read.parquet("s3://your-bucket/price-index/smartphones/")

# Add partitioning info
df = df.withColumn("ingestion_date", current_date())

# Rename columns and keep only the fields we need
df = df.select(
    col("name").alias("product_name"),
    col("price").alias("product_price"),
    col("timestamp").alias("scraped_at"),
    col("ingestion_date")
)

# Convert to DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "df")

# Write to Snowflake.
# Option names below follow the Snowflake Spark connector (sfDatabase, sfSchema,
# sfWarehouse); adjust them if your Glue version or connection expects different keys.
# Avoid hardcoding credentials -- prefer a Glue connection or AWS Secrets Manager (see below).
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="snowflake",
    connection_options={
        "sfURL": "your_account.snowflakecomputing.com",
        "sfUser": "your_user",
        "sfPassword": "your_password",
        "sfDatabase": "ANALYTICS",
        "sfSchema": "PRICING",
        "sfWarehouse": "COMPUTE_WH",
        "sfRole": "SYSADMIN",
        "dbtable": "ECOM_PRICE_INDEX"
    }
)
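In production you typically would not hardcode the Snowflake password in the job script. A minimal sketch of pulling the credentials from AWS Secrets Manager instead (the secret name snowflake/etl and its JSON keys are placeholders, not part of the original setup):

import json
import boto3

def get_snowflake_credentials(secret_name="snowflake/etl", region="us-east-1"):
    # Fetch the Snowflake user/password stored as a JSON secret in Secrets Manager.
    client = boto3.client("secretsmanager", region_name=region)
    secret = client.get_secret_value(SecretId=secret_name)
    return json.loads(secret["SecretString"])

creds = get_snowflake_credentials()
# Pass creds["sfUser"] and creds["sfPassword"] into connection_options above.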

🧱 Partitioning Strategy

To optimize downstream analysis, partition your S3 storage (or, for large Snowflake tables, define a clustering key) by:

  • ingestion_date
  • optionally brand or retailer if available

This makes it easier to retrieve the latest data for modeling or BI.
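
If you also keep a transformed copy of the data on S3, writing it partitioned makes that pruning explicit. A minimal PySpark sketch (the output path is a placeholder, and it assumes the df from the job above):

# Optional: persist a partitioned copy of the transformed data on S3 so readers
# can prune by ingestion_date (and by brand/retailer if those columns exist).
(
    df.write
      .mode("append")
      .partitionBy("ingestion_date")
      .parquet("s3://your-bucket/price-index/transformed/")
)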


🧊 Snowflake Table Structure

CREATE TABLE PRICING.ECOM_PRICE_INDEX (
    product_name STRING,
    product_price FLOAT,
    scraped_at TIMESTAMP,
    ingestion_date DATE
);

🎯 Next: Send Data to the Data Science Team

Once the data is in Snowflake:

  • Your DS team can query the latest snapshot or do delta comparisons
  • You can expose a view for them (use CREATE SECURE VIEW if the definition must be hidden from consumers):
CREATE OR REPLACE VIEW PRICING.VW_LATEST_PRICES AS
SELECT *
FROM PRICING.ECOM_PRICE_INDEX
QUALIFY ROW_NUMBER() OVER (PARTITION BY product_name ORDER BY scraped_at DESC) = 1;

This gives them the latest known price per product, ready for time-series modeling, anomaly detection, and building the pricing index.
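
For example, the DS team can pull that view straight into pandas with the snowflake-connector-python package (with the pandas extra installed; the account and credentials below are placeholders):

import snowflake.connector

# Placeholder connection parameters -- use your own account, user and role.
conn = snowflake.connector.connect(
    account="your_account",
    user="ds_user",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    schema="PRICING",
)

with conn.cursor() as cur:
    cur.execute("SELECT * FROM VW_LATEST_PRICES")
    latest_prices = cur.fetch_pandas_all()  # one row per product: latest scraped price

print(latest_prices.head())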
