The Apache Spark™ 3.2 release shipped with the pandas API on Spark.
Data scientists and analysts who are already familiar with the pandas API can now keep the same syntax while leveraging the distributed computing power that Spark offers.
Why Pandas on Spark?
• Optimized single-machine performance: according to a Databricks benchmark, pandas on Spark outperforms pandas even on a single machine, thanks to the optimizations in the Spark engine
• Single code base: it is easier for data science and data engineering teams to collaborate with a unified code base for the data analysis/data transformation steps
Source: https://databricks.com/fr/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
In this blog post, we will take a look at the pandas API on Spark and compare it to the standard PySpark DataFrame syntax.
Creating a pyspark.pandas DataFrame
from pyspark.pandas import DataFrame

# Create a pandas-on-Spark DataFrame
pdf = DataFrame({
    'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
    'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
    'baz': [1, 2, 3, 4, 5, 6],
    'zoo': ['x', 'y', 'z', 'q', 'w', 't'],
    'too': [[11, 22], [22, 78], [8, 6], [78, 2], [12], [2]]})
print(pdf)
foo bar baz zoo too
0 one A 1 x [11, 22]
1 one B 2 y [22, 78]
2 one C 3 z [8, 6]
3 two A 4 q [78, 2]
4 two B 5 w [12]
5 two C 6 t [2]
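The module is more commonly imported under an alias, which also gives access to helpers such as ps.from_pandas() for promoting an existing single-machine pandas DataFrame to the distributed API. A minimal sketch, reusing a couple of the columns above for illustration:
import pandas as pd
import pyspark.pandas as ps

# A plain, single-machine pandas DataFrame ...
local_df = pd.DataFrame({'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                         'baz': [1, 2, 3, 4, 5, 6]})

# ... promoted to a distributed pandas-on-Spark DataFrame
distributed_df = ps.from_pandas(local_df)
print(type(distributed_df))  # <class 'pyspark.pandas.frame.DataFrame'>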
It is possible to convert a pyspark.pandas.DataFrame to the standard pyspark.sql.dataframe.DataFrame:
sdf = pdf.to_spark()
sdf.show()
+---+---+---+---+--------+
|foo|bar|baz|zoo| too|
+---+---+---+---+--------+
|one| A| 1| x|[11, 22]|
|one| B| 2| y|[22, 78]|
|one| C| 3| z| [8, 6]|
|two| A| 4| q| [78, 2]|
|two| B| 5| w| [12]|
|two| C| 6| t| [2]|
+---+---+---+---+--------+
print(type(pdf))
print(type(sdf))
<class 'pyspark.pandas.frame.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
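The conversion works the other way as well: in Spark 3.2 a pyspark.sql DataFrame exposes to_pandas_on_spark() to go back to the pandas API (later releases add pandas_api() as the preferred name). A minimal sketch:
# Convert the Spark DataFrame back to a pandas-on-Spark DataFrame
pdf_back = sdf.to_pandas_on_spark()
print(type(pdf_back))  # <class 'pyspark.pandas.frame.DataFrame'>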
Data Manipulation
Column creation
Creating a column from a simple expression is pretty straightforward in both APIs:
# Standard PySpark Syntax
sdf.withColumn("new",sdf.baz + 1).show()
+---+---+---+---+--------+---+
|foo|bar|baz|zoo| too|new|
+---+---+---+---+--------+---+
|one| A| 1| x|[11, 22]| 2|
|one| B| 2| y|[22, 78]| 3|
|one| C| 3| z| [8, 6]| 4|
|two| A| 4| q| [78, 2]| 5|
|two| B| 5| w| [12]| 6|
|two| C| 6| t| [2]| 7|
+---+---+---+---+--------+---+
# pandas API on Spark
print(pdf.assign(new = pdf.baz + 1))
foo bar baz zoo too new
0 one A 1 x [11, 22] 2
1 one B 2 y [22, 78] 3
2 one C 3 z [8, 6] 4
3 two A 4 q [78, 2] 5
4 two B 5 w [12] 6
5 two C 6 t [2] 7
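Note that assign() returns a new DataFrame rather than modifying pdf; the familiar pandas in-place assignment also works on pandas-on-Spark. A minimal sketch on a copy, so that pdf stays unchanged for the examples that follow:
# In-place, pandas-style column assignment on a copy of the DataFrame
pdf_new = pdf.copy()
pdf_new["new"] = pdf_new.baz + 1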
Transformation operations
Let's take the example of exploding.
Exploding a column is a simple operation where each element of a list-like column becomes its own row, and the index values are replicated accordingly.
# Standard PySpark syntax
from pyspark.sql.functions import explode
sdf.withColumn("too", explode("too")).show()
+---+---+---+---+---+
|foo|bar|baz|zoo|too|
+---+---+---+---+---+
|one| A| 1| x| 11|
|one| A| 1| x| 22|
|one| B| 2| y| 22|
|one| B| 2| y| 78|
|one| C| 3| z| 8|
|one| C| 3| z| 6|
|two| A| 4| q| 78|
|two| A| 4| q| 2|
|two| B| 5| w| 12|
|two| C| 6| t| 2|
+---+---+---+---+---+
# pandas API on Spark
print(pdf.explode("too"))
foo bar baz zoo too
0 one A 1 x 11
0 one A 1 x 22
1 one B 2 y 22
1 one B 2 y 78
2 one C 3 z 8
2 one C 3 z 6
3 two A 4 q 78
3 two A 4 q 2
4 two B 5 w 12
5 two C 6 t 2
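Note that explode() on the pandas side keeps the original index, which is why the index values appear duplicated above. If a fresh 0..n-1 index is preferred, it can be reset after exploding; a minimal sketch:
# Re-number the duplicated index produced by explode
exploded = pdf.explode("too").reset_index(drop=True)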
This is simple in both APIs, but things get more involved in the next example.
Custom transformation operations
PySpark UDF (a.k.a. User Defined Function):
When we want to apply a specific transformation based on conditions, we might need to create a Spark UDF.
A User Defined Function is a plain Python function wrapped inside a PySpark SQL udf() object: first you register the function as a UDF, then you call it on the target DataFrame.
UDFs are among the most expensive operations in PySpark, because each row has to be serialized and handed to a Python worker, but they are sometimes the only way to express a custom transformation.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Create the Python function
def encode(baz, too, bar):
    baz = int(baz)
    return ("Yes" if (baz % 2 == 0)
            and (len(too) == 2)
            and (bar in ["A", "B"]) else "No")

# Register the function as a UDF
encode_udf = udf(encode, StringType())

# Call it on the target DataFrame
sdf.withColumn("encode", encode_udf("baz", "too", "bar")).show()
+---+---+---+---+--------+------+
|foo|bar|baz|zoo| too|encode|
+---+---+---+---+--------+------+
|one| A| 1| x|[11, 22]| No|
|one| B| 2| y|[22, 78]| Yes|
|one| C| 3| z| [8, 6]| No|
|two| A| 4| q| [78, 2]| Yes|
|two| B| 5| w| [12]| No|
|two| C| 6| t| [2]| No|
+---+---+---+---+--------+------+
The same transformation requires less code with the pandas syntax:
pdf["encode"] = (pdf.apply(lambda x: "Yes" if (x["baz"] % 2 == 0 ) and ( len(x["too"]) == 2 )
and (x["bar"]) in ["A","B"]
else "No",axis=1)).to_list()
print(pdf)
foo bar baz zoo too encode
0 one A 1 x [11, 22] No
1 one B 2 y [22, 78] Yes
2 one C 3 z [8, 6] No
3 two A 4 q [78, 2] Yes
4 two B 5 w [12] No
5 two C 6 t [2] No
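If the data stays in the standard PySpark API, a vectorized pandas UDF (pandas_udf) is usually a cheaper alternative to a row-by-row udf(), because it exchanges whole pandas Series with the JVM through Arrow instead of serializing one row at a time. A minimal sketch of the same encode logic, assuming pyarrow is available (the name encode_vec is just illustrative):
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def encode_vec(baz: pd.Series, too: pd.Series, bar: pd.Series) -> pd.Series:
    # Each argument arrives as a pandas Series covering a batch of rows
    return pd.Series(["Yes" if (b % 2 == 0) and (len(t) == 2) and (c in ["A", "B"]) else "No"
                      for b, t, c in zip(baz, too, bar)])

sdf.withColumn("encode", encode_vec("baz", "too", "bar")).show()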
Window function
Spark Window functions are used to calculate results such as the rank, row number, etc. over a range of input rows:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("foo").orderBy("baz")
sdf.withColumn("rank_", row_number().over(windowSpec)).show()
+---+---+---+---+--------+-----+
|foo|bar|baz|zoo| too|rank_|
+---+---+---+---+--------+-----+
|one| A| 1| x|[11, 22]| 1|
|one| B| 2| y|[22, 78]| 2|
|one| C| 3| z| [8, 6]| 3|
|two| A| 4| q| [78, 2]| 1|
|two| B| 5| w| [12]| 2|
|two| C| 6| t| [2]| 3|
+---+---+---+---+--------+-----+
The same transformation requires a simple groupby in the pandas syntax:
rank_df = pdf.copy()
rank_df["rank_"] = pdf.groupby(by=['foo'])['baz'].rank().to_list()
print(rank_df)
foo bar baz zoo too rank_
0 one A 1 x [11, 22] 1.0
1 one B 2 y [22, 78] 2.0
2 one C 3 z [8, 6] 3.0
3 two A 4 q [78, 2] 1.0
4 two B 5 w [12] 2.0
5 two C 6 t [2] 3.0
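One small difference: groupby().rank() returns floats and, by default, averages ties, whereas row_number() always yields distinct integers. Since baz has no duplicate values here, a cast is enough to mirror the Spark output; a minimal sketch:
# Cast the float ranks to integers to mirror row_number()
rank_df["rank_"] = rank_df["rank_"].astype(int)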
Data visualization
The default plotting backend in pandas is matplotlib, but pandas on Spark ships with plotly as its default, since it can produce interactive plots rather than just static images.
pdf["plot_col"] = [2, 1, 2, 2, 0, 0]
pdf.plot_col.hist()
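If you prefer static matplotlib figures, the backend can be switched through the pandas-on-Spark options; a minimal sketch, assuming matplotlib is installed:
import pyspark.pandas as ps

# Fall back to static matplotlib charts instead of interactive plotly ones
ps.set_option("plotting.backend", "matplotlib")
pdf.plot_col.hist()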