Robert Nubel

Transform data in Snowflake with Streams, Tasks, and Python

Data pipelines are everywhere in the enterprise, and understandably so: data is the lifeblood of a company, and without a way to get it to the people who need it, work would grind to a halt. The classic paradigm for building data pipelines has historically been ETL (Extract-Transform-Load). The name says it all: you build a job that extracts data from a source, applies your desired reshaping/aggregation/fancification, and pushes the result to a destination. But one of my favorite developments over the past decade or so is the ELT paradigm (Extract-Load-Transform), which defers the reshaping until your data has already made it to the destination -- giving you the flexibility to adjust that transform as needed and slimming down the components in your pipeline.

Snowflake is a cloud data warehouse that's the target of many data pipelines, and it has three features that I love for building pipelines where the transformation happens after the data is loaded: Streams, User-Defined Functions (UDFs), and Tasks.

  • Streams are like tables, except they only contain data that's new from their source. They can include all changes, or just inserts, depending on your needs. They work by storing an offset to Snowflake's internal CDC information, similar to a Kafka consumer offset, meaning streams don't actually store any data and can be re-created easily.
  • UDFs are functions that you can write in a variety of languages (including #python). These have some language-specific particulars (for example, JavaScript UDFs take in all rows to the same execution instance, whereas Python UDFs can execute on one row or on batches of rows, exposed to the UDF as a pandas DataFrame) but overall are incredibly useful. They're also great for cases where you're working with rich JSON data that your team doesn't want to work with in plain SQL.
  • Tasks are scheduled jobs that live right inside Snowflake and can run on a schedule without the need to involve separate scheduling software.

In this tutorial, I'm going to show how you can build out the Transform step of an ELT pipeline entirely inside Snowflake. I won't go into how your data gets extracted from whatever source or loaded into Snowflake (but I really like Kafka).

Step 0: Setup

We'll use Snowflake's provided sample dataset (SNOWFLAKE_SAMPLE_DATA) to easily generate some data.

USE WORKSHEET;
USE SCHEMA WORKSHEET.RNUBEL;

CREATE TABLE ORDERS
  AS (SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS LIMIT 1);

Step 1: Define your stream

Now that we've got a table, let's create a stream on it.

CREATE OR REPLACE STREAM ORDERS_STREAM ON TABLE ORDERS APPEND_ONLY = true;

A couple notes:

  • APPEND_ONLY = true is a flag indicating we only want to see new records (i.e., INSERTS). If you also need to account for updates or deletes, don't pass this flag, and be prepared to handle those other operations.
  • When a stream is created, it initially will have its offset set to the tip of the table's internal changelog, and therefore contain no data if you query it. You can move this offset back with an AT or BEFORE clause: see the docs for more information.
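
For example, here's a hedged sketch of re-creating the stream with its offset rewound by an hour (the -60*60 offset is purely illustrative; you could also use a TIMESTAMP or STATEMENT clause depending on the point you want to rewind to). For this tutorial, though, stick with the plain stream created above.

CREATE OR REPLACE STREAM ORDERS_STREAM ON TABLE ORDERS
  AT (OFFSET => -60*60)   -- rewind the offset by 60*60 seconds (one hour)
  APPEND_ONLY = true;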

We expect our stream to be empty at the moment:

SELECT COUNT(*) FROM ORDERS_STREAM;   // returns 0

Let's insert some data to see the stream in action.

INSERT INTO ORDERS
  (SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS LIMIT 10);

SELECT COUNT(*) FROM ORDERS_STREAM; // returns 10

Note that you can query that count as many times as you'd like and you'll still get 10. So when does the offset advance and clear the stream? It's dangerously simple: whenever the stream is consumed by a DML statement in a committed transaction. This will work in our favor later, but it can be surprising. For now, this throwaway CREATE TABLE ... AS SELECT will clear it:

CREATE TEMPORARY TABLE temp_delete_stuff AS (SELECT * FROM orders_stream);
DROP TABLE temp_delete_stuff;
SELECT COUNT(*) FROM ORDERS_STREAM; // returns 0

Step 2: Define a UDF

Now, this step might be optional for you. Maybe your transform stage can all happen in SQL, and you can skip right to Step 3, but I think having access to Python opens up a lot of possibilities. So let's make a Python UDF that will transform a row from our source table into a fancy destination row. Actually, it won't be fancy, because this is a tutorial, but it will at least be generated by Python.

CREATE OR REPLACE FUNCTION transform_orders("row" OBJECT)
RETURNS TABLE (order_key TEXT, order_age_in_days INTEGER)
LANGUAGE PYTHON
HANDLER = 'OrderTransformer'
RUNTIME_VERSION='3.8'
AS
'
from datetime import date

class OrderTransformer:
  def __init__(self):
    pass

  def process(self, row):
    # O_ORDERDATE arrives as an ISO-format string inside the OBJECT, so parse it back to a date
    age = date.today() - date.fromisoformat(row["O_ORDERDATE"])
    return [(row["O_ORDERKEY"], age.days)]

  def end_partition(self):
    pass
'
;

Notes here:

  • The input to this function is an OBJECT which we expect to hold the row as a dictionary. To get the row into this format, we'll use the Snowflake function object_construct(), but this is mostly just to demonstrate flexibility and might not be what you need. You might be better off specifying specific input columns (see the sketch after these notes).
  • This UDF returns a table, so it has to return a list of tuples. This isn't the only option; your UDF could return a static value that you then break out to rows later on. It all depends on what sort of transform you're doing.
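
As a point of comparison for that first note, here's a minimal sketch of what an explicit-columns variant could look like (hypothetical; not used in the rest of this tutorial). A DATE argument arrives in Python as a datetime.date, so no string parsing is needed:

CREATE OR REPLACE FUNCTION transform_orders_cols(in_order_key TEXT, in_order_date DATE)
RETURNS TABLE (order_key TEXT, order_age_in_days INTEGER)
LANGUAGE PYTHON
HANDLER = 'OrderTransformer'
RUNTIME_VERSION = '3.8'
AS
$$
from datetime import date

class OrderTransformer:
  def process(self, in_order_key, in_order_date):
    # the DATE argument is already a datetime.date, so just subtract
    return [(in_order_key, (date.today() - in_order_date).days)]
$$
;

You'd then call it as LATERAL transform_orders_cols(o_orderkey::TEXT, o_orderdate) rather than building an OBJECT first.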

To test this out, run it on your full (mini) data set:

SELECT order_key, order_age_in_days
FROM
  (SELECT object_construct(*) as_object FROM orders) orders,
  LATERAL transform_orders(orders.as_object);
ORDER_KEY | ORDER_AGE_IN_DAYS
----------+------------------
4800004   | 9,071
4800005   | 10,334
4800006   | 10,586
4800007   | 10,637
3600001   | 9,932

Granted, we didn't need Python to do that, but it's still cool.
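
For reference, the plain-SQL equivalent would be something like this (shown just to make the comparison concrete):

SELECT o_orderkey AS order_key,
       DATEDIFF('day', o_orderdate, CURRENT_DATE()) AS order_age_in_days
FROM orders;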

Step 3: Create a destination table

Simple enough:

CREATE TABLE ORDER_FACTS (
  ORDER_KEY TEXT,
  ORDER_AGE_IN_DAYS INTEGER
);

Step 4: Create a procedure to transform and save all new records

This is where things get fun. We're going to leverage Snowflake's MERGE statement, which lets us run a query, compare every returned row to the target table, and decide whether each row needs an update or an insert:

CREATE OR REPLACE PROCEDURE load_orders()
RETURNS INT
LANGUAGE SQL
AS $$
begin
    BEGIN TRANSACTION;
    MERGE INTO ORDER_FACTS dst USING (
      SELECT order_key,
        MAX(order_age_in_days) AS order_age_in_days
      FROM
        (SELECT object_construct(*) as_object FROM orders_stream) orders,
        LATERAL transform_orders(orders.as_object) output
      GROUP BY 1
    ) src
    ON src.order_key = dst.order_key
    WHEN MATCHED THEN
      UPDATE SET
      order_age_in_days = src.order_age_in_days
    WHEN NOT MATCHED THEN
      INSERT (order_key, order_age_in_days)
      VALUES (src.order_key, src.order_age_in_days);
    COMMIT;

    return 1;
end
$$
;

One key point here is that even though we're positive src will contain just one row per order, we're still using GROUP BY to guarantee only one source row per key goes into the merge. Otherwise, multiple source rows could match the same target row, which Snowflake treats as a nondeterministic merge (with the default ERROR_ON_NONDETERMINISTIC_MERGE setting, that fails the statement outright).

Also, note that we wrap the operation in an explicit transaction. I don't know whether the caller will necessarily have AUTOCOMMIT enabled when the procedure gets called, and there's no reason to risk it.
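
If you're curious what your session is set to, a quick sanity check:

SHOW PARAMETERS LIKE 'AUTOCOMMIT' IN SESSION;

Either way, the explicit transaction keeps the procedure safe.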

Insert some sample data and test out your procedure:

INSERT INTO ORDERS
  (SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS LIMIT 100);

CALL load_orders();

SELECT * FROM ORDER_FACTS;

Step 5: Schedule the task

We probably don't want to log into Snowflake and run that procedure by hand all day long, so we'll leverage a Task to run it automatically. Suppose you want to refresh all new orders every hour:

CREATE OR REPLACE TASK orders_load_task
  WAREHOUSE = 'YOUR_WAREHOUSE_NAME'
  SCHEDULE = 'USING CRON 0 * * * * America/Chicago'
AS
CALL load_orders();

ALTER TASK orders_load_task RESUME;

When picking the right frequency, keep in mind that bringing up the warehouse comes with costs that might make it beneficial to run this less often. Keep your downstream users' needs in mind, but also keep an eye on the cost.
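
One way to keep an eye on both is to query the task history and see how often the task actually ran, how long each run took, and whether any runs failed. Something along these lines (the RESULT_LIMIT here is arbitrary):

SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  TASK_NAME => 'ORDERS_LOAD_TASK',
  RESULT_LIMIT => 20
))
ORDER BY scheduled_time DESC;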

Conclusion

That finishes our transform! I really like that we were able to do this entirely in Snowflake -- no Airflow required. Snowflake's task system isn't fully fleshed out, though, so at Enova we still supplement this process with conventional DAGs using SnowflakeOperators. I might write about that in a future post.

One thing you might be wondering about is what happens when your transform fails. Maybe data changed, or values are unexpectedly NULL, or some other edge case produces an exception in your UDF. If you aren't handling it, the MERGE statement will fail and cause the task itself to fail, stalling your pipeline. The good news in that case is that no data is lost, as long as you fix the bug and recover before the stream goes stale -- which happens when its offset falls outside the source table's data retention window (Snowflake will extend that window up to the table's MAX_DATA_EXTENSION_TIME_IN_DAYS, 14 days by default, so check your table's settings).
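
You can check how much runway a stream has directly; SHOW STREAMS reports a stale flag and a stale_after timestamp for each stream:

SHOW STREAMS LIKE 'ORDERS_STREAM';
-- the STALE and STALE_AFTER columns tell you whether the stream is still usable
-- and when it would go stale if it's never consumed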

If you can't tolerate any downtime of that nature, you could look into employing a dead-letter queue pattern: catch the error and move failed rows to a separate table for later processing.
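
To sketch one hedged way of doing that (nothing here comes from the tutorial above; the _safe suffix and the dead-letter table it implies are hypothetical): have the UDF catch its own exceptions and emit an error column, so one bad row produces an error record instead of failing the whole MERGE.

CREATE OR REPLACE FUNCTION transform_orders_safe("row" OBJECT)
RETURNS TABLE (order_key TEXT, order_age_in_days INTEGER, error TEXT)
LANGUAGE PYTHON
HANDLER = 'OrderTransformer'
RUNTIME_VERSION = '3.8'
AS
$$
from datetime import date

class OrderTransformer:
  def process(self, row):
    try:
      age = date.today() - date.fromisoformat(row["O_ORDERDATE"])
      return [(row["O_ORDERKEY"], age.days, None)]
    except Exception as e:
      # emit the failure instead of raising so the pipeline keeps moving
      return [(row.get("O_ORDERKEY"), None, str(e))]
$$
;

The loading procedure could then MERGE the rows where error IS NULL and insert the rest into the dead-letter table.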
