Nithyalakshmi Kamalakkannan

Part 6: Silver Layer – Cleansing, Enrichment, and Dimensions

The Silver layer converts raw events into analytics-ready records by:

  • Cleaning bad data
  • Enforcing schema
  • Adding business context
  • Applying dimensional modeling

This is where value is created.

Data Cleansing and Type Enforcement

  • Bronze must remain untouched
  • Silver enforces correctness
  • Errors are isolated from ingestion

from pyspark.sql.functions import col, to_timestamp

silver_stream = (
    spark.readStream
        .format("delta")
        .table("nyc_taxi.bronze.taxi_trips")
        # Parse the pickup timestamp string into a proper timestamp type
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime"),
        )
        # Enforce a numeric type on the fare column
        .withColumn(
            "fare_amount",
            col("fare_amount").cast("double"),
        )
        # Drop refunds and bad records with non-positive fares
        .filter(col("fare_amount") > 0)
)

Using Broadcast joins

To capture the required dimensional modelling, we need a join. With distributed computing, a regular join shuffles data across executors, which is costly. In our case, we join with zip_dim, a relatively small dimension table, so as a performance improvement we use a broadcast join: Spark ships a copy of the small table to every executor and avoids the shuffle entirely. The difference can be seen in the screenshots attached below.

Without Broadcast join

With Broadcast join

Adding watermarks

Since we are processing near-real-time data, we need to tell Spark when a result is ready to be finalized, joined, and written to the sink for the next steps, either as a whole result or as a changeset. We therefore add a watermark, asking Spark to wait for and accommodate data arriving up to 30 minutes late.

The final code for the silver layer is below.

from pyspark.sql.functions import broadcast, col, regexp_replace, to_timestamp

bronze_stream = spark.readStream.table("nyc_taxi.bronze.taxi_trips")
zip_dim = spark.read.table("nyc_taxi.raw.zip_dim")

silver_df = (
    bronze_stream
    # Normalize pickup_zip: strip the trailing ".0" float artifact, then cast to int
    .withColumn(
        "pickup_zip",
        regexp_replace("pickup_zip", r"\.0$", "").cast("int"),
    )
    .withColumn(
        "tpep_pickup_datetime",
        to_timestamp("tpep_pickup_datetime"),
    )
    .withColumn(
        "tpep_dropoff_datetime",
        to_timestamp("tpep_dropoff_datetime"),
    )
    # Accommodate events arriving up to 30 minutes late
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    # Broadcast the small dimension table to avoid a shuffle;
    # col("pickup_zip") refers to the normalized column above
    .join(
        broadcast(zip_dim),
        col("pickup_zip") == zip_dim.zip_code,
        "left",
    )
    .select(
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "trip_distance",
        "fare_amount",
        "pickup_zip",
        "region",
        "state",
    )
)

Now we stream it to the Silver Delta table with output mode append, so that only finalized (closed) results are added to the Silver Delta Lake sink.

(
    silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/silver")
        .trigger(availableNow=True)
        .toTable("nyc_taxi.silver.taxi_trips_enriched")
)

The required cleansing and normalization are done, and the data is now ready to be refined further to surface business insights.

Happy learning!
