Large Language Models (LLMs) are incredibly powerful at generating content, but they can "hallucinate" (make things up with confidence) or return outdated information, since they are "bound to the data they were trained on" (Julien, Hanza, and Antonio, LLM Engineer's Handbook).
In many cases, you may also want your LLM to answer questions using your own documents or internal knowledge bases. Retraining models to achieve this is time-consuming and costly.
That is where Retrieval-Augmented Generation (RAG) becomes a practical solution.
In this blog, we will build a Q&A bot using a RAG architecture, with the knowledge base stored in Amazon DynamoDB for non-production environments and Amazon OpenSearch for production workloads.
The app is built using Kiro 🔥
What this app can do
The app allows you to build a knowledge base for the model. Users only need to upload documents (currently supporting .txt and .pdf formats). If any documents become obsolete, simply delete them—the app will automatically trigger a process to remove the corresponding embedding vectors. The same happens when re-uploading documents: old chunks are removed before adding new ones.
The app provides a fallback to the general LLM knowledge, with a clear indication when no relevant documents are found. The threshold is configurable for easy updates.
The app is built using AWS CDK. For cost savings, only the production environment uses AWS OpenSearch as the vector database, with CloudFront in front of S3 hosting the static website. Other environments use AWS DynamoDB as the vector store.
If you prefer a Q&A bot without a knowledge base, simply set the ENABLE_RAG environment variable to false before deploying. If no value is set, it defaults to true, and the knowledge base will be deployed.
Before jumping into building the app, there are some terms that will be used:
RAG
Retrieval-Augmented Generation (RAG) is a method created by Meta to improve the accuracy of LLMs and reduce false information (Louis, Building LLMs for Production). RAG works by adding information from a retrieval step as context to the prompt; the LLM then generates the answer from that context. RAG allows you to keep an LLM up to date without retraining the model.
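To make this concrete, here is a minimal sketch in plain Python of the "add context to the prompt" step. The function name and prompt wording are illustrative, not from the app's actual code:

```python
def build_rag_prompt(question: str, chunks: list[str]) -> str:
    """Combine retrieved document chunks and the user question into one prompt."""
    # Number each chunk so the model (and the user) can reference sources
    context = "\n\n".join(f"[{i + 1}] {c}" for i, c in enumerate(chunks))
    return (
        "Answer the question using ONLY the context below. "
        "If the context is not relevant, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )

prompt = build_rag_prompt(
    "How do I configure a Lambda timeout?",
    ["In AWS CDK, the timeout property on a Function sets the Lambda timeout."],
)
```

The generator model never sees your documents directly; it only sees whatever the retrieval step pastes into the prompt, which is why retrieval quality dominates answer quality.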
Tokens and Embeddings
Tokens are small chunks of text. Before an LLM can compute over language, it converts tokens into numeric representations called embeddings. Embeddings are vector representations of data that attempt to capture its meaning (Jay, Maarten, Hands-On Large Language Models).
Vector Database
A vector database is a specialized system that stores and queries high-dimensional vectors efficiently. These databases are fundamental for Retrieval-Augmented Generation (RAG) applications (Overview of vector databases - AWS Prescriptive Guidance).
AWS offers several vector database options, including Amazon OpenSearch Service, Amazon Kendra, and Amazon RDS for PostgreSQL with pgvector (Vector database options - AWS Prescriptive Guidance).
Architecture

Before we dive into the step-by-step walkthrough of the diagram, note that we split the architecture into production and non-production environments for cost savings.
For production, we use Amazon OpenSearch Service as the vector database, leveraging its native vector search, with Amazon CloudFront in front as a CDN for HTTPS and caching.
For non-production, we use Amazon DynamoDB as the vector store, with an AWS Lambda function that scans items and computes cosine similarity.
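The non-production search boils down to computing cosine similarity between the question embedding and each stored chunk embedding. A minimal sketch in plain Python (the actual Lambda applies the same math while scanning the DynamoDB table):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot(a, b) / (|a| * |b|), ranges from -1 to 1."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Vectors pointing the same direction score 1.0; orthogonal vectors score 0.0
print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))  # → 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 3.0]))  # → 0.0
```

Scanning the whole table is O(n) per query, which is fine for small non-production datasets; this is exactly the cost that OpenSearch's native vector index avoids in production.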
Step 1: Upload document to S3
Users upload knowledge-base documents to Amazon S3.
This automatically triggers an AWS Lambda function to generate embeddings.
Step 2: Generate Embeddings
The AWS Lambda function chunks the documents and invokes Amazon Bedrock with the Titan model to generate embeddings.
Depending on the environment, the embeddings are stored in Amazon DynamoDB (non-production) or Amazon OpenSearch (production environment).
Each record stores:
- chunkId
- documentId
- chunkIndex
- content
- embedding
- sourceKey
- format
- createdAt
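Put together, a stored record looks like the sketch below. The values are hypothetical; only the field names come from the app:

```python
chunk_record = {
    "chunkId": "guides_cdk_txt#0",        # documentId + "#" + chunk index
    "documentId": "guides_cdk_txt",       # S3 key with "/" and "." replaced
    "chunkIndex": 0,
    "content": "To set a Lambda timeout in AWS CDK...",
    "embedding": [0.013, -0.044, 0.092],  # truncated for illustration
    "sourceKey": "guides/cdk.txt",
    "format": "txt",
    "createdAt": "2024-01-01T00:00:00Z",
}
```

Keying chunks as `documentId#chunkIndex` makes it cheap to find and delete every chunk belonging to one document, which is what powers the automatic cleanup on re-upload and deletion.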
Step 3: Users ask questions via frontend
Users access the frontend, configure the API Gateway URL and API key, then submit their questions.
Step 4: Frontend sends the request to API Gateway
The request is sent from the frontend to Amazon API Gateway.
Step 5: Amazon API Gateway invokes AWS Lambda function
API Gateway invokes a Lambda function to process the request and generate an answer.
Step 6: AWS Lambda handles the request
First, the Lambda function generates an embedding for the question by calling the Amazon Bedrock API with the Titan model.
- For non-production, Lambda searches for similar chunks by scanning DynamoDB and computing cosine similarity.
- For production, Lambda calls Amazon OpenSearch Service to perform a native vector similarity search.
Lambda then formats a prompt using the retrieved relevant text and calls the Amazon Bedrock InvokeModel API with Claude Sonnet to get the final answer. Finally, Lambda returns the response to the frontend.
Note: The models used for embedding generation and for producing the final answer are configurable.
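The fallback to general LLM knowledge hinges on the configurable similarity threshold. A simplified sketch of the decision, assuming the handler's default threshold of 0.5 (the function name is illustrative):

```python
SIMILARITY_THRESHOLD = 0.5  # configurable via environment variable

def select_context(results: list[dict]) -> tuple[list[dict], bool]:
    """Keep chunks above the threshold; signal fallback if none qualify.

    Each result is {"content": str, "score": float}. Returns
    (relevant_chunks, fallback): fallback=True means the answer should
    come from general LLM knowledge, not the knowledge base.
    """
    relevant = [r for r in results if r["score"] >= SIMILARITY_THRESHOLD]
    return relevant, len(relevant) == 0

chunks, fallback = select_context(
    [{"content": "a", "score": 0.72}, {"content": "b", "score": 0.31}]
)
# Only the 0.72 match survives, so fallback stays False
```

This is what drives the yellow "General Knowledge Response" banner in the frontend: when `fallback` is true, the response still answers, but clearly flags that no relevant documents were found.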
AWS Resources:
- Amazon CloudFront
- Amazon S3
- Amazon API Gateway
- AWS Lambda
- Amazon DynamoDB
- Amazon OpenSearch Service
- Amazon Bedrock
- AWS Identity and Access Management (IAM)
- Amazon CloudWatch
Prerequisites:
- An AWS account that has been bootstrapped for AWS CDK
- Environment setup:
  - Node.js
  - TypeScript
  - AWS CDK Toolkit
  - Docker (used for bundling Lambda functions when deploying)
- AWS Credentials: keep them handy so you can deploy the stacks
Building the app
1. Build the frontend
The frontend is built using simple HTML and JavaScript.
We will create two files, index.html and error.html, which will be uploaded to S3 later.
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Knowledge Q&A Bot</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #f5f5f5;
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
}
h1 {
text-align: center;
color: #333;
margin-bottom: 30px;
}
.card {
background: white;
border-radius: 12px;
padding: 24px;
margin-bottom: 20px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.card h2 {
font-size: 18px;
color: #666;
margin-bottom: 16px;
}
.input-group {
display: flex;
gap: 10px;
margin-bottom: 16px;
}
input[type="text"], textarea {
flex: 1;
padding: 12px 16px;
border: 2px solid #e0e0e0;
border-radius: 8px;
font-size: 16px;
transition: border-color 0.2s;
}
input[type="text"]:focus, textarea:focus {
outline: none;
border-color: #007bff;
}
textarea {
min-height: 100px;
resize: vertical;
}
button {
padding: 12px 24px;
background: #007bff;
color: white;
border: none;
border-radius: 8px;
font-size: 16px;
cursor: pointer;
transition: background 0.2s;
}
button:hover { background: #0056b3; }
button:disabled {
background: #ccc;
cursor: not-allowed;
}
.file-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 8px 12px;
background: #f5f5f5;
border-radius: 6px;
margin-bottom: 8px;
}
.file-item .remove {
color: #dc3545;
cursor: pointer;
padding: 4px 8px;
}
.answer-box {
background: #f8f9fa;
border-radius: 8px;
padding: 16px;
margin-top: 16px;
display: none;
}
.answer-box.show { display: block; }
.answer-box h3 {
font-size: 14px;
color: #666;
margin-bottom: 8px;
}
.answer-text {
font-size: 16px;
line-height: 1.6;
color: #333;
}
.sources {
margin-top: 16px;
padding-top: 16px;
border-top: 1px solid #e0e0e0;
}
.sources h4 {
font-size: 12px;
color: #888;
margin-bottom: 8px;
}
.source-item {
font-size: 13px;
color: #666;
padding: 8px;
background: white;
border-radius: 4px;
margin-bottom: 6px;
}
.source-item .score {
color: #28a745;
font-weight: 600;
}
.loading {
display: none;
text-align: center;
padding: 20px;
}
.loading.show { display: block; }
.spinner {
width: 40px;
height: 40px;
border: 4px solid #f3f3f3;
border-top: 4px solid #007bff;
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 0 auto 10px;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.config-section {
display: flex;
gap: 10px;
flex-wrap: wrap;
}
.config-section input {
flex: 1;
min-width: 200px;
}
.status {
padding: 8px 12px;
border-radius: 6px;
font-size: 14px;
margin-top: 10px;
}
.status.success { background: #d4edda; color: #155724; }
.status.error { background: #f8d7da; color: #721c24; }
.status.info { background: #cce5ff; color: #004085; }
</style>
</head>
<body>
<div class="container">
<h1>📚 Knowledge Q&A Bot</h1>
<!-- Config -->
<div class="card">
<h2>⚙️ Configuration</h2>
<div class="config-section">
<div style="flex: 1; min-width: 200px;">
<label for="apiUrl" style="display: block; font-size: 12px; color: #666; margin-bottom: 4px;">API URL</label>
<input type="text" id="apiUrl" placeholder="https://xxx.execute-api.ap-southeast-2.amazonaws.com/kate">
</div>
<div style="flex: 1; min-width: 200px;">
<label for="apiKey" style="display: block; font-size: 12px; color: #666; margin-bottom: 4px;">API Key</label>
<input type="text" id="apiKey" placeholder="Enter your API key">
</div>
<button onclick="saveConfig()">Save</button>
</div>
<div id="configStatus"></div>
</div>
<!-- Ask -->
<div class="card">
<h2>💬 Ask a Question</h2>
<div style="background: #f0f7ff; padding: 12px; border-radius: 6px; margin-bottom: 16px; font-size: 13px; color: #555;">
<strong>💡 Tips for better answers:</strong>
<ul style="margin: 8px 0 0 20px; padding: 0;">
<li>Be specific: "How do I configure Lambda timeout?" vs "Tell me about Lambda"</li>
<li>Ask one thing at a time for focused responses</li>
<li>Include context: "In AWS CDK, how do I..." helps narrow the search</li>
<li>Try rephrasing if the first answer isn't helpful</li>
</ul>
</div>
<div class="input-group">
<textarea id="question" placeholder="Example: How do I set up DynamoDB with on-demand billing in AWS CDK?"></textarea>
</div>
<button onclick="askQuestion()" id="askBtn">Ask</button>
<div class="loading" id="loading">
<div class="spinner"></div>
<p>Thinking...</p>
</div>
<div class="answer-box" id="answerBox">
<h3>Answer</h3>
<div class="answer-text" id="answerText"></div>
<div class="sources" id="sources"></div>
</div>
</div>
</div>
<script>
// Load saved config
document.getElementById('apiUrl').value = localStorage.getItem('apiUrl') || '';
document.getElementById('apiKey').value = localStorage.getItem('apiKey') || '';
function saveConfig() {
const apiUrl = document.getElementById('apiUrl').value.trim();
const apiKey = document.getElementById('apiKey').value.trim();
localStorage.setItem('apiUrl', apiUrl);
localStorage.setItem('apiKey', apiKey);
showStatus('configStatus', 'Configuration saved!', 'success');
}
function showStatus(elementId, message, type) {
const el = document.getElementById(elementId);
el.innerHTML = `<div class="status ${type}">${message}</div>`;
setTimeout(() => el.innerHTML = '', 5000);
}
async function askQuestion() {
const apiUrl = localStorage.getItem('apiUrl');
const apiKey = localStorage.getItem('apiKey');
const question = document.getElementById('question').value.trim();
if (!apiUrl || !apiKey) {
showStatus('configStatus', 'Please configure API URL and Key first', 'error');
return;
}
if (!question) {
alert('Please enter a question');
return;
}
const loading = document.getElementById('loading');
const answerBox = document.getElementById('answerBox');
const askBtn = document.getElementById('askBtn');
loading.classList.add('show');
answerBox.classList.remove('show');
askBtn.disabled = true;
try {
const response = await fetch(`${apiUrl}/ask`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': apiKey
},
body: JSON.stringify({ question, topK: 5 })
});
const data = await response.json();
if (!response.ok) {
throw new Error(data.error || 'Request failed');
}
// Display fallback notice if applicable
let answerHtml = '';
if (data.fallback) {
answerHtml = `
<div style="background: #fff3cd; border-left: 4px solid #ffc107; padding: 12px; margin-bottom: 12px; border-radius: 4px;">
<strong>⚠️ General Knowledge Response</strong>
<p style="margin: 4px 0 0 0; font-size: 13px; color: #856404;">
${data.fallbackReason || 'No relevant documents found in knowledge base'}.
This answer is based on general knowledge, not the knowledge base.
</p>
</div>
`;
}
answerHtml += `<div>${data.answer}</div>`;
document.getElementById('answerText').innerHTML = answerHtml;
const sourcesEl = document.getElementById('sources');
if (data.sources && data.sources.length > 0) {
sourcesEl.innerHTML = `
<h4>Sources</h4>
${data.sources.map(s => `
<div class="source-item">
<strong>${s.documentId}</strong> (chunk ${s.chunkIndex})
<span class="score">${(s.score * 100).toFixed(1)}% match</span>
<p style="margin-top: 4px; font-size: 12px;">${s.excerpt}</p>
</div>
`).join('')}
`;
} else {
sourcesEl.innerHTML = '';
}
answerBox.classList.add('show');
} catch (error) {
alert('Error: ' + error.message);
} finally {
loading.classList.remove('show');
askBtn.disabled = false;
}
}
// Enter key to submit
document.getElementById('question').addEventListener('keydown', (e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
askQuestion();
}
});
</script>
</body>
</html>
error.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Error - Knowledge Q&A Bot</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
padding: 20px;
}
.error-container {
background: white;
border-radius: 20px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
max-width: 600px;
width: 100%;
padding: 60px 40px;
text-align: center;
}
.error-icon {
font-size: 80px;
margin-bottom: 20px;
}
h1 {
color: #333;
font-size: 48px;
margin-bottom: 10px;
font-weight: 700;
}
h2 {
color: #666;
font-size: 24px;
margin-bottom: 30px;
font-weight: 400;
}
p {
color: #777;
font-size: 16px;
line-height: 1.6;
margin-bottom: 30px;
}
.btn {
padding: 12px 30px;
border: none;
border-radius: 8px;
font-size: 16px;
font-weight: 600;
cursor: pointer;
text-decoration: none;
display: inline-block;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
@media (max-width: 600px) {
.error-container {
padding: 40px 20px;
}
h1 {
font-size: 36px;
}
h2 {
font-size: 20px;
}
}
</style>
</head>
<body>
<div class="error-container">
<div class="error-icon">⚠️</div>
<h1>Oops!</h1>
<h2>Something went wrong</h2>
<p>The page you're looking for doesn't exist. This might happen if the URL is incorrect or the page has been removed.</p>
<a href="/" class="btn">Go to Home</a>
</div>
</body>
</html>
2. Build the infrastructure using AWS CDK
2.1 S3 Buckets
We will use 2 buckets for this app:
Frontend S3 bucket: host the website
/**
* S3 Buckets Stack
*
* This stack creates and manages S3 buckets for the Knowledge Q&A Bot:
* - Documents bucket: Private storage for uploaded documents (TXT, PDF)
* - Frontend bucket: Public static website hosting for the chat UI
*
* The stack is separated from the main application stack to allow
* independent lifecycle management of storage resources.
*/
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment';
import { Construct } from 'constructs';
import * as path from 'path';
/**
* Props for the S3 Buckets Stack
*/
export interface S3BucketsStackProps extends cdk.StackProps {
/** Name of the CloudFormation stack */
stackName: string;
/** AWS region for deployment */
region: string;
/** AWS account ID */
accountId: string;
/** Environment name (e.g., 'kate', 'dev', 'prod') */
envName: string;
/** Name for the frontend S3 bucket */
frontendBucketName: string;
}
/**
* Stack that creates S3 bucket for frontend hosting
* Note: Documents bucket has been moved to KnowledgeQaBotStack to avoid cyclic dependencies
*/
export class S3BucketsStack extends cdk.Stack {
/** S3 bucket for hosting the frontend static website */
public readonly frontendBucket: s3.Bucket;
constructor(scope: Construct, id: string, props: S3BucketsStackProps) {
const { region, accountId, envName } = props;
// Merge environment configuration with provided props
const updatedProps = {
env: {
region: region,
account: accountId,
},
...props,
};
super(scope, id, updatedProps);
// ========================================
// Frontend Bucket
// ========================================
// Public bucket for hosting the static website (HTML/CSS/JS)
// - Website hosting enabled with index.html as default
// - Public read access for website visitors
// - Always destroyed on stack deletion (frontend can be redeployed)
this.frontendBucket = new s3.Bucket(this, 'FrontendBucket', {
bucketName: props.frontendBucketName,
websiteIndexDocument: 'index.html',
websiteErrorDocument: 'error.html',
publicReadAccess: true,
blockPublicAccess: new s3.BlockPublicAccess({
blockPublicAcls: false,
blockPublicPolicy: false,
ignorePublicAcls: false,
restrictPublicBuckets: false,
}),
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// ========================================
// Frontend Deployment
// ========================================
// Automatically deploy frontend files from ./frontend directory
// This runs on every CDK deploy to update the website
new s3deploy.BucketDeployment(this, 'DeployFrontend', {
sources: [s3deploy.Source.asset(path.join(__dirname, '../frontend'))],
destinationBucket: this.frontendBucket,
});
// ========================================
// Stack Outputs
// ========================================
// Export values for use by other stacks and for reference
new cdk.CfnOutput(this, 'FrontendBucketName', {
value: this.frontendBucket.bucketName,
description: 'S3 bucket for frontend',
exportName: `FrontendBucketName-${envName}`,
});
new cdk.CfnOutput(this, 'FrontendUrl', {
value: this.frontendBucket.bucketWebsiteUrl,
description: 'Frontend website URL (S3 direct)',
exportName: `FrontendS3Url-${envName}`,
});
}
}
Documents S3 bucket: stores the documents uploaded to the knowledge base; currently accepts .txt and .pdf formats
// ========================================
// Documents S3 Bucket
// ========================================
// Create the documents bucket in this stack to avoid cyclic dependencies
// with S3 event notifications
// For convenience during development/testing, always destroy buckets
// Uncomment below for production to retain data on stack deletion
// const isProduction = envName.toLowerCase() === 'prod';
// const documentsRemovalPolicy = isProduction
// ? cdk.RemovalPolicy.RETAIN
// : cdk.RemovalPolicy.DESTROY;
// const autoDeleteDocuments = !isProduction;
const documentsRemovalPolicy = cdk.RemovalPolicy.DESTROY;
const autoDeleteDocuments = true;
this.documentsBucket = new s3.Bucket(this, 'DocumentsBucket', {
bucketName: resourceName(envName, 'documents'),
encryption: s3.BucketEncryption.S3_MANAGED,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
versioned: true,
enforceSSL: true,
removalPolicy: documentsRemovalPolicy,
autoDeleteObjects: autoDeleteDocuments,
});
2.2 API Gateway
We will create a REST API with:
- An API key for authentication
- A usage plan to control access and prevent overspending
// ========================================
// API Gateway
// ========================================
// REST API for Q&A queries with CORS support
this.api = new apigateway.RestApi(this, 'QaApi', {
restApiName: resourceName(envName, 'api'),
description: 'Knowledge Q&A Bot API',
deployOptions: {
stageName: envName,
},
// Enable CORS for frontend access
defaultCorsPreflightOptions: {
allowOrigins: apigateway.Cors.ALL_ORIGINS,
allowMethods: apigateway.Cors.ALL_METHODS,
allowHeaders: ['Content-Type', 'x-api-key'],
},
});
// ========================================
// API Key & Usage Plan
// ========================================
// Protect API with key and enforce rate limits to control costs
const apiKey = this.api.addApiKey('ApiKey', {
apiKeyName: resourceName(envName, 'api-key'),
});
const usagePlan = this.api.addUsagePlan('UsagePlan', {
name: resourceName(envName, 'usage-plan'),
throttle: {
rateLimit: 10, // 10 requests per second
burstLimit: 20, // Allow bursts up to 20
},
quota: {
limit: 1000, // 1000 requests per month
period: apigateway.Period.MONTH,
},
});
usagePlan.addApiKey(apiKey);
usagePlan.addApiStage({ stage: this.api.deploymentStage });
// ========================================
// API Endpoints
// ========================================
// POST /ask - Submit a question and get an answer
const askResource = this.api.root.addResource('ask');
askResource.addMethod('POST', new apigateway.LambdaIntegration(this.queryFunction), {
apiKeyRequired: true,
});
2.3 Lambda
Lambda will be used for both document ingestion and query processing.
Document Ingestion
- Triggered by OBJECT_CREATED/OBJECT_REMOVED events from S3
- Chunks the documents
- Calls Amazon Bedrock with the Titan model to generate embeddings
- Stores embeddings in Amazon DynamoDB (non-production) or Amazon OpenSearch (production)
Handler
"""
Document Ingestion Lambda Handler
This Lambda function is triggered by S3 object creation events when
documents are uploaded to the documents bucket. It processes each
document through the following pipeline:
1. Parse: Extract text content from TXT or PDF files
2. Chunk: Split text into overlapping chunks for better retrieval
3. Embed: Generate vector embeddings using Amazon Bedrock Titan
4. Store: Save chunks and embeddings to vector store (DynamoDB or OpenSearch)
Features:
- Automatic document deletion: Removes old chunks when re-uploading
- Batch processing: Efficiently stores multiple chunks
- Error handling: Continues processing even if individual documents fail
- Flexible storage: Supports both DynamoDB (dev) and OpenSearch (prod)
Environment Variables:
ENABLE_RAG: Enable/disable RAG mode (default: true)
USE_OPENSEARCH: Use OpenSearch instead of DynamoDB (default: false)
TABLE_NAME: DynamoDB table for storing chunks (if not using OpenSearch)
OPENSEARCH_ENDPOINT: OpenSearch domain endpoint (if using OpenSearch)
BUCKET_NAME: S3 bucket containing documents
CHUNK_SIZE: Target size for text chunks (default: 1000)
CHUNK_OVERLAP: Overlap between chunks (default: 200)
EMBEDDING_MODEL: Bedrock model ID for embeddings
LOG_LEVEL: Logging level (default: INFO)
"""
import json
import os
import logging
from typing import Any
import boto3
from services.parser import DocumentParser
from services.chunker import TextChunker
from services.embedding import EmbeddingService
from services.vector_store import VectorStore
from services.serialization import serialize_chunk
# ========================================
# Logging Configuration
# ========================================
log_level = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
# ========================================
# AWS Client Initialization
# ========================================
# Initialize AWS clients once at module load for connection reuse
s3_client = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
bedrock = boto3.client("bedrock-runtime")
# ========================================
# Environment Variables
# ========================================
ENABLE_RAG = os.environ.get("ENABLE_RAG", "true").lower() == "true"
USE_OPENSEARCH = os.environ.get("USE_OPENSEARCH", "false").lower() == "true"
TABLE_NAME = os.environ.get("TABLE_NAME", "not-used")
BUCKET_NAME = os.environ["BUCKET_NAME"]
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "1000"))
CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", "200"))
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "amazon.titan-embed-text-v1")
# ========================================
# Service Initialization
# ========================================
# Initialize services with configured clients and settings
parser = DocumentParser(s3_client, BUCKET_NAME)
chunker = TextChunker(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
embedding_service = EmbeddingService(bedrock, EMBEDDING_MODEL)
# Choose vector store based on environment
if ENABLE_RAG:
if USE_OPENSEARCH:
from services.opensearch_vector_store import OpenSearchVectorStore
opensearch_endpoint = os.environ.get("OPENSEARCH_ENDPOINT", "")
if not opensearch_endpoint:
raise ValueError("OPENSEARCH_ENDPOINT required when USE_OPENSEARCH=true")
vector_store = OpenSearchVectorStore(opensearch_endpoint)
logger.info(f"Using OpenSearch at {opensearch_endpoint}")
else:
vector_store = VectorStore(dynamodb.Table(TABLE_NAME))
logger.info(f"Using DynamoDB table {TABLE_NAME}")
else:
vector_store = None
logger.info("RAG disabled - no vector store initialized")
def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
"""
Lambda handler for document ingestion and deletion.
Processes S3 event notifications for uploaded/deleted documents:
- ObjectCreated: Parse, chunk, embed, and store document
- ObjectRemoved: Delete all chunks for the document
Args:
event: S3 event notification containing Records array
context: Lambda context object (unused)
Returns:
Response dict with statusCode and body containing:
- processed: Number of successfully processed documents
- deleted: Number of successfully deleted documents
- errors: List of error messages for failed operations
"""
logger.info(f"Received event: {json.dumps(event)}")
processed_count = 0
deleted_count = 0
errors = []
# Process each S3 event record
for record in event.get("Records", []):
try:
# ========================================
# Extract S3 Object Information
# ========================================
bucket = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
event_name = record["eventName"]
logger.info(f"Processing event {event_name}: s3://{bucket}/{key}")
# Generate unique document ID from S3 key
document_id = key.replace("/", "_").replace(".", "_")
# ========================================
# Handle Delete Events
# ========================================
if event_name.startswith("ObjectRemoved"):
logger.info(f"Deleting chunks for document: {document_id}")
vector_store.delete_by_document(document_id)
deleted_count += 1
logger.info(f"Successfully deleted document: {key}")
continue
# ========================================
# Handle Upload/Reupload Events
# ========================================
# Step 0: Delete existing chunks (handles reuploads)
# This ensures no orphaned chunks remain if new version
# has fewer chunks than the old version
logger.info(f"Checking for existing chunks: {document_id}")
vector_store.delete_by_document(document_id)
# ========================================
# Step 1: Parse Document
# ========================================
# Extract text content from TXT or PDF file
parsed_doc = parser.parse(key)
if not parsed_doc:
logger.warning(f"Could not parse document: {key}")
continue
# ========================================
# Step 2: Chunk Text
# ========================================
# Split into overlapping chunks for better retrieval
chunks = chunker.chunk(parsed_doc["content"])
logger.info(f"Created {len(chunks)} chunks from document")
# ========================================
# Step 3 & 4: Embed and Store Each Chunk
# ========================================
for chunk in chunks:
# Generate vector embedding via Bedrock Titan
embedding = embedding_service.embed(chunk["content"])
# Prepare chunk record for DynamoDB
stored_chunk = {
"chunkId": f"{document_id}#{chunk['index']}",
"documentId": document_id,
"chunkIndex": chunk["index"],
"content": chunk["content"],
"embedding": embedding,
"sourceKey": key,
"format": parsed_doc["metadata"]["format"],
"createdAt": parsed_doc["metadata"]["extractedAt"],
}
# Store serialized chunk in DynamoDB
vector_store.store(serialize_chunk(stored_chunk))
processed_count += 1
logger.info(f"Successfully processed document: {key}")
except Exception as e:
logger.error(f"Error processing record: {e}", exc_info=True)
errors.append(str(e))
return {
"statusCode": 200,
"body": json.dumps(
{
"processed": processed_count,
"deleted": deleted_count,
"errors": errors,
}
),
}
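The handler above delegates chunking to a `TextChunker` service whose implementation isn't shown. A minimal sliding-window sketch, assuming the same `chunk_size`/`chunk_overlap` semantics as the environment variables (this is an illustration, not the app's actual service code):

```python
class TextChunker:
    """Split text into overlapping chunks (sketch of the chunking idea)."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.step = chunk_size - chunk_overlap  # how far each chunk advances

    def chunk(self, text: str) -> list[dict]:
        chunks = []
        for index, start in enumerate(range(0, len(text), self.step)):
            piece = text[start : start + self.chunk_size]
            chunks.append({"index": index, "content": piece})
            if start + self.chunk_size >= len(text):
                break  # last window already covers the end of the text
        return chunks

chunks = TextChunker(chunk_size=10, chunk_overlap=4).chunk("abcdefghijklmnop")
# chunk 0 = "abcdefghij", chunk 1 = "ghijklmnop" (4-character overlap)
```

The overlap is deliberate: a sentence split across a chunk boundary still appears whole in at least one chunk, which improves the odds that retrieval surfaces it intact.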
Deploy the Lambda function with its log group in Amazon CloudWatch:
// ========================================
// Ingestion Lambda
// ========================================
// Triggered by S3 uploads, processes documents:
// 1. Parse document (TXT/PDF)
// 2. Split into chunks
// 3. Generate embeddings via Bedrock Titan
// 4. Store chunks + embeddings in DynamoDB
// Create log group explicitly so it's managed by CloudFormation
// and deleted when the stack is destroyed
const logRetentionDays = config.logRetentionDays || 1;
const ingestLogGroup = new logs.LogGroup(this, 'IngestLogGroup', {
logGroupName: `/aws/lambda/${resourceName(envName, 'ingest')}`,
retention: logRetentionDays as logs.RetentionDays,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
this.ingestFunction = new PythonFunction(this, 'IngestFunction', {
functionName: resourceName(envName, 'ingest'),
entry: 'src/lambdas/ingest',
runtime: lambda.Runtime.PYTHON_3_12,
index: 'handler.py',
handler: 'handler',
description: `Document ingestion function (config: ${configHash})`,
memorySize: config.lambdaMemorySize || 1024,
timeout: Duration.seconds(config.lambdaTimeout || 300),
environment: commonEnv,
logGroup: ingestLogGroup,
});
Then create an S3 trigger event and add this Lambda function as the destination.
// ========================================
// S3 Event Triggers (Only if RAG enabled)
// ========================================
// Automatically process documents when uploaded or deleted from S3
// - OBJECT_CREATED: Parse, chunk, embed, and store
// - OBJECT_REMOVED: Delete all chunks for the document
// Skip if RAG is disabled (no document processing needed)
//
// Note: Event notifications are added here in the same stack where
// the Lambda is created to avoid cyclic dependencies
if (enableRag) {
// Supported document formats
// Add new formats here to automatically enable processing
const supportedFormats = ['.txt', '.pdf'];
// Register event notifications for each supported format
supportedFormats.forEach(format => {
// Handle uploads and reuploads
this.documentsBucket.addEventNotification(
s3.EventType.OBJECT_CREATED,
new s3n.LambdaDestination(this.ingestFunction),
{ suffix: format }
);
// Handle deletions
this.documentsBucket.addEventNotification(
s3.EventType.OBJECT_REMOVED,
new s3n.LambdaDestination(this.ingestFunction),
{ suffix: format }
);
});
}
Query Processing
- Receives user questions via API Gateway
- Generates embeddings for the question using Amazon Bedrock
- Searches for relevant chunks:
- Non-prod: scan DynamoDB and compute cosine similarity
- Prod: query OpenSearch using native vector search
- Formats a prompt with the retrieved chunks
- Calls Amazon Bedrock InvokeModel API to get the answer
- Returns the response to the frontend
Handler
"""
Query Lambda Handler
This Lambda function handles Q&A requests from API Gateway.
It processes questions through the following pipeline:
1. Embed: Convert question to vector embedding (Bedrock Titan)
2. Retrieve: Find similar document chunks via cosine similarity (DynamoDB/OpenSearch)
3. Check: Verify similarity threshold (fallback to general knowledge if too low)
4. Generate: Create grounded answer (Bedrock Claude)
5. Respond: Return answer with source citations
Features:
- RAG mode: Retrieves relevant documents and generates grounded answers
- Direct LLM mode: Generates answers without retrieval (when RAG disabled)
- Smart fallback: Uses general knowledge when no relevant documents found
- Similarity threshold: Ensures retrieved documents are actually relevant
Environment Variables:
ENABLE_RAG: Enable/disable RAG mode (default: true)
TABLE_NAME: DynamoDB table containing document chunks
TOP_K: Number of similar chunks to retrieve (default: 5)
SIMILARITY_THRESHOLD: Minimum similarity score for RAG (default: 0.5)
EMBEDDING_MODEL: Bedrock model ID for embeddings
LLM_MODEL: Bedrock model ID for answer generation
LOG_LEVEL: Logging level (default: INFO)
"""
import json
import os
import logging
from typing import Any
import boto3
from services.embedding import EmbeddingService
from services.retrieval import RetrievalService
from services.answer_generation import AnswerGenerationService
from services.vector_store import VectorStore
# ========================================
# Logging Configuration
# ========================================
log_level = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(level=log_level)
logger = logging.getLogger(__name__)
# ========================================
# AWS Client Initialization
# ========================================
# Initialize AWS clients once at module load for connection reuse
dynamodb = boto3.resource("dynamodb")
bedrock = boto3.client("bedrock-runtime")
# ========================================
# Environment Variables
# ========================================
ENABLE_RAG = os.environ.get("ENABLE_RAG", "true").lower() == "true"
TABLE_NAME = os.environ.get("TABLE_NAME", "not-used")
TOP_K = int(os.environ.get("TOP_K", "5"))
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "amazon.titan-embed-text-v1")
LLM_MODEL = os.environ.get("LLM_MODEL", "anthropic.claude-3-sonnet-20240229-v1:0")
SIMILARITY_THRESHOLD = float(os.environ.get("SIMILARITY_THRESHOLD", "0.5"))
# ========================================
# Service Initialization
# ========================================
# Initialize services with configured clients and settings
answer_service = AnswerGenerationService(bedrock, LLM_MODEL)
# Only initialize RAG services if enabled
if ENABLE_RAG:
embedding_service = EmbeddingService(bedrock, EMBEDDING_MODEL)
# Choose vector store based on environment
USE_OPENSEARCH = os.environ.get("USE_OPENSEARCH", "false").lower() == "true"
if USE_OPENSEARCH:
from services.opensearch_vector_store import OpenSearchVectorStore
opensearch_endpoint = os.environ.get("OPENSEARCH_ENDPOINT", "")
if not opensearch_endpoint:
raise ValueError("OPENSEARCH_ENDPOINT environment variable is required when USE_OPENSEARCH=true")
vector_store = OpenSearchVectorStore(opensearch_endpoint)
logger.info(f"RAG mode enabled - using OpenSearch at {opensearch_endpoint}")
else:
vector_store = VectorStore(dynamodb.Table(TABLE_NAME))
logger.info(f"RAG mode enabled - using DynamoDB table {TABLE_NAME}")
retrieval_service = RetrievalService(embedding_service, vector_store)
else:
embedding_service = None
vector_store = None
retrieval_service = None
logger.info("RAG mode disabled - using direct LLM responses")
def handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
"""
Lambda handler for Q&A queries.
Processes POST requests from API Gateway with a question,
retrieves relevant document chunks, and generates a grounded answer.
Args:
event: API Gateway event containing body with question
context: Lambda context object (unused)
Returns:
API Gateway response with:
- 200: Answer and sources on success
- 400: Error message for invalid requests
- 500: Error message for server errors
"""
logger.info(f"Received event: {json.dumps(event)}")
try:
# ========================================
# Parse and Validate Request
# ========================================
body = json.loads(event.get("body") or "{}")  # body can be null in the raw event
question = body.get("question", "").strip()
top_k = body.get("topK", TOP_K)
# Validate required fields
if not question:
return {
"statusCode": 400,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,x-api-key",
},
"body": json.dumps({"error": "Question is required"}),
}
logger.info(f"Processing question: {question} (RAG: {ENABLE_RAG})")
# ========================================
# RAG Mode: Retrieve + Generate
# ========================================
if ENABLE_RAG and retrieval_service:
# Step 1 & 2: Embed Query and Retrieve Chunks
# Convert question to embedding and find similar chunks
retrieval_result = retrieval_service.retrieve(question, top_k)
# Handle Empty Results or Low Similarity - Fallback to General Knowledge
# If no relevant documents found or all scores below threshold, use direct LLM
chunks = retrieval_result["chunks"]
if not chunks:
logger.info("No documents found, falling back to general knowledge")
answer_result = answer_service.generate(question, [])
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,x-api-key",
},
"body": json.dumps(
{
"answer": answer_result["answer"],
"sources": [],
"fallback": True,
"fallbackReason": "No relevant documents found in knowledge base",
}
),
}
# Check if best match is below similarity threshold
best_score = chunks[0].get("score", 0)
if best_score < SIMILARITY_THRESHOLD:
logger.info(f"Best similarity score {best_score:.3f} below threshold {SIMILARITY_THRESHOLD}, falling back to general knowledge")
answer_result = answer_service.generate(question, [])
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,x-api-key",
},
"body": json.dumps(
{
"answer": answer_result["answer"],
"sources": [],
"fallback": True,
"fallbackReason": f"No sufficiently relevant documents found (best match: {best_score:.1%})",
}
),
}
# Step 3: Generate Answer with Context
# Use Bedrock Claude to generate grounded answer from context
answer_result = answer_service.generate(
question, retrieval_result["chunks"]
)
# Format response with sources
response = {
"answer": answer_result["answer"],
"sources": [
{
"documentId": chunk["documentId"],
"chunkIndex": chunk["chunkIndex"],
"excerpt": (
chunk["content"][:200] + "..."
if len(chunk["content"]) > 200
else chunk["content"]
),
"score": chunk["score"],
}
for chunk in retrieval_result["chunks"]
],
}
# ========================================
# Direct LLM Mode: No Retrieval
# ========================================
else:
# Generate answer directly without context
# This demonstrates "before RAG" behavior
answer_result = answer_service.generate(question, [])
# Format response without sources
response = {
"answer": answer_result["answer"],
"sources": [],
"mode": "direct-llm", # Indicate this is non-RAG mode
}
# ========================================
# Return Response
# ========================================
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,x-api-key",
},
"body": json.dumps(response),
}
except Exception as e:
logger.error(f"Error processing request: {e}", exc_info=True)
# Include more details in development/debug mode
error_detail = (
str(e) if log_level == "DEBUG" else "Service temporarily unavailable"
)
return {
"statusCode": 500,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,x-api-key",
},
"body": json.dumps({"error": error_detail, "type": type(e).__name__}),
}
Deploy the query Lambda function with its log group in Amazon CloudWatch
// ========================================
// Query Lambda
// ========================================
// Handles Q&A requests from API Gateway:
// 1. Generate query embedding via Bedrock Titan
// 2. Find similar chunks via cosine similarity
// 3. Generate answer via Bedrock Claude
// 4. Return answer with source citations
// Create log group explicitly so it's managed by CloudFormation
// and deleted when the stack is destroyed
const queryLogGroup = new logs.LogGroup(this, 'QueryLogGroup', {
logGroupName: `/aws/lambda/${resourceName(envName, 'query')}`,
retention: logRetentionDays as logs.RetentionDays,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
this.queryFunction = new PythonFunction(this, 'QueryFunction', {
functionName: resourceName(envName, 'query'),
entry: 'src/lambdas/query',
runtime: lambda.Runtime.PYTHON_3_12,
index: 'handler.py',
handler: 'handler',
description: `Query processing function (config: ${configHash})`,
memorySize: config.lambdaMemorySize || 512,
timeout: Duration.seconds(config.lambdaTimeout || 30),
environment: commonEnv,
logGroup: queryLogGroup,
});
// ========================================
// IAM Permissions
// ========================================
// Grant least-privilege access to AWS resources
// Ingestion Lambda: read documents, read/write chunks (only if RAG enabled)
// Note: Needs read access to query DocumentIndex GSI when deleting old chunks
this.documentsBucket.grantRead(this.ingestFunction);
if (enableRag && chunksTable) {
chunksTable.grantReadWriteData(this.ingestFunction);
}
// Query Lambda: read chunks for similarity search (only if RAG enabled)
if (enableRag && chunksTable) {
chunksTable.grantReadData(this.queryFunction);
}
// Both Lambdas need Bedrock access for embeddings and LLM
const bedrockPolicy = new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['bedrock:InvokeModel'],
resources: ['*'], // Bedrock doesn't support resource-level permissions
});
this.ingestFunction.addToRolePolicy(bedrockPolicy);
this.queryFunction.addToRolePolicy(bedrockPolicy);
// If using OpenSearch, grant Lambda access to the domain
if (useOpenSearch && openSearchDomain) {
const openSearchPolicy = new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'es:ESHttpGet',
'es:ESHttpPost',
'es:ESHttpPut',
'es:ESHttpDelete',
],
resources: [`${openSearchDomain.domainArn}/*`],
});
this.ingestFunction.addToRolePolicy(openSearchPolicy);
this.queryFunction.addToRolePolicy(openSearchPolicy);
}
2.4 DynamoDB
DynamoDB serves as the vector store for non-production environments. We store embeddings as JSON attributes and compute similarity searches inside the Lambda function. Although DynamoDB is not a true vector database, it is simple and cost-effective for small datasets and development environments.
_Note: At the time of writing, DynamoDB does not support native vector similarity search on its own. Amazon offers "Vector search for Amazon DynamoDB with zero-ETL for Amazon OpenSearch Service", but using OpenSearch incurs additional costs._
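The in-Lambda similarity search boils down to a table scan plus cosine similarity. A minimal sketch of what the `VectorStore` ranking might look like (the `chunkId`/`embedding` attribute names mirror the table schema below; the real service class is not shown in this post):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_chunks(query_embedding: list[float], items: list[dict], k: int = 5) -> list[dict]:
    """Rank scanned DynamoDB items by similarity to the query embedding."""
    scored = [
        {**item, "score": cosine_similarity(query_embedding, item["embedding"])}
        for item in items
    ]
    scored.sort(key=lambda chunk: chunk["score"], reverse=True)
    return scored[:k]
```

This is O(n) per query, which is exactly why it only makes sense for small development datasets; OpenSearch's k-NN index avoids the full scan in production.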
/**
* DynamoDB Stack (Development/Testing Only)
*
* This stack creates and manages DynamoDB tables for the Knowledge Q&A Bot:
* - Chunks table: Stores document chunks with vector embeddings
* - Uses in-memory cosine similarity for vector search
*
* Note: This stack is only deployed for non-production environments.
* Production uses OpenSearch for high-performance vector search.
*
* The stack is separated from the main application stack to allow
* independent lifecycle management of data resources.
*/
import * as cdk from 'aws-cdk-lib';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import { Construct } from 'constructs';
/**
* Props for the DynamoDB Stack
*/
export interface DynamoDBStackProps extends cdk.StackProps {
/** Name of the CloudFormation stack */
stackName: string;
/** Environment name (e.g., 'kate', 'dev', 'prod') */
envName: string;
/** Name for the chunks DynamoDB table */
tableName: string;
}
/**
* Stack that creates DynamoDB tables for document storage
*/
export class DynamoDBStack extends cdk.Stack {
/** DynamoDB table for storing document chunks and embeddings */
public readonly chunksTable: dynamodb.Table;
constructor(scope: Construct, id: string, props: DynamoDBStackProps) {
super(scope, id, props);
const { envName } = props;
// Production environments retain tables on stack deletion
// Non-production environments auto-delete for easy cleanup
const isProduction = envName.toLowerCase() === 'prod';
const tableRemovalPolicy = isProduction
? cdk.RemovalPolicy.RETAIN
: cdk.RemovalPolicy.DESTROY;
// ========================================
// Chunks Table
// ========================================
// Stores document chunks with their vector embeddings
// - Partition key: chunkId (format: {documentId}#{chunkIndex})
// - On-demand billing for cost efficiency (pay per request)
// - GSI on documentId for efficient document deletion
this.chunksTable = new dynamodb.Table(this, 'ChunksTable', {
tableName: props.tableName,
partitionKey: { name: 'chunkId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
removalPolicy: tableRemovalPolicy,
// Enable point-in-time recovery for production
...(isProduction && {
pointInTimeRecoverySpecification: {
pointInTimeRecoveryEnabled: true,
},
}),
});
// ========================================
// Global Secondary Index
// ========================================
// Index for querying all chunks of a document
// Used when deleting a document to remove all its chunks
this.chunksTable.addGlobalSecondaryIndex({
indexName: 'DocumentIndex',
partitionKey: { name: 'documentId', type: dynamodb.AttributeType.STRING },
});
// ========================================
// Stack Outputs
// ========================================
new cdk.CfnOutput(this, 'ChunksTableName', {
value: this.chunksTable.tableName,
description: 'DynamoDB table for document chunks',
exportName: `ChunksTableName-${envName}`,
});
new cdk.CfnOutput(this, 'ChunksTableArn', {
value: this.chunksTable.tableArn,
description: 'DynamoDB table ARN for document chunks',
exportName: `ChunksTableArn-${envName}`,
});
}
}
2.5 CloudFront
CloudFront provides frontend distribution with caching and HTTPS support.
/**
* CloudFront Stack (Production Only)
*
* This stack creates a CloudFront distribution for the frontend.
*
* Note: This stack is only deployed for production environments.
* Development/testing environments serve frontend directly from S3.
*
* CloudFront provides:
* - HTTPS support with AWS-managed certificate
* - Global CDN for fast access worldwide
* - Custom domain support (optional)
* - Better caching and performance
*/
import * as cdk from 'aws-cdk-lib';
import * as cloudfront from 'aws-cdk-lib/aws-cloudfront';
import * as origins from 'aws-cdk-lib/aws-cloudfront-origins';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';
/**
* Props for the CloudFront Stack
*/
export interface CloudFrontStackProps extends cdk.StackProps {
/** Name of the CloudFormation stack */
stackName: string;
/** Environment name (e.g., 'kate', 'dev', 'prod') */
envName: string;
/** S3 bucket for frontend (from S3BucketsStack) */
frontendBucket: s3.IBucket;
}
/**
* Stack that creates CloudFront distribution for frontend
*/
export class CloudFrontStack extends cdk.Stack {
/** CloudFront distribution for frontend */
public readonly distribution: cloudfront.Distribution;
/** CloudFront domain name */
public readonly distributionDomainName: string;
constructor(scope: Construct, id: string, props: CloudFrontStackProps) {
super(scope, id, props);
const { envName, frontendBucket } = props;
// ========================================
// CloudFront Distribution
// ========================================
// CDN for frontend with HTTPS and global edge locations
this.distribution = new cloudfront.Distribution(this, 'FrontendDistribution', {
comment: `Knowledge Q&A Bot Frontend - ${envName}`,
// Origin: S3 bucket with website hosting
defaultBehavior: {
origin: new origins.S3Origin(frontendBucket),
viewerProtocolPolicy: cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD,
cachedMethods: cloudfront.CachedMethods.CACHE_GET_HEAD,
compress: true,
// Cache policy for static content
cachePolicy: cloudfront.CachePolicy.CACHING_OPTIMIZED,
},
// Default root object
defaultRootObject: 'index.html',
// Error responses
errorResponses: [
{
httpStatus: 404,
responseHttpStatus: 200,
responsePagePath: '/index.html',
ttl: cdk.Duration.minutes(5),
},
],
// Price class - use all edge locations for prod, cheaper for dev
priceClass: envName.toLowerCase() === 'prod'
? cloudfront.PriceClass.PRICE_CLASS_ALL
: cloudfront.PriceClass.PRICE_CLASS_100,
// Enable IPv6
enableIpv6: true,
});
this.distributionDomainName = this.distribution.distributionDomainName;
// ========================================
// Stack Outputs
// ========================================
new cdk.CfnOutput(this, 'DistributionId', {
value: this.distribution.distributionId,
description: 'CloudFront distribution ID',
exportName: `CloudFrontDistributionId-${envName}`,
});
new cdk.CfnOutput(this, 'DistributionDomainName', {
value: this.distribution.distributionDomainName,
description: 'CloudFront distribution domain name',
exportName: `CloudFrontDomainName-${envName}`,
});
new cdk.CfnOutput(this, 'FrontendUrl', {
value: `https://${this.distribution.distributionDomainName}`,
description: 'Frontend URL (HTTPS)',
exportName: `FrontendUrl-${envName}`,
});
}
}
2.6 OpenSearch
OpenSearch is a true vector database with native k-NN support, giving much faster vector search than scanning.
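With the k-NN plugin, retrieval becomes a single index query instead of a full scan. A sketch of the search body the `OpenSearchVectorStore` might send (the `embedding` field name and the `_source` fields are assumptions matching the chunk schema used elsewhere in this post):

```python
def build_knn_query(query_embedding: list[float], k: int = 5) -> dict:
    """Build an OpenSearch k-NN search body returning the k nearest chunks."""
    return {
        "size": k,
        "query": {
            "knn": {
                "embedding": {  # vector field name in the index mapping (assumption)
                    "vector": query_embedding,
                    "k": k,
                }
            }
        },
        # Return only the chunk metadata; skip the raw vector to keep responses small
        "_source": ["documentId", "chunkIndex", "content"],
    }
```

The index mapping must declare `embedding` as a `knn_vector` field with the embedding model's dimension (1536 for Titan v1) for this query to work.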
/**
* OpenSearch Stack (Production Only)
*
* This stack creates an OpenSearch domain for high-performance vector similarity search.
* OpenSearch provides native k-NN (k-nearest neighbors) support for
* efficient vector search at scale.
*
* Note: This stack is only deployed for production environments.
* Development/testing environments use DynamoDB with in-memory cosine similarity.
*
* Features:
* - k-NN plugin enabled for vector search
* - Fine-grained access control
* - Encryption at rest and in transit
* - Single-node configuration for cost efficiency (scale up as needed)
*/
import * as cdk from 'aws-cdk-lib';
import * as opensearch from 'aws-cdk-lib/aws-opensearchservice';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Construct } from 'constructs';
/**
* Props for the OpenSearch Stack
*/
export interface OpenSearchStackProps extends cdk.StackProps {
/** Name of the CloudFormation stack */
stackName: string;
/** Environment name (e.g., 'kate', 'dev', 'prod') */
envName: string;
/** Domain name for OpenSearch */
domainName: string;
}
/**
* Stack that creates OpenSearch domain for vector search
*/
export class OpenSearchStack extends cdk.Stack {
/** OpenSearch domain for vector similarity search */
public readonly domain: opensearch.Domain;
/** Domain endpoint URL */
public readonly domainEndpoint: string;
constructor(scope: Construct, id: string, props: OpenSearchStackProps) {
super(scope, id, props);
const { envName } = props;
const region = props.env?.region || 'us-east-1';
const accountId = props.env?.account || cdk.Aws.ACCOUNT_ID;
// Production environments use larger instances and retain on deletion
const isProduction = envName.toLowerCase() === 'prod';
// ========================================
// OpenSearch Domain
// ========================================
// Domain for storing and searching vector embeddings
// - k-NN plugin enabled for vector similarity search
// - t3.small.search for cost-effective prototype/dev
// - Single node for dev, multi-node for prod
this.domain = new opensearch.Domain(this, 'VectorSearchDomain', {
domainName: props.domainName,
version: opensearch.EngineVersion.OPENSEARCH_2_11,
// Capacity configuration
capacity: {
dataNodes: 1,
dataNodeInstanceType: 't3.small.search',
// No dedicated master nodes for cost-effective single-node setup
// For production scale, use 3+ master nodes and 2+ data nodes
multiAzWithStandbyEnabled: false, // T3 instances don't support Multi-AZ with standby
},
// Storage configuration
ebs: {
volumeSize: isProduction ? 100 : 20, // GB
volumeType: ec2.EbsDeviceVolumeType.GP3,
},
// Security configuration
enforceHttps: true,
nodeToNodeEncryption: true,
encryptionAtRest: {
enabled: true,
},
// Access policy - allow IAM authenticated access from this account
// This allows Lambda functions with proper IAM permissions to access
// Using explicit actions instead of es:* to force CDK update
accessPolicies: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
principals: [
new iam.ArnPrincipal(`arn:aws:iam::${accountId}:root`)
],
actions: [
'es:ESHttpDelete',
'es:ESHttpGet',
'es:ESHttpHead',
'es:ESHttpPost',
'es:ESHttpPut',
'es:ESHttpPatch'
],
resources: [`arn:aws:es:${region}:${accountId}:domain/${props.domainName}/*`],
}),
],
// Fine-grained access control disabled for simplicity
// Enable in production with proper user/role configuration
// fineGrainedAccessControl: {
// masterUserArn: `arn:aws:iam::${accountId}:root`,
// },
// Removal policy: retain the domain in production, destroy elsewhere for easy cleanup
removalPolicy: isProduction ? cdk.RemovalPolicy.RETAIN : cdk.RemovalPolicy.DESTROY,
});
this.domainEndpoint = this.domain.domainEndpoint;
// ========================================
// Stack Outputs
// ========================================
// Export domain endpoint for cross-stack reference
new cdk.CfnOutput(this, 'DomainEndpoint', {
value: this.domain.domainEndpoint,
description: 'OpenSearch domain endpoint',
exportName: `OpenSearchEndpoint-${envName}`,
});
// Export domain ARN for IAM policies
new cdk.CfnOutput(this, 'DomainArn', {
value: this.domain.domainArn,
description: 'OpenSearch domain ARN',
exportName: `OpenSearchArn-${envName}`,
});
new cdk.CfnOutput(this, 'DomainName', {
value: this.domain.domainName,
description: 'OpenSearch domain name',
exportName: `OpenSearchDomainName-${envName}`,
});
}
}
Demo Result (before vs. after uploading documents to the knowledge base)
LLM without the knowledge base
LLM with successful retrieval
Final Thoughts
Now we have a Q&A bot with our own knowledge base. We can easily update the bot with the latest documents without retraining the model, and even use our private data.
AWS also provides an option to simply connect an Amazon S3 bucket to Amazon Bedrock, letting AWS handle the heavy lifting for you. But be aware of the potential costs at the end of the month. For more detail, refer to Connect to Amazon S3 for your knowledge base.
Reference
- Choosing an AWS vector database for RAG use cases - AWS Prescriptive Guidance
- What is RAG? - Retrieval-Augmented Generation AI Explained - AWS
- Vector search for Amazon DynamoDB with zero ETL for Amazon OpenSearch Service
- AWS announces Amazon DynamoDB zero-ETL integration with Amazon OpenSearch Service - AWS
- Vector database options - AWS Prescriptive Guidance
- Connect to Amazon S3 for your knowledge base
- Louis. Building LLMs for Production
- Jay, Maarten. Hands-On Large Language Models
- Julien, Hanza, & Antonio. LLM Engineer’s Handbook