Nithyalakshmi Kamalakkannan

Part 7: Gold Layer – Metrics, Watermarks, and Aggregations

Gold tables answer business questions directly.

Examples:

  • Trips per hour by region
  • Revenue per ZIP
  • Average distance by time window

Gold tables are:

  • Aggregated
  • Optimized
  • Dashboard-ready

Introducing Event Time & Watermarking

As before, we add a watermark so the gold layer can handle late data. This time we combine it with windowing, so that aggregations over events grouped by time can be closed properly.
Here we group events into 1-hour windows on tpep_pickup_datetime and tell Spark to tolerate events that arrive up to 30 minutes late. The watermark is the latest tpep_pickup_datetime received so far minus 30 minutes. Once the watermark passes a window's end time, Spark closes that window and emits its aggregated result as final. Events for that window that arrive after this point may be dropped.
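To make the closing rule concrete, here is a minimal plain-Python sketch of the arithmetic. It is a simplified model for illustration, not Spark's implementation: the helper names (window_bounds, is_window_final) and the sample timestamps are made up, and it assumes windows align to the top of the hour.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=30)  # same delay as the withWatermark call
WINDOW_SIZE = timedelta(hours=1)         # same size as the window() call


def window_bounds(event_time):
    """Return the (start, end) of the 1-hour window containing event_time.

    Assumption: windows start at the top of the hour.
    """
    start = event_time.replace(minute=0, second=0, microsecond=0)
    return start, start + WINDOW_SIZE


def is_window_final(window_end, max_event_time):
    """Simplified rule: the window is final once the watermark reaches its end."""
    watermark = max_event_time - WATERMARK_DELAY
    return watermark >= window_end


# A trip picked up at 10:15 falls into the 10:00-11:00 window.
start, end = window_bounds(datetime(2024, 1, 1, 10, 15))

# Latest event seen is 11:10, so the watermark is 10:40: the window stays open.
print(is_window_final(end, datetime(2024, 1, 1, 11, 10)))  # False

# Latest event seen is 11:45, so the watermark is 11:15: the window is final.
print(is_window_final(end, datetime(2024, 1, 1, 11, 45)))  # True
```

In other words, with a 30-minute delay the 10:00-11:00 window can only be finalized after Spark has seen a pickup time of 11:30 or later.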

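# Import under an alias so Spark's sum/count do not shadow Python built-ins
from pyspark.sql import functions as F

# Read the enriched silver table as a stream
silver_df = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_df = (
    silver_df
    # Tolerate events up to 30 minutes behind the latest pickup time seen
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    # One group per 1-hour window and region
    .groupBy(
        F.window("tpep_pickup_datetime", "1 hour"),
        "region"
    )
    .agg(
        F.count("*").alias("trip_count"),
        F.sum("fare_amount").alias("total_fare"),
        F.avg("trip_distance").alias("avg_distance")
    )
)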
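If it helps to see what this aggregation produces, the sketch below computes the same three metrics in plain Python for a handful of made-up trips. The sample rows, the region values, and the hourly_metrics helper are assumptions for illustration only; the real work is done by Spark on the silver stream.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows: (pickup time, region, fare_amount, trip_distance)
trips = [
    (datetime(2024, 1, 1, 10, 5), "Manhattan", 12.0, 2.0),
    (datetime(2024, 1, 1, 10, 40), "Manhattan", 8.0, 1.0),
    (datetime(2024, 1, 1, 10, 55), "Brooklyn", 20.0, 5.0),
    (datetime(2024, 1, 1, 11, 10), "Manhattan", 10.0, 3.0),
]


def hourly_metrics(rows):
    """Group trips by (hour window start, region) and compute the gold metrics."""
    groups = defaultdict(list)
    for pickup, region, fare, distance in rows:
        # Assumption: 1-hour windows start at the top of the hour
        window_start = pickup.replace(minute=0, second=0, microsecond=0)
        groups[(window_start, region)].append((fare, distance))

    metrics = {}
    for key, values in groups.items():
        fares = [fare for fare, _ in values]
        distances = [distance for _, distance in values]
        metrics[key] = {
            "trip_count": len(values),
            "total_fare": sum(fares),
            "avg_distance": sum(distances) / len(distances),
        }
    return metrics


metrics = hourly_metrics(trips)
for (window_start, region), row in sorted(metrics.items()):
    print(window_start, region, row)
```

Each key here corresponds to one row of the gold table: one window, one region, and its three aggregated values.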

Next, we stream the result into the gold Delta table. In append mode, only windows that the watermark has finalized are written.

(
    gold_df.writeStream
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_metrics")
    .outputMode("append")
    .toTable("nyc_taxi.gold.taxi_metrics")
)

As mentioned, gold tables answer business questions directly, so multiple views are often needed. Let's create one more, taxi_trip_metrics, which breaks the metrics down by date, hour, pickup ZIP, and region.

# Import under an alias so Spark's sum/count do not shadow Python built-ins
from pyspark.sql import functions as F

silver_stream = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_stream = (
    silver_stream
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    # Derive date and hour columns for easier filtering in dashboards
    .withColumn("trip_date", F.to_date("tpep_pickup_datetime"))
    .withColumn("trip_hour", F.hour("tpep_pickup_datetime"))
    .groupBy(
        F.window("tpep_pickup_datetime", "1 hour"),
        "trip_date",
        "trip_hour",
        "pickup_zip",
        "region"
    )
    .agg(
        F.count("*").alias("total_trips"),
        F.sum("fare_amount").alias("total_revenue"),
        F.avg("fare_amount").alias("avg_fare"),
        F.avg("trip_distance").alias("avg_distance")
    )
)
# Write finalized windows to the gold Delta table
(
    gold_stream.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_trip_metrics")
    .outputMode("append")
    .toTable("nyc_taxi.gold.taxi_trip_metrics")
)

The data is now aggregated and available in gold Delta tables, ready to be used for deriving business insights!

Happy learning!
