The Medallion Architecture has become the de facto standard for organizing data in a lakehouse. If you've worked with Databricks, Delta Lake, or any modern data platform, you've encountered the Bronze-Silver-Gold pattern. But most tutorials stop at the theory.
This guide goes further. We'll build a complete, production-ready Medallion Architecture implementation in Databricks with PySpark — including schema enforcement, data quality gates, incremental processing, and metadata tracking.
What Is the Medallion Architecture?
The Medallion Architecture is a data design pattern that organizes your lakehouse into three logical layers:
- Bronze (Raw) — Ingests data as-is from source systems. Append-only, schema-on-read. Your insurance policy.
- Silver (Cleaned) — Deduplicates, validates, and conforms data. Schema-on-write with enforced types.
- Gold (Business) — Aggregated, business-level datasets optimized for analytics, ML, and reporting.
Each layer serves a distinct purpose, and data flows progressively from raw to refined.
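To make the flow concrete before any Spark code appears, here is a toy, pure-Python sketch of the three layers. The records, field names, and helper functions (`to_silver`, `to_gold`) are invented for illustration only; the real pipeline later in this guide uses Delta tables rather than lists.

```python
from collections import defaultdict

# Bronze: raw records exactly as received (strings, duplicates, bad values).
bronze = [
    {"order_id": "1", "customer_id": "A", "total_amount": "10.50", "_ingested_at": 1},
    {"order_id": "1", "customer_id": "A", "total_amount": "12.00", "_ingested_at": 2},
    {"order_id": "2", "customer_id": "B", "total_amount": "oops", "_ingested_at": 2},
    {"order_id": "3", "customer_id": "A", "total_amount": "7.25", "_ingested_at": 3},
]


def to_silver(rows):
    """Cast types, skip unparseable rows, keep the latest record per order_id."""
    latest = {}
    for row in sorted(rows, key=lambda r: r["_ingested_at"]):
        try:
            amount = float(row["total_amount"])
        except ValueError:
            continue  # a real pipeline would quarantine this row
        latest[row["order_id"]] = {
            "order_id": int(row["order_id"]),
            "customer_id": row["customer_id"],
            "total_amount": amount,
        }
    return list(latest.values())


def to_gold(rows):
    """Aggregate cleaned rows into revenue per customer."""
    revenue = defaultdict(float)
    for row in rows:
        revenue[row["customer_id"]] += row["total_amount"]
    return dict(revenue)


silver = to_silver(bronze)
gold = to_gold(silver)
```

Bronze still holds all four raw records, so Silver and Gold can be rebuilt at any time if the cleaning or aggregation logic changes.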
Why This Pattern Works
- Debuggability — When something breaks in Gold, you trace back through Silver to Bronze. Raw data is never lost.
- Reprocessing — Schema changes or business logic updates? Replay from Bronze without re-ingesting from source.
- Separation of concerns — Ingestion engineers own Bronze, data engineers own Silver, analytics engineers own Gold.
- Data quality — Each layer transition is an explicit checkpoint where you validate and enforce quality.
Setting Up the Foundation
Before writing pipeline code, you need a solid project structure. Here's what a production Databricks workspace looks like:
lakehouse/
├── bronze/
│   ├── ingest_orders.py
│   ├── ingest_customers.py
│   └── ingest_events.py
├── silver/
│   ├── clean_orders.py
│   ├── clean_customers.py
│   └── clean_events.py
├── gold/
│   ├── agg_daily_revenue.py
│   ├── agg_customer_lifetime.py
│   └── dim_product_catalog.py
├── lib/
│   ├── quality_checks.py
│   ├── schema_registry.py
│   └── metadata_tracker.py
├── tests/
│   ├── test_bronze_orders.py
│   └── test_silver_orders.py
└── config/
    ├── schemas.yaml
    └── pipeline_config.yaml
Catalog and Schema Setup with Unity Catalog
-- Create catalogs for each environment
CREATE CATALOG IF NOT EXISTS prod;
CREATE CATALOG IF NOT EXISTS staging;
-- Create schemas for each medallion layer
CREATE SCHEMA IF NOT EXISTS prod.bronze;
CREATE SCHEMA IF NOT EXISTS prod.silver;
CREATE SCHEMA IF NOT EXISTS prod.gold;
-- Engineers: USE privileges only allow navigation, so grant read/write as well
GRANT USE CATALOG ON CATALOG prod TO `data-engineers`;
GRANT USE SCHEMA, CREATE TABLE, SELECT, MODIFY ON SCHEMA prod.bronze TO `data-engineers`;
GRANT USE SCHEMA, CREATE TABLE, SELECT, MODIFY ON SCHEMA prod.silver TO `data-engineers`;
-- Analysts: read-only on Gold (SELECT also requires USE CATALOG and USE SCHEMA)
GRANT USE CATALOG ON CATALOG prod TO `data-analysts`;
GRANT USE SCHEMA, SELECT ON SCHEMA prod.gold TO `data-analysts`;
Bronze Layer: Raw Ingestion
The Bronze layer is your data landing zone. The rules are simple: ingest everything, lose nothing, track metadata.
Core Bronze Ingestion Pattern
from datetime import datetime, timezone

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from pyspark.sql.types import StructType


class BronzeIngester:
    """Production Bronze layer ingestion with metadata tracking."""

    def __init__(self, spark: SparkSession, source_name: str):
        self.spark = spark
        self.source_name = source_name
        self.target_table = f"prod.bronze.{source_name}"

    def ingest_from_landing(
        self,
        landing_path: str,
        file_format: str = "json",
        schema: StructType | None = None,
    ) -> DataFrame:
        """Ingest raw files from landing zone into Bronze."""
        reader = self.spark.read.format(file_format)
        if schema:
            reader = reader.schema(schema)
        else:
            # For Bronze, infer the schema; it is enforced in Silver.
            # (JSON infers by default; the flag matters for CSV.)
            reader = reader.option("inferSchema", "true")
        if file_format == "json":
            reader = (
                reader.option("multiLine", "true")
                .option("mode", "PERMISSIVE")
                .option("columnNameOfCorruptRecord", "_corrupt_record")
            )
        raw_df = reader.load(landing_path)
        # Add Bronze metadata columns. The hidden _metadata column is used
        # instead of input_file_name(), which is not supported on
        # Unity Catalog shared-access clusters.
        enriched_df = (
            raw_df.withColumn("_bronze_ingested_at", current_timestamp())
            .withColumn("_bronze_source_file", col("_metadata.file_path"))
            .withColumn("_bronze_source_system", lit(self.source_name))
            .withColumn("_bronze_batch_id", lit(self._generate_batch_id()))
        )
        return enriched_df

    def write_bronze(self, df: DataFrame, mode: str = "append") -> int:
        """Write DataFrame to Bronze Delta table."""
        (
            df.write.format("delta")
            .mode(mode)
            .option("mergeSchema", "true")
            .saveAsTable(self.target_table)
        )
        # Note: count() re-reads the source files after the write.
        row_count = df.count()
        self._log_ingestion(row_count)
        return row_count

    def ingest_incremental(
        self, landing_path: str, file_format: str = "json"
    ) -> int:
        """Auto Loader based incremental ingestion."""
        stream_df = (
            self.spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", file_format)
            .option("cloudFiles.inferColumnTypes", "true")
            .option(
                "cloudFiles.schemaLocation",
                f"/mnt/checkpoints/{self.source_name}/schema",
            )
            .load(landing_path)
        )
        enriched_df = (
            stream_df.withColumn("_bronze_ingested_at", current_timestamp())
            .withColumn("_bronze_source_file", col("_metadata.file_path"))
            .withColumn("_bronze_source_system", lit(self.source_name))
        )
        query = (
            enriched_df.writeStream.format("delta")
            .outputMode("append")
            .option(
                "checkpointLocation",
                f"/mnt/checkpoints/{self.source_name}/checkpoint",
            )
            .option("mergeSchema", "true")
            .trigger(availableNow=True)
            .toTable(self.target_table)
        )
        query.awaitTermination()
        # availableNow can run several micro-batches, and lastProgress is
        # None when nothing ran, so sum over the retained progress reports.
        return sum(p["numInputRows"] for p in query.recentProgress)

    def _generate_batch_id(self) -> str:
        # datetime.utcnow() is deprecated since Python 3.12
        ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        return f"{self.source_name}_{ts}"

    def _log_ingestion(self, row_count: int):
        print(f"[Bronze] {self.source_name}: ingested {row_count} rows")
Running Bronze Ingestion
# In a Databricks notebook or job
ingester = BronzeIngester(spark, "orders")
# Batch ingestion
df = ingester.ingest_from_landing("/mnt/landing/orders/2026-03-10/")
count = ingester.write_bronze(df)
print(f"Ingested {count} order records to Bronze")
# Or use Auto Loader for incremental
count = ingester.ingest_incremental("/mnt/landing/orders/")
Silver Layer: Clean and Conform
Silver is where the real engineering happens. You deduplicate, enforce schemas, validate data quality, and create a "single source of truth."
Data Quality Framework
from dataclasses import dataclass
from functools import reduce
from typing import Callable

from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce, col, lit


@dataclass
class QualityRule:
    name: str
    check: Callable[[DataFrame], DataFrame]  # Must add a boolean _quality_passed column
    threshold: float  # Minimum pass rate (0.0 to 1.0)
    is_critical: bool = False  # Critical = halt pipeline on failure


class DataQualityGate:
    """Validates data between medallion layers."""

    def __init__(self, table_name: str, rules: list[QualityRule]):
        self.table_name = table_name
        self.rules = rules
        self.results: list[dict] = []

    def validate(self, df: DataFrame) -> tuple[bool, DataFrame]:
        """Run all quality checks. Returns (passed, quarantined_df)."""
        original_columns = df.columns
        total_rows = df.count()
        failed_flags: list[str] = []  # Flag columns of rules below threshold
        all_passed = True
        for i, rule in enumerate(self.rules):
            # Apply the check, then keep its result under a per-rule flag
            # column so it survives until the quarantine filter is applied.
            # A null check result counts as a failure.
            flag = f"_quality_failed_{i}"
            df = (
                rule.check(df)
                .withColumn(flag, ~coalesce(col("_quality_passed"), lit(False)))
                .drop("_quality_passed")
            )
            fail_count = df.filter(col(flag)).count()
            pass_count = total_rows - fail_count
            pass_rate = pass_count / total_rows if total_rows > 0 else 1.0
            passed = pass_rate >= rule.threshold
            result = {
                "rule": rule.name,
                "pass_rate": round(pass_rate, 4),
                "threshold": rule.threshold,
                "passed": passed,
                "critical": rule.is_critical,
                "failed_rows": fail_count,
            }
            self.results.append(result)
            if not passed:
                all_passed = False
                if rule.is_critical:
                    raise DataQualityError(
                        f"Critical quality check failed: {rule.name} "
                        f"(pass_rate={pass_rate:.2%}, "
                        f"threshold={rule.threshold:.2%})"
                    )
                failed_flags.append(flag)
        # Quarantine rows that failed any rule which missed its threshold
        if failed_flags:
            condition = reduce(lambda a, b: a | b, [col(f) for f in failed_flags])
            quarantined_df = df.filter(condition)
        else:
            quarantined_df = df.limit(0)
        return all_passed, quarantined_df.select(*original_columns)

    def print_report(self):
        print(f"\n{'='*60}")
        print(f"Data Quality Report: {self.table_name}")
        print(f"{'='*60}")
        for r in self.results:
            status = "PASS" if r["passed"] else "FAIL"
            print(
                f" [{status}] {r['rule']}: "
                f"{r['pass_rate']:.2%} "
                f"(threshold: {r['threshold']:.2%}, "
                f"failed: {r['failed_rows']})"
            )
        print(f"{'='*60}\n")


class DataQualityError(Exception):
    pass
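The threshold logic in the gate is plain arithmetic, so it can be checked without a cluster. The following is a minimal pure-Python sketch of the same pass-rate comparison; `evaluate_rule` and the row counts are hypothetical and only mirror the calculation performed in `validate`.

```python
def evaluate_rule(total_rows: int, failed_rows: int, threshold: float) -> dict:
    """Mirror the gate's arithmetic: pass rate compared to a minimum threshold."""
    pass_count = total_rows - failed_rows
    # An empty batch is treated as passing, as in the gate above.
    pass_rate = pass_count / total_rows if total_rows > 0 else 1.0
    return {
        "pass_rate": round(pass_rate, 4),
        "passed": pass_rate >= threshold,
    }


# 60 bad rows out of 10,000 gives a 99.4% pass rate.
lenient = evaluate_rule(10_000, 60, threshold=0.99)    # tolerated at 99%
strict = evaluate_rule(10_000, 60, threshold=0.995)    # rejected at 99.5%
empty = evaluate_rule(0, 0, threshold=1.0)             # empty batch passes
```

The same 60 bad rows pass a 99% rule and fail a 99.5% rule, which is why thresholds are worth setting per rule rather than globally.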
Silver Transformation Pipeline
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, trim, lower, to_timestamp, row_number, sha2, concat_ws,
    current_timestamp,
)
from pyspark.sql.window import Window


class SilverTransformer:
    """Production Silver layer transformation pipeline."""

    def __init__(self, spark: SparkSession, entity_name: str):
        self.spark = spark
        self.entity_name = entity_name
        self.source_table = f"prod.bronze.{entity_name}"
        self.target_table = f"prod.silver.{entity_name}"

    def read_bronze_incremental(
        self, last_processed_ts: str | None = None
    ) -> DataFrame:
        """Read new records from Bronze since last processing."""
        df = self.spark.table(self.source_table)
        if last_processed_ts:
            df = df.filter(col("_bronze_ingested_at") > last_processed_ts)
        # _corrupt_record only exists if the reader captured malformed rows
        if "_corrupt_record" in df.columns:
            df = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
        return df

    def deduplicate(
        self, df: DataFrame, key_columns: list[str],
        order_column: str = "_bronze_ingested_at"
    ) -> DataFrame:
        """Deduplicate using key columns, keeping latest record."""
        window = Window.partitionBy(*key_columns).orderBy(col(order_column).desc())
        return (
            df.withColumn("_row_num", row_number().over(window))
            .filter(col("_row_num") == 1)
            .drop("_row_num")
        )

    def apply_schema(
        self, df: DataFrame, column_types: dict[str, str]
    ) -> DataFrame:
        """Cast columns to target types; values that cannot be cast become null."""
        for col_name, target_type in column_types.items():
            if col_name in df.columns:
                df = df.withColumn(col_name, col(col_name).cast(target_type))
        return df

    def add_silver_metadata(self, df: DataFrame) -> DataFrame:
        """Add Silver-layer tracking columns."""
        # Hash every business column (metadata columns start with "_")
        key_cols = [c for c in df.columns if not c.startswith("_")]
        return (
            df.withColumn("_silver_processed_at", current_timestamp())
            .withColumn(
                "_silver_hash",
                sha2(concat_ws("||", *[col(c) for c in key_cols]), 256),
            )
        )

    def merge_into_silver(
        self, source_df: DataFrame, key_columns: list[str]
    ):
        """SCD Type 1 merge into Silver table."""
        # First run: create the table from the source DataFrame
        if not self.spark.catalog.tableExists(self.target_table):
            source_df.write.format("delta").saveAsTable(self.target_table)
            return
        target = DeltaTable.forName(self.spark, self.target_table)
        merge_condition = " AND ".join(
            [f"target.{k} = source.{k}" for k in key_columns]
        )
        (
            target.alias("target")
            .merge(source_df.alias("source"), merge_condition)
            # Only rewrite rows whose content actually changed
            .whenMatchedUpdateAll(
                condition="source._silver_hash != target._silver_hash"
            )
            .whenNotMatchedInsertAll()
            .execute()
        )
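One caveat about the `_silver_hash` change-detection column: Spark's `concat_ws` skips null values, so two rows whose nulls sit in different columns can produce the same hash. The sketch below emulates that behaviour in pure Python with `hashlib`; the helpers `concat_ws_like` and `row_hash` and the `<NULL>` placeholder are hypothetical stand-ins, not Spark APIs.

```python
import hashlib


def concat_ws_like(sep, values):
    """Emulate concat_ws: join the values, silently skipping None."""
    return sep.join(str(v) for v in values if v is not None)


def row_hash(values, null_token=None):
    """SHA-256 over the joined values, optionally replacing None first."""
    if null_token is not None:
        values = [null_token if v is None else v for v in values]
    joined = concat_ws_like("||", values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


row_a = ["1001", None, "EU"]   # middle column is null
row_b = ["1001", "EU", None]   # last column is null

# Without a placeholder both rows join to "1001||EU" and collide.
collides = row_hash(row_a) == row_hash(row_b)

# With a placeholder the column positions are preserved.
distinct = row_hash(row_a, "<NULL>") != row_hash(row_b, "<NULL>")
```

In the Spark version, one way to get the same effect would be to wrap each column as `coalesce(col(c).cast("string"), lit("<NULL>"))` before passing it to `concat_ws`.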
Putting Silver Together
from pyspark.sql.functions import col

# Define quality rules for orders
quality_rules = [
    QualityRule(
        name="order_id_not_null",
        check=lambda df: df.withColumn(
            "_quality_passed", col("order_id").isNotNull()
        ),
        threshold=1.0,
        is_critical=True,
    ),
    QualityRule(
        name="order_amount_positive",
        check=lambda df: df.withColumn(
            "_quality_passed", col("total_amount") > 0
        ),
        threshold=0.99,
    ),
    QualityRule(
        name="valid_order_date",
        check=lambda df: df.withColumn(
            "_quality_passed",
            col("order_date").between("2020-01-01", "2027-12-31")
        ),
        threshold=0.995,
    ),
]

# Run the Silver pipeline
transformer = SilverTransformer(spark, "orders")
bronze_df = transformer.read_bronze_incremental()
deduped_df = transformer.deduplicate(bronze_df, ["order_id"])

# Quality gate
gate = DataQualityGate("silver.orders", quality_rules)
passed, quarantined = gate.validate(deduped_df)
gate.print_report()

# Keep quarantined rows for inspection and exclude them from Silver.
# The quarantine table name is illustrative.
quarantined.write.format("delta").mode("append") \
    .saveAsTable("prod.silver.orders_quarantine")
clean_df = deduped_df.join(
    quarantined.select("order_id"), on="order_id", how="left_anti"
)

# Apply schema and write
typed_df = transformer.apply_schema(clean_df, {
    "order_id": "long",
    "total_amount": "decimal(18,2)",
    "order_date": "date",
    "customer_id": "long",
})
enriched_df = transformer.add_silver_metadata(typed_df)
transformer.merge_into_silver(enriched_df, ["order_id"])
Gold Layer: Business-Ready Datasets
Gold tables are pre-aggregated datasets optimized for specific consumption patterns: dashboards, ML features, or API serving.
Gold Aggregation Example
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, sum as spark_sum, count, avg, max as spark_max,
    min as spark_min, datediff, current_date
)


def build_customer_lifetime_value(spark: SparkSession) -> DataFrame:
    """Gold table: Customer Lifetime Value metrics."""
    orders = spark.table("prod.silver.orders")
    customers = spark.table("prod.silver.customers")
    clv = orders.groupBy("customer_id").agg(
        spark_sum("total_amount").alias("total_revenue"),
        count("order_id").alias("total_orders"),
        avg("total_amount").alias("avg_order_value"),
        spark_min("order_date").alias("first_order_date"),
        spark_max("order_date").alias("last_order_date"),
    ).withColumn(
        "customer_tenure_days",
        datediff(current_date(), col("first_order_date"))
    ).withColumn(
        "days_since_last_order",
        datediff(current_date(), col("last_order_date"))
    )
    # Join with customer details
    result = clv.join(
        customers.select("customer_id", "customer_name", "segment", "region"),
        on="customer_id",
        how="left"
    )
    # Write as Gold table (full rebuild on every run)
    result.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("prod.gold.customer_lifetime_value")
    # Compact files and Z-ORDER by the most common filter columns
    spark.sql("""
        OPTIMIZE prod.gold.customer_lifetime_value
        ZORDER BY (segment, region)
    """)
    return result
Daily Revenue Gold Table
def build_daily_revenue(spark: SparkSession) -> DataFrame:
    """Gold table: Daily revenue aggregations."""
    orders = spark.table("prod.silver.orders")
    products = spark.table("prod.silver.products")
    daily = orders \
        .join(products, on="product_id", how="left") \
        .groupBy("order_date", "product_category", "region") \
        .agg(
            spark_sum("total_amount").alias("revenue"),
            count("order_id").alias("order_count"),
            avg("total_amount").alias("avg_order_value"),
            spark_sum("quantity").alias("units_sold"),
        )
    daily.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("order_date") \
        .saveAsTable("prod.gold.daily_revenue")
    return daily
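The function above rebuilds the whole `daily_revenue` table on every run. If only recent dates change, one option is Delta's `replaceWhere` write option, which overwrites just the rows matching a predicate. The following is a sketch under that assumption; `replace_where_predicate` and `overwrite_date_range` are hypothetical helper names, and `df` is expected to be a Spark DataFrame that contains the date column.

```python
from datetime import date


def replace_where_predicate(column: str, start: date, end: date) -> str:
    """Build an inclusive date-range predicate for Delta's replaceWhere option."""
    if start > end:
        raise ValueError("start must not be after end")
    return (
        f"{column} >= '{start.isoformat()}' "
        f"AND {column} <= '{end.isoformat()}'"
    )


def overwrite_date_range(df, table_name: str, column: str, start: date, end: date) -> str:
    """Overwrite only the given date range of a Delta table (df: Spark DataFrame)."""
    predicate = replace_where_predicate(column, start, end)
    (
        df.filter(predicate)  # written rows must satisfy the predicate
        .write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", predicate)
        .saveAsTable(table_name)
    )
    return predicate


example = replace_where_predicate("order_date", date(2026, 3, 1), date(2026, 3, 10))
```

A nightly job could then call `overwrite_date_range(daily, "prod.gold.daily_revenue", "order_date", start, end)` for a trailing window instead of rewriting all history.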
Production Considerations
Table Maintenance Automation
from pyspark.sql import SparkSession


def maintain_delta_tables(spark: SparkSession, catalog: str = "prod"):
    """Run maintenance on all Delta tables in the catalog."""
    schemas = ["bronze", "silver", "gold"]
    for schema in schemas:
        tables = spark.sql(
            f"SHOW TABLES IN {catalog}.{schema}"
        ).collect()
        for table in tables:
            table_name = f"{catalog}.{schema}.{table.tableName}"
            print(f"Maintaining {table_name}...")
            # Compact small files
            spark.sql(f"OPTIMIZE {table_name}")
            # Vacuum old versions (retain 7 days)
            spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")
            # Refresh statistics for the query optimizer
            spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
Monitoring and Lineage
from pyspark.sql import SparkSession


def get_table_health(spark: SparkSession, table_name: str) -> dict:
    """Get health metrics for a Delta table."""
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    history = spark.sql(
        f"DESCRIBE HISTORY {table_name} LIMIT 10"
    ).collect()
    return {
        "table": table_name,
        "size_gb": round(detail.sizeInBytes / (1024**3), 2),
        "num_files": detail.numFiles,
        "last_modified": str(detail.lastModified),
        "recent_operations": [
            {
                "version": h.version,
                "operation": h.operation,
                "timestamp": str(h.timestamp),
            }
            for h in history
        ],
    }
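The health dictionary can feed simple alerts. As one illustration, the sketch below flags a likely small-files problem from the `size_gb` and `num_files` fields. The function name `has_small_file_problem` and the 32 MB default are assumptions chosen for the example, not Databricks recommendations.

```python
def has_small_file_problem(health: dict, min_avg_file_mb: float = 32.0) -> bool:
    """Return True when the average file size falls below the given minimum."""
    num_files = health["num_files"]
    if not num_files:
        return False  # empty table: nothing to compact
    avg_file_mb = health["size_gb"] * 1024 / num_files
    return avg_file_mb < min_avg_file_mb


# Sample inputs shaped like the output of get_table_health
fragmented = {"table": "prod.bronze.events", "size_gb": 1.0, "num_files": 2048}
compacted = {"table": "prod.gold.daily_revenue", "size_gb": 10.0, "num_files": 80}

needs_optimize = has_small_file_problem(fragmented)   # 0.5 MB per file on average
looks_fine = has_small_file_problem(compacted)        # 128 MB per file on average
```

A table that trips this check is a candidate for an earlier OPTIMIZE run than the regular maintenance schedule provides.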
Common Pitfalls and How to Avoid Them
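1. Schema drift in Bronze — Control it with Auto Loader's schema evolution settings (cloudFiles.schemaLocation and cloudFiles.schemaEvolutionMode) together with Delta's mergeSchema write option, and track every schema change. Don't let unknown columns silently appear in Silver.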
2. Over-aggregating in Gold — Build Gold tables at the right granularity. Too aggregated = inflexible. Too granular = just another Silver table.
3. Skipping data quality — Every Bronze-to-Silver transition needs validation. Quarantine bad records instead of dropping them.
4. Not partitioning intelligently — Partition Bronze by ingestion date, Silver by business date, Gold by the most common filter column.
5. Ignoring table maintenance — OPTIMIZE and VACUUM should run on schedule. Small files kill query performance.
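For the first pitfall, a lightweight drift check can run before Silver processing. This is a minimal sketch assuming the expected schema is kept as a plain column-to-type mapping (for example, loaded from `config/schemas.yaml`) and the actual one comes from `dict(df.dtypes)`. The function `detect_schema_drift` and the sample columns are hypothetical.

```python
def detect_schema_drift(expected: dict[str, str], actual: dict[str, str]) -> dict:
    """Compare two {column: type} mappings and report the differences."""
    added = sorted(set(actual) - set(expected))
    removed = sorted(set(expected) - set(actual))
    retyped = sorted(
        name
        for name in set(expected) & set(actual)
        if expected[name] != actual[name]
    )
    return {
        "added": added,
        "removed": removed,
        "retyped": retyped,
        "has_drift": bool(added or removed or retyped),
    }


expected_schema = {"order_id": "bigint", "total_amount": "double", "order_date": "string"}
# With Spark this would be dict(bronze_df.dtypes); hard-coded here for illustration.
actual_schema = {"order_id": "string", "total_amount": "double", "coupon_code": "string"}

report = detect_schema_drift(expected_schema, actual_schema)
```

A pipeline could log the report on every run and fail, or route to review, when `has_drift` is true for a Silver-bound table.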
Summary
The Medallion Architecture is straightforward in concept but requires disciplined implementation:
| Layer | Purpose | Write Pattern | Quality |
|---|---|---|---|
| Bronze | Raw landing | Append-only | Schema-on-read |
| Silver | Cleaned, deduped | Merge (SCD) | Enforced schema + quality gates |
| Gold | Business aggregations | Overwrite/Merge | Pre-validated from Silver |
The code patterns in this article are extracted from real production implementations running at scale on Databricks.
Take Your Databricks Platform to Production
If you're building a Medallion Architecture and want to accelerate your implementation, check out the Medallion Architecture Accelerator from Datanest Platform Pro. It includes:
- Production-ready Bronze/Silver/Gold pipeline generator
- Metadata-driven pipeline configuration
- Built-in data quality gates
- 20 production-tested Databricks & Azure tools in the complete bundle
The full Datanest Platform Pro collection covers everything from Spark optimization and CI/CD to Unity Catalog migration and monitoring — all built from real enterprise implementations.
Use code LAUNCH40 for 40% off any product, or STUDENT for 50% off if you're a student.