Every data team eventually hits the same wall: raw data scattered across landing zones, inconsistent transformations bolted on over time, and no clear lineage when something breaks at 2 AM. The Medallion Architecture exists to solve exactly this — and while the Bronze-Silver-Gold concept is simple, most guides skip the production details that actually matter.
This guide goes further. We'll build a complete, production-ready Medallion Architecture implementation in Databricks with PySpark — including schema enforcement, data quality gates, incremental processing, and metadata tracking.
What Is the Medallion Architecture?
The Medallion Architecture is a data design pattern that organizes your lakehouse into three logical layers:
- Bronze (Raw) — Ingests data as-is from source systems. Append-only, schema-on-read. Your insurance policy.
- Silver (Cleaned) — Deduplicates, validates, and conforms data. Schema-on-write with enforced types.
- Gold (Business) — Aggregated, business-level datasets optimized for analytics, ML, and reporting.
Each layer serves a distinct purpose, and data flows progressively from raw to refined.
Why This Pattern Works
- Debuggability — When something breaks in Gold, you trace back through Silver to Bronze. Raw data is never lost.
- Reprocessing — Schema changes or business logic updates? Replay from Bronze without re-ingesting from source.
- Separation of concerns — Ingestion engineers own Bronze, data engineers own Silver, analytics engineers own Gold.
- Data quality — Each layer transition is an explicit checkpoint where you validate and enforce quality.
Setting Up the Foundation
Before writing pipeline code, you need a solid project structure. Here's what a production Databricks workspace looks like:
lakehouse/
├── bronze/
│ ├── ingest_orders.py
│ ├── ingest_customers.py
│ └── ingest_events.py
├── silver/
│ ├── clean_orders.py
│ ├── clean_customers.py
│ └── clean_events.py
├── gold/
│ ├── agg_daily_revenue.py
│ ├── agg_customer_lifetime.py
│ └── dim_product_catalog.py
├── lib/
│ ├── quality_checks.py
│ ├── schema_registry.py
│ └── metadata_tracker.py
├── tests/
│ ├── test_bronze_orders.py
│ └── test_silver_orders.py
└── config/
├── schemas.yaml
└── pipeline_config.yaml
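The two files under config/ are what make the pipelines metadata-driven: each entity's ingestion settings live in one place instead of being hard-coded in notebooks. A minimal sketch of the kind of settings pipeline_config.yaml might hold, shown here as the equivalent Python dict (the keys, paths, and thresholds are illustrative assumptions, not a fixed schema):

```python
# Illustrative settings only; in the layout above this would live in
# config/pipeline_config.yaml and be loaded with a YAML parser.
PIPELINE_CONFIG = {
    "orders": {
        "landing_path": "/mnt/landing/orders/",
        "file_format": "json",
        "key_columns": ["order_id"],
        "quality_threshold": 0.99,
    },
    "customers": {
        "landing_path": "/mnt/landing/customers/",
        "file_format": "csv",
        "key_columns": ["customer_id"],
        "quality_threshold": 0.995,
    },
}

def get_entity_config(entity: str) -> dict:
    """Look up one entity's pipeline settings, failing fast on a typo."""
    if entity not in PIPELINE_CONFIG:
        raise KeyError(f"No pipeline config for entity: {entity}")
    return PIPELINE_CONFIG[entity]
```

Driving the Bronze and Silver classes below from a dict like this means adding a new source is a config change, not a code change.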
Catalog and Schema Setup with Unity Catalog
-- Create catalogs for each environment
CREATE CATALOG IF NOT EXISTS prod;
CREATE CATALOG IF NOT EXISTS staging;
-- Create schemas for each medallion layer
CREATE SCHEMA IF NOT EXISTS prod.bronze;
CREATE SCHEMA IF NOT EXISTS prod.silver;
CREATE SCHEMA IF NOT EXISTS prod.gold;
-- Set default permissions
GRANT USE CATALOG ON CATALOG prod TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.bronze TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.silver TO `data-engineers`;
-- Analysts need USE CATALOG and USE SCHEMA before SELECT takes effect
GRANT USE CATALOG ON CATALOG prod TO `data-analysts`;
GRANT USE SCHEMA ON SCHEMA prod.gold TO `data-analysts`;
GRANT SELECT ON SCHEMA prod.gold TO `data-analysts`;
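Repeating these grants by hand across prod and staging is error-prone. A small helper (hypothetical, not a Databricks API) can render the same statements for any catalog so the environments stay in sync:

```python
def layer_grants(catalog: str) -> list[str]:
    """Generate Unity Catalog grant statements for one environment.

    Mirrors the SQL above: engineers get the bronze/silver schemas,
    analysts get read-only access to gold.
    """
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `data-engineers`;",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.bronze TO `data-engineers`;",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.silver TO `data-engineers`;",
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `data-analysts`;",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.gold TO `data-analysts`;",
        f"GRANT SELECT ON SCHEMA {catalog}.gold TO `data-analysts`;",
    ]
```

Each statement can then be executed in a setup job with spark.sql(...).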
Bronze Layer: Raw Ingestion
The Bronze layer is your data landing zone. The rules are simple: ingest everything, lose nothing, track metadata.
Core Bronze Ingestion Pattern
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
current_timestamp, input_file_name, lit, col
)
from pyspark.sql.types import StructType
class BronzeIngester:
"""Production Bronze layer ingestion with metadata tracking."""
def __init__(self, spark: SparkSession, source_name: str):
self.spark = spark
self.source_name = source_name
self.target_table = f"prod.bronze.{source_name}"
def ingest_from_landing(
self,
landing_path: str,
file_format: str = "json",
schema: StructType | None = None,
) -> DataFrame:
"""Ingest raw files from landing zone into Bronze."""
reader = self.spark.read.format(file_format)
if schema:
reader = reader.schema(schema)
else:
# For Bronze, infer schema — we'll enforce in Silver
reader = reader.option("inferSchema", "true")
if file_format == "json":
reader = reader.option("multiLine", "true") \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record")
raw_df = reader.load(landing_path)
# Add Bronze metadata columns
enriched_df = raw_df \
.withColumn("_bronze_ingested_at", current_timestamp()) \
.withColumn("_bronze_source_file", input_file_name()) \
.withColumn("_bronze_source_system", lit(self.source_name)) \
.withColumn("_bronze_batch_id", lit(self._generate_batch_id()))
return enriched_df
def write_bronze(self, df: DataFrame, mode: str = "append") -> int:
    """Write DataFrame to Bronze Delta table."""
    # Count before writing so the count action isn't recomputed
    # against source files that may have changed after the write
    row_count = df.count()
    df.write \
        .format("delta") \
        .mode(mode) \
        .option("mergeSchema", "true") \
        .saveAsTable(self.target_table)
    self._log_ingestion(row_count)
    return row_count
def ingest_incremental(
self, landing_path: str, file_format: str = "json"
) -> int:
"""Auto-Loader based incremental ingestion."""
stream_df = self.spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", file_format) \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaLocation",
f"/mnt/checkpoints/{self.source_name}/schema") \
.load(landing_path)
# input_file_name() is unsupported on Unity Catalog; read the file
# path from Auto Loader's _metadata column instead
enriched_df = stream_df \
    .withColumn("_bronze_ingested_at", current_timestamp()) \
    .withColumn("_bronze_source_file", col("_metadata.file_path")) \
    .withColumn("_bronze_source_system", lit(self.source_name))
query = enriched_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation",
f"/mnt/checkpoints/{self.source_name}/checkpoint") \
.option("mergeSchema", "true") \
.trigger(availableNow=True) \
.toTable(self.target_table)
query.awaitTermination()
# lastProgress is None when the trigger found no new files
progress = query.lastProgress
return progress["numInputRows"] if progress else 0
def _generate_batch_id(self) -> str:
    # datetime.utcnow() is deprecated; use an aware UTC timestamp
    from datetime import datetime, timezone
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"{self.source_name}_{ts}"
def _log_ingestion(self, row_count: int):
print(f"[Bronze] {self.source_name}: ingested {row_count} rows")
Running Bronze Ingestion
# In a Databricks notebook or job
ingester = BronzeIngester(spark, "orders")
# Batch ingestion
df = ingester.ingest_from_landing("/mnt/landing/orders/2026-03-10/")
count = ingester.write_bronze(df)
print(f"Ingested {count} order records to Bronze")
# Or use Auto Loader for incremental
count = ingester.ingest_incremental("/mnt/landing/orders/")
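Batch runs read a date-partitioned landing folder like the one above. A small helper (hypothetical, not part of the BronzeIngester class) keeps that path construction in one place instead of scattering string formatting across job notebooks:

```python
from datetime import date

def landing_path(source: str, run_date: date, base: str = "/mnt/landing") -> str:
    """Build the date-partitioned landing path a batch run reads from.

    The /mnt/landing layout matches the paths used in this article;
    adjust the base to your own mounts or volumes.
    """
    return f"{base}/{source}/{run_date.isoformat()}/"

# landing_path("orders", date(2026, 3, 10))
# -> "/mnt/landing/orders/2026-03-10/"
```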
Silver Layer: Clean and Conform
Silver is where the real engineering happens. You deduplicate, enforce schemas, validate data quality, and create a "single source of truth."
Data Quality Framework
Before any record crosses into Silver, it passes through a quality gate. This framework lets you define rules declaratively and quarantine failures instead of silently dropping data.
from dataclasses import dataclass
from typing import Callable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, when, sum as spark_sum
@dataclass
class QualityRule:
name: str
check: Callable[[DataFrame], DataFrame]
threshold: float # Minimum pass rate (0.0 to 1.0)
is_critical: bool = False # Critical = halt pipeline on failure
class DataQualityGate:
"""Validates data between medallion layers."""
def __init__(self, table_name: str, rules: list[QualityRule]):
self.table_name = table_name
self.rules = rules
self.results: list[dict] = []
def validate(self, df: DataFrame) -> tuple[bool, DataFrame]:
    """Run all quality checks. Returns (passed, quarantined_df)."""
    total_rows = df.count()
    flag_columns: list[str] = []
    all_passed = True
    for rule in self.rules:
        # Apply the check, then rename the boolean flag to a
        # rule-specific column. The flags must survive the loop:
        # the quarantine filter built afterwards needs all of them.
        flag_name = f"_qc_{rule.name}"
        df = rule.check(df).withColumnRenamed("_quality_passed", flag_name)
        flag_columns.append(flag_name)
        pass_count = df.filter(col(flag_name)).count()
        pass_rate = pass_count / total_rows if total_rows > 0 else 1.0
        passed = pass_rate >= rule.threshold
        self.results.append({
            "rule": rule.name,
            "pass_rate": round(pass_rate, 4),
            "threshold": rule.threshold,
            "passed": passed,
            "critical": rule.is_critical,
            "failed_rows": total_rows - pass_count,
        })
        if not passed:
            all_passed = False
            if rule.is_critical:
                raise DataQualityError(
                    f"Critical quality check failed: {rule.name} "
                    f"(pass_rate={pass_rate:.2%}, "
                    f"threshold={rule.threshold:.2%})"
                )
    # A record is quarantined if it failed any rule
    quarantine_condition = None
    for flag_name in flag_columns:
        fail_condition = ~col(flag_name)
        if quarantine_condition is None:
            quarantine_condition = fail_condition
        else:
            quarantine_condition = quarantine_condition | fail_condition
    if quarantine_condition is not None:
        quarantined_df = df.filter(quarantine_condition).drop(*flag_columns)
    else:
        quarantined_df = df.limit(0)
    return all_passed, quarantined_df
def print_report(self):
print(f"\n{'='*60}")
print(f"Data Quality Report: {self.table_name}")
print(f"{'='*60}")
for r in self.results:
status = "PASS" if r["passed"] else "FAIL"
print(
f" [{status}] {r['rule']}: "
f"{r['pass_rate']:.2%} "
f"(threshold: {r['threshold']:.2%}, "
f"failed: {r['failed_rows']})"
)
print(f"{'='*60}\n")
class DataQualityError(Exception):
pass
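The gate's pass/fail decision is plain arithmetic, and isolating it in a pure function makes the threshold semantics easy to unit-test without a Spark session. This helper mirrors the logic inside DataQualityGate rather than being part of it:

```python
def rule_verdict(pass_count: int, total_rows: int, threshold: float) -> dict:
    """Compute pass rate and verdict the same way the quality gate does.

    An empty input counts as passing, matching the gate's behavior
    when total_rows is 0.
    """
    pass_rate = pass_count / total_rows if total_rows > 0 else 1.0
    return {
        "pass_rate": round(pass_rate, 4),
        "passed": pass_rate >= threshold,
        "failed_rows": total_rows - pass_count,
    }
```

A test suite can then sweep edge cases (empty batches, exact-threshold rates) in milliseconds before the same thresholds are trusted in production.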
Silver Transformation Pipeline
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
col, trim, lower, to_timestamp, row_number, sha2, concat_ws
)
from pyspark.sql.window import Window
from delta.tables import DeltaTable
class SilverTransformer:
"""Production Silver layer transformation pipeline."""
def __init__(self, spark: SparkSession, entity_name: str):
self.spark = spark
self.entity_name = entity_name
self.source_table = f"prod.bronze.{entity_name}"
self.target_table = f"prod.silver.{entity_name}"
def read_bronze_incremental(
self, last_processed_ts: str | None = None
) -> DataFrame:
"""Read new records from Bronze since last processing."""
df = self.spark.table(self.source_table)
if last_processed_ts:
df = df.filter(
col("_bronze_ingested_at") > last_processed_ts
)
# _corrupt_record only exists when Bronze used PERMISSIVE JSON parsing
if "_corrupt_record" in df.columns:
    df = df.filter(col("_corrupt_record").isNull()) \
        .drop("_corrupt_record")
return df
def deduplicate(
self, df: DataFrame, key_columns: list[str],
order_column: str = "_bronze_ingested_at"
) -> DataFrame:
"""Deduplicate using key columns, keeping latest record."""
window = Window.partitionBy(
*key_columns
).orderBy(col(order_column).desc())
return df.withColumn("_row_num", row_number().over(window)) \
.filter(col("_row_num") == 1) \
.drop("_row_num")
def apply_schema(
self, df: DataFrame, column_types: dict[str, str]
) -> DataFrame:
"""Cast columns to target types with error handling."""
for col_name, target_type in column_types.items():
if col_name in df.columns:
df = df.withColumn(col_name, col(col_name).cast(target_type))
return df
def add_silver_metadata(self, df: DataFrame) -> DataFrame:
"""Add Silver-layer tracking columns."""
from pyspark.sql.functions import current_timestamp, lit
key_cols = [c for c in df.columns if not c.startswith("_")]
return df \
.withColumn("_silver_processed_at", current_timestamp()) \
.withColumn(
"_silver_hash",
sha2(concat_ws("||", *[col(c) for c in key_cols]), 256)
)
def merge_into_silver(
self, source_df: DataFrame, key_columns: list[str]
):
"""SCD Type 1 merge into Silver table."""
if not self.spark.catalog.tableExists(self.target_table):
source_df.write.format("delta").saveAsTable(self.target_table)
return
target = DeltaTable.forName(self.spark, self.target_table)
merge_condition = " AND ".join(
[f"target.{k} = source.{k}" for k in key_columns]
)
target.alias("target") \
.merge(source_df.alias("source"), merge_condition) \
.whenMatchedUpdateAll(
condition="source._silver_hash != target._silver_hash"
) \
.whenNotMatchedInsertAll() \
.execute()
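The merge condition string is an easy place for a silent bug, since a malformed condition only fails at runtime against a live table. The string-building logic from merge_into_silver, extracted here so it can be unit-tested without Spark:

```python
def build_merge_condition(key_columns: list[str]) -> str:
    """Render key columns into a Delta MERGE ON clause,
    e.g. ["order_id"] -> "target.order_id = source.order_id"."""
    if not key_columns:
        raise ValueError("MERGE requires at least one key column")
    return " AND ".join(f"target.{k} = source.{k}" for k in key_columns)
```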
Putting Silver Together
# Define quality rules for orders
quality_rules = [
QualityRule(
name="order_id_not_null",
check=lambda df: df.withColumn(
"_quality_passed", col("order_id").isNotNull()
),
threshold=1.0,
is_critical=True,
),
QualityRule(
name="order_amount_positive",
check=lambda df: df.withColumn(
"_quality_passed", col("total_amount") > 0
),
threshold=0.99,
),
QualityRule(
name="valid_order_date",
check=lambda df: df.withColumn(
"_quality_passed",
col("order_date").between("2020-01-01", "2027-12-31")
),
threshold=0.995,
),
]
# Run the Silver pipeline
transformer = SilverTransformer(spark, "orders")
bronze_df = transformer.read_bronze_incremental()
deduped_df = transformer.deduplicate(bronze_df, ["order_id"])
# Quality gate
gate = DataQualityGate("silver.orders", quality_rules)
passed, quarantined = gate.validate(deduped_df)
gate.print_report()
# Exclude quarantined records, then apply schema and write
clean_df = deduped_df.join(
    quarantined.select("order_id"), on="order_id", how="left_anti"
)
typed_df = transformer.apply_schema(clean_df, {
    "order_id": "long",
    "total_amount": "decimal(18,2)",
    "order_date": "date",
    "customer_id": "long",
})
enriched_df = transformer.add_silver_metadata(typed_df)
transformer.merge_into_silver(enriched_df, ["order_id"])
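The quarantined DataFrame should land in its own table so failed records can be inspected and replayed once the source is fixed. A sketch of the naming convention; the _quarantine suffix is this article's convention, not a platform feature:

```python
def quarantine_table_name(silver_table: str) -> str:
    """Derive the quarantine table for a Silver table,
    e.g. prod.silver.orders -> prod.silver.orders_quarantine."""
    return f"{silver_table}_quarantine"

# Usage sketch (requires a live Spark session):
# quarantined.write.format("delta").mode("append") \
#     .saveAsTable(quarantine_table_name("prod.silver.orders"))
```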
Gold Layer: Business-Ready Datasets
Gold tables are pre-aggregated datasets optimized for specific consumption patterns: dashboards, ML features, or API serving.
Gold Aggregation Example
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
col, sum as spark_sum, count, avg, max as spark_max,
min as spark_min, datediff, current_date
)
def build_customer_lifetime_value(spark: SparkSession) -> DataFrame:
"""Gold table: Customer Lifetime Value metrics."""
orders = spark.table("prod.silver.orders")
customers = spark.table("prod.silver.customers")
clv = orders.groupBy("customer_id").agg(
spark_sum("total_amount").alias("total_revenue"),
count("order_id").alias("total_orders"),
avg("total_amount").alias("avg_order_value"),
spark_min("order_date").alias("first_order_date"),
spark_max("order_date").alias("last_order_date"),
).withColumn(
"customer_tenure_days",
datediff(current_date(), col("first_order_date"))
).withColumn(
"days_since_last_order",
datediff(current_date(), col("last_order_date"))
)
# Join with customer details
result = clv.join(
customers.select("customer_id", "customer_name", "segment", "region"),
on="customer_id",
how="left"
)
# Write as Gold table with Z-ORDER for query optimization
result.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("prod.gold.customer_lifetime_value")
# Optimize for query performance
spark.sql("""
OPTIMIZE prod.gold.customer_lifetime_value
ZORDER BY (segment, region)
""")
return result
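Dashboards rarely want raw recency numbers; they want segments. One possible bucketing rule over the days_since_last_order metric above, written as a pure function so the cutoffs can be unit-tested (the 30/90-day thresholds are illustrative assumptions, not derived from the article's data):

```python
def activity_segment(days_since_last_order: int) -> str:
    """Bucket customers by recency; thresholds are illustrative."""
    if days_since_last_order <= 30:
        return "active"
    if days_since_last_order <= 90:
        return "cooling"
    return "churn_risk"
```

In the Gold job itself the same rule would be expressed with PySpark's when(...).otherwise(...) so it runs column-wise instead of row-by-row.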
Daily Revenue Gold Table
def build_daily_revenue(spark: SparkSession) -> DataFrame:
"""Gold table: Daily revenue aggregations."""
orders = spark.table("prod.silver.orders")
products = spark.table("prod.silver.products")
daily = orders \
.join(products, on="product_id", how="left") \
.groupBy("order_date", "product_category", "region") \
.agg(
spark_sum("total_amount").alias("revenue"),
count("order_id").alias("order_count"),
avg("total_amount").alias("avg_order_value"),
spark_sum("quantity").alias("units_sold"),
)
daily.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("order_date") \
.saveAsTable("prod.gold.daily_revenue")
return daily
Production Considerations
Table Maintenance Automation
Delta tables accumulate small files and old versions over time. Schedule this maintenance to run daily or after large batch loads.
def maintain_delta_tables(spark: SparkSession, catalog: str = "prod"):
"""Run maintenance on all Delta tables in the catalog."""
schemas = ["bronze", "silver", "gold"]
for schema in schemas:
tables = spark.sql(
f"SHOW TABLES IN {catalog}.{schema}"
).collect()
for table in tables:
table_name = f"{catalog}.{schema}.{table.tableName}"
print(f"Maintaining {table_name}...")
# Optimize small files
spark.sql(f"OPTIMIZE {table_name}")
# Vacuum old versions (retain 7 days)
spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")
# Analyze for query optimization
spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
Monitoring and Lineage
def get_table_health(spark: SparkSession, table_name: str) -> dict:
"""Get health metrics for a Delta table."""
detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
history = spark.sql(
f"DESCRIBE HISTORY {table_name} LIMIT 10"
).collect()
return {
"table": table_name,
"size_gb": round(detail.sizeInBytes / (1024**3), 2),
"num_files": detail.numFiles,
"last_modified": str(detail.lastModified),
"recent_operations": [
{
"version": h.version,
"operation": h.operation,
"timestamp": str(h.timestamp),
}
for h in history
],
}
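A health report is most useful with a verdict attached. One simple heuristic over the metrics get_table_health returns: flag a table when its average file size is small, which is the symptom of the small-file problem that OPTIMIZE fixes. The 32 MB cutoff below is an assumption for illustration, not a Databricks recommendation:

```python
def has_small_file_problem(
    size_gb: float, num_files: int, min_avg_mb: float = 32.0
) -> bool:
    """True when average file size falls below the cutoff (assumed 32 MB)."""
    if num_files == 0:
        return False
    avg_mb = (size_gb * 1024) / num_files
    return avg_mb < min_avg_mb
```

Wiring this into a scheduled job that alerts (or triggers OPTIMIZE directly) closes the loop between monitoring and maintenance.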
Common Pitfalls and How to Avoid Them
1. Schema drift in Bronze — Use Auto Loader's mergeSchema option and track schema evolution. Don't let unknown columns silently appear in Silver.
2. Over-aggregating in Gold — Build Gold tables at the right granularity. Too aggregated = inflexible. Too granular = just another Silver table.
3. Skipping data quality — Every Bronze-to-Silver transition needs validation. Quarantine bad records instead of dropping them.
4. Not partitioning intelligently — Partition Bronze by ingestion date, Silver by business date, Gold by the most common filter column.
5. Ignoring table maintenance — OPTIMIZE and VACUUM should run on schedule. Small files kill query performance.
Summary
The Medallion Architecture is straightforward in concept but requires disciplined implementation:
| Layer | Purpose | Write Pattern | Quality |
|---|---|---|---|
| Bronze | Raw landing | Append-only | Schema-on-read |
| Silver | Cleaned, deduped | Merge (SCD) | Enforced schema + quality gates |
| Gold | Business aggregations | Overwrite/Merge | Pre-validated from Silver |
The code patterns in this article are extracted from real production implementations running at scale on Databricks. Take them, adapt them to your domain, and build with confidence.
If you found this useful and want production-ready pipeline templates you can drop straight into your Databricks workspace, check out DataStack Pro — it includes a Medallion Architecture accelerator, metadata-driven pipeline configs, and 20+ data engineering tools built from real enterprise implementations.