Quite a while ago (already counting in years), I published a tutorial series focused on helping people get started with Spark. Here is an outline of the previous posts:
- Part 1 Getting Started - covers basics on distributed Spark architecture, along with Data structures (including the good old RDD collections (!), whose use has been largely superseded by Dataframes)
- Part 2 intro to Dataframes
- Part 3 intro to UDFs and Window Functions
In the meantime Spark's popularity has not decreased, so I thought I would continue updating the same series. In this post we cover an essential part of any ETL project, namely unit testing.
For that I created a sample repository, which is meant to serve as boilerplate code for any new Python Spark project.
Let us browse through the main job script. Please note that the repository might contain an updated version, which may differ in details from the following gist.
# create the general function | |
def _amount_spent(quantity: int, price: float) -> float: | |
""" | |
Calculates the product between two variables | |
:param quantity: (float/int) | |
:param price: (float/int) | |
:return: | |
(float/int) | |
""" | |
return quantity * price | |
def amount_spent_udf(data: DataFrame) -> DataFrame:
    """
    Append an 'amount_spent' column to the given dataframe.

    :param data: DataFrame with numeric 'quantity' and 'price' columns
    :return:
        the input DataFrame with an additional 'amount_spent' double column
    """
    # create the general UDF; renamed locally so it does not shadow this function
    # Note: DoubleType in Java/Scala is equal to Python float; thus you can
    # alternatively specify FloatType()
    _udf = F.udf(_amount_spent, DoubleType())
    # Apply our UDF to the dataframe
    return data.withColumn('amount_spent', _udf(F.col('quantity'), F.col('price')))
def main(conf: ConfigParser, spark: SparkSession) -> None:
    """
    Entry point of the job: builds a mock customer-purchases dataframe,
    enriches it with the 'amount_spent' column via the UDF, and prints
    the first 10 rows.

    :param conf: job configuration (unused in the visible body — presumably
        consumed in a fuller version; verify against the repository)
    :param spark: active SparkSession used to create the dataframe
    :return: None — output is written to stdout via show()
    """
    # mock data: (customer_name, date, category, product_name, quantity, price)
    customers = spark.createDataFrame([
        Row(customer_name="Geoffrey", date="2016-04-22", category="A", product_name="apples", quantity=1, price=50.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="B", product_name="Lamp", quantity=2, price=38.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="D", product_name="Solar Pannel", quantity=1, price=29.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="A", product_name="apples", quantity=3, price=50.00),
        Row(customer_name="Geoffrey", date="2016-05-03", category="C", product_name="Rice", quantity=5, price=15.00),
        Row(customer_name="Geoffrey", date="2016-06-05", category="A", product_name="apples", quantity=5, price=50.00),
        Row(customer_name="Geoffrey", date="2016-06-05", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Geoffrey", date="2016-06-15", category="Y", product_name="Motor skate", quantity=7, price=68.00),
        Row(customer_name="Geoffrey", date="2016-06-15", category="E", product_name="Book: The noose", quantity=1, price=125.00),
        Row(customer_name="Yann", date="2016-04-22", category="B", product_name="Lamp", quantity=1, price=38.00),
        Row(customer_name="Yann", date="2016-05-03", category="Y", product_name="Motor skate", quantity=1, price=68.00),
        Row(customer_name="Yann", date="2016-05-03", category="D", product_name="Recycle bin", quantity=5, price=27.00),
        Row(customer_name="Yann", date="2016-05-03", category="C", product_name="Rice", quantity=15, price=15.00),
        Row(customer_name="Yann", date="2016-04-02", category="A", product_name="bananas", quantity=3, price=55.00),
        Row(customer_name="Yann", date="2016-04-02", category="B", product_name="Lamp", quantity=2, price=38.00),
        Row(customer_name="Yann", date="2016-04-03", category="E", product_name="Book: Crime and Punishment", quantity=5, price=100.00),
        Row(customer_name="Yann", date="2016-04-13", category="E", product_name="Book: The noose", quantity=5, price=125.00),
        Row(customer_name="Yann", date="2016-04-27", category="D", product_name="Solar Pannel", quantity=5, price=29.00),
        Row(customer_name="Yann", date="2016-05-27", category="D", product_name="Recycle bin", quantity=5, price=27.00),
        Row(customer_name="Yann", date="2016-05-27", category="A", product_name="bananas", quantity=3, price=55.00),
        Row(customer_name="Yann", date="2016-05-01", category="Y", product_name="Motor skate", quantity=1, price=68.00),
        Row(customer_name="Yann", date="2016-06-07", category="Z", product_name="space ship", quantity=1, price=227.00),
        Row(customer_name="Yoshua", date="2016-02-07", category="Z", product_name="space ship", quantity=2, price=227.00),
        Row(customer_name="Yoshua", date="2016-02-14", category="A", product_name="bananas", quantity=9, price=55.00),
        Row(customer_name="Yoshua", date="2016-02-14", category="B", product_name="Lamp", quantity=2, price=38.00),
        Row(customer_name="Yoshua", date="2016-02-14", category="A", product_name="apples", quantity=10, price=55.00),
        Row(customer_name="Yoshua", date="2016-03-07", category="Z", product_name="space ship", quantity=5, price=227.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="Y", product_name="Motor skate", quantity=4, price=68.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="D", product_name="Recycle bin", quantity=5, price=27.00),
        Row(customer_name="Yoshua", date="2016-04-07", category="C", product_name="Rice", quantity=5, price=15.00),
        Row(customer_name="Yoshua", date="2016-04-07",category= "A", product_name="bananas", quantity=9, price=55.00),
        Row(customer_name="Jurgen", date="2016-05-01", category="Z", product_name="space ship", quantity=1, price=227.00),
        Row(customer_name="Jurgen", date="2016-05-01", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Jurgen", date="2016-05-08", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Jurgen", date="2016-05-08", category="Y", product_name="Motor skate", quantity=1, price=68.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="A", product_name="bananas", quantity=5, price=55.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="C", product_name="Rice", quantity=5, price=15.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="Y", product_name="Motor skate", quantity=2, price=68.00),
        Row(customer_name="Jurgen", date="2016-06-05", category="D", product_name="Recycle bin", quantity=5, price=27.00),
    ])
    # enrich with the computed column and display a sample
    result = amount_spent_udf(data=customers)
    result.show(10)
The gist above revisits the same example used in the previous post on UDFs and Window Functions.
Here is an example of how we could test our "amount_spent_udf" function:
from tests.test_utils.test_spark import spark_session | |
from pyspark.sql import DataFrame, Row, SparkSession | |
from pyspark.sql.functions import col | |
from src.job import amount_spent_udf | |
def test_amount_spent_udf(spark_session: SparkSession) -> None:
    """Verify that amount_spent_udf appends a correct 'amount_spent' column."""
    source_df = spark_session.createDataFrame([
        Row(customer_name="Geoffrey", date="2016-04-22", category="Foo", product_name="Bar", quantity=1, price=2.00),
    ])

    enriched = amount_spent_udf(data=source_df)

    # The result is still a DataFrame with the same number of rows.
    assert isinstance(enriched, DataFrame)
    assert enriched.count() == source_df.count()

    # Exactly one extra column ('amount_spent') was added.
    expected_columns = ['amount_spent', 'category', 'customer_name', 'date',
                        'price', 'product_name', 'quantity']
    assert sorted(enriched.columns) == expected_columns

    # quantity * price == 1 * 2.00
    assert enriched.collect()[0].amount_spent == 2.00
Now note the first line of the unit test script, which is the secret sauce for loading a Spark context into your unit tests. Below is the code that creates the "spark_session" object passed as an argument to the "test_amount_spent_udf" function.
""" | |
Utilities common to all tests using spark | |
""" | |
import pytest | |
from pyspark.sql import SparkSession | |
from pyspark import SparkContext, SparkConf | |
import logging | |
def quiet_py4j():
    """Reduce py4j log noise for the test context."""
    logging.getLogger('py4j').setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_context(request):
    """
    fixture for creating a spark context (note: a low-level SparkContext,
    not a SparkSession — use the spark_session fixture for the latter)
    Args:
        request: pytest.FixtureRequest object
    """
    conf = SparkConf() \
        .setMaster("local[2]") \
        .setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)
    # make sure the context is torn down when the test session ends
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc
@pytest.fixture(scope="session")
def spark_session(request):
    """
    fixture for creating a spark session
    Args:
        request: pytest.FixtureRequest object
    """
    config = (
        SparkConf()
        .setMaster("local[2]")
        .setAppName("pytest-pyspark2.+-local-testing")
    )
    session = SparkSession.builder.config(conf=config).getOrCreate()
    # stop the session when the whole test session finishes
    request.addfinalizer(session.stop)
    quiet_py4j()
    return session
And that is it. We strongly encourage you to have a look at the corresponding git repository, where we provide detailed instructions on how to run it locally.
And that is it for today, hope it helped!
Top comments (0)