
Jag Thind for Super Payments


Improve DBT Incremental Performance on Snowflake using Custom Incremental Strategy

This post shows how to improve the performance of the DBT built-in delete+insert incremental strategy on Snowflake, so we can keep Snowflake query costs under control. It is broken down into:

  1. Defining the problem, with supporting performance statistics
  2. Desired solution requirements
  3. Solution implementation, with supporting performance statistics

TL;DR

We implemented a DBT custom incremental strategy, along with incremental predicates, to improve Snowflake query performance:

  • Reduced MBs scanned by ~99.68%
  • Reduced micro-partitions scanned by ~99.53%
  • Reduced query time from 19 seconds to 1.3 seconds

Less data is scanned, so the Snowflake warehouse spends less time waiting on I/O and the query completes faster.

Disclaimer

Custom incremental strategies and incremental predicates are more advanced uses of DBT for incremental processing. But I suppose that’s where you have the most fun, so let’s get stuck in!


Problem

When using the DBT built-in delete+insert incremental strategy on large volumes of data, the delete statement it generates can be inefficient on Snowflake. This means queries take longer and warehouse costs increase.

Taking an example target table that:

  • Has ~458 million rows
  • Is ~26 GB in size
  • Has ~2560 micro-partitions

With a DBT model that:

  • Runs every 30 minutes
  • Typically merges ~100K rows into the target table on every run. As data can arrive out-of-order, a subsequent run will pick these late rows up, which means a run can include rows that have already been processed.

With DBT model config:

  - name: model_name
    config:
      materialized: "incremental"
      incremental_strategy: "delete+insert"
      on_schema_change: "append_new_columns"
      unique_key: ["dw_order_created_skey"] # varchar(100)
      cluster_by: ["to_date(order_created_at)"]

Default delete SQL generated by DBT, before it inserts data in the same transaction:

delete from target_table as DBT_INTERNAL_DEST
where (dw_order_created_skey) in (
  select distinct dw_order_created_skey
  from source_temp_table as DBT_INTERNAL_SOURCE
);

Performance Statistics

To find the rows in the target table to delete with a matching dw_order_created_skey (see the node profile overview image below), Snowflake has to:

  • Scan ~11 GB of the target table
  • Scan all ~2560 micro-partitions
  • Query takes ~19 seconds

Why? The query does not filter on order_created_at, so Snowflake cannot use the clustering key of to_date(order_created_at) to prune micro-partitions and find the matching rows to delete.
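As a quick sanity check, you can inspect how the target table is clustered on that expression with Snowflake's SYSTEM$CLUSTERING_INFORMATION function. A minimal sketch, assuming the placeholder table name target_table and the cluster_by expression from the config above:

select system$clustering_information(
  'target_table',                   -- placeholder name for the example target table
  '(to_date(order_created_at))'     -- the cluster_by expression from the model config
);

The JSON it returns includes the total partition count and average clustering depth, which helps confirm the clustering key before adding predicates against it.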

Query plan

[Image: delete query plan]

[Image: node profile overview]

Desired Solution

To limit the data read from the target table above, we can make use of incremental_predicates in the model config. This adds extra SQL to filter the target table.

DBT model config:

  - name: model_name
    config:
      materialized: "incremental"
      incremental_strategy: "delete+insert"
      on_schema_change: "append_new_columns"
      unique_key: ["dw_order_created_skey"]
      cluster_by: ["to_date(order_created_at)"]
      incremental_predicates:
        - "order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from DBT_INTERNAL_SOURCE)"

Issues with this

  • The incremental_predicates docs state that dbt does not check the syntax of the SQL statements, so the predicate is passed through without any changes.
  • We get an error when it executes on Snowflake: Object 'DBT_INTERNAL_SOURCE' does not exist or not authorized (see the sketch after this list).
  • We cannot hardcode the Snowflake table name in the incremental_predicates, as it is dynamically generated by DBT.
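For context, this is roughly the delete SQL that gets generated when the predicate is passed through unchanged (a sketch based on the default delete SQL above). The DBT_INTERNAL_SOURCE alias only exists inside the first subquery, so the predicate's own subquery references a table literally named DBT_INTERNAL_SOURCE, which Snowflake cannot find:

delete from target_table as DBT_INTERNAL_DEST
where (dw_order_created_skey) in (
  select distinct dw_order_created_skey
  from source_temp_table as DBT_INTERNAL_SOURCE
)
-- Added by incremental_predicates, passed through verbatim
and order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from DBT_INTERNAL_SOURCE)
;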

Solution Implementation

We need to:

  • Pre-process each element of incremental_predicates to replace DBT_INTERNAL_SOURCE with the actual source_temp_table name, so that DBT generates SQL like the below for better performance:
delete from target_table as DBT_INTERNAL_DEST
where (dw_order_created_skey) in (
  select distinct dw_order_created_skey
  from source_temp_table as DBT_INTERNAL_SOURCE
)
-- Added by incremental_predicates
and order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from source_temp_table)
;
  • Continue to call the default DBT delete+insert incremental strategy with the new value for incremental_predicates in the arguments dictionary.

How? The macro below implements a light-weight custom incremental strategy to do this. You can see that at the end it calls the default get_incremental_delete_insert_sql DBT macro.

{% macro get_incremental_custom_delete_insert_sql(arg_dict) %}
  {% set custom_arg_dict = arg_dict.copy() %}
  {# Get the relations first and check they exist - casting to string before the
     check would turn a missing value into the literal string 'None' #}
  {% set source_relation = custom_arg_dict.get('temp_relation') %}
  {% set target_relation = custom_arg_dict.get('target_relation') %}

  {% if source_relation is none %}
    {{ exceptions.raise_compiler_error('temp_relation is not present in arguments!') }}
  {% endif %}

  {% if target_relation is none %}
    {{ exceptions.raise_compiler_error('target_relation is not present in arguments!') }}
  {% endif %}

  {% set source = source_relation | string %}
  {% set target = target_relation | string %}

  {% set raw_predicates = custom_arg_dict.get('incremental_predicates', []) %}

  {% if raw_predicates is string %}
    {% set predicates = [raw_predicates] %}
  {% else %}
    {% set predicates = raw_predicates %}
  {% endif %}

  {% if predicates %}
    {% set replaced_predicates = [] %}
    {% for predicate in predicates %}
      {% set replaced = predicate
        | replace('DBT_INTERNAL_SOURCE', source)
        | replace('DBT_INTERNAL_DEST', target)
      %}
      {% do replaced_predicates.append(replaced) %}
    {% endfor %}
    {% do custom_arg_dict.update({'incremental_predicates': replaced_predicates}) %}
  {% endif %}

  {{ log('Calling get_incremental_delete_insert_sql with args: ' ~ custom_arg_dict, info=False) }}
  {{ get_incremental_delete_insert_sql(custom_arg_dict) }}
{% endmacro %}
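Note: dbt resolves a custom incremental_strategy named custom_delete_insert by looking for a macro called get_incremental_custom_delete_insert_sql, so saving the macro above as a .sql file in the project's macros/ directory should be all that is needed for the model config below to pick it up.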

This is now callable from the DBT model config by setting incremental_strategy to custom_delete_insert.

  - name: model_name
    config:
      materialized: "incremental"
      incremental_strategy: "custom_delete_insert"
      on_schema_change: "append_new_columns"
      unique_key: ["dw_order_created_skey"]
      cluster_by: ["to_date(order_created_at)"]
      incremental_predicates:
        - "order_created_at >= (select dateadd(hour, -24, min(order_created_at)) from DBT_INTERNAL_SOURCE)"

Performance Improvement Statistics

To find the ~100K rows to delete in the target table, Snowflake now only has to:

  • Scan ~35 MB of the target table, 11 GB → 35 MB = ~99.68% improvement
  • Scan 12 micro-partitions, 2560 → 12 = ~99.53% improvement
  • Query takes ~1.3 seconds

Less data is scanned, so the Snowflake warehouse spends less time waiting on I/O and the query completes faster.
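If you want to track this over time, one way (a sketch, assuming access to the SNOWFLAKE.ACCOUNT_USAGE share and substituting your own table name for the target_table placeholder) is to compare partitions and bytes scanned for the delete statements in query history:

select
  query_text,
  partitions_scanned,
  partitions_total,
  bytes_scanned,
  total_elapsed_time
from snowflake.account_usage.query_history
where query_text ilike 'delete from %target_table%'  -- target_table is the placeholder table name used above
order by start_time desc
limit 10;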

Query plan

[Image: delete query plan]

[Image: node profile overview]


If you're interested in hearing more about how we use DBT at Super Payments, feel free to reach out!
