Suhas Mallesh
Auto-Sync RAG Pipeline: GCS Events to Vertex AI RAG Engine with Terraform ⚡

Vertex AI RAG Engine doesn't auto-sync when documents change. Here's how to build an event-driven pipeline with Eventarc, Cloud Functions, and Pub/Sub batching to trigger ImportRagFiles automatically using Terraform.

You've built your RAG Engine corpus, imported files from GCS, and tested retrieval with Gemini. Then someone uploads new documents to the bucket and... nothing happens. The corpus still returns stale results.

Vertex AI RAG Engine doesn't watch your GCS bucket for changes. You need to call ImportRagFiles whenever documents are added or updated. This post builds the event-driven pipeline with Terraform: GCS object events flow through Eventarc to a Cloud Function (2nd gen) that triggers the import. A Pub/Sub-based batching layer prevents redundant imports when multiple files land at once. 🎯

🏗️ Architecture Overview

GCS Bucket (docs added/updated/deleted)
    ↓ Eventarc (object finalized / deleted)
Cloud Function 2nd Gen (calls ImportRagFiles)
    ↓ on failure
Cloud Logging + Alert Policy → Notification Channel

Why Cloud Functions 2nd gen? Gen 2 functions are built on Cloud Run and use Eventarc for triggers, giving you native GCS event support with retry policies, longer timeouts (up to 60 minutes), and concurrency control. Gen 1 functions work but lack the Eventarc integration and timeout flexibility.

🔧 Terraform: The Full Pipeline

Required APIs and Permissions

GCS-to-Eventarc triggers require the Cloud Storage service account to publish to Pub/Sub. This is the permission step most people miss:

# sync/apis.tf

resource "google_project_service" "required" {
  for_each = toset([
    "cloudfunctions.googleapis.com",
    "cloudbuild.googleapis.com",
    "run.googleapis.com",
    "eventarc.googleapis.com",
    "aiplatform.googleapis.com",
    "storage.googleapis.com",
  ])
  project = var.project_id
  service = each.value
}

# GCS service account needs Pub/Sub publisher role for Eventarc
data "google_storage_project_service_account" "gcs_account" {}

resource "google_project_iam_member" "gcs_pubsub_publishing" {
  project = var.project_id
  role    = "roles/pubsub.publisher"
  member  = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
}
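The Cloud Storage service agent's email follows a documented pattern, so you can sanity-check what Terraform's data source will resolve to. A small sketch (the project number is hypothetical):

```python
def gcs_service_agent(project_number: int) -> str:
    """Build the Cloud Storage service agent email for a project.

    This is the account that google_storage_project_service_account resolves;
    Eventarc GCS triggers fail with a permission error until this account
    can publish to Pub/Sub.
    """
    return f"service-{project_number}@gs-project-accounts.iam.gserviceaccount.com"

# Hypothetical project number:
print("serviceAccount:" + gcs_service_agent(123456789012))
```

If a trigger deployment fails with a Pub/Sub permission error, this is the account to check first.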

Service Account for the Cloud Function

# sync/iam.tf

resource "google_service_account" "rag_sync" {
  account_id   = "${var.environment}-rag-sync-sa"
  display_name = "RAG Sync Cloud Function SA"
  project      = var.project_id
}

# Permission to call ImportRagFiles
resource "google_project_iam_member" "vertex_ai_user" {
  project = var.project_id
  role    = "roles/aiplatform.user"
  member  = "serviceAccount:${google_service_account.rag_sync.email}"
}

# Permission to receive Eventarc events
resource "google_project_iam_member" "eventarc_receiver" {
  project = var.project_id
  role    = "roles/eventarc.eventReceiver"
  member  = "serviceAccount:${google_service_account.rag_sync.email}"
}

# Permission for Cloud Run (gen2 functions run on Cloud Run)
resource "google_project_iam_member" "run_invoker" {
  project = var.project_id
  role    = "roles/run.invoker"
  member  = "serviceAccount:${google_service_account.rag_sync.email}"
}

# Read access to the GCS data source bucket
resource "google_storage_bucket_iam_member" "read_docs" {
  bucket = var.rag_docs_bucket_name
  role   = "roles/storage.objectViewer"
  member = "serviceAccount:${google_service_account.rag_sync.email}"
}

Cloud Function Source Code

# sync/function_source/main.py

import functions_framework
from google.cloud import aiplatform
import os
import logging

logger = logging.getLogger(__name__)

PROJECT_ID = os.environ["PROJECT_ID"]
LOCATION = os.environ["LOCATION"]
RAG_CORPUS_ID = os.environ["RAG_CORPUS_ID"]
GCS_BUCKET = os.environ["GCS_BUCKET"]
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "512"))
CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", "100"))

@functions_framework.cloud_event
def handle_gcs_event(cloud_event):
    """Triggered by GCS object finalize/delete events."""
    data = cloud_event.data
    file_name = data["name"]
    event_type = cloud_event["type"]

    logger.info(f"Event: {event_type}, File: {file_name}")

    # Skip non-document files
    supported_extensions = (".pdf", ".txt", ".md", ".html", ".docx", ".csv")
    if not file_name.lower().endswith(supported_extensions):
        logger.info(f"Skipping unsupported file type: {file_name}")
        return  # CloudEvent handlers don't return HTTP responses

    # Import the specific file that changed
    aiplatform.init(project=PROJECT_ID, location=LOCATION)

    gcs_uri = f"gs://{GCS_BUCKET}/{file_name}"

    try:
        from vertexai.preview import rag

        # Import the changed file into the corpus
        rag.import_files(
            corpus_name=RAG_CORPUS_ID,
            paths=[gcs_uri],
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            max_embedding_requests_per_min=300,
        )
        logger.info(f"Import triggered for: {gcs_uri}")

    except Exception as e:
        logger.error(f"Import failed for {gcs_uri}: {e}")
        raise  # Re-raise so Eventarc retries the delivery

Key design choice: we import only the specific file that changed (paths=[gcs_uri]) rather than re-importing the entire bucket, which keeps each sync fast and targeted. Note that importing never removes anything from the corpus, so deletions need their own handling, covered below.

Package and Deploy the Function

# sync/function.tf

# Zip the function source
data "archive_file" "rag_sync" {
  type        = "zip"
  source_dir  = "${path.module}/function_source"
  output_path = "${path.module}/function_source.zip"
}

# Upload to GCS for Cloud Functions deployment
resource "google_storage_bucket" "function_source" {
  name     = "${var.project_id}-rag-sync-source"
  location = var.region
  uniform_bucket_level_access = true

  lifecycle_rule {
    action { type = "Delete" }
    condition { age = 30 }
  }
}

resource "google_storage_bucket_object" "function_zip" {
  name   = "rag-sync-${data.archive_file.rag_sync.output_md5}.zip"
  bucket = google_storage_bucket.function_source.name
  source = data.archive_file.rag_sync.output_path
}

# Deploy Cloud Function 2nd gen with GCS Eventarc trigger
resource "google_cloudfunctions2_function" "rag_sync" {
  name     = "${var.environment}-rag-sync"
  location = var.region
  project  = var.project_id

  build_config {
    runtime     = "python312"
    entry_point = "handle_gcs_event"

    source {
      storage_source {
        bucket = google_storage_bucket.function_source.name
        object = google_storage_bucket_object.function_zip.name
      }
    }
  }

  service_config {
    max_instance_count    = 1   # Prevent concurrent imports
    min_instance_count    = 0
    timeout_seconds       = 540 # 9 minutes for large files
    available_memory      = "256Mi"
    service_account_email = google_service_account.rag_sync.email

    environment_variables = {
      PROJECT_ID    = var.project_id
      LOCATION      = var.region
      RAG_CORPUS_ID = var.rag_corpus_id
      GCS_BUCKET    = var.rag_docs_bucket_name
      CHUNK_SIZE    = var.chunk_size
      CHUNK_OVERLAP = var.chunk_overlap
    }
  }

  event_trigger {
    trigger_region        = var.region
    event_type            = "google.cloud.storage.object.v1.finalized"
    retry_policy          = "RETRY_POLICY_RETRY"
    service_account_email = google_service_account.rag_sync.email

    event_filters {
      attribute = "bucket"
      value     = var.rag_docs_bucket_name
    }
  }

  depends_on = [
    google_project_iam_member.gcs_pubsub_publishing,
    google_project_iam_member.eventarc_receiver,
    google_project_iam_member.run_invoker,
  ]
}

max_instance_count = 1 is important. It prevents multiple concurrent imports from overwhelming your embedding model's QPM quota. Events queue up and process sequentially.
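To see why the quota matters, a back-of-envelope estimate, assuming roughly one embedding request per chunk (a simplification):

```python
import math

def chunk_count(doc_tokens: int, chunk_size: int = 512, overlap: int = 100) -> int:
    """Approximate number of chunks for a document of `doc_tokens` tokens."""
    stride = chunk_size - overlap
    return max(1, math.ceil((doc_tokens - overlap) / stride))

def import_minutes(num_chunks: int, qpm: int = 300) -> float:
    """Lower bound on import time when embedding calls are capped at `qpm`."""
    return num_chunks / qpm

# A hypothetical 100k-token manual at the defaults:
chunks = chunk_count(100_000)            # 243 chunks
print(round(import_minutes(chunks), 1))  # under a minute at 300 QPM
```

One large document is cheap; fifty concurrent imports each pushing hundreds of chunks is how you hit the quota wall.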

⚠️ Handling Deletions

The google.cloud.storage.object.v1.finalized event fires on creates and updates, but not deletes. For deletions, you have two options:

Option 1: Add a second trigger for deletes. Add another event_trigger block with event_type = "google.cloud.storage.object.v1.deleted" and handle it in your function by calling rag.delete_rag_files().

Option 2: Scheduled reconciliation. Run a nightly Cloud Scheduler job that lists the files in the corpus, compares their source URIs against the bucket's current contents, and deletes corpus entries whose source objects are gone. Keep in mind that import_files only adds and updates — it does not remove vectors for documents that have disappeared from the bucket — so deletions always require an explicit delete call somewhere.

For most teams, option 2 is the pragmatic choice. Event-driven imports handle creates and updates in near-real-time, and the nightly reconciliation catches deletions.

📐 Batching: Handling Bulk Uploads

When someone uploads 50 files at once, you get 50 Eventarc events. With max_instance_count = 1, Cloud Run queues these requests and processes them sequentially. Each file gets imported individually, which works but is slower than a single bulk import.

For better performance on bulk uploads, add a Pub/Sub batching layer:

# Alternative: Route events through Pub/Sub for batching
resource "google_pubsub_topic" "rag_sync" {
  name    = "${var.environment}-rag-sync-topic"
  project = var.project_id
}

resource "google_pubsub_subscription" "rag_sync_batch" {
  name    = "${var.environment}-rag-sync-batch"
  topic   = google_pubsub_topic.rag_sync.name
  project = var.project_id

  ack_deadline_seconds       = 600
  message_retention_duration = "86400s"

  push_config {
    push_endpoint = google_cloudfunctions2_function.rag_sync.url
  }
}

Then modify the function to collect file URIs and call import_files once per batch. Note that a push subscription still delivers one message per request, so real batching requires the handler to buffer briefly (or a pull subscription drained on a schedule) before flushing the accumulated URIs in a single import. This reduces embedding API calls and per-import overhead.

🔔 Monitoring

# sync/monitoring.tf

resource "google_monitoring_alert_policy" "rag_sync_errors" {
  display_name = "${var.environment}-rag-sync-errors"
  project      = var.project_id
  combiner     = "OR"

  conditions {
    display_name = "Cloud Function errors"

    condition_threshold {
      filter          = "resource.type=\"cloud_run_revision\" AND resource.labels.service_name=\"${var.environment}-rag-sync\" AND metric.type=\"run.googleapis.com/request_count\" AND metric.labels.response_code_class!=\"2xx\""
      comparison      = "COMPARISON_GT"
      threshold_value = 5
      duration        = "300s"

      aggregations {
        alignment_period   = "300s"
        per_series_aligner = "ALIGN_COUNT"
      }
    }
  }

  notification_channels = [var.notification_channel_id]
}

📐 Environment Configuration

# environments/dev.tfvars
max_instance_count = 1
chunk_size         = 300
chunk_overlap      = 50
timeout_seconds    = 300

# environments/prod.tfvars
max_instance_count = 1     # Still 1 to protect QPM quota
chunk_size         = 512
chunk_overlap      = 100
timeout_seconds    = 540

⏭️ What's Next

This is Post 4 of the GCP RAG Pipeline with Terraform series.


Your RAG corpus now stays fresh automatically. Upload a document to GCS, and within minutes it's chunked, embedded, and searchable through Gemini. Eventarc handles the plumbing, Cloud Functions does the work, and Terraform makes it repeatable.

Found this helpful? Follow for the full RAG Pipeline with Terraform series! 💬
