Suhas Mallesh

Auto-Sync RAG Pipeline: S3 Events to Bedrock Knowledge Base with Terraform ⚡

Bedrock Knowledge Bases don't auto-sync when documents change. Here's how to build an event-driven pipeline with S3 notifications, SQS batching, and Lambda to trigger ingestion jobs automatically using Terraform.

You've built your Bedrock Knowledge Base, loaded documents into S3, and manually clicked "Sync" in the console. It works. But every time someone adds, updates, or deletes a document, the vector store goes stale until you sync again.

Bedrock Knowledge Bases don't auto-sync. When documents change in your S3 data source, nothing happens until you call StartIngestionJob. Ingestion is incremental - Bedrock only processes files that changed since the last sync - but you still need something to trigger it. This post builds an event-driven pipeline with Terraform: S3 events trigger an SQS queue, which batches changes and invokes a Lambda function that starts the ingestion job. 🎯

🏗️ Architecture Overview

S3 Bucket (docs added/updated/deleted)
    ↓ Event Notification
SQS Queue (batches events, 5-min window)
    ↓ Lambda Event Source Mapping
Lambda Function (calls StartIngestionJob)
    ↓ On failure
SQS Dead Letter Queue (failed invocations)
    ↓ CloudWatch
Alarm → SNS notification

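Why SQS between S3 and Lambda? Without batching, uploading 50 documents triggers 50 separate Lambda invocations, each calling StartIngestionJob. But only one ingestion job can run per data source at a time, so the rest fail. SQS collects events over a batching window and delivers them to Lambda in batches (up to 10 messages per invocation with the settings in this post), so a burst of uploads produces a handful of invocations instead of one per file. The first invocation starts the sync; any others see that a job is already running and exit cleanly.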
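To make the failure mode concrete, here is a toy model in plain Python with no AWS calls. The `FakeDataSource` class and `ConflictError` exception are hypothetical stand-ins for the "one job per data source" rule, not part of any Bedrock API.

```python
class ConflictError(Exception):
    """Stand-in for the conflict reported when a job is already running."""


class FakeDataSource:
    """Toy data source that allows one ingestion job at a time."""

    def __init__(self):
        self.job_running = False

    def start_ingestion_job(self):
        if self.job_running:
            raise ConflictError("ingestion job already in progress")
        self.job_running = True


def trigger_syncs(data_source, attempts):
    """Try to start `attempts` jobs back to back and count the outcomes."""
    started = rejected = 0
    for _ in range(attempts):
        try:
            data_source.start_ingestion_job()
            started += 1
        except ConflictError:
            rejected += 1
    return started, rejected


# 50 uploads with one invocation each: one job starts, 49 calls are rejected.
print(trigger_syncs(FakeDataSource(), 50))  # (1, 49)
```

Every rejected call is wasted work, which is the cost the queue is there to reduce.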

🔧 Terraform: The Full Pipeline

SQS Queues

# sync/sqs.tf

resource "aws_sqs_queue" "kb_sync" {
  name                       = "${var.environment}-kb-sync-queue"
  visibility_timeout_seconds = 900    # at least 6x the 120s Lambda timeout
  message_retention_seconds  = 86400  # 24 hours
  delay_seconds              = 300    # 5-min delay for batching

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.kb_sync_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "kb_sync_dlq" {
  name                      = "${var.environment}-kb-sync-dlq"
  message_retention_seconds = 1209600  # 14 days
}

resource "aws_sqs_queue_policy" "allow_s3" {
  queue_url = aws_sqs_queue.kb_sync.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "s3.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.kb_sync.arn
      Condition = {
        ArnEquals = {
          "aws:SourceArn" = var.kb_data_source_bucket_arn
        }
      }
    }]
  })
}

The delay_seconds = 300 is the key design choice. When someone uploads multiple files, S3 sends an event per file immediately. The delay hides each message for 5 minutes after it arrives, so a burst of uploads becomes visible to Lambda together and nothing triggers a sync while the upload is still in progress. That helps group rapid uploads into a single sync trigger.

S3 Event Notification

# sync/s3_notification.tf

resource "aws_s3_bucket_notification" "kb_docs" {
  bucket = var.kb_data_source_bucket_id

  queue {
    queue_arn     = aws_sqs_queue.kb_sync.arn
    events        = [
      "s3:ObjectCreated:*",
      "s3:ObjectRemoved:*"
    ]
    filter_prefix = var.s3_prefix  # e.g., "documents/"
  }

  depends_on = [aws_sqs_queue_policy.allow_s3]
}

Filter by prefix to avoid triggering on metadata files or other non-document objects in the bucket.
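Each message that reaches the queue carries a JSON S3 event notification in its body. If you want to log which objects changed rather than just how many, a helper along these lines can unpack them. This is a minimal sketch: `changed_keys` is a hypothetical name, and the sample event is trimmed to the fields the helper reads. Object keys arrive URL-encoded in S3 notifications, hence the `unquote_plus` call.

```python
import json
from urllib.parse import unquote_plus


def changed_keys(sqs_event):
    """Return (event_name, object_key) pairs from SQS-delivered S3 notifications."""
    changes = []
    for message in sqs_event.get("Records", []):
        body = json.loads(message["body"])
        # s3:TestEvent messages have no "Records" key, so they yield nothing.
        for record in body.get("Records", []):
            key = unquote_plus(record["s3"]["object"]["key"])
            changes.append((record["eventName"], key))
    return changes


sample = {
    "Records": [
        {"body": json.dumps({"Records": [{
            "eventName": "ObjectCreated:Put",
            "s3": {"object": {"key": "documents/q3+report.pdf"}},
        }]})},
        {"body": json.dumps({"Service": "Amazon S3", "Event": "s3:TestEvent"})},
    ]
}
print(changed_keys(sample))  # [('ObjectCreated:Put', 'documents/q3 report.pdf')]
```

S3 sends the s3:TestEvent message when the notification is first configured, so the first batch Lambda receives may contain no real object changes.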

Lambda Function

# sync/lambda.tf

data "archive_file" "kb_sync" {
  type        = "zip"
  source_dir  = "${path.module}/lambda/kb_sync"
  output_path = "${path.module}/lambda/kb_sync.zip"
}

resource "aws_lambda_function" "kb_sync" {
  function_name    = "${var.environment}-kb-sync"
  handler          = "index.handler"
  runtime          = "python3.12"
  timeout          = 120
  memory_size      = 128
  role             = aws_iam_role.kb_sync_lambda.arn
  filename         = data.archive_file.kb_sync.output_path
  source_code_hash = data.archive_file.kb_sync.output_base64sha256

  environment {
    variables = {
      KNOWLEDGE_BASE_ID = var.knowledge_base_id
      DATA_SOURCE_ID    = var.data_source_id
    }
  }
}

resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn                   = aws_sqs_queue.kb_sync.arn
  function_name                      = aws_lambda_function.kb_sync.arn
  batch_size                         = 10
  maximum_batching_window_in_seconds = 300  # Wait up to 5 min for batch
  enabled                            = true
}

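The maximum_batching_window_in_seconds on the event source mapping adds a second layer of batching. Combined with the SQS delay, this groups rapid file uploads into as few Lambda invocations as possible. With batch_size = 10, a burst of more than 10 files still produces several invocations; the ConflictException handler in the function code absorbs the extras.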
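As a rough back-of-the-envelope check on these settings, the sketch below estimates a lower bound on invocations and an approximate worst-case wait before a sync is triggered. The `estimate` function is hypothetical, and it assumes all uploads arrive at about the same time and ignores polling latency; Lambda may also deliver smaller batches than `batch_size`, so the real invocation count can be higher.

```python
import math


def estimate(uploads, batch_size=10, delay_seconds=300, batching_window=300):
    """Rough bounds for a burst of uploads that arrive at about the same time."""
    # Lambda receives at most `batch_size` messages per invocation.
    min_invocations = math.ceil(uploads / batch_size)
    # A message waits out the queue delay, then up to one batching window.
    worst_case_wait = delay_seconds + batching_window
    return min_invocations, worst_case_wait


print(estimate(50))  # (5, 600)
print(estimate(7))   # (1, 600)
```

With the values used in this post, a document can wait up to roughly 10 minutes before its sync starts. Lower either setting if you need fresher results and can tolerate more invocations.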

Lambda Function Code

# sync/lambda/kb_sync/index.py

import boto3
import os
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

bedrock_agent = boto3.client("bedrock-agent")

def handler(event, context):
    kb_id = os.environ["KNOWLEDGE_BASE_ID"]
    ds_id = os.environ["DATA_SOURCE_ID"]

    # Log what triggered the sync
    record_count = len(event.get("Records", []))
    logger.info(f"Received {record_count} S3 events, triggering sync")

    try:
        response = bedrock_agent.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id
        )
        job_id = response["ingestionJob"]["ingestionJobId"]
        status = response["ingestionJob"]["status"]
        logger.info(f"Ingestion job started: {job_id}, status: {status}")
        return {"statusCode": 200, "jobId": job_id}

    except bedrock_agent.exceptions.ConflictException:
        # An ingestion job is already running - safe to skip
        logger.info("Ingestion job already in progress, skipping")
        return {"statusCode": 200, "message": "Job already running"}

    except Exception as e:
        logger.error(f"Failed to start ingestion job: {str(e)}")
        raise  # Let SQS retry via redrive policy

The ConflictException handler is critical. If a sync is already running when Lambda fires, Bedrock throws a conflict error. Rather than retrying (which would fail again), we log it and return success. Because ingestion is incremental, any files the running job may not cover are picked up by the next sync, so nothing is lost; they simply wait for the next trigger.

IAM Role for Lambda

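# sync/iam.tf

# Account ID for the Knowledge Base ARN below (skip if your module already declares it)
data "aws_caller_identity" "current" {}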

resource "aws_iam_role" "kb_sync_lambda" {
  name = "${var.environment}-kb-sync-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role       = aws_iam_role.kb_sync_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy" "kb_sync_permissions" {
  name = "kb-sync-permissions"
  role = aws_iam_role.kb_sync_lambda.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = "bedrock:StartIngestionJob"
        Resource = "arn:aws:bedrock:${var.region}:${data.aws_caller_identity.current.account_id}:knowledge-base/${var.knowledge_base_id}"
      },
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.kb_sync.arn
      }
    ]
  })
}

Monitoring: DLQ Alarm

# sync/monitoring.tf

resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "${var.environment}-kb-sync-dlq-alarm"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "KB sync failures landing in DLQ"
  alarm_actions       = [var.sns_alert_topic_arn]

  dimensions = {
    QueueName = aws_sqs_queue.kb_sync_dlq.name
  }
}

If messages end up in the dead letter queue after 3 retries, something is genuinely broken (IAM permissions changed, Knowledge Base deleted, service outage). Alert on it.

⚠️ Edge Cases and Gotchas

One job at a time. Bedrock allows only one ingestion job per data source concurrently. The ConflictException handler in the Lambda code handles this gracefully.

Ingestion is incremental. Bedrock tracks which files changed since the last sync. You don't need to worry about re-processing unchanged documents. Each sync only processes added, modified, or deleted files.

Large batch uploads. If you're uploading hundreds of files at once (initial load or migration), consider disabling the S3 notification first, uploading all files, then triggering a single manual sync. Re-enable the notification afterward.

Metadata files. If you use .metadata.json companion files (from our Advanced RAG post), uploading the metadata file triggers a separate S3 event. The 5-minute batching window handles this - both the document and its metadata file get picked up in the same sync.

Lambda timeout. StartIngestionJob is async - Lambda just kicks off the job and returns. The actual ingestion runs in the background and can take minutes or hours depending on document count. Lambda's 120-second timeout is more than enough.
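Because the job runs in the background, you may want to check on it from a script or notebook rather than from the sync Lambda. The sketch below polls until the job reaches a terminal status. `wait_for_ingestion` is a hypothetical helper; it assumes `client` is a boto3 `bedrock-agent` client and that the `get_ingestion_job` response carries `status` and `statistics` under `ingestionJob`.

```python
import time

# Statuses after which the job will not change any further.
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_ingestion(client, kb_id, ds_id, job_id, poll_seconds=30, max_polls=120):
    """Poll get_ingestion_job until the job reaches a terminal status."""
    for _ in range(max_polls):
        job = client.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            ingestionJobId=job_id,
        )["ingestionJob"]
        if job["status"] in TERMINAL_STATUSES:
            return job["status"], job.get("statistics", {})
        time.sleep(poll_seconds)
    raise TimeoutError(f"Ingestion job {job_id} did not finish in time")
```

Pass it the job ID that the sync Lambda logs, for example `wait_for_ingestion(boto3.client("bedrock-agent"), kb_id, ds_id, job_id)`. Keep this kind of polling out of the sync Lambda itself, where it would only burn execution time.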

📐 Alternative: Scheduled Sync

If event-driven is overkill for your use case, a scheduled approach is simpler:

resource "aws_scheduler_schedule" "kb_sync" {
  name       = "${var.environment}-kb-sync-schedule"
  group_name = "default"

  schedule_expression = "rate(1 hour)"  # or cron(0 2 * * ? *)

  flexible_time_window {
    mode = "OFF"
  }

  target {
    arn      = aws_lambda_function.kb_sync.arn
    role_arn = aws_iam_role.scheduler_role.arn
  }
}

Use scheduled sync when documents update on a predictable cadence (daily reports, weekly uploads). Use event-driven sync when documents arrive unpredictably and you need near-real-time freshness.

⏭️ What's Next

This is Post 4 of the AWS RAG Pipeline with Terraform series.


Your Knowledge Base now stays fresh automatically. Drop a document in S3, and within minutes your RAG pipeline has it chunked, embedded, and searchable. No manual syncs, no cron jobs, no stale answers.

Found this helpful? Follow for the full RAG Pipeline with Terraform series! 💬
