Λ\: Clément Bosc for Stack Labs

Build the dependency graph of your BigQuery pipelines at no cost: a Python implementation

Nowadays, in a lot of Data Stacks, most of the Data Engineering work is writing SQL.
SQL is a powerful language that is well suited for building most batch data pipelines: it's universal, great for structured data and (most of the time) easy to write. On the other hand, one of the complexities is orchestrating the series of SQL statements in the correct dependency order: if a SQL query references the result of another one, the second should run before the first.

For a personal project I pursued the quest of automatically generating the dependency graph of BigQuery transformations with a small Python script, using graph theory and a SQL syntax parser. Let’s see how.

What are the dependencies in BigQuery SQL transformations?

In BigQuery (or any other data warehouse actually), transformations are typically chained to form a more or less complex DAG (Directed Acyclic Graph). In this DAG, every transformation reads from one or more source tables and dumps its result into a single destination table, which can in turn be used in other transformations, and so on. In this graph all the nodes are tables (sources or destinations) and the edges are dependencies, meaning the transformation references the source tables in a FROM or JOIN clause to produce the target.

Here is an example:

[Diagram: example dependency graph built from BigQuery transformations]

Here we can see that table D is produced from tables A & B, table E from tables D & C, and table F from tables D & E. From this diagram we can easily see that transformation D should run first, followed by E and finally transformation F.

Automatically infer the graph with Python

In the project we used the Python library networkx and its DiGraph object (Directed Graph). To detect table references in a query, we use sqlglot, a SQL parser (among other things) that works well with BigQuery.
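To make the ordering from the example concrete, here is a minimal warm-up sketch (the single-letter names stand for the tables of the diagram): encoding the diagram as a DiGraph and asking networkx for a topological sort directly gives a valid execution order.

import networkx as nx

# Example graph from the diagram: A & B produce D,
# D & C produce E, D & E produce F
example = nx.DiGraph()
example.add_edges_from([
    ("A", "D"), ("B", "D"),
    ("D", "E"), ("C", "E"),
    ("D", "F"), ("E", "F"),
])

# A topological sort yields a valid execution order
print(list(nx.topological_sort(example)))
# e.g. ['A', 'B', 'C', 'D', 'E', 'F'] -> D before E, E before F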

Here is the first part of the Python script to create the graph, simplified for this blog post.

import networkx as nx

# all the transformations are stored in a structure
# here let's assume Transformation object only contains the
# query and the destination table (project_id, dataset_id,
# table_id)

transformations: list[Transformation]

graph = nx.DiGraph()
for transfo in transformations:
    dependencies = find_query_dependencies(transfo.query)

    # Add nodes, the transfo infos are added as node metadata
    graph.add_node(transfo.destination_table, transfo=transfo)

    # Add edges
    for src in dependencies:
        # note that if the src node does not exist yet,
        # it is created
        graph.add_edge(src, transfo.destination_table)
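For reference, a minimal Transformation structure could look like the sketch below. This is purely illustrative: the real object depends on how you store your queries (the tmp_table field will only be used later, in the schema inference step).

from dataclasses import dataclass

@dataclass
class Transformation:
    query: str
    destination_table: str  # "project_id.dataset_id.table_id"
    tmp_table: str | None = None  # set later, during schema inference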

Now let’s see how to find the dependencies of a SQL query by using the sqlglot SQL parser:

from sqlglot import parse_one, exp

def find_query_dependencies(query: str) -> set[str]:
    """Find all the tables in the query"""
    return {
        f"{t.catalog}.{t.db}.{t.name}"
        for t in parse_one(
            query, read="bigquery"
        ).find_all(exp.Table)
    }

In the piece of code above, we use the parse_one function from sqlglot to parse the transformation query, using the BigQuery dialect, into a tree that can be searched:

[Screenshot: the syntax tree returned by sqlglot's parse_one]
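For example, on a toy query (the table names below are made up for illustration), the helper returns the fully qualified source tables:

toy_query = """
SELECT o.id, c.name
FROM my_project.raw_layer.orders AS o
JOIN my_project.raw_layer.customers AS c ON o.customer_id = c.id
"""

print(find_query_dependencies(toy_query))
# {'my_project.raw_layer.orders', 'my_project.raw_layer.customers'}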

Automatically infer the schema of the output tables

Another cool feature we can add to our script is the ability to auto-detect, with high precision, the expected output schema of all our tables (D, E and F in the example), even if they haven't been created yet. This can be very helpful if we want to create the tables with an Infrastructure as Code tool like Terraform before the transformations even run.
For this feature, I used the following BigQuery trick: we can run a query with a LIMIT 0 at the end of the SELECT statement. The awesomeness of this is that BigQuery won't charge anything (0 bytes billed) but will still create a temporary output table with the expected schema (including NULLABLE / REQUIRED coherence)!
To generate the query with LIMIT 0, we need to add it to all the SELECT statements of the query (including all the subqueries). Let’s use sqlglot again by defining a SQL transformer:

from sqlglot import exp, parse_one
from sqlglot.dialects.dialect import Dialects

def limit_transformer(node):
    """This sqlglot transformer function adds a LIMIT 0 to
       every SELECT statement"""
    if isinstance(node, exp.Select):
        return node.limit(0)
    return node

query = """
WITH source_A as (
   SELECT "HelloWorld" as hello
), source_B as (
   SELECT CURRENT_DATE() as date
)
SELECT * 
FROM source_A, source_B
"""

sample_query = (
    parse_one(query, dialect=Dialects.BIGQUERY)
        .transform(limit_transformer)
        .sql(dialect=Dialects.BIGQUERY)
)
print(sample_query)

# =====================

# WITH source_A as (
#    SELECT "HelloWorld" as hello LIMIT 0
# ), source_B as (
#    SELECT CURRENT_DATE() as date LIMIT 0
# )
# SELECT * 
# FROM source_A, source_B
# LIMIT 0

Once we have our new query with all the LIMIT 0, we need to create a BigQuery job that runs this query, for free!

from google.cloud import bigquery

def fetch_destination_schema(query: str):
    bq_client = bigquery.Client()

    query_job = bq_client.query(query=query)
    query_job.result()

    # Fetch the temporary table schema created by BigQuery
    tmp_dest_table = bq_client.get_table(query_job.destination)
    destination_schema = tmp_dest_table.schema

    return destination_schema, str(query_job.destination)
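As a quick sanity check, we can feed the sample_query generated above to this helper (assuming BigQuery credentials are configured in the environment); the schema of the temporary destination table should look like this:

schema, tmp_table = fetch_destination_schema(sample_query)
for field in schema:
    print(field.name, field.field_type, field.mode)

# hello STRING NULLABLE
# date DATE NULLABLE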

Now, in order to generate the output schema of all the tables in the graph, even the last one which depends on tables that have not yet been created (here, tables D and E will only be temporary tables, not real tables), we need to take our schema generation method and apply it to each node of the DiGraph in the "correct" order.
In graph theory this order is called the topological order, i.e. in this example first table D, then E, then F. For each node, we need to replace the references to the real tables in the body of the transformation with the references of the temporary tables previously created. This way, even if none of tables D and E exist, we can still deduce the final schema for table F!

This process can be illustrated like this:

[Diagram: automatically inferring data transformation schemas in BigQuery]

for node_id in nx.topological_sort(graph):
    # Source-only tables (A, B and C) have no transformation
    # metadata attached: skip them
    if "transfo" not in graph.nodes[node_id]:
        continue

    # the "transfo" key is where the transformation
    # metadata has been stored in the node
    query_to_run = graph.nodes[node_id]["transfo"].query
    ancestors = nx.ancestors(graph, node_id)

    # We exclude all the ancestors that don't have transfo
    # metadata, meaning all nodes that are not an
    # intermediary output table
    ancestors = filter(
        lambda x: "transfo" in graph.nodes[x], ancestors
    )

    for ancestor_table in ancestors:
        query_to_run = query_to_run.replace(
            ancestor_table,
            graph.nodes[ancestor_table]["transfo"].tmp_table
        )

    schema, tmp_table = fetch_destination_schema(query_to_run)
    graph.nodes[node_id]["transfo"].tmp_table = tmp_table

# This will be the exact schema of the last transformation
print(schema)

And voilà! With this graph generation technique and a few BigQuery tricks, we were able to automatically infer the dependency graph of complex SQL transformations, as well as the exact final schema of all tables, without any tables being created and at zero cost!
In the next blog post we will see how I've applied and improved this technique to build an experimental and Ops-oriented data orchestration tool!

Top comments (2)

Mourad DACHRAOUI • Edited

Thanks for sharing! nx and sqlglot (sqlmesh's great lib) are pretty handy for this kind of use. We used them at one of our customers for a similar use case where we had to manage the same problem but with views, which adds a bit of recursion to the equivalent of the find_query_dependencies method. I'm looking forward to seeing the next step, and how the future data-ops orchestration tool you are preparing will handle this!

I can see a lot of similarities with the approach presented in this post, which also led us to develop starlake, a YAML-declarative data pipeline tool for effortless Extract / Load / Transform / Orchestrate; any feedback is welcome and very appreciated!

Regarding the use of Terraform, what are the table creation use cases, given that it's generally the ETL tool that takes care of this?

Λ\: Clément Bosc • Edited

Dear @ze_dach,

Sorry for the late answer and thanks for your comment!
I just published the kind-of-part 2 of this blog post here: dev.to/stack-labs/applying-graph-t...

In that post I explain why, IMO, the usual table creation approach is not really convenient, and why a Terraform-based solution is nice

Best,
Clem