Yefet Ben Tili

Exploring Apache Spark New Pandas API

spark & pandas logo

The Apache Spark™ 3.2 release came with the announcement of the pandas API. Data scientists and analysts who are familiar with pandas can now keep the same syntax while leveraging the power of distributed computing that Spark offers.
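In many cases, migrating existing pandas code is mostly a matter of swapping the import; here is a minimal sketch (the CSV file name is hypothetical):

import pyspark.pandas as ps   # instead of: import pandas as pd

df = ps.read_csv("data.csv")  # hypothetical file; the read runs distributed on Spark
print(df.head())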

Why Pandas on Spark?

Optimized single-machine performance: according to a Databricks benchmark, pandas on Spark outperforms pandas even on a single machine, thanks to the optimizations in the Spark engine.
Single code base: it is easier for data science and data engineering teams to collaborate when the data analysis and data transformation steps share a unified code base.

Source: https://databricks.com/fr/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

In this blog post, we will take a look at the Spark pandas API and compare it to the standard PySpark DataFrame syntax.

Creating a pyspark.pandas DataFrame

from pyspark.pandas import DataFrame

pdf = DataFrame({
                   'foo': ['one', 'one', 'one', 'two', 'two','two'],
                   'bar': ['A', 'B', 'C', 'A', 'B','C'],
                   'baz': [1, 2, 3, 4, 5,6],
                   'zoo': ['x', 'y', 'z', 'q', 'w','t'],
                   "too": [[11,22],[22,78],[8,6],[78,2],[12],[2]]})

print(pdf)
   foo bar  baz zoo       too
0  one   A    1   x  [11, 22]
1  one   B    2   y  [22, 78]
2  one   C    3   z    [8, 6]
3  two   A    4   q   [78, 2]
4  two   B    5   w      [12]
5  two   C    6   t       [2]
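A pandas-on-Spark DataFrame can also be built from an existing plain pandas DataFrame, for example with pyspark.pandas.from_pandas; a minimal sketch:

import pandas as pd
import pyspark.pandas as ps

plain_df = pd.DataFrame({"a": [1, 2, 3]})   # ordinary pandas DataFrame
psdf = ps.from_pandas(plain_df)             # distributed pandas-on-Spark copy
print(type(psdf))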

It is possible to convert a pyspark.pandas.DataFrame to the standard pyspark.sql.dataframe.DataFrame

sdf = pdf.to_spark()
sdf.show()
+---+---+---+---+--------+
|foo|bar|baz|zoo|     too|
+---+---+---+---+--------+
|one|  A|  1|  x|[11, 22]|
|one|  B|  2|  y|[22, 78]|
|one|  C|  3|  z|  [8, 6]|
|two|  A|  4|  q| [78, 2]|
|two|  B|  5|  w|    [12]|
|two|  C|  6|  t|     [2]|
+---+---+---+---+--------+

print(type(pdf))
print(type(sdf))

<class 'pyspark.pandas.frame.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
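The conversion also works the other way around; in Spark 3.2 the method is to_pandas_on_spark() (renamed to pandas_api() in later releases):

# Back from a Spark DataFrame to a pandas-on-Spark DataFrame
pdf_again = sdf.to_pandas_on_spark()
print(type(pdf_again))   # <class 'pyspark.pandas.frame.DataFrame'>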

Data Manipulation

Column creation

Creating a column from a simple expression is pretty straightforward in both APIs:

# Standard PySpark Syntax
sdf.withColumn("new",sdf.baz + 1).show()
+---+---+---+---+--------+---+
|foo|bar|baz|zoo|     too|new|
+---+---+---+---+--------+---+
|one|  A|  1|  x|[11, 22]|  2|
|one|  B|  2|  y|[22, 78]|  3|
|one|  C|  3|  z|  [8, 6]|  4|
|two|  A|  4|  q| [78, 2]|  5|
|two|  B|  5|  w|    [12]|  6|
|two|  C|  6|  t|     [2]|  7|
+---+---+---+---+--------+---+
# Pandas Spark API
print(pdf.assign(new = pdf.baz + 1))
   foo bar  baz zoo       too  new
0  one   A    1   x  [11, 22]    2
1  one   B    2   y  [22, 78]    3
2  one   C    3   z    [8, 6]    4
3  two   A    4   q   [78, 2]    5
4  two   B    5   w      [12]    6
5  two   C    6   t       [2]    7
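Both APIs can also handle an actual condition without resorting to a UDF; a minimal sketch (the parity column name is just for illustration):

# Standard PySpark Syntax: when/otherwise
from pyspark.sql.functions import when
sdf.withColumn("parity", when(sdf.baz % 2 == 0, "even").otherwise("odd")).show()

# Pandas Spark API: a pandas-style boolean mask mapped to labels
print(pdf.assign(parity=(pdf.baz % 2 == 0).map({True: "even", False: "odd"})))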

Transformation operations

Let's take the example of exploding. Exploding a column is a simple operation where each element of a list-like column becomes its own row, and the index values are replicated where needed.

# Standard PySpark Syntax
from pyspark.sql.functions import explode
sdf.withColumn("too", explode("too")).show()
+---+---+---+---+---+
|foo|bar|baz|zoo|too|
+---+---+---+---+---+
|one|  A|  1|  x| 11|
|one|  A|  1|  x| 22|
|one|  B|  2|  y| 22|
|one|  B|  2|  y| 78|
|one|  C|  3|  z|  8|
|one|  C|  3|  z|  6|
|two|  A|  4|  q| 78|
|two|  A|  4|  q|  2|
|two|  B|  5|  w| 12|
|two|  C|  6|  t|  2|
+---+---+---+---+---+

print(pdf.explode("too"))
   foo bar  baz zoo  too
0  one   A    1   x   11
0  one   A    1   x   22
1  one   B    2   y   22
1  one   B    2   y   78
2  one   C    3   z    8
2  one   C    3   z    6
3  two   A    4   q   78
3  two   A    4   q    2
4  two   B    5   w   12
5  two   C    6   t    2
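If the replicated index is not wanted, it can be dropped afterwards; a small sketch using reset_index:

print(pdf.explode("too").reset_index(drop=True))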

Simple in both APIs, but things will be different in the next example.

Custom transformation operations

PySpark UDF (a.k.a. User Defined Function):

When we want to apply a specific transformation based on conditions, we might need to create a Spark UDF.

A User Defined Function is a Python function wrapped inside a PySpark SQL udf() object. First you register it, then you call it on the target DataFrame.

UDFs are among the most expensive operations in PySpark, because every row has to be serialized between the JVM and the Python interpreter, but they are sometimes the only way to express a custom transformation.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Create function
def encode(baz,too,bar):
  baz = int(baz)
  return ("Yes" if (baz % 2 == 0) 
  and ( len(too) == 2) 
  and (bar in ["A","B"]) else "No") 

# Register the function as UDF
encode_udf = udf(encode, StringType())

# Call it on the target Dataframe
sdf.withColumn("encode", encode_udf("baz","too","bar")).show()

+---+---+---+---+--------+------+
|foo|bar|baz|zoo|     too|encode|
+---+---+---+---+--------+------+
|one|  A|  1|  x|[11, 22]|    No|
|one|  B|  2|  y|[22, 78]|   Yes|
|one|  C|  3|  z|  [8, 6]|    No|
|two|  A|  4|  q| [78, 2]|   Yes|
|two|  B|  5|  w|    [12]|    No|
|two|  C|  6|  t|     [2]|    No|
+---+---+---+---+--------+------+


The same transformation requires less code in pandas syntax:

pdf["encode"] = (pdf.apply(lambda x: "Yes" if (x["baz"] % 2 == 0 ) and ( len(x["too"]) == 2 ) 
          and (x["bar"]) in ["A","B"]
           else "No",axis=1)).to_list()

print(pdf)

   foo bar  baz zoo       too encode
0  one   A    1   x  [11, 22]     No
1  one   B    2   y  [22, 78]    Yes
2  one   C    3   z    [8, 6]     No
3  two   A    4   q   [78, 2]    Yes
4  two   B    5   w      [12]     No
5  two   C    6   t       [2]     No
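A side note on performance: on the standard PySpark side, a vectorized pandas UDF (pandas_udf) is usually a faster alternative to the row-at-a-time udf() used above, since it processes whole batches of rows with pandas; a minimal sketch of the same logic:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def encode_vectorized(baz: pd.Series, too: pd.Series, bar: pd.Series) -> pd.Series:
    # Same condition as encode(), evaluated on whole pandas batches at once
    mask = (baz % 2 == 0) & (too.apply(len) == 2) & bar.isin(["A", "B"])
    return mask.map({True: "Yes", False: "No"})

sdf.withColumn("encode", encode_vectorized("baz", "too", "bar")).show()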

Window function

Spark Window functions are used to calculate results such as the rank, row number, etc. over a range of input rows:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("foo").orderBy("baz")
sdf.withColumn("rank_", row_number().over(windowSpec)).show()
+---+---+---+---+--------+-----+
|foo|bar|baz|zoo|     too|rank_|
+---+---+---+---+--------+-----+
|one|  A|  1|  x|[11, 22]|    1|
|one|  B|  2|  y|[22, 78]|    2|
|one|  C|  3|  z|  [8, 6]|    3|
|two|  A|  4|  q| [78, 2]|    1|
|two|  B|  5|  w|    [12]|    2|
|two|  C|  6|  t|     [2]|    3|
+---+---+---+---+--------+-----+

The same transformation requires a simple groupby in pandas syntax:

rank_df = pdf.copy()
rank_df["rank_"] = pdf.groupby(by=['foo'])['baz'].rank().to_list()
print(rank_df)
   foo bar  baz zoo       too  rank_
0  one   A    1   x  [11, 22]    1.0
1  one   B    2   y  [22, 78]    2.0
2  one   C    3   z    [8, 6]    3.0
3  two   A    4   q   [78, 2]    1.0
4  two   B    5   w      [12]    2.0
5  two   C    6   t       [2]    3.0
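One caveat: row_number() always assigns distinct numbers, while rank() gives tied values the same rank. If ties are possible, rank(method="first") behaves more like row_number(); a small sketch with the same grouping on foo:

rank_df["rank_"] = pdf.groupby(by=['foo'])['baz'].rank(method="first").to_list()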

Data visualization

The default plotting backend in pandas is matplotlib, but pandas on Spark ships with plotly instead, since it produces interactive plots rather than the static images of its counterpart.

pdf["plot_col"] = [2, 1, 2, 2, 0, 0]
pdf.plot_col.hist()

Interactive Plotly histogram of the plot_col column
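The backend is configurable: if static matplotlib figures are preferred, the plotting backend option can be switched back; a small sketch:

import pyspark.pandas as ps

ps.set_option("plotting.backend", "matplotlib")  # use static matplotlib plots again
pdf.plot_col.hist()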
