Azure AI Search indexers can run on a schedule, but that means stale results between intervals. Here's how to build an event-driven pipeline with Event Grid, Service Bus batching, and a Function App to trigger indexer runs on demand using Terraform.
You've built your Azure AI Search index, configured a blob indexer, and set it to run on a 5-minute schedule. But that means documents uploaded between intervals sit unindexed. For internal tools this might be fine. For customer-facing RAG, 5 minutes of stale results is too long.
Azure AI Search indexers support scheduled runs (minimum every 5 minutes) or on-demand triggers via the REST API. This post builds an event-driven pipeline with Terraform: Blob storage events fire through Event Grid, batch in a Service Bus queue, and invoke a Function App that calls the Run Indexer API. Near-real-time sync without polling. 🎯
🏗️ Architecture Overview
Blob Storage (docs added/updated/deleted)
↓ Event Grid Subscription
Service Bus Queue (batches events, 2-min lock)
↓ Function App Trigger
Function App (calls POST /indexers/{name}/run)
↓ On failure
Service Bus Dead Letter Queue → Monitor Alert
Why not just use the scheduled indexer? You can, and for many workloads it's the right choice. The scheduled indexer handles change detection automatically using blob timestamps. But the minimum interval is 5 minutes, and you pay for indexer execution time even when nothing changed. Event-driven sync triggers only when documents actually change, and the indexer processes immediately.
Why Service Bus between Event Grid and the Function? Same batching problem as AWS. Uploading 50 documents fires 50 events. Each would trigger a separate Run Indexer call, but only one indexer run can execute at a time. Service Bus collects events and the Function processes the batch with a single indexer run.
🔧 Terraform: The Full Pipeline
Event Grid and Service Bus
# sync/event_grid.tf
resource "azurerm_servicebus_namespace" "this" {
name = "${var.environment}-${var.project}-sb"
location = azurerm_resource_group.this.location
resource_group_name = azurerm_resource_group.this.name
sku = "Standard"
}
resource "azurerm_servicebus_queue" "indexer_sync" {
name = "indexer-sync"
namespace_id = azurerm_servicebus_namespace.this.id
lock_duration = "PT2M"
max_delivery_count = 3
dead_lettering_on_message_expiration = true
}
# Event Grid system topic for the storage account
resource "azurerm_eventgrid_system_topic" "blob_events" {
name = "${var.environment}-blob-events"
location = azurerm_resource_group.this.location
resource_group_name = azurerm_resource_group.this.name
source_arm_resource_id = azurerm_storage_account.docs.id
topic_type = "Microsoft.Storage.StorageAccounts"
}
# Route blob events to Service Bus queue
resource "azurerm_eventgrid_system_topic_event_subscription" "blob_to_sb" {
name = "blob-to-indexer-sync"
system_topic = azurerm_eventgrid_system_topic.blob_events.name
resource_group_name = azurerm_resource_group.this.name
service_bus_queue_endpoint_id = azurerm_servicebus_queue.indexer_sync.id
included_event_types = [
"Microsoft.Storage.BlobCreated",
"Microsoft.Storage.BlobDeleted"
]
subject_filter {
subject_begins_with = "/blobServices/default/containers/${var.docs_container_name}/blobs/"
subject_ends_with = ""
}
}
The subject_filter ensures only events from your documents container trigger the pipeline, ignoring uploads to other containers in the same storage account.
Function App
Azure AI Search indexes, indexers, and data sources don't have native Terraform resources. The azurerm provider covers the search service itself, but data plane operations (creating indexes, running indexers) require REST API calls. The Function App handles this.
# sync/function_app.tf
resource "azurerm_service_plan" "sync" {
name = "${var.environment}-sync-plan"
location = azurerm_resource_group.this.location
resource_group_name = azurerm_resource_group.this.name
os_type = "Linux"
sku_name = "Y1" # Consumption plan
}
resource "azurerm_linux_function_app" "indexer_sync" {
name = "${var.environment}-indexer-sync"
location = azurerm_resource_group.this.location
resource_group_name = azurerm_resource_group.this.name
service_plan_id = azurerm_service_plan.sync.id
storage_account_name = azurerm_storage_account.function.name
storage_account_access_key = azurerm_storage_account.function.primary_access_key
site_config {
application_stack {
python_version = "3.11"
}
}
app_settings = {
SEARCH_SERVICE_NAME = azurerm_search_service.this.name
SEARCH_ADMIN_KEY = azurerm_search_service.this.primary_key
INDEXER_NAME = var.indexer_name
SERVICEBUS_CONNECTION = azurerm_servicebus_namespace.this.default_primary_connection_string
}
}
The Consumption plan (Y1) means you pay only when the function executes. No idle cost.
Function App Code
# sync/function_code/function_app.py
import azure.functions as func
import requests
import os
import logging
app = func.FunctionApp()
@app.service_bus_queue_trigger(
arg_name="msg",
queue_name="indexer-sync",
connection="SERVICEBUS_CONNECTION"
)
def run_indexer(msg: func.ServiceBusMessage):
"""Triggered by Service Bus messages from blob events."""
service_name = os.environ["SEARCH_SERVICE_NAME"]
admin_key = os.environ["SEARCH_ADMIN_KEY"]
indexer_name = os.environ["INDEXER_NAME"]
event_data = msg.get_body().decode("utf-8")
logging.info(f"Blob event received: {event_data[:200]}")
# Call the Run Indexer REST API
url = (
f"https://{service_name}.search.windows.net"
f"/indexers/{indexer_name}/run"
f"?api-version=2024-07-01"
)
headers = {
"api-key": admin_key,
"Content-Type": "application/json"
}
response = requests.post(url, headers=headers)
if response.status_code == 202:
logging.info(f"Indexer run triggered: {indexer_name}")
elif response.status_code == 409:
# Indexer already running - safe to skip
logging.info("Indexer already running, skipping")
else:
logging.error(
f"Failed to trigger indexer: {response.status_code} "
f"{response.text}"
)
raise Exception(f"Indexer run failed: {response.status_code}")
The 409 handler is critical. When the indexer is already running, the API returns 409 Conflict. Like the AWS ConflictException pattern, we log and skip. The running indexer already picks up the new blobs through its change detection mechanism.
🔄 Change and Delete Detection
Azure AI Search blob indexers have built-in change detection using blob timestamps. When you trigger Run Indexer, it only processes blobs that have been modified since the last successful run. You don't need to track which files changed - the indexer handles this automatically.
For deletions, use the soft delete pattern. Instead of deleting blobs directly, add a metadata property like IsDeleted = true. Configure the indexer's data source with a soft delete column detection policy:
{
"dataDeletionDetectionPolicy": {
"@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy",
"softDeleteColumnName": "IsDeleted",
"softDeleteMarkerValue": "true"
}
}
The indexer sees the metadata change, finds the matching document in the search index, and removes it. This is the recommended approach from Microsoft because hard deletes from blob storage don't propagate to the search index.
⚠️ Edge Cases and Gotchas
Indexer execution limits. Azure AI Search indexers have a maximum execution time that varies by tier: 2 hours for Basic, 24 hours for Standard. If you're indexing large documents with AI enrichment (OCR, chunking, embedding), a single run can take significant time. Plan accordingly.
One indexer run at a time. Just like Bedrock and RAG Engine, only one indexer execution can run per indexer at a time. The 409 handling in the function code addresses this.
Service Bus message lock. The lock_duration = "PT2M" gives the function 2 minutes to process each message. If the function times out, the message becomes visible again and retries. After 3 failed attempts (max_delivery_count), it goes to the dead letter queue.
Event Grid retry policy. Event Grid retries failed deliveries to Service Bus with exponential backoff for up to 24 hours. Between Event Grid retries and Service Bus dead lettering, you have robust failure handling without custom code.
📐 Alternative: Scheduled Indexer (Simpler)
If event-driven is overkill, use the built-in indexer schedule. You configure this via the AI Search REST API when creating the indexer:
{
"name": "my-blob-indexer",
"dataSourceName": "blob-datasource",
"targetIndexName": "rag-index",
"schedule": {
"interval": "PT5M"
}
}
This polls for changes every 5 minutes. No Event Grid, no Service Bus, no Function App. The trade-off is a 0-5 minute delay before new documents appear in search results.
🔄 Tri-Cloud Auto-Sync Comparison
| Component | AWS | GCP | Azure |
|---|---|---|---|
| Event source | S3 Notification | Eventarc | Event Grid |
| Batching | SQS (delay_seconds) | max_instance_count=1 | Service Bus queue |
| Compute | Lambda | Cloud Function 2nd gen | Function App |
| Sync API | StartIngestionJob | ImportRagFiles | Run Indexer (REST) |
| Change detection | Incremental (built-in) | Per-file import | Blob timestamps |
| Delete handling | Incremental sync | Scheduled re-import | Soft delete metadata |
| Conflict handling | ConflictException | Sequential (max 1) | 409 Conflict |
| Minimum cost | ~$0 (Lambda free tier) | ~$0 (Cloud Functions) | ~$0 (Consumption plan) |
All three clouds follow the same pattern: storage event → message queue → serverless function → sync API. The implementations differ, but the architecture is identical.
⏭️ What's Next
This is Post 4 of the Azure RAG Pipeline with Terraform series.
- Post 1: Azure AI Search RAG - Basic Setup 🔍
- Post 2: Advanced RAG - Three-Layer Retrieval 🧠
- Post 3: Cosmos DB Vector Search - NoSQL-Native RAG 💰
- Post 4: Auto-Sync Pipeline (you are here) ⚡
Your search index now stays current automatically. Upload a document to Blob Storage, and within seconds Event Grid fires, Service Bus batches, and the Function App triggers your indexer. No polling, no stale results, no wasted compute. ⚡
Found this helpful? Follow for the full RAG Pipeline with Terraform series! 💬
Top comments (0)