DatanestDigital

Medallion Architecture in Databricks: A Complete Implementation Guide

The Medallion Architecture has become the de facto standard for organizing data in a lakehouse. If you've worked with Databricks, Delta Lake, or any modern data platform, you've encountered the Bronze-Silver-Gold pattern. But most tutorials stop at the theory.

This guide goes further. We'll build a complete, production-ready Medallion Architecture implementation in Databricks with PySpark — including schema enforcement, data quality gates, incremental processing, and metadata tracking.

What Is the Medallion Architecture?

The Medallion Architecture is a data design pattern that organizes your lakehouse into three logical layers:

  • Bronze (Raw) — Ingests data as-is from source systems. Append-only, schema-on-read. Your insurance policy.
  • Silver (Cleaned) — Deduplicates, validates, and conforms data. Schema-on-write with enforced types.
  • Gold (Business) — Aggregated, business-level datasets optimized for analytics, ML, and reporting.

Each layer serves a distinct purpose, and data flows progressively from raw to refined.

Why This Pattern Works

  1. Debuggability — When something breaks in Gold, you trace back through Silver to Bronze. Raw data is never lost.
  2. Reprocessing — Schema changes or business logic updates? Replay from Bronze without re-ingesting from source.
  3. Separation of concerns — Ingestion engineers own Bronze, data engineers own Silver, analytics engineers own Gold.
  4. Data quality — Each layer transition is an explicit checkpoint where you validate and enforce quality.

Setting Up the Foundation

Before writing pipeline code, you need a solid project structure. Here's one way to lay out the repository for a production Databricks workspace:

lakehouse/
├── bronze/
│   ├── ingest_orders.py
│   ├── ingest_customers.py
│   └── ingest_events.py
├── silver/
│   ├── clean_orders.py
│   ├── clean_customers.py
│   └── clean_events.py
├── gold/
│   ├── agg_daily_revenue.py
│   ├── agg_customer_lifetime.py
│   └── dim_product_catalog.py
├── lib/
│   ├── quality_checks.py
│   ├── schema_registry.py
│   └── metadata_tracker.py
├── tests/
│   ├── test_bronze_orders.py
│   └── test_silver_orders.py
└── config/
    ├── schemas.yaml
    └── pipeline_config.yaml

Catalog and Schema Setup with Unity Catalog

-- Create catalogs for each environment
CREATE CATALOG IF NOT EXISTS prod;
CREATE CATALOG IF NOT EXISTS staging;

-- Create schemas for each medallion layer
CREATE SCHEMA IF NOT EXISTS prod.bronze;
CREATE SCHEMA IF NOT EXISTS prod.silver;
CREATE SCHEMA IF NOT EXISTS prod.gold;

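-- Set default permissions
GRANT USE CATALOG ON CATALOG prod TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.bronze TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.silver TO `data-engineers`;
-- Analysts need USE CATALOG and USE SCHEMA before SELECT takes effect
GRANT USE CATALOG ON CATALOG prod TO `data-analysts`;
GRANT USE SCHEMA, SELECT ON SCHEMA prod.gold TO `data-analysts`;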
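
The pipeline classes in this guide hard-code the `prod` catalog. One way to keep staging and production runs on the same code is to resolve the three-level name from the environment. The sketch below is only an illustration: `TableRef`, `ENVIRONMENTS`, and `LAYERS` are hypothetical names, not part of Databricks or of the original code.

```python
from dataclasses import dataclass

# Hypothetical constants mirroring the catalogs and schemas created above.
ENVIRONMENTS = {"prod", "staging"}
LAYERS = {"bronze", "silver", "gold"}


@dataclass(frozen=True)
class TableRef:
    """Three-level Unity Catalog name: catalog.schema.table."""

    catalog: str
    layer: str
    entity: str

    def __post_init__(self):
        # Fail early on typos instead of creating a stray table name.
        if self.catalog not in ENVIRONMENTS:
            raise ValueError(f"Unknown catalog: {self.catalog}")
        if self.layer not in LAYERS:
            raise ValueError(f"Unknown layer: {self.layer}")

    @property
    def full_name(self) -> str:
        return f"{self.catalog}.{self.layer}.{self.entity}"


orders_bronze = TableRef("staging", "bronze", "orders")
print(orders_bronze.full_name)  # staging.bronze.orders
```

A class such as `BronzeIngester` could then accept the catalog as a constructor argument and build its target table from `TableRef(...).full_name`.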

Bronze Layer: Raw Ingestion

The Bronze layer is your data landing zone. The rules are simple: ingest everything, lose nothing, track metadata.

Core Bronze Ingestion Pattern

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    current_timestamp, input_file_name, lit, col
)
from pyspark.sql.types import StructType, StringType
from delta.tables import DeltaTable


class BronzeIngester:
    """Production Bronze layer ingestion with metadata tracking."""

    def __init__(self, spark: SparkSession, source_name: str):
        self.spark = spark
        self.source_name = source_name
        self.target_table = f"prod.bronze.{source_name}"

    def ingest_from_landing(
        self,
        landing_path: str,
        file_format: str = "json",
        schema: StructType | None = None,
    ) -> DataFrame:
        """Ingest raw files from landing zone into Bronze."""

        reader = self.spark.read.format(file_format)

        if schema:
            reader = reader.schema(schema)
        else:
            # For Bronze, infer schema — we'll enforce in Silver
            reader = reader.option("inferSchema", "true")

        if file_format == "json":
            reader = reader.option("multiLine", "true") \
                           .option("mode", "PERMISSIVE") \
                           .option("columnNameOfCorruptRecord", "_corrupt_record")

        raw_df = reader.load(landing_path)

        # Add Bronze metadata columns
        enriched_df = raw_df \
            .withColumn("_bronze_ingested_at", current_timestamp()) \
            .withColumn("_bronze_source_file", col("_metadata.file_path")) \
            .withColumn("_bronze_source_system", lit(self.source_name)) \
            .withColumn("_bronze_batch_id", lit(self._generate_batch_id()))

        return enriched_df

    def write_bronze(self, df: DataFrame, mode: str = "append") -> int:
        """Write DataFrame to Bronze Delta table."""

        df.write \
            .format("delta") \
            .mode(mode) \
            .option("mergeSchema", "true") \
            .saveAsTable(self.target_table)

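        # Note: count() re-reads the source files after the write, so it
        # costs a second scan on large batches
        row_count = df.count()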
        self._log_ingestion(row_count)
        return row_count

    def ingest_incremental(
        self, landing_path: str, file_format: str = "json"
    ) -> int:
        """Auto-Loader based incremental ingestion."""

        stream_df = self.spark.readStream \
            .format("cloudFiles") \
            .option("cloudFiles.format", file_format) \
            .option("cloudFiles.inferColumnTypes", "true") \
            .option("cloudFiles.schemaLocation",
                    f"/mnt/checkpoints/{self.source_name}/schema") \
            .load(landing_path)

        enriched_df = stream_df \
            .withColumn("_bronze_ingested_at", current_timestamp()) \
            .withColumn("_bronze_source_file", col("_metadata.file_path")) \
            .withColumn("_bronze_source_system", lit(self.source_name))

        query = enriched_df.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation",
                    f"/mnt/checkpoints/{self.source_name}/checkpoint") \
            .option("mergeSchema", "true") \
            .trigger(availableNow=True) \
            .toTable(self.target_table)

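        query.awaitTermination()
        # lastProgress covers only the final micro-batch (and can be None),
        # so sum the rows over the progress events retained for this run
        return sum(p["numInputRows"] for p in query.recentProgress)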

    def _generate_batch_id(self) -> str:
        from datetime import datetime, timezone
        # datetime.utcnow() is deprecated since Python 3.12
        now = datetime.now(timezone.utc)
        return f"{self.source_name}_{now.strftime('%Y%m%d_%H%M%S')}"

    def _log_ingestion(self, row_count: int):
        print(f"[Bronze] {self.source_name}: ingested {row_count} rows")

Running Bronze Ingestion

# In a Databricks notebook or job
ingester = BronzeIngester(spark, "orders")

# Batch ingestion
df = ingester.ingest_from_landing("/mnt/landing/orders/2026-03-10/")
count = ingester.write_bronze(df)
print(f"Ingested {count} order records to Bronze")

# Or use Auto Loader for incremental
count = ingester.ingest_incremental("/mnt/landing/orders/")
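
The "track metadata" rule also applies to the ingestion run itself, which the class above only prints. As a rough sketch of what a structured audit record might look like, assuming a hypothetical `IngestionLogEntry` type (the field names are illustrative, not taken from the original `metadata_tracker.py`):

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class IngestionLogEntry:
    """One row of a hypothetical Bronze ingestion audit log."""

    source_name: str
    target_table: str
    batch_id: str
    row_count: int
    logged_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_row(self) -> dict:
        # A flat dict is easy to turn into a one-row Spark DataFrame,
        # e.g. spark.createDataFrame([Row(**entry.to_row())]).
        return asdict(self)


entry = IngestionLogEntry(
    source_name="orders",
    target_table="prod.bronze.orders",
    batch_id="orders_20260310_120000",
    row_count=1250,
)
print(entry.to_row())
```

Appending these rows to a small Delta table gives you a queryable history of every batch, which is more useful for debugging than log lines.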

Silver Layer: Clean and Conform

Silver is where the real engineering happens. You deduplicate, enforce schemas, validate data quality, and create a "single source of truth."

Data Quality Framework

from dataclasses import dataclass
from typing import Callable
from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce, col, lit


@dataclass
class QualityRule:
    name: str
    check: Callable[[DataFrame], DataFrame]
    threshold: float  # Minimum pass rate (0.0 to 1.0)
    is_critical: bool = False  # Critical = halt pipeline on failure


class DataQualityGate:
    """Validates data between medallion layers."""

    def __init__(self, table_name: str, rules: list[QualityRule]):
        self.table_name = table_name
        self.rules = rules
        self.results: list[dict] = []

    def validate(self, df: DataFrame) -> tuple[bool, DataFrame]:
        """Run all quality checks. Returns (passed, quarantined_df)."""

        original_columns = df.columns
        total_rows = df.count()
        quarantine_condition = None
        all_passed = True

        for rule in self.rules:
            # Apply the check, then keep its result under a per-rule flag
            # column so later rules cannot overwrite it. A NULL result
            # (for example NULL > 0) is treated as a failure.
            flag = f"_passed_{rule.name}"
            df = rule.check(df) \
                .withColumn(flag, coalesce(col("_quality_passed"), lit(False))) \
                .drop("_quality_passed")
            pass_count = df.filter(col(flag)).count()
            pass_rate = pass_count / total_rows if total_rows > 0 else 1.0

            passed = pass_rate >= rule.threshold

            result = {
                "rule": rule.name,
                "pass_rate": round(pass_rate, 4),
                "threshold": rule.threshold,
                "passed": passed,
                "critical": rule.is_critical,
                "failed_rows": total_rows - pass_count,
            }
            self.results.append(result)

            if not passed:
                all_passed = False
                if rule.is_critical:
                    raise DataQualityError(
                        f"Critical quality check failed: {rule.name} "
                        f"(pass_rate={pass_rate:.2%}, "
                        f"threshold={rule.threshold:.2%})"
                    )

                # Build quarantine filter from rules that missed their threshold
                fail_condition = ~col(flag)
                if quarantine_condition is None:
                    quarantine_condition = fail_condition
                else:
                    quarantine_condition = quarantine_condition | fail_condition

        # Collect the quarantined records while the flag columns still exist
        if quarantine_condition is not None:
            quarantined_df = df.filter(quarantine_condition)
        else:
            quarantined_df = df.limit(0)

        # Return only the input columns, without the per-rule flags
        return all_passed, quarantined_df.select(*original_columns)

    def print_report(self):
        print(f"\n{'='*60}")
        print(f"Data Quality Report: {self.table_name}")
        print(f"{'='*60}")
        for r in self.results:
            status = "PASS" if r["passed"] else "FAIL"
            print(
                f"  [{status}] {r['rule']}: "
                f"{r['pass_rate']:.2%} "
                f"(threshold: {r['threshold']:.2%}, "
                f"failed: {r['failed_rows']})"
            )
        print(f"{'='*60}\n")


class DataQualityError(Exception):
    pass
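
The gate's decision for each rule comes down to a little arithmetic: compare the pass rate with the threshold, then decide between continuing, quarantining, and halting. The sketch below isolates that logic in plain Python so it can be unit-tested without a cluster. `evaluate_rule` is a hypothetical helper written for this illustration.

```python
def evaluate_rule(
    pass_count: int,
    total_rows: int,
    threshold: float,
    is_critical: bool = False,
) -> str:
    """Return 'pass', 'quarantine', or 'halt' for a single quality rule."""
    # An empty batch has nothing to fail, so treat it as fully passing.
    pass_rate = pass_count / total_rows if total_rows > 0 else 1.0
    if pass_rate >= threshold:
        return "pass"
    return "halt" if is_critical else "quarantine"


# 99.5% of rows pass against a 99% threshold
print(evaluate_rule(9_950, 10_000, threshold=0.99))  # pass
# 98% of rows pass against a 99% threshold, non-critical rule
print(evaluate_rule(9_800, 10_000, threshold=0.99))  # quarantine
# A single bad row fails a critical rule that demands 100%
print(evaluate_rule(9_999, 10_000, threshold=1.0, is_critical=True))  # halt
```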

Silver Transformation Pipeline

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, trim, lower, to_timestamp, row_number, sha2, concat_ws
)
from pyspark.sql.window import Window


class SilverTransformer:
    """Production Silver layer transformation pipeline."""

    def __init__(self, spark: SparkSession, entity_name: str):
        self.spark = spark
        self.entity_name = entity_name
        self.source_table = f"prod.bronze.{entity_name}"
        self.target_table = f"prod.silver.{entity_name}"

    def read_bronze_incremental(
        self, last_processed_ts: str | None = None
    ) -> DataFrame:
        """Read new records from Bronze since last processing."""
        df = self.spark.table(self.source_table)

        if last_processed_ts:
            df = df.filter(
                col("_bronze_ingested_at") > last_processed_ts
            )

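        # _corrupt_record only exists if Bronze captured malformed rows
        if "_corrupt_record" in df.columns:
            df = df.filter(col("_corrupt_record").isNull()) \
                   .drop("_corrupt_record")
        return df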

    def deduplicate(
        self, df: DataFrame, key_columns: list[str],
        order_column: str = "_bronze_ingested_at"
    ) -> DataFrame:
        """Deduplicate using key columns, keeping latest record."""
        window = Window.partitionBy(
            *key_columns
        ).orderBy(col(order_column).desc())

        return df.withColumn("_row_num", row_number().over(window)) \
                 .filter(col("_row_num") == 1) \
                 .drop("_row_num")

    def apply_schema(
        self, df: DataFrame, column_types: dict[str, str]
    ) -> DataFrame:
        """Cast columns to target types; columns missing from df are skipped."""
        for col_name, target_type in column_types.items():
            if col_name in df.columns:
                df = df.withColumn(col_name, col(col_name).cast(target_type))
        return df

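    def add_silver_metadata(self, df: DataFrame) -> DataFrame:
        """Add Silver-layer tracking columns."""
        from pyspark.sql.functions import coalesce, current_timestamp, lit

        # Hash every business column (metadata columns start with "_")
        value_cols = [c for c in df.columns if not c.startswith("_")]
        # concat_ws skips NULLs, so substitute a placeholder to keep
        # (a, NULL, b) and (a, b, NULL) from hashing to the same value
        hash_input = [
            coalesce(col(c).cast("string"), lit("<NULL>")) for c in value_cols
        ]
        return df \
            .withColumn("_silver_processed_at", current_timestamp()) \
            .withColumn("_silver_hash", sha2(concat_ws("||", *hash_input), 256))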

    def merge_into_silver(
        self, source_df: DataFrame, key_columns: list[str]
    ):
        """SCD Type 1 merge into Silver table."""
        if not self.spark.catalog.tableExists(self.target_table):
            source_df.write.format("delta").saveAsTable(self.target_table)
            return

        target = DeltaTable.forName(self.spark, self.target_table)
        merge_condition = " AND ".join(
            [f"target.{k} = source.{k}" for k in key_columns]
        )

        target.alias("target") \
            .merge(source_df.alias("source"), merge_condition) \
            .whenMatchedUpdateAll(
                condition="source._silver_hash != target._silver_hash"
            ) \
            .whenNotMatchedInsertAll() \
            .execute()
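
The merge relies on a row hash to detect changes, and there is one subtlety with that technique: Spark's `concat_ws` drops NULL inputs, so two rows whose values shift across a NULL column can produce the same hash. The plain-Python sketch below imitates that behavior with `hashlib` to show the collision and one possible remedy. Both function names and the `<NULL>` placeholder are assumptions made for this example.

```python
import hashlib


def hash_skipping_nulls(values) -> str:
    # Imitates concat_ws("||", ...), which leaves NULL inputs out entirely.
    joined = "||".join(str(v) for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def hash_with_placeholder(values, placeholder: str = "<NULL>") -> str:
    # Keeps every position in the string so NULLs cannot shift values.
    joined = "||".join(placeholder if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


row_a = ("A-1", None, "EU")  # segment is NULL
row_b = ("A-1", "EU", None)  # region is NULL

print(hash_skipping_nulls(row_a) == hash_skipping_nulls(row_b))      # True
print(hash_with_placeholder(row_a) == hash_with_placeholder(row_b))  # False
```

In PySpark the equivalent of the placeholder approach is wrapping each column in `coalesce(col(c).cast("string"), lit("<NULL>"))` before passing it to `concat_ws`.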

Putting Silver Together

# Define quality rules for orders
quality_rules = [
    QualityRule(
        name="order_id_not_null",
        check=lambda df: df.withColumn(
            "_quality_passed", col("order_id").isNotNull()
        ),
        threshold=1.0,
        is_critical=True,
    ),
    QualityRule(
        name="order_amount_positive",
        check=lambda df: df.withColumn(
            "_quality_passed", col("total_amount") > 0
        ),
        threshold=0.99,
    ),
    QualityRule(
        name="valid_order_date",
        check=lambda df: df.withColumn(
            "_quality_passed",
            col("order_date").between("2020-01-01", "2027-12-31")
        ),
        threshold=0.995,
    ),
]

# Run the Silver pipeline
transformer = SilverTransformer(spark, "orders")
bronze_df = transformer.read_bronze_incremental()
deduped_df = transformer.deduplicate(bronze_df, ["order_id"])

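# Quality gate
gate = DataQualityGate("silver.orders", quality_rules)
passed, quarantined = gate.validate(deduped_df)
gate.print_report()

# Store quarantined rows for review (table name is illustrative)
quarantined.write.format("delta").mode("append") \
    .saveAsTable("prod.silver.orders_quarantine")

# Keep quarantined rows out of Silver (order_id is unique after dedup)
clean_df = deduped_df.join(
    quarantined.select("order_id"), on="order_id", how="left_anti"
)

# Apply schema and write
typed_df = transformer.apply_schema(clean_df, {
    "order_id": "long",
    "total_amount": "decimal(18,2)",
    "order_date": "date",
    "customer_id": "long",
})
enriched_df = transformer.add_silver_metadata(typed_df)
transformer.merge_into_silver(enriched_df, ["order_id"])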
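
The run above calls `read_bronze_incremental()` without a timestamp, so it reprocesses all of Bronze each time. Incremental processing needs somewhere to remember the last `_bronze_ingested_at` value that was handled. The following is a minimal sketch of such a high-water mark, assuming a hypothetical `WatermarkStore` backed by a JSON file. In a real workspace a small Delta table would likely be a better home for this state.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class WatermarkStore:
    """Hypothetical store of the last processed timestamp per entity."""

    def __init__(self, path: str):
        self.path = Path(path)

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def get(self, entity: str) -> Optional[str]:
        return self._load().get(entity)

    def advance(self, entity: str, new_ts: str) -> None:
        state = self._load()
        current = state.get(entity)
        # Timestamps in one fixed, zero-padded format compare chronologically
        # as strings, so the watermark never moves backwards.
        if current is None or new_ts > current:
            state[entity] = new_ts
            self.path.write_text(json.dumps(state, indent=2))


# Demonstration against a temporary file
with tempfile.TemporaryDirectory() as tmp:
    store = WatermarkStore(f"{tmp}/watermarks.json")
    store.advance("orders", "2026-03-10 12:00:00")
    store.advance("orders", "2026-03-09 08:00:00")  # older, so ignored
    latest = store.get("orders")

print(latest)  # 2026-03-10 12:00:00
```

In the pipeline, `store.get("orders")` would be passed to `read_bronze_incremental`, and `advance` would be called with the maximum `_bronze_ingested_at` of the batch only after the merge succeeds.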

Gold Layer: Business-Ready Datasets

Gold tables are pre-aggregated datasets optimized for specific consumption patterns: dashboards, ML features, or API serving.

Gold Aggregation Example

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, sum as spark_sum, count, avg, max as spark_max,
    min as spark_min, datediff, current_date
)


def build_customer_lifetime_value(spark: SparkSession) -> DataFrame:
    """Gold table: Customer Lifetime Value metrics."""

    orders = spark.table("prod.silver.orders")
    customers = spark.table("prod.silver.customers")

    clv = orders.groupBy("customer_id").agg(
        spark_sum("total_amount").alias("total_revenue"),
        count("order_id").alias("total_orders"),
        avg("total_amount").alias("avg_order_value"),
        spark_min("order_date").alias("first_order_date"),
        spark_max("order_date").alias("last_order_date"),
    ).withColumn(
        "customer_tenure_days",
        datediff(current_date(), col("first_order_date"))
    ).withColumn(
        "days_since_last_order",
        datediff(current_date(), col("last_order_date"))
    )

    # Join with customer details
    result = clv.join(
        customers.select("customer_id", "customer_name", "segment", "region"),
        on="customer_id",
        how="left"
    )

    # Write as Gold table with Z-ORDER for query optimization
    result.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("prod.gold.customer_lifetime_value")

    # Optimize for query performance
    spark.sql("""
        OPTIMIZE prod.gold.customer_lifetime_value
        ZORDER BY (segment, region)
    """)

    return result

Daily Revenue Gold Table

def build_daily_revenue(spark: SparkSession) -> DataFrame:
    """Gold table: Daily revenue aggregations."""

    orders = spark.table("prod.silver.orders")
    products = spark.table("prod.silver.products")

    daily = orders \
        .join(products, on="product_id", how="left") \
        .groupBy("order_date", "product_category", "region") \
        .agg(
            spark_sum("total_amount").alias("revenue"),
            count("order_id").alias("order_count"),
            avg("total_amount").alias("avg_order_value"),
            spark_sum("quantity").alias("units_sold"),
        )

    daily.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("order_date") \
        .saveAsTable("prod.gold.daily_revenue")

    return daily

Production Considerations

Table Maintenance Automation

def maintain_delta_tables(spark: SparkSession, catalog: str = "prod"):
    """Run maintenance on all Delta tables in the catalog."""

    schemas = ["bronze", "silver", "gold"]

    for schema in schemas:
        tables = spark.sql(
            f"SHOW TABLES IN {catalog}.{schema}"
        ).collect()

        for table in tables:
            # SHOW TABLES also lists temporary views, which cannot be optimized
            if table.isTemporary:
                continue
            table_name = f"{catalog}.{schema}.{table.tableName}"
            print(f"Maintaining {table_name}...")

            # Optimize small files
            spark.sql(f"OPTIMIZE {table_name}")

            # Vacuum old versions (retain 7 days)
            spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")

            # Analyze for query optimization
            spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
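
The 168-hour retention in the maintenance job matches Delta Lake's default safety threshold of 7 days for `VACUUM`. If you make the retention configurable, it may be worth guarding against values below that limit before any SQL runs. Here is a small sketch of that idea. `maintenance_statements` and `MIN_RETAIN_HOURS` are hypothetical names for this example.

```python
# 7 days: Delta Lake's default minimum retention for VACUUM.
MIN_RETAIN_HOURS = 168


def maintenance_statements(
    table_name: str, retain_hours: int = MIN_RETAIN_HOURS
) -> list:
    """Build the maintenance SQL for one table, refusing unsafe retention."""
    if retain_hours < MIN_RETAIN_HOURS:
        raise ValueError(
            f"retain_hours={retain_hours} is below the "
            f"{MIN_RETAIN_HOURS}-hour safety threshold"
        )
    return [
        f"OPTIMIZE {table_name}",
        f"VACUUM {table_name} RETAIN {retain_hours} HOURS",
        f"ANALYZE TABLE {table_name} COMPUTE STATISTICS",
    ]


for statement in maintenance_statements("prod.silver.orders"):
    print(statement)
```

Each returned string can be passed to `spark.sql(...)`. Keeping statement construction separate from execution also makes the job easy to test without a cluster.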

Monitoring and Lineage

def get_table_health(spark: SparkSession, table_name: str) -> dict:
    """Get health metrics for a Delta table."""

    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    history = spark.sql(
        f"DESCRIBE HISTORY {table_name} LIMIT 10"
    ).collect()

    return {
        "table": table_name,
        "size_gb": round(detail.sizeInBytes / (1024**3), 2),
        "num_files": detail.numFiles,
        "last_modified": str(detail.lastModified),
        "recent_operations": [
            {
                "version": h.version,
                "operation": h.operation,
                "timestamp": str(h.timestamp),
            }
            for h in history
        ],
    }
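
The `sizeInBytes` and `numFiles` values from `DESCRIBE DETAIL` are enough to flag tables that suffer from small files. A rough sketch of such a check follows. The helper names and the 32 MB and 10-file cut-offs are assumptions chosen for illustration, not Databricks recommendations.

```python
def average_file_mb(size_in_bytes: int, num_files: int) -> float:
    """Average data file size in MiB (0.0 for an empty table)."""
    if num_files == 0:
        return 0.0
    return size_in_bytes / num_files / (1024 ** 2)


def needs_compaction(
    size_in_bytes: int,
    num_files: int,
    min_avg_mb: float = 32.0,  # assumed cut-off, tune for your workload
    min_files: int = 10,       # ignore tables too small to matter
) -> bool:
    if num_files < min_files:
        return False
    return average_file_mb(size_in_bytes, num_files) < min_avg_mb


# 2 GiB spread over 4,096 files averages 0.5 MiB per file
print(average_file_mb(2 * 1024 ** 3, 4096))   # 0.5
print(needs_compaction(2 * 1024 ** 3, 4096))  # True
# 2 GiB in 16 files averages 128 MiB per file
print(needs_compaction(2 * 1024 ** 3, 16))    # False
```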

Common Pitfalls and How to Avoid Them

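1. Schema drift in Bronze — Use Auto Loader's schema evolution (the cloudFiles.schemaEvolutionMode option) together with Delta's mergeSchema write option, and track how the schema changes over time. Don't let unknown columns silently appear in Silver.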

2. Over-aggregating in Gold — Build Gold tables at the right granularity. Too aggregated = inflexible. Too granular = just another Silver table.

3. Skipping data quality — Every Bronze-to-Silver transition needs validation. Quarantine bad records instead of dropping them.

4. Not partitioning intelligently — Partition Bronze by ingestion date, Silver by business date, Gold by the most common filter column.

5. Ignoring table maintenance — OPTIMIZE and VACUUM should run on schedule. Small files kill query performance.
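
For the first pitfall, one lightweight way to keep unknown columns out of Silver is to compare the incoming columns with the expected schema before transforming. The sketch below assumes a hypothetical `detect_schema_drift` helper that works on plain `{column: type}` dictionaries. With Spark, the actual side could come from `dict(df.dtypes)`.

```python
def detect_schema_drift(expected: dict, actual: dict) -> dict:
    """Compare two {column: type} mappings and report the differences."""
    shared = set(expected) & set(actual)
    return {
        "added": sorted(set(actual) - set(expected)),
        "missing": sorted(set(expected) - set(actual)),
        "type_changed": sorted(c for c in shared if expected[c] != actual[c]),
    }


expected = {
    "order_id": "bigint",
    "total_amount": "decimal(18,2)",
    "order_date": "date",
}
actual = {
    "order_id": "bigint",
    "total_amount": "string",   # type changed upstream
    "order_date": "date",
    "coupon_code": "string",    # new column from the source system
}

drift = detect_schema_drift(expected, actual)
print(drift)
# {'added': ['coupon_code'], 'missing': [], 'type_changed': ['total_amount']}
```

A pipeline could log the `added` columns, and fail or alert on `missing` and `type_changed`, depending on how strict the Silver contract needs to be.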

Summary

The Medallion Architecture is straightforward in concept but requires disciplined implementation:

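| Layer  | Purpose               | Write Pattern   | Quality                         |
|--------|-----------------------|-----------------|---------------------------------|
| Bronze | Raw landing           | Append-only     | Schema-on-read                  |
| Silver | Cleaned, deduped      | Merge (SCD)     | Enforced schema + quality gates |
| Gold   | Business aggregations | Overwrite/Merge | Pre-validated from Silver       |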

The code patterns in this article are extracted from real production implementations running at scale on Databricks.


Take Your Databricks Platform to Production

If you're building a Medallion Architecture and want to accelerate your implementation, check out the Medallion Architecture Accelerator from Datanest Platform Pro. It includes:

  • Production-ready Bronze/Silver/Gold pipeline generator
  • Metadata-driven pipeline configuration
  • Built-in data quality gates
  • 20 production-tested Databricks & Azure tools in the complete bundle

The full Datanest Platform Pro collection covers everything from Spark optimization and CI/CD to Unity Catalog migration and monitoring — all built from real enterprise implementations.

Use code LAUNCH40 for 40% off any product, or STUDENT for 50% off if you're a student.

Browse the Datanest Platform Pro store
