The following presents how to improve the performance of the DBT built-in delete+insert incremental strategy on Snowflake, so we can control Snowflake query costs. It is broken down into:
- Defining the problem, with supporting performance statistics
- Desired solution requirements
- Solution implementation, with supporting performance statistics
TL;DR
We implemented a DBT custom incremental strategy, along with incremental predicates, to improve Snowflake query performance:
- Reduced MBs scanned by ~99.68%
- Reduced micro-partitions scanned by ~99.53%
- Reduced query time from ~19 seconds to ~1.3 seconds
Less data is being scanned, so the Snowflake warehouse spends less time waiting on I/O and the query completes faster.
Disclaimer
Custom incremental strategies and incremental predicates are more advanced uses of DBT for incremental processing. But I suppose that's where you have the most fun, so let's get stuck in!
Problem
When using the DBT built-in delete+insert incremental strategy on large volumes of data, you can get inefficient queries on Snowflake when the delete statement is executed. This means queries take longer and warehouse costs increase.
Taking an example target table:
- With ~458 million rows
- Is ~26 GB in size
- Has ~2560 micro-partitions
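As an aside, figures like these can be pulled straight from Snowflake rather than from the UI. A minimal sketch, assuming the table is called target_table and you have access to INFORMATION_SCHEMA (the names here are only for illustration):

```sql
-- Approximate row count and size of the target table
select row_count, bytes
from information_schema.tables
where table_name = 'TARGET_TABLE';

-- Micro-partition count and clustering health for the clustering key
select system$clustering_information('target_table', '(to_date(order_created_at))');
```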
With a DBT model that:
- Is running every 30 minutes
- Typically has ~100K rows to merge into the target table on every run. As data can arrive out-of-order, a subsequent run picks these up, which means it can include rows already processed.
With DBT model config:
```yaml
- name: model_name
  config:
    materialized: "incremental"
    incremental_strategy: "delete+insert"
    on_schema_change: "append_new_columns"
    unique_key: ["dw_order_created_skey"]  # varchar(100)
    cluster_by: ["to_date(order_created_at)"]
```
Default delete SQL generated by DBT, before it inserts data in the same transaction:
```sql
delete from target_table as DBT_INTERNAL_DEST
where (dw_order_created_skey) in (
    select distinct dw_order_created_skey
    from source_temp_table as DBT_INTERNAL_SOURCE
);
```
Performance Statistics
To find the rows in the target table to delete with a matching dw_order_created_skey (see the node profile overview image below), Snowflake has to:
- Scan ~11 GB of the target table
- Scan all ~2560 micro-partitions
- Take ~19 seconds to run the query

Why? The delete is not filtering on order_created_at, so Snowflake cannot use the clustering key of to_date(order_created_at) to find the matching rows to delete.
Query plan
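If you want these bytes-scanned and partition-pruning numbers without screenshotting the query profile, Snowflake also exposes them through the GET_QUERY_OPERATOR_STATS table function. A minimal sketch, run in the same session straight after the delete:

```sql
-- Operator-level stats for the previous query, including partitions scanned vs. total
select
    operator_type,
    operator_statistics
from table(get_query_operator_stats(last_query_id()));
```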
Desired Solution
To limit the data read from the target table above, we can make use of incremental_predicates in the model config. This will add SQL that filters the target table.
DBT model config:
```yaml
- name: model_name
  config:
    materialized: "incremental"
    incremental_strategy: "delete+insert"
    on_schema_change: "append_new_columns"
    unique_key: ["dw_order_created_skey"]
    cluster_by: ["to_date(order_created_at)"]
    incremental_predicates:
      - "order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from DBT_INTERNAL_SOURCE)"
```
Issues with this
- The incremental_predicates docs state that dbt does not check the syntax of the SQL statements, so it does not change anything in the SQL.
- We get an error when it executes on Snowflake (see the sketch of the generated SQL after this list):
  Object 'DBT_INTERNAL_SOURCE' does not exist or not authorized.
- We cannot hardcode the Snowflake table name in the incremental_predicates, as it's dynamically generated by DBT.
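To make the failure concrete, the predicate is appended to the delete verbatim, so Snowflake receives something roughly like the following sketch. The DBT_INTERNAL_SOURCE alias only exists inside the IN subquery, so the outer reference cannot be resolved:

```sql
delete from target_table as DBT_INTERNAL_DEST
where (dw_order_created_skey) in (
    select distinct dw_order_created_skey
    from source_temp_table as DBT_INTERNAL_SOURCE
)
-- Fails: DBT_INTERNAL_SOURCE is not a real table, and the alias is out of scope here
and order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from DBT_INTERNAL_SOURCE);
```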
Solution Implementation
We need to:
- Do some pre-processing on each element of incremental_predicates to replace DBT_INTERNAL_SOURCE with the actual source_temp_table, so SQL like the below is generated by DBT for better performance:
```sql
delete from target_table as DBT_INTERNAL_DEST
where (dw_order_created_skey) in (
    select distinct dw_order_created_skey
    from source_temp_table as DBT_INTERNAL_SOURCE
)
-- Added by incremental_predicates
and order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from source_temp_table)
;
```
- Continue to call the default DBT delete+insert incremental strategy with the new value for incremental_predicates in the arguments dictionary.
How? The macro below implements a light-weight custom incremental strategy to do this. You can see at the end it calls the default get_incremental_delete_insert_sql DBT code.
```jinja
{% macro get_incremental_custom_delete_insert_sql(arg_dict) %}

    {% set custom_arg_dict = arg_dict.copy() %}

    {% if custom_arg_dict.get('temp_relation') is none %}
        {{ exceptions.raise_compiler_error('temp_relation is not present in arguments!') }}
    {% endif %}
    {% if custom_arg_dict.get('target_relation') is none %}
        {{ exceptions.raise_compiler_error('target_relation is not present in arguments!') }}
    {% endif %}

    {# Render the relations as names that can be substituted into the predicates #}
    {% set source = custom_arg_dict.get('temp_relation') | string %}
    {% set target = custom_arg_dict.get('target_relation') | string %}

    {# incremental_predicates can be a single string or a list of strings #}
    {% set raw_predicates = custom_arg_dict.get('incremental_predicates', []) %}
    {% if raw_predicates is string %}
        {% set predicates = [raw_predicates] %}
    {% else %}
        {% set predicates = raw_predicates %}
    {% endif %}

    {% if predicates %}
        {% set replaced_predicates = [] %}
        {% for predicate in predicates %}
            {# Swap the placeholder aliases for the real relation names #}
            {% set replaced = predicate
                | replace('DBT_INTERNAL_SOURCE', source)
                | replace('DBT_INTERNAL_DEST', target)
            %}
            {% do replaced_predicates.append(replaced) %}
        {% endfor %}
        {% do custom_arg_dict.update({'incremental_predicates': replaced_predicates}) %}
    {% endif %}

    {{ log('Calling get_incremental_delete_insert_sql with args: ' ~ custom_arg_dict, info=False) }}

    {# Delegate to the built-in delete+insert strategy with the rewritten predicates #}
    {{ get_incremental_delete_insert_sql(custom_arg_dict) }}

{% endmacro %}
```
This is now callable from the DBT model config by setting incremental_strategy to custom_delete_insert. DBT resolves a custom strategy name by looking for a macro called get_incremental_<strategy_name>_sql, which is why the macro above is named get_incremental_custom_delete_insert_sql.
```yaml
- name: model_name
  config:
    materialized: "incremental"
    incremental_strategy: "custom_delete_insert"
    on_schema_change: "append_new_columns"
    unique_key: ["dw_order_created_skey"]
    cluster_by: ["to_date(order_created_at)"]
    incremental_predicates:
      - "order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from DBT_INTERNAL_SOURCE)"
```
Performance Improvement Statistics
To find the ~100K rows to delete in the target table, Snowflake now only has to:
- Scan ~35 MB of the target table (11 GB → 35 MB, a ~99.68% improvement)
- Scan 12 micro-partitions (2560 → 12, a ~99.53% improvement)
- Take ~1.3 seconds to run the query

Less data is being scanned, so the Snowflake warehouse spends less time waiting on I/O and the query completes faster.
Query plan
If you're interested in hearing more about how we use DBT at Super Payments, feel free to reach out!