DEV Community

Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

Databricks Analytics Engineering: Module 4: Data Transformation Patterns

Learning Objectives

By the end of this module, you will be able to:

  1. Apply common transformation patterns: deduplication, pivot, unpivot, gap-fill, and sessionization
  2. Write complex joins including semi-joins, anti-joins, and inequality joins
  3. Parse and transform semi-structured data (JSON, XML) with Databricks SQL
  4. Use array and map operations for nested data transformations
  5. Implement higher-order functions and SQL UDFs for custom logic

4.1 Deduplication Patterns

Duplicate records are one of the most common data quality issues. The right approach depends on whether the duplicates are exact copies or fuzzy near-matches.

Exact Deduplication with ROW_NUMBER

-- Keep the latest record per customer based on updated_at
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY updated_at DESC, _ingested_at DESC
        ) AS row_num
    FROM silver.stg_customers
)
SELECT * EXCEPT(row_num)
FROM ranked
WHERE row_num = 1;

Deduplication with QUALIFY (Databricks SQL)

-- Cleaner syntax using QUALIFY (supported in Databricks SQL)
SELECT *
FROM silver.stg_customers
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY customer_id
    ORDER BY updated_at DESC
) = 1;

Hash-Based Change Detection

-- Use MD5 hash to detect actual changes vs duplicate ingestion
WITH hashed AS (
    SELECT
        *,
        MD5(CONCAT_WS('|',
            COALESCE(customer_name, ''),
            COALESCE(email, ''),
            COALESCE(phone, ''),
            COALESCE(segment, '')
        )) AS _row_hash
    FROM silver.stg_customers
),

deduped AS (
    SELECT *
    FROM hashed
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY customer_id, _row_hash
        ORDER BY _ingested_at DESC
    ) = 1
)

SELECT * FROM deduped;

Deduplication as a dbt Macro

-- macros/deduplicate.sql
{% macro deduplicate(relation, partition_by, order_by='_ingested_at DESC') %}
SELECT *
FROM {{ relation }}
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY {{ partition_by }}
    ORDER BY {{ order_by }}
) = 1
{% endmacro %}

-- Usage in a model:
-- {{ deduplicate(ref('stg_shopify__orders'), 'order_id') }}
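The patterns above only catch byte-identical duplicates. For fuzzy near-matches (typos, inconsistent casing), one approach is to normalize the matching keys first and then flag near-duplicates with an edit-distance check. A minimal sketch, reusing the `silver.stg_customers` columns from above — the `LEVENSHTEIN` threshold of 2 is illustrative, and real entity resolution usually needs blocking keys and a dedicated matching tool:

```sql
-- Sketch: fuzzy dedup via normalization + edit distance (illustrative threshold)
WITH normalized AS (
    SELECT
        *,
        LOWER(TRIM(email)) AS email_norm
    FROM silver.stg_customers
),

-- Pairs that share a normalized email and have nearly identical names
near_matches AS (
    SELECT
        a.customer_id AS keep_id,
        b.customer_id AS dupe_id
    FROM normalized a
    JOIN normalized b
        ON a.email_norm = b.email_norm
        AND a.customer_id < b.customer_id   -- keep the lower id of each pair
    WHERE LEVENSHTEIN(LOWER(a.customer_name), LOWER(b.customer_name)) <= 2
)

SELECT *
FROM normalized
WHERE customer_id NOT IN (SELECT dupe_id FROM near_matches);
```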

4.2 Pivot and Unpivot

PIVOT: Rows to Columns

Transform row-level category values into separate columns.

-- Monthly revenue by product category, pivoted into columns
SELECT *
FROM (
    SELECT
        DATE_FORMAT(order_date, 'yyyy-MM') AS month,
        product_category,
        net_revenue
    FROM gold.fact_sales f
    JOIN gold.dim_product p ON f.product_key = p.product_key
)
PIVOT (
    SUM(net_revenue)
    FOR product_category IN (
        'Electronics' AS electronics,
        'Clothing' AS clothing,
        'Home & Garden' AS home_garden,
        'Sports' AS sports
    )
)
ORDER BY month;

UNPIVOT: Columns to Rows

Normalize wide tables into a long format suitable for analysis.

-- Unpivot monthly metric columns into rows
SELECT *
FROM (
    SELECT
        product_id,
        jan_revenue,
        feb_revenue,
        mar_revenue
    FROM reports.quarterly_product_summary
)
UNPIVOT (
    revenue FOR month IN (
        jan_revenue AS January,
        feb_revenue AS February,
        mar_revenue AS March
    )
)
ORDER BY product_id, month;

Dynamic Pivot with dbt

-- macros/pivot_values.sql
{% macro get_column_values(table, column) %}
    {% set query %}
        SELECT DISTINCT {{ column }}
        FROM {{ table }}
        ORDER BY 1
    {% endset %}
    {% set results = run_query(query) %}
    {% if execute %}
        {{ return(results.columns[0].values()) }}
    {% else %}
        {{ return([]) }}
    {% endif %}
{% endmacro %}

-- Usage in a model:
-- {% set categories = get_column_values(ref('dim_product'), 'category_level_1') %}
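Because `PIVOT` requires literal values, a common way to consume the macro is a Jinja loop that builds conditional aggregations instead. A hedged sketch — the model name and the column-name sanitization via Jinja filters are illustrative:

```sql
-- models/marts/revenue_by_category_pivoted.sql (illustrative)
{% set categories = get_column_values(ref('dim_product'), 'category_level_1') %}

SELECT
    DATE_FORMAT(f.order_date, 'yyyy-MM') AS month,
    {% for category in categories %}
    SUM(
        CASE WHEN p.category_level_1 = '{{ category }}' THEN f.net_revenue END
    ) AS {{ category | lower | replace(' ', '_') | replace('&', 'and') }}
    {%- if not loop.last %},{% endif %}
    {% endfor %}
FROM {{ ref('fact_sales') }} f
JOIN {{ ref('dim_product') }} p ON f.product_key = p.product_key
GROUP BY 1
```

This compiles to one `SUM(CASE ...)` column per distinct category, so new categories appear automatically on the next run.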

4.3 Gap-Fill (Missing Data Interpolation)

Gap-filling creates rows for missing time periods, critical for accurate time series analysis and dashboards.

Date Spine Gap-Fill

-- Fill missing dates in daily revenue with zero values
WITH date_spine AS (
    SELECT full_date AS date_day
    FROM gold.dim_date
    WHERE full_date BETWEEN '2026-01-01' AND '2026-03-31'
),

daily_revenue AS (
    SELECT
        d.full_date AS date_day,
        SUM(f.net_revenue) AS revenue
    FROM gold.fact_sales f
    JOIN gold.dim_date d ON f.date_key = d.date_key
    GROUP BY d.full_date
),

gap_filled AS (
    SELECT
        ds.date_day,
        COALESCE(dr.revenue, 0) AS revenue,
        dr.revenue IS NULL AS is_imputed
    FROM date_spine ds
    LEFT JOIN daily_revenue dr ON ds.date_day = dr.date_day
)

SELECT * FROM gap_filled
ORDER BY date_day;
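If no `dim_date` table exists yet, the date spine can be generated inline with `SEQUENCE` and `EXPLODE`, both built into Databricks SQL — the date range here is illustrative:

```sql
-- Sketch: generate a date spine inline when no dim_date is available
SELECT EXPLODE(
    SEQUENCE(DATE '2026-01-01', DATE '2026-03-31', INTERVAL 1 DAY)
) AS date_day;
```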

Gap-Fill with Forward Fill (Last Known Value)

-- Forward-fill missing inventory levels
WITH date_spine AS (
    SELECT full_date AS date_day
    FROM gold.dim_date
    WHERE full_date BETWEEN '2026-01-01' AND '2026-03-31'
),

sparse_data AS (
    SELECT
        snapshot_date,
        product_id,
        quantity_on_hand
    FROM silver.inventory_snapshots
),

cross_joined AS (
    SELECT
        ds.date_day,
        p.product_id
    FROM date_spine ds
    CROSS JOIN (SELECT DISTINCT product_id FROM sparse_data) p
),

joined AS (
    SELECT
        cj.date_day,
        cj.product_id,
        sd.quantity_on_hand
    FROM cross_joined cj
    LEFT JOIN sparse_data sd
        ON cj.date_day = sd.snapshot_date
        AND cj.product_id = sd.product_id
),

forward_filled AS (
    SELECT
        date_day,
        product_id,
        -- Forward-fill: use LAST_VALUE with IGNORE NULLS
        LAST_VALUE(quantity_on_hand, true) OVER (
            PARTITION BY product_id
            ORDER BY date_day
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS quantity_on_hand
    FROM joined
)

SELECT * FROM forward_filled
ORDER BY product_id, date_day;

4.4 Sessionization

Sessionization groups a stream of events into logical sessions based on time gaps.

-- Web analytics sessionization with a 30-minute inactivity threshold
WITH events AS (
    SELECT
        user_id,
        event_timestamp,
        page_url,
        event_type,
        LAG(event_timestamp) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
        ) AS prev_event_timestamp
    FROM silver.stg_web_events
),

session_boundaries AS (
    SELECT
        *,
        CASE
            WHEN prev_event_timestamp IS NULL THEN 1
            WHEN TIMESTAMPDIFF(MINUTE, prev_event_timestamp, event_timestamp) > 30
                THEN 1
            ELSE 0
        END AS is_new_session
    FROM events
),

session_numbered AS (
    SELECT
        *,
        SUM(is_new_session) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS session_number,
        CONCAT(
            user_id, '-',
            SUM(is_new_session) OVER (
                PARTITION BY user_id
                ORDER BY event_timestamp
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            )
        ) AS session_id
    FROM session_boundaries
)

SELECT
    session_id,
    user_id,
    session_number,
    MIN(event_timestamp) AS session_start,
    MAX(event_timestamp) AS session_end,
    TIMESTAMPDIFF(
        SECOND,
        MIN(event_timestamp),
        MAX(event_timestamp)
    ) AS session_duration_seconds,
    COUNT(*) AS event_count,
    -- MIN_BY / MAX_BY return the page_url at the earliest / latest timestamp,
    -- avoiding the window-function-inside-GROUP-BY mismatch
    MIN_BY(page_url, event_timestamp) AS landing_page,
    MAX_BY(page_url, event_timestamp) AS exit_page
FROM session_numbered
GROUP BY session_id, user_id, session_number;

Simplified Sessionization as dbt Model

-- models/intermediate/int_web_sessions.sql
{{
    config(
        materialized='incremental',
        unique_key='session_id',
        incremental_strategy='merge'
    )
}}

WITH events_with_gap AS (
    SELECT
        user_id,
        event_timestamp,
        page_url,
        event_type,
        CASE
            WHEN TIMESTAMPDIFF(
                MINUTE,
                LAG(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp),
                event_timestamp
            ) > 30
            OR LAG(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp) IS NULL
            THEN 1
            ELSE 0
        END AS is_session_start
    FROM {{ ref('stg_web_events') }}
    {% if is_incremental() %}
    WHERE event_timestamp > (SELECT MAX(session_end) FROM {{ this }})
    {% endif %}
),

with_session_id AS (
    SELECT
        *,
        CONCAT(user_id, '-', SUM(is_session_start) OVER (
            PARTITION BY user_id ORDER BY event_timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )) AS session_id
    FROM events_with_gap
)

SELECT
    session_id,
    user_id,
    MIN(event_timestamp) AS session_start,
    MAX(event_timestamp) AS session_end,
    COUNT(*) AS event_count,
    COUNT(DISTINCT page_url) AS distinct_pages
FROM with_session_id
GROUP BY session_id, user_id

4.5 Complex Joins

Semi-Join (EXISTS)

Return rows from one table that have a match in another, without duplicating rows when there are multiple matches.

-- Customers who have placed at least one order
SELECT c.*
FROM gold.dim_customer c
WHERE EXISTS (
    SELECT 1
    FROM gold.fact_sales f
    WHERE f.customer_key = c.customer_key
);

Anti-Join (NOT EXISTS)

Return rows from one table that have no matches in another.

-- Products that have never been sold
SELECT p.*
FROM gold.dim_product p
WHERE NOT EXISTS (
    SELECT 1
    FROM gold.fact_sales f
    WHERE f.product_key = p.product_key
);

-- Alternative using LEFT JOIN / IS NULL (usually optimized to the same anti-join plan)
SELECT p.*
FROM gold.dim_product p
LEFT JOIN gold.fact_sales f ON p.product_key = f.product_key
WHERE f.product_key IS NULL;

Inequality Join (Range Join)

-- Map events to time-based pricing tiers
SELECT
    e.event_id,
    e.event_date,
    e.product_id,
    p.price_tier,
    p.unit_price
FROM silver.events e
JOIN silver.pricing_history p
    ON e.product_id = p.product_id
    AND e.event_date >= p.effective_from
    AND e.event_date < p.effective_to;
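Range joins like this can be slow because they cannot use an equality-based hash join on the range predicate alone. Databricks offers a `RANGE_JOIN` hint that bins the range condition to prune comparisons. A sketch of the same query with the hint — the bin size of 60 (days, for `DATE` columns) is an illustrative value that should be tuned to your typical interval length:

```sql
-- Sketch: range join hint (bin size is workload-dependent)
SELECT /*+ RANGE_JOIN(p, 60) */
    e.event_id,
    e.event_date,
    p.price_tier,
    p.unit_price
FROM silver.events e
JOIN silver.pricing_history p
    ON e.product_id = p.product_id
    AND e.event_date >= p.effective_from
    AND e.event_date < p.effective_to;
```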

LATERAL Join (Table-Valued Functions)

-- Get the top 3 products per customer using LATERAL
SELECT
    c.customer_id,
    c.customer_name,
    top_products.*
FROM gold.dim_customer c,
LATERAL (
    SELECT
        p.product_name,
        SUM(f.net_revenue) AS total_spend
    FROM gold.fact_sales f
    JOIN gold.dim_product p ON f.product_key = p.product_key
    WHERE f.customer_key = c.customer_key
    GROUP BY p.product_name
    ORDER BY total_spend DESC
    LIMIT 3
) AS top_products
WHERE c.is_current = true;

4.6 Semi-Structured Data: JSON

Databricks SQL has rich support for JSON data stored in STRING or VARIANT columns.

Extracting JSON Fields

-- Parse JSON payload from raw events
SELECT
    event_id,
    event_timestamp,

    -- Dot notation on STRING column (returns STRING)
    raw_payload:user_id::STRING AS user_id,
    raw_payload:event_type::STRING AS event_type,
    raw_payload:page.url::STRING AS page_url,
    raw_payload:page.referrer::STRING AS referrer,

    -- Nested access
    raw_payload:device.type::STRING AS device_type,
    raw_payload:device.os::STRING AS device_os,
    raw_payload:device.browser::STRING AS browser,

    -- Numeric extraction with casting
    raw_payload:metrics.load_time_ms::INT AS load_time_ms,
    raw_payload:metrics.scroll_depth::DOUBLE AS scroll_depth

FROM bronze.raw_web_events;
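On runtimes with VARIANT support, it can pay to parse the JSON string once with `PARSE_JSON` and then use the same `:` path syntax on the VARIANT value. A sketch — check that your Databricks Runtime version supports VARIANT before relying on this:

```sql
-- Sketch: parse once into VARIANT, then extract (requires VARIANT support)
WITH parsed AS (
    SELECT
        event_id,
        PARSE_JSON(raw_payload) AS payload
    FROM bronze.raw_web_events
)
SELECT
    event_id,
    payload:user_id::STRING AS user_id,
    payload:metrics.load_time_ms::INT AS load_time_ms
FROM parsed;
```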

JSON with get_json_object and json_tuple

-- Alternative extraction functions
SELECT
    event_id,
    GET_JSON_OBJECT(raw_payload, '$.user_id') AS user_id,
    GET_JSON_OBJECT(raw_payload, '$.page.url') AS page_url,

    -- Extract multiple fields at once
    jt.*
FROM bronze.raw_web_events
LATERAL VIEW JSON_TUPLE(
    raw_payload,
    'user_id', 'event_type', 'timestamp'
) jt AS user_id, event_type, event_ts;

Flattening JSON Arrays

-- Explode order line items from a JSON array
SELECT
    order_id,
    order_date,
    item.product_id,
    item.quantity,
    item.unit_price
FROM bronze.raw_orders
LATERAL VIEW EXPLODE(
    FROM_JSON(
        line_items,
        'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
    )
) exploded AS item;

-- Alternative with INLINE for struct arrays
SELECT
    order_id,
    order_date,
    inline_item.*
FROM (
    SELECT
        order_id,
        order_date,
        FROM_JSON(
            line_items,
            'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
        ) AS parsed_items
    FROM bronze.raw_orders
) parsed
LATERAL VIEW INLINE(parsed_items) inline_item;
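When the position of each element matters, `POSEXPLODE` emits a 0-based index alongside the value — useful for assigning line numbers to order items. The column aliases here are illustrative:

```sql
-- Sketch: POSEXPLODE keeps each element's array position
SELECT
    order_id,
    pos + 1 AS line_number,  -- POSEXPLODE positions are 0-based
    item.product_id,
    item.quantity,
    item.unit_price
FROM bronze.raw_orders
LATERAL VIEW POSEXPLODE(
    FROM_JSON(
        line_items,
        'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
    )
) exploded AS pos, item;
```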

4.7 Semi-Structured Data: XML

-- Parse XML using xpath functions
SELECT
    xpath_string(xml_payload, '/order/order_id') AS order_id,
    xpath_string(xml_payload, '/order/customer/name') AS customer_name,
    xpath_double(xml_payload, '/order/total') AS order_total,

    -- Extract multiple line items
    xpath(xml_payload, '/order/items/item/product_id/text()') AS product_ids,
    xpath(xml_payload, '/order/items/item/quantity/text()') AS quantities
FROM bronze.raw_xml_orders;

4.8 Array and Map Operations

Array Functions

-- Common array operations in Databricks SQL
SELECT
    customer_id,

    -- Create arrays
    ARRAY('Electronics', 'Clothing', 'Home') AS categories,

    -- Array aggregation
    COLLECT_LIST(product_category) AS purchased_categories,
    COLLECT_SET(product_category) AS unique_categories,

    -- Array functions
    SIZE(COLLECT_SET(product_category)) AS category_count,
    ARRAY_CONTAINS(COLLECT_SET(product_category), 'Electronics') AS bought_electronics,
    ARRAY_DISTINCT(COLLECT_LIST(product_category)) AS distinct_cats,
    ARRAY_SORT(COLLECT_SET(product_category)) AS sorted_cats,

    -- Array to string
    ARRAY_JOIN(ARRAY_SORT(COLLECT_SET(product_category)), ', ') AS categories_csv

FROM gold.fact_sales f
JOIN gold.dim_product p ON f.product_key = p.product_key
GROUP BY customer_id;

Map Functions

-- Build and query maps
SELECT
    customer_id,

    -- Create map from key-value pairs
    MAP_FROM_ENTRIES(
        COLLECT_LIST(
            STRUCT(product_category, total_revenue)
        )
    ) AS category_revenue_map,

    -- Access map values
    MAP_FROM_ENTRIES(
        COLLECT_LIST(STRUCT(product_category, total_revenue))
    )['Electronics'] AS electronics_revenue,

    -- Map keys and values
    MAP_KEYS(
        MAP_FROM_ENTRIES(COLLECT_LIST(STRUCT(product_category, total_revenue)))
    ) AS categories,
    MAP_VALUES(
        MAP_FROM_ENTRIES(COLLECT_LIST(STRUCT(product_category, total_revenue)))
    ) AS revenues

FROM (
    SELECT
        customer_id,
        product_category,
        SUM(net_revenue) AS total_revenue
    FROM gold.fact_sales f
    JOIN gold.dim_product p ON f.product_key = p.product_key
    GROUP BY customer_id, product_category
) category_totals
GROUP BY customer_id;
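Maps also have lambda-based functions. `TRANSFORM_VALUES` and `MAP_FILTER` are built into Databricks SQL; the table name `gold.customer_category_revenue` (holding a map like the one built above) and the revenue threshold are hypothetical, for illustration:

```sql
-- Sketch: higher-order functions on maps (hypothetical source table)
SELECT
    customer_id,
    TRANSFORM_VALUES(
        category_revenue_map, (k, v) -> ROUND(v, 2)
    ) AS rounded_revenue_map,
    MAP_FILTER(
        category_revenue_map, (k, v) -> v > 1000
    ) AS high_value_categories
FROM gold.customer_category_revenue;
```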

4.9 Higher-Order Functions

Higher-order functions operate on arrays with lambda expressions, letting you work on nested data in place and avoiding the EXPLODE-then-re-aggregate round trip.

TRANSFORM

-- Apply a function to each element of an array
SELECT
    customer_id,
    tags,
    TRANSFORM(tags, tag -> UPPER(tag)) AS tags_upper,
    TRANSFORM(tags, tag -> CONCAT('category:', tag)) AS tags_prefixed,
    TRANSFORM(
        monthly_revenues,
        rev -> ROUND(rev * 1.1, 2)
    ) AS revenues_with_10pct_uplift
FROM silver.customer_profiles;

FILTER

-- Filter array elements matching a condition
SELECT
    order_id,
    line_items,
    FILTER(line_items, item -> item.quantity > 5) AS bulk_items,
    FILTER(line_items, item -> item.unit_price > 100.00) AS premium_items,
    SIZE(FILTER(line_items, item -> item.quantity > 5)) AS bulk_item_count
FROM silver.parsed_orders;

AGGREGATE (REDUCE)

-- Reduce an array to a single value
SELECT
    order_id,
    line_items,
    AGGREGATE(
        line_items,
        CAST(0.0 AS DOUBLE),
        (acc, item) -> acc + (item.quantity * item.unit_price)
    ) AS computed_total,
    AGGREGATE(
        line_items,
        CAST(0 AS INT),
        (acc, item) -> acc + item.quantity
    ) AS total_quantity
FROM silver.parsed_orders;

EXISTS (Array Predicate)

-- Check if any element matches a condition
SELECT
    customer_id,
    purchase_categories,
    EXISTS(purchase_categories, cat -> cat = 'Electronics') AS is_tech_buyer,
    EXISTS(purchase_categories, cat -> cat LIKE '%Premium%') AS is_premium_buyer
FROM gold.customer_summary;
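`FORALL` is the universal counterpart to `EXISTS`: it returns true only when every element satisfies the predicate. Reusing the same table for illustration:

```sql
-- Check if ALL elements match a condition
SELECT
    customer_id,
    purchase_categories,
    FORALL(purchase_categories, cat -> cat IS NOT NULL) AS no_null_categories,
    FORALL(purchase_categories, cat -> cat != 'Clearance') AS never_bought_clearance
FROM gold.customer_summary;
```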

4.10 SQL UDFs

When built-in functions are insufficient, SQL UDFs provide reusable custom logic without the serialization overhead of Python UDFs.

Scalar UDF

-- Calculate customer lifetime value tier
CREATE OR REPLACE FUNCTION gold.classify_clv(
    total_revenue DOUBLE,
    total_orders INT,
    tenure_months INT
)
RETURNS STRING
RETURN CASE
    WHEN total_revenue > 10000 AND total_orders > 50 THEN 'Platinum'
    WHEN total_revenue > 5000 AND total_orders > 20 THEN 'Gold'
    WHEN total_revenue > 1000 AND total_orders > 5 THEN 'Silver'
    WHEN total_revenue > 0 THEN 'Bronze'
    ELSE 'Prospect'
END;

-- Usage
SELECT
    customer_id,
    customer_name,
    gold.classify_clv(lifetime_revenue, total_orders, tenure_months) AS clv_tier
FROM gold.customer_summary;

Table-Valued UDF

... [content trimmed for length — full version in the complete kit]


This is 1 of 5 resources in the Datanest Academy toolkit. Get the complete [Databricks Analytics Engineering] with all files, templates, and documentation for $129.

Get the Full Kit →

Or grab the entire Datanest Academy bundle (5 products) for $XXX — save XX%.

Get the Complete Bundle →


