PySpark Utils Library

Thesius Code
Posted on • Originally published at datanest-stores.pages.dev

Battle-tested utility functions for PySpark data engineering — transformations, data quality, SCD, schema evolution, logging, dedup, and DataFrame diffing.

Stop rewriting the same PySpark boilerplate on every project. This library gives you the production-ready building blocks that data engineering teams use daily — fully typed, tested, and documented.


What's Inside

| Module | What It Does |
| --- | --- |
| `transformations` | 15 reusable DataFrame transforms: column cleaning, casting, flattening, pivoting, hashing |
| `data_quality` | Chainable DQ validation framework with structured reports and severity levels |
| `scd` | SCD Type 1 (overwrite) and Type 2 (full history) merge utilities for Delta Lake |
| `schema_utils` | Schema comparison, evolution, DDL conversion, and compatibility checking |
| `logging_utils` | Structured pipeline logging with correlation IDs, metrics, and Delta table sink |
| `dedup` | Window-based, hash-based, and fuzzy deduplication strategies |
| `diff` | DataFrame comparison with row-level, column-level, and schema diffs |

20 files — every one fully type-hinted, tested, and documented.


Quick Start

1. Install

pip install pyspark-utils-library

Or install from the source directory:

pip install -e .

For fuzzy deduplication support (Levenshtein distance):

pip install pyspark-utils-library[fuzzy]

For development (testing, linting, type checking):

pip install pyspark-utils-library[dev]

2. Import and Use

from pyspark_utils.transformations import clean_column_names, add_metadata_columns
from pyspark_utils.data_quality import DQValidator
from pyspark_utils.scd import scd2_merge
from pyspark_utils.logging_utils import PipelineLogger

# Clean and enrich a raw DataFrame
df = spark.read.table("bronze.raw_orders")
df = clean_column_names(df)
df = add_metadata_columns(df, source="erp_system")

# Validate data quality
report = (
    DQValidator(df)
    .check_nulls(["order_id", "customer_id"])
    .check_unique(["order_id"])
    .check_range("amount", min_val=0.0)
    .validate()
)

# Log results
logger = PipelineLogger(pipeline_name="orders_silver")
logger.log_dq_report(report)

3. Use in Databricks

Upload the library to your Databricks workspace or install it as a cluster library:

/Repos/<your-user>/pyspark-utils-library/

All modules are compatible with Databricks Runtime 13.x+ and work inside notebooks, jobs, and DLT pipelines.


File Structure

pyspark-utils-library/
│
├── README.md                          # This file
├── LICENSE                            # MIT License
├── setup.py                           # Package configuration
├── pyproject.toml                     # Build system & tool config
│
├── pyspark_utils/
│   ├── __init__.py                    # Package init — version, public API exports
│   ├── transformations.py             # 15 reusable DataFrame transforms
│   ├── data_quality.py                # Chainable DQ validator with reporting
│   ├── scd.py                         # SCD Type 1 & Type 2 Delta Lake merges
│   ├── schema_utils.py                # Schema comparison, evolution, DDL tools
│   ├── logging_utils.py               # Structured JSON logging with Delta sink
│   ├── dedup.py                       # Window, hash, and fuzzy dedup strategies
│   └── diff.py                        # DataFrame comparison and diff reports
│
├── tests/
│   ├── __init__.py                    # Test package init
│   ├── conftest.py                    # SparkSession fixture & sample DataFrames
│   ├── test_transformations.py        # 25+ tests across 14 test classes
│   ├── test_data_quality.py           # 20+ tests across 9 test classes
│   ├── test_scd.py                    # 9 tests for SCD1 & SCD2
│   ├── test_schema_utils.py           # 20+ tests across 6 test classes
│   ├── test_dedup.py                  # 24+ tests across 4 test classes
│   └── test_diff.py                   # 22+ tests across 5 test classes
│
└── examples/
    └── usage_examples.py              # Databricks notebook-style examples for all modules

Module Deep Dives

transformations — 15 DataFrame Transforms

Reusable, composable functions that each take a DataFrame and return a new DataFrame. Designed to be chained using PySpark's .transform() method.

Column Cleaning

from pyspark_utils.transformations import clean_column_names, rename_columns_bulk

# Normalize all column names to lowercase snake_case
df = clean_column_names(df)
# "Order ID" → "order_id", "customer-name" → "customer_name"

# Bulk rename with a mapping dict
df = rename_columns_bulk(df, {"old_col": "new_col", "amt": "amount"})
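Under the hood, snake_case normalization is mostly string handling. A minimal sketch (a hypothetical helper, not the library's actual implementation) could look like:

```python
import re

def to_snake_case(name: str) -> str:
    """Sketch of column-name normalization: split camelCase boundaries,
    turn spaces/hyphens/dots into underscores, collapse repeats, lowercase."""
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # camelCase boundary
    s = re.sub(r"[\s\-.]+", "_", s)                    # separators -> underscore
    s = re.sub(r"_+", "_", s)                          # collapse repeats
    return s.strip("_").lower()
```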

Type Casting

from pyspark_utils.transformations import cast_columns, type_cast_dataframe

# Cast specific columns
df = cast_columns(df, {"amount": "double", "quantity": "integer"})

# Auto-cast all columns based on a target schema
df = type_cast_dataframe(df, target_schema)

Struct & Array Operations

from pyspark_utils.transformations import flatten_struct, explode_array

# Flatten nested structs into top-level columns (address.city → address_city)
df = flatten_struct(df)

# Explode an array column with position index
df = explode_array(df, array_column="line_items", position_alias="item_index")

Pivoting & Unpivoting

from pyspark_utils.transformations import pivot_aggregate, unpivot

# Pivot: rows → columns
df_pivoted = pivot_aggregate(
    df,
    group_columns=["customer_id"],
    pivot_column="metric_name",
    agg_column="metric_value",
    agg_function="sum",
)

# Unpivot: columns → rows
df_unpivoted = unpivot(
    df,
    id_columns=["customer_id"],
    value_columns=["revenue", "cost", "profit"],
    var_col_name="metric",
    val_col_name="value",
)

Metadata & Hashing

from pyspark_utils.transformations import (
    add_metadata_columns,
    hash_columns,
    add_surrogate_key,
)

# Add ETL metadata: _load_timestamp, _source, _filename
df = add_metadata_columns(df, source="salesforce")

# Create a SHA-256 hash column from selected columns
df = hash_columns(df, columns=["name", "email"], output_column="row_hash")

# Add a monotonically increasing surrogate key
df = add_surrogate_key(df, key_column="sk_customer")

String & Null Helpers

from pyspark_utils.transformations import (
    trim_strings,
    coalesce_columns,
    filter_nulls,
    standardize_timestamps,
)

# Trim whitespace from all string columns
df = trim_strings(df)

# Coalesce multiple columns into one (first non-null wins)
df = coalesce_columns(df, columns=["phone_mobile", "phone_work", "phone_home"], output="phone")

# Filter rows where any/all specified columns are null
df = filter_nulls(df, columns=["order_id", "amount"], how="any")

# Parse and standardize mixed timestamp formats to a consistent format
df = standardize_timestamps(df, columns=["created_at", "updated_at"])

data_quality — Chainable Validation Framework

A fluent validation API that accumulates rules and produces a structured DQReport with pass/fail/warning counts.

Basic Validation

from pyspark_utils.data_quality import DQValidator

report = (
    DQValidator(df)
    .check_nulls(["order_id", "customer_id"])
    .check_unique(["order_id"])
    .check_range("amount", min_val=0.0, max_val=1_000_000.0)
    .check_regex("email", r"^[\w.+-]+@[\w-]+\.[\w.]+$")
    .validate()
)

print(report)
# DQReport(total=5, passed=4, failed=1, warnings=0, ...)

Referential Integrity

report = (
    DQValidator(orders_df)
    .check_referential_integrity(
        column="customer_id",
        reference_df=customers_df,
        reference_column="id",
    )
    .validate()
)

Schema Validation

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

expected_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("amount", DoubleType(), True),
])

report = (
    DQValidator(df)
    .check_schema(expected_schema)
    .validate()
)

Custom Rules

report = (
    DQValidator(df)
    .check_custom(
        name="positive_margin",
        rule_fn=lambda d: d.filter(F.col("margin") < 0).count() == 0,
        description="All margins must be non-negative",
    )
    .validate()
)

Row Count Validation

report = (
    DQValidator(df)
    .check_row_count(min_count=1000, max_count=10_000_000)
    .validate()
)

Fail-Fast Mode

from pyspark_utils.data_quality import DQValidator

# Raises ValueError immediately on first failure
DQValidator(df).check_nulls(["order_id"]).validate_or_raise()

Working with Reports

report = DQValidator(df).check_nulls(["order_id"]).validate()

# Structured access
print(report.passed)      # Number of passed checks
print(report.failed)      # Number of failed checks
print(report.is_healthy)  # True if zero failures

# Iterate individual results
for result in report.results:
    print(f"{result.rule_name}: {result.status} {result.detail}")

# Serialize to JSON for logging
print(report.to_json())
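The fluent pattern behind a validator like this is simple to sketch in plain Python: each `check_*` method appends a rule and returns `self`, and `validate()` runs the accumulated rules. The `MiniValidator` below is a hypothetical illustration of the pattern, not the library's class:

```python
from dataclasses import dataclass, field
from typing import List, Tuple

@dataclass
class MiniReport:
    results: List[Tuple[str, bool]] = field(default_factory=list)

    @property
    def passed(self) -> int:
        return sum(1 for _, ok in self.results if ok)

    @property
    def failed(self) -> int:
        return sum(1 for _, ok in self.results if not ok)

class MiniValidator:
    def __init__(self, rows):
        self.rows = rows   # list of dicts standing in for a DataFrame
        self.rules = []    # accumulated (name, predicate) pairs

    def check_not_null(self, col):
        self.rules.append((f"not_null:{col}",
                           lambda rows, c=col: all(r.get(c) is not None for r in rows)))
        return self        # returning self is what enables chaining

    def check_unique(self, col):
        self.rules.append((f"unique:{col}",
                           lambda rows, c=col: len({r.get(c) for r in rows}) == len(rows)))
        return self

    def validate(self):
        report = MiniReport()
        for name, rule in self.rules:
            report.results.append((name, rule(self.rows)))
        return report
```

Because rules are only evaluated inside `validate()`, the chain itself is cheap to build and easy to compose conditionally.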

scd — Slowly Changing Dimensions for Delta Lake

Production-ready SCD implementations that use Delta Lake MERGE operations.

SCD Type 2 (Full History)

Maintains a complete history of changes. Each record gets effective_date, expiry_date, and is_current columns.

from pyspark_utils.scd import scd2_merge

scd2_merge(
    spark=spark,
    source_df=new_customers,
    target_table="gold.dim_customer",
    key_columns=["customer_id"],
    tracked_columns=["name", "email", "address"],
)

After the merge:

  • New customers are inserted with is_current=True and expiry_date=9999-12-31
  • Changed customers: the old row is expired (closed), a new row is inserted as current
  • Unchanged customers are left untouched
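The row lifecycle above can be sketched in plain Python over toy dicts (this illustrates the SCD2 rules only; the library performs the real work as a Delta Lake MERGE):

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)  # conventional "open" expiry date

def scd2_step(current, incoming, today):
    """Toy SCD2 step: expire changed current rows, insert new versions
    and brand-new keys, leave everything else untouched."""
    out = []
    incoming_by_key = {r["key"]: r for r in incoming}
    for row in current:
        newer = incoming_by_key.get(row["key"])
        if row["is_current"] and newer and newer["value"] != row["value"]:
            # close the old version
            out.append({**row, "is_current": False, "expiry_date": today})
        else:
            out.append(row)  # unchanged or already-historical rows
    existing = {r["key"] for r in current}
    for key, row in incoming_by_key.items():
        changed = any(c["key"] == key and c["is_current"] and c["value"] != row["value"]
                      for c in current)
        if key not in existing or changed:
            # new keys and new versions of changed keys become current rows
            out.append({"key": key, "value": row["value"], "is_current": True,
                        "effective_date": today, "expiry_date": HIGH_DATE})
    return out
```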

Change Detection

Inspect what would change before running the merge:

from pyspark_utils.scd import detect_changes

changes = detect_changes(
    source_df=new_data,
    target_df=spark.read.table("gold.dim_customer"),
    key_columns=["customer_id"],
    tracked_columns=["name", "email"],
)

print(f"New records: {changes['inserts'].count()}")
print(f"Changed records: {changes['updates'].count()}")
print(f"Unchanged records: {changes['unchanged'].count()}")

Initialize a History Table

Create an SCD2 table from scratch with the required metadata columns:

from pyspark_utils.scd import create_history_table

create_history_table(
    spark=spark,
    source_df=initial_load,
    target_table="gold.dim_product",
    key_columns=["product_id"],
)

Close Expired Records

Manually expire records that match a condition:

from pyspark_utils.scd import close_expired_records

close_expired_records(
    spark=spark,
    target_table="gold.dim_customer",
    key_column_values={"customer_id": ["CUST-001", "CUST-002"]},
)

SCD Type 1 (Overwrite)

Simple upsert — overwrites existing records with new values:

from pyspark_utils.scd import scd1_overwrite

scd1_overwrite(
    spark=spark,
    source_df=updated_products,
    target_table="gold.dim_product",
    key_columns=["product_id"],
)

schema_utils — Schema Comparison & Evolution

Tools for handling schema drift and evolution in production pipelines.

Compare Two Schemas

from pyspark_utils.schema_utils import compare_schemas

result = compare_schemas(old_schema, new_schema)
print(result.added)           # ["new_column"]
print(result.removed)         # ["deprecated_col"]
print(result.type_changed)    # {"amount": ("string", "double")}
print(result.is_compatible)   # False (breaking changes detected)

Evolve a DataFrame to Match a Target Schema

from pyspark_utils.schema_utils import evolve_schema

# Adds missing columns (as null) and reorders to match target
evolved_df = evolve_schema(df, target_schema, fill_new_with_null=True)

Schema ↔ DDL Conversion

from pyspark_utils.schema_utils import schema_to_ddl, ddl_to_schema

# Convert StructType to DDL string (useful for CREATE TABLE)
ddl_string = schema_to_ddl(df.schema)
# "order_id STRING NOT NULL, amount DOUBLE, created_at TIMESTAMP"

# Parse DDL back to StructType
schema = ddl_to_schema("order_id STRING, amount DOUBLE")

Validate Compatibility

from pyspark_utils.schema_utils import validate_schema_compatibility

is_ok = validate_schema_compatibility(
    source_schema=incoming_df.schema,
    target_schema=spark.read.table("silver.orders").schema,
)
# Returns True if source can safely write to target

Merge Multiple Schemas

from pyspark_utils.schema_utils import merge_schemas

# Union of all fields from multiple schemas
merged = merge_schemas([schema_a, schema_b, schema_c])

logging_utils — Structured Pipeline Logging

Production logging with correlation IDs, metric tracking, and optional Delta table persistence.

Basic Logging

from pyspark_utils.logging_utils import PipelineLogger

logger = PipelineLogger(pipeline_name="orders_silver")
logger.info("Pipeline started", extra={"batch_date": "2026-01-15"})
logger.warning("Null rate above threshold", extra={"column": "email", "pct": 0.12})
logger.error("Schema mismatch detected")

All log entries include:

  • ISO-8601 timestamp
  • Pipeline name
  • Correlation ID (auto-generated UUID, consistent for the entire pipeline run)
  • Structured JSON output compatible with Azure Monitor, ELK, and Splunk
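A single entry of this shape can be sketched with the standard library alone (field names here are illustrative, not the library's exact schema):

```python
import json
import uuid
from datetime import datetime, timezone

correlation_id = str(uuid.uuid4())  # one ID per pipeline run, reused for every entry

def log_entry(pipeline, level, message, **extra):
    """Build one structured JSON log line with an ISO-8601 timestamp."""
    return json.dumps({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "pipeline": pipeline,
        "correlation_id": correlation_id,
        "level": level,
        "message": message,
        **extra,
    })

print(log_entry("orders_silver", "INFO", "Pipeline started", batch_date="2026-01-15"))
```

Keeping one correlation ID per run is what lets a log aggregator stitch every entry of a single pipeline execution back together.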

Metric Tracking

logger.track_metric("row_count", 125_000)
logger.track_metric("null_pct_order_id", 0.001)
logger.track_metric("processing_time_seconds", 42.5)

# Retrieve tracked metrics
metrics = logger.get_metrics()
# {"row_count": 125000, "null_pct_order_id": 0.001, ...}

Threshold Alerting

# Alert if a metric exceeds bounds
logger.alert_on_threshold("null_pct_order_id", max_val=0.05)
# No alert — 0.001 < 0.05

logger.track_metric("null_pct_email", 0.12)
logger.alert_on_threshold("null_pct_email", max_val=0.05)
# WARNING: null_pct_email (0.12) exceeds max threshold 0.05

Delta Table Sink

# Persist all logs to a Delta table
logger = PipelineLogger(
    pipeline_name="orders_silver",
    log_to_delta_table="audit.pipeline_logs",
)
# Logs are buffered and flushed to Delta on close or explicit flush

logger.info("Processing batch")
logger.flush_to_delta()  # Explicit flush

Context Manager Support

with PipelineLogger(pipeline_name="orders_silver") as logger:
    logger.info("Starting")
    # ... pipeline logic ...
    logger.info("Done")
# Automatically flushes on exit

dedup — Multiple Deduplication Strategies

Four approaches to deduplication, each suited to different use cases.

Window Dedup (Recommended)

Uses ROW_NUMBER() to keep the preferred record per key group. Best for most use cases.

from pyspark_utils.dedup import window_dedup

deduped = window_dedup(
    df,
    key_columns=["customer_id"],
    order_columns=[("updated_at", "desc")],
    keep="first",  # Keep the most recent per customer
)

Hash Dedup

Generates a content hash from all (or selected) columns and removes exact duplicates:

from pyspark_utils.dedup import hash_dedup

deduped = hash_dedup(
    df,
    columns=["name", "email", "phone"],  # Hash only these columns
)

Fuzzy Dedup

Approximate matching using Levenshtein edit distance. Useful for deduplicating messy string data like names and addresses. Requires the fuzzy extra.

from pyspark_utils.dedup import fuzzy_dedup

deduped = fuzzy_dedup(
    df,
    key_column="customer_name",
    threshold=2,  # Max edit distance to consider a match
    group_columns=["city"],  # Reduce comparison space
)
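PySpark exposes this metric as `pyspark.sql.functions.levenshtein`. The distance itself is classic dynamic programming; a standalone sketch for intuition:

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character edits (insert, delete, substitute)
    turning a into b, via the standard DP recurrence with a rolling row."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        curr = [i]
        for j, cb in enumerate(b, 1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # delete from a
                            curr[j - 1] + 1,      # insert into a
                            prev[j - 1] + cost))  # substitute (or match)
        prev = curr
    return prev[-1]
```

A threshold of 2, as in the example above, treats strings within two edits of each other as duplicates.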

Dedup Report

Produce a summary of duplicates without removing them — useful for data profiling:

from pyspark_utils.dedup import dedup_report

report = dedup_report(df, key_columns=["customer_id"])
report.show()
# +-------------+-------+
# | customer_id | count |
# +-------------+-------+
# | CUST-001    |     3 |
# | CUST-042    |     2 |
# +-------------+-------+

diff — DataFrame Comparison

Compare two DataFrames to understand what changed. Essential for pipeline testing, data validation, and regression checks.

Full Row-Level Diff

from pyspark_utils.diff import df_diff

result = df_diff(
    left=df_expected,
    right=df_actual,
    key_columns=["order_id"],
)

result["added"].show()      # Rows in actual but not expected
result["removed"].show()    # Rows in expected but not actual
result["changed"].show()    # Rows with same key but different values
result["unchanged"].show()  # Identical rows

summary = result["summary"]
print(summary)
# DiffSummary(left=1000, right=1005, added=10, removed=5, changed=3, unchanged=992)

Column-Level Diff

See which specific columns changed for matching rows:

from pyspark_utils.diff import column_diff

changes = column_diff(
    left=df_before,
    right=df_after,
    key_columns=["customer_id"],
)
changes.show()
# +-------------+--------+----------+-----------+
# | customer_id | column | old_value| new_value |
# +-------------+--------+----------+-----------+
# | CUST-001    | email  | old@x.co | new@x.co  |
# | CUST-001    | phone  | 555-0001 | 555-0099  |
# +-------------+--------+----------+-----------+

Schema Diff

Compare schemas between two DataFrames:

from pyspark_utils.diff import schema_diff

result = schema_diff(df_v1, df_v2)
print(result.added)          # Columns added in v2
print(result.removed)        # Columns removed in v2
print(result.type_changed)   # Columns with type changes

Row Count Comparison

Quick sanity check between pipeline stages:


from pyspark_utils.diff import row_count_comparison

*... [content trimmed for length — full version in the complete kit]*

---

> **This is 1 of 6 resources in the [DataStack Pro](https://datanest-stores.pages.dev/data-engineering/) toolkit.** Get the complete PySpark Utils Library with all files, templates, and documentation for $29.
>
> **[Get the Full Kit →](https://buy.stripe.com/4gM7sL4azehg9xo4i5gjG2B)**
>
> Or grab the entire DataStack Pro bundle (6 products) for $164 — save 30%.
>
> **[Get the Complete Bundle →](https://buy.stripe.com/fZubJ122r7SS24W01PgjG2G)**


---

## Related Articles

- [Databricks Audit Toolkit: Scheduling Automated Audits](https://dev.to/thesius_code_7a136ae718b7/databricks-audit-toolkit-scheduling-automated-audits-5c4k)
- [Databricks Starter Kit](https://dev.to/thesius_code_7a136ae718b7/databricks-starter-kit-a-comprehensive-guide-2026-449c)
- [Unity Catalog Governance Pack: Encryption Guide for Unity Catalog](https://dev.to/thesius_code_7a136ae718b7/unity-catalog-governance-pack-encryption-guide-for-unity-catalog-358e)
