
Thesius Code

Originally published at datanest-stores.pages.dev

Data Pipeline Testing Kit


Comprehensive testing framework for PySpark data pipelines — from unit tests to integration validation.

By Datanest Digital | Version 1.0.0 | $39


What You Get

A complete testing toolkit for data pipelines running on Databricks and PySpark, including:

  • Test Framework — base classes and runners for PySpark unit/integration tests
  • Data Generators — realistic synthetic data factories for customers, orders, events
  • Custom Assertions — DataFrame-level assertions for schema, row count, nulls, uniqueness
  • Mock Utilities — helpers for mocking spark, dbutils, Delta tables, and external APIs
  • Snapshot Testing — golden-file comparison for pipeline output validation
  • Sample Fixtures — ready-to-use JSON test data (customers, orders, expected outputs)
  • Pipeline Tests — complete examples testing bronze, silver, and gold layers
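Of the pieces above, the mock utilities are often the least familiar. As a flavour of what mocking `dbutils` can look like, here is a minimal sketch using only `unittest.mock` from the standard library; the helper name `make_mock_dbutils` and its behaviour are illustrative assumptions, not the kit's actual API:

```python
from unittest.mock import MagicMock

def make_mock_dbutils(secrets=None):
    """Build a stand-in for Databricks dbutils for local test runs.

    `secrets` maps (scope, key) tuples to secret values.
    (Hypothetical helper for illustration, not the kit's real API.)
    """
    secrets = secrets or {}
    dbutils = MagicMock(name="dbutils")
    # dbutils.secrets.get(scope=..., key=...) looks up the supplied mapping
    dbutils.secrets.get.side_effect = lambda scope, key: secrets[(scope, key)]
    # dbutils.widgets.get returns an empty string unless configured otherwise
    dbutils.widgets.get.return_value = ""
    return dbutils

mock = make_mock_dbutils({("etl", "api_token"): "test-token"})
print(mock.secrets.get(scope="etl", key="api_token"))  # test-token
```

Because `MagicMock` auto-creates attributes, any other `dbutils` call your pipeline makes will also succeed silently, which keeps local test setup short.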

File Tree

```
data-pipeline-testing/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── test_framework.py        # Base test classes and PySpark test runner
│   ├── data_generators.py       # Synthetic data factories
│   ├── assertions.py            # DataFrame assertion library
│   ├── mock_utils.py            # Spark/dbutils/Delta mocking helpers
│   └── snapshot_testing.py      # Golden-file snapshot comparison
├── fixtures/
│   ├── sample_customers.json    # 50 customer records
│   ├── sample_orders.json       # 100 order records
│   └── expected_outputs/
│       └── customer_summary.json
├── tests/
│   ├── conftest.py              # Shared pytest fixtures with SparkSession
│   ├── test_bronze_pipeline.py  # Bronze layer ingestion tests
│   ├── test_silver_pipeline.py  # Silver layer transformation tests
│   └── test_gold_pipeline.py    # Gold layer aggregation tests
├── configs/
│   └── test_config.yaml         # Test environment configuration
└── guides/
    └── testing-data-pipelines.md
```

Getting Started

1. Install Dependencies

```shell
pip install pyspark delta-spark pytest pyyaml
```

2. Use the Test Framework

```python
from test_framework import SparkTestCase

class TestMyPipeline(SparkTestCase):
    def test_ingestion(self):
        # Create test data
        df = self.create_dataframe(
            [("Alice", 100), ("Bob", 200)],
            schema=["name", "amount"]
        )
        # Run your pipeline logic
        result = my_transform(df)
        # Assert results
        self.assert_row_count(result, 2)
        self.assert_no_nulls(result, ["name", "amount"])
```

3. Generate Test Data

```python
from data_generators import CustomerGenerator, OrderGenerator

customers = CustomerGenerator(seed=42).generate(count=1000)
orders = OrderGenerator(seed=42).generate(
    count=5000,
    customer_ids=customers.select("customer_id")
)
```
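The key property of seeded generators is determinism: the same seed always reproduces the same records, so test failures are replayable. As a rough illustration of that idea, here is a simplified plain-Python analogue (no Spark required); the class name, fields, and value ranges are made up for this sketch, and the kit's real generators return DataFrames rather than lists of dicts:

```python
import random

class SimpleCustomerGenerator:
    """Illustrative seeded customer factory (simplified, no Spark)."""

    FIRST_NAMES = ["Alice", "Bob", "Carol", "Dan", "Eve"]
    SEGMENTS = ["retail", "wholesale", "enterprise"]

    def __init__(self, seed=42):
        # Private RNG instance: same seed -> same sequence of draws
        self.rng = random.Random(seed)

    def generate(self, count=10):
        return [
            {
                "customer_id": f"C{i:05d}",
                "name": self.rng.choice(self.FIRST_NAMES),
                "segment": self.rng.choice(self.SEGMENTS),
                "lifetime_value": round(self.rng.uniform(10, 5000), 2),
            }
            for i in range(count)
        ]

a = SimpleCustomerGenerator(seed=42).generate(count=3)
b = SimpleCustomerGenerator(seed=42).generate(count=3)
assert a == b  # identical seeds reproduce identical data
```

Using a private `random.Random(seed)` per generator (rather than the module-level `random`) keeps generators independent of each other and of anything else that touches the global RNG.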

4. Use Custom Assertions

```python
from assertions import DataFrameAssertions

assertions = DataFrameAssertions(spark)
assertions.assert_schema_matches(result_df, expected_schema)
assertions.assert_column_values_in(result_df, "status", ["active", "inactive", "churned"])
assertions.assert_unique(result_df, ["customer_id"])
```
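To make the semantics of these assertions concrete without a running SparkSession, here is a minimal plain-Python sketch operating on lists of dicts instead of DataFrames; the function names mirror the kit's but the implementations are illustrative assumptions:

```python
def assert_unique(rows, key_cols):
    """Fail if any combination of key_cols appears more than once."""
    seen = set()
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key in seen:
            raise AssertionError(f"duplicate key {key} on columns {key_cols}")
        seen.add(key)

def assert_column_values_in(rows, col, allowed):
    """Fail if col contains any value outside the allowed set."""
    bad = {row[col] for row in rows} - set(allowed)
    if bad:
        raise AssertionError(f"unexpected values in {col!r}: {sorted(bad)}")

rows = [
    {"customer_id": 1, "status": "active"},
    {"customer_id": 2, "status": "churned"},
]
assert_unique(rows, ["customer_id"])
assert_column_values_in(rows, "status", ["active", "inactive", "churned"])
```

On a real DataFrame the same checks would be expressed with `groupBy(...).count()` and a set-membership filter, but the pass/fail contract is the same.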

5. Snapshot Testing

```python
from snapshot_testing import SnapshotTester

tester = SnapshotTester(snapshot_dir="fixtures/expected_outputs")
tester.assert_matches(result_df, "customer_summary")  # Compares to golden file
```
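The core of snapshot testing is simple: serialise the output, store it once as a golden file, and fail on any future difference. Here is a rough standalone sketch of that mechanism over lists of dicts (class name and first-run-writes behaviour are assumptions for illustration, not necessarily how the kit's `SnapshotTester` behaves):

```python
import json
import tempfile
from pathlib import Path

class SimpleSnapshotTester:
    """Illustrative golden-file comparison (simplified, no Spark)."""

    def __init__(self, snapshot_dir):
        self.snapshot_dir = Path(snapshot_dir)
        self.snapshot_dir.mkdir(parents=True, exist_ok=True)

    def assert_matches(self, rows, name):
        path = self.snapshot_dir / f"{name}.json"
        # Sort rows by a canonical JSON form so comparison is order-insensitive
        canonical = sorted(rows, key=lambda r: json.dumps(r, sort_keys=True))
        if not path.exists():
            # First run records the golden file instead of failing
            path.write_text(json.dumps(canonical, indent=2, sort_keys=True))
            return
        if canonical != json.loads(path.read_text()):
            raise AssertionError(f"output differs from snapshot {path}")

with tempfile.TemporaryDirectory() as d:
    tester = SimpleSnapshotTester(d)
    rows = [{"customer_id": 1, "total_spend": 300.0}]
    tester.assert_matches(rows, "customer_summary")  # first run writes the golden file
    tester.assert_matches(rows, "customer_summary")  # second run compares and passes
```

Sorting rows before comparison matters for Spark outputs, where row order is generally not guaranteed across runs.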

Architecture

```
┌──────────────────────────────────────────────────────────┐
│                    Test Runner (pytest)                    │
├──────────────┬──────────────┬──────────────┬─────────────┤
│  test_bronze │  test_silver │  test_gold   │  your tests │
├──────────────┴──────────────┴──────────────┴─────────────┤
│                   Test Framework Layer                     │
│  ┌────────────┐ ┌────────────┐ ┌────────────────────────┐│
│  │ Assertions │ │ Generators │ │ Snapshot Testing       ││
│  └────────────┘ └────────────┘ └────────────────────────┘│
├──────────────────────────────────────────────────────────┤
│                   Mock / Fixture Layer                     │
│  ┌────────────┐ ┌────────────┐ ┌────────────────────────┐│
│  │ Mock Utils │ │ conftest   │ │ JSON Fixtures          ││
│  └────────────┘ └────────────┘ └────────────────────────┘│
├──────────────────────────────────────────────────────────┤
│              PySpark (local) / Delta Spark                 │
└──────────────────────────────────────────────────────────┘
```

Requirements

  • Python 3.10+
  • PySpark 3.5+
  • delta-spark 3.1+
  • pytest 7+
  • PyYAML 6+
  • Java 11+ (for local Spark)

Related Products


This is 1 of 11 resources in the Data Pipeline Pro toolkit. Get the complete Data Pipeline Testing Kit with all files, templates, and documentation for $39.

Get the Full Kit →

Or grab the entire Data Pipeline Pro bundle (11 products) for $169 — save 30%.

Get the Complete Bundle →

