TL;DR
Set spark.sql.adaptive.coalescePartitions.enabled to false when performing compute-intensive operations inside User Defined Aggregate Functions (UDAFs) on small inputs.
In detail
Apache Spark has a nice feature called Adaptive Query Execution (AQE), which performs optimizations based on runtime statistics and has been enabled by default since 3.2.0. One such optimization is Coalescing Post Shuffle Partitions, which dynamically tunes the number of shuffle partitions. It improves query performance most of the time, but can have the opposite effect on long-running, compute-intensive tasks. Take a look at the example below.
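For context, the behaviour discussed below hinges on a handful of settings. Here is a minimal sketch, assuming a plain PySpark session, of how to inspect them; the values in the comments are the documented Spark 3.2+ defaults:
# Minimal sketch: inspect the AQE settings relevant to this article.
print(spark.conf.get("spark.sql.adaptive.enabled"))                     # "true" since 3.2.0
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))  # "true"
print(spark.conf.get("spark.sql.shuffle.partitions"))                   # "200" unless overridden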
Test data:
from pyspark.sql.types import StringType, DateType
import pyspark.sql.functions as f
from datetime import date, timedelta

# 10 cities x 365 days, with a random number of orders per city per day
cities = ['Amsterdam', 'Oslo', 'Warsaw', 'Copenhagen', 'Prague', 'Helsinki', 'Paris', 'Berlin', 'Dublin', 'Reykjavik']
cities_df = spark.createDataFrame(cities, StringType()).toDF('city')
dates = [date(2021, 1, 1) + timedelta(days=x) for x in range(0, 365)]
dates_df = spark.createDataFrame(dates, DateType()).toDF('day')
df = cities_df.crossJoin(dates_df).withColumn('orders', f.floor(100 * f.rand(seed=1234)))
df.show()
+---------+----------+------+
| city| day|orders|
+---------+----------+------+
|Amsterdam|2021-01-01| 71|
|Amsterdam|2021-01-02| 83|
|Amsterdam|2021-01-03| 20|
|Amsterdam|2021-01-04| 23|
|Amsterdam|2021-01-05| 89|
|Amsterdam|2021-01-06| 42|
|Amsterdam|2021-01-07| 50|
|Amsterdam|2021-01-08| 47|
|Amsterdam|2021-01-09| 57|
|Amsterdam|2021-01-10| 65|
|Amsterdam|2021-01-11| 59|
|Amsterdam|2021-01-12| 45|
|Amsterdam|2021-01-13| 52|
|Amsterdam|2021-01-14| 62|
|Amsterdam|2021-01-15| 14|
|Amsterdam|2021-01-16| 66|
|Amsterdam|2021-01-17| 18|
|Amsterdam|2021-01-18| 51|
|Amsterdam|2021-01-19| 59|
|Amsterdam|2021-01-20| 53|
+---------+----------+------+
only showing top 20 rows
df.count() # shows 3650
Now let's fake model training on this time series data by doing some deliberately wasteful, compute-intensive work inside a UDAF.
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import random

schema = StructType([
    StructField("city", StringType()),
    StructField("day", DateType()),
    StructField("orders", LongType()),
    StructField("ml_orders", LongType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_intensive(df):
    # Burn CPU: take a 10,000-step random walk starting from each observed value
    fake_fitted_orders = []
    for y in df['orders'].to_list():
        yhat = y
        for i in range(0, 10000):
            yhat += random.randint(-1, 1)
        fake_fitted_orders.append(yhat)
    df = df.assign(ml_orders = pd.Series(fake_fitted_orders))
    return df
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
result_df = df.groupBy("city").apply(compute_intensive)
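As an aside, the GROUPED_MAP flavour of pandas_udf is deprecated in Spark 3.x; the same grouped-map computation can be expressed with applyInPandas. A minimal sketch, assuming the UDF is rewritten as a plain Python function:
# Hypothetical equivalent using the newer grouped-map API (Spark >= 3.0).
# applyInPandas takes a plain Python function plus the output schema,
# so the @pandas_udf decorator is dropped here.
def compute_intensive_plain(pdf):
    fake_fitted_orders = []
    for y in pdf['orders'].to_list():
        yhat = y
        for i in range(0, 10000):
            yhat += random.randint(-1, 1)
        fake_fitted_orders.append(yhat)
    return pdf.assign(ml_orders=pd.Series(fake_fitted_orders))

# Produces the same DataFrame as result_df above.
result_df_new_api = df.groupBy("city").applyInPandas(compute_intensive_plain, schema=schema)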
result_df will have spark.sql.shuffle.partitions partitions:
assert(result_df.rdd.getNumPartitions() == int(spark.conf.get("spark.sql.shuffle.partitions")))
In my case there are 250 partitions by default, but only 10 of them are non-empty, since we have so little data:
num_non_empty_partitions_df = result_df \
.withColumn('partition_id', f.spark_partition_id()) \
.groupby('partition_id') \
.count()
num_non_empty_partitions_df.show()
+------------+-----+
|partition_id|count|
+------------+-----+
| 145| 365|
| 25| 365|
| 108| 365|
| 92| 365|
| 8| 365|
| 40| 365|
| 84| 365|
| 235| 365|
| 152| 365|
| 34| 365|
+------------+-----+
The stage that corresponds to the groupBy operation has 250 tasks, but only 10 of them did actual work.
The busy tasks all landed on executor 25, and 8 of the 10 tasks started at the same time; that number matches the executor's CPU core count.
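One quick, rough way to cross-check the available parallelism from the same session:
# Total number of cores available to the application; an upper bound on
# how many of the 10 busy tasks can run concurrently.
print(spark.sparkContext.defaultParallelism)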
Now let's enable AQE coalescePartitions and see what happens.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Do the same computation again
An AQEShuffleRead operator will be added to the execution plan, which dynamically coalesces shuffle partitions.
All operations are now performed inside just one task.
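To see the operator yourself, inspect the physical plan; since AQE only settles the final plan at runtime, trigger an action first (or look at the SQL tab of the Spark UI):
# Run the query so AQE can finalize the plan, then print it; with
# coalescing enabled it should contain an AQEShuffleRead node.
result_df.count()
result_df.explain(mode="formatted")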
Let's measure the overall performance degradation.
import time

def with_execution_time(name, fn):
    st = time.time()
    fn()
    et = time.time()
    elapsed_time = et - st
    print("Execution time for '{}' {} seconds".format(name, str(elapsed_time)))
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
with_execution_time('coalescePartitions disabled', lambda: result_df.count())
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
with_execution_time('coalescePartitions enabled', lambda: result_df.count())
Execution time for 'coalescePartitions disabled' 11.219334125518799 seconds
Execution time for 'coalescePartitions enabled' 30.78111720085144 seconds
Now the query runs almost 3 times slower on our synthetic data; in the real world we observed a 10x degradation in one distributed training pipeline soon after upgrading to Spark 3.2.0.
If disabling Coalescing Post Shuffle Partitions is undesirable, Spark lets you fine-tune its behavior with the spark.sql.adaptive.coalescePartitions.parallelismFirst, spark.sql.adaptive.coalescePartitions.minPartitionSize and spark.sql.adaptive.advisoryPartitionSizeInBytes options.
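For example, here is a minimal sketch of keeping coalescing enabled while nudging it toward more parallelism; the values are illustrative, not recommendations:
# Keep AQE coalescing on, but favour parallelism over large partitions.
# With parallelismFirst=true, Spark ignores the advisory partition size and
# picks a smaller target based on cluster parallelism, bounded below by
# minPartitionSize.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")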
Hopefully, this article gave you not only a useful Spark tip, but also a framework for debugging similar issues in your apps.