Getting Started with Spark (part 4) - Unit Testing

Quite a while ago (counting in years by now), I published a tutorial series focused on helping people get started with Spark. Here is an outline of the previous posts:

In the meantime Spark's popularity has not decreased, so I thought I would continue the same series. In this post we cover an essential part of any ETL project, namely unit testing.

For that I created a sample repository, which is meant to serve as boilerplate code for any new Python Spark project.

Let us browse through the main job script. Please note that the repository might contain an updated version, which may differ in its details from the following gist.

from configparser import ConfigParser

from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType


# create the general function
def _amount_spent(quantity: int, price: float) -> float:
    """
    Calculates the product of two variables
    :param quantity: (int)
    :param price: (float)
    :return: (float)
    """
    return quantity * price


def amount_spent_udf(data: DataFrame) -> DataFrame:
    # create the general UDF
    amount_spent = F.udf(_amount_spent, DoubleType())
    # Note: DoubleType in Java/Scala is equal to Python float; thus you can alternatively specify FloatType()
    # Apply our UDF to the dataframe
    return data.withColumn('amount_spent', amount_spent(F.col('quantity'), F.col('price')))


def main(conf: ConfigParser, spark: SparkSession) -> None:
    # mock data
    customers = spark.createDataFrame([
        Row(customer_name="Geoffrey", date="2016-04-22", category="A", product_name="apples", quantity=1, price=50.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="B", product_name="Lamp", quantity=2, price=38.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="D", product_name="Solar Pannel", quantity=1, price=29.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="A", product_name="apples", quantity=3, price=50.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="C", product_name="Rice", quantity=5, price=15.00),
        Row(customer_name="Geoffrey", date="2016-06-05", category="A", product_name="apples", quantity=5, price=50.00),
        Row(customer_name="Geoffrey", date="2016-06-05", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Geoffrey", date="2016-06-15", category="Y", product_name="Motor skate", quantity=7, price=68.00),
        Row(customer_name="Geoffrey", date="2016-06-15", category="E", product_name="Book: The noose", quantity=1, price=125.00),
        Row(customer_name="Yann", date="2016-04-22", category="B", product_name="Lamp", quantity=1, price=38.00),
        Row(customer_name="Yann", date="2016-05-03", category="Y", product_name="Motor skate", quantity=1, price=68.00),
        Row(customer_name="Yann", date="2016-05-03", category="D", product_name="Recycle bin", quantity=5, price=27.00),
        Row(customer_name="Yann", date="2016-05-03", category="C", product_name="Rice", quantity=15, price=15.00),
        Row(customer_name="Yann", date="2016-04-02", category="A", product_name="bananas", quantity=3, price=55.00),
        Row(customer_name="Yann", date="2016-04-02", category="B", product_name="Lamp", quantity=2, price=38.00),
        Row(customer_name="Yann", date="2016-04-03", category="E", product_name="Book: Crime and Punishment", quantity=5, price=100.00),
        Row(customer_name="Yann", date="2016-04-13", category="E", product_name="Book: The noose", quantity=5, price=125.00),
        Row(customer_name="Yann", date="2016-04-27", category="D", product_name="Solar Pannel", quantity=5, price=29.00),
        Row(customer_name="Yann", date="2016-05-27", category="D", product_name="Recycle bin", quantity=5, price=27.00),
        Row(customer_name="Yann", date="2016-05-27", category="A", product_name="bananas", quantity=3, price=55.00),
        Row(customer_name="Yann", date="2016-05-01", category="Y", product_name="Motor skate", quantity=1, price=68.00),
        Row(customer_name="Yann", date="2016-06-07", category="Z", product_name="space ship", quantity=1, price=227.00),
        Row(customer_name="Yoshua", date="2016-02-07", category="Z", product_name="space ship", quantity=2, price=227.00),
        Row(customer_name="Yoshua", date="2016-02-14", category="A", product_name="bananas", quantity=9, price=55.00),
        Row(customer_name="Yoshua", date="2016-02-14", category="B", product_name="Lamp", quantity=2, price=38.00),
        Row(customer_name="Yoshua", date="2016-02-14", category="A", product_name="apples", quantity=10, price=55.00),
        Row(customer_name="Yoshua", date="2016-03-07", category="Z", product_name="space ship", quantity=5, price=227.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="Y", product_name="Motor skate", quantity=4, price=68.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="D", product_name="Recycle bin", quantity=5, price=27.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="C", product_name="Rice", quantity=5, price=15.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="A", product_name="bananas", quantity=9, price=55.00),
        Row(customer_name="Jurgen", date="2016-05-01", category="Z", product_name="space ship", quantity=1, price=227.00),
        Row(customer_name="Jurgen", date="2016-05-01", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Jurgen", date="2016-05-08", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Jurgen", date="2016-05-08", category="Y", product_name="Motor skate", quantity=1, price=68.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="C", product_name="Rice", quantity=5, price=15.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="Y", product_name="Motor skate", quantity=2, price=68.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="D", product_name="Recycle bin", quantity=5, price=27.00),
    ])
    result = amount_spent_udf(data=customers)
    result.show(10)

The previous gist reuses the same example from the previous post on UDFs and Window Functions.
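By the way, if you want to experiment with the job locally outside of any test, a minimal entry point along the following lines should work. Note that this is just a sketch: the "src.job" import path mirrors the test imports further below, and passing an empty ConfigParser is an assumption for illustration, as the repository wires configuration through its own runner.

# Hypothetical local entry point for the job above; the repository itself
# wires configuration and the SparkSession through its own runner.
from configparser import ConfigParser

from pyspark.sql import SparkSession

from src.job import main  # assumed module path, mirroring the test imports

if __name__ == "__main__":
    spark = (SparkSession.builder
             .master("local[2]")
             .appName("spark-unit-testing-demo")
             .getOrCreate())
    main(conf=ConfigParser(), spark=spark)
    spark.stop()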

Here is an example of how we could test our "amount_spent_udf" function:

from tests.test_utils.test_spark import spark_session
from pyspark.sql import DataFrame, Row, SparkSession

from src.job import amount_spent_udf


def test_amount_spent_udf(spark_session: SparkSession) -> None:
    input_df = spark_session.createDataFrame([
        Row(customer_name="Geoffrey", date="2016-04-22", category="Foo", product_name="Bar", quantity=1, price=2.00),
    ])
    result = amount_spent_udf(data=input_df)
    assert isinstance(result, DataFrame)
    assert result.count() == input_df.count()
    assert sorted(result.columns) == ['amount_spent', 'category', 'customer_name', 'date',
                                      'price', 'product_name', 'quantity']
    assert result.collect()[0].amount_spent == 2.00
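Since "_amount_spent" itself is plain Python, its core logic can also be covered without a Spark session at all. A minimal sketch, assuming the function is importable from "src.job" as in the test above:

from src.job import _amount_spent


def test_amount_spent() -> None:
    # plain Python function, so no Spark session is required here
    assert _amount_spent(quantity=2, price=1.5) == 3.0
    assert _amount_spent(quantity=0, price=9.99) == 0.0

Such Spark-free tests run in milliseconds, which keeps the feedback loop fast; the Spark-backed test above then only needs to verify the wiring of the UDF.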

Now note the first line of the Spark test script above, which is the secret sauce that loads a Spark session for your unit tests. Below is the code that creates the "spark_session" object passed as an argument to the "test_amount_spent_udf" function.

"""
Utilities common to all tests using spark
"""
import pytest
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import logging
def quiet_py4j():
""" turn down spark logging for the test context """
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_context(request):
"""
fixture for creating a spark session
Args:
request: pytest.FixtureRequest object
"""
conf = SparkConf() \
.setMaster("local[2]") \
.setAppName("pytest-pyspark-local-testing")
sc = SparkContext(conf=conf)
request.addfinalizer(lambda: sc.stop())
quiet_py4j()
return sc
@pytest.fixture(scope="session")
def spark_session(request):
"""
fixture for creating a spark session
Args:
request: pytest.FixtureRequest object
"""
spark_conf = SparkConf() \
.setMaster("local[2]") \
.setAppName("pytest-pyspark2.+-local-testing")
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
request.addfinalizer(lambda: spark.stop())
quiet_py4j()
return spark
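As a side note, if you place such fixtures in a conftest.py file under your tests directory, pytest discovers them automatically and the explicit "from tests.test_utils.test_spark import spark_session" import becomes unnecessary. A minimal sketch of that alternative layout (not what the sample repository uses):

# tests/conftest.py -- hypothetical alternative layout; pytest picks up
# fixtures defined here without any explicit import in test modules
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    """Session-wide local Spark session, shared across all tests."""
    spark = (SparkSession.builder
             .master("local[2]")
             .appName("pytest-pyspark-local-testing")
             .getOrCreate())
    yield spark
    spark.stop()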

We strongly encourage you to have a look at the corresponding git repository, where we provide detailed instructions on how to run everything locally.

And that is it for today. Hope it helped!
