Atsushi Suzuki

Automating BigQuery Data Preprocessing and AutoML with Vertex AI Pipelines

I previously used BigQuery data updated daily for monthly model training. However, the manual processes of data preprocessing, labeling, and training were prone to frequent errors. To improve operational efficiency, I automated these processes using Google Cloud's Vertex AI Pipelines.

What Is Vertex AI Pipelines?

Vertex AI Pipelines is a service for building and managing machine learning pipelines on Google Cloud. Using the Kubeflow Pipelines (KFP) SDK or the TFX Pipeline DSL, you can run pipelines in a serverless environment without managing Kubernetes clusters yourself.

It is highly compatible with the Google Cloud ecosystem and provides the following benefits:

  • Prebuilt components for BigQuery and AutoML
  • Scheduled execution without relying on Cloud Scheduler
  • Serverless and scalable
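For orientation, here is a minimal, hypothetical KFP v2 pipeline, unrelated to the pipeline built later in this article; it only sketches how components and pipelines are declared:

from kfp import dsl

@dsl.component(base_image="python:3.12")
def say_hello(name: str) -> str:
    # A lightweight Python component; it runs in its own container at execution time
    return f"Hello, {name}!"

@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "Vertex AI"):
    # The pipeline function only wires components together into a DAG
    say_hello(name=name)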

Implementation Steps

File Structure

The files necessary for defining and running the pipeline are organized as follows:

- sql_queries/
  - add_reaction_label.sql
  - training_pre_processing.sql
  - undersampling.sql
- pipeline_definition.py
- pipeline_notebook.ipynb
  • sql_queries: Stores SQL scripts for preprocessing
  • pipeline_definition.py: Contains the pipeline definition
  • pipeline_notebook.ipynb: Jupyter notebook for compiling and running the pipeline

Pipeline Definition

Below is the pipeline definition in pipeline_definition.py:

from google.cloud import aiplatform
from kfp import dsl
from kfp.dsl import component
import datetime
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

PROJECT_ID = "your-project-id"
RECIPIENTS_LIST = ["example@email.com"]  # NOTE: You can register up to three email addresses

aiplatform.init(project=PROJECT_ID, location="asia-northeast1")

def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()

@dsl.component(base_image='python:3.12', packages_to_install=['google-cloud-bigquery'])
def create_bigquery_op(project_id: str, dataset_name: str, location: str) -> str:
    """Creates a BigQuery dataset if it does not exist."""
    # Lightweight components run in their own container, so the project ID is
    # passed in as a parameter rather than read from a module-level global.
    from google.cloud import bigquery
    client = bigquery.Client(project=project_id)
    dataset_id = f"{project_id}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    dataset = client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset_id} created.")
    return dataset_id

@dsl.pipeline(name="data-preprocessing-and-training-pipeline")
def my_pipeline():    
    today = datetime.date.today().strftime('%Y%m%d')
    dest_dataset = f"pre_processed_dataset_{today}"
    create_dataset_op = create_bigquery_op(
        project_id=PROJECT_ID, dataset_name=dest_dataset, location="asia-northeast1")

    label_sql = load_sql("sql_queries/add_reaction_label.sql")
    preprocess_sql = load_sql("sql_queries/training_pre_processing.sql")
    undersampling_sql = load_sql("sql_queries/undersampling.sql")

    formatted_label_sql = label_sql.format(
        dataset=dest_dataset, table="add_reaction_label")
    formatted_preprocess_sql = preprocess_sql.format(
        dataset=dest_dataset, table="training_preprocessed")
    formatted_undersampling_sql = undersampling_sql.format(
        dataset=dest_dataset, table="summary_all_processed_undersampling")

    notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

    with dsl.ExitHandler(notify_email_op):
        label_sql_op = BigqueryQueryJobOp(
            query=formatted_label_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(create_dataset_op)

        preprocess_sql_op = BigqueryQueryJobOp(
            query=formatted_preprocess_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(label_sql_op)

        undersampling_sql_op = BigqueryQueryJobOp(
            query=formatted_undersampling_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(preprocess_sql_op)

        dataset_create_op = TabularDatasetCreateOp(
            display_name=f"tabular_dataset_from_bigquery_{today}",
            bq_source=f"bq://{PROJECT_ID}.{dest_dataset}.summary_all_processed_undersampling",
            project="{{$.pipeline_google_cloud_project_id}}",
            location="asia-northeast1"
        ).after(undersampling_sql_op)

        model_training_op = AutoMLTabularTrainingJobRunOp(
            display_name=f"visitor_prediction_model_{today}",
            dataset=dataset_create_op.outputs["dataset"],
            target_column="reaction_score",
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
            budget_milli_node_hours=72000,
            project="{{$.pipeline_google_cloud_project_id}}",
            optimization_prediction_type="classification",
            location="asia-northeast1"
        ).after(dataset_create_op)

Adding Execution Date to Dataset Names

To distinguish the datasets produced by each pipeline run, the date is included in the dataset name, and the dataset itself is created by the create_bigquery_op component. Note that this code runs when the pipeline function is evaluated at compile time, so the date reflects when the pipeline was compiled; for scheduled runs it will not advance automatically unless the pipeline is recompiled or the date is generated inside a component.

today = datetime.date.today().strftime('%Y%m%d')
dest_dataset = f"pre_processed_dataset_{today}"
create_dataset_op = create_bigquery_op(
    project_id=PROJECT_ID, dataset_name=dest_dataset, location="asia-northeast1")
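If the pipeline is run on a schedule rather than recompiled before each run, one option (a sketch under that assumption, not what the pipeline above does) is to resolve the date inside a component at run time and pass its output to downstream steps:

from kfp import dsl

@dsl.component(base_image="python:3.12")
def get_today() -> str:
    # Executed inside the pipeline run, so the date reflects the run date, not the compile date
    import datetime
    return datetime.date.today().strftime("%Y%m%d")

The dataset and table names, and therefore the SQL formatting, would then also need to be built inside components, because string formatting in the pipeline function body likewise happens at compile time.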

Dynamically Loading SQL Files with Variables

To reuse the same pipeline with different datasets or table names, dynamic placeholders are used in SQL scripts. SQL files are loaded using the load_sql function, and placeholders like {dataset} and {table} are dynamically replaced.

def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()

undersampling_sql = load_sql("sql_queries/undersampling.sql")

formatted_undersampling_sql = undersampling_sql.format(
    dataset=dest_dataset, table="summary_all_processed_undersampling")

Example of undersampling.sql, which downsamples each class toward the median class count to reduce class imbalance:

CREATE OR REPLACE TABLE `{dataset}.{table}` AS
WITH class_counts AS (
  SELECT reaction_score, COUNT(*) as count
  FROM `{dataset}.training_preprocessed`
  GROUP BY reaction_score
),
median_count AS (
  SELECT APPROX_QUANTILES(count, 2)[OFFSET(1)] as target_count
  FROM class_counts
)
SELECT data.*
FROM `{dataset}.training_preprocessed` data
JOIN class_counts
ON data.reaction_score = class_counts.reaction_score
JOIN median_count
ON TRUE
WHERE RAND() < (median_count.target_count / class_counts.count);
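Because Python's str.format() performs the substitution, any literal curly braces in the SQL itself would need to be doubled as {{ and }}. A small standalone illustration of the substitution (the values here are hypothetical):

template = "CREATE OR REPLACE TABLE `{dataset}.{table}` AS SELECT 1 AS id"
print(template.format(dataset="pre_processed_dataset_20241206", table="example"))
# CREATE OR REPLACE TABLE `pre_processed_dataset_20241206.example` AS SELECT 1 AS id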

Using Google Cloud Pipeline Components

Google Cloud Pipeline Components simplify interactions with various Google Cloud services. The components used in this pipeline include:

  • BigqueryQueryJobOp: Runs SQL on BigQuery
  • TabularDatasetCreateOp: Registers a BigQuery table as a dataset in Vertex AI
  • AutoMLTabularTrainingJobRunOp: Runs AutoML training jobs for tabular data
  • VertexNotificationEmailOp: Sends notifications to specified email addresses

Controlling Execution Order

Execution dependencies between components are controlled using the .after() method. When one component consumes another's output (as the AutoML training step consumes dataset_create_op.outputs["dataset"]), KFP infers the dependency automatically; .after() is needed for steps like the BigQuery jobs, which do not pass outputs to one another. For example:

label_sql_op = BigqueryQueryJobOp(
    query=formatted_label_sql,
    location="asia-northeast1",
    project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)

Sending Email Notifications at Pipeline Completion

To send notifications upon pipeline completion, the dsl.ExitHandler is used. It wraps the pipeline steps and ensures that notifications are sent regardless of success or failure.

notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

with dsl.ExitHandler(notify_email_op):
    label_sql_op = BigqueryQueryJobOp(
        query=formatted_label_sql,
        location="asia-northeast1",
        project="{{$.pipeline_google_cloud_project_id}}"
    ).after(create_dataset_op)

Sample email notification:

Notification Email Screenshot

Executing the Pipeline

To run Vertex AI Pipelines, you first compile the pipeline and then execute it either on-demand or on a schedule. For this example, Vertex AI Workbench was used as the execution environment.

(Screenshot: Vertex AI Workbench)

On-Demand Execution

For on-demand execution, the pipeline definition is first compiled into a YAML file. Then, the compiled YAML is used to execute the pipeline.

The following code compiles the pipeline definition from pipeline_definition.py and saves it as a compiled_pipeline.yaml file:

# pipeline_notebook.ipynb
from google.cloud import aiplatform
from kfp import compiler
import pipeline_definition

aiplatform.init(project="your-project-id", location="asia-northeast1")

compiler.Compiler().compile(
    pipeline_func=pipeline_definition.my_pipeline,
    package_path="compiled_pipeline.yaml"
)

The pipeline can then be executed from compiled_pipeline.yaml with the following code:

# pipeline_notebook.ipynb
aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
).submit()

When the execution succeeds, the results are displayed in the Vertex AI console, allowing you to track the progress and status of each step.

(Screenshot: pipeline run graph in the Vertex AI console)
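You can also monitor the run from the notebook instead of the console. A short sketch (the job object mirrors the .submit() call above; the printed state value is illustrative):

# pipeline_notebook.ipynb
job = aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
)
job.submit()
job.wait()        # block until the run finishes
print(job.state)  # e.g. PipelineState.PIPELINE_STATE_SUCCEEDED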

Scheduled Execution

To execute the pipeline on a regular schedule, use the PipelineJob.create_schedule method. The following example creates a schedule to run the pipeline at 9:00 AM JST on the first day of every month:

# pipeline_notebook.ipynb
pipeline_job = aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
)

pipeline_job_schedule = pipeline_job.create_schedule(
    display_name="monthly-data-preprocessing-and-training",
    cron="TZ=Asia/Tokyo 0 9 1 * *",  # 9:00 AM JST on the 1st of every month
    max_concurrent_run_count=1,
    max_run_count=None
)

Once the schedule is registered, you can verify it in the Vertex AI schedule tab, where the following screen will be displayed:

(Screenshot: Vertex AI Schedules tab)
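The schedule can also be managed from the same notebook. A brief sketch using the pipeline_job_schedule object created above (pausing, resuming, and deleting are standard PipelineJobSchedule operations):

# pipeline_notebook.ipynb
pipeline_job_schedule.pause()     # temporarily stop scheduled runs
pipeline_job_schedule.resume()    # resume the schedule
# pipeline_job_schedule.delete()  # remove the schedule entirely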

Conclusion

By leveraging Vertex AI Pipelines, I successfully automated the previously error-prone manual processes of data preprocessing, labeling, and model training. This solution not only improved operational efficiency but also provided scalability and reliability for handling growing data volumes. Vertex AI’s seamless integration with BigQuery and AutoML components made it straightforward to build a robust end-to-end pipeline, which can now be executed both on-demand and on a schedule with minimal manual intervention.
