Suhas Mallesh

Auto-Sync RAG Pipeline: Blob Events to Azure AI Search with Terraform ⚡

Azure AI Search indexers can run on a schedule, but that means stale results between intervals. Here's how to build an event-driven pipeline with Event Grid, Service Bus batching, and a Function App to trigger indexer runs on demand using Terraform.

You've built your Azure AI Search index, configured a blob indexer, and set it to run on a 5-minute schedule. But that means documents uploaded between intervals sit unindexed. For internal tools this might be fine. For customer-facing RAG, 5 minutes of stale results is too long.

Azure AI Search indexers support scheduled runs (minimum every 5 minutes) or on-demand triggers via the REST API. This post builds an event-driven pipeline with Terraform: Blob storage events fire through Event Grid, batch in a Service Bus queue, and invoke a Function App that calls the Run Indexer API. Near-real-time sync without polling. 🎯

🏗️ Architecture Overview

Blob Storage (docs added/updated/deleted)
    ↓ Event Grid Subscription
Service Bus Queue (buffers events, 2-min lock)
    ↓ Function App Trigger
Function App (calls POST /indexers/{name}/run)
    ↓ On failure
Service Bus Dead Letter Queue → Monitor Alert

Why not just use the scheduled indexer? You can, and for many workloads it's the right choice. The scheduled indexer handles change detection automatically using blob timestamps. But the minimum interval is 5 minutes, and every scheduled run executes even when nothing changed. Event-driven sync triggers a run only when documents actually change, and the indexer starts processing right away.

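Why Service Bus between Event Grid and the Function? Same batching problem as AWS. Uploading 50 documents fires 50 events, and each one would trigger its own Run Indexer call, but an indexer can only execute one run at a time. Service Bus buffers the burst durably and adds retries and dead-lettering. The function shown later handles one message per invocation: the first message starts the run, the messages that arrive while it is running get a 409 and are skipped, and the burst collapses into far fewer indexer runs than events.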
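If you want exactly one Run Indexer call per burst, one option is to receive messages in batches and trigger once per batch. Here is a minimal sketch of that collapsing logic only. `handle_batch` and `trigger_indexer` are hypothetical names, and it assumes the trigger has been configured to deliver a list of messages rather than one at a time:

```python
from typing import Callable, Sequence


def handle_batch(
    messages: Sequence[bytes],
    trigger_indexer: Callable[[], None],
) -> int:
    """Trigger at most one indexer run for a whole batch of blob events.

    Returns the number of events covered by that single run.
    """
    if not messages:
        # Nothing changed, so there is nothing to index.
        return 0
    # One call covers every event in the batch: the indexer's own
    # change detection finds all modified blobs, not just one.
    trigger_indexer()
    return len(messages)
```

The point is that the event payloads don't matter for the trigger decision. Any non-empty batch means "something changed", and the indexer works out what.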

🔧 Terraform: The Full Pipeline

Event Grid and Service Bus

# sync/event_grid.tf

resource "azurerm_servicebus_namespace" "this" {
  name                = "${var.environment}-${var.project}-sb"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  sku                 = "Standard"
}

resource "azurerm_servicebus_queue" "indexer_sync" {
  name         = "indexer-sync"
  namespace_id = azurerm_servicebus_namespace.this.id

  lock_duration                       = "PT2M"
  max_delivery_count                  = 3
  dead_lettering_on_message_expiration = true
}

# Event Grid system topic for the storage account
resource "azurerm_eventgrid_system_topic" "blob_events" {
  name                   = "${var.environment}-blob-events"
  location               = azurerm_resource_group.this.location
  resource_group_name    = azurerm_resource_group.this.name
  source_arm_resource_id = azurerm_storage_account.docs.id
  topic_type             = "Microsoft.Storage.StorageAccounts"
}

# Route blob events to Service Bus queue
resource "azurerm_eventgrid_system_topic_event_subscription" "blob_to_sb" {
  name                = "blob-to-indexer-sync"
  system_topic        = azurerm_eventgrid_system_topic.blob_events.name
  resource_group_name = azurerm_resource_group.this.name

  service_bus_queue_endpoint_id = azurerm_servicebus_queue.indexer_sync.id

  included_event_types = [
    "Microsoft.Storage.BlobCreated",
    "Microsoft.Storage.BlobDeleted"
  ]

  subject_filter {
    subject_begins_with = "/blobServices/default/containers/${var.docs_container_name}/blobs/"
    subject_ends_with   = ""
  }
}

The subject_filter ensures only events from your documents container trigger the pipeline, ignoring uploads to other containers in the same storage account.
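To see why the prefix ends with `/blobs/`, here is a small Python sketch of the same prefix match. The function name is hypothetical and this only mirrors the filter's string comparison, it is not how Event Grid is implemented:

```python
from typing import Optional


def blob_path_if_in_container(subject: str, container: str) -> Optional[str]:
    """Return the blob path if the event subject belongs to `container`."""
    # Same prefix as subject_begins_with in the Terraform above.
    prefix = f"/blobServices/default/containers/{container}/blobs/"
    if subject.startswith(prefix):
        return subject[len(prefix):]
    return None


subject = "/blobServices/default/containers/docs/blobs/guides/setup.pdf"
print(blob_path_if_in_container(subject, "docs"))
```

Because the container name is followed by `/blobs/`, a filter for `docs` does not accidentally match a sibling container such as `docs-archive`.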

Function App

Azure AI Search indexes, indexers, and data sources don't have native Terraform resources. The azurerm provider covers the search service itself, but data plane operations (creating indexes, running indexers) require REST API calls. The Function App handles this.

# sync/function_app.tf

resource "azurerm_service_plan" "sync" {
  name                = "${var.environment}-sync-plan"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  os_type             = "Linux"
  sku_name            = "Y1"  # Consumption plan
}

resource "azurerm_linux_function_app" "indexer_sync" {
  name                       = "${var.environment}-indexer-sync"
  location                   = azurerm_resource_group.this.location
  resource_group_name        = azurerm_resource_group.this.name
  service_plan_id            = azurerm_service_plan.sync.id
  storage_account_name       = azurerm_storage_account.function.name
  storage_account_access_key = azurerm_storage_account.function.primary_access_key

  site_config {
    application_stack {
      python_version = "3.11"
    }
  }

  app_settings = {
    SEARCH_SERVICE_NAME   = azurerm_search_service.this.name
    SEARCH_ADMIN_KEY      = azurerm_search_service.this.primary_key
    INDEXER_NAME          = var.indexer_name
    SERVICEBUS_CONNECTION = azurerm_servicebus_namespace.this.default_primary_connection_string
  }
}

The Consumption plan (Y1) means you pay only when the function executes. No idle cost.

Function App Code

# sync/function_code/function_app.py

import azure.functions as func
import requests
import os
import logging

app = func.FunctionApp()

@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="indexer-sync",
    connection="SERVICEBUS_CONNECTION"
)
def run_indexer(msg: func.ServiceBusMessage):
    """Triggered by Service Bus messages from blob events."""
    service_name = os.environ["SEARCH_SERVICE_NAME"]
    admin_key = os.environ["SEARCH_ADMIN_KEY"]
    indexer_name = os.environ["INDEXER_NAME"]

    event_data = msg.get_body().decode("utf-8")
    logging.info(f"Blob event received: {event_data[:200]}")

    # Call the Run Indexer REST API
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}/run"
        f"?api-version=2024-07-01"
    )
    headers = {
        "api-key": admin_key,
        "Content-Type": "application/json"
    }

    # Set a timeout so a hung request cannot outlive the message lock
    response = requests.post(url, headers=headers, timeout=30)

    if response.status_code == 202:
        logging.info(f"Indexer run triggered: {indexer_name}")
    elif response.status_code == 409:
        # Indexer already running - safe to skip
        logging.info("Indexer already running, skipping")
    else:
        logging.error(
            f"Failed to trigger indexer: {response.status_code} "
            f"{response.text}"
        )
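        # Raising abandons the message, so Service Bus redelivers it
        # (up to max_delivery_count) before dead-lettering
        raise RuntimeError(f"Indexer run failed: {response.status_code}")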

The 409 handler is critical. When the indexer is already running, the API returns 409 Conflict. Like the AWS ConflictException pattern, we log and skip. Skipping is safe because change detection is based on blob timestamps rather than on individual events: any blob the in-progress run does not reach is picked up by the next run.
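The status handling can also be pulled out into pure functions, which makes it easy to unit test without calling the search service. This is a sketch only: `run_indexer_url` and `classify_run_response` are hypothetical helper names, and the status codes are the ones the function code above already relies on:

```python
def run_indexer_url(
    service_name: str,
    indexer_name: str,
    api_version: str = "2024-07-01",
) -> str:
    """Build the Run Indexer endpoint used by the function above."""
    return (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}/run"
        f"?api-version={api_version}"
    )


def classify_run_response(status_code: int) -> str:
    """Map a Run Indexer status code to an outcome, or raise to force a retry."""
    if status_code == 202:
        return "started"
    if status_code == 409:
        # A run is already in progress; skipping is safe.
        return "already_running"
    # Anything else is unexpected: raising lets Service Bus redeliver.
    raise RuntimeError(f"Indexer run failed: {status_code}")
```

With this split, the trigger function shrinks to "build URL, POST, classify", and the retry decision lives in one place.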

🔄 Change and Delete Detection

Azure AI Search blob indexers have built-in change detection using blob timestamps. When you trigger Run Indexer, it only processes blobs that have been modified since the last successful run. You don't need to track which files changed - the indexer handles this automatically.
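As a mental model, timestamp-based change detection behaves roughly like a high-water mark filter. The sketch below is a simplified illustration with made-up blob names, not the service's actual implementation:

```python
from datetime import datetime, timezone
from typing import Dict, List


def changed_since(
    last_modified: Dict[str, datetime],
    high_water_mark: datetime,
) -> List[str]:
    """Return blob names modified after the last successful run."""
    return sorted(
        name
        for name, modified in last_modified.items()
        if modified > high_water_mark
    )


last_run = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
blobs = {
    "old.pdf": datetime(2025, 1, 1, 11, 0, tzinfo=timezone.utc),
    "new.pdf": datetime(2025, 1, 1, 12, 30, tzinfo=timezone.utc),
}
print(changed_since(blobs, last_run))  # ['new.pdf']
```

This is why the function never needs to pass a blob name to Run Indexer: the event only says "go look", and the timestamps say where.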

For deletions, use the soft delete pattern. Instead of deleting blobs directly, add a metadata property like IsDeleted = true. Configure the indexer's data source with a soft delete column detection policy:

{
  "dataDeletionDetectionPolicy": {
    "@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy",
    "softDeleteColumnName": "IsDeleted",
    "softDeleteMarkerValue": "true"
  }
}

The indexer sees the metadata change, finds the matching document in the search index, and removes it. This is the recommended approach from Microsoft because hard deletes from blob storage don't propagate to the search index.
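For context, here is roughly where that policy sits in a full blob data source definition, built as a Python dict you could send to the REST API. Treat this as a sketch: `build_blob_datasource` is a hypothetical helper, the names and connection string are placeholders, and the field layout follows the Create Data Source request body as I understand it, so check it against the current API reference before using it:

```python
import json


def build_blob_datasource(name: str, container: str, connection_string: str) -> dict:
    """Assemble a blob data source definition with soft delete detection."""
    return {
        "name": name,
        "type": "azureblob",
        "credentials": {"connectionString": connection_string},
        "container": {"name": container},
        # Same policy as the JSON fragment above.
        "dataDeletionDetectionPolicy": {
            "@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy",
            "softDeleteColumnName": "IsDeleted",
            "softDeleteMarkerValue": "true",
        },
    }


payload = build_blob_datasource("blob-datasource", "docs", "<connection-string>")
print(json.dumps(payload, indent=2))
```

Note that the marker value is the string `"true"`, matching how blob metadata values are stored.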

⚠️ Edge Cases and Gotchas

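Indexer execution limits. Azure AI Search caps how long a single indexer run can execute, and the cap depends on the tier and the execution environment (Microsoft's service limits page lists 2 or 24 hours per run on billable tiers, and only a few minutes on Free). If you're indexing large documents with AI enrichment (OCR, chunking, embedding), a single run can take significant time, so check the current limit for your tier and plan accordingly.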

One indexer run at a time. Just like Bedrock and RAG Engine, only one indexer execution can run per indexer at a time. The 409 handling in the function code addresses this.

Service Bus message lock. The lock_duration = "PT2M" gives the function 2 minutes to process each message. If the function times out, the message becomes visible again and retries. After 3 failed attempts (max_delivery_count), it goes to the dead letter queue.

Event Grid retry policy. Event Grid retries failed deliveries to Service Bus with exponential backoff for up to 24 hours. Between Event Grid retries and Service Bus dead lettering, you have robust failure handling without custom code.
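The Service Bus redelivery behavior from the gotchas above can be modeled in a few lines. This is a toy simulation for reasoning about `max_delivery_count`, with a hypothetical `deliver` function. It does not talk to Service Bus and ignores lock timing:

```python
from typing import Callable, Tuple


def deliver(
    handler: Callable[[], None],
    max_delivery_count: int = 3,
) -> Tuple[str, int]:
    """Simulate redelivery: retry the handler until it succeeds or the
    delivery count is exhausted, then dead-letter the message."""
    for attempt in range(1, max_delivery_count + 1):
        try:
            handler()
            return "completed", attempt
        except Exception:
            # A failed attempt releases the message for redelivery.
            continue
    return "dead_lettered", max_delivery_count


def always_fails() -> None:
    raise RuntimeError("search service unavailable")


print(deliver(always_fails))  # ('dead_lettered', 3)
```

The takeaway: a transient failure costs you a retry, while a persistent one lands in the dead letter queue after the third attempt, which is where the monitor alert from the architecture diagram comes in.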

📐 Alternative: Scheduled Indexer (Simpler)

If event-driven is overkill, use the built-in indexer schedule. You configure this via the AI Search REST API when creating the indexer:

{
  "name": "my-blob-indexer",
  "dataSourceName": "blob-datasource",
  "targetIndexName": "rag-index",
  "schedule": {
    "interval": "PT5M"
  }
}

This polls for changes every 5 minutes. No Event Grid, no Service Bus, no Function App. The trade-off is a 0-5 minute delay before new documents appear in search results.
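If you generate the indexer definition from code, a small helper can format the interval and reject values the service won't accept. This is a sketch with a hypothetical function name. It assumes the documented bounds of 5 minutes minimum and one day maximum, and an ISO 8601 duration string like the `PT5M` above:

```python
def schedule_interval(minutes: int) -> str:
    """Format an indexer schedule interval as an ISO 8601 duration."""
    if not 5 <= minutes <= 1440:
        raise ValueError("interval must be between 5 minutes and 1 day")
    if minutes == 1440:
        return "P1D"
    hours, mins = divmod(minutes, 60)
    duration = "PT"
    if hours:
        duration += f"{hours}H"
    if mins:
        duration += f"{mins}M"
    return duration


indexer = {
    "name": "my-blob-indexer",
    "dataSourceName": "blob-datasource",
    "targetIndexName": "rag-index",
    "schedule": {"interval": schedule_interval(5)},
}
print(indexer["schedule"])  # {'interval': 'PT5M'}
```

Failing fast here is cheaper than finding out from a rejected REST call during deployment.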

🔄 Tri-Cloud Auto-Sync Comparison

| Component | AWS | GCP | Azure |
|---|---|---|---|
| Event source | S3 Notification | Eventarc | Event Grid |
| Batching | SQS (delay_seconds) | max_instance_count=1 | Service Bus queue |
| Compute | Lambda | Cloud Function 2nd gen | Function App |
| Sync API | StartIngestionJob | ImportRagFiles | Run Indexer (REST) |
| Change detection | Incremental (built-in) | Per-file import | Blob timestamps |
| Delete handling | Incremental sync | Scheduled re-import | Soft delete metadata |
| Conflict handling | ConflictException | Sequential (max 1) | 409 Conflict |
| Minimum cost | ~$0 (Lambda free tier) | ~$0 (Cloud Functions) | ~$0 (Consumption plan) |

All three clouds follow the same pattern: storage event → message queue → serverless function → sync API. The implementations differ, but the architecture is identical.

⏭️ What's Next

This is Post 4 of the Azure RAG Pipeline with Terraform series.


Your search index now stays current automatically. Upload a document to Blob Storage, and within seconds Event Grid fires, Service Bus buffers the event, and the Function App triggers your indexer. No polling, no stale results, no wasted compute.

Found this helpful? Follow for the full RAG Pipeline with Terraform series! 💬
