PySpark Utils Library
Battle-tested utility functions for PySpark data engineering — transformations, data quality, SCD, schema evolution, logging, dedup, and DataFrame diffing.
Stop rewriting the same PySpark boilerplate on every project. This library gives you the production-ready building blocks that data engineering teams use daily — fully typed, tested, and documented.
What's Inside
| Module | What It Does |
|---|---|
| transformations | 15 reusable DataFrame transforms: column cleaning, casting, flattening, pivoting, hashing |
| data_quality | Chainable DQ validation framework with structured reports and severity levels |
| scd | SCD Type 1 (overwrite) and Type 2 (full history) merge utilities for Delta Lake |
| schema_utils | Schema comparison, evolution, DDL conversion, and compatibility checking |
| logging_utils | Structured pipeline logging with correlation IDs, metrics, and Delta table sink |
| dedup | Window-based, hash-based, and fuzzy deduplication strategies |
| diff | DataFrame comparison with row-level, column-level, and schema diffs |
20 files — every one fully type-hinted, tested, and documented.
Quick Start
1. Install
pip install pyspark-utils-library
Or install from the source directory:
pip install -e .
For fuzzy deduplication support (Levenshtein distance):
pip install pyspark-utils-library[fuzzy]
For development (testing, linting, type checking):
pip install pyspark-utils-library[dev]
2. Import and Use
from pyspark_utils.transformations import clean_column_names, add_metadata_columns
from pyspark_utils.data_quality import DQValidator
from pyspark_utils.scd import scd2_merge
from pyspark_utils.logging_utils import PipelineLogger
# Clean and enrich a raw DataFrame
df = spark.read.table("bronze.raw_orders")
df = clean_column_names(df)
df = add_metadata_columns(df, source="erp_system")
# Validate data quality
report = (
DQValidator(df)
.check_nulls(["order_id", "customer_id"])
.check_unique(["order_id"])
.check_range("amount", min_val=0.0)
.validate()
)
# Log results
logger = PipelineLogger(pipeline_name="orders_silver")
logger.log_dq_report(report)
3. Use in Databricks
Upload the library to your Databricks workspace or install it as a cluster library:
/Repos/<your-user>/pyspark-utils-library/
All modules are compatible with Databricks Runtime 13.x+ and work inside notebooks, jobs, and DLT pipelines.
File Structure
pyspark-utils-library/
│
├── README.md # This file
├── LICENSE # MIT License
├── setup.py # Package configuration
├── pyproject.toml # Build system & tool config
│
├── pyspark_utils/
│ ├── __init__.py # Package init — version, public API exports
│ ├── transformations.py # 15 reusable DataFrame transforms
│ ├── data_quality.py # Chainable DQ validator with reporting
│ ├── scd.py # SCD Type 1 & Type 2 Delta Lake merges
│ ├── schema_utils.py # Schema comparison, evolution, DDL tools
│ ├── logging_utils.py # Structured JSON logging with Delta sink
│ ├── dedup.py # Window, hash, and fuzzy dedup strategies
│ └── diff.py # DataFrame comparison and diff reports
│
├── tests/
│ ├── __init__.py # Test package init
│ ├── conftest.py # SparkSession fixture & sample DataFrames
│ ├── test_transformations.py # 25+ tests across 14 test classes
│ ├── test_data_quality.py # 20+ tests across 9 test classes
│ ├── test_scd.py # 9 tests for SCD1 & SCD2
│ ├── test_schema_utils.py # 20+ tests across 6 test classes
│ ├── test_dedup.py # 24+ tests across 4 test classes
│ └── test_diff.py # 22+ tests across 5 test classes
│
└── examples/
└── usage_examples.py # Databricks notebook-style examples for all modules
Module Deep Dives
transformations — 15 DataFrame Transforms
Reusable, composable functions that each take a DataFrame and return a new DataFrame. Designed to be chained using PySpark's .transform() method.
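Because every transform has the signature `DataFrame -> DataFrame`, pipelines compose cleanly with `.transform()`. Here is a minimal stand-in sketch of that pattern using a toy class (PySpark's real `DataFrame.transform` behaves the same way: it applies the given function to the DataFrame and, since Spark 3.3, forwards extra arguments):

```python
class MiniFrame:
    """Toy stand-in for a DataFrame, used only to illustrate .transform() chaining."""
    def __init__(self, columns):
        self.columns = list(columns)

    def transform(self, fn, *args, **kwargs):
        # Mirrors pyspark's DataFrame.transform: returns fn(self, *args, **kwargs)
        return fn(self, *args, **kwargs)

def clean_column_names(df):
    # Lowercase snake_case, like the library's clean_column_names
    return MiniFrame(c.lower().replace(" ", "_").replace("-", "_") for c in df.columns)

def drop_columns(df, names):
    return MiniFrame(c for c in df.columns if c not in names)

df = MiniFrame(["Order ID", "customer-name", "Temp"])
result = df.transform(clean_column_names).transform(drop_columns, ["temp"])
print(result.columns)  # ['order_id', 'customer_name']
```

The same shape applies to the real library: `df.transform(clean_column_names).transform(trim_strings)` reads top-to-bottom as a pipeline instead of nested calls.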
Column Cleaning
from pyspark_utils.transformations import clean_column_names, rename_columns_bulk
# Normalize all column names to lowercase snake_case
df = clean_column_names(df)
# "Order ID" → "order_id", "customer-name" → "customer_name"
# Bulk rename with a mapping dict
df = rename_columns_bulk(df, {"old_col": "new_col", "amt": "amount"})
Type Casting
from pyspark_utils.transformations import cast_columns, type_cast_dataframe
# Cast specific columns
df = cast_columns(df, {"amount": "double", "quantity": "integer"})
# Auto-cast all columns based on a target schema
df = type_cast_dataframe(df, target_schema)
Struct & Array Operations
from pyspark_utils.transformations import flatten_struct, explode_array
# Flatten nested structs into top-level columns (address.city → address_city)
df = flatten_struct(df)
# Explode an array column with position index
df = explode_array(df, array_column="line_items", position_alias="item_index")
Pivoting & Unpivoting
from pyspark_utils.transformations import pivot_aggregate, unpivot
# Pivot: rows → columns
df_pivoted = pivot_aggregate(
df,
group_columns=["customer_id"],
pivot_column="metric_name",
agg_column="metric_value",
agg_function="sum",
)
# Unpivot: columns → rows
df_unpivoted = unpivot(
df,
id_columns=["customer_id"],
value_columns=["revenue", "cost", "profit"],
var_col_name="metric",
val_col_name="value",
)
Metadata & Hashing
from pyspark_utils.transformations import (
add_metadata_columns,
hash_columns,
add_surrogate_key,
)
# Add ETL metadata: _load_timestamp, _source, _filename
df = add_metadata_columns(df, source="salesforce")
# Create a SHA-256 hash column from selected columns
df = hash_columns(df, columns=["name", "email"], output_column="row_hash")
# Add a monotonically increasing surrogate key
df = add_surrogate_key(df, key_column="sk_customer")
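A row hash like this is conventionally built by concatenating the selected values with a separator and hashing the result. A plain-Python sketch of the idea (the separator and null sentinel here are assumptions for illustration, not the library's exact implementation):

```python
import hashlib

def row_hash(row: dict, columns: list, sep: str = "||") -> str:
    """SHA-256 over selected column values, joined with a separator.

    Nulls are encoded as a sentinel so ("a", None) hashes differently from ("a", "").
    """
    parts = ["<NULL>" if row.get(c) is None else str(row[c]) for c in columns]
    return hashlib.sha256(sep.join(parts).encode("utf-8")).hexdigest()

h1 = row_hash({"name": "Ada", "email": "ada@x.co"}, ["name", "email"])
h2 = row_hash({"name": "Ada", "email": None}, ["name", "email"])
```

The separator matters: without one, `("ab", "c")` and `("a", "bc")` would collide.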
String & Null Helpers
from pyspark_utils.transformations import (
trim_strings,
coalesce_columns,
filter_nulls,
standardize_timestamps,
)
# Trim whitespace from all string columns
df = trim_strings(df)
# Coalesce multiple columns into one (first non-null wins)
df = coalesce_columns(df, columns=["phone_mobile", "phone_work", "phone_home"], output="phone")
# Filter rows where any/all specified columns are null
df = filter_nulls(df, columns=["order_id", "amount"], how="any")
# Parse and standardize mixed timestamp formats to a consistent format
df = standardize_timestamps(df, columns=["created_at", "updated_at"])
data_quality — Chainable Validation Framework
A fluent validation API that accumulates rules and produces a structured DQReport with pass/fail/warning counts.
Basic Validation
from pyspark_utils.data_quality import DQValidator
report = (
DQValidator(df)
.check_nulls(["order_id", "customer_id"])
.check_unique(["order_id"])
.check_range("amount", min_val=0.0, max_val=1_000_000.0)
.check_regex("email", r"^[\w.+-]+@[\w-]+\.[\w.]+$")
.validate()
)
print(report)
# DQReport(total=5, passed=4, failed=1, warnings=0, ...)
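The fluent pattern behind this API is simple: each `check_*` method records a rule and returns `self`, and `validate()` runs the accumulated rules. A stripped-down sketch of that design (class, method, and field names here are illustrative, not the library's internals):

```python
from dataclasses import dataclass, field

@dataclass
class Report:
    passed: int = 0
    failed: int = 0
    details: list = field(default_factory=list)

class MiniValidator:
    def __init__(self, rows):
        self.rows = rows   # list of dicts standing in for a DataFrame
        self._rules = []   # accumulated (name, predicate) pairs

    def check_nulls(self, columns):
        for c in columns:
            self._rules.append(
                (f"no_nulls_{c}", lambda rows, c=c: all(r.get(c) is not None for r in rows))
            )
        return self  # returning self is what makes the API chainable

    def validate(self):
        report = Report()
        for name, rule in self._rules:
            ok = rule(self.rows)
            report.passed += ok
            report.failed += not ok
            report.details.append((name, "PASS" if ok else "FAIL"))
        return report

rows = [{"order_id": "A1", "customer_id": None}]
report = MiniValidator(rows).check_nulls(["order_id", "customer_id"]).validate()
```

Note the `c=c` default argument in the lambda: without it, every rule would close over the same loop variable and check the last column only.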
Referential Integrity
report = (
DQValidator(orders_df)
.check_referential_integrity(
column="customer_id",
reference_df=customers_df,
reference_column="id",
)
.validate()
)
Schema Validation
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
expected_schema = StructType([
StructField("order_id", StringType(), False),
StructField("amount", DoubleType(), True),
])
report = (
DQValidator(df)
.check_schema(expected_schema)
.validate()
)
Custom Rules
report = (
DQValidator(df)
.check_custom(
name="positive_margin",
rule_fn=lambda d: d.filter(F.col("margin") < 0).count() == 0,
description="All margins must be non-negative",
)
.validate()
)
Row Count Validation
report = (
DQValidator(df)
.check_row_count(min_count=1000, max_count=10_000_000)
.validate()
)
Fail-Fast Mode
from pyspark_utils.data_quality import DQValidator
# Raises ValueError immediately on first failure
DQValidator(df).check_nulls(["order_id"]).validate_or_raise()
Working with Reports
report = DQValidator(df).check_nulls(["order_id"]).validate()
# Structured access
print(report.passed) # Number of passed checks
print(report.failed) # Number of failed checks
print(report.is_healthy) # True if zero failures
# Iterate individual results
for result in report.results:
print(f"{result.rule_name}: {result.status} — {result.detail}")
# Serialize to JSON for logging
print(report.to_json())
scd — Slowly Changing Dimensions for Delta Lake
Production-ready SCD implementations that use Delta Lake MERGE operations.
SCD Type 2 (Full History)
Maintains a complete history of changes. Each record gets effective_date, expiry_date, and is_current columns.
from pyspark_utils.scd import scd2_merge
scd2_merge(
spark=spark,
source_df=new_customers,
target_table="gold.dim_customer",
key_columns=["customer_id"],
tracked_columns=["name", "email", "address"],
)
After the merge:
- New customers are inserted with is_current=True and expiry_date=9999-12-31
- Changed customers: the old row is expired (closed), and a new row is inserted as current
- Unchanged customers are left untouched
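Conceptually, the merge classifies incoming rows by comparing tracked columns against the current target rows. A pure-Python sketch of that classification step (dicts standing in for DataFrames; the actual operation is a Delta Lake MERGE):

```python
def classify_changes(source, target_current, key, tracked):
    """Split source rows into inserts / updates / unchanged vs. current target rows."""
    current = {r[key]: r for r in target_current}
    inserts, updates, unchanged = [], [], []
    for row in source:
        old = current.get(row[key])
        if old is None:
            inserts.append(row)                        # new key -> insert as current
        elif any(row[c] != old[c] for c in tracked):
            updates.append(row)                        # expire old row, insert new current
        else:
            unchanged.append(row)                      # leave untouched
    return inserts, updates, unchanged

target = [{"customer_id": 1, "name": "Ada", "email": "a@x.co"}]
source = [
    {"customer_id": 1, "name": "Ada", "email": "new@x.co"},  # changed
    {"customer_id": 2, "name": "Bob", "email": "b@x.co"},    # new
]
ins, upd, same = classify_changes(source, target, "customer_id", ["name", "email"])
```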
Change Detection
Inspect what would change before running the merge:
from pyspark_utils.scd import detect_changes
changes = detect_changes(
source_df=new_data,
target_df=spark.read.table("gold.dim_customer"),
key_columns=["customer_id"],
tracked_columns=["name", "email"],
)
print(f"New records: {changes['inserts'].count()}")
print(f"Changed records: {changes['updates'].count()}")
print(f"Unchanged records: {changes['unchanged'].count()}")
Initialize a History Table
Create an SCD2 table from scratch with the required metadata columns:
from pyspark_utils.scd import create_history_table
create_history_table(
spark=spark,
source_df=initial_load,
target_table="gold.dim_product",
key_columns=["product_id"],
)
Close Expired Records
Manually expire records that match a condition:
from pyspark_utils.scd import close_expired_records
close_expired_records(
spark=spark,
target_table="gold.dim_customer",
key_column_values={"customer_id": ["CUST-001", "CUST-002"]},
)
SCD Type 1 (Overwrite)
Simple upsert — overwrites existing records with new values:
from pyspark_utils.scd import scd1_overwrite
scd1_overwrite(
spark=spark,
source_df=updated_products,
target_table="gold.dim_product",
key_columns=["product_id"],
)
schema_utils — Schema Comparison & Evolution
Tools for handling schema drift and evolution in production pipelines.
Compare Two Schemas
from pyspark_utils.schema_utils import compare_schemas
result = compare_schemas(old_schema, new_schema)
print(result.added) # ["new_column"]
print(result.removed) # ["deprecated_col"]
print(result.type_changed) # {"amount": ("string", "double")}
print(result.is_compatible) # False (breaking changes detected)
Evolve a DataFrame to Match a Target Schema
from pyspark_utils.schema_utils import evolve_schema
# Adds missing columns (as null) and reorders to match target
evolved_df = evolve_schema(df, target_schema, fill_new_with_null=True)
Schema ↔ DDL Conversion
from pyspark_utils.schema_utils import schema_to_ddl, ddl_to_schema
# Convert StructType to DDL string (useful for CREATE TABLE)
ddl_string = schema_to_ddl(df.schema)
# "order_id STRING NOT NULL, amount DOUBLE, created_at TIMESTAMP"
# Parse DDL back to StructType
schema = ddl_to_schema("order_id STRING, amount DOUBLE")
Validate Compatibility
from pyspark_utils.schema_utils import validate_schema_compatibility
is_ok = validate_schema_compatibility(
source_schema=incoming_df.schema,
target_schema=spark.read.table("silver.orders").schema,
)
# Returns True if source can safely write to target
Merge Multiple Schemas
from pyspark_utils.schema_utils import merge_schemas
# Union of all fields from multiple schemas
merged = merge_schemas([schema_a, schema_b, schema_c])
logging_utils — Structured Pipeline Logging
Production logging with correlation IDs, metric tracking, and optional Delta table persistence.
Basic Logging
from pyspark_utils.logging_utils import PipelineLogger
logger = PipelineLogger(pipeline_name="orders_silver")
logger.info("Pipeline started", extra={"batch_date": "2026-01-15"})
logger.warning("Null rate above threshold", extra={"column": "email", "pct": 0.12})
logger.error("Schema mismatch detected")
All log entries include:
- ISO-8601 timestamp
- Pipeline name
- Correlation ID (auto-generated UUID, consistent for the entire pipeline run)
- Structured JSON output compatible with Azure Monitor, ELK, and Splunk
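The shape of such an entry looks roughly like this (field names here are illustrative; the library's actual keys may differ):

```python
import json
import uuid
from datetime import datetime, timezone

CORRELATION_ID = str(uuid.uuid4())  # one ID generated per pipeline run

def log_record(pipeline: str, level: str, message: str, **extra) -> str:
    """Render one structured log entry as a JSON line."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # ISO-8601
        "pipeline": pipeline,
        "correlation_id": CORRELATION_ID,
        "level": level,
        "message": message,
        **extra,  # structured context, e.g. batch_date
    }
    return json.dumps(record)

line = log_record("orders_silver", "INFO", "Pipeline started", batch_date="2026-01-15")
```

One JSON object per line is exactly what log shippers for Azure Monitor, ELK, and Splunk expect to ingest.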
Metric Tracking
logger.track_metric("row_count", 125_000)
logger.track_metric("null_pct_order_id", 0.001)
logger.track_metric("processing_time_seconds", 42.5)
# Retrieve tracked metrics
metrics = logger.get_metrics()
# {"row_count": 125000, "null_pct_order_id": 0.001, ...}
Threshold Alerting
# Alert if a metric exceeds bounds
logger.alert_on_threshold("null_pct_order_id", max_val=0.05)
# No alert — 0.001 < 0.05
logger.track_metric("null_pct_email", 0.12)
logger.alert_on_threshold("null_pct_email", max_val=0.05)
# WARNING: null_pct_email (0.12) exceeds max threshold 0.05
Delta Table Sink
# Persist all logs to a Delta table
logger = PipelineLogger(
pipeline_name="orders_silver",
log_to_delta_table="audit.pipeline_logs",
)
# Logs are buffered and flushed to Delta on close or explicit flush
logger.info("Processing batch")
logger.flush_to_delta() # Explicit flush
Context Manager Support
with PipelineLogger(pipeline_name="orders_silver") as logger:
logger.info("Starting")
# ... pipeline logic ...
logger.info("Done")
# Automatically flushes on exit
dedup — Multiple Deduplication Strategies
Four approaches to deduplication, each suited to different use cases.
Window Dedup (Recommended)
Uses ROW_NUMBER() to keep the preferred record per key group. Best for most use cases.
from pyspark_utils.dedup import window_dedup
deduped = window_dedup(
df,
key_columns=["customer_id"],
order_columns=[("updated_at", "desc")],
keep="first", # Keep the most recent per customer
)
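Roughly what happens under the hood: partition by the key, order within each partition, keep row 1. A pure-Python sketch of that logic (the library version does this with a Spark window and `row_number()`, which scales where this sketch would not):

```python
from collections import defaultdict

def window_dedup_sketch(rows, key, order_col, descending=True):
    """Keep one row per key: the first row after sorting each group by order_col."""
    groups = defaultdict(list)
    for r in rows:
        groups[r[key]].append(r)          # "partition by" the key
    kept = []
    for group in groups.values():
        group.sort(key=lambda r: r[order_col], reverse=descending)
        kept.append(group[0])             # row_number() == 1 after ordering
    return kept

rows = [
    {"customer_id": 1, "updated_at": "2026-01-01"},
    {"customer_id": 1, "updated_at": "2026-02-01"},  # most recent -> kept
    {"customer_id": 2, "updated_at": "2026-01-15"},
]
deduped = window_dedup_sketch(rows, "customer_id", "updated_at")
```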
Hash Dedup
Generates a content hash from all (or selected) columns and removes exact duplicates:
from pyspark_utils.dedup import hash_dedup
deduped = hash_dedup(
df,
columns=["name", "email", "phone"], # Hash only these columns
)
Fuzzy Dedup
Approximate matching using Levenshtein edit distance. Useful for deduplicating messy string data like names and addresses. Requires the fuzzy extra.
from pyspark_utils.dedup import fuzzy_dedup
deduped = fuzzy_dedup(
df,
key_column="customer_name",
threshold=2, # Max edit distance to consider a match
group_columns=["city"], # Reduce comparison space
)
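For intuition about the threshold parameter, here is a textbook dynamic-programming Levenshtein distance in plain Python (Spark exposes the same measure as its built-in `levenshtein` function):

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character edits (insert, delete, substitute)."""
    prev = list(range(len(b) + 1))        # distances for the empty prefix of a
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(min(
                prev[j] + 1,              # delete ca
                curr[j - 1] + 1,          # insert cb
                prev[j - 1] + (ca != cb), # substitute (free if characters match)
            ))
        prev = curr
    return prev[-1]

print(levenshtein("Jon Smith", "John Smith"))  # 1
```

With `threshold=2`, "Jon Smith" and "John Smith" (distance 1) would be grouped as duplicates, while unrelated names would not.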
Dedup Report
Produce a summary of duplicates without removing them — useful for data profiling:
from pyspark_utils.dedup import dedup_report
report = dedup_report(df, key_columns=["customer_id"])
report.show()
# +-------------+-------+
# | customer_id | count |
# +-------------+-------+
# |    CUST-001 |     3 |
# |    CUST-042 |     2 |
# +-------------+-------+
diff — DataFrame Comparison
Compare two DataFrames to understand what changed. Essential for pipeline testing, data validation, and regression checks.
Full Row-Level Diff
from pyspark_utils.diff import df_diff
result = df_diff(
left=df_expected,
right=df_actual,
key_columns=["order_id"],
)
result["added"].show() # Rows in actual but not expected
result["removed"].show() # Rows in expected but not actual
result["changed"].show() # Rows with same key but different values
result["unchanged"].show() # Identical rows
summary = result["summary"]
print(summary)
# DiffSummary(left=1000, right=1005, added=10, removed=5, changed=3, unchanged=992)
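A keyed diff boils down to set operations over the keys plus a value comparison on the intersection. A pure-Python sketch of that partitioning (dicts standing in for DataFrames; not the library's Spark implementation):

```python
def diff_sketch(left, right, key):
    """Partition rows into added / removed / changed / unchanged by key."""
    ldict = {row[key]: row for row in left}
    rdict = {row[key]: row for row in right}
    added     = [rdict[k] for k in rdict.keys() - ldict.keys()]  # in right only
    removed   = [ldict[k] for k in ldict.keys() - rdict.keys()]  # in left only
    both      = ldict.keys() & rdict.keys()
    changed   = [rdict[k] for k in both if ldict[k] != rdict[k]]
    unchanged = [rdict[k] for k in both if ldict[k] == rdict[k]]
    return added, removed, changed, unchanged

left  = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.0}]
right = [{"order_id": 1, "amount": 12.0}, {"order_id": 3, "amount": 7.0}]
added, removed, changed, unchanged = diff_sketch(left, right, "order_id")
```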
Column-Level Diff
See which specific columns changed for matching rows:
from pyspark_utils.diff import column_diff
changes = column_diff(
left=df_before,
right=df_after,
key_columns=["customer_id"],
)
changes.show()
# +-------------+--------+----------+-----------+
# | customer_id | column | old_value| new_value |
# +-------------+--------+----------+-----------+
# | CUST-001 | email | old@x.co | new@x.co |
# | CUST-001 | phone | 555-0001 | 555-0099 |
# +-------------+--------+----------+-----------+
Schema Diff
Compare schemas between two DataFrames:
from pyspark_utils.diff import schema_diff
result = schema_diff(df_v1, df_v2)
print(result.added) # Columns added in v2
print(result.removed) # Columns removed in v2
print(result.type_changed) # Columns with type changes
Row Count Comparison
Quick sanity check between pipeline stages:
from pyspark_utils.diff import row_count_comparison
*... [content trimmed for length — full version in the complete kit]*
---
> **This is 1 of 6 resources in the [DataStack Pro](https://datanest-stores.pages.dev/data-engineering/) toolkit.** Get the complete PySpark Utils Library with all files, templates, and documentation for $29.
>
> **[Get the Full Kit →](https://buy.stripe.com/4gM7sL4azehg9xo4i5gjG2B)**
>
> Or grab the entire DataStack Pro bundle (6 products) for $164 — save 30%.
>
> **[Get the Complete Bundle →](https://buy.stripe.com/fZubJ122r7SS24W01PgjG2G)**
---
## Related Articles
- [Databricks Audit Toolkit: Scheduling Automated Audits](https://dev.to/thesius_code_7a136ae718b7/databricks-audit-toolkit-scheduling-automated-audits-5c4k)
- [Databricks Starter Kit](https://dev.to/thesius_code_7a136ae718b7/databricks-starter-kit-a-comprehensive-guide-2026-449c)
- [Unity Catalog Governance Pack: Encryption Guide for Unity Catalog](https://dev.to/thesius_code_7a136ae718b7/unity-catalog-governance-pack-encryption-guide-for-unity-catalog-358e)