It's important to monitor Dataform: the jobs it executes can be the primary source of BigQuery costs in a modern data platform. Forgetting to incrementalise a table, using a table instead of a view in the wrong place, or running complex window functions over a large table can all incur large costs and long run times.
Each WorkflowInvocationAction in a workflow invocation records the ID of the BigQuery job it ran. We can collect those job IDs, look up key metadata for each job in INFORMATION_SCHEMA.JOBS_BY_PROJECT, and write the output back to BigQuery so that it can be analysed (maybe even by transforming it in Dataform).
from datetime import datetime, timezone

from google.cloud import bigquery
from google.cloud import dataform_v1

# ------------------------------------------------------------
# CONFIG
# ------------------------------------------------------------
PROJECT_ID = "my-project"
REGION = "europe-west2"
REPOSITORY_ID = "analytics"
WORKFLOW_INVOCATION_ID = "123456789"
BQ_REGION = "region-europe-west2"
OUTPUT_TABLE = "my-project.raw_dataform_monitoring.raw_dataform_bigquery_metrics"

# ------------------------------------------------------------
# CLIENTS
# ------------------------------------------------------------
dataform = dataform_v1.DataformClient()
bq = bigquery.Client(project=PROJECT_ID)

repository = dataform.repository_path(
    PROJECT_ID,
    REGION,
    REPOSITORY_ID
)
invocation_name = f"{repository}/workflowInvocations/{WORKFLOW_INVOCATION_ID}"

# ------------------------------------------------------------
# 1. GET WORKFLOW INVOCATION ACTIONS → EXTRACT JOB IDS
# ------------------------------------------------------------
job_ids = set()

# The Dataform API exposes an invocation's actions through
# QueryWorkflowInvocationActions, keyed by the invocation's resource name.
actions = dataform.query_workflow_invocation_actions(
    request={"name": invocation_name}
)

for action in actions:
    # only BigQuery actions contain job metadata
    if hasattr(action, "bigquery_action") and action.bigquery_action:
        if action.bigquery_action.job_id:
            job_ids.add(action.bigquery_action.job_id)

job_ids = list(job_ids)
print(f"Found {len(job_ids)} BigQuery jobs")

# ------------------------------------------------------------
# 2. SINGLE BIGQUERY QUERY FOR ALL JOBS
# ------------------------------------------------------------
if not job_ids:
    print("No jobs found.")
    raise SystemExit(0)

# Job IDs are passed as an array parameter rather than being
# formatted into the SQL string.
query = f"""
SELECT
  job_id,
  creation_time,
  start_time,
  end_time,
  state,
  total_bytes_processed,
  total_bytes_billed,
  total_slot_ms,
  reservation_id
FROM `{BQ_REGION}.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE job_id IN UNNEST(@job_ids)
  AND creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter("job_ids", "STRING", job_ids)
    ]
)
results = bq.query(query, job_config=job_config).result()

# ------------------------------------------------------------
# 3. WRITE TO MONITORING TABLE
# ------------------------------------------------------------
def to_iso(ts):
    # insert_rows_json needs JSON-serialisable values, so datetimes
    # are converted to ISO 8601 strings (None stays None)
    return ts.isoformat() if ts else None

rows = []
for job in results:
    runtime = None
    if job.start_time and job.end_time:
        runtime = (job.end_time - job.start_time).total_seconds()
    rows.append({
        "job_id": job.job_id,
        "workflow_invocation_id": WORKFLOW_INVOCATION_ID,
        "creation_time": to_iso(job.creation_time),
        "start_time": to_iso(job.start_time),
        "end_time": to_iso(job.end_time),
        "runtime_seconds": runtime,
        "state": job.state,
        "bytes_processed": job.total_bytes_processed,
        "bytes_billed": job.total_bytes_billed,
        "slot_ms": job.total_slot_ms,
        "reservation_id": job.reservation_id,
        "ingested_at": datetime.now(timezone.utc).isoformat()
    })

# Skip the API call when there is nothing to write
errors = bq.insert_rows_json(OUTPUT_TABLE, rows) if rows else []
if errors:
    print("Insert errors:", errors)
else:
    print(f"Wrote {len(rows)} rows to {OUTPUT_TABLE}")
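The script writes with insert_rows_json, which streams into a table that must already exist. A minimal sketch of a matching schema is below. The column types and modes are assumptions inferred from the keys of the row dictionary, and `ensure_monitoring_table` is a hypothetical helper, not part of the script above.

```python
# Assumed schema for the monitoring table. Column names mirror the keys of
# each row the script writes; the types and modes are a best guess.
MONITORING_SCHEMA = [
    {"name": "job_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "workflow_invocation_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "creation_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
    {"name": "start_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
    {"name": "end_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
    {"name": "runtime_seconds", "type": "FLOAT", "mode": "NULLABLE"},
    {"name": "state", "type": "STRING", "mode": "NULLABLE"},
    {"name": "bytes_processed", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "bytes_billed", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "slot_ms", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "reservation_id", "type": "STRING", "mode": "NULLABLE"},
    {"name": "ingested_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
]


def ensure_monitoring_table(client, table_id):
    """Create the monitoring table if it is missing (hypothetical helper).

    `client` is expected to be a google.cloud.bigquery.Client.
    """
    # Imported lazily so the schema list can be reused without the library
    from google.cloud import bigquery

    schema = [bigquery.SchemaField.from_api_repr(f) for f in MONITORING_SCHEMA]
    table = bigquery.Table(table_id, schema=schema)
    return client.create_table(table, exists_ok=True)
```

Calling `ensure_monitoring_table(bq, OUTPUT_TABLE)` once before the first insert would be one way to use it. Managing the table through your usual infrastructure tooling works just as well.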
Note that INFORMATION_SCHEMA.JOBS_BY_PROJECT is queried only once for the whole invocation, rather than once per job. JOBS_BY_PROJECT can be a huge table, especially if the same project is also queried by high-usage BI tools, so always filter on the creation_time partition column to limit the data scanned and reduce query costs.
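A fixed 24-hour lookback is fine when the audit runs shortly after the invocation. One way to tighten the partition filter is to derive the window from the invocation's own start and end times. The sketch below assumes you already have those two timestamps (for example from the workflow invocation's timing metadata). `creation_time_window`, `WINDOW_FILTER` and the 10-minute padding are hypothetical choices for illustration.

```python
from datetime import datetime, timedelta, timezone


def creation_time_window(started_at, ended_at, padding=timedelta(minutes=10)):
    """Return (window_start, window_end) to use as a creation_time filter.

    The padding allows for jobs created slightly before or after the
    recorded invocation times. Its size is an assumption.
    """
    if started_at.tzinfo is None or ended_at.tzinfo is None:
        raise ValueError("timestamps must be timezone-aware")
    if ended_at < started_at:
        raise ValueError("ended_at precedes started_at")
    return started_at - padding, ended_at + padding


# SQL fragment to use in place of the fixed 24-hour lookback
WINDOW_FILTER = "creation_time BETWEEN @window_start AND @window_end"

window_start, window_end = creation_time_window(
    datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 6, 45, tzinfo=timezone.utc),
)
print(window_start.isoformat(), window_end.isoformat())
# prints: 2024-01-01T05:50:00+00:00 2024-01-01T06:55:00+00:00
```

The two bounds can then be passed to the query as TIMESTAMP parameters, for example with `bigquery.ScalarQueryParameter("window_start", "TIMESTAMP", window_start)`.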
This auditing can be triggered automatically by creating a log sink that publishes to a Pub/Sub topic, which in turn triggers a Cloud Function running the above code whenever a workflow invocation completes.
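Inside that function, the hard-coded WORKFLOW_INVOCATION_ID has to come from the incoming message instead. The exact shape of the Dataform log entry is not covered here, so the sketch below avoids relying on specific field names: it decodes the base64 Pub/Sub payload and searches it for a workflow invocation resource name. `extract_invocation` and the demo payload are hypothetical. Check a real log entry from your project before relying on this.

```python
import base64
import json
import re

# Matches a Dataform workflow invocation resource name anywhere in the text
INVOCATION_RE = re.compile(
    r'projects/(?P<project>[^/\s"]+)'
    r'/locations/(?P<region>[^/\s"]+)'
    r'/repositories/(?P<repository>[^/\s"]+)'
    r'/workflowInvocations/(?P<invocation>[^/\s"]+)'
)


def extract_invocation(pubsub_data):
    """Return the invocation's components as a dict, or None if absent.

    `pubsub_data` is the base64-encoded `data` field of the Pub/Sub
    message, which for a log sink carries the log entry as JSON text.
    """
    payload = base64.b64decode(pubsub_data).decode("utf-8")
    match = INVOCATION_RE.search(payload)
    return match.groupdict() if match else None


# Demo with a made-up payload: the field name below is NOT a real log field
demo_entry = {
    "example_field": (
        "projects/my-project/locations/europe-west2"
        "/repositories/analytics/workflowInvocations/123456789"
    )
}
demo_data = base64.b64encode(json.dumps(demo_entry).encode("utf-8"))
print(extract_invocation(demo_data))
```

The returned `project`, `region`, `repository` and `invocation` values map onto PROJECT_ID, REGION, REPOSITORY_ID and WORKFLOW_INVOCATION_ID in the script.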