If you work with Google Cloud for your Data Platform, chances are that you use BigQuery and run your data pipeline transformations in an ELT manner: using the BQ query engine to run transformations as a series of SELECT statements, one after another. Indeed, over the last few years, ELT and tools like DBT or Dataform have become the de facto standard for running and organizing your data transformations at scale.
These tools, which we may group under the “SQL orchestration tools” banner, are great for many reasons:
- SQL is the main and only language to express a transformation, and SQL is great for structured data (and even semi-structured data)
- They do a great job at centralizing the transformations: nice for audits, lineage tracking and trust
- They simplify the DataOps experience and help onboard Data Analysts onto Data Engineering tasks
- They can almost automatically infer the transformation dependencies by creating a DAG.
BUT, from my Platform Engineering perspective, they have a major flaw: they lack a state. With declarative IaC tools like Terraform, the current state of the Data Platform infrastructure is stored in a file (the state), including the tables/views, the permissions, etc.
But how is this a problem?
The problem is that tools like DBT or Dataform only run SQL statements. For example, to create a table the generated statement will be CREATE OR REPLACE TABLE AS SELECT your_transformation. This means that the tool never knows whether the object already exists, so you cannot attach IAM permissions to it with Terraform (because the object is re-created every day in your daily batch), nor can you use the table as an agreed interface with consumers, because the table does not exist prior to the transformation.
The solution: an experimental tool that uses the best of both worlds
I wanted to keep the benefits of SQL orchestration tools (like Dataform on GCP), but in conjunction with Terraform for the Ops benefits, keeping in mind the following requirements:
- Table dependencies between 2 transformations (running transformation B after A if B references table A in its query) should be automatically inferred
- Table schema (type, column_name) must be automatically inferred: users should not waste time writing the table schema if it can be deduced from the output.
- Tables should be automatically created prior to the transformation (not by the transformation) with an IaC tool: Terraform
- Be able to have a custom monitoring interface that gathers all the transformation information: status, cost, performance, custom error messages, etc.
Architecture proposal
Here is the architecture proposal for my experimental, DataOps-oriented transformation tool:
- Transformations are BigQuery queries
- Orchestration is carried by an auto-generated Cloud Workflow with all the correct dependencies and parallel steps when possible (if two transformations can run at the same time)
- Monitoring is a BigQuery timestamp-partitioned table with a Pub/Sub topic (and an Avro schema for the interface) and a push-to-BQ streaming subscription
- Transformations are defined in a git repository in YAML files (Jinja templates are supported for flexibility and factorisation)
- A Cloud Run endpoint hosts all the schema/dependency inference logic and the Workflow body generation according to the transformation dependencies (more on the Cloud Run below)
How to infer dependencies?
Here is where the magic happens: the automatic dependency inference. As a reminder, DAGs in data pipelines are nothing more than graphs (Directed Acyclic Graphs), so let’s use a graph library to build them from raw SQL declarations. You can find the detailed process and Python implementation examples in this post: Build the dependency graph of your BigQuery pipelines at no cost: a Python implementation
The raw SQL declarations are sent by Terraform to a remote Cloud Run instance that computes the inference logic (DAG creation, Workflows source code generation, table schema generation), so that Terraform can immediately create the tables and workflows, prior to any transformation.
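To make the idea concrete, here is a minimal sketch of dependency inference, not the implementation from the linked post. It assumes a naive regular expression is enough to find back-quoted table names after FROM/JOIN (a real backend would need proper SQL analysis), and the table names and queries are hypothetical, simplified versions of the demo below.

```python
import re

# Hypothetical transformations: destination table -> SQL query.
TRANSFORMATIONS = {
    "silver.orders": "SELECT * FROM `raw.orders_v1`",
    "silver.users": "SELECT * FROM `raw.users_v1`",
    "silver.products": "SELECT * FROM `raw.products_v1`",
    "gold.total_cost_by_user": (
        "SELECT u.email, SUM(o.quantity * p.price) AS total_amount "
        "FROM `silver.orders` o "
        "JOIN `silver.users` u ON u.id = o.user_id "
        "JOIN `silver.products` p ON p.id = o.product_id "
        "GROUP BY email"
    ),
}

# Naive stand-in for real SQL parsing (assumption): capture the
# back-quoted identifier that follows a FROM or JOIN keyword.
TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+`([^`]+)`", re.IGNORECASE)


def referenced_tables(query: str) -> set:
    """Return every back-quoted table name referenced by the query."""
    return set(TABLE_REF.findall(query))


def build_dependencies(transformations: dict) -> dict:
    """Map each destination table to the managed tables it reads from.

    Tables that no transformation produces (the raw layer here) are
    ignored: they are inputs, not steps to schedule.
    """
    managed = set(transformations)
    return {
        table: referenced_tables(query) & managed
        for table, query in transformations.items()
    }


dependencies = build_dependencies(TRANSFORMATIONS)
for table, upstream in dependencies.items():
    print(table, "<-", sorted(upstream))
```

The resulting mapping (node to its upstream nodes) is exactly the shape a graph library expects, which is what makes the ordering of the steps automatic.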
Example: the experiment in action
Let’s take a simple example: we are in a standard Data Platform architecture following a 3-layer principle: Bronze (raw data), Silver (curated data) and Gold (aggregated/meaningful data). We need to run a data transformation pipeline, in SQL, that cleans the raw data (deduplication and type conversion, for example) and builds an analytics-ready aggregated table from the cleaned data.
The demo dataset is a very simple retail-oriented data model (orders, products and users), orders being the fact table.
Our tool, based on Terraform, needs to create the Silver and Gold tables, with the correct schemas, as well as the Cloud Workflow source definition, before any transformation runs.
The data transformation files:
The transformations are described in a YAML file, specifying the destination table and the SQL transformation query as a single SELECT.
Building the silver layer (here it’s only a deduplication step, for the sake of the demo):
workflow_group: 3-demo
destination_table:
  project_id: ${raw_project}
  dataset_id: ${app_name}_ds_3_demo_${multiregion_id}_${project_env}
  table_id: orders
  location: EU
query: >
  SELECT
    *
  FROM `${raw_project}.sldp_demo_retail_analytics_raw_data_eu_${project_env}.orders_v1`
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY insertion_time DESC) = 1
Building the gold layer, here an aggregated table of the total amount of sold products per month and consumer.
workflow_group: 3-demo
destination_table:
  project_id: ${raw_project}
  dataset_id: ${app_name}_ds_3_demo_${multiregion_id}_${project_env}
  table_id: total_cost_by_user
  location: EU
description: "Total cost by user and month. Granularity: [user_id, month]"
query: >
  SELECT
    u.email,
    DATE_TRUNC(DATE(o.created_at), MONTH) as month,
    SUM(o.quantity * p.price) as total_amount,
    COUNT(DISTINCT o.id) as total_orders,
    CURRENT_TIMESTAMP() as insertion_time
  FROM `${raw_project}.${app_name}_ds_3_demo_${multiregion_id}_${project_env}.orders` o
  JOIN `${raw_project}.${app_name}_ds_3_demo_${multiregion_id}_${project_env}.users` u
    ON u.id = o.user_id
  JOIN `${raw_project}.${app_name}_ds_3_demo_${multiregion_id}_${project_env}.products` p
    ON p.id = o.product_id
  GROUP BY email, month
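As a quick illustration of how the ${...} placeholders in these files resolve, here is a small sketch using Python's string.Template, which happens to share the same ${name} syntax. The source does not say which component performs the substitution, so treat this as an assumption. The values for app_name, multiregion_id and project_env are read off the plan output below; raw_project is a made-up value.

```python
from string import Template

# Values inferred from the plan output (dataset "sldp_ds_3_demo_eu_dev"),
# except raw_project, which is purely hypothetical.
variables = {
    "raw_project": "my-raw-project",
    "app_name": "sldp",
    "multiregion_id": "eu",
    "project_env": "dev",
}

dataset_template = "${app_name}_ds_3_demo_${multiregion_id}_${project_env}"
table_template = (
    "${raw_project}.${app_name}_ds_3_demo_${multiregion_id}_${project_env}.orders"
)

# substitute() raises KeyError on a missing variable, which is the
# behaviour you want for configuration: fail early, not at query time.
dataset_id = Template(dataset_template).substitute(variables)
table_ref = Template(table_template).substitute(variables)

print(dataset_id)
print(table_ref)
```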
And after running the terraform plan command we can see the following output:
Terraform will perform the following actions:
# google_bigquery_table.destination_tables["orders"] will be created
  + resource "google_bigquery_table" "destination_tables" {
      + creation_time       = (known after apply)
      + dataset_id          = "sldp_ds_3_demo_eu_dev"
      + schema              = jsonencode(
            [
              + {
                  + mode        = "NULLABLE"
                  + name        = "id"
                  + type        = "INTEGER"
                },
        ...
          ])
}
# google_bigquery_table.destination_tables["products"] will be created
  + resource "google_bigquery_table" "destination_tables" {
    ...
}
# google_bigquery_table.destination_tables["users"] will be created
  + resource "google_bigquery_table" "destination_tables" {
    ...
}
 # google_bigquery_table.destination_tables["total_cost_by_user"] will be created
  + resource "google_bigquery_table" "destination_tables" {
      + dataset_id          = "sldp_ds_3_demo_eu_dev"
      + description         = "Total cost by user and month. Granularity: [user_id, month]"
      + id                  = (known after apply)
      + schema              = jsonencode(
            [
              + {
                  + description = null
                  + mode        = "NULLABLE"
                  + name        = "email"
                  + policyTags  = {
                      + names = []
                    }
                  + type        = "STRING"
                },
              + {
                  + description = null
                  + mode        = "NULLABLE"
                  + name        = "month"
                  + policyTags  = {
                      + names = []
                    }
                  + type        = "DATE"
                },
        ...
     ])
}
# google_workflows_workflow.data_transfo["3-demo"] will be created
  + resource "google_workflows_workflow" "data_transfo" {
      + create_time      = (known after apply)
      + description      = (known after apply)
      + effective_labels = (known after apply)
      + id               = (known after apply)
      + name             = "wkf_datatransfo_3_demo_euw1_dev"
      + name_prefix      = (known after apply)
      + project          = "sldp-front-dev"
      + region           = "europe-west1"
      + revision_id      = (known after apply)
      + service_account  = "..."
      + source_contents  = jsonencode(
        <Coming from the Cloud Run backend mentioned above, called directly by terraform with the data http provider>
     )
}
Plan: 5 to add, 0 to change, 0 to destroy.
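The schema attribute in the plan above is a JSON array of field objects. As a rough sketch of what the backend might return for a table (the function name and the input shape are assumptions, not the actual Cloud Run code), inferred column names and types can be serialized like this:

```python
import json


def to_bigquery_schema(columns):
    """Serialize (name, type) pairs into a BigQuery JSON schema string.

    Every column is marked NULLABLE, as in the plan output above.
    The input format is a hypothetical choice for this sketch.
    """
    fields = [
        {"name": name, "type": bq_type, "mode": "NULLABLE"}
        for name, bq_type in columns
    ]
    return json.dumps(fields, indent=2)


# The first two columns of total_cost_by_user, as shown in the plan.
schema_json = to_bigquery_schema([("email", "STRING"), ("month", "DATE")])
print(schema_json)
```

A string of this shape is what the schema argument of google_bigquery_table accepts, so Terraform can create the table before any query has run.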
The auto-generated Cloud Workflow DAG:
In the auto-generated Cloud Workflow, we can find 4 steps, one for each table. In our example above:
- 3 can be done in parallel (the Silver tables) for deduplication and typing. Here we use the topological generation method in our graph.
- 1 step for the Gold transformation, that needs to wait for the termination of the previous steps, because the Silver tables are referenced by the Gold table.
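The grouping into parallel steps can be sketched with the standard library alone. The wording above suggests a graph-library helper such as topological generations; the version below is a substitute built on graphlib.TopologicalSorter, not the tool's actual code, and the table names simply mirror the demo.

```python
from graphlib import TopologicalSorter

# Each table maps to the tables it depends on (its upstream nodes).
dependencies = {
    "orders": set(),
    "users": set(),
    "products": set(),
    "total_cost_by_user": {"orders", "users", "products"},
}


def topological_generations(graph):
    """Group nodes into layers that can run in parallel.

    Every node in a layer only depends on nodes from earlier layers.
    prepare() raises graphlib.CycleError if the graph is not a DAG.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    generations = []
    while sorter.is_active():
        # All nodes whose dependencies are already marked as done.
        ready = sorted(sorter.get_ready())
        generations.append(ready)
        sorter.done(*ready)
    return generations


generations = topological_generations(dependencies)
for index, layer in enumerate(generations):
    print(f"generation {index}: {layer}")
```

Each generation maps naturally to one parallel block in the workflow, and the generations themselves run sequentially.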
In this workflow, each step does the following:
- Compile the query: all our transformations can use the Jinja templating language, and Workflows input parameters can be used in the transformation template. For example, we can use the “incremental” parameter to apply a different transformation logic if we want to deal with incremental updates
- Run the BigQuery job (compiled query)
- Log the status of the job: the workflow publishes an event to a Pub/Sub topic, which is streamed in real time into a BigQuery monitoring table, in order to track the status of every step and every workflow.
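To give an idea of what such a monitoring event could look like, here is a hedged sketch that only builds and encodes the payload. All field names are hypothetical (the real Avro schema is not shown in this post), JSON encoding is assumed, and the actual publishing call to Pub/Sub is deliberately left out.

```python
import json
from datetime import datetime, timezone


def build_monitoring_event(workflow, step, status, bytes_billed=None, error=None):
    """Build one structured log event for a workflow step.

    Field names are assumptions made for this sketch; in practice they
    would have to match the schema attached to the Pub/Sub topic.
    """
    return {
        "workflow": workflow,
        "step": step,
        "status": status,
        "bytes_billed": bytes_billed,
        "error_message": error,
        # UTC timestamp, suitable for a timestamp-partitioned table.
        "event_time": datetime.now(timezone.utc).isoformat(),
    }


event = build_monitoring_event(
    workflow="wkf_datatransfo_3_demo_euw1_dev",
    step="total_cost_by_user",
    status="SUCCEEDED",
    bytes_billed=10485760,
)

# Pub/Sub message data is bytes, so the event is serialized before sending.
payload = json.dumps(event).encode("utf-8")
print(payload.decode("utf-8"))
```

Keeping one flat event per step makes the monitoring table easy to query for status, cost and error messages.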
More features…
The experiment is now very feature-rich; here are some of the features we added:
- Every transformation can have some SQL scripting pre-operations. The pre-operations are taken into account when processing the dependency graph (if you create temporary tables, for example) and are run in the same BQ session as the main transformation. BTW, check out this great article by my friend Matthieu explaining the implementation in Python: BigQuery transactions over multiple queries, with sessions.
- You can use Python Jinja templating in every transformation, with some common variables that are available at run time: in the workflow, every transformation step is first “compiled” before being sent to BigQuery.
- You can define custom query templates that can be used across the whole project: for example, a Merge template is available for everyone to implement a merge strategy in the final table instead of replace/append.
- All templates can implement an incremental alternative (using Jinja conditions). For example, the Default template appends data to the final table if the workflow is run in incremental mode, or overwrites the data in non-incremental mode.
- All the input parameters of your workflows can be used in Jinja templates.
- After every workflow step, a structured log event is published in real time to the monitoring Pub/Sub topic and immediately streamed into the monitoring BQ table.
Conclusion
It works like a charm!
This architecture has been used internally at Stack Labs for a few months to process our internal data pipelines: there are extremely few pipeline errors at runtime (even fewer than with Dataform, which sometimes lost the connection to the git provider), it’s very cost-effective (the DAG generation is completely free thanks to a few hacks), the custom templating system is very flexible for advanced data engineering use cases, and we now have proper custom monitoring logs at every transformation step to build real-time monitoring dashboards!
So yes, it’s a very geeky approach, and the developer experience is local-first and git-oriented, but if like me you have a Software Engineer background you will feel very comfortable doing Data Engineer/Analyst tasks using this approach. This will probably stay at the experimental phase, but it was fun designing a Serverless, DevOps-oriented Data Transformation and applying Graph theory in the solution. Feel free to ping me for the source code.