PySpark and Apache Spark Broadcast Mechanism

Adi Polak ・ 2 min read

🐦 Follow me on Twitter, happy to take your suggestions on topics.


Apache Spark is based on distributed computation and distributed data concepts. Each machine/task gets a piece of the data to process.

Often we need something like a lookup table, or a set of parameters, on which to base our calculations. These parameters are static and won't change during the computation; they are read-only.

Broadcast variables are used when such static (read-only) variables need to be shared across executors.

Why Should We Use It?

Without broadcast variables, these values would be shipped to the executors with every transformation and action that uses them, which causes network overhead. With broadcast variables, they are shipped to all executors once and cached there for future reference. See the example next.

Python code sample with PySpark:

Here, we create a broadcast variable from a list of strings, load a Parquet file into a Spark DataFrame, and filter the DataFrame based on the broadcast value. The broadcast is shipped to the executors only once (one network call per executor). If we used a plain list without the broadcast mechanism, the whole list would be serialized and sent with every task that references it, resulting in many networking requests. Since we are talking about Big Data computation, the number of tasks is typically far greater than the number of executors, and all that repeated shipping would clutter our cluster.
In the end, we release the executors' dedicated memory by calling broadcastVar.unpersist().

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("App Name") \
    .getOrCreate()  # starts a Spark session locally

sc = spark.sparkContext

words_new = sc.broadcast(["list of values to broadcast", "spark", "python"])
data = words_new.value  # accessing the value stored in the broadcast on the driver

df = spark.read.parquet('some_file_with_header.parquet')  # loading a Parquet file into a Spark DataFrame
filtered = df.filter(func.col('name').isin(words_new.value))  # filtering the DataFrame against the broadcast list with isin

words_new.unpersist()  # asks each executor to release the broadcast value from its dedicated memory

That was Apache Spark Broadcast with PySpark in under 3 minutes, part of the Apache Spark Bitesize series.

Discussion (2)

Shir Matishevski

Good one, thanks Adi! 👸
What's the benefit (if any) in using the broadcasting over standard spark caching?

Adi Polak Author

Thank you, Shir! 💖
Spark caching is built for caching Spark DataFrames or RDDs in memory.
Spark is a distributed computing framework that works on distributed data.
Each executor gets a chunk of the data to process: it loads the data into memory, processes it, and removes it from memory (unless there are optimizations or further known actions on the data).
We can ask Spark to explicitly cache that chunk of data in the executors' memory.
Caching means the data stays in dedicated memory until we call unpersist on it.

We can cache many RDDs or DataFrames, but we cannot access one from inside a computation over another on the executors. If we exceed the memory space, the executor will spill the data to disk.

With broadcast, we ship the read-only variables we need (usually small things, like a short list or a dictionary) that are used together with the DataFrames or RDDs in a computation.

For example:
A countries data map and financial transactions. The countries data and locations do not change; that is static data. Transactions are new and arrive by streaming or batching. We can broadcast the static countries map (assuming it fits into memory) and load the transactions into a DataFrame, either in batch or streaming.
In each transaction computation, we can enrich the record and make decisions based on the static data we have, the countries map.

With cache, we can cache the transaction data and re-process it, but not the static data, which is not an RDD or DataFrame. And if we run out of memory, other transactions may take longer or be blocked.
We could turn the static data into an RDD or DataFrame if we wanted to run actions on it specifically. Still, since it is relatively small data, it's better to do those calculations on the driver node and not 'pay' the overhead of distributed computing.

Does that make sense?