Article written as a member - and with the support - of the Philips' Software Excellence team. Curious about working in tech at Philips? Find out more here.
Introduction
Testing data wrangling functions is notoriously challenging because it often involves messy and unpredictable real-world data. This data can come in a wide variety of formats, with inconsistencies, missing values, and unexpected structures.
Adding another layer of complexity are the wrangling operations themselves. These can range from simple filtering to complex transformations, and they often involve chaining multiple functions. This creates a cascading effect, where an error in one step can ripple through the entire process, making it difficult to pinpoint the source of the issue.
Despite the challenges, testing data wrangling functions remains crucial.
Complex data pipelines are inherently error-prone, and, without proper testing, errors can easily creep in and remain undetected for long periods, leading to downstream problems in analysis or reporting.
Further, debugging data wrangling issues can be frustrating and time-consuming. Tests act as a safety net, allowing for quick identification and correction of problems before they cause downstream issues.
Finally, well-written tests make it much easier to refactor code and add new functionality to the codebase. By ensuring that existing functionality remains intact, they allow the code to be modified confidently, without fear of unintended consequences.
Property-based testing
Traditional testing involves checking a function’s behavior with a set of predefined inputs. This works well for simple cases, but data wrangling functions often deal with a wide variety of messy, unpredictable data.
Property-based testing flips the script. Instead of providing specific examples, you define general rules (properties) that the function should always follow, regardless of the input. The testing library then takes over, generating a large number of random inputs and checking whether the function upholds these properties across all of them.
This approach proves particularly effective at catching bugs that traditional testing might overlook. By generating a vast array of random inputs, property-based testing significantly increases the chance of uncovering hidden bugs.
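As a minimal sketch of the idea, assuming the Hypothesis library is installed, the test below states a single property for a hypothetical helper, clip_to_unit() (it is not part of the case study): whatever float it receives, the result must lie between 0 and 1.

```python
from hypothesis import given
from hypothesis import strategies as st


def clip_to_unit(x: float) -> float:
    """Hypothetical helper: force a number into the interval [0, 1]."""
    return min(1.0, max(0.0, x))


# Hypothesis generates many floats (including infinities) and checks
# the property for each one; NaN is excluded here on purpose.
@given(st.floats(allow_nan=False))
def test_clip_to_unit_stays_in_range(x):
    assert 0.0 <= clip_to_unit(x) <= 1.0
```

No input value is written by hand: the test only describes what must hold, and the library looks for a counterexample.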
Popular property-based testing libraries to consider are:
- Python: Hypothesis
- R: Hedgehog
- Scala: ScalaCheck
- Julia: Supposition.jl
For a more detailed view of property-based testing, consider reading Fred Hebert’s book PropEr Testing.
A quick note about Fuzzing
Fuzzing, in simple terms, feeds the code a large variety of inputs and checks whether it crashes. Think of it as a brute-force approach to testing code resilience.
The hypothesis library, used in this article, blurs the line that separates property-based testing from fuzzing, and some tests may be closer to one concept or the other. The author and main maintainer of hypothesis has an article that explains his views on both property-based testing and fuzzing. I recommend reading it for further clarification.
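To make the fuzzing idea concrete, here is a hand-rolled sketch that uses only the standard library. It is not how Hypothesis or dedicated fuzzers work internally; parse_percentage() and fuzz() are hypothetical names introduced for illustration. The loop throws random strings at the function and records anything other than the expected ValueError.

```python
import random


def parse_percentage(text: str) -> float:
    """Hypothetical function under test: parse strings such as '42%'."""
    return float(text.strip().rstrip("%")) / 100


def fuzz(func, runs: int = 1000, seed: int = 0) -> list:
    """Call func with random strings and collect unexpected exceptions."""
    rng = random.Random(seed)
    alphabet = "0123456789.%- eE\t"
    crashes = []
    for _ in range(runs):
        length = rng.randint(0, 6)
        candidate = "".join(rng.choice(alphabet) for _ in range(length))
        try:
            func(candidate)
        except ValueError:
            pass  # rejecting malformed input is the expected behaviour
        except Exception as exc:  # anything else counts as a crash
            crashes.append((candidate, exc))
    return crashes


crashes = fuzz(parse_percentage)
```

Note that the loop never checks whether the returned value is correct, only that the call does not blow up. That is the main difference from a property-based test.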
Case study
Problem
I want to assess and predict trends in stroke mortality in U.S. states based on environmental quality indexes. This analysis relies on data wrangling functions that clean and join datasets from several sources.
Since these datasets are periodically updated, the data wrangling functions must adapt to handle new and potentially unpredictable data characteristics. This makes it difficult to test these functions. Focusing solely on the current data may not be sufficient. Ideally, tests should anticipate a wide range of issues that new datasets might introduce, such as unexpected values, missing entries, or structural changes. Property-based testing excels in this scenario. By generating a broad variety of test cases, it can help identify potential problems in the data wrangling logic regardless of the specific format of future datasets. This ensures the functions remain robust and adaptable even as the underlying data evolves.
For this study, data is expected to adhere to the following format:
us_state | metric_1 | metric_2 | ... | metric_n | target_variable |
---|---|---|---|---|---|
state_1 | value_1_1 | value_2_1 | ... | value_n_1 | target_1 |
... | ... | ... | ... | ... | ... |
state_n | value_1_n | value_2_n | ... | value_n_n | target_n |
Where all the values, other than “us_state”, are numerical and scaled (between 0 and 1).
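The expected format can also be written down as an executable check. The sketch below assumes pandas; check_final_format() is a hypothetical helper, not one of the functions used later in the article. It reports every column that is not numerical, has missing values, or falls outside the 0 to 1 range.

```python
import pandas as pd
from pandas.api.types import is_numeric_dtype


def check_final_format(df: pd.DataFrame, state_column: str = "us_state") -> list:
    """Return a list of format problems; an empty list means the frame conforms."""
    if state_column not in df.columns:
        return [f"missing column: {state_column}"]
    problems = []
    for name in df.columns.drop(state_column):
        col = df[name]
        if not is_numeric_dtype(col):
            problems.append(f"{name}: not numerical")
        elif col.isna().any():
            problems.append(f"{name}: contains missing values")
        elif not col.between(0, 1).all():
            problems.append(f"{name}: values outside [0, 1]")
    return problems


# Toy frames with made-up values, for illustration only.
good = pd.DataFrame({"us_state": ["AK", "AL"],
                     "metric_1": [0.0, 1.0],
                     "target_variable": [0.3, 0.7]})
bad = pd.DataFrame({"us_state": ["AK", "AL"],
                    "metric_1": [0.0, 1.5],
                    "target_variable": [0.3, None]})
```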
Datasets
The metrics can be found in the dataset from the U.S. Environmental Protection Agency:
- Dataset: https://catalog.data.gov/dataset/usepa-environmental-quality-index-eqi-air-water-land-built-and-sociodemographic-domains-non-tra4
- Data dictionary: https://catalog.data.gov/dataset/usepa-environmental-quality-index-eqi-air-water-land-built-and-sociodemographic-domains-non-tra4
The raw data has the following structure:
stfips | State | County_Name | cat_RUCC | sociod_EQI_2Jan2018_VC | RUCC1_sociod_EQI_2Jan2018_VC | ... | RUCC4_EQI_2Jan2018_VC_5 |
---|---|---|---|---|---|---|---|
1001 | AL | Autauga County | 1 | -0.525910 | 0.225516 | ... | NaN |
1003 | AL | Baldwin County | 1 | -0.702319 | 0.185723 | ... | NaN |
... | ... | ... | ... | ... | ... | ... | ... |
56045 | WY | Weston County | 3 | 0.275503 | NaN | ... | NaN |
The target variable (stroke mortality) can be found in the dataset from the U.S. Department of Health & Human Services.
This dataset has the following structure:
Year | LocationAbbr | LocationDesc | GeographicLevel | DataSource | Class | Topic | Data_Value | ... | Georeference |
---|---|---|---|---|---|---|---|---|---|
2020 | AK | Nome | County | NVSS | Cardiovascular Diseases | Stroke Mortality | 110.7 | ... | POINT (-163.9462296 64.903977039) |
2020 | CT | Tolland County | County | NVSS | Cardiovascular Diseases | Stroke Mortality | 63.4 | ... | POINT (-72.337294 41.852989582) |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
2020 | VT | Caledonia County | County | NVSS | Cardiovascular Diseases | Stroke Mortality | 60.4 | ... | POINT (-72.10338989 44.460163015) |
Both datasets are available for free under open licenses.
- Environmental Quality Index (EQI) dataset license
- Stroke Mortality Data Among U.S. Adults dataset license
Data wrangling functions
The following data wrangling functions were written to clean and prepare the data for training machine learning models.
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler


def get_target(df, location_column, target_column):
    return df[[location_column, target_column]]\
        .groupby(location_column)\
        .sum()\
        .reset_index()


def get_metrics(df, location_column, metrics_columns):
    return df[[location_column] + metrics_columns]\
        .groupby(location_column)\
        .mean()\
        .reset_index()


def _scale_columns(df):
    aux = df.copy()
    scaler = MinMaxScaler()
    columns = aux.select_dtypes(include=np.number)\
        .columns\
        .tolist()
    aux[columns] = scaler.fit_transform(aux[columns])
    return aux


def get_df(target_df,
           metrics_df,
           target_location_column,
           metrics_location_column):
    aux = pd.merge(metrics_df,
                   target_df,
                   left_on=metrics_location_column,
                   right_on=target_location_column,
                   how="inner")
    if target_location_column != metrics_location_column:
        aux.drop(target_location_column, axis=1, inplace=True)
    return _scale_columns(aux)
Code explanation:
- get_target(): takes the dataset with the target variable (Stroke Mortality), groups and sums the target variable by state and returns the results. Once through the function, the data looks like this:
LocationAbbr | Data_Value |
---|---|
AK | 12707.8 |
AL | 71147.8 |
... | ... |
WY | 9418.8 |
- get_metrics(): collects the metrics from the Environmental Quality Index dataset, calculates the average of each metric by state, and returns the results.
State | sociod_EQI_2Jan2018_VC | RUCC1_sociod_EQI_2Jan2018_VC | ... | RUCC4_EQI_2Jan2018_VC_5 |
---|---|---|---|---|
AK | 0.277039 | -0.271424 | ... | 1.235294 |
AL | 0.441253 | 0.606241 | ... | 4.727273 |
... | ... | ... | ... | ... |
WY | -0.396936 | -0.158441 | ... | 2.000000 |
- _scale_columns(): is a helper function that scales all numerical variables to a number between 0 and 1.
- get_df(): creates the final data frame by combining both resulting datasets from the previous functions by state and scaling the results with the _scale_columns() helper function.
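The steps described above can be traced on a toy data frame. The sketch below uses made-up county rows and only pandas. It follows the same group, aggregate and merge pattern as the functions, but does the min-max scaling by hand with (x - min) / (max - min), which is the arithmetic MinMaxScaler applies with its default 0 to 1 range (the hand-written version assumes no column is constant).

```python
import pandas as pd

# Hypothetical county-level rows: two states with two counties each.
counties = pd.DataFrame({
    "state": ["AK", "AK", "AL", "AL"],
    "deaths": [10.0, 30.0, 5.0, 15.0],
    "metric": [0.2, 0.4, -1.0, 1.0],
})

# Same pattern as get_target(): one row per state, values summed.
target = counties[["state", "deaths"]].groupby("state").sum().reset_index()

# Same pattern as get_metrics(): one row per state, values averaged.
metrics = counties[["state", "metric"]].groupby("state").mean().reset_index()

# Same pattern as get_df() before scaling: inner join on the state column.
merged = pd.merge(metrics, target, on="state", how="inner")

# Min-max scaling by hand for every numeric column.
numeric = merged.select_dtypes(include="number")
scaled = (numeric - numeric.min()) / (numeric.max() - numeric.min())
```

With only two states, each scaled column necessarily contains just 0 and 1; on the real data the values spread across the whole interval.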
Test code
To test any of the functions, we need to pass them a data frame. We can create simple data frames containing combinations of edge cases for each variable. A better alternative, however, is to use property-based testing techniques to generate random combinations of cases for each variable, run them through the functions, and test whether the results fit the expected output.
In Python this can be done with the hypothesis library.
The Python testing framework pytest runs the tests. To see the hypothesis statistics in the pytest output, we can use the command pytest --hypothesis-show-statistics.
The following were the imports used for testing:
from math import isnan
import numpy as np
import pytest
from datatest import validate
from hypothesis import given, settings
from hypothesis.extra.pandas import column, data_frames
from hypothesis.strategies import text, shared
from data_cleaning import get_target, get_metrics, get_df
Quick explanation of each import from the hypothesis library:
- data_frames() handles the generation of pandas data frames.
- column() works within data_frames() to create the columns of the generated data frame. It takes the column name, and may take a dtype argument specifying the column data type and an elements argument specifying the strategy used to generate each element in the column (using other generators from the library). If elements is omitted, hypothesis tries to infer the strategy from the dtype argument.
- text() defines the data generation strategy for texts.
- shared() allows two or more generators to use the same data, provided by another generator. In simple terms, each shared() using the same data generation strategy and the same key argument results in the same data each time the generators are sampled.
- given() is a decorator that provides the data from the generators to the actual tests.
- settings() overrides hypothesis settings for individual tests.
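Of these, shared() is probably the least intuitive. The following sketch, which assumes only the hypothesis imports listed above and a hypothetical test name, shows its effect: two arguments built from shared() with the same base strategy and the same key always receive the same value within one generated example.

```python
from hypothesis import given
from hypothesis.strategies import shared, text

names = text(min_size=1)


# Both arguments come from shared() with the same base strategy and key,
# so within each generated example they hold the same value.
@given(shared(names, key="name"), shared(names, key="name"))
def test_shared_values_match(left, right):
    assert left == right
```

Replacing either argument with plain names would make the two values independent, and the assertion would fail almost immediately.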
The first step is to write the generators:
states = text(min_size=1)

stroke_df_gen = data_frames([
    column("state", dtype=object, elements=shared(states, key="states")),
    column("target", dtype=float),
    column("garbage1", dtype=object, elements=text()),
    column("garbage2", dtype=int),
    column("garbage3", dtype=float)
])

eqi_df_gen = data_frames([
    column("state", dtype=object, elements=shared(states, key="states")),
    column("metric1", dtype=float),
    column("metric2", dtype=float),
    column("garbage1", dtype=object, elements=text()),
    column("garbage2", dtype=int),
    column("garbage3", dtype=float)
])
Code explanation:
- states is a strategy that generates random, non-empty texts to be used as “state names”.
- stroke_df_gen contains the generator for the data frame containing the target variable. Since the U.S. state column is used to merge the data frame of the target variable with the one with the metrics, the values generated for these columns need to be consistent with one another. This is achieved with the shared strategy from the hypothesis library. The target variable must be a floating point number (stroke mortality per 100,000 population). Additional “garbage” columns were added to represent the extra data in the dataset. The functions are expected to ignore these.
- eqi_df_gen generates the metrics data frame. Like stroke_df_gen, it uses the shared strategy to keep the consistency of the U.S. state column between the two data frames. At a glance, all metrics columns seem to be floating point numbers on the original dataset, so that is represented in the data frame generator. Also, like in stroke_df_gen, “garbage” columns were added to represent the extra data found in the original dataset.
For the actual tests, I use the datatest library, as it provides a straightforward way to test the properties of pandas data frames (as well as other common data types). Its validate() function can be used both to test the types of each column/variable in a data frame and to test whether the values of a column pass a check from a helper function (for instance lambda x: x > 0, to check if all values are positive numbers).
Test code:
@given(stroke_df_gen)
def test_get_target(s):
    res = get_target(s, "state", "target")
    validate(res, requirement=(str, float))
    validate(res["target"], requirement=lambda x: x >= 0)


def _not_nan(x):
    return not isnan(x)


@given(eqi_df_gen)
def test_get_metrics(s):
    res = get_metrics(s, "state", ["metric1", "metric2"])
    validate(res, requirement=(str, float, float))
    validate(res["metric1"], requirement=_not_nan)
    validate(res["metric2"], requirement=_not_nan)


def _check_scaled(x):
    return 0 <= x <= 1


@settings(deadline=None)
@given(stroke_df_gen, eqi_df_gen)
def test_get_df(strokes, eqi):
    strokes = get_target(strokes, "state", "target")
    eqi = get_metrics(eqi, "state", ["metric1", "metric2"])
    res = get_df(strokes, eqi, "state", "state")
    validate(res, requirement=(str, float, float, float))
    num_columns = res.select_dtypes(include=np.number)\
        .columns\
        .tolist()
    for c in num_columns:
        validate(res[c], requirement=_check_scaled)
Code explanation:
- test_get_target() passes the samples from stroke_df_gen (hypothesis defaults to 100 samples per test) to get_target() and checks that the resulting data frames conform to the column types str and float. This indirectly tests the number of columns, as it expects exactly 1 string column and 1 float column, in that order. It also checks that the target variable is not negative (stroke mortality per 100,000 population should never be below zero).
- test_get_metrics() tests if get_metrics(), after receiving the samples from eqi_df_gen, returns data frames with 1 column of type str followed by 2 columns of type float. It also checks that the float columns are free of missing data (as missing values can significantly harm the effectiveness of a machine learning model).
- test_get_df() tests if get_df(), receiving samples from both stroke_df_gen and eqi_df_gen, returns data frames with 1 column of type str followed by 3 of type float, and if each numerical column is scaled between 0 and 1 (important since many machine learning algorithms are sensitive to the scale of the variables).
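The default of 100 samples can be changed per test through the same settings() decorator already used to disable the deadline. The sketch below is a standalone illustration with a hypothetical test; the value 25 is arbitrary.

```python
from hypothesis import given, settings
from hypothesis import strategies as st


# max_examples sets how many inputs Hypothesis generates for this test;
# deadline=None disables the per-example time limit.
@settings(max_examples=25, deadline=None)
@given(st.integers())
def test_integers_are_integers(n):
    assert isinstance(n, int)
```

Raising max_examples trades longer test runs for a better chance of hitting rare edge cases.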
Test results
Running the tests shows that all functions fail to meet the requirements in certain scenarios. In other words, the property-based tests succeeded in finding edge cases that break the code.
Analyzing the test output makes it easier to identify what needs to be fixed in the code.
Tests summary:
============================================================ short test summary info ============================================================
FAILED tests/test_data_cleaning.py::test_get_target - datatest.ValidationError: does not satisfy <lambda> (1 difference): [
FAILED tests/test_data_cleaning.py::test_get_metrics - datatest.ValidationError: does not satisfy _not_nan() (1 difference): [
FAILED tests/test_data_cleaning.py::test_get_df - exceptiongroup.ExceptionGroup: Hypothesis found 3 distinct failures. (3 sub-exceptions)
- test_get_target() fails due to the lambda x: x >= 0 validation. This happens because get_target() does not handle cases where the value is not a positive number. When building datasets, input errors may occur that lead to nonsensical values (as in this case, where a positive value is expected but a negative one is generated) or missing values. As it stands, get_target() cannot handle these cases.
- test_get_metrics() fails due to the _not_nan() validation. This means get_metrics() does not handle missing values in its current form. This is critical, as the presence of missing values is a common occurrence on this dataset.
- test_get_df() fails in 3 instances, but the test summary does not provide enough detail to understand why. To understand the reason behind these failures, we need to examine the test output more closely.
The output for test_get_df() contains the following error:
| datatest.ValidationError: does not satisfy _check_scaled() (1 difference): [
| Invalid(nan),
| ]
| Falsifying example: test_get_df(
| strokes=
| state target garbage1 garbage2 garbage3
| 0 0 0.0 0 0.0
| ,
| eqi=
| state metric1 metric2 garbage1 garbage2 garbage3
| 0 0 0.0 NaN 0 0.0
| ,
| )
The original function uses MinMaxScaler from the scikit-learn library to scale the metrics. The scaler passes missing values (NaN) through unchanged, and NaN is not a number between 0 and 1, so the check fails. The NaN itself originates upstream, in get_target() and get_metrics(), so correcting these functions should be enough to fix this error.
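The reason a NaN can never pass the range check is worth spelling out. The snippet below reuses _check_scaled() from the test code and relies only on the standard library.

```python
from math import nan


def _check_scaled(x):
    return 0 <= x <= 1


# Every ordered comparison with NaN evaluates to False, so NaN can
# never satisfy a range check, whatever the bounds are.
nan_in_range = _check_scaled(nan)    # False
half_in_range = _check_scaled(0.5)   # True
```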
This was the second error found:
| ValueError: Input X contains infinity or a value too large for dtype('float64').
| Falsifying example: test_get_df(
| strokes=
| state target garbage1 garbage2 garbage3
| 0 0 0.0 0 0.0
| ,
| eqi=
| state metric1 metric2 garbage1 garbage2 garbage3
| 0 0 0.0 inf 0 0.0
| ,
| )
This indicates that get_df() cannot handle infinite values, or values too large for the float64 data type: scikit-learn rejects them with a ValueError. While these extreme values are uncommon, data wrangling operations can sometimes produce them, making it advisable to address the issue.
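The falsifying example shows an infinite value surviving the aggregation step. The sketch below reproduces that on made-up data and shows one possible way to neutralize it, masking non-finite entries with np.isfinite before aggregating. This is an illustrative alternative, not the fix adopted later in the article.

```python
import numpy as np
import pandas as pd

# Hypothetical metric values, one of them infinite.
df = pd.DataFrame({"state": ["AK", "AK", "AL"],
                   "metric": [0.5, np.inf, 0.2]})

# The infinite value survives the aggregation...
means = df.groupby("state")["metric"].mean()

# ...unless non-finite entries are turned into NaN first,
# which mean() then skips.
finite_only = df.assign(metric=df["metric"].where(np.isfinite(df["metric"])))
clean_means = finite_only.groupby("state")["metric"].mean()
```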
The third error:
| ValueError: Found array with 0 sample(s) (shape=(0, 3)) while a minimum of 1 is required by MinMaxScaler.
| Falsifying example: test_get_df(
| strokes=
| Empty DataFrame
| Columns: [state, target, garbage1, garbage2, garbage3]
| Index: []
| ,
| eqi=
| Empty DataFrame
| Columns: [state, metric1, metric2, garbage1, garbage2, garbage3]
| Index: []
| , # or any other generated value
| )
This shows that get_df() cannot handle empty data frames, which can occur due to issues during data collection/extraction or because of filtering operations.
Fixing the functions
Strategies:
- get_target(): negative values should be replaced with 0 (should not count towards the sum total).
- get_metrics(): U.S. states missing a metric should use the average across the other states in its place. If the metric is missing for all states, replace the values with 0.
- get_df():
  - 1st error: should be fixed by addressing the errors in the previous functions.
  - 2nd error: this could be addressed in the previous functions, specifically get_target() and get_metrics(), so that they do not include extreme values (for instance, infinity) in their calculations.
  - 3rd error: the function should throw an error if the input data frames are empty. A test to check if it properly throws the error in this case should also be written.
Corrected functions:
def get_target(df, location_column, target_column):
    # Work on a copy so the caller's data frame is not modified.
    aux = df[[location_column, target_column]].copy()
    # Negative, missing, infinite and very large values count as 0.
    aux[target_column] = aux[target_column]\
        .apply(lambda x: x if 0 <= x < 3e8 else .0)
    return aux\
        .groupby(location_column)\
        .sum()\
        .reset_index()


def get_metrics(df, location_column, metrics_columns):
    aux = df[[location_column] + metrics_columns].copy()
    # Out-of-range, infinite and missing values all become NaN.
    # (On pandas 2.1 or later, DataFrame.map() replaces applymap().)
    aux[metrics_columns] = aux[metrics_columns]\
        .applymap(lambda x: x if -3e8 < x < 3e8 else np.nan)
    aux = aux\
        .groupby(location_column)\
        .mean()\
        .reset_index()
    # Fill gaps with the average across states, or 0 if there is none.
    aux[metrics_columns] = aux[metrics_columns]\
        .fillna(aux[metrics_columns].mean())\
        .fillna(.0)
    return aux


def _scale_columns(df):
    aux = df.copy()
    scaler = MinMaxScaler()
    columns = aux.select_dtypes(include=np.number)\
        .columns\
        .tolist()
    aux[columns] = scaler.fit_transform(aux[columns])
    return aux


def get_df(target_df,
           metrics_df,
           target_location_column,
           metrics_location_column):
    if target_df.empty or metrics_df.empty:
        raise Exception("One or more empty DataFrames")
    aux = pd.merge(metrics_df,
                   target_df,
                   left_on=metrics_location_column,
                   right_on=target_location_column,
                   how="inner")
    if target_location_column != metrics_location_column:
        aux.drop(target_location_column, axis=1, inplace=True)
    return _scale_columns(aux)
And the adjustment for test_get_df():
@settings(deadline=None)
@given(stroke_df_gen, eqi_df_gen)
def test_get_df(strokes, eqi):
    strokes = get_target(strokes, "state", "target")
    eqi = get_metrics(eqi, "state", ["metric1", "metric2"])
    if strokes.empty or eqi.empty:
        with pytest.raises(Exception) as excinfo:
            get_df(strokes, eqi, "state", "state")
        assert str(excinfo.value) == "One or more empty DataFrames"
    else:
        res = get_df(strokes, eqi, "state", "state")
        validate(res, requirement=(str, float, float, float))
        num_columns = res.select_dtypes(include=np.number)\
            .columns\
            .tolist()
        for c in num_columns:
            validate(res[c], requirement=_check_scaled)
With these changes, the code passes the tests.
Conclusion
Property-based testing for data wrangling functions has significant advantages in ensuring the robustness and reliability of data pipelines. By automatically generating a wide range of test cases with edge combinations, property-based testing goes beyond the limitations of traditional testing methods. It uncovers unexpected scenarios and potential issues that might be missed by hand-written tests focused on specific use cases.
The provided example showcased how property-based testing identified critical shortcomings in data wrangling functions, such as handling missing values, extreme numbers, and empty data frames. Addressing these issues proactively prevents errors from propagating downstream in the data pipeline, leading to more reliable and trustworthy results.
Property-based testing is a valuable tool for data scientists, engineers and analysts: it streamlines the testing process and improves the quality of data wrangling functions. Its ability to generate a diverse set of test cases and uncover hidden issues makes it a powerful ally in building robust and reliable data pipelines.
Furthermore, integrating property-based testing into continuous integration/continuous delivery (CI/CD) pipelines can further automate the testing process and help keep code quality consistent throughout the development lifecycle. As data wrangling becomes increasingly complex, property-based testing can play an important role in maintaining data integrity and delivering reliable results for data-driven projects.