Welcome to Day 8 of the Spark Mastery Series.
Today we deep-dive into the topic that makes or breaks ETL pipelines:
Joins and performance optimization.
If you master today's concepts, you can take a Spark job from 2 hours down to 10 minutes.
Let's begin.
1. Why Are Joins Slow in Spark?
When Spark performs a join, it often must shuffle data across executors so rows with the same join key end up in the same partition.
Shuffle includes:
- Network transfer
- Disk writes
- Disk reads
- Sorting
- Stage creation
In badly optimized pipelines, shuffles can account for up to 80% of a Spark job's execution time.
2. Broadcast Joins: The Fastest Join Strategy
If one dataset is small (say, under 50 MB), a broadcast join is the fastest option. Spark broadcasts automatically when a side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); the broadcast() hint forces it for tables you know are small.
from pyspark.sql.functions import broadcast

df_large.join(broadcast(df_small), "id")
Why is it fast?
- Spark copies the small table to all executors
- Each executor performs join locally
- No shuffle required
This turns a shuffle join into a map-side join, which is extremely fast.
3. Repartitioning Before a Join
If two DataFrames have different partitioning strategies, Spark shuffles both.
Solution:
df1 = df1.repartition("id")
df2 = df2.repartition("id")
joined = df1.join(df2, "id")
Why this helps:
- Ensures partition-level alignment
- Reduces shuffle volume
4. Handling Skew: The Most Important Real-World Skill
Data skew happens when a handful of join keys contain most of the data.
Example:
"India" β 5 million records
"USA" β 200,000
"UK" β 10,000
This causes:
- long-running tasks
- straggler tasks
- memory overflow
- executor timeout
Solution 1: Salting Keys
Add a random salt to the skewed (large) side, and replicate the other side across every salt value so each salted row still finds its match:
from pyspark.sql.functions import rand, floor, explode, array, lit

df1 = df1.withColumn("salt", floor(rand() * 10))
df2 = df2.withColumn("salt", explode(array(*[lit(i) for i in range(10)])))
Now join on ["id", "salt"].
Solution 2: Broadcast the smaller table
Solution 3: Skew hint / adaptive execution
df1.hint("skew").join(df2, "id")
(Note: the "skew" hint is specific to Databricks Runtime; open-source Spark 3.x handles skew automatically through Adaptive Query Execution.)
5. Join Strategy Selection (What Spark Uses Internally)
SortMergeJoin
- The default strategy
- Good for large datasets
- Requires a shuffle, so it is expensive
Hash joins (broadcast hash join / shuffled hash join)
- Faster
- Require memory for the in-memory hash table
- Used automatically when possible
6. Real-World Example: Retail Sales ETL
You have:
sales table → 200M records
product table → 50k records
The correct join:
df = sales.join(broadcast(products), "product_id")
This change alone can reduce runtime by 10 to 20x.
Summary
Today you learned how to:
- Identify shuffle-heavy joins
- Remove unnecessary shuffles
- Use broadcast joins
- Fix skew
- Repartition strategically
- Apply join hints
Follow for more content, and let me know in the comments if I missed anything. Thank you!