Module 4: Data Transformation Patterns
Learning Objectives
By the end of this module, you will be able to:
- Apply common transformation patterns: deduplication, pivot, unpivot, gap-fill, and sessionization
- Write complex joins including semi-joins, anti-joins, and inequality joins
- Parse and transform semi-structured data (JSON, XML) with Databricks SQL
- Use array and map operations for nested data transformations
- Implement higher-order functions and SQL UDFs for custom logic
4.1 Deduplication Patterns
Duplicate records are one of the most common data quality issues. The approach depends on whether duplicates are exact or fuzzy.
Exact Deduplication with ROW_NUMBER
-- Keep the latest record per customer based on updated_at
-- Pattern: rank all copies within each natural key, keep rank 1.
WITH ranked AS (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY customer_id
-- _ingested_at breaks ties when two versions share the same updated_at
ORDER BY updated_at DESC, _ingested_at DESC
) AS row_num
FROM silver.stg_customers
)
-- EXCEPT drops the helper column from the output (Databricks SQL extension)
SELECT * EXCEPT(row_num)
FROM ranked
WHERE row_num = 1;
Deduplication with QUALIFY (Databricks SQL)
-- Cleaner syntax using QUALIFY (supported in Databricks SQL)
-- QUALIFY filters on a window-function result without needing a CTE/subquery.
SELECT *
FROM silver.stg_customers
QUALIFY ROW_NUMBER() OVER (
PARTITION BY customer_id
-- NOTE(review): unlike the CTE version above, there is no _ingested_at
-- tie-break here, so ties on updated_at resolve nondeterministically.
ORDER BY updated_at DESC
) = 1;
Hash-Based Change Detection
-- Use MD5 hash to detect actual changes vs duplicate ingestion.
-- MD5 is acceptable for change detection (not a security context).
WITH hashed AS (
    SELECT
        *,
        -- Hash the tracked columns. The original used COALESCE(col, ''),
        -- which makes NULL and empty string hash identically, so a value
        -- flipping between NULL and '' was silently missed as a change.
        -- A dedicated sentinel keeps them distinguishable.
        MD5(CONCAT_WS('|',
            COALESCE(customer_name, '__NULL__'),
            COALESCE(email, '__NULL__'),
            COALESCE(phone, '__NULL__'),
            COALESCE(segment, '__NULL__')
        )) AS _row_hash
    FROM silver.stg_customers
),
deduped AS (
    -- Keep only the newest copy of each (customer_id, payload) pair:
    -- re-ingested identical rows collapse to one, real changes survive.
    SELECT *
    FROM hashed
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY customer_id, _row_hash
        ORDER BY _ingested_at DESC
    ) = 1
)
SELECT * FROM deduped;
Deduplication as a dbt Macro
-- macros/deduplicate.sql
{# Reusable dedup: keeps the first row per partition_by under order_by. #}
{# order_by defaults to latest-ingested-wins. #}
{% macro deduplicate(relation, partition_by, order_by='_ingested_at DESC') %}
SELECT *
FROM {{ relation }}
QUALIFY ROW_NUMBER() OVER (
PARTITION BY {{ partition_by }}
ORDER BY {{ order_by }}
) = 1
{% endmacro %}
-- Usage in a model:
-- {{ deduplicate(ref('stg_shopify__orders'), 'order_id') }}
4.2 Pivot and Unpivot
PIVOT: Rows to Columns
Transform row-level category values into separate columns.
-- Monthly revenue by product category, pivoted into columns
SELECT *
FROM (
-- Project exactly what PIVOT needs: the grain (month), the spreading
-- column (product_category), and the measure. Extra columns would add
-- unwanted implicit grouping levels.
SELECT
DATE_FORMAT(order_date, 'yyyy-MM') AS month,
product_category,
net_revenue
FROM gold.fact_sales f
JOIN gold.dim_product p ON f.product_key = p.product_key
)
PIVOT (
SUM(net_revenue)
-- The IN list must be a static value set; categories absent from it are
-- silently dropped from the output.
FOR product_category IN (
'Electronics' AS electronics,
'Clothing' AS clothing,
'Home & Garden' AS home_garden,
'Sports' AS sports
)
)
ORDER BY month;
UNPIVOT: Columns to Rows
Normalize wide tables into a long format suitable for analysis.
-- Unpivot monthly metric columns into rows
SELECT *
FROM (
SELECT
product_id,
jan_revenue,
feb_revenue,
mar_revenue
FROM reports.quarterly_product_summary
)
UNPIVOT (
-- revenue = new measure column, month = new label column
revenue FOR month IN (
-- NOTE(review): Databricks documents the IN-list alias as an identifier
-- (e.g. jan_revenue AS `January`); confirm string-literal aliases parse
-- on the target runtime.
jan_revenue AS 'January',
feb_revenue AS 'February',
mar_revenue AS 'March'
)
)
ORDER BY product_id, month;
Dynamic Pivot with dbt
-- macros/pivot_values.sql
{# Fetch the distinct values of a column at compile time, for building
   dynamic PIVOT IN-lists. Mirrors dbt_utils.get_column_values — prefer
   the dbt-utils version if that package is already installed. #}
{% macro get_column_values(table, column) %}
{% set query %}
SELECT DISTINCT {{ column }}
FROM {{ table }}
ORDER BY 1
{% endset %}
{% set results = run_query(query) %}
{# During the parse phase (execute == false) run_query yields nothing
   usable, so fall back to an empty list. #}
{% if execute %}
{{ return(results.columns[0].values()) }}
{% else %}
{{ return([]) }}
{% endif %}
{% endmacro %}
-- Usage in a model:
-- {% set categories = get_column_values(ref('dim_product'), 'category_level_1') %}
4.3 Gap-Fill (Missing Data Interpolation)
Gap-filling creates rows for missing time periods, critical for accurate time series analysis and dashboards.
Date Spine Gap-Fill
-- Fill missing dates in daily revenue with zero values
WITH date_spine AS (
    -- One row per calendar day in the reporting window (dim_date is the
    -- dense source of truth for dates).
    SELECT full_date AS date_day
    FROM gold.dim_date
    WHERE full_date BETWEEN '2026-01-01' AND '2026-03-31'
),
daily_revenue AS (
    -- Actual revenue, only for days that had at least one sale.
    SELECT
        order_date AS date_day,
        SUM(net_revenue) AS revenue
    FROM gold.fact_sales f
    JOIN gold.dim_date d ON f.date_key = d.date_key
    GROUP BY order_date
),
gap_filled AS (
    SELECT
        ds.date_day,
        COALESCE(dr.revenue, 0) AS revenue,
        -- Flag rows created by the spine. Test the JOIN KEY, not the
        -- measure: dr.revenue can be NULL even when the day exists (e.g.
        -- all-NULL net_revenue), which the original CASE on dr.revenue
        -- would have mislabeled as imputed. `IS NULL` already yields a
        -- boolean, so no CASE wrapper is needed.
        dr.date_day IS NULL AS is_imputed
    FROM date_spine ds
    LEFT JOIN daily_revenue dr ON ds.date_day = dr.date_day
)
SELECT * FROM gap_filled
ORDER BY date_day;
Gap-Fill with Forward Fill (Last Known Value)
-- Forward-fill missing inventory levels
WITH date_spine AS (
SELECT full_date AS date_day
FROM gold.dim_date
WHERE full_date BETWEEN '2026-01-01' AND '2026-03-31'
),
sparse_data AS (
SELECT
snapshot_date,
product_id,
quantity_on_hand
FROM silver.inventory_snapshots
),
cross_joined AS (
-- Dense grid: every (day, product) combination.
-- NOTE(review): products come from the snapshots themselves, so a product
-- with zero snapshots in the window never appears — confirm intended.
SELECT
ds.date_day,
p.product_id
FROM date_spine ds
CROSS JOIN (SELECT DISTINCT product_id FROM sparse_data) p
),
joined AS (
-- Attach sparse observations; days without a snapshot stay NULL.
SELECT
cj.date_day,
cj.product_id,
sd.quantity_on_hand
FROM cross_joined cj
LEFT JOIN sparse_data sd
ON cj.date_day = sd.snapshot_date
AND cj.product_id = sd.product_id
),
forward_filled AS (
SELECT
date_day,
product_id,
-- Forward-fill: use LAST_VALUE with IGNORE NULLS
-- (second argument `true` = ignoreNulls). The frame ends at CURRENT ROW
-- so only past values carry forward; days before a product's first
-- snapshot remain NULL.
LAST_VALUE(quantity_on_hand, true) OVER (
PARTITION BY product_id
ORDER BY date_day
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS quantity_on_hand
FROM joined
)
SELECT * FROM forward_filled
ORDER BY product_id, date_day;
4.4 Sessionization
Sessionization groups a stream of events into logical sessions based on time gaps.
-- Web analytics sessionization with a 30-minute inactivity threshold
WITH events AS (
-- Pair each event with the previous event time for the same user.
SELECT
user_id,
event_timestamp,
page_url,
event_type,
LAG(event_timestamp) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
) AS prev_event_timestamp
FROM silver.stg_web_events
),
session_boundaries AS (
-- 1 marks the first event of a session: either the user's first event
-- ever (no previous row) or a gap of more than 30 minutes.
SELECT
*,
CASE
WHEN prev_event_timestamp IS NULL THEN 1
WHEN TIMESTAMPDIFF(MINUTE, prev_event_timestamp, event_timestamp) > 30
THEN 1
ELSE 0
END AS is_new_session
FROM events
),
session_numbered AS (
-- Running sum of the 0/1 boundary flags turns them into a per-user
-- session counter; user_id + counter forms a globally unique session_id.
SELECT
*,
SUM(is_new_session) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS session_number,
CONCAT(
user_id, '-',
SUM(is_new_session) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
) AS session_id
FROM session_boundaries
)
-- Final rollup: one row per session.
-- The original mixed window functions (FIRST_VALUE/LAST_VALUE) with
-- aggregates and added page_url, event_timestamp to the GROUP BY — which
-- made every event its own group, so the query never actually aggregated
-- to the session grain. MIN_BY/MAX_BY pick the page at the earliest and
-- latest timestamp within the group, with no window needed.
SELECT
    session_id,
    user_id,
    session_number,
    MIN(event_timestamp) AS session_start,
    MAX(event_timestamp) AS session_end,
    TIMESTAMPDIFF(
        SECOND,
        MIN(event_timestamp),
        MAX(event_timestamp)
    ) AS session_duration_seconds,
    COUNT(*) AS event_count,
    MIN_BY(page_url, event_timestamp) AS landing_page,
    MAX_BY(page_url, event_timestamp) AS exit_page
FROM session_numbered
GROUP BY session_id, user_id, session_number;
Simplified Sessionization as dbt Model
-- models/intermediate/int_web_sessions.sql
{# Incremental session model: merge on session_id so re-emitted sessions
   overwrite their earlier versions. #}
{{
config(
materialized='incremental',
unique_key='session_id',
incremental_strategy='merge'
)
}}
WITH events_with_gap AS (
-- Flag the first event of each session: previous event missing, or a
-- gap of more than 30 minutes since it.
SELECT
user_id,
event_timestamp,
page_url,
event_type,
CASE
WHEN TIMESTAMPDIFF(
MINUTE,
LAG(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp),
event_timestamp
) > 30
OR LAG(event_timestamp) OVER (PARTITION BY user_id ORDER BY event_timestamp) IS NULL
THEN 1
ELSE 0
END AS is_session_start
FROM {{ ref('stg_web_events') }}
{% if is_incremental() %}
-- NOTE(review): this hard boundary can split one real session in two
-- when new events arrive within 30 minutes of the last loaded
-- session_end; a lookback window (reprocess a trailing period) would
-- avoid that — confirm whether boundary splits are acceptable here.
WHERE event_timestamp > (SELECT MAX(session_end) FROM {{ this }})
{% endif %}
),
with_session_id AS (
-- Running count of session starts per user becomes the id suffix.
SELECT
*,
CONCAT(user_id, '-', SUM(is_session_start) OVER (
PARTITION BY user_id ORDER BY event_timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)) AS session_id
FROM events_with_gap
)
-- One row per session.
SELECT
session_id,
user_id,
MIN(event_timestamp) AS session_start,
MAX(event_timestamp) AS session_end,
COUNT(*) AS event_count,
COUNT(DISTINCT page_url) AS distinct_pages
FROM with_session_id
GROUP BY session_id, user_id
4.5 Complex Joins
Semi-Join (EXISTS)
Return rows from one table that have matches in another, without duplicating rows from the outer table when a key matches more than once.
-- Customers who have placed at least one order
-- EXISTS stops at the first matching sale, and the customer row is never
-- multiplied the way an INNER JOIN against many sales would multiply it.
SELECT c.*
FROM gold.dim_customer c
WHERE EXISTS (
SELECT 1
FROM gold.fact_sales f
WHERE f.customer_key = c.customer_key
);
Anti-Join (NOT EXISTS)
Return rows from one table that have no matches in another.
-- Products that have never been sold
SELECT p.*
FROM gold.dim_product p
WHERE NOT EXISTS (
SELECT 1
FROM gold.fact_sales f
WHERE f.product_key = p.product_key
);
-- Alternative using LEFT JOIN / IS NULL. Optimizers usually rewrite both
-- to the same anti-join plan, but that is not guaranteed; NOT EXISTS
-- states the intent directly (and, unlike NOT IN, is safe with NULL keys).
SELECT p.*
FROM gold.dim_product p
LEFT JOIN gold.fact_sales f ON p.product_key = f.product_key
WHERE f.product_key IS NULL;
Inequality Join (Range Join)
-- Map events to time-based pricing tiers
-- Half-open interval [effective_from, effective_to) prevents an event on a
-- boundary date from matching two adjacent price rows.
-- NOTE(review): if the current price row keeps effective_to = NULL, the
-- `<` comparison drops it — confirm open-ended rows are closed out
-- upstream (e.g. with a far-future date).
SELECT
e.event_id,
e.event_date,
e.product_id,
p.price_tier,
p.unit_price
FROM silver.events e
JOIN silver.pricing_history p
ON e.product_id = p.product_id
AND e.event_date >= p.effective_from
AND e.event_date < p.effective_to;
LATERAL Join (Table-Valued Functions)
-- Get the top 3 products per customer using LATERAL
-- The LATERAL subquery is correlated: it re-evaluates per customer row and
-- may reference c.customer_key, which a plain derived table cannot.
SELECT
c.customer_id,
c.customer_name,
top_products.*
FROM gold.dim_customer c,
LATERAL (
SELECT
p.product_name,
SUM(f.net_revenue) AS total_spend
FROM gold.fact_sales f
JOIN gold.dim_product p ON f.product_key = p.product_key
WHERE f.customer_key = c.customer_key
GROUP BY p.product_name
-- ORDER BY + LIMIT inside the lateral = per-customer top-N
ORDER BY total_spend DESC
LIMIT 3
) AS top_products
WHERE c.is_current = true;
4.6 Semi-Structured Data: JSON
Databricks SQL has rich support for JSON data stored in STRING or VARIANT columns.
Extracting JSON Fields
-- Parse JSON payload from raw events
-- Colon syntax (col:path) extracts from a JSON STRING column; `::` then
-- casts the extracted value. Missing paths yield NULL rather than erroring.
SELECT
event_id,
event_timestamp,
-- Dot notation on STRING column (returns STRING)
raw_payload:user_id::STRING AS user_id,
raw_payload:event_type::STRING AS event_type,
raw_payload:page.url::STRING AS page_url,
raw_payload:page.referrer::STRING AS referrer,
-- Nested access
raw_payload:device.type::STRING AS device_type,
raw_payload:device.os::STRING AS device_os,
raw_payload:device.browser::STRING AS browser,
-- Numeric extraction with casting
raw_payload:metrics.load_time_ms::INT AS load_time_ms,
raw_payload:metrics.scroll_depth::DOUBLE AS scroll_depth
FROM bronze.raw_web_events;
JSON with get_json_object and json_tuple
-- Alternative extraction functions
SELECT
event_id,
-- GET_JSON_OBJECT re-parses the document per call; JSON_TUPLE below
-- parses once and emits several top-level fields together.
GET_JSON_OBJECT(raw_payload, '$.user_id') AS user_id,
GET_JSON_OBJECT(raw_payload, '$.page.url') AS page_url,
-- Extract multiple fields at once
jt.*
FROM bronze.raw_web_events
LATERAL VIEW JSON_TUPLE(
raw_payload,
'user_id', 'event_type', 'timestamp'
) jt AS user_id, event_type, event_ts;
Flattening JSON Arrays
-- Explode order line items from a JSON array
SELECT
    order_id,
    order_date,
    item.product_id,
    item.quantity,
    item.unit_price
-- LATERAL VIEW attaches directly to the preceding table reference; the
-- original comma before it is invalid Spark SQL (a comma starts a cross
-- join, as the correct JSON_TUPLE example earlier in this module shows).
FROM bronze.raw_orders
LATERAL VIEW EXPLODE(
    FROM_JSON(
        line_items,
        'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
    )
) AS item;
-- Alternative with INLINE for struct arrays: INLINE explodes an
-- ARRAY<STRUCT> into one column per struct field, skipping the
-- intermediate struct column that EXPLODE produces.
SELECT
    order_id,
    order_date,
    inline_item.*
-- As with EXPLODE, LATERAL VIEW follows the table reference directly —
-- the original comma before it is a Spark SQL syntax error.
FROM (
    SELECT
        order_id,
        order_date,
        FROM_JSON(
            line_items,
            'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
        ) AS parsed_items
    FROM bronze.raw_orders
)
LATERAL VIEW INLINE(parsed_items) AS inline_item;
4.7 Semi-Structured Data: XML
-- Parse XML using xpath functions
-- xpath_string / xpath_double return scalars; xpath returns an
-- ARRAY<STRING> of every node matching the expression.
SELECT
xpath_string(xml_payload, '/order/order_id') AS order_id,
xpath_string(xml_payload, '/order/customer/name') AS customer_name,
xpath_double(xml_payload, '/order/total') AS order_total,
-- Extract multiple line items
-- NOTE(review): product_ids and quantities are parallel arrays matched by
-- position — zip them (e.g. via posexplode) before joining downstream.
xpath(xml_payload, '/order/items/item/product_id/text()') AS product_ids,
xpath(xml_payload, '/order/items/item/quantity/text()') AS quantities
FROM bronze.raw_xml_orders;
4.8 Array and Map Operations
Array Functions
-- Common array operations in Databricks SQL
-- One output row per customer; the COLLECT_* calls are aggregates over
-- that customer's sales rows.
SELECT
customer_id,
-- Create arrays
ARRAY('Electronics', 'Clothing', 'Home') AS categories,
-- Array aggregation
-- COLLECT_LIST keeps duplicates, COLLECT_SET dedups; neither guarantees
-- element order.
COLLECT_LIST(product_category) AS purchased_categories,
COLLECT_SET(product_category) AS unique_categories,
-- Array functions
SIZE(COLLECT_SET(product_category)) AS category_count,
ARRAY_CONTAINS(COLLECT_SET(product_category), 'Electronics') AS bought_electronics,
ARRAY_DISTINCT(COLLECT_LIST(product_category)) AS distinct_cats,
ARRAY_SORT(COLLECT_SET(product_category)) AS sorted_cats,
-- Array to string
ARRAY_JOIN(ARRAY_SORT(COLLECT_SET(product_category)), ', ') AS categories_csv
FROM gold.fact_sales f
JOIN gold.dim_product p ON f.product_key = p.product_key
GROUP BY customer_id;
Map Functions
-- Build and query maps
WITH category_revenue AS (
    -- One row per (customer, category) with that pair's total revenue.
    SELECT
        customer_id,
        product_category,
        SUM(net_revenue) AS total_revenue
    FROM gold.fact_sales f
    JOIN gold.dim_product p ON f.product_key = p.product_key
    GROUP BY customer_id, product_category
)
SELECT
    customer_id,
    -- Create map from key-value pairs (category -> revenue)
    MAP_FROM_ENTRIES(
        COLLECT_LIST(
            STRUCT(product_category, total_revenue)
        )
    ) AS category_revenue_map,
    -- Access map values by key; missing key yields NULL
    MAP_FROM_ENTRIES(
        COLLECT_LIST(STRUCT(product_category, total_revenue))
    )['Electronics'] AS electronics_revenue,
    -- Map keys and values as separate arrays
    MAP_KEYS(
        MAP_FROM_ENTRIES(COLLECT_LIST(STRUCT(product_category, total_revenue)))
    ) AS categories,
    MAP_VALUES(
        MAP_FROM_ENTRIES(COLLECT_LIST(STRUCT(product_category, total_revenue)))
    ) AS revenues
FROM category_revenue
-- The original omitted this GROUP BY: customer_id was neither grouped nor
-- aggregated alongside the COLLECT_LIST aggregates, which fails analysis.
GROUP BY customer_id;
4.9 Higher-Order Functions
Higher-order functions operate on arrays with lambda expressions. They avoid the need to EXPLODE/COLLECT.
TRANSFORM
-- Apply a function to each element of an array
-- TRANSFORM maps a lambda over every element and returns a new array of
-- the same length — no EXPLODE / re-COLLECT round trip required.
SELECT
customer_id,
tags,
TRANSFORM(tags, tag -> UPPER(tag)) AS tags_upper,
TRANSFORM(tags, tag -> CONCAT('category:', tag)) AS tags_prefixed,
TRANSFORM(
monthly_revenues,
rev -> ROUND(rev * 1.1, 2)
) AS revenues_with_10pct_uplift
FROM silver.customer_profiles;
FILTER
-- Filter array elements matching a condition
-- FILTER keeps only the elements whose lambda evaluates to true.
SELECT
order_id,
line_items,
FILTER(line_items, item -> item.quantity > 5) AS bulk_items,
FILTER(line_items, item -> item.unit_price > 100.00) AS premium_items,
-- SIZE of the filtered array = count of matching elements
SIZE(FILTER(line_items, item -> item.quantity > 5)) AS bulk_item_count
FROM silver.parsed_orders;
AGGREGATE (REDUCE)
-- Reduce an array to a single value
-- AGGREGATE(array, initial, merge) folds left-to-right: acc starts at the
-- initial value and the lambda merges each element into it.
SELECT
order_id,
line_items,
AGGREGATE(
line_items,
-- explicit CAST pins the accumulator type (DOUBLE here)
CAST(0.0 AS DOUBLE),
(acc, item) -> acc + (item.quantity * item.unit_price)
) AS computed_total,
AGGREGATE(
line_items,
CAST(0 AS INT),
(acc, item) -> acc + item.quantity
) AS total_quantity
FROM silver.parsed_orders;
EXISTS (Array Predicate)
-- Check if any element matches a condition
-- This is the higher-order EXISTS(array, lambda) — distinct from the
-- EXISTS (subquery) predicate used for semi-joins earlier in this module.
SELECT
customer_id,
purchase_categories,
EXISTS(purchase_categories, cat -> cat = 'Electronics') AS is_tech_buyer,
EXISTS(purchase_categories, cat -> cat LIKE '%Premium%') AS is_premium_buyer
FROM gold.customer_summary;
4.10 SQL UDFs
When built-in functions are insufficient, SQL UDFs provide reusable custom logic without the Python serialization overhead.
Scalar UDF
-- Calculate customer lifetime value tier
-- Scalar SQL UDF: a pure CASE expression, so the optimizer can inline it
-- with none of the serialization overhead of a Python UDF.
CREATE OR REPLACE FUNCTION gold.classify_clv(
total_revenue DOUBLE,
total_orders INT,
tenure_months INT
)
RETURNS STRING
RETURN CASE
-- Branches are evaluated top-down; first match wins.
-- NOTE(review): tenure_months is declared but never used in the logic —
-- confirm whether tenure was meant to factor into the tiers.
WHEN total_revenue > 10000 AND total_orders > 50 THEN 'Platinum'
WHEN total_revenue > 5000 AND total_orders > 20 THEN 'Gold'
WHEN total_revenue > 1000 AND total_orders > 5 THEN 'Silver'
WHEN total_revenue > 0 THEN 'Bronze'
ELSE 'Prospect'
END;
-- Usage
SELECT
customer_id,
customer_name,
gold.classify_clv(lifetime_revenue, total_orders, tenure_months) AS clv_tier
FROM gold.customer_summary;
Table-Valued UDF
... [content trimmed for length — full version in the complete kit]
This is 1 of 5 resources in the Datanest Academy toolkit. Get the complete [Databricks Analytics Engineering] with all files, templates, and documentation for $129.
Or grab the entire Datanest Academy bundle (5 products) for $XXX — save XX%.
Top comments (0)