Gold tables answer business questions directly.
Examples:
- Trips per hour by region
- Revenue per ZIP
- Average distance by time window
Gold tables are:
- Aggregated
- Optimized
- Dashboard-ready
Introducing Event Time & Watermarking
Again, for the gold layer to handle late data we add a watermark, this time together with windowing, so that aggregations over events grouped by time can be closed properly.
Here, we tell Spark to group trips into 1-hour windows and to wait up to 30 minutes for late events. The watermark is the latest event time received (max tpep_pickup_datetime) minus 30 minutes. Once the watermark moves past a window's end time, Spark closes that window and writes its aggregated results as final.
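To make the watermark arithmetic concrete, here is a small pure-Python simulation. It is a simplified model, not Spark's implementation: each event is treated as its own micro-batch, timestamps are naive, windows are aligned to the Unix epoch, and the function names and sample times are hypothetical. It shows a late event still being counted while its window is open, and a later one being dropped once the window has been finalized.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)     # window("tpep_pickup_datetime", "1 hour")
DELAY = timedelta(minutes=30)   # withWatermark("tpep_pickup_datetime", "30 minutes")
EPOCH = datetime(1970, 1, 1)    # simplified: naive timestamps, windows aligned to the epoch


def window_start(ts):
    """Start of the 1-hour tumbling window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW


def simulate(pickup_times):
    """Replay pickup times in arrival order (simplified model, one event per micro-batch).

    Returns (finalized, still_open, dropped):
      finalized  -- [(window_start, trip_count)] in the order windows were emitted
      still_open -- window starts whose aggregates may still change
      dropped    -- events that arrived after their window had been finalized
    """
    max_seen = None
    open_counts = {}    # window start -> running trip count
    closed = set()      # window starts already emitted in append mode
    finalized, dropped = [], []

    for ts in pickup_times:
        start = window_start(ts)
        if start in closed:
            dropped.append(ts)          # too late: the window's result is already final
        else:
            open_counts[start] = open_counts.get(start, 0) + 1

        # Watermark = latest event time seen minus the allowed delay (never moves back)
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - DELAY

        # Emit every open window whose end the watermark has reached
        for s in sorted(open_counts):
            if s + WINDOW <= watermark:
                finalized.append((s, open_counts.pop(s)))
                closed.add(s)

    return finalized, sorted(open_counts), dropped


def t(h, m):
    return datetime(2024, 1, 1, h, m)  # hypothetical pickup times


finalized, still_open, dropped = simulate(
    [t(10, 5), t(10, 40), t(11, 10), t(10, 50), t(11, 35), t(10, 55)]
)
# The 10:50 event is still counted (watermark 10:40 is before the window end 11:00).
# The 11:35 event moves the watermark to 11:05, finalizing 10:00-11:00 with 3 trips,
# so the 10:55 event that arrives afterwards is dropped.
```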
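```python
from pyspark.sql.functions import avg, count, sum, window  # pyspark's sum shadows the built-in

# Read the enriched silver table as a stream
silver_df = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_df = (
    silver_df
    # Accept events up to 30 minutes behind the latest pickup time seen
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    # 1-hour tumbling windows on pickup time, per region
    .groupBy(
        window("tpep_pickup_datetime", "1 hour"),
        "region",
    )
    .agg(
        count("*").alias("trip_count"),
        sum("fare_amount").alias("total_fare"),
        avg("trip_distance").alias("avg_distance"),
    )
)
```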
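For intuition about what each gold row holds, here is a plain-Python equivalent of the `groupBy(window(...), "region").agg(...)` step. It is a sketch with hypothetical sample rows: it ignores the watermark, uses naive epoch-aligned timestamps, and assumes no null values (Spark's `sum` and `avg` skip nulls, which this sketch does not handle).

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)     # window("tpep_pickup_datetime", "1 hour")
EPOCH = datetime(1970, 1, 1)    # simplified: naive timestamps, epoch-aligned windows


def window_bounds(ts):
    """(start, end) of the 1-hour tumbling window containing ts."""
    start = ts - (ts - EPOCH) % WINDOW
    return start, start + WINDOW


def aggregate(trips):
    """Group trips by (window, region) and compute the same metrics as the gold query."""
    groups = defaultdict(list)
    for trip in trips:
        key = (window_bounds(trip["tpep_pickup_datetime"]), trip["region"])
        groups[key].append(trip)
    return {
        key: {
            "trip_count": len(rows),                                   # count("*")
            "total_fare": sum(r["fare_amount"] for r in rows),         # sum("fare_amount")
            "avg_distance": sum(r["trip_distance"] for r in rows) / len(rows),  # avg("trip_distance")
        }
        for key, rows in groups.items()
    }


# Hypothetical silver rows
trips = [
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 10, 5), "region": "Manhattan",
     "fare_amount": 12.5, "trip_distance": 2.0},
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 10, 45), "region": "Manhattan",
     "fare_amount": 7.5, "trip_distance": 1.0},
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 10, 20), "region": "Brooklyn",
     "fare_amount": 20.0, "trip_distance": 5.0},
    {"tpep_pickup_datetime": datetime(2024, 1, 1, 11, 5), "region": "Manhattan",
     "fare_amount": 10.0, "trip_distance": 3.0},
]
gold = aggregate(trips)
for (bounds, region), metrics in sorted(gold.items()):
    print(bounds[0].strftime("%H:%M"), region, metrics)
```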
Next, we stream the aggregates into the gold Delta table.
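```python
(
    gold_df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_metrics")
    # Process all data available at start, then stop
    .trigger(availableNow=True)
    # Append: each window is written once, after the watermark finalizes it
    .outputMode("append")
    .toTable("nyc_taxi.gold.taxi_metrics")
)
```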
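One consequence of combining `outputMode("append")` with `trigger(availableNow=True)`: a window is written only after the watermark passes its end, and the watermark can never exceed the latest pickup time minus 30 minutes. So the most recent windows cannot be written by the run that first sees them; their state stays in the checkpoint until a later run reads newer data. The sketch below (hypothetical helper and sample times, naive timestamps, simplified model) lists which windows cannot have been written yet.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)     # window(..., "1 hour")
DELAY = timedelta(minutes=30)   # withWatermark(..., "30 minutes")
EPOCH = datetime(1970, 1, 1)    # simplified: naive timestamps, epoch-aligned windows


def unwritten_windows(pickup_times):
    """Window starts an append-mode run cannot have written yet.

    The watermark never exceeds max(pickup time) - DELAY, so any window ending
    after that point is still open and waits in the checkpointed state.
    """
    watermark = max(pickup_times) - DELAY
    starts = {ts - (ts - EPOCH) % WINDOW for ts in pickup_times}
    return sorted(s for s in starts if s + WINDOW > watermark)


def t(h, m):
    return datetime(2024, 1, 1, h, m)  # hypothetical pickup times


# First run: watermark 10:15, so the 10:00-11:00 window is still pending
first_run = unwritten_windows([t(8, 10), t(9, 20), t(10, 45)])
# Later run: a newer 11:40 trip moves the watermark to 11:10,
# so the 10:00 window can be written and only 11:00-12:00 is pending
second_run = unwritten_windows([t(8, 10), t(9, 20), t(10, 45), t(11, 40)])
```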
As mentioned, gold tables answer business questions directly, so several of them may be needed. We create one more table, taxi_trip_metrics, that breaks the hourly metrics down further by date, hour, and pickup ZIP.
```python
from pyspark.sql.functions import avg, count, hour, sum, to_date, window

silver_stream = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_stream = (
    silver_stream
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    # Convenience columns for dashboards: calendar date and hour of pickup
    .withColumn("trip_date", to_date("tpep_pickup_datetime"))
    .withColumn("trip_hour", hour("tpep_pickup_datetime"))
    .groupBy(
        window("tpep_pickup_datetime", "1 hour"),
        "trip_date",
        "trip_hour",
        "pickup_zip",
        "region",
    )
    .agg(
        count("*").alias("total_trips"),
        sum("fare_amount").alias("total_revenue"),
        avg("fare_amount").alias("avg_fare"),
        avg("trip_distance").alias("avg_distance"),
    )
)

(
    gold_stream.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_trip_metrics")
    .outputMode("append")
    # Start the stream, writing into the gold table
    .toTable("nyc_taxi.gold.taxi_trip_metrics")
)
```
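In taxi_trip_metrics, `trip_date` and `trip_hour` are derived from the same pickup time as the 1-hour window, so they are constant within a window and do not split it into extra groups; only `pickup_zip` makes the groups finer than in taxi_metrics. A small sketch of the grouping key, assuming naive timestamps in one time zone with a whole-hour offset; the helper name and ZIP values are hypothetical.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)     # window("tpep_pickup_datetime", "1 hour")
EPOCH = datetime(1970, 1, 1)    # simplified: naive timestamps, epoch-aligned windows


def group_key(ts, pickup_zip, region):
    """Hypothetical stand-in for the groupBy key: (window start, trip_date, trip_hour, zip, region)."""
    start = ts - (ts - EPOCH) % WINDOW
    return (start, ts.date(), ts.hour, pickup_zip, region)


# Every minute of the 10:00-11:00 window maps to the same (window, date, hour)
base = datetime(2024, 1, 1, 10, 0)
keys = {group_key(base + timedelta(minutes=m), "10001", "Manhattan") for m in range(60)}
print(len(keys))  # 1
```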
The data is now aggregated and available in gold Delta tables, ready to be used for business insights!
Happy learning!