Series overview
This series of blog posts is aimed at Dataform users who are looking to explore beyond its core functionality. Among other things we'll dig into Dataform's powerful API, create automated CI/CD processes in GitHub Actions, build pipelines to monitor costs, and modify the config {} block through code.
Dataform's API is the natural starting point for exploring Dataform's more powerful and less-documented capabilities. Understanding the API lets users treat Dataform as the central hub of an automated modern data platform.
Dataform API key concepts
The API has two objects that serve as essential components of any workflow:
- CompilationResult: the compiled state of a Dataform workspace at a specific point in time (the result of building the DAG).
- WorkflowInvocation: a single execution of a CompilationResult (the result of running the DAG).
CompilationResult
from google.cloud import dataform_v1

client = dataform_v1.DataformClient()

project_id = "my-project"
region = "europe-west2"
repository_id = "analytics"
workspace_id = "dev"

workspace = client.workspace_path(
    project_id,
    region,
    repository_id,
    workspace_id,
)
repository = client.repository_path(
    project_id,
    region,
    repository_id,
)

compilation_result = dataform_v1.CompilationResult(
    workspace=workspace
)
response = client.create_compilation_result(
    parent=repository,
    compilation_result=compilation_result,
)
print(response.name)
The name printed from the response takes the form projects/<project_id>/locations/<region>/repositories/<repository_id>/compilationResults/<compilation_result_id> (where <compilation_result_id> is a UUID v4).
A CompilationResult object is generated in the Dataform UI each time a code change is detected, although the UI conceals its ID. The ID can be seen in the Executions page in the UI following a run.
The CompilationResult includes information about each job inside the Dataform workspace, allowing users to traverse the DAG through code and extract information such as the config {} block of each file. Each job can then be iterated over via:
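# COMPILATION_RESULT is the full resource name of a CompilationResult,
# such as the response.name printed above
request = dataform_v1.QueryCompilationResultActionsRequest(
    name=COMPILATION_RESULT
)
response = client.query_compilation_result_actions(
    request=request
)
# Iterate over the pager itself so that every page of actions is fetched,
# not just the first one
for action in response:
    ...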
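As a rough illustration of traversing the DAG through code, the sketch below turns the actions from the loop above into a dependency map. It assumes each action exposes a target (with schema and name) and a relation, operations, or assertion sub-message carrying dependency_targets. That matches the v1 CompilationResultAction message as far as we know, but check it against the client library version you use. The helper names qualified_name and build_dependency_map are hypothetical.

```python
from collections import defaultdict


def qualified_name(target):
    """Format a Dataform target as 'schema.name'."""
    return f"{target.schema}.{target.name}"


def build_dependency_map(actions):
    """Map each action's target to the sorted targets it depends on.

    `actions` is any iterable of CompilationResultAction-like objects.
    """
    dependencies = defaultdict(set)
    for action in actions:
        node = qualified_name(action.target)
        dependencies[node]  # register the node even if it has no dependencies
        # Assumption: dependencies live on whichever sub-message is populated.
        for kind in ("relation", "operations", "assertion"):
            details = getattr(action, kind, None)
            for dep in getattr(details, "dependency_targets", None) or []:
                dependencies[node].add(qualified_name(dep))
    return {node: sorted(deps) for node, deps in dependencies.items()}
```

Passing the pager returned by query_compilation_result_actions straight into build_dependency_map should give a plain dictionary that is easy to print, diff between compilations, or feed into a graph library.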
WorkflowInvocation
from google.cloud import dataform_v1
client = dataform_v1.DataformClient()
project_id = "my-project"
region = "europe-west2"
repository_id = "analytics"
repository = client.repository_path(
    project_id, region, repository_id
)

# response is the CompilationResult returned by create_compilation_result
# in the previous snippet; the server-assigned name lives on it, not on the
# local compilation_result object that was sent in the request
invocation = dataform_v1.WorkflowInvocation(
    compilation_result=response.name
)
response = client.create_workflow_invocation(
    parent=repository,
    workflow_invocation=invocation,
)
print(f'{response.name}: {response.state}')
A WorkflowInvocation is generated in the Dataform UI each time an Execution is run.
The WorkflowInvocation includes information about each job that has been executed, allowing users to tie jobs to their BigQuery job IDs, and see the objects created (such as tables or views). Each executed job can be iterated over via:
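# WORKFLOW_INVOCATION is the full resource name of a WorkflowInvocation
request = dataform_v1.QueryWorkflowInvocationActionsRequest(
    name=WORKFLOW_INVOCATION
)
response = client.query_workflow_invocation_actions(
    request=request
)
# Iterate over the pager itself so that every page of actions is fetched
for action in response:
    ...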
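To make the link between executed jobs and BigQuery concrete, here is a minimal sketch that flattens the actions from the loop above into simple rows. It assumes each action has a target, a state enum, and a bigquery_action sub-message with a job_id field, which is our understanding of the v1 WorkflowInvocationAction message rather than something confirmed here. The function names summarise_invocation_actions and failed_targets are hypothetical.

```python
def summarise_invocation_actions(actions):
    """Return one row per executed action: target, state, BigQuery job ID."""
    rows = []
    for action in actions:
        state = action.state
        bigquery_action = getattr(action, "bigquery_action", None)
        rows.append(
            {
                "target": f"{action.target.schema}.{action.target.name}",
                # Enum members expose .name; fall back to str() for plain values.
                "state": getattr(state, "name", str(state)),
                # None when no BigQuery job ID was recorded for the action.
                "job_id": getattr(bigquery_action, "job_id", "") or None,
            }
        )
    return rows


def failed_targets(rows):
    """Pick out the targets whose action ended in the FAILED state."""
    return [row["target"] for row in rows if row["state"] == "FAILED"]
```

The job IDs collected this way can then be looked up in BigQuery's job metadata, which is one route to the cost monitoring mentioned in the series overview.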
Tying them together
The real power of Dataform’s API emerges when these two objects are combined. A typical workflow looks like this:
- A workspace is updated (manually or via Git),
- A CompilationResult is generated from the workspace,
- A WorkflowInvocation is created from that compilation,
- BigQuery executes the resulting DAG.
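The steps above can be sketched as a single function. This is an illustration under stated assumptions, not a definitive implementation: compile_and_run, poll_seconds, and max_polls are hypothetical names, the set of terminal states is our assumption about the WorkflowInvocation state enum, and the requests are passed as plain dicts, which the generated client accepts in place of request objects.

```python
import time

# Assumption: these are the terminal WorkflowInvocation states.
TERMINAL_STATES = {"SUCCEEDED", "CANCELLED", "FAILED"}


def compile_and_run(client, repository, workspace, poll_seconds=10, max_polls=60):
    """Compile a workspace, invoke the result, and wait for it to finish.

    `client` is a dataform_v1.DataformClient; `repository` and `workspace`
    are full resource paths. Returns the final WorkflowInvocation.
    """
    compilation = client.create_compilation_result(
        request={
            "parent": repository,
            "compilation_result": {"workspace": workspace},
        }
    )
    # Stop early if the workspace did not compile cleanly.
    if compilation.compilation_errors:
        raise RuntimeError(f"{compilation.name} has compilation errors")

    invocation = client.create_workflow_invocation(
        request={
            "parent": repository,
            "workflow_invocation": {"compilation_result": compilation.name},
        }
    )
    # Poll until the invocation reaches a terminal state or we give up.
    for _ in range(max_polls):
        invocation = client.get_workflow_invocation(
            request={"name": invocation.name}
        )
        if invocation.state.name in TERMINAL_STATES:
            return invocation
        time.sleep(poll_seconds)
    raise TimeoutError(f"{invocation.name} did not finish in time")
```

Called with the client, repository, and workspace values from the earlier snippets, this would block until the run ends and hand back the final invocation, whose state a CI job could use to pass or fail the build.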
With just these two objects it's possible to automate building and running Dataform DAGs outside of the Dataform UI. This enables easy CI/CD pipelines, which we'll cover in a later post.


