DEV Community

Cover image for The 5-minute guide to using bucketing in Pyspark
luminousmen
luminousmen

Posted on • Updated on

The 5-minute guide to using bucketing in Pyspark

There are many different tools in the world, each of which solves a range of problems. Many of them are judged by how well and correct they solve this or that problem, but there are tools that you just like, you want to use them. They are properly designed and fit well in your hand, you do not need to dig into the documentation and understand how to do this or that simple action. About one of these tools for me I will be writing this series of posts.

I will describe the optimization methods and tips that help me solve certain technical problems and achieve high efficiency. This is my updated collection.

Many of the optimizations that I will describe will not affect the JVM languages ​​so much, but without these methods, many Python applications may simply not work.

Whole series:


Let's start with the problem.

We've got two tables and we do one simple inner join by one column:

t1 = spark.table("unbucketed1")
t2 = spark.table("unbucketed2")

t1.join(t2, "key").explain()
Enter fullscreen mode Exit fullscreen mode

In the physical plan, what you will get is something like the following:

== Physical Plan ==                                                             
*(5) Project [key#10L, value#11, value#15]
+- *(5) SortMergeJoin [key#10L], [key#14L], Inner
   :- *(2) Sort [key#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#10L, 200)
   :     +- *(1) Project [key#10L, value#11]
   :        +- *(1) Filter isnotnull(key#10L)
   :           +- *(1) FileScan parquet default.unbucketed1[key#10L,value#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#14L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#14L, 200)
         +- *(3) Project [key#14L, value#15]
            +- *(3) Filter isnotnull(key#14L)
               +- *(3) FileScan parquet default.unbucketed2[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

Enter fullscreen mode Exit fullscreen mode

SortMergeJoin is the default spark join, but now we are concerned with the other two things on the execution plan. They are two Exchange operations. We are always concerned with exchanges because they shuffle our data β€” we want to avoid it, well, unless there is no choice. But...

There must be a better way

We know that we joined by the key column so we will use this information just to get rid of these two exchanges.

How?

Use bucketing

Bucketing

Bucketing is an optimization technique that decomposes data into more manageable parts(buckets) to determine data partitioning. The motivation is to optimize the performance of a join query by avoiding shuffles (aka exchanges) of tables participating in the join. Bucketing results in fewer exchanges (and hence stages), because the shuffle may not be necessary -- both DataFrames can be already located in the same partitions.

Bucketing is enabled by default. Spark SQL uses spark.sql.sources.bucketing.enabled configuration property to control whether it should be enabled and used for query optimization or not.

Bucketing specifies physical data placement so we pre shuffle our data because we want to avoid this data shuffle at runtime.

Oookay, do I really need to do an additional step if the shuffle will be executed anyway?

If you're joining multiple times than yes. The more joins the better performance gains.

An example of how to create a bucketed table:

df.write\
    .bucketBy(16, "key") \
    .sortBy("value") \
    .saveAsTable("bucketed", format="parquet")
Enter fullscreen mode Exit fullscreen mode

So here, bucketBy distributes data across a fixed number of buckets(16 in our case) and can be used when a number of unique values are unbounded. If the number of unique values is limited it's better to use partitioning rather than bucketing.

t2 = spark.table("bucketed")
t3 = spark.table("bucketed")

# bucketed - bucketed join. 
# Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, "key").explain()
Enter fullscreen mode Exit fullscreen mode

And resulting physical plan:

== Physical Plan ==
*(3) Project [key#14L, value#15, value#30]
+- *(3) SortMergeJoin [key#14L], [key#29L], Inner
   :- *(1) Sort [key#14L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [key#14L, value#15]
   :     +- *(1) Filter isnotnull(key#14L)
   :        +- *(1) FileScan parquet default.bucketed[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- *(2) Sort [key#29L ASC NULLS FIRST], false, 0
      +- *(2) Project [key#29L, value#30]
         +- *(2) Filter isnotnull(key#29L)
            +- *(2) FileScan parquet default.bucketed[key#29L,value#30] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
Enter fullscreen mode Exit fullscreen mode

Here we have not only fewer code-gen stages but also no exchanges.

As of Spark 2.4, Spark supports bucket pruning to optimize filtering on the bucketed column (by reducing the number of bucket files to scan).

Summary

Overall, bucketing is a relatively new technique that in some cases might be a great improvement both in stability and performance. However, I found that using it is not trivial and has many gotchas.

Bucketing works well when the number of unique values is unbounded. Columns that are used often in queries and provide high selectivity are good choices for bucketing. Spark tables that are bucketed store metadata about how they are bucketed and sorted which help optimize joins, aggregations, and queries on bucketed columns.

Full gist


Thank you for reading!

Any questions? Leave your comment below to start fantastic discussions!

Check out my blog or come to say hi πŸ‘‹ on Twitter or subscribe to my telegram channel.
Plan your best!

Oldest comments (4)

Collapse
 
mx profile image
Maxime Moreau • Edited

Hi, thank you for sharing.
Could you elaborate on the gotchas? And why do you find the use not trivial? Is it because we have to save the df using write method ect? So sometimes we will save the df "normally" and using buckets?

Thank you ;)

Collapse
 
luminousmen profile image
luminousmen

Thank you for your support, Maxime!
I say it's not trivial because you have to fulfill at least a few conditions.

  1. You have to read the data in the same way as it was bucketed - while writing to the bucket Spark uses the hash function on the bucketed key to select which bucket to write the data into.
  2. spark.sql.shuffle.partitions must be the same as the number of buckets, otherwise, we will get a standard shuffle
  3. Choose the bucket columns wisely, everything depends on the workload. Sometimes it is better to handle the optimization process to the catalyst than to do it yourself.
  4. Choose the number of buckets wisely, this is also a tradeoff. If you had as many performers as buckets, it would lead to a fast load. However, if the data volume is too small, it may not be very good in terms of performance.
Collapse
 
ndricca profile image
ndricca • Edited

Hi, may I ask you what differs between buckets and partitions? I've never used buckets but, using Spark reading mainly Hive tables, I noticed that partitions have too a large impact on shuffling. So what are the main differences? Thanks in advance.

Collapse
 
luminousmen profile image
luminousmen

At their core, they are one and the same thing - optimizing data retrieval based on splitting up data by content. The difference is the number of aka separations that will eventually occur. As you said in order to join data, Spark needs the data that is to be joined (i.e., the data based on each key) to live on the same partition. In essence, we want to move the data (shuffle) as little as possible between the nodes of the cluster so that join can happen as parallel as possible (in each partition).

For example, if you're doing any annual financial reports, it makes sense to partition the data by month - you'll have 12 partitions in total. And if there are 12>= executors in the cluster, then doing join by month we can make join absolutely parallel if the dataset with which we join is also partied in the same way.

The same thing happens with buckets, but with data that don't have some logical way to partition (e.g., logs, user transactions, etc.). We can essentially do the same thing as with the partitions but specifying exact number of buckets (aka partitions).

I hope that makes sense. I'm thinking of writing an article on this while it's in my backlog. I hope I get to the point of writing it eventually :)