DEV Community

Artem Plotnikov
Artem Plotnikov

Posted on

Spark tip: Disable Coalescing Post Shuffle Partitions for compute intensive tasks

TL;DR

Set spark.sql.adaptive.coalescePartitions.enabled to false when performing compute intensive operations inside User Defined Aggregate Functions (UDAFs) on small inputs.

In detail

Apache Spark has a nice feature called Adaptive Query Execution (AQE), which performs optimizations based on runtime statistics and is enabled by default since 3.2.0. One such optimization is Coalescing Post Shuffle Partitions for dynamic shuffle partition number tuning. The latter improves query performance most of the time, but may have an opposite effect on long running compute intensive tasks. Take a look at the example below.

Test data:



from pyspark.sql.types import StringType, DateType
import pyspark.sql.functions as f

from datetime import date, timedelta

cities = ['Amsterdam', 'Oslo', 'Warsaw', 'Copenhagen', 'Prague', 'Helsinki', 'Paris', 'Berlin', 'Dublin', 'Reykjavik']
cities_df = spark.createDataFrame(cities, StringType()).toDF('city')

dates = [date(2021, 1, 1) + timedelta(days=x) for x in range(0, 365)]
dates_df = spark.createDataFrame(dates, DateType()).toDF('day')

df = cities_df.crossJoin(dates_df).withColumn('orders', f.floor(100 * f.rand(seed=1234)))
df.show()


Enter fullscreen mode Exit fullscreen mode


+---------+----------+------+
|     city|       day|orders|
+---------+----------+------+
|Amsterdam|2021-01-01|    71|
|Amsterdam|2021-01-02|    83|
|Amsterdam|2021-01-03|    20|
|Amsterdam|2021-01-04|    23|
|Amsterdam|2021-01-05|    89|
|Amsterdam|2021-01-06|    42|
|Amsterdam|2021-01-07|    50|
|Amsterdam|2021-01-08|    47|
|Amsterdam|2021-01-09|    57|
|Amsterdam|2021-01-10|    65|
|Amsterdam|2021-01-11|    59|
|Amsterdam|2021-01-12|    45|
|Amsterdam|2021-01-13|    52|
|Amsterdam|2021-01-14|    62|
|Amsterdam|2021-01-15|    14|
|Amsterdam|2021-01-16|    66|
|Amsterdam|2021-01-17|    18|
|Amsterdam|2021-01-18|    51|
|Amsterdam|2021-01-19|    59|
|Amsterdam|2021-01-20|    53|
+---------+----------+------+
only showing top 20 rows


Enter fullscreen mode Exit fullscreen mode


df.count() # shows 3650


Enter fullscreen mode Exit fullscreen mode

Now we fake model training for time series data by doing some stupid compute intensive operations inside UDAF.



from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
from pyspark.sql.functions import pandas_udf, PandasUDFType

import pandas as pd
import random

schema = StructType([
    StructField("city", StringType()),
    StructField("day", DateType()),
    StructField("orders", LongType()),
    StructField("ml_orders", LongType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_intensive(df):
    fake_fitted_orders = []
    for y in df['orders'].to_list():
        yhat = y
        for i in range(0, 10000):
            yhat += random.randint(-1, 1)

        fake_fitted_orders.append(yhat)

    df = df.assign(ml_orders = pd.Series(fake_fitted_orders))
    return df

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

result_df = df.groupBy("city").apply(compute_intensive)


Enter fullscreen mode Exit fullscreen mode

result_df will have spark.sql.shuffle.partitions partitions:



assert(result_df.rdd.getNumPartitions() == int(spark.conf.get("spark.sql.shuffle.partitions")))


Enter fullscreen mode Exit fullscreen mode

In my case there are 250 partitions by default, but actually there are 10 non empty ones since we have so little data in place:



num_non_empty_partitions_df = result_df \
    .withColumn('partition_id', f.spark_partition_id()) \
    .groupby('partition_id') \
    .count()

num_non_empty_partitions_df.show()


Enter fullscreen mode Exit fullscreen mode


+------------+-----+
|partition_id|count|
+------------+-----+
|         145|  365|
|          25|  365|
|         108|  365|
|          92|  365|
|           8|  365|
|          40|  365|
|          84|  365|
|         235|  365|
|         152|  365|
|          34|  365|
+------------+-----+


Enter fullscreen mode Exit fullscreen mode

The stage, which relates to groupby operation has 250 tasks, but only 10 tasks done actual work.
Tasks performing groupby operation (SQE disabled)

The busy tasks have been placed on the same executor 25, and 8 out of 10 tasks started at the same time. This number corresponds to the amount of CPU cores.

Now let's enable AQE coalescePartitions and see what happens.



spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Do the same computation again


Enter fullscreen mode Exit fullscreen mode

AQEShuffleRead operator will be added to the execution plan, which dynamically coalesces shuffle partitions.

AQEShuffleRead operator

All operations are now performed inside just one task.

Tasks performing groupby operation (SQE enabled)

Let's measure the overall performance degradation.



import time

def with_execution_time(name, fn):
    st = time.time()
    fn()
    et = time.time()
    elapsed_time = et - st
    print("Execution time for '{}' {} seconds".format(name, str(elapsed_time)))

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
with_execution_time('coalescePartitions disabled', lambda: result_df.count())

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
with_execution_time('coalescePartitions enabled', lambda: result_df.count())


Enter fullscreen mode Exit fullscreen mode


Execution time for 'coalescePartitions disabled' 11.219334125518799 seconds
Execution time for 'coalescePartitions enabled' 30.78111720085144 seconds


Enter fullscreen mode Exit fullscreen mode

Now the query runs almost 3 times slower on our synthetic data, but in real world we observed a 10x degradation for one distributed training pipeline soon after Spark 3.2.0 upgrade.

If disabling Coalescing Post Shuffle Partitions is undesirable, Spark allows to fine tune its behavior using spark.sql.adaptive.coalescePartitions.parallelismFirst, park.sql.adaptive.coalescePartitions.minPartitionSize and spark.sql.adaptive.advisoryPartitionSizeInBytes options.

Hopefully, this article gave you not only a useful Spark tip, but also a framework for debugging similar issues in your apps.

Top comments (3)

Collapse
 
vandu15 profile image
Vandana Kumar

Thank you Artem Plotnikov.

When I run the second portion, fake model training for time series data
It gives an Error, like this

result_df = df.groupBy("city").apply(compute_intensive)
Enter fullscreen mode Exit fullscreen mode

NameError: name 'df' is not defined

Code is exactly as you have given above. ? Am I missing something.

Collapse
 
vandu15 profile image
Vandana Kumar

when i was trying to run first query getting error and running for ever when its used df.show() .

After comment df.show() query completes but I cant see the data frame. Can you let me know what is missing?
here the query im running made few changes so it run and completes.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')

spark = SparkSession(sc)

spark = SparkSession.builder \
.appName("Vandana PS ") \
.enableHiveSupport() \
.getOrCreate()
from pyspark.sql.types import StringType, DateType
import pyspark.sql.functions as f
from datetime import date, timedelta
cities = ['Amsterdam']
cities_df = spark.createDataFrame(cities, StringType()).toDF('city')
dates = [date(2023, 11, 4) + timedelta(days=x) for x in range(0, 2)]
dates_df = spark.createDataFrame(dates, DateType()).toDF('day')
df = cities_df.crossJoin(dates_df).withColumn('orders', f.floor(100 * f.rand(seed=1234)))

display(df)

df.show(2)

print(df.to_string())

Collapse
 
vandu15 profile image
Vandana Kumar

You can ignore my below question as i resolved it from my end. Thank You!