You write SELECT * FROM orders WHERE dt > '2024-01-01'. But that query alone doesn't create a table, update a partition, or merge with existing records. Something has to wrap your SQL in the right DDL/DML for your specific database, strategy, and context.
In Bruin, that something is the materialization system. It takes your query, looks at your config, and generates the exact SQL needed to materialize the result. Pure string manipulation with a clear purpose.
I want to walk through how it actually works, because the architecture is simpler than you'd expect for something that handles 9 strategies across 14 databases.
The problem
Say you have this SQL asset:
/* @bruin
name: dashboard.user_metrics
type: bq.sql
materialization:
  type: table
  strategy: delete+insert
  incremental_key: updated_at
@bruin */
SELECT user_id, event_count, updated_at
FROM raw.user_events
WHERE updated_at > '2024-01-01'
Your query is a SELECT. But what you actually want to happen is:
- Run that SELECT into a temp table
- Find all distinct values of updated_at in the temp table
- Delete rows from the target table where updated_at matches those values
- Insert the temp table rows into the target table
And the exact SQL to do that differs between BigQuery, Snowflake, Postgres, DuckDB, and every other database Bruin supports. BigQuery runs its transactions inside scripts with declared variables. Snowflake uses different temp table syntax. Postgres has its own quirks.
The naive approach would be to just build one giant function with a bunch of if-else branches. Check the database, check the strategy, generate SQL. It would work, but it would be a mess of 5,000+ lines of spaghetti. And every time you add a new database or strategy, you'd be touching the same fragile function.
The dispatch table
Here's the core of the materialization system. It's in pkg/pipeline/materializer.go:
type (
    MaterializerFunc        func(task *Asset, query string) (string, error)
    AssetMaterializationMap map[MaterializationType]map[MaterializationStrategy]MaterializerFunc
)

type Materializer struct {
    MaterializationMap AssetMaterializationMap
    FullRefresh        bool
}
MaterializerFunc is the signature every materializer must match: take an asset and a query string, return a new query string (or an error). That's it. The AssetMaterializationMap is a nested map: the outer key is the materialization type (table or view), the inner key is the strategy (merge, append, delete+insert, etc.), and the value is the function that generates the SQL.
The Render method does the dispatch:
func (m *Materializer) Render(asset *Asset, query string) (string, error) {
    mat := asset.Materialization
    if mat.Type == MaterializationTypeNone {
        return removeComments(query), nil
    }

    strategy := mat.Strategy
    if m.FullRefresh && mat.Type == MaterializationTypeTable {
        if mat.Strategy != MaterializationStrategyDDL &&
            (asset.RefreshRestricted == nil || !*asset.RefreshRestricted) {
            strategy = MaterializationStrategyCreateReplace
        }
    }

    if matFunc, ok := m.MaterializationMap[mat.Type][strategy]; ok {
        materializedQuery, err := matFunc(asset, query)
        if err != nil {
            return "", err
        }
        return removeComments(materializedQuery), nil
    }

    return "", fmt.Errorf("unsupported materialization type - strategy combination: (%s - %s)",
        mat.Type, mat.Strategy)
}
Two things worth noting:
First, the full refresh override. When you run bruin run --full-refresh, every table strategy gets replaced with create+replace, which drops and recreates the table from scratch. There are two exceptions: the DDL strategy (dropping and recreating a DDL-only table would lose its data) and assets marked refresh_restricted: true (for tables you never want accidentally dropped).
Second, the comment stripping. Bruin assets embed YAML config in SQL comments (/* @bruin ... @bruin */). The materializer strips these before sending the query to the database, using a regex: commentRegex = regexp.MustCompile(`/\* *@bruin[\s\w\S]*@bruin *\*/`).
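To see what that regex does in isolation, here is a minimal sketch. The helper name stripBruinHeader and the whitespace trimming are my own assumptions for illustration; Bruin's actual removeComments function may behave differently around whitespace.

```go
package main

import (
    "fmt"
    "regexp"
    "strings"
)

// bruinHeader matches the embedded config block, from "/* @bruin" through the
// last "@bruin */". The class [\s\w\S] matches any character, newlines included.
var bruinHeader = regexp.MustCompile(`/\* *@bruin[\s\w\S]*@bruin *\*/`)

// stripBruinHeader is a hypothetical helper, not Bruin's removeComments.
// It removes the config block and trims the surrounding whitespace.
func stripBruinHeader(query string) string {
    return strings.TrimSpace(bruinHeader.ReplaceAllString(query, ""))
}

func main() {
    asset := "/* @bruin\nname: dashboard.user_metrics\ntype: bq.sql\n@bruin */\n" +
        "SELECT user_id FROM raw.user_events"
    fmt.Println(stripBruinHeader(asset))
}
```

A query without the config block passes through unchanged, since the pattern simply finds nothing to replace.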
Each database brings its own map
The core Materializer struct is database-agnostic. Each database package provides its own dispatch map. Here's DuckDB's (from pkg/duckdb/materialization.go):
var matMap = pipeline.AssetMaterializationMap{
    pipeline.MaterializationTypeView: {
        pipeline.MaterializationStrategyNone:          viewMaterializer,
        pipeline.MaterializationStrategyAppend:        errorMaterializer,
        pipeline.MaterializationStrategyCreateReplace: errorMaterializer,
        pipeline.MaterializationStrategyDeleteInsert:  errorMaterializer,
    },
    pipeline.MaterializationTypeTable: {
        pipeline.MaterializationStrategyNone:           buildCreateReplaceQuery,
        pipeline.MaterializationStrategyAppend:         buildAppendQuery,
        pipeline.MaterializationStrategyCreateReplace:  buildCreateReplaceQuery,
        pipeline.MaterializationStrategyDeleteInsert:   buildIncrementalQuery,
        pipeline.MaterializationStrategyTruncateInsert: ansisql.BuildTruncateInsertQuery,
        pipeline.MaterializationStrategyMerge:          buildMergeQuery,
        pipeline.MaterializationStrategyTimeInterval:   buildTimeIntervalQuery,
        pipeline.MaterializationStrategyDDL:            buildDDLQuery,
        pipeline.MaterializationStrategySCD2ByTime:     buildSCD2ByTimeQuery,
        pipeline.MaterializationStrategySCD2ByColumn:   buildSCD2ByColumnQuery,
    },
}

func NewMaterializer(fullRefresh bool) *pipeline.Materializer {
    return &pipeline.Materializer{
        MaterializationMap: matMap,
        FullRefresh:        fullRefresh,
    }
}
Look at the view section. You can't append to a view, or delete+insert into one. Those combinations map to errorMaterializer, which just returns an error saying "not supported." The dispatch map itself encodes which combinations are valid.
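If you want to play with the pattern outside of Bruin, a stripped-down version fits in one file. This is a sketch, not Bruin's code: the Asset struct, the string-keyed map, and the function names are simplified stand-ins, and the full refresh branch leaves out the DDL and refresh_restricted exceptions described earlier.

```go
package main

import (
    "errors"
    "fmt"
)

// Asset is a simplified stand-in for pipeline.Asset.
type Asset struct {
    Name     string
    Type     string // "table" or "view"
    Strategy string // "append", "create+replace", ...
}

type MaterializerFunc func(a *Asset, query string) (string, error)

func appendQuery(a *Asset, q string) (string, error) {
    return fmt.Sprintf("INSERT INTO %s %s", a.Name, q), nil
}

func createReplace(a *Asset, q string) (string, error) {
    return fmt.Sprintf("CREATE OR REPLACE TABLE %s AS\n%s", a.Name, q), nil
}

// notSupported marks a combination as known but invalid.
func notSupported(a *Asset, _ string) (string, error) {
    return "", errors.New("strategy not supported for " + a.Type)
}

// matMap is the two-level dispatch table: type first, then strategy.
var matMap = map[string]map[string]MaterializerFunc{
    "view":  {"append": notSupported},
    "table": {"append": appendQuery, "create+replace": createReplace},
}

// render looks up the function for (type, strategy). With fullRefresh set,
// every table strategy is swapped for create+replace.
func render(a *Asset, query string, fullRefresh bool) (string, error) {
    strategy := a.Strategy
    if fullRefresh && a.Type == "table" {
        strategy = "create+replace"
    }
    fn, ok := matMap[a.Type][strategy]
    if !ok {
        return "", fmt.Errorf("unsupported combination: (%s - %s)", a.Type, strategy)
    }
    return fn(a, query)
}

func main() {
    a := &Asset{Name: "dashboard.user_metrics", Type: "table", Strategy: "append"}
    sql, err := render(a, "SELECT 1", false)
    fmt.Println(sql, err)

    _, err = render(&Asset{Name: "v", Type: "view", Strategy: "append"}, "SELECT 1", false)
    fmt.Println(err)
}
```

Both failure modes end in an error, but they come from different places: a known-invalid pair is rejected by the function stored in the map, while a pair that is missing entirely falls through to the lookup failure.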
Also notice ansisql.BuildTruncateInsertQuery for truncate+insert. When the SQL is standard enough, databases share an implementation from the pkg/ansisql/ package. That function is just:
func BuildTruncateInsertQuery(task *pipeline.Asset, query string) (string, error) {
    queries := []string{
        "BEGIN TRANSACTION",
        "TRUNCATE TABLE " + task.Name,
        fmt.Sprintf("INSERT INTO %s %s", task.Name, strings.TrimSuffix(query, ";")),
        "COMMIT",
    }
    return strings.Join(queries, ";\n") + ";", nil
}
Snowflake, DuckDB, and others all reuse this. BigQuery has its own version without the BEGIN/COMMIT wrapper.
Every database creates its Materializer via a NewMaterializer(fullRefresh bool) factory. The calling code doesn't know or care which database-specific functions are in the map. It just calls Render().
From simple to complex: the actual SQL generation
The simple strategies are, well, simple. append is literally:
func buildAppendQuery(asset *pipeline.Asset, query string) (string, error) {
    return fmt.Sprintf("INSERT INTO %s %s", asset.Name, query), nil
}
create+replace for BigQuery adds partitioning and clustering:
func buildCreateReplaceQuery(asset *pipeline.Asset, query string) (string, error) {
    mat := asset.Materialization

    // Both clauses are optional and stay empty when not configured.
    partitionClause := ""
    if mat.PartitionBy != "" {
        partitionClause = "PARTITION BY " + mat.PartitionBy
    }
    clusterByClause := ""
    if len(mat.ClusterBy) > 0 {
        clusterByClause = "CLUSTER BY " + strings.Join(mat.ClusterBy, ", ")
    }
    return fmt.Sprintf("CREATE OR REPLACE TABLE %s %s %s AS\n%s",
        asset.Name, partitionClause, clusterByClause, query), nil
}
delete+insert is where the complexity starts. BigQuery's version (from pkg/bigquery/materialization.go) has an optimization most people wouldn't think of:
func buildIncrementalQuery(asset *pipeline.Asset, query string) (string, error) {
    mat := asset.Materialization
    foundCol := asset.GetColumnWithName(mat.IncrementalKey)
    if foundCol == nil || foundCol.Type == "" || foundCol.Type == "UNKNOWN" {
        return buildIncrementalQueryWithoutTempVariable(asset, query)
    }

    randPrefix := helpers.PrefixGenerator()
    tempTableName := "__bruin_tmp_" + randPrefix
    declaredVarName := "distinct_keys_" + randPrefix

    queries := []string{
        fmt.Sprintf("DECLARE %s array<%s>", declaredVarName, foundCol.Type),
        "BEGIN TRANSACTION",
        fmt.Sprintf("CREATE TEMP TABLE %s AS %s", tempTableName, strings.TrimSuffix(query, ";")),
        fmt.Sprintf("SET %s = (SELECT array_agg(distinct %s) FROM %s)",
            declaredVarName, mat.IncrementalKey, tempTableName),
        fmt.Sprintf("DELETE FROM %s WHERE %s in unnest(%s)",
            asset.Name, mat.IncrementalKey, declaredVarName),
        fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", asset.Name, tempTableName),
        "COMMIT TRANSACTION",
    }
    return strings.Join(queries, ";\n") + ";", nil
}
When the column type is known, BigQuery can use a typed DECLARE variable with array_agg and unnest to collect the distinct keys. This is faster than a subquery for large datasets because BigQuery can optimize the array operation. When the type is unknown, it falls back to a simpler approach with an inline SELECT DISTINCT subquery.
Each temp table gets a random prefix via helpers.PrefixGenerator() to avoid collisions when running concurrent pipelines. The naming convention is __bruin_tmp_<random> or __bruin_merge_tmp_<random> depending on the strategy.
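The article doesn't show how helpers.PrefixGenerator is implemented, so the following is only a guess at the general shape: a short random suffix appended to a fixed prefix. The function names and the choice of crypto/rand with hex encoding are my assumptions, not Bruin's code.

```go
package main

import (
    "crypto/rand"
    "encoding/hex"
    "fmt"
)

// randomSuffix is a hypothetical stand-in for helpers.PrefixGenerator.
// It returns 16 hex characters built from 8 random bytes.
func randomSuffix() (string, error) {
    buf := make([]byte, 8)
    if _, err := rand.Read(buf); err != nil {
        return "", err
    }
    return hex.EncodeToString(buf), nil
}

// tempTableName follows the __bruin_tmp_<random> convention described above.
func tempTableName() (string, error) {
    suffix, err := randomSuffix()
    if err != nil {
        return "", err
    }
    return "__bruin_tmp_" + suffix, nil
}

func main() {
    name, err := tempTableName()
    if err != nil {
        panic(err)
    }
    fmt.Println(name)
}
```

Hex output keeps the name a valid unquoted identifier in most SQL dialects, which matters because the name is interpolated straight into the generated statements.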
Compare this with Snowflake's version (from pkg/snowflake/materialization.go), which is more straightforward:
func buildIncrementalQuery(task *pipeline.Asset, query string) (string, error) {
    mat := task.Materialization
    tempTableName := "__bruin_tmp_" + helpers.PrefixGenerator()

    queries := []string{
        "BEGIN TRANSACTION",
        fmt.Sprintf("CREATE TEMP TABLE %s AS %s", tempTableName, strings.TrimSuffix(query, ";")),
        fmt.Sprintf("DELETE FROM %s WHERE %s in (SELECT DISTINCT %s FROM %s)",
            task.Name, mat.IncrementalKey, mat.IncrementalKey, tempTableName),
        fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", task.Name, tempTableName),
        "DROP TABLE IF EXISTS " + tempTableName,
        "COMMIT",
    }
    return strings.Join(queries, ";\n") + ";", nil
}
No DECLARE, no array_agg. Just a subquery in the DELETE. Also notice Snowflake explicitly drops the temp table before commit, while BigQuery doesn't need to (BigQuery temp tables are session-scoped and auto-cleaned).
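It helps to see the rendered output rather than the format strings. The sketch below copies the statement order of the Snowflake builder, but takes the temp table name as a parameter so the result is deterministic. The function name and parameter list are mine, chosen to keep the example self-contained.

```go
package main

import (
    "fmt"
    "strings"
)

// deleteInsertSQL mirrors the statement order of the Snowflake builder above.
// It is an illustrative helper, not part of Bruin.
func deleteInsertSQL(target, key, tmp, query string) string {
    stmts := []string{
        "BEGIN TRANSACTION",
        fmt.Sprintf("CREATE TEMP TABLE %s AS %s", tmp, strings.TrimSuffix(query, ";")),
        fmt.Sprintf("DELETE FROM %s WHERE %s in (SELECT DISTINCT %s FROM %s)",
            target, key, key, tmp),
        fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", target, tmp),
        "DROP TABLE IF EXISTS " + tmp,
        "COMMIT",
    }
    return strings.Join(stmts, ";\n") + ";"
}

func main() {
    fmt.Println(deleteInsertSQL(
        "dashboard.user_metrics", "updated_at", "__bruin_tmp_abc",
        "SELECT user_id, event_count, updated_at FROM raw.user_events;"))
}
```

Note the TrimSuffix on the user's query: without it, a trailing semicolon would end the CREATE TEMP TABLE statement early and leave a stray empty statement behind it.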
Merge: the most complex strategy
Merge is where things get genuinely complicated, because the semantics differ significantly between databases. Here's BigQuery's merge (from pkg/bigquery/materialization.go):
func mergeMaterializer(asset *pipeline.Asset, query string) (string, error) {
    primaryKeys := asset.ColumnNamesWithPrimaryKey()
    mergeColumns := ansisql.GetColumnsWithMergeLogic(asset)
    columnNames := asset.ColumnNames()

    on := make([]string, 0, len(primaryKeys))
    for _, key := range primaryKeys {
        on = append(on, fmt.Sprintf(
            "(source.%s = target.%s OR (source.%s IS NULL and target.%s IS NULL))",
            key, key, key, key))
    }

    // ... build WHEN MATCHED and WHEN NOT MATCHED clauses

    mergeLines := []string{
        fmt.Sprintf("MERGE %s target", asset.Name),
        fmt.Sprintf("USING (%s) source", strings.TrimSuffix(query, ";")),
        fmt.Sprintf("ON (%s)", onQuery),
        whenMatchedThenQuery,
        fmt.Sprintf("WHEN NOT MATCHED THEN INSERT(%s) VALUES(%s)",
            allColumnValues, allColumnValues),
    }
    return strings.Join(mergeLines, "\n") + ";", nil
}
See that NULL handling in the ON condition? (source.id = target.id OR (source.id IS NULL and target.id IS NULL)). BigQuery needs this because NULL = NULL evaluates to NULL, not TRUE, so rows with a NULL key would otherwise never match. Snowflake's version uses plain target.id = source.id.
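For composite primary keys, each key gets its own NULL-tolerant comparison. Here is a small sketch of that step on its own. The excerpt above elides how the parts are combined into onQuery, so joining them with AND is my assumption, and nullSafeOn is a made-up name.

```go
package main

import (
    "fmt"
    "strings"
)

// nullSafeOn builds one NULL-tolerant equality per key. Joining the parts
// with AND is an assumption; the original excerpt elides that step.
func nullSafeOn(keys []string) string {
    parts := make([]string, 0, len(keys))
    for _, k := range keys {
        parts = append(parts, fmt.Sprintf(
            "(source.%s = target.%s OR (source.%s IS NULL and target.%s IS NULL))",
            k, k, k, k))
    }
    return strings.Join(parts, " AND ")
}

func main() {
    fmt.Println(nullSafeOn([]string{"user_id", "region"}))
}
```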
DuckDB can't use MERGE at all (it didn't support it when this was written), so its merge implementation uses a different approach entirely: temp table, UPDATE with a JOIN, then INSERT with NOT EXISTS:
func buildMergeQuery(asset *pipeline.Asset, query string) (string, error) {
    // ... setup ...

    queries := []string{
        "BEGIN TRANSACTION",
        fmt.Sprintf("CREATE TEMP TABLE %s AS %s", tempTableName, query),
        fmt.Sprintf("UPDATE %s AS target SET %s FROM %s AS source WHERE %s",
            asset.Name, updateClause, tempTableName, onCondition),
        fmt.Sprintf("INSERT INTO %s (%s) SELECT %s FROM %s AS source "+
            "WHERE NOT EXISTS (SELECT 1 FROM %s AS target WHERE %s)",
            asset.Name, allColumnNames, allColumnNames,
            tempTableName, asset.Name, onCondition),
        "DROP TABLE " + tempTableName,
        "COMMIT",
    }
    return strings.Join(queries, ";\n") + ";", nil
}
Same behavior, totally different SQL. The dispatch map makes this invisible to the user.
There's also a nice feature in how merge columns work. The ansisql.GetColumnsWithMergeLogic function (from pkg/ansisql/materialization.go) filters columns:
func GetColumnsWithMergeLogic(asset *pipeline.Asset) []pipeline.Column {
    var columns []pipeline.Column
    for _, col := range asset.Columns {
        if col.PrimaryKey {
            continue
        }
        if col.MergeSQL != "" || col.UpdateOnMerge {
            columns = append(columns, col)
        }
    }
    return columns
}
Primary keys are never updated (they're the join condition). Other columns are only updated if they're marked update_on_merge: true or have a custom merge_sql expression. This means users can do things like merge_sql: GREATEST(target.Score, source.Score) to keep whichever score is higher, per-column.
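To make the per-column behavior concrete, here is a sketch of how a filtered column list could turn into an UPDATE SET clause. The Column struct is a trimmed stand-in for pipeline.Column, and the exact target.col = ... formatting is my assumption, since the article elides the code that builds the WHEN MATCHED clause.

```go
package main

import (
    "fmt"
    "strings"
)

// Column is a trimmed stand-in for pipeline.Column.
type Column struct {
    Name          string
    PrimaryKey    bool
    UpdateOnMerge bool
    MergeSQL      string
}

// updateSetClause is a hypothetical helper. A custom merge_sql expression
// wins over the plain source value; primary keys and unmarked columns are
// left out entirely.
func updateSetClause(cols []Column) string {
    var sets []string
    for _, c := range cols {
        switch {
        case c.PrimaryKey:
            continue
        case c.MergeSQL != "":
            sets = append(sets, fmt.Sprintf("target.%s = %s", c.Name, c.MergeSQL))
        case c.UpdateOnMerge:
            sets = append(sets, fmt.Sprintf("target.%s = source.%s", c.Name, c.Name))
        }
    }
    return strings.Join(sets, ", ")
}

func main() {
    cols := []Column{
        {Name: "id", PrimaryKey: true},
        {Name: "Score", MergeSQL: "GREATEST(target.Score, source.Score)"},
        {Name: "name", UpdateOnMerge: true},
        {Name: "created_at"},
    }
    fmt.Println(updateSetClause(cols))
}
```

With these columns, only Score and name end up in the clause: id is a primary key and created_at has neither flag set.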
SCD2: generating slowly changing dimensions from a SELECT
The most complex materializer is SCD2 (Slowly Changing Dimension Type 2). It takes your SELECT query and generates SQL that maintains a full history table with _valid_from, _valid_until, and _is_current columns.
There are two variants: scd2_by_column (detects changes by comparing column values) and scd2_by_time (detects changes using a timestamp column). The two generate surprisingly different SQL.
For BigQuery, scd2_by_time generates a MERGE statement. It creates a source CTE that unions two versions of every incoming row: one marked _is_current = TRUE (the new version to insert) and one marked _is_current = FALSE (to expire the old version). Then the MERGE matches these against the target:
MERGE INTO `product_catalog` AS target
USING (
  WITH s1 AS (
    SELECT product_id, name, price, updated_at FROM raw_products
  )
  SELECT s1.*, TRUE AS _is_current
  FROM s1
  UNION ALL
  SELECT s1.*, FALSE AS _is_current
  FROM s1
  JOIN `product_catalog` AS t1 USING (product_id)
  WHERE t1._valid_from < CAST(s1.updated_at AS TIMESTAMP) AND t1._is_current
) AS source
ON target.product_id = source.product_id AND target._is_current AND source._is_current
WHEN MATCHED AND (
  target._valid_from < CAST(source.updated_at AS TIMESTAMP)
) THEN
  UPDATE SET
    target._valid_until = CAST(source.updated_at AS TIMESTAMP),
    target._is_current = FALSE
WHEN NOT MATCHED BY SOURCE AND target._is_current = TRUE THEN
  UPDATE SET
    target._valid_until = CURRENT_TIMESTAMP(),
    target._is_current = FALSE
WHEN NOT MATCHED BY TARGET THEN
  INSERT (product_id, name, price, updated_at, _valid_from, _valid_until, _is_current)
  VALUES (source.product_id, source.name, source.price, source.updated_at,
    CAST(source.updated_at AS TIMESTAMP), TIMESTAMP('9999-12-31'), TRUE);
That WHEN NOT MATCHED BY SOURCE clause is what handles deletions: records that exist in the target but not in the source get expired.
Snowflake's SCD2 implementation can't use the same approach because of dialect differences. It uses a multi-step transaction instead:
BEGIN TRANSACTION;

SET current_scd2_ts = CURRENT_TIMESTAMP();

-- Step 1: Expire records no longer in source
UPDATE product_catalog AS target
SET _valid_until = $current_scd2_ts, _is_current = FALSE
WHERE target._is_current = TRUE
  AND NOT EXISTS (
    SELECT 1 FROM (SELECT ...) AS source
    WHERE target.product_id = source.product_id
  );

-- Step 2: Handle new and changed records via MERGE
MERGE INTO product_catalog AS target
USING ( ... ) AS source
ON target.product_id = source.product_id AND target._is_current AND source._is_current
WHEN MATCHED AND (...) THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT (...) VALUES (...);

COMMIT;
Notice how Snowflake captures the timestamp into a session variable (SET current_scd2_ts = CURRENT_TIMESTAMP()) and reuses it throughout. This is important for consistency: if the transaction takes a few seconds, you don't want different rows getting different expiry timestamps.
Both implementations also handle full refresh mode. When you run --full-refresh, SCD2 tables get bootstrapped with all records marked as current and auto-partitioned by _valid_from:
stmt := fmt.Sprintf(
`CREATE OR REPLACE TABLE %s
%s
%s AS
SELECT
CAST(%s AS TIMESTAMP) AS _valid_from,
src.*,
TIMESTAMP('9999-12-31') AS _valid_until,
TRUE AS _is_current
FROM (
%s
) AS src;`,
tbl, partitionClause, clusterClause,
asset.Materialization.IncrementalKey,
strings.TrimSpace(query),
)
The reserved column names (_valid_from, _valid_until, _is_current) are validated at query generation time. If your asset has a column named _is_current, the materializer returns an error before any SQL hits the database.
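The check itself can be tiny. Here is a sketch of what such a validation might look like; the function name, the exact-match comparison, and the error wording are my assumptions, not Bruin's implementation.

```go
package main

import "fmt"

// reservedSCD2Columns lists the names the SCD2 materializers manage themselves.
var reservedSCD2Columns = map[string]bool{
    "_valid_from":  true,
    "_valid_until": true,
    "_is_current":  true,
}

// checkReservedColumns is a hypothetical validator. It rejects the first
// user-defined column that collides with a reserved SCD2 name.
func checkReservedColumns(columns []string) error {
    for _, c := range columns {
        if reservedSCD2Columns[c] {
            return fmt.Errorf("column name %q is reserved for SCD2", c)
        }
    }
    return nil
}

func main() {
    fmt.Println(checkReservedColumns([]string{"product_id", "price"}))
    fmt.Println(checkReservedColumns([]string{"product_id", "_is_current"}))
}
```

Failing at generation time keeps the error message specific. The alternative is a duplicate-column error from the database, which says nothing about SCD2.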
The hook wrapper
There's one more layer I want to mention. Assets can define pre/post hooks, SQL statements that run before and after the materialized query:
hooks:
  pre:
    - query: "CREATE SCHEMA IF NOT EXISTS staging"
  post:
    - query: "GRANT SELECT ON dashboard.user_metrics TO analytics_role"
Rather than embedding hook logic into every materializer function, there's a HookWrapperMaterializer decorator:
type HookWrapperMaterializer struct {
    Mat interface {
        Render(asset *Asset, query string) (string, error)
    }
}

func (m HookWrapperMaterializer) Render(asset *Asset, query string) (string, error) {
    materialized, err := m.Mat.Render(asset, query)
    if err != nil {
        return "", err
    }
    return WrapHooks(materialized, asset.Hooks), nil
}
It wraps any materializer, runs the base Render, then prepends/appends the hook queries. No materializer function needs to know about hooks.
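WrapHooks isn't shown in the article, so here is a rough sketch of what the prepend/append step could look like. The Hook and Hooks types, the lowercase wrapHooks name, and the semicolon normalization are all my assumptions.

```go
package main

import (
    "fmt"
    "strings"
)

// Hook and Hooks are simplified stand-ins for the asset's hook config.
type Hook struct{ Query string }

type Hooks struct {
    Pre  []Hook
    Post []Hook
}

// terminate trims whitespace and makes sure the statement ends with
// exactly one semicolon.
func terminate(q string) string {
    return strings.TrimSuffix(strings.TrimSpace(q), ";") + ";"
}

// wrapHooks is a hypothetical version of WrapHooks: pre hooks first, then
// the materialized query, then post hooks, one statement per line.
func wrapHooks(materialized string, h Hooks) string {
    parts := make([]string, 0, len(h.Pre)+len(h.Post)+1)
    for _, p := range h.Pre {
        parts = append(parts, terminate(p.Query))
    }
    parts = append(parts, terminate(materialized))
    for _, p := range h.Post {
        parts = append(parts, terminate(p.Query))
    }
    return strings.Join(parts, "\n")
}

func main() {
    hooks := Hooks{
        Pre:  []Hook{{Query: "CREATE SCHEMA IF NOT EXISTS staging"}},
        Post: []Hook{{Query: "GRANT SELECT ON dashboard.user_metrics TO analytics_role"}},
    }
    fmt.Println(wrapHooks("INSERT INTO dashboard.user_metrics SELECT 1;", hooks))
}
```

Because the wrapper only sees the final SQL string, it works the same whether the inner materializer produced a single statement or a multi-statement transaction.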
What makes this work
The whole system spans roughly 5,000 lines of SQL generation code across 14 database packages, but the core pattern is just a two-level map lookup.
Adding a new database means providing a new map. A new strategy means writing a function with the right signature. If a combination doesn't make sense (like append for a view), you plug in an error function. The map itself is the documentation of what's supported.
There are tradeoffs. The MaterializerFunc signature is func(task *Asset, query string) (string, error). It passes the entire Asset struct, so every function has access to everything: columns, primary keys, materialization config, connection details. That's flexible but also means there's no compile-time guarantee that a function only reads what it needs. It's a pragmatic choice for a system where each function needs slightly different fields.
The generated SQL isn't parameterized (it uses fmt.Sprintf with string interpolation). In a different context, this would be a SQL injection concern, but here the inputs come from YAML config files that the user wrote, not from untrusted external input.
If you're building something that needs to generate SQL across multiple databases, this pattern works well. Don't try to abstract away the differences. Accept that BigQuery's MERGE and DuckDB's UPDATE-then-INSERT are fundamentally different operations, give each database its own implementation, and use a dispatch table to route to the right one. The database-specific code ends up being surprisingly readable because each function only worries about one database and one strategy.
The materialization source code is at:
- Core: pkg/pipeline/materializer.go
- Strategies enum: pkg/pipeline/pipeline.go (lines 350-390)
- Shared ANSI SQL: pkg/ansisql/materialization.go
- Per-database implementations: pkg/bigquery/materialization.go, pkg/snowflake/materialization.go, pkg/duckdb/materialization.go, and 11 others