DEV Community

Thesius Code

Medallion Architecture in Databricks: A Complete Implementation Guide

Every data team eventually hits the same wall: raw data scattered across landing zones, inconsistent transformations bolted on over time, and no clear lineage when something breaks at 2 AM. The Medallion Architecture exists to solve exactly this — and while the Bronze-Silver-Gold concept is simple, most guides skip the production details that actually matter.

This guide goes further. We'll build a complete, production-ready Medallion Architecture implementation in Databricks with PySpark — including schema enforcement, data quality gates, incremental processing, and metadata tracking.

What Is the Medallion Architecture?

The Medallion Architecture is a data design pattern that organizes your lakehouse into three logical layers:

  • Bronze (Raw) — Ingests data as-is from source systems. Append-only, schema-on-read. Your insurance policy.
  • Silver (Cleaned) — Deduplicates, validates, and conforms data. Schema-on-write with enforced types.
  • Gold (Business) — Aggregated, business-level datasets optimized for analytics, ML, and reporting.

Each layer serves a distinct purpose, and data flows progressively from raw to refined.

Why This Pattern Works

  1. Debuggability — When something breaks in Gold, you trace back through Silver to Bronze. Raw data is never lost.
  2. Reprocessing — Schema changes or business logic updates? Replay from Bronze without re-ingesting from source.
  3. Separation of concerns — Ingestion engineers own Bronze, data engineers own Silver, analytics engineers own Gold.
  4. Data quality — Each layer transition is an explicit checkpoint where you validate and enforce quality.

Setting Up the Foundation

Before writing pipeline code, you need a solid project structure. Here's one layout that works well for a production Databricks project:

lakehouse/
├── bronze/
│   ├── ingest_orders.py
│   ├── ingest_customers.py
│   └── ingest_events.py
├── silver/
│   ├── clean_orders.py
│   ├── clean_customers.py
│   └── clean_events.py
├── gold/
│   ├── agg_daily_revenue.py
│   ├── agg_customer_lifetime.py
│   └── dim_product_catalog.py
├── lib/
│   ├── quality_checks.py
│   ├── schema_registry.py
│   └── metadata_tracker.py
├── tests/
│   ├── test_bronze_orders.py
│   └── test_silver_orders.py
└── config/
    ├── schemas.yaml
    └── pipeline_config.yaml

Catalog and Schema Setup with Unity Catalog

-- Create catalogs for each environment
CREATE CATALOG IF NOT EXISTS prod;
CREATE CATALOG IF NOT EXISTS staging;

-- Create schemas for each medallion layer
CREATE SCHEMA IF NOT EXISTS prod.bronze;
CREATE SCHEMA IF NOT EXISTS prod.silver;
CREATE SCHEMA IF NOT EXISTS prod.gold;

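-- Set default permissions
GRANT USE CATALOG ON CATALOG prod TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.bronze TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.silver TO `data-engineers`;
-- Analysts also need USE CATALOG and USE SCHEMA before SELECT takes effect
GRANT USE CATALOG ON CATALOG prod TO `data-analysts`;
GRANT USE SCHEMA ON SCHEMA prod.gold TO `data-analysts`;
GRANT SELECT ON SCHEMA prod.gold TO `data-analysts`;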
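
The pipeline classes later in this guide hardcode the prod catalog in their table names. A minimal sketch of one way to resolve three-level names per environment instead is shown here; the helper name qualified_table and the two constant sets are hypothetical and only mirror the catalogs and schemas created above.

```python
# Hypothetical helper (not part of the article's code): builds
# catalog.schema.table names from the catalogs and layers created above.
ENVIRONMENTS = {"prod", "staging"}
LAYERS = {"bronze", "silver", "gold"}


def qualified_table(env: str, layer: str, entity: str) -> str:
    """Return a three-level Unity Catalog name such as 'staging.silver.orders'."""
    if env not in ENVIRONMENTS:
        raise ValueError(f"Unknown environment: {env!r}")
    if layer not in LAYERS:
        raise ValueError(f"Unknown layer: {layer!r}")
    if not entity.isidentifier():
        # Reject names that would need quoting or could inject SQL
        raise ValueError(f"Invalid entity name: {entity!r}")
    return f"{env}.{layer}.{entity}"


print(qualified_table("staging", "silver", "orders"))
```

Passing the environment in from job parameters would let the same notebook run against staging before it touches prod.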

Bronze Layer: Raw Ingestion

The Bronze layer is your data landing zone. The rules are simple: ingest everything, lose nothing, track metadata.

Core Bronze Ingestion Pattern

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    current_timestamp, input_file_name, lit, col
)
from pyspark.sql.types import StructType, StringType
from delta.tables import DeltaTable


class BronzeIngester:
    """Production Bronze layer ingestion with metadata tracking."""

    def __init__(self, spark: SparkSession, source_name: str):
        self.spark = spark
        self.source_name = source_name
        self.target_table = f"prod.bronze.{source_name}"

    def ingest_from_landing(
        self,
        landing_path: str,
        file_format: str = "json",
        schema: StructType | None = None,
    ) -> DataFrame:
        """Ingest raw files from landing zone into Bronze."""

        reader = self.spark.read.format(file_format)

        if schema:
            reader = reader.schema(schema)
        else:
            # For Bronze, infer schema — we'll enforce in Silver
            reader = reader.option("inferSchema", "true")

        if file_format == "json":
            reader = reader.option("multiLine", "true") \
                           .option("mode", "PERMISSIVE") \
                           .option("columnNameOfCorruptRecord", "_corrupt_record")

        raw_df = reader.load(landing_path)

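        # Add Bronze metadata columns.
        # Note: on Unity Catalog compute with standard (shared) access mode,
        # input_file_name() may be unavailable; col("_metadata.file_path") is
        # the documented alternative for file-based sources.
        enriched_df = raw_df \
            .withColumn("_bronze_ingested_at", current_timestamp()) \
            .withColumn("_bronze_source_file", input_file_name()) \
            .withColumn("_bronze_source_system", lit(self.source_name)) \
            .withColumn("_bronze_batch_id", lit(self._generate_batch_id()))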

        return enriched_df

    def write_bronze(self, df: DataFrame, mode: str = "append") -> int:
        """Write DataFrame to Bronze Delta table."""

        df.write \
            .format("delta") \
            .mode(mode) \
            .option("mergeSchema", "true") \
            .saveAsTable(self.target_table)

        row_count = df.count()
        self._log_ingestion(row_count)
        return row_count

    def ingest_incremental(
        self, landing_path: str, file_format: str = "json"
    ) -> int:
        """Auto-Loader based incremental ingestion."""

        stream_df = self.spark.readStream \
            .format("cloudFiles") \
            .option("cloudFiles.format", file_format) \
            .option("cloudFiles.inferColumnTypes", "true") \
            .option("cloudFiles.schemaLocation",
                    f"/mnt/checkpoints/{self.source_name}/schema") \
            .load(landing_path)

        enriched_df = stream_df \
            .withColumn("_bronze_ingested_at", current_timestamp()) \
            .withColumn("_bronze_source_file", input_file_name()) \
            .withColumn("_bronze_source_system", lit(self.source_name))

        query = enriched_df.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation",
                    f"/mnt/checkpoints/{self.source_name}/checkpoint") \
            .option("mergeSchema", "true") \
            .trigger(availableNow=True) \
            .toTable(self.target_table)

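        query.awaitTermination()
        # availableNow can run several micro-batches, so sum the recent
        # progress events instead of reading only the last one (which is
        # None when nothing was processed). recentProgress is a bounded window.
        return sum(p["numInputRows"] for p in query.recentProgress)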

    def _generate_batch_id(self) -> str:
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        from datetime import datetime, timezone
        return f"{self.source_name}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"

    def _log_ingestion(self, row_count: int):
        print(f"[Bronze] {self.source_name}: ingested {row_count} rows")
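
The batch ID above has one-second resolution, so two runs for the same source that start within the same second would share an ID. A small sketch of one way to avoid that follows; the random suffix and the standalone function generate_batch_id are assumptions for illustration, not part of the class above.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def generate_batch_id(source_name: str, now: Optional[datetime] = None) -> str:
    """Build '<source>_<UTC timestamp>_<8 hex chars>'.

    The random suffix is an assumption added here to keep IDs distinct
    when two runs start within the same second.
    """
    now = now or datetime.now(timezone.utc)
    stamp = now.astimezone(timezone.utc).strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:8]
    return f"{source_name}_{stamp}_{suffix}"


print(generate_batch_id("orders"))
```

The timestamp prefix keeps IDs sortable by start time, while the suffix only has to be unique within a single second.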

Running Bronze Ingestion

# In a Databricks notebook or job
ingester = BronzeIngester(spark, "orders")

# Batch ingestion
df = ingester.ingest_from_landing("/mnt/landing/orders/2026-03-10/")
count = ingester.write_bronze(df)
print(f"Ingested {count} order records to Bronze")

# Or use Auto Loader for incremental
count = ingester.ingest_incremental("/mnt/landing/orders/")

Silver Layer: Clean and Conform

Silver is where the real engineering happens. You deduplicate, enforce schemas, validate data quality, and create a "single source of truth."

Data Quality Framework

Before any record crosses into Silver, it passes through a quality gate. This framework lets you define rules declaratively and quarantine failures instead of silently dropping data.

from dataclasses import dataclass
from typing import Callable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, when, sum as spark_sum


@dataclass
class QualityRule:
    name: str
    check: Callable[[DataFrame], DataFrame]
    threshold: float  # Minimum pass rate (0.0 to 1.0)
    is_critical: bool = False  # Critical = halt pipeline on failure


class DataQualityGate:
    """Validates data between medallion layers."""

    def __init__(self, table_name: str, rules: list[QualityRule]):
        self.table_name = table_name
        self.rules = rules
        self.results: list[dict] = []

    def validate(self, df: DataFrame) -> tuple[bool, DataFrame]:
        """Run all quality checks. Returns (passed, quarantined_df)."""

        total_rows = df.count()
        quarantine_condition = None
        all_passed = True

        flag_columns = []

        for i, rule in enumerate(self.rules):
            # Apply the check and keep its result in a per-rule flag column,
            # so later rules do not overwrite it and the quarantine filter
            # below can still reference it
            flag = f"_quality_passed_{i}"
            df = rule.check(df).withColumnRenamed("_quality_passed", flag)
            flag_columns.append(flag)

            pass_count = df.filter(col(flag)).count()
            pass_rate = pass_count / total_rows if total_rows > 0 else 1.0

            passed = pass_rate >= rule.threshold

            result = {
                "rule": rule.name,
                "pass_rate": round(pass_rate, 4),
                "threshold": rule.threshold,
                "passed": passed,
                "critical": rule.is_critical,
                "failed_rows": total_rows - pass_count,
            }
            self.results.append(result)

            if not passed:
                all_passed = False
                if rule.is_critical:
                    raise DataQualityError(
                        f"Critical quality check failed: {rule.name} "
                        f"(pass_rate={pass_rate:.2%}, "
                        f"threshold={rule.threshold:.2%})"
                    )

                # Build quarantine filter; a null check result counts as a failure
                fail_condition = ~col(flag) | col(flag).isNull()
                if quarantine_condition is None:
                    quarantine_condition = fail_condition
                else:
                    quarantine_condition = quarantine_condition | fail_condition

        # Collect quarantined records, then drop the helper flag columns
        if quarantine_condition is not None:
            quarantined_df = df.filter(quarantine_condition)
        else:
            quarantined_df = df.limit(0)

        return all_passed, quarantined_df.drop(*flag_columns)

    def print_report(self):
        print(f"\n{'='*60}")
        print(f"Data Quality Report: {self.table_name}")
        print(f"{'='*60}")
        for r in self.results:
            status = "PASS" if r["passed"] else "FAIL"
            print(
                f"  [{status}] {r['rule']}: "
                f"{r['pass_rate']:.2%} "
                f"(threshold: {r['threshold']:.2%}, "
                f"failed: {r['failed_rows']})"
            )
        print(f"{'='*60}\n")


class DataQualityError(Exception):
    pass
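
To make the threshold logic concrete, here is a plain-Python sketch of the same pass-rate arithmetic with no Spark involved. The names RuleOutcome and evaluate_rule are hypothetical; they only mirror the calculation inside the gate.

```python
from dataclasses import dataclass


@dataclass
class RuleOutcome:
    pass_rate: float
    passed: bool
    halts_pipeline: bool


def evaluate_rule(total_rows: int, failed_rows: int,
                  threshold: float, is_critical: bool = False) -> RuleOutcome:
    """Mirror the gate's arithmetic: pass_rate = passing rows / total rows."""
    pass_rate = (total_rows - failed_rows) / total_rows if total_rows > 0 else 1.0
    passed = pass_rate >= threshold
    # Only a failed critical rule stops the pipeline
    return RuleOutcome(round(pass_rate, 4), passed, is_critical and not passed)


print(evaluate_rule(1000, 8, threshold=0.99))
print(evaluate_rule(1000, 12, threshold=0.99))
print(evaluate_rule(1000, 1, threshold=1.0, is_critical=True))
```

With 1,000 rows and a 0.99 threshold, 8 failures still pass (99.2%) while 12 do not (98.8%). A critical rule with a threshold of 1.0 halts the pipeline on a single bad row.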

Silver Transformation Pipeline

from delta.tables import DeltaTable  # needed by merge_into_silver
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, trim, lower, to_timestamp, row_number, sha2, concat_ws
)
from pyspark.sql.window import Window


class SilverTransformer:
    """Production Silver layer transformation pipeline."""

    def __init__(self, spark: SparkSession, entity_name: str):
        self.spark = spark
        self.entity_name = entity_name
        self.source_table = f"prod.bronze.{entity_name}"
        self.target_table = f"prod.silver.{entity_name}"

    def read_bronze_incremental(
        self, last_processed_ts: str | None = None
    ) -> DataFrame:
        """Read new records from Bronze since last processing."""
        df = self.spark.table(self.source_table)

        if last_processed_ts:
            df = df.filter(
                col("_bronze_ingested_at") > last_processed_ts
            )

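        # _corrupt_record only exists when the JSON reader captured bad rows,
        # so guard against tables that never had the column
        if "_corrupt_record" in df.columns:
            df = df.filter(col("_corrupt_record").isNull()) \
                   .drop("_corrupt_record")
        return df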

    def deduplicate(
        self, df: DataFrame, key_columns: list[str],
        order_column: str = "_bronze_ingested_at"
    ) -> DataFrame:
        """Deduplicate using key columns, keeping latest record."""
        window = Window.partitionBy(
            *key_columns
        ).orderBy(col(order_column).desc())

        return df.withColumn("_row_num", row_number().over(window)) \
                 .filter(col("_row_num") == 1) \
                 .drop("_row_num")

    def apply_schema(
        self, df: DataFrame, column_types: dict[str, str]
    ) -> DataFrame:
        """Cast columns to target types. Values that cannot be cast become
        null unless ANSI mode is enabled, in which case the cast raises."""
        for col_name, target_type in column_types.items():
            if col_name in df.columns:
                df = df.withColumn(col_name, col(col_name).cast(target_type))
        return df

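    def add_silver_metadata(self, df: DataFrame) -> DataFrame:
        """Add Silver-layer tracking columns."""
        from pyspark.sql.functions import coalesce, current_timestamp, lit

        # Hash every business column (metadata columns start with "_")
        business_cols = [c for c in df.columns if not c.startswith("_")]
        # concat_ws skips nulls, so replace them with a sentinel first;
        # otherwise ("a", null, "b") and ("a", "b", null) hash identically
        hash_inputs = [
            coalesce(col(c).cast("string"), lit("<null>")) for c in business_cols
        ]
        return df \
            .withColumn("_silver_processed_at", current_timestamp()) \
            .withColumn("_silver_hash", sha2(concat_ws("||", *hash_inputs), 256))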

    def merge_into_silver(
        self, source_df: DataFrame, key_columns: list[str]
    ):
        """SCD Type 1 merge into Silver table."""
        if not self.spark.catalog.tableExists(self.target_table):
            source_df.write.format("delta").saveAsTable(self.target_table)
            return

        target = DeltaTable.forName(self.spark, self.target_table)
        merge_condition = " AND ".join(
            [f"target.{k} = source.{k}" for k in key_columns]
        )

        target.alias("target") \
            .merge(source_df.alias("source"), merge_condition) \
            .whenMatchedUpdateAll(
                condition="source._silver_hash != target._silver_hash"
            ) \
            .whenNotMatchedInsertAll() \
            .execute()
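
One detail of the change-detection hash is worth illustrating: Spark's concat_ws skips null inputs, so rows that differ only in where a null sits can produce the same hash. The sketch below emulates that behavior in plain Python with hashlib; both function names are hypothetical, and the "<null>" sentinel is just one possible choice.

```python
import hashlib
from typing import Optional, Sequence


def naive_hash(values: Sequence[Optional[str]]) -> str:
    """Emulate sha2(concat_ws('||', ...), 256): null values are skipped."""
    joined = "||".join(v for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def null_safe_hash(values: Sequence[Optional[str]], sentinel: str = "<null>") -> str:
    """Replace nulls with a sentinel so their position affects the hash."""
    joined = "||".join(sentinel if v is None else v for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


row_a = ["a", None, "b"]
row_b = ["a", "b", None]
print(naive_hash(row_a) == naive_hash(row_b))
print(null_safe_hash(row_a) == null_safe_hash(row_b))
```

The first comparison is equal and the second is not, which is why a hash-gated merge can miss an update when a value moves into or out of a nullable column.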

Putting Silver Together

# Define quality rules for orders
quality_rules = [
    QualityRule(
        name="order_id_not_null",
        check=lambda df: df.withColumn(
            "_quality_passed", col("order_id").isNotNull()
        ),
        threshold=1.0,
        is_critical=True,
    ),
    QualityRule(
        name="order_amount_positive",
        check=lambda df: df.withColumn(
            "_quality_passed", col("total_amount") > 0
        ),
        threshold=0.99,
    ),
    QualityRule(
        name="valid_order_date",
        check=lambda df: df.withColumn(
            "_quality_passed",
            col("order_date").between("2020-01-01", "2027-12-31")
        ),
        threshold=0.995,
    ),
]

# Run the Silver pipeline
transformer = SilverTransformer(spark, "orders")
bronze_df = transformer.read_bronze_incremental()
deduped_df = transformer.deduplicate(bronze_df, ["order_id"])

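# Quality gate
gate = DataQualityGate("silver.orders", quality_rules)
passed, quarantined = gate.validate(deduped_df)
gate.print_report()

# Persist quarantined rows for review and keep them out of Silver.
# The quarantine table name below is an assumption; use your own convention.
if not passed:
    quarantined.write.format("delta").mode("append") \
        .saveAsTable("prod.silver.orders_quarantine")
    deduped_df = deduped_df.join(
        quarantined.select("order_id"), on="order_id", how="left_anti"
    )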

# Apply schema and write
typed_df = transformer.apply_schema(deduped_df, {
    "order_id": "long",
    "total_amount": "decimal(18,2)",
    "order_date": "date",
    "customer_id": "long",
})
enriched_df = transformer.add_silver_metadata(typed_df)
transformer.merge_into_silver(enriched_df, ["order_id"])
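
The pipeline above calls read_bronze_incremental() without a timestamp, so it reprocesses all of Bronze on every run. A minimal sketch of a watermark store that could supply last_processed_ts is shown below. The WatermarkStore class and its JSON file are hypothetical; in a real workspace a small Delta table would be a more typical home for this state.

```python
import json
from pathlib import Path
from typing import Optional


class WatermarkStore:
    """Hypothetical JSON-file store: entity name -> last processed timestamp."""

    def __init__(self, path: str):
        self.path = Path(path)

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def get(self, entity: str) -> Optional[str]:
        """Return the stored watermark, or None on the first run."""
        return self._load().get(entity)

    def advance(self, entity: str, new_ts: str) -> None:
        """Move the watermark forward only.

        Assumes ISO-8601 strings in one format and timezone, which
        compare correctly as plain text.
        """
        state = self._load()
        current = state.get(entity)
        if current is None or new_ts > current:
            state[entity] = new_ts
            self.path.write_text(json.dumps(state, indent=2))
```

In the orders pipeline, store.get("orders") would be passed to read_bronze_incremental, and store.advance("orders", ...) would be called with the batch's maximum _bronze_ingested_at only after the merge succeeds, so a failed run is retried from the same point.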

Gold Layer: Business-Ready Datasets

Gold tables are pre-aggregated datasets optimized for specific consumption patterns: dashboards, ML features, or API serving.

Gold Aggregation Example

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, sum as spark_sum, count, avg, max as spark_max,
    min as spark_min, datediff, current_date
)


def build_customer_lifetime_value(spark: SparkSession) -> DataFrame:
    """Gold table: Customer Lifetime Value metrics."""

    orders = spark.table("prod.silver.orders")
    customers = spark.table("prod.silver.customers")

    clv = orders.groupBy("customer_id").agg(
        spark_sum("total_amount").alias("total_revenue"),
        count("order_id").alias("total_orders"),
        avg("total_amount").alias("avg_order_value"),
        spark_min("order_date").alias("first_order_date"),
        spark_max("order_date").alias("last_order_date"),
    ).withColumn(
        "customer_tenure_days",
        datediff(current_date(), col("first_order_date"))
    ).withColumn(
        "days_since_last_order",
        datediff(current_date(), col("last_order_date"))
    )

    # Join with customer details
    result = clv.join(
        customers.select("customer_id", "customer_name", "segment", "region"),
        on="customer_id",
        how="left"
    )

    # Write as Gold table with Z-ORDER for query optimization
    result.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("prod.gold.customer_lifetime_value")

    # Optimize for query performance
    spark.sql("""
        OPTIMIZE prod.gold.customer_lifetime_value
        ZORDER BY (segment, region)
    """)

    return result
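
To see what the aggregation produces, here is the same set of metrics computed in plain Python over two made-up orders. The sample data and the function customer_lifetime_value are illustrative assumptions, and the join to customer details is left out.

```python
from collections import defaultdict
from datetime import date
from decimal import Decimal

# Made-up sample rows standing in for prod.silver.orders
orders = [
    {"customer_id": 1, "order_id": 10,
     "total_amount": Decimal("40.00"), "order_date": date(2026, 1, 5)},
    {"customer_id": 1, "order_id": 11,
     "total_amount": Decimal("60.00"), "order_date": date(2026, 2, 4)},
]


def customer_lifetime_value(rows: list, today: date) -> dict:
    """Group orders by customer and compute the Gold-table metrics."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["customer_id"]].append(row)

    result = {}
    for customer_id, customer_rows in grouped.items():
        amounts = [r["total_amount"] for r in customer_rows]
        dates = [r["order_date"] for r in customer_rows]
        result[customer_id] = {
            "total_revenue": sum(amounts),
            "total_orders": len(customer_rows),
            "avg_order_value": sum(amounts) / len(amounts),
            "customer_tenure_days": (today - min(dates)).days,
            "days_since_last_order": (today - max(dates)).days,
        }
    return result


print(customer_lifetime_value(orders, today=date(2026, 3, 10)))
```

For this customer the result is a total revenue of 100.00 across 2 orders, a tenure of 64 days, and 34 days since the last order, measured from 2026-03-10.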

Daily Revenue Gold Table

def build_daily_revenue(spark: SparkSession) -> DataFrame:
    """Gold table: Daily revenue aggregations."""

    orders = spark.table("prod.silver.orders")
    products = spark.table("prod.silver.products")

    daily = orders \
        .join(products, on="product_id", how="left") \
        .groupBy("order_date", "product_category", "region") \
        .agg(
            spark_sum("total_amount").alias("revenue"),
            count("order_id").alias("order_count"),
            avg("total_amount").alias("avg_order_value"),
            spark_sum("quantity").alias("units_sold"),
        )

    daily.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("order_date") \
        .saveAsTable("prod.gold.daily_revenue")

    return daily

Production Considerations

Table Maintenance Automation

Delta tables accumulate small files and old versions over time. Schedule this maintenance to run daily or after large batch loads.

def maintain_delta_tables(spark: SparkSession, catalog: str = "prod"):
    """Run maintenance on all Delta tables in the catalog."""

    schemas = ["bronze", "silver", "gold"]

    for schema in schemas:
        tables = spark.sql(
            f"SHOW TABLES IN {catalog}.{schema}"
        ).collect()

        for table in tables:
            # SHOW TABLES also lists session temp views; skip them
            if table.isTemporary:
                continue
            table_name = f"{catalog}.{schema}.{table.tableName}"
            print(f"Maintaining {table_name}...")

            # Optimize small files
            spark.sql(f"OPTIMIZE {table_name}")

            # Vacuum old versions (retain 7 days)
            spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")

            # Analyze for query optimization
            spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
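
The 168-hour retention is not arbitrary: Delta's default safety check rejects VACUUM with a shorter window unless that check is explicitly disabled. A small sketch of building the three statements with that guard follows; the function maintenance_statements is hypothetical and only generates SQL strings, it does not run them.

```python
MIN_RETENTION_HOURS = 168  # Delta's default safety threshold (7 days)


def maintenance_statements(table_name: str, retention_days: int = 7) -> list:
    """Build the maintenance SQL for one table, refusing unsafe retention."""
    retention_hours = retention_days * 24
    if retention_hours < MIN_RETENTION_HOURS:
        # Shorter windows can delete files that time travel or
        # long-running readers still need
        raise ValueError(
            f"Retention of {retention_hours}h is below the "
            f"{MIN_RETENTION_HOURS}h safety threshold"
        )
    return [
        f"OPTIMIZE {table_name}",
        f"VACUUM {table_name} RETAIN {retention_hours} HOURS",
        f"ANALYZE TABLE {table_name} COMPUTE STATISTICS",
    ]


for statement in maintenance_statements("prod.gold.daily_revenue"):
    print(statement)
```

Keeping statement generation separate from execution makes the retention rule easy to unit test without a cluster.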

Monitoring and Lineage

def get_table_health(spark: SparkSession, table_name: str) -> dict:
    """Get health metrics for a Delta table."""

    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    history = spark.sql(
        f"DESCRIBE HISTORY {table_name} LIMIT 10"
    ).collect()

    return {
        "table": table_name,
        "size_gb": round(detail.sizeInBytes / (1024**3), 2),
        "num_files": detail.numFiles,
        "last_modified": str(detail.lastModified),
        "recent_operations": [
            {
                "version": h.version,
                "operation": h.operation,
                "timestamp": str(h.timestamp),
            }
            for h in history
        ],
    }
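
The dictionary returned above is only useful if something acts on it. As a rough sketch, the function below turns it into warnings. The name assess_table_health and the 32 MB average-file-size threshold are assumptions chosen for illustration, not Databricks recommendations.

```python
def assess_table_health(health: dict, min_avg_file_mb: float = 32.0) -> list:
    """Return human-readable warnings for a get_table_health() result."""
    warnings = []

    num_files = health["num_files"]
    if num_files > 0:
        avg_file_mb = health["size_gb"] * 1024 / num_files
        if avg_file_mb < min_avg_file_mb:
            warnings.append(
                f"{health['table']}: average file size {avg_file_mb:.1f} MB "
                f"is below {min_avg_file_mb:.0f} MB (small-file problem)"
            )

    # Delta history records compaction runs with the operation name OPTIMIZE
    operations = {op["operation"] for op in health["recent_operations"]}
    if "OPTIMIZE" not in operations:
        warnings.append(f"{health['table']}: no OPTIMIZE in recent history")

    return warnings


sample = {
    "table": "prod.silver.orders",
    "size_gb": 1.0,
    "num_files": 2000,
    "recent_operations": [{"operation": "MERGE"}, {"operation": "WRITE"}],
}
for warning in assess_table_health(sample):
    print(warning)
```

The sample table averages about half a megabyte per file and has no recent compaction, so both warnings fire.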

Common Pitfalls and How to Avoid Them

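1. Schema drift in Bronze: Let Auto Loader track the schema (cloudFiles.schemaLocation and cloudFiles.schemaEvolutionMode) and pair it with Delta's mergeSchema write option so new columns land in Bronze. Then check for them explicitly before Silver. Don't let unknown columns silently appear in Silver.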

2. Over-aggregating in Gold — Build Gold tables at the right granularity. Too aggregated = inflexible. Too granular = just another Silver table.

3. Skipping data quality — Every Bronze-to-Silver transition needs validation. Quarantine bad records instead of dropping them.

4. Not partitioning intelligently — Partition Bronze by ingestion date, Silver by business date, Gold by the most common filter column.

5. Ignoring table maintenance — OPTIMIZE and VACUUM should run on schedule. Small files kill query performance.
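
For the first pitfall, one lightweight guard is to compare the columns you expect against what actually arrived before promoting data to Silver. A minimal sketch follows; the function detect_schema_drift and the sample schemas are hypothetical. In PySpark, the observed mapping could be built with dict(df.dtypes).

```python
def detect_schema_drift(expected: dict, observed: dict) -> dict:
    """Compare two {column_name: type_string} mappings."""
    shared = set(expected) & set(observed)
    return {
        "added": sorted(set(observed) - set(expected)),
        "missing": sorted(set(expected) - set(observed)),
        "type_changed": sorted(c for c in shared if expected[c] != observed[c]),
    }


expected = {"order_id": "bigint", "total_amount": "decimal(18,2)", "order_date": "date"}
observed = {"order_id": "bigint", "total_amount": "string", "coupon_code": "string"}

print(detect_schema_drift(expected, observed))
```

A pipeline could log the added columns, fail on missing ones, and route type changes through the quality gate, depending on how strict the Silver contract needs to be.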

Summary

The Medallion Architecture is straightforward in concept but requires disciplined implementation:

Layer  | Purpose               | Write Pattern   | Quality
Bronze | Raw landing           | Append-only     | Schema-on-read
Silver | Cleaned, deduped      | Merge (SCD)     | Enforced schema + quality gates
Gold   | Business aggregations | Overwrite/Merge | Pre-validated from Silver

The code patterns in this article are extracted from real production implementations running at scale on Databricks. Take them, adapt them to your domain, and build with confidence.


If you found this useful and want production-ready pipeline templates you can drop straight into your Databricks workspace, check out DataStack Pro — it includes a Medallion Architecture accelerator, metadata-driven pipeline configs, and 20+ data engineering tools built from real enterprise implementations.
