Bedrock Knowledge Bases don't auto-sync when documents change. Here's how to build an event-driven pipeline with S3 notifications, SQS batching, and Lambda to trigger ingestion jobs automatically using Terraform.
You've built your Bedrock Knowledge Base, loaded documents into S3, and manually clicked "Sync" in the console. It works. But every time someone adds, updates, or deletes a document, the vector store goes stale until you sync again.
Bedrock Knowledge Bases don't auto-sync. When documents change in your S3 data source, nothing happens until you call StartIngestionJob. Ingestion is incremental - Bedrock only processes files that changed since the last sync - but you still need something to trigger it. This post builds an event-driven pipeline with Terraform: S3 events trigger an SQS queue, which batches changes and invokes a Lambda function that starts the ingestion job. 🎯
🏗️ Architecture Overview
S3 Bucket (docs added/updated/deleted)
    ↓ Event Notification
SQS Queue (batches events, 5-min window)
    ↓ Lambda Event Source Mapping
Lambda Function (calls StartIngestionJob)
    ↓ On failure
SQS Dead Letter Queue (failed invocations)
    ↓ CloudWatch
Alarm → SNS notification
Why SQS between S3 and Lambda? Without batching, uploading 50 documents triggers 50 separate Lambda invocations, each calling StartIngestionJob. But only one ingestion job can run per data source at a time - the rest fail. SQS collects events over a batching window and delivers them to a single Lambda invocation, which triggers one sync for the entire batch.
🔧 Terraform: The Full Pipeline
SQS Queues
# sync/sqs.tf
resource "aws_sqs_queue" "kb_sync" {
  name                       = "${var.environment}-kb-sync-queue"
  visibility_timeout_seconds = 900    # comfortably above the 6x-Lambda-timeout guideline (6 x 120s = 720s)
  message_retention_seconds  = 86400  # 24 hours
  delay_seconds              = 300    # 5-min delay for batching

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.kb_sync_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "kb_sync_dlq" {
  name                      = "${var.environment}-kb-sync-dlq"
  message_retention_seconds = 1209600 # 14 days
}
resource "aws_sqs_queue_policy" "allow_s3" {
  queue_url = aws_sqs_queue.kb_sync.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "s3.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.kb_sync.arn
      Condition = {
        ArnEquals = {
          "aws:SourceArn" = var.kb_data_source_bucket_arn
        }
      }
    }]
  })
}
The delay_seconds = 300 is the key design choice. When someone uploads multiple files, S3 sends an event per file immediately. The 5-minute delay holds all those messages in the queue before they become visible to Lambda, effectively batching rapid uploads into a single sync trigger.
S3 Event Notification
# sync/s3_notification.tf
resource "aws_s3_bucket_notification" "kb_docs" {
  bucket = var.kb_data_source_bucket_id

  queue {
    queue_arn = aws_sqs_queue.kb_sync.arn
    events = [
      "s3:ObjectCreated:*",
      "s3:ObjectRemoved:*"
    ]
    filter_prefix = var.s3_prefix # e.g., "documents/"
  }

  depends_on = [aws_sqs_queue_policy.allow_s3]
}
Filter by prefix to avoid triggering on metadata files or other non-document objects in the bucket.
Lambda Function
# sync/lambda.tf
data "archive_file" "kb_sync" {
  type        = "zip"
  source_dir  = "${path.module}/lambda/kb_sync"
  output_path = "${path.module}/lambda/kb_sync.zip"
}

resource "aws_lambda_function" "kb_sync" {
  function_name    = "${var.environment}-kb-sync"
  handler          = "index.handler"
  runtime          = "python3.12"
  timeout          = 120
  memory_size      = 128
  role             = aws_iam_role.kb_sync_lambda.arn
  filename         = data.archive_file.kb_sync.output_path
  source_code_hash = data.archive_file.kb_sync.output_base64sha256

  environment {
    variables = {
      KNOWLEDGE_BASE_ID = var.knowledge_base_id
      DATA_SOURCE_ID    = var.data_source_id
    }
  }
}

resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn                   = aws_sqs_queue.kb_sync.arn
  function_name                      = aws_lambda_function.kb_sync.arn
  batch_size                         = 10
  maximum_batching_window_in_seconds = 300 # Wait up to 5 min for batch
  enabled                            = true
}
The maximum_batching_window_in_seconds on the event source mapping adds a second layer of batching. Combined with the SQS delay, this ensures rapid file uploads are grouped into one Lambda invocation.
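A quick back-of-envelope on trigger latency, using the values configured above: the SQS delay hides each message for `delay_seconds`, and once messages become visible the event source mapping can hold them for up to the batching window before invoking Lambda. Both numbers are tuning knobs, not requirements:

```python
# Worst-case latency from S3 upload to Lambda invocation, using the
# values configured above (tune both to trade freshness for batching).
DELAY_SECONDS = 300            # aws_sqs_queue.kb_sync delay_seconds
BATCHING_WINDOW_SECONDS = 300  # maximum_batching_window_in_seconds

worst_case = DELAY_SECONDS + BATCHING_WINDOW_SECONDS
print(f"worst-case trigger latency: ~{worst_case // 60} min")  # ~10 min
```

If ten minutes is too stale for your use case, shrinking `delay_seconds` is the safer lever: the batching window still groups whatever arrives together.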
Lambda Function Code
# sync/lambda/kb_sync/index.py
import boto3
import os
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

bedrock_agent = boto3.client("bedrock-agent")


def handler(event, context):
    kb_id = os.environ["KNOWLEDGE_BASE_ID"]
    ds_id = os.environ["DATA_SOURCE_ID"]

    # Log what triggered the sync
    record_count = len(event.get("Records", []))
    logger.info(f"Received {record_count} S3 events, triggering sync")

    try:
        response = bedrock_agent.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
        )
        job_id = response["ingestionJob"]["ingestionJobId"]
        status = response["ingestionJob"]["status"]
        logger.info(f"Ingestion job started: {job_id}, status: {status}")
        return {"statusCode": 200, "jobId": job_id}
    except bedrock_agent.exceptions.ConflictException:
        # An ingestion job is already running - safe to skip
        logger.info("Ingestion job already in progress, skipping")
        return {"statusCode": 200, "message": "Job already running"}
    except Exception as e:
        logger.error(f"Failed to start ingestion job: {str(e)}")
        raise  # Let SQS retry via redrive policy
The ConflictException handler is critical. If a sync is already running when Lambda fires, Bedrock throws a conflict error. Rather than retrying (which would fail again), we log it and return success. The running job will already pick up the new files since ingestion is incremental.
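One logging refinement worth knowing about: each SQS record's `body` is itself a JSON S3 event notification, so the handler above only sees how many SQS messages arrived, not which objects changed. A small parser (a sketch, not part of the code above; `changed_keys` is a hypothetical helper) can surface the actual keys. Two gotchas it accounts for: S3 URL-encodes object keys in event payloads, and S3 sends an `s3:TestEvent` message (with no `Records` key) when the notification is first configured.

```python
import json
from urllib.parse import unquote_plus


def changed_keys(event):
    """Extract S3 object keys from an SQS-wrapped Lambda event.

    Each SQS record's body is a JSON S3 event notification with its own
    Records list. Keys are URL-encoded in S3 events, so decode them.
    S3's initial s3:TestEvent message has no Records key and is skipped.
    """
    keys = []
    for sqs_record in event.get("Records", []):
        body = json.loads(sqs_record["body"])
        for s3_record in body.get("Records", []):
            keys.append(unquote_plus(s3_record["s3"]["object"]["key"]))
    return keys
```

Logging `changed_keys(event)` in the handler makes CloudWatch logs show exactly which objects triggered each sync, which is handy when debugging why a document isn't in the vector store.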
IAM Role for Lambda
# sync/iam.tf
data "aws_caller_identity" "current" {} # supplies the account ID used below

resource "aws_iam_role" "kb_sync_lambda" {
  name = "${var.environment}-kb-sync-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role       = aws_iam_role.kb_sync_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy" "kb_sync_permissions" {
  name = "kb-sync-permissions"
  role = aws_iam_role.kb_sync_lambda.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = "bedrock:StartIngestionJob"
        Resource = "arn:aws:bedrock:${var.region}:${data.aws_caller_identity.current.account_id}:knowledge-base/${var.knowledge_base_id}"
      },
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.kb_sync.arn
      }
    ]
  })
}
Monitoring: DLQ Alarm
# sync/monitoring.tf
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "${var.environment}-kb-sync-dlq-alarm"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "KB sync failures landing in DLQ"
  alarm_actions       = [var.sns_alert_topic_arn]

  dimensions = {
    QueueName = aws_sqs_queue.kb_sync_dlq.name
  }
}
If messages end up in the dead letter queue after 3 retries, something is genuinely broken (IAM permissions changed, Knowledge Base deleted, service outage). Alert on it.
⚠️ Edge Cases and Gotchas
One job at a time. Bedrock allows only one ingestion job per data source concurrently. The ConflictException handler in the Lambda code handles this gracefully.
Ingestion is incremental. Bedrock tracks which files changed since the last sync. You don't need to worry about re-processing unchanged documents. Each sync only processes added, modified, or deleted files.
Large batch uploads. If you're uploading hundreds of files at once (initial load or migration), consider disabling the S3 notification first, uploading all files, then triggering a single manual sync. Re-enable the notification afterward.
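The bulk-load flow above can be sketched with boto3. Everything here is a hypothetical ops script, not part of the pipeline: `bulk_load` and `upload_fn` are made-up names, and the clients are passed in so the flow is easy to dry-run. Since the notification is Terraform-managed, the cleanest restore is `terraform apply`, but the sketch saves and restores the configuration itself as a belt-and-braces measure:

```python
def bulk_load(s3, bedrock_agent, bucket, kb_id, ds_id, upload_fn):
    """Disable S3 notifications, run a bulk upload, trigger one sync.

    s3 / bedrock_agent are boto3 clients ("s3" and "bedrock-agent");
    upload_fn performs the actual uploads. Hypothetical helper - with
    Terraform-managed notifications, `terraform apply` is the canonical
    way to restore them afterwards.
    """
    saved = s3.get_bucket_notification_configuration(Bucket=bucket)
    # An empty configuration removes all notifications on the bucket,
    # so the flood of per-object events never reaches SQS.
    s3.put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration={}
    )
    try:
        upload_fn()
        job = bedrock_agent.start_ingestion_job(
            knowledgeBaseId=kb_id, dataSourceId=ds_id
        )
        return job["ingestionJob"]["ingestionJobId"]
    finally:
        # Restore the original notification configuration (the GET
        # response includes ResponseMetadata, which PUT won't accept).
        saved.pop("ResponseMetadata", None)
        s3.put_bucket_notification_configuration(
            Bucket=bucket, NotificationConfiguration=saved
        )
```

The `finally` block matters: even if the upload or the sync call fails, the bucket doesn't end up silently missing its notification.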
Metadata files. If you use .metadata.json companion files (from our Advanced RAG post), uploading the metadata file triggers a separate S3 event. The 5-minute batching window handles this - both the document and its metadata file get picked up in the same sync.
Lambda timeout. StartIngestionJob is async - Lambda just kicks off the job and returns. The actual ingestion runs in the background and can take minutes or hours depending on document count. Lambda's 120-second timeout is more than enough.
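If you do want to watch the background job finish (in an ops script, not the Lambda), the `bedrock-agent` API exposes `GetIngestionJob`. A polling sketch, with the client and sleep function injected so it's easy to test; the poll interval and cap are arbitrary defaults:

```python
import time


def wait_for_ingestion_job(bedrock_agent, kb_id, ds_id, job_id,
                           poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Poll GetIngestionJob until the job reaches a terminal state.

    Returns the final status string; COMPLETE, FAILED, and STOPPED are
    the terminal states. Raises TimeoutError if the job is still
    running after max_polls checks.
    """
    for _ in range(max_polls):
        resp = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            ingestionJobId=job_id,
        )
        status = resp["ingestionJob"]["status"]
        if status in ("COMPLETE", "FAILED", "STOPPED"):
            return status
        sleep(poll_seconds)
    raise TimeoutError(f"Ingestion job {job_id} still running after polling")
```

In production you'd pass `boto3.client("bedrock-agent")`; keeping the client as a parameter means the loop can be exercised with a stub in unit tests.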
📐 Alternative: Scheduled Sync
If event-driven is overkill for your use case, a scheduled approach is simpler:
resource "aws_scheduler_schedule" "kb_sync" {
  name                = "${var.environment}-kb-sync-schedule"
  group_name          = "default"
  schedule_expression = "rate(1 hour)" # or cron(0 2 * * ? *)

  flexible_time_window {
    mode = "OFF"
  }

  target {
    arn = aws_lambda_function.kb_sync.arn
    # scheduler_role (defined elsewhere) needs lambda:InvokeFunction on
    # the function and a trust policy for scheduler.amazonaws.com.
    role_arn = aws_iam_role.scheduler_role.arn
  }
}
Use scheduled sync when documents update on a predictable cadence (daily reports, weekly uploads). Use event-driven sync when documents arrive unpredictably and you need near-real-time freshness.
⏭️ What's Next
This is Post 4 of the AWS RAG Pipeline with Terraform series.
- Post 1: Bedrock Knowledge Base - Basic Setup 🔍
- Post 2: Advanced RAG - Chunking, Search, Reranking 🧠
- Post 3: S3 Vectors - Cheapest Vector Store 💰
- Post 4: Auto-Sync Pipeline (you are here) ⚡
Your Knowledge Base now stays fresh automatically. Drop a document in S3, and within minutes your RAG pipeline has it chunked, embedded, and searchable. No manual syncs, no cron jobs, no stale answers. ⚡
Found this helpful? Follow for the full RAG Pipeline with Terraform series! 💬