I had been training a model every month on BigQuery data that is updated daily. However, the manual steps of data preprocessing, labeling, and training were prone to frequent errors. To improve operational efficiency, I automated these processes using Google Cloud's Vertex AI Pipelines.
What Is Vertex AI Pipelines?
Vertex AI Pipelines is a service for building and managing machine learning pipelines on Google Cloud. Using the Kubeflow Pipelines (KFP) SDK or the TFX Pipeline DSL, you can run pipelines in a serverless environment without having to manage Kubernetes clusters yourself.
It is highly compatible with the Google Cloud ecosystem and provides the following benefits:
- Prebuilt components for BigQuery and AutoML
- Scheduled execution without relying on Cloud Scheduler
- Serverless and scalable
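To give a sense of what the SDK code looks like before diving into the actual pipeline, here is a minimal sketch (the component and pipeline names here are illustrative only; compiling and submitting to Vertex AI is covered later in this post):

from kfp import dsl, compiler

@dsl.component(base_image="python:3.12")
def say_hello(name: str) -> str:
    # A lightweight Python component; each component runs in its own container.
    return f"Hello, {name}!"

@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "Vertex AI"):
    say_hello(name=name)

# Compile to a YAML spec that Vertex AI Pipelines can execute.
compiler.Compiler().compile(hello_pipeline, "hello_pipeline.yaml")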
Implementation Steps
File Structure
The files necessary for defining and running the pipeline are organized as follows:
- sql_queries/
  - add_reaction_label.sql
  - training_pre_processing.sql
  - undersampling.sql
- pipeline_definition.py
- pipeline_notebook.ipynb
- sql_queries/: Stores SQL scripts for preprocessing
- pipeline_definition.py: Contains the pipeline definition
- pipeline_notebook.ipynb: Jupyter notebook for compiling and running the pipeline
Pipeline Definition
Below is the pipeline definition in pipeline_definition.py:
from google.cloud import aiplatform
from kfp import dsl
from kfp.dsl import component
import datetime
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
PROJECT_ID = "your-project-id"
RECIPIENTS_LIST = ["example@email.com"]  # NOTE: You can register up to three email addresses

aiplatform.init(project=PROJECT_ID, location="asia-northeast1")
def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()
@dsl.component(base_image='python:3.12', packages_to_install=['google-cloud-bigquery'])
def create_bigquery_op(dataset_name: str, location: str, project_id: str) -> str:
    """Creates a BigQuery dataset if it does not exist."""
    # The component runs in its own container, so the project ID is passed in
    # explicitly rather than read from a module-level variable.
    from google.cloud import bigquery
    client = bigquery.Client(project=project_id)
    dataset_id = f"{project_id}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    dataset = client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset_id} created.")
    return dataset_id
@dsl.pipeline(name="data-preprocessing-and-training-pipeline")
def my_pipeline():
    today = datetime.date.today().strftime('%Y%m%d')  # evaluated when the pipeline is compiled
    dest_dataset = f"pre_processed_dataset_{today}"

    create_dataset_op = create_bigquery_op(
        dataset_name=dest_dataset, location="asia-northeast1", project_id=PROJECT_ID)

    label_sql = load_sql("sql_queries/add_reaction_label.sql")
    preprocess_sql = load_sql("sql_queries/training_pre_processing.sql")
    undersampling_sql = load_sql("sql_queries/undersampling.sql")

    formatted_label_sql = label_sql.format(
        dataset=dest_dataset, table="add_reaction_label")
    formatted_preprocess_sql = preprocess_sql.format(
        dataset=dest_dataset, table="training_preprocessed")
    formatted_undersampling_sql = undersampling_sql.format(
        dataset=dest_dataset, table="summary_all_processed_undersampling")

    notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

    with dsl.ExitHandler(notify_email_op):
        label_sql_op = BigqueryQueryJobOp(
            query=formatted_label_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(create_dataset_op)

        preprocess_sql_op = BigqueryQueryJobOp(
            query=formatted_preprocess_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(label_sql_op)

        undersampling_sql_op = BigqueryQueryJobOp(
            query=formatted_undersampling_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(preprocess_sql_op)

        dataset_create_op = TabularDatasetCreateOp(
            display_name=f"tabular_dataset_from_bigquery_{today}",
            bq_source=f"bq://{PROJECT_ID}.{dest_dataset}.summary_all_processed_undersampling",
            project="{{$.pipeline_google_cloud_project_id}}",
            location="asia-northeast1"
        ).after(undersampling_sql_op)

        model_training_op = AutoMLTabularTrainingJobRunOp(
            display_name=f"visitor_prediction_model_{today}",
            dataset=dataset_create_op.outputs["dataset"],
            target_column="reaction_score",
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
            budget_milli_node_hours=72000,  # 72 node hours
            project="{{$.pipeline_google_cloud_project_id}}",
            optimization_prediction_type="classification",
            location="asia-northeast1"
        ).after(dataset_create_op)
Adding Execution Date to Dataset Names
To distinguish the datasets produced by each pipeline run, the execution date is included in the dataset name. The dataset is created dynamically by the create_bigquery_op component.
today = datetime.date.today().strftime('%Y%m%d')
dest_dataset = f"pre_processed_dataset_{today}"

create_dataset_op = create_bigquery_op(
    dataset_name=dest_dataset, location="asia-northeast1", project_id=PROJECT_ID)
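One subtlety to be aware of: today is computed when the pipeline function is compiled, not when each run starts, so if a single compiled YAML is reused for scheduled runs, every run will see the same date. If a per-run date is needed, one option is to compute the date inside a component so that it is evaluated at run time; the get_run_date component below is a hypothetical sketch of that approach, not part of the original pipeline:

from kfp import dsl

# Hypothetical helper: because the body runs inside the pipeline,
# the date reflects the actual execution day.
@dsl.component(base_image="python:3.12")
def get_run_date() -> str:
    import datetime
    return datetime.date.today().strftime("%Y%m%d")

Downstream steps would then need to consume this output as a runtime value (for example, as the dataset_name input of create_bigquery_op), which also means the SQL templating would have to happen inside a component rather than at compile time.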
Dynamically Loading SQL Files with Variables
To reuse the same pipeline with different datasets or table names, dynamic placeholders are used in the SQL scripts. SQL files are loaded using the load_sql function, and placeholders like {dataset} and {table} are dynamically replaced.
def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()

undersampling_sql = load_sql("sql_queries/undersampling.sql")
formatted_undersampling_sql = undersampling_sql.format(
    dataset=dest_dataset, table="summary_all_processed_undersampling")
Example of undersampling.sql, which downsamples the over-represented classes toward the median class count:
CREATE OR REPLACE TABLE `{dataset}.{table}` AS
WITH class_counts AS (
SELECT reaction_score, COUNT(*) as count
FROM `{dataset}.training_preprocessed`
GROUP BY reaction_score
),
median_count AS (
SELECT APPROX_QUANTILES(count, 2)[OFFSET(1)] as target_count
FROM class_counts
)
SELECT data.*
FROM `{dataset}.training_preprocessed` data
JOIN class_counts
ON data.reaction_score = class_counts.reaction_score
JOIN median_count
ON TRUE
WHERE RAND() < (median_count.target_count / class_counts.count);
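As a small illustration of the templating (the dataset and table names below are just examples), str.format substitutes the placeholders; note that any literal braces in a SQL script would need to be escaped as {{ and }} so that format() leaves them intact:

sql_template = "CREATE OR REPLACE TABLE `{dataset}.{table}` AS SELECT 1 AS dummy"
print(sql_template.format(dataset="pre_processed_dataset_20240101", table="example_table"))
# CREATE OR REPLACE TABLE `pre_processed_dataset_20240101.example_table` AS SELECT 1 AS dummy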
Using Google Cloud Pipeline Components
Google Cloud Pipeline Components simplify interactions with various Google Cloud services. The components used in this pipeline include:
- BigqueryQueryJobOp: Runs a SQL query on BigQuery
- TabularDatasetCreateOp: Registers a BigQuery table as a dataset in Vertex AI
- AutoMLTabularTrainingJobRunOp: Runs an AutoML training job for tabular data
- VertexNotificationEmailOp: Sends notifications to specified email addresses
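These components come from the google-cloud-pipeline-components package. As a setup note (the original post does not pin versions, so the exact package list is an assumption), the notebook environment that compiles the pipeline needs roughly the following packages installed:

# pipeline_notebook.ipynb (setup cell)
%pip install kfp google-cloud-pipeline-components google-cloud-aiplatform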
Controlling Execution Order
Execution dependencies between components are controlled using the .after() method. For example:
label_sql_op = BigqueryQueryJobOp(
    query=formatted_label_sql,
    location="asia-northeast1",
    project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)
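As a side note, .after() is only needed when no data flows between steps. When one step consumes another step's output, as with the dataset passed to the training op above, the dependency is created implicitly. A minimal sketch with two throwaway components:

from kfp import dsl

@dsl.component(base_image="python:3.12")
def produce() -> str:
    return "some value"

@dsl.component(base_image="python:3.12")
def consume(value: str):
    print(value)

@dsl.pipeline(name="implicit-dependency-example")
def implicit_dependency_example():
    producer = produce()
    # consume() runs after produce() because it uses its output; no .after() required.
    consume(value=producer.output)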
Sending Email Notifications at Pipeline Completion
To send notifications upon pipeline completion, dsl.ExitHandler is used. It wraps the pipeline steps and ensures that notifications are sent regardless of success or failure.
notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

with dsl.ExitHandler(notify_email_op):
    label_sql_op = BigqueryQueryJobOp(
        query=formatted_label_sql,
        location="asia-northeast1",
        project="{{$.pipeline_google_cloud_project_id}}"
    ).after(create_dataset_op)
When the run finishes, a notification email is delivered to the registered recipients.
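If you need more than an email, for example logging or posting to chat, the exit task can also be a custom component that receives the pipeline's final status. The log_final_status component below is a hypothetical sketch of that alternative, not part of the pipeline above; KFP injects the PipelineTaskFinalStatus argument automatically when the component is used as an exit task:

from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus

@dsl.component(base_image="python:3.12")
def do_work():
    print("actual pipeline steps would go here")

@dsl.component(base_image="python:3.12")
def log_final_status(status: PipelineTaskFinalStatus):
    # status.state is e.g. "SUCCEEDED" or "FAILED"; error details are set on failure.
    print(f"{status.pipeline_job_resource_name} finished with state {status.state}")

@dsl.pipeline(name="exit-handler-logging-example")
def exit_handler_logging_example():
    exit_task = log_final_status()
    with dsl.ExitHandler(exit_task):
        do_work()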
Executing the Pipeline
To run Vertex AI Pipelines, you first compile the pipeline and then execute it either on demand or on a schedule. For this example, Vertex AI Workbench was used as the execution environment.
On-Demand Execution
For on-demand execution, the pipeline definition is first compiled into a YAML file. Then, the compiled YAML is used to execute the pipeline.
The following code compiles the pipeline definition from pipeline_definition.py and saves it as compiled_pipeline.yaml:
# pipeline_notebook.ipynb
from google.cloud import aiplatform
from kfp import compiler
import pipeline_definition

aiplatform.init(project="your-project-id", location="asia-northeast1")

compiler.Compiler().compile(
    pipeline_func=pipeline_definition.my_pipeline,
    package_path="compiled_pipeline.yaml"
)
Using the compiled compiled_pipeline.yaml, the pipeline can be executed with the following code:
# pipeline_notebook.ipynb
aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
).submit()
When the execution succeeds, the results are displayed in the Vertex AI console, allowing you to track the progress and status of each step.
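A small usage note: submit() returns immediately and the run continues in the background. If you prefer the notebook cell to block until the pipeline finishes (and surface a failure as an exception), run() can be used instead:

# pipeline_notebook.ipynb
job = aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
)
job.run()  # blocks until the pipeline run completes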
Scheduled Execution
To execute the pipeline on a regular schedule, use the PipelineJob.create_schedule method. The following example creates a schedule to run the pipeline at 9:00 AM JST on the first day of every month:
# pipeline_notebook.ipynb
pipeline_job = aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
)

pipeline_job_schedule = pipeline_job.create_schedule(
    display_name="monthly-data-preprocessing-and-training",
    cron="TZ=Asia/Tokyo 0 9 1 * *",  # 9:00 AM JST on the 1st of every month
    max_concurrent_run_count=1,
    max_run_count=None
)
Once the schedule is registered, you can verify it in the Schedules tab of the Vertex AI console.
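Schedules can also be managed from the SDK. The snippet below is a sketch based on the aiplatform.PipelineJobSchedule class (verify the exact methods against your installed SDK version):

# pipeline_notebook.ipynb
schedules = aiplatform.PipelineJobSchedule.list(
    filter='display_name="monthly-data-preprocessing-and-training"',
    location="asia-northeast1",
)
schedules[0].pause()   # temporarily stop scheduled runs
schedules[0].resume()  # pick the schedule back up

Also keep in mind the earlier note about today being resolved at compile time: every scheduled run of a single compiled YAML will reuse the same date in its dataset name unless the date is computed at run time.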
Conclusion
By leveraging Vertex AI Pipelines, I successfully automated the previously error-prone manual processes of data preprocessing, labeling, and model training. This solution not only improved operational efficiency but also provided scalability and reliability for handling growing data volumes. Vertex AI’s seamless integration with BigQuery and AutoML components made it straightforward to build a robust end-to-end pipeline, which can now be executed both on-demand and on a schedule with minimal manual intervention.