DEV Community

Sankalp
Sankalp

Posted on

Spark Optimization

Scenario -1: joining tables, when one table is big and another is small

Normal (Shuffle) join: All data move across the network and performance will be degrade.

broadcast join

To avoid shuffle (slow network) developer can use broadcast join when on table table is small enough to fit in memory. Instead of shuffle the big table, broadcast join copy the small table to all executors/ clusters and big table reside as it it.
Finally join happen locally.

  • spark can auto broadcast if table size is below.


spark.sql.autoBroadcastJoinThreshold = 10 MB(Default)

  • Forcefully broadcast


from pyspark.sql.functions import broadcast
df = big_df.join(broadcast(small_df), "id)

Scenario -2: joining tables, when both tables are big.

When both tables are big, dev cannot use broadcast join, so Spark will do a shuffle join.
Spark must shuffle Table A by join key and shuffle Table B by join key and bring same keys to same executor.
This is slow + heavy network + also can cause of data skew.

1. repartition

Partition Both Tables on Join Key

If both tables are already partitioned by the same join column, then Spark shuffles once and keeps matching keys together

Helps balanced distribution.
Best when data is reused many times: write tables partitioned by join key.


df1 = df1.repartition("customer_id")
df2 = df2.repartition("customer_id")

2. Salting

Fix Data Skew

Some keys are very frequent (like country = "India")
One partition becomes huge → one task slow → job slow.
Add random value to skewed key:


df1 = df1.withColumn("salt", rand()*10)
df2 = df2.withColumn("salt", rand()*10)

Join using:


(key, salt)

Big key split into many small pieces
Work distributed across executors

3. Filter Early (Reduce Data Before Join)

4. Use Bucketing (For Repeated Joins)

5. Increase Parallelism (Avoid Big Partitions)


spark.conf.set("spark.sql.shuffle.partitions", 400)

6. Use AQE (Adaptive Query Execution)

Spark can change plan during runtime:
It can:
change join strategy
split skewed partitions
optimize shuffle

spark.conf.set("spark.sql.adaptive.enabled", "true")

Top comments (0)