Schema Evolution Toolkit
Detect, validate, and migrate schema changes across Delta Lake tables — safely and automatically.
By Datanest Digital | Version 1.0.0 | $39
What You Get
A complete toolkit for managing schema evolution in Databricks / Delta Lake pipelines:
- Schema Detector — compare live table schemas against expected definitions, detect drift
- Schema Migrator — apply safe migrations (add columns, widen types, rename) with rollback
- Compatibility Checker — verify backward/forward compatibility before deploying changes
- Schema Registry — Delta-table-backed registry for versioned schema definitions
- Schema Validator — validate DataFrames against registered schemas at runtime
- Ready-to-use Notebooks — detect drift and evolve schemas interactively
- Schema Versions — example v1/v2 JSON schema files for a customer table
- Evolution Strategy Guide — comprehensive guide to schema versioning patterns
File Tree
schema-evolution-toolkit/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│ ├── schema_detector.py # Detect schema drift between expected and actual
│ ├── schema_migrator.py # Apply safe migrations with rollback support
│ ├── compatibility_checker.py # Backward/forward compatibility validation
│ ├── schema_registry.py # Delta-table-backed schema version registry
│ └── schema_validator.py # Runtime DataFrame schema validation
├── configs/
│ ├── schema_policy.yaml # Evolution rules and policies per layer
│ └── schemas/
│ ├── v1_customer.json # Version 1 customer schema
│ └── v2_customer.json # Version 2 customer schema (evolved)
├── notebooks/
│ ├── detect_drift.py # Interactive drift detection notebook
│ └── evolve_schema.py # Interactive schema migration notebook
├── tests/
│ ├── conftest.py # Shared pytest fixtures (SparkSession, sample schemas)
│ ├── test_schema_detector.py # Detector unit tests
│ └── test_compatibility.py # Compatibility checker tests
└── guides/
└── schema-evolution-strategy.md # Versioning & compatibility guide
Architecture
┌──────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Upstream │────▶│ Schema Detector │────▶│ Drift Report │
│ Data Source │ └──────────────────┘ └────────┬────────┘
└──────────────┘ │
▼
┌──────────────────┐ ┌─────────────────┐
│ Compatibility │◀────│ Decision: │
│ Checker │ │ Evolve? │
└───────┬──────────┘ └─────────────────┘
│
Compatible? │
┌─────────────┼─────────────┐
▼ YES │ ▼ NO
┌────────────────┐ │ ┌────────────────────┐
│ Schema Migrator│ │ │ Alert / Manual │
│ (apply DDL) │ │ │ Review Required │
└───────┬────────┘ │ └────────────────────┘
│ │
▼ │
┌────────────────┐ │
│ Schema Registry│◀──┘
│ (store version)│
└────────────────┘
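The branch in the diagram can be sketched as a small decision function. This is a simplified illustration, not the toolkit's actual API: it treats additive-only drift as safe to migrate and routes removed columns or type changes to manual review (the real flow delegates type-change decisions to the Compatibility Checker). The `DriftReport` fields here are assumptions.

```python
from dataclasses import dataclass

@dataclass
class DriftReport:
    added: list          # columns present in the source but not the table
    removed: list        # columns missing from the source
    type_changes: list   # columns whose type differs

    @property
    def has_drift(self):
        return bool(self.added or self.removed or self.type_changes)

def decide(report):
    """Mirror the diagram's branch: additive-only drift auto-migrates,
    anything destructive goes to manual review (simplified rule)."""
    if not report.has_drift:
        return "noop"
    if report.removed or report.type_changes:
        return "manual-review"
    return "migrate"
```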
Getting Started
1. Detect Schema Drift
from schema_detector import SchemaDetector
detector = SchemaDetector(spark)
# Compare a live table against an expected JSON schema
report = detector.compare_table_to_schema(
    table_name="catalog.bronze.customers",
    expected_schema_path="/Volumes/schemas/v2_customer.json",
)
print(report.summary())
# SchemaReport: 2 added columns, 1 type change, 0 removed columns
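Under the hood, drift detection boils down to a field-level diff. The sketch below works on plain `{name: type}` mappings for illustration; the real `SchemaDetector` compares Spark `StructType`s and its report format may differ.

```python
def diff_schemas(expected, actual):
    """Field-level diff between two {name: type} mappings
    (illustrative only, not the toolkit's internal implementation)."""
    added = sorted(set(actual) - set(expected))
    removed = sorted(set(expected) - set(actual))
    type_changes = sorted(
        name for name in set(expected) & set(actual)
        if expected[name] != actual[name]
    )
    return {"added": added, "removed": removed, "type_changes": type_changes}
```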
2. Check Compatibility
from compatibility_checker import CompatibilityChecker
checker = CompatibilityChecker()
result = checker.check_backward_compatible(old_schema, new_schema)
if not result.is_compatible:
    for issue in result.issues:
        print(f"BREAKING: {issue}")
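Backward compatibility here means readers of the new schema can still consume data written with the old one: dropping columns or narrowing types breaks it, while adding nullable columns or widening types does not. A minimal sketch of that rule over `{name: type}` mappings (the widening table and function name are assumptions, not the `CompatibilityChecker` API):

```python
# Type widenings generally considered safe in Delta Lake pipelines
# (illustrative subset).
SAFE_WIDENINGS = {("int", "bigint"), ("float", "double"), ("int", "double")}

def backward_issues(old, new):
    """Return a list of breaking changes when moving old -> new (sketch)."""
    issues = []
    for name, old_type in old.items():
        if name not in new:
            issues.append(f"column dropped: {name}")
        elif new[name] != old_type and (old_type, new[name]) not in SAFE_WIDENINGS:
            issues.append(f"incompatible type change: {name} {old_type} -> {new[name]}")
    return issues
```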
3. Apply a Migration
from schema_migrator import SchemaMigrator
migrator = SchemaMigrator(spark)
migrator.add_columns(
    table_name="catalog.silver.customers",
    columns={"loyalty_tier": "STRING", "last_login": "TIMESTAMP"},
)
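For an add-columns migration, the DDL the migrator ultimately issues is a standard Delta `ALTER TABLE ... ADD COLUMNS` statement. A small sketch of how such a statement can be rendered (the helper name is hypothetical; the real `SchemaMigrator` also handles rollback):

```python
def add_columns_ddl(table_name, columns):
    """Render the ALTER TABLE statement for an add-columns migration.
    `columns` maps column name -> SQL type string."""
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    return f"ALTER TABLE {table_name} ADD COLUMNS ({cols})"
```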
4. Register a Schema Version
from schema_registry import SchemaRegistry
registry = SchemaRegistry(spark, registry_table="catalog.meta.schema_registry")
registry.register(
    subject="customers",
    version=2,
    schema=new_schema,
    description="Added loyalty_tier and last_login columns",
)
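The registry contract is essentially a versioned key-value store keyed by (subject, version). A dict-backed stand-in for the Delta-table-backed `SchemaRegistry`, showing the register/lookup shape (method names are assumptions, not the real API):

```python
class InMemoryRegistry:
    """Dict-backed stand-in for a Delta-table-backed schema registry (sketch)."""

    def __init__(self):
        self._versions = {}  # (subject, version) -> entry

    def register(self, subject, version, schema, description=""):
        key = (subject, version)
        if key in self._versions:
            raise ValueError(f"{subject} v{version} already registered")
        self._versions[key] = {"schema": schema, "description": description}

    def latest(self, subject):
        versions = [v for (s, v) in self._versions if s == subject]
        return max(versions) if versions else None
```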
5. Validate at Runtime
from schema_validator import SchemaValidator
validator = SchemaValidator(spark)
result = validator.validate(df, expected_schema, mode="strict")
if not result.is_valid:
    raise ValueError(f"Schema validation failed: {result.errors}")
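In strict mode, validation means the DataFrame's columns and the expected schema must match exactly: nothing missing, nothing extra, no type mismatches. A sketch of that check over `{name: type}` mappings (the real `SchemaValidator` operates on Spark DataFrames; this function name is hypothetical):

```python
def strict_errors(df_fields, expected_fields):
    """Strict-mode validation: names and types must match exactly (sketch)."""
    errors = []
    for name, dtype in expected_fields.items():
        if name not in df_fields:
            errors.append(f"missing column: {name}")
        elif df_fields[name] != dtype:
            errors.append(f"type mismatch on {name}: expected {dtype}, got {df_fields[name]}")
    for name in df_fields:
        if name not in expected_fields:
            errors.append(f"unexpected column: {name}")
    return errors
```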
Requirements
- Python 3.9+
- PySpark 3.4+ / Databricks Runtime 13.3+
- Delta Lake (delta-spark)
- PyYAML (for config loading)
Configuration
See configs/schema_policy.yaml for evolution rules per medallion layer:
layers:
  bronze:
    compatibility_mode: none
    auto_add_nullable_columns: true
  silver:
    compatibility_mode: backward
    allow_type_widening: true
  gold:
    compatibility_mode: full
    blocked_operations: [drop_column, narrow_type, change_nullable]
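Once parsed (e.g. with PyYAML), the policy is just a nested mapping that migrations can be gated against. A sketch of that enforcement, using an inline dict mirroring the YAML above (the function name is an assumption, not the toolkit's API):

```python
# Parsed equivalent of configs/schema_policy.yaml (mirrors the YAML above).
POLICY = {
    "bronze": {"compatibility_mode": "none", "auto_add_nullable_columns": True},
    "silver": {"compatibility_mode": "backward", "allow_type_widening": True},
    "gold": {
        "compatibility_mode": "full",
        "blocked_operations": ["drop_column", "narrow_type", "change_nullable"],
    },
}

def operation_allowed(layer, operation):
    """Gate a migration operation against the per-layer policy (sketch)."""
    return operation not in POLICY[layer].get("blocked_operations", [])
```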
Running Tests
pip install pyspark pytest delta-spark pyyaml
pytest tests/ -v
Related Products
- Delta Lake Patterns — Delta Lake optimization patterns
- Data Catalog Builder — Build searchable data catalogs
- CDC Replication Toolkit — Change data capture and replication patterns
This is 1 of 11 resources in the Data Pipeline Pro toolkit. Get the complete Schema Evolution Toolkit with all files, templates, and documentation for $39.
Or grab the entire Data Pipeline Pro bundle (11 products) for $169 — save 30%.