A deep dive into transforming a painful user experience into seamless interactions using resource pooling and state machines
The Pain Point That Changed Everything
Picture this: You've just built what you think is an elegant RAG (Retrieval Augmented Generation) system using AWS Bedrock, OpenSearch Serverless, and S3. The architecture looks clean on paper. Then you hit the "create knowledge base" button and... wait. And wait. Thirty seconds later, your users are still staring at a loading spinner.
This isn't just poor UX—it's a business killer in today's instant-gratification world.
The Hidden AWS Reality
Here's what the AWS documentation doesn't make crystal clear:
OpenSearch Serverless indexes take 15-20 seconds to become operationally searchable after creation.
This single constraint cascaded into a nightmare of complexity:
- Create OpenSearch index → 15-20 seconds of mandatory wait time
- Create AWS Knowledge Base → Remote API call with potential failures
- Create Data Source → Another remote API call
- Configure S3 inclusion prefixes → State management complexity
- Handle failures at any step → Rollback hell
Each step could fail independently, creating inconsistent states between our database and AWS resources. The traditional synchronous approach was doomed from the start.
The "Aha" Moment: Think Like a Restaurant
The breakthrough came from an unexpected analogy. High-end restaurants don't start cooking your meal when you order—they prep ingredients in advance. They maintain pools of ready-to-use components.
What if we pre-provisioned AWS resources and maintained them in pools?
The Solution: Resource Pooling with State Machines
I implemented a multi-layered pool system that transforms the user experience:
- Before: Click → Wait 30+ seconds → Maybe get a knowledge base
- After: Click → Instant assignment from pre-warmed pool → Seamless experience
Architecture Overview
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ User Layer │ │ Business Logic │ │ Resource Pools │
│ │ │ │ │ │
│ Instant Response│◄──►│ Assign from Pool│◄──►│ Pre-warmed AWS │
│ │ │ │ │ Resources │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
┌───────▼────────┐
│ Background │
│ Reconciliation │
│ Workers │
└────────────────┘
The Three-Layer Pool System
Layer 1: OpenSearch Index Pool
CREATE TABLE `opensearch_indexes` (
  `id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  `vector_index` VARCHAR(500) NOT NULL,
  `vector_field` VARCHAR(255) NOT NULL,
  `status` ENUM('PROVISIONING', 'AVAILABLE', 'ASSIGNED', 'DESTROYED', 'CLEANUP', 'FAILED') NOT NULL,
  `usable_at` TIMESTAMP NULL, -- Critical for the 15-20s warming period
  -- Timestamps referenced by the reconciliation and cleanup queries later in the post
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `assigned_at` TIMESTAMP NULL,
  `deleted_at` TIMESTAMP NULL
);
Layer 2: AWS Knowledge Base Pool
CREATE TABLE `aws_knowledge_bases` (
  `id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  `aws_kb_name` VARCHAR(100) NOT NULL,
  `index_id` BIGINT UNSIGNED NOT NULL,
  `knowledge_base_resource_id` VARCHAR(255) NULL,
  -- Data source and S3 prefix columns referenced by the provisioning queries later in the post
  `data_source_name` VARCHAR(255) NOT NULL,
  `data_source_id` VARCHAR(255) NULL,
  `storage_prefix` VARCHAR(255) NOT NULL,
  `status` ENUM('PROVISIONING', 'AVAILABLE', 'ASSIGNED', 'DESTROYED', 'CLEANUP', 'FAILED') NOT NULL,
  `deleted_at` TIMESTAMP NULL, -- soft-delete marker
  FOREIGN KEY (`index_id`) REFERENCES `opensearch_indexes`(`id`)
);
Layer 3: User Assignment Layer
CREATE TABLE `user_knowledge_bases` (
`id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(255) NOT NULL,
`user_id` INT UNSIGNED NOT NULL,
`aws_kb_id` BIGINT UNSIGNED NOT NULL,
FOREIGN KEY (`aws_kb_id`) REFERENCES `aws_knowledge_bases`(`id`)
);
The State Machine That Changes Everything
Each resource follows a deterministic state flow:
PROVISIONING → AVAILABLE → ASSIGNED → CLEANUP → DESTROYED
      │                                            ↑
      └─────────────→ FAILED ──────────────────────┘
                  (cleaned up by reconciliation)
This simple state machine eliminates the complexity of managing partial states and failed operations.
The states break down as follows:
- PROVISIONING: Resource is being created
- AVAILABLE: Ready to be assigned to users
- ASSIGNED: Currently in use by a user
- CLEANUP: Marked for deletion
- DESTROYED: Deleted (soft delete)
- FAILED: Provisioning failed, needs cleanup
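In the actual code these states live in the database ENUM, and sqlc generates constants for them (the Go code below refers to values such as database.OpensearchIndexesStatusAVAILABLE and database.OpensearchIndexesStatusFAILED). Purely as an illustrative sketch, not something from the post's codebase, the allowed transitions could be encoded and checked like this:

type ResourceStatus string

const (
	StatusProvisioning ResourceStatus = "PROVISIONING"
	StatusAvailable    ResourceStatus = "AVAILABLE"
	StatusAssigned     ResourceStatus = "ASSIGNED"
	StatusCleanup      ResourceStatus = "CLEANUP"
	StatusDestroyed    ResourceStatus = "DESTROYED"
	StatusFailed       ResourceStatus = "FAILED"
)

// allowedTransitions encodes the state diagram above.
var allowedTransitions = map[ResourceStatus][]ResourceStatus{
	StatusProvisioning: {StatusAvailable, StatusFailed},
	StatusAvailable:    {StatusAssigned, StatusCleanup},
	StatusAssigned:     {StatusCleanup},
	StatusCleanup:      {StatusDestroyed},
	StatusFailed:       {StatusDestroyed},
}

// canTransition reports whether moving from one status to another is allowed.
func canTransition(from, to ResourceStatus) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

In practice the post enforces these transitions implicitly through its SQL queries rather than a Go-level check.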
This approach gives us several key benefits: instant assignment from a pre-warmed pool, a clear recovery path when provisioning fails, and background reconciliation instead of ad-hoc rollback logic.
Index Manager
The Index Manager is the provisioning manager that handles creating OpenSearch indexes in the collection.
Properties and Instantiation
Let's create an IndexManager struct that describes the provisioning manager and its properties, along with a constructor to instantiate it. The awsClient is a custom dependency injected to handle AWS operations. The store dependency handles database operations; folks familiar with sqlc will find it immediately recognizable.
type IndexManager struct {
awsClient *aws_cloud.AWSCloudClients
store database.Store
baseLogger *logger.Logger
minSize int
maxSize int
maxConcurrentProvisioner int
reconcileTrigger chan struct{}
}
func NewIndexManager(
awsClient *aws_cloud.AWSCloudClients,
store database.Store,
baseLogger *logger.Logger,
) *IndexManager {
return &IndexManager{
awsClient: awsClient,
store: store,
baseLogger: baseLogger,
minSize: globals.MinIndexPoolSize,
maxSize: globals.MaxIndexPoolSize,
maxConcurrentProvisioner: globals.MaxConcurrentProvisioner,
reconcileTrigger: make(chan struct{}, 1),
}
}
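The pool sizes and the vector field name come from a globals package that isn't shown in the post; the values below are assumptions for illustration, but it is just a set of constants along these lines:

package globals

// Assumed values; the post does not show this package.
const (
	MinIndexPoolSize         = 5  // minimum number of ready indexes to keep warm
	MaxIndexPoolSize         = 10
	MinKbPoolSize            = 5  // minimum number of pre-built knowledge bases
	MaxKbPoolSize            = 10
	MaxConcurrentProvisioner = 3  // cap on parallel provisioning goroutines
	VectorFieldName          = "vector_field" // name of the vector field created in each OpenSearch index
)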
Provision a new index
provisionNewIndex is a method on IndexManager that creates a new index in OpenSearch.
func (m *IndexManager) provisionNewIndex(ctx context.Context) error {
indexName := utils.GenerateUniqueIndexName(true)
vectorField := globals.VectorFieldName
result, err := m.store.CreateOpensearchIndex(ctx, database.CreateOpensearchIndexParams{
VectorIndex: indexName,
VectorField: vectorField,
})
if err != nil {
return fmt.Errorf("error initiating opensearch indexes in database: %w", err)
}
lastId, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("error fetching last inserted id in opensearch indexs: %w", err)
}
err = m.awsClient.CreateIndexAndWaitForReady(ctx, indexName, vectorField)
if err != nil {
updateErr := m.store.UpdateOpensearchIndexStatus(ctx, database.UpdateOpensearchIndexStatusParams{
Status: database.OpensearchIndexesStatusFAILED,
ID: uint64(lastId),
})
if updateErr != nil {
return fmt.Errorf("error updating opensearch index status to failed due to err (%w): %w", err, updateErr)
}
return fmt.Errorf("error creating opensearch index remotely: %w", err)
}
err = m.store.FinalizeOpensearchIndex(ctx, uint64(lastId))
if err != nil {
return fmt.Errorf("error finalizing opensearch indexes: %w", err)
}
return nil
}
- GenerateUniqueIndexName is a utility function that generates a unique name for each index.
- CreateOpensearchIndex is the database operation that creates an OpenSearch index row with the PROVISIONING status.
- CreateIndexAndWaitForReady is the AWS operation that actually creates the index in OpenSearch and confirms it has been created.
- If the remote operation fails, we mark the row we initially created as FAILED using UpdateOpensearchIndexStatus.
- FinalizeOpensearchIndex marks the initially created row as AVAILABLE so that it can be assigned, and also fills in the usable_at column, which accounts for the warming period of OpenSearch indexes.
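The two sqlc queries referenced above aren't shown in the post; given the schema above, they would look roughly like this (a sketch, not the post's exact definitions):

-- name: UpdateOpensearchIndexStatus :exec
update opensearch_indexes
set status = ?
where id = ?;

-- name: FinalizeOpensearchIndex :exec
update opensearch_indexes
set
    status = 'AVAILABLE',
    usable_at = now() -- readiness checks below require this to be at least a minute old
where id = ?;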
Reconciling Index Pool
func (m *IndexManager) reconcileIndexPool(ctx context.Context) (bool, bool, error) {
readyCount, err := m.store.CountReadyAvailableIndexes(ctx)
if err != nil {
return false, false, fmt.Errorf("error counting indexes by status: %w", err)
}
warmingCount, err := m.store.CountWarmingAvailableIndexes(ctx)
if err != nil {
return false, false, fmt.Errorf("error counting warming indexes: %w", err)
}
provisioningCount, err := m.store.CountProvisioningIndexes(ctx)
if err != nil {
return false, false, fmt.Errorf("failed to count provisioning indexes: %w", err)
}
if readyCount >= int64(m.minSize) {
m.baseLogger.Info().Msgf("Index pool is healthy: Ready=%d", readyCount)
return true, false, nil
}
totalPotentialPool := readyCount + warmingCount + provisioningCount
if totalPotentialPool >= int64(m.minSize) {
return false, false, nil
}
needed := m.minSize - int(totalPotentialPool)
m.baseLogger.Info().Msgf(
"index pool state: Available=%d, Warming=%d, Provisioning=%d. Need to create %d",
readyCount,
warmingCount,
provisioningCount,
needed,
)
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(m.maxConcurrentProvisioner)
for i := 0; i < needed; i++ {
g.Go(func() error {
m.baseLogger.Info().Msg("dispatching index provisioner task")
err := m.provisionNewIndex(gCtx)
if err != nil {
m.baseLogger.Error().Err(err).Msg("failed to provision new index")
return err
}
m.baseLogger.Info().Msg("successfully provisioned index")
return nil
})
}
if err := g.Wait(); err != nil {
return false, false, fmt.Errorf("reconcilation failed during provision of indexs: %w", err)
}
m.baseLogger.Info().Msg("index reconcilation cycle finished")
finalAvailableCount, err := m.store.CountIndexesByStatus(ctx, database.OpensearchIndexesStatusAVAILABLE)
if err != nil {
return false, false, fmt.Errorf("error countintt available indexes after provisioning: %w", err)
}
return finalAvailableCount >= int64(m.minSize), true, nil
}
- Using a database query, we count indexes that are in AVAILABLE status and are no longer warming up. By warming up I mean that it usually takes 15 to 20 seconds for an index to become searchable, so we also take the usable_at column into account. The query (defined as CountReadyAvailableIndexes) looks like this:
-- name: CountReadyAvailableIndexes :one
select count(*) from opensearch_indexes
where status = 'AVAILABLE'
and usable_at is not null
and usable_at < now() - interval 1 minute;
- We also count available indexes that are still warming:
-- name: CountWarmingAvailableIndexes :one
select count(*) from opensearch_indexes
where status = 'AVAILABLE'
and usable_at is not null
and usable_at >= now() - interval 1 minute;
- And we count indexes that are still provisioning:
-- name: CountProvisioningIndexes :one
select count(*) from opensearch_indexes
where status = 'PROVISIONING'
and created_at > now() - interval 10 minute;
- We calculate the total potential pool by adding up all three counts; we are deliberately cautious about over-provisioning, which is why we include warming and provisioning indexes.
- If the calculated pool already meets the minimum size, we return early.
- Otherwise we calculate how many indexes we still need.
- To handle the concurrency in Go we use errgroup, because:
  - if any goroutine in the group returns an error, the entire group is considered failed, and we can handle the error appropriately;
  - if one goroutine fails, the group context gets cancelled, signalling the other goroutines to stop their work early;
  - the Wait() method blocks until all goroutines complete.
- We create a group and a derived context using errgroup.WithContext(ctx), and we limit the concurrency with the SetLimit method.
- We loop until the needed count is reached, starting one goroutine per missing index, and each goroutine executes provisionNewIndex.
- We wait for all goroutines to complete using Wait().
- Finally, we recount the AVAILABLE indexes and make sure the pool is above the minimum size.
Trigger Reconciliation
- We reconcile and maintain the pool of indexes periodically, but there are moments when we need to trigger a reconciliation on demand. That is why NewIndexManager creates a buffered channel of size 1: the single slot means a manual trigger is coalesced with the periodic reconciliation instead of queueing up repeated triggers.
- triggerReconcilation does exactly that.
func (m *IndexManager) triggerReconcilation() {
select {
case m.reconcileTrigger <- struct{}{}:
m.baseLogger.Info().Msg("index reconcilation successfully trigger")
default:
m.baseLogger.Info().Msg("index reconcilation trigger ignored, reconcilation is already running")
}
}
Reconciliation Worker
func (m *IndexManager) reconcilationWorker(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
if _, _, err := m.reconcileIndexPool(ctx); err != nil {
m.baseLogger.Error().Err(err).Msg("initial reconcilation failed")
return err
}
for {
select {
case <-ctx.Done():
m.baseLogger.Info().Msg("index reconcilation worker shutting down")
return ctx.Err()
case <-ticker.C:
m.baseLogger.Info().Msg("periodic trigger of reconcilation of indexes")
if _, _, err := m.reconcileIndexPool(ctx); err != nil {
m.baseLogger.Error().Err(err).Msg("periodic index reconcilation failed")
return err
}
case <-m.reconcileTrigger:
m.baseLogger.Info().Msg("event-driven index trigger: starting reconcilation")
select {
case <-m.reconcileTrigger:
default:
}
if _, _, err := m.reconcileIndexPool(ctx); err != nil {
m.baseLogger.Error().Err(err).Msg("event-driven index reconcilation failed")
}
}
}
}
- We have a worker that periodically invokes reconcileIndexPool by listening on the ticker channel.
- For manual triggering, the same worker also listens on the reconcileTrigger channel we initialized earlier.
Claiming Indexes
- We have a query that claims an index based purely on database state:
-- name: ClaimAvailableIndex :one
select
id, vector_index, vector_field
from opensearch_indexes
where status = 'AVAILABLE'
and usable_at is not null
and usable_at < now() - interval 1 minute
order by rand()
limit 1
for update skip locked;
- We use for update skip locked because we pick a random row matching the conditions; if a candidate row is locked by another transaction, we skip it and move on to the next one.
- Once we have fetched a record, we mark it as assigned within the same transaction using the following query:
-- name: MarkIndexAsAssigned :exec
update opensearch_indexes
set
status = 'ASSIGNED', assigned_at = now()
where id = ?;
- The whole transaction is defined as TxClaimAvailableIndex:
func (store *SqlStore) TxClaimAvailableIndex(
ctx context.Context,
) (*ClaimAvailableIndexRow, error) {
var response ClaimAvailableIndexRow
err := store.execTx(ctx, func(q *Queries) error {
index, err := q.ClaimAvailableIndex(ctx)
if err != nil {
return fmt.Errorf("failed to claim available index: %w", err)
}
err = q.MarkIndexAsAssigned(ctx, index.ID)
if err != nil {
return fmt.Errorf("failed to mark index as assigned: %w", err)
}
response = index
return nil
})
if err != nil {
return nil, err
}
return &response, nil
}
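The execTx helper used above isn't shown in the post; it is the usual sqlc transaction wrapper. A sketch, assuming SqlStore holds a *sql.DB (the actual helper may differ in details):

// execTx runs fn against sqlc-generated Queries bound to a single transaction,
// committing on success and rolling back on error.
func (store *SqlStore) execTx(ctx context.Context, fn func(*Queries) error) error {
	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	q := New(tx) // sqlc-generated constructor accepting a DBTX
	if err := fn(q); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("tx error: %v, rollback error: %v", err, rbErr)
		}
		return err
	}
	return tx.Commit()
}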
- After claiming an index from the database, we manually trigger a reconciliation of the index pool.
func (m *IndexManager) ClaimAvailableIndex(ctx context.Context) (*database.ClaimAvailableIndexRow, error) {
index, err := m.store.TxClaimAvailableIndex(ctx)
if err != nil {
return nil, err
}
go m.triggerReconcilation()
return index, nil
}
Cleaning up Indexes
func (m *IndexManager) cleanupRemoteResources(ctx context.Context, indexName string) error {
err := m.awsClient.DeleteIndex(ctx, indexName)
if err != nil {
return fmt.Errorf("error deleting opensearch index remotely: %w", err)
}
return nil
}
func (m *IndexManager) CleanupIndexes(ctx context.Context) error {
indexesForCleanup, err := m.store.FetchIndexesForCleanup(ctx)
if err != nil {
return fmt.Errorf("error fetching indexes for clean up: %w", err)
}
if len(indexesForCleanup) == 0 {
m.baseLogger.Info().Msg("no indexes to cleanup")
return nil
}
m.baseLogger.Info().Msgf("found %d indexes to cleanup", len(indexesForCleanup))
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(m.maxConcurrentProvisioner)
for _, idx := range indexesForCleanup {
indexToCleanup := idx
g.Go(func() error {
if err := gCtx.Err(); err != nil {
m.baseLogger.Warn().Msgf("cleanup cancelled for index %s due to earlier error", indexToCleanup.VectorField)
return err
}
err := m.cleanupRemoteResources(gCtx, indexToCleanup.VectorIndex)
if err != nil {
m.baseLogger.Error().
Err(err).
Str("index", indexToCleanup.VectorIndex).
Msg("failed to cleanup remote resources for index")
}
_, err = m.store.DeleteOpensearchIndex(gCtx, indexToCleanup.ID)
if err != nil {
m.baseLogger.Error().
Err(err).
Uint64("id", indexToCleanup.ID).
Msg("failed to mark index as destroyed in database")
return fmt.Errorf("failed to update database status: %w", err)
}
m.baseLogger.Info().
Str("index", indexToCleanup.VectorIndex).
Msg("successfully cleaned up index")
return nil
})
}
if err := g.Wait(); err != nil {
m.baseLogger.Error().Err(err).Msg("cleanup cycle completed with one or more failures")
}
return nil
}
- Remote index cleanup in OpenSearch is handled by the cleanupRemoteResources method.
- In CleanupIndexes we first fetch the indexes that are due for cleanup; the query looks like this:
-- name: FetchIndexesForCleanup :many
select
oi.*
from opensearch_indexes oi
left join
aws_knowledge_bases akb on oi.id = akb.index_id
where
oi.status = 'FAILED'
or (oi.status = 'PROVISIONING' and oi.created_at < now() - interval 10 minute)
or (
oi.status = 'CLEANUP'
and (akb.id is null or akb.status = 'DESTROYED')
);
- We join against aws_knowledge_bases because we want to make sure that indexes marked `CLEANUP` no longer have a live knowledge base attached; otherwise we would end up with cascading inconsistency. The database will not enforce this for us, so we have to take care of it ourselves, since we are working with soft deletes, which are a purely logical layer of deletion.
- After fetching, we concurrently clean up the remote resources, and once an index has been cleaned up remotely we soft-delete it in the database using this query:
-- name: DeleteOpensearchIndex :execresult
update opensearch_indexes
set
status = 'DESTROYED',
deleted_at = current_timestamp
where id = ?;
CRON-based Cleanup Worker
- Per our requirements we will never run multiple instances of this application: a single instance runs, and we will scale vertically first. We are in a beta phase and not expecting much traffic, so we schedule cleanup with a CRON job inside the application itself. The drawback is that this approach would not work well if we ever ran multiple instances, because coordinating the jobs would become complex; that is a problem for another day.
func (m *IndexManager) CleanupIndexWorker(ctx context.Context) {
location, err := time.LoadLocation("America/Chicago")
if err != nil {
m.baseLogger.Error().Err(err).Msg("cannot load the timezone")
return
}
c := cron.New(cron.WithLocation(location))
schedule := "30 3 * * *"
_, err = c.AddFunc(schedule, func() {
m.baseLogger.Info().
Str("schedule", schedule).
Str("timezone", location.String()).
Msg("triggered scheduled index cleanup job")
jobCtx, cancel := context.WithTimeout(context.Background(), 2*time.Hour)
defer cancel()
if err := m.CleanupIndexes(jobCtx); err != nil {
m.baseLogger.Error().Err(err).Msg("scheduled index cleanup job failed")
} else {
m.baseLogger.Info().Msg("scheduled index cleanup job completed successfully")
}
})
if err != nil {
m.baseLogger.Error().Err(err).Msg("fatal: could not schedule cleanup job")
return
}
m.baseLogger.Info().
Str("schedule", schedule).
Str("timezone", location.String()).
Msg("index cleanup job scheduled successfully")
c.Start()
<-ctx.Done()
m.baseLogger.Info().Msg("shutting down scheduled index cleanup worker")
stopCtx := c.Stop()
<-stopCtx.Done()
m.baseLogger.Info().Msg("index cleanup scheduler stopped gracefully")
}
Run Index Manager
- We define a Run method that starts the Index Manager's workers:
func (m *IndexManager) Run(ctx context.Context) error {
m.baseLogger.Info().Msg("starting index manager workers")
g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
err := m.reconcilationWorker(gCtx)
return err
})
g.Go(func() error {
m.CleanupIndexWorker(gCtx)
return gCtx.Err()
})
m.baseLogger.Info().Msg("all index manager workers are running")
err := g.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
m.baseLogger.Error().Err(err).Msg("index manager stopped due to an error")
return err
}
m.baseLogger.Info().Msg("index manager stopped gracefully")
return nil
}
- We start the reconcilationWorker and the CleanupIndexWorker as background goroutines.
Knowledge Base Manager
- Let's define the Knowledge Base Manager:
type KbManager struct {
awsClient *aws_cloud.AWSCloudClients
store database.Store
config *utils.Config
baseLogger *logger.Logger
indexManager *IndexManager
minSize int
maxSize int
maxConcurrentProvisioner int
reconcileTrigger chan struct{}
}
- Instantiating the Knowledge Base Manager:
func NewKbManager(
awsClient *aws_cloud.AWSCloudClients,
store database.Store,
baseLogger *logger.Logger,
config *utils.Config,
indexManager *IndexManager,
) *KbManager {
return &KbManager{
awsClient: awsClient,
store: store,
baseLogger: baseLogger,
config: config,
minSize: globals.MinKbPoolSize,
maxSize: globals.MaxKbPoolSize,
maxConcurrentProvisioner: globals.MaxConcurrentProvisioner,
indexManager: indexManager,
reconcileTrigger: make(chan struct{}, 1),
}
}
Provision New Knowledge Base
- We inject the Index Manager into the Knowledge Base Manager because we need to claim indexes when building knowledge bases.
func (m *KbManager) provisionNewKnowledgeBase(ctx context.Context) error {
m.baseLogger.Info().Msg("attempting to claim an available index for new knowledge base")
claimedIndex, err := m.indexManager.ClaimAvailableIndex(ctx)
if err != nil {
if database.IsRecordNotFound(err) {
m.baseLogger.Warn().Msg("no available index to claim")
go m.indexManager.triggerReconcilation()
return fmt.Errorf("no available index in the pool")
}
return fmt.Errorf("failed to claim available index: %w", err)
}
m.baseLogger.Info().Msg("successfully claimed indexed")
dataSourceName := utils.GenerateUniqueIndexName(false)
storagePrefix := utils.GenerateS3Prefix()
kbName := utils.GenerateKnowledgeBaseName()
inclusionPrefix := fmt.Sprintf("%s/%s/", m.config.AWSParentFolder, storagePrefix)
result, err := m.store.CreateAwsKnowledgeBase(ctx, database.CreateAwsKnowledgeBaseParams{
AwsKbName: kbName,
IndexID: claimedIndex.ID,
DataSourceName: dataSourceName,
StoragePrefix: inclusionPrefix,
})
if err != nil {
return fmt.Errorf("error initiating knowledge base in database: %w", err)
}
lastId, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("error fetching last inserted id in the knowledge base: %w", err)
}
kbResult, err := m.awsClient.CreateKnowledgeBase(ctx, aws_cloud.CreateKnowledgeBaseParams{
Name: &kbName,
RoleARN: &m.config.AWSIamRoleARN,
S3Location: &m.config.AWSDefaultS3Uri,
EmbeddingModelARN: &m.config.AWSEmbeddingModelARN,
CollectionARN: &m.config.AWSCollectionARN,
VectorIndexName: &claimedIndex.VectorIndex,
VectorField: &claimedIndex.VectorField,
})
if err != nil {
indexErr := m.store.MarkOpensearchIndexForCleanup(ctx, claimedIndex.ID)
if indexErr != nil {
return fmt.Errorf("error marking index for cleanup while rollbacking failure in knowledge base creation (%w): %w", err, indexErr)
}
_, rollbackErr := m.store.DeleteAwsKnowledgeBase(ctx, uint64(lastId))
if rollbackErr != nil {
return fmt.Errorf("error rollbacking knowledge base due to error(%w): %w", err, rollbackErr)
}
return fmt.Errorf("failed to create knowledge base: %w", err)
}
dataSourceResult, err := m.awsClient.CreateDataSource(
ctx,
dataSourceName,
kbResult.KnowledgeBase.KnowledgeBaseId,
&m.config.AWSBucketARN,
&m.config.AWSBucketOwnerID,
&inclusionPrefix,
)
if err != nil {
_, kbErr := m.awsClient.DeleteKnowledgeBase(ctx, *kbResult.KnowledgeBase.KnowledgeBaseId)
if kbErr != nil {
return fmt.Errorf("error deleting knowledge base (due to - %w): %w", err, kbErr)
}
indexErr := m.store.MarkOpensearchIndexForCleanup(ctx, claimedIndex.ID)
if indexErr != nil {
return fmt.Errorf("error marking index for cleanup while rollbacking failure in knowledge base creation (%w): %w", err, indexErr)
}
_, rollbackErr := m.store.DeleteAwsKnowledgeBase(ctx, uint64(lastId))
if rollbackErr != nil {
return fmt.Errorf("error rollbacking knowledge base due to error(%w): %w", err, rollbackErr)
}
return fmt.Errorf("failed to create data source: %w", err)
}
err = m.store.FinalizedAwsKnowledgeBase(ctx, database.FinalizedAwsKnowledgeBaseParams{
DataSourceID: sql.NullString{
Valid: true,
String: *dataSourceResult.DataSource.DataSourceId,
},
KnowledgeBaseResourceID: sql.NullString{
Valid: true,
String: *kbResult.KnowledgeBase.KnowledgeBaseId,
},
ID: uint64(lastId),
})
if err != nil {
dataSourceErr := m.awsClient.DeleteDataSource(ctx, kbResult.KnowledgeBase.KnowledgeBaseId, dataSourceResult.DataSource.DataSourceId)
if dataSourceErr != nil {
return fmt.Errorf("error deleting data source (due to: %w): %w", err, dataSourceErr)
}
_, knowledgeBaseErr := m.awsClient.DeleteKnowledgeBase(ctx, *kbResult.KnowledgeBase.KnowledgeBaseId)
if knowledgeBaseErr != nil {
return fmt.Errorf("error deleting knowledge base (due to: %w): %w", err, knowledgeBaseErr)
}
indexErr := m.store.MarkOpensearchIndexForCleanup(ctx, claimedIndex.ID)
if indexErr != nil {
return fmt.Errorf("error marking index for cleanup while rollbacking failure in knowledge base creation (%w): %w", err, indexErr)
}
return fmt.Errorf("error finalizing knowledge base: %w", err)
}
return nil
}
- First we claim an index from the Index Manager.
- Then we create the AWS knowledge base record in our database using the following query:
insert into aws_knowledge_bases (
aws_kb_name,
index_id,
data_source_name,
storage_prefix,
status
) values (
?, ?, ?, ?, 'PROVISIONING'
);
- Then we create the knowledge base in AWS itself; if something fails, we roll back by marking the index for cleanup and deleting the row we just created in the database.
- After creating the knowledge base we create its data source, and then we finalize the knowledge base record:
-- name: FinalizedAwsKnowledgeBase :exec
update aws_knowledge_bases
set
data_source_id = ?,
knowledge_base_resource_id = ?,
status = 'AVAILABLE'
where id = ?;
- Again, if finalization fails we roll back by deleting the remote data source and knowledge base and marking the OpenSearch index for cleanup (a sketch of that query follows).
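MarkOpensearchIndexForCleanup isn't shown in the post; under the state machine above it would be a small status update along these lines (a sketch):

-- name: MarkOpensearchIndexForCleanup :exec
update opensearch_indexes
set status = 'CLEANUP'
where id = ?;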
Reconciling Knowledge Base Pool
- Reconciling the knowledge base pool works just like reconciliation in the Index Manager.
- The only difference is that knowledge bases have no warming period.
func (m *KbManager) reconcileKnowledgeBasePool(ctx context.Context) error {
availableCount, err := m.store.CountKnowledgeBaseByStatus(ctx, database.AwsKnowledgeBasesStatusAVAILABLE)
if err != nil {
return fmt.Errorf("failed to count available knowledge bases: %w", err)
}
provisioningCount, err := m.store.CountProvisioningKnowledgeBase(ctx)
if err != nil {
return fmt.Errorf("failed to count provisioning knowledge base: %w", err)
}
count := availableCount + provisioningCount
if count >= int64(m.minSize) {
return nil
}
needed := m.minSize - int(count)
m.baseLogger.Info().Msgf(
"knowledge base pool state: Available=%d, Provisioning=%d. Need to create %d",
availableCount,
provisioningCount,
needed,
)
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(m.maxConcurrentProvisioner)
for i := 0; i < needed; i++ {
g.Go(func() error {
m.baseLogger.Info().Msg("dispatching knowledge base provisioner task")
err := m.provisionNewKnowledgeBase(gCtx)
if err != nil {
m.baseLogger.Error().Err(err).Msg("failed to provision knowledge base")
return err
}
m.baseLogger.Info().Msg("successfully provisioned a knowledge base")
return nil
})
}
if err := g.Wait(); err != nil {
return fmt.Errorf("reconcilation failed during provisioning knowledge base: %w", err)
}
m.baseLogger.Info().Msg("knowledge base reconcilation cycle finished")
return nil
}
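The two counting queries used above aren't shown in the post; by analogy with the index-pool queries they would be small sqlc definitions along these lines (a sketch, names taken from the Go calls):

-- name: CountKnowledgeBaseByStatus :one
select count(*) from aws_knowledge_bases where status = ?;

-- name: CountProvisioningKnowledgeBase :one
-- A staleness window, as used for indexes, could be added here as well.
select count(*) from aws_knowledge_bases where status = 'PROVISIONING';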
- Any component that uses the Knowledge Base Manager can then obtain an available knowledge base through the following method.
func (m *KbManager) AssignedAvailableKnowledgeBase(
ctx context.Context,
args database.TxAssignKnowledgeBaseParams,
) (*int64, error) {
knowledgeBaseId, err := m.store.TxAssignKnowledgeBase(ctx, args)
if err != nil {
return nil, err
}
go m.triggerReconcilation()
return knowledgeBaseId, nil
}
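TxAssignKnowledgeBase itself isn't shown in the post. Below is a sketch of what such a transaction could look like, mirroring TxClaimAvailableIndex and linking the assignment into user_knowledge_bases; the query names and parameter fields here are assumptions:

// Sketch only: claim an AVAILABLE knowledge base, mark it ASSIGNED, and link
// it to the requesting user, all inside one transaction.
func (store *SqlStore) TxAssignKnowledgeBase(
	ctx context.Context,
	args TxAssignKnowledgeBaseParams,
) (*int64, error) {
	var userKbId int64
	err := store.execTx(ctx, func(q *Queries) error {
		// Assumed query, analogous to ClaimAvailableIndex (FOR UPDATE SKIP LOCKED).
		kb, err := q.ClaimAvailableKnowledgeBase(ctx)
		if err != nil {
			return fmt.Errorf("failed to claim available knowledge base: %w", err)
		}
		// Assumed query, analogous to MarkIndexAsAssigned.
		if err := q.MarkKnowledgeBaseAsAssigned(ctx, kb.ID); err != nil {
			return fmt.Errorf("failed to mark knowledge base as assigned: %w", err)
		}
		// Assumed query inserting into the user assignment layer (user_knowledge_bases).
		result, err := q.CreateUserKnowledgeBase(ctx, CreateUserKnowledgeBaseParams{
			Name:    args.Name,
			UserID:  args.UserID,
			AwsKbID: kb.ID,
		})
		if err != nil {
			return fmt.Errorf("failed to create user knowledge base: %w", err)
		}
		userKbId, err = result.LastInsertId()
		return err
	})
	if err != nil {
		return nil, err
	}
	return &userKbId, nil
}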
- And we trigger reconciliation, just like in the Index Manager:
func (m *KbManager) triggerReconcilation() {
select {
case m.reconcileTrigger <- struct{}{}:
m.baseLogger.Info().Msg("knowledge base reconcilation successfully triggered")
default:
m.baseLogger.Info().Msg("knowledge base reconcilation trigger ignored, a reconcilation is already running")
}
}
Cleaning up Knowledge Bases
func (m *KbManager) cleanupRemoteResources(ctx context.Context, args cleanupRemoteResourcesParams) error {
err := m.awsClient.DeleteDataSource(ctx, &args.kbResourceId, &args.dataSourceId)
if err != nil {
return fmt.Errorf("error deleting data source remotely: %w", err)
}
_, err = m.awsClient.DeleteKnowledgeBase(ctx, args.kbResourceId)
if err != nil {
return fmt.Errorf("error deleting knowledge base remotely: %w", err)
}
err = m.awsClient.DeleteMultipleObjects(ctx, args.inclusionPrefix)
if err != nil {
return fmt.Errorf("error deleting knowledge base documents remotely: %w", err)
}
return nil
}
func (m *KbManager) CleanupKnowledgeBases(
ctx context.Context,
) error {
kbForCleanup, err := m.store.FetchKnowledgeBasesForCleanup(ctx)
if err != nil {
return fmt.Errorf("error fetching knowledge base cleaning up:%w", err)
}
if len(kbForCleanup) == 0 {
m.baseLogger.Info().Msg("no knowledge bases to cleanup")
return nil
}
m.baseLogger.Info().Msgf("found %d knowledge bases to cleanup", len(kbForCleanup))
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(m.maxConcurrentProvisioner)
for _, kb := range kbForCleanup {
kbToCleanup := kb
g.Go(func() error {
if err := gCtx.Err(); err != nil {
m.baseLogger.Warn().Msgf("cleanup cancelled for knowledge base due to earlier error")
return err
}
cleanupParams := cleanupRemoteResourcesParams{
kbResourceId: kbToCleanup.KnowledgeBaseResourceID.String,
dataSourceId: kbToCleanup.DataSourceID.String,
inclusionPrefix: kbToCleanup.StoragePrefix,
}
err := m.cleanupRemoteResources(gCtx, cleanupParams)
if err != nil {
m.baseLogger.Error().Err(err).Msg("failed to cleanup remote resources for knowledge bases")
}
_, err = m.store.DeleteAwsKnowledgeBase(gCtx, kbToCleanup.ID)
if err != nil {
msg := "failed to update database status for cleanup"
m.baseLogger.Error().Err(err).Msg(msg)
return fmt.Errorf("%s: %w", msg, err)
}
return nil
})
}
if err := g.Wait(); err != nil {
m.baseLogger.Error().Err(err).Msg("cleanup cycle completed with one or more failures")
return fmt.Errorf("concurrent cleanup failed: %w", err)
}
return nil
}
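FetchKnowledgeBasesForCleanup and DeleteAwsKnowledgeBase aren't shown in the post; by analogy with the index-side queries they would look roughly like this (a sketch; the exact conditions may differ):

-- name: FetchKnowledgeBasesForCleanup :many
select * from aws_knowledge_bases
where status in ('FAILED', 'CLEANUP');

-- name: DeleteAwsKnowledgeBase :execresult
update aws_knowledge_bases
set
    status = 'DESTROYED',
    deleted_at = current_timestamp
where id = ?;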
- Similar to the Index Manager, we clean up the remote resources, the associated OpenSearch index is marked for cleanup on the Index Manager side, and then we soft-delete the knowledge base row in the database.
- We also set up a CRON job for this, just like for indexes:
func (m *KbManager) CleanupKnowledgeBaseWorker(ctx context.Context) {
location, err := time.LoadLocation("America/Chicago")
if err != nil {
m.baseLogger.Error().Err(err).Msg("cannot load the timezone")
return
}
c := cron.New(cron.WithLocation(location))
schedule := "30 2 * * *"
_, err = c.AddFunc(schedule, func() {
m.baseLogger.Info().Str("schedule", schedule).Str("timezone", location.String()).Msg("triggered scheduled cleanup job")
jobCtx, cancel := context.WithTimeout(context.Background(), 2*time.Hour)
defer cancel()
if err := m.CleanupKnowledgeBases(jobCtx); err != nil {
m.baseLogger.Error().Err(err).Msg("scheduled cleanup job failed")
} else {
m.baseLogger.Info().Msg("scheduled cleanup job completed successfully")
}
})
if err != nil {
m.baseLogger.Error().Err(err).Msg("FATAL: could not schedule cleanup job")
return
}
m.baseLogger.Info().Str("schedule", schedule).Str("timezone", location.String()).Msg("cleanup job scheduled successfully")
c.Start()
<-ctx.Done()
m.baseLogger.Info().Msg("shutting down scheduled cleanup worker")
stopCtx := c.Stop()
<-stopCtx.Done()
m.baseLogger.Info().Msg("cleanup scheduler stopped gracefully")
}
Running the Knowledge Base Manager
func (m *KbManager) Run(ctx context.Context) error {
m.baseLogger.Info().Msg("starting knowledge base manager workers")
g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
err := m.reconcilationWorker(gCtx)
return err
})
g.Go(func() error {
m.CleanupKnowledgeBaseWorker(gCtx)
return gCtx.Err()
})
m.baseLogger.Info().Msg("all knowledge base manager workers are running")
err := g.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
m.baseLogger.Error().Err(err).Msg("knowledge base manager stopped due to an err")
return err
}
m.baseLogger.Info().Msg("knowledge base manager stopped gracefully")
return nil
}
- We start the reconciliation worker and the cleanup worker as background goroutines.
Prime Pooling
- Prime pooling is an integral part of this orchestration drama: the first time the application starts, there are no pre-provisioned remote indexes or knowledge bases, so we want to provision the minimum pool of those resources up front and only then start serving.
- We define the method PrimePool for that.
- We use a local context with a 60-minute timeout, which is plenty for this.
- And we make sure all the indexes and knowledge bases are fully ready, not just warming up.
func (m *KbManager) PrimePool(ctx context.Context) error {
m.baseLogger.Info().Msg("starting index pool priming...")
indexReadyCtx, cancelIndex := context.WithTimeout(ctx, 60*time.Minute)
defer cancelIndex()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
indexReady := false
for !indexReady {
select {
case <-indexReadyCtx.Done():
return fmt.Errorf("timeout waiting for index pool to be ready: %w", indexReadyCtx.Err())
case <-ticker.C:
hasEnoughIndexes, indexesWereProvisioned, err := m.indexManager.reconcileIndexPool(ctx)
if err != nil {
return fmt.Errorf("error reconciling index pool: %w", err)
}
if hasEnoughIndexes {
if indexesWereProvisioned {
m.baseLogger.Info().Msg("new indexes were provisioned, waiting before reconciling knowledge bases")
time.Sleep(30 * time.Second)
}
m.baseLogger.Info().Msg("index pool has sufficient available indexes")
indexReady = true
}
}
}
err := m.reconcileKnowledgeBasePool(ctx)
if err != nil {
return fmt.Errorf("error reconciling prime knowledge base pool: %w", err)
}
m.baseLogger.Info().Msg("knowledge base pool priming successful")
return nil
}
Symphony - How it all comes together to play out
- main.go is the entry point of the application; here is how everything is wired up.
- We create a context that is cancelled automatically on OS signals: Ctrl + C (syscall.SIGINT) or a termination signal from the system (syscall.SIGTERM).
- We create a logging object based on zerolog.
- We load our configuration from a YAML file.
- We create the database and AWS client instances for their corresponding operations.
- We create a group context for managing the background goroutines.
- Before starting the application, we synchronously make sure all the resources are present using PrimePool.
func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
logConfig := logger.LoggerConfig{
LogLevel: zerolog.DebugLevel,
FileConfig: &logger.FileConfig{
Path: "../logs/chatbot_engine/chatbot_engine.log",
MaxSize: 10,
MaxBackups: 5,
MaxAge: 30,
Compress: true,
},
}
baseLogger := logger.NewLogger(logConfig)
config, err := utils.LoadConfig("./chatbot_engine.yaml")
if err != nil {
baseLogger.Fatal().Err(err).Msg("cannot load configuration")
}
switch config.Environment {
case globals.ProdEnv:
gin.SetMode(gin.ReleaseMode)
baseLogger.Info().Msg("Gin mode set to 'release'")
default:
gin.SetMode(gin.DebugMode)
baseLogger.Info().Msg("Gin mode set to 'debug'")
}
db, err := sql.Open("mysql", config.DBSource)
if err != nil {
baseLogger.Fatal().Err(err).Msg("error opening database connection")
}
defer db.Close()
pingCtx, cancelPing := context.WithTimeout(ctx, 5*time.Second)
defer cancelPing()
err = db.PingContext(pingCtx)
if err != nil {
baseLogger.Fatal().Err(err).Msg("failed to connect to database (ping failed)")
}
db.SetMaxOpenConns(config.DBMaxOpenConn)
db.SetMaxIdleConns(config.DBMaxIdleConn)
db.SetConnMaxLifetime(time.Duration(config.DBConnMaxLifetime))
baseLogger.Info().Msg("database connection pool initialized successfully")
store := database.NewSqlStore(db)
group, groupCtx := errgroup.WithContext(ctx)
var awsClient *aws_cloud.AWSCloudClients
if config.Environment != globals.DevEnv {
var err error
awsClient, err = aws_cloud.NewAWSCloudClients(
groupCtx,
aws_cloud.AwsCloudClientArg{
DevEnvironmentCfg: nil,
Region: config.AWSDefaultRegion,
OpensearchEndpoint: config.AWSOpensearchEndpoint,
BucketName: config.AWSBucket,
QueueName: config.AWSProdQueueName,
KeyID: config.AWSKeyID,
},
)
if err != nil {
baseLogger.Fatal().Err(err).Msg("error creating aws client")
}
} else {
var err error
awsClient, err = aws_cloud.NewAWSCloudClients(
groupCtx,
aws_cloud.AwsCloudClientArg{
DevEnvironmentCfg: &aws_cloud.DevEnvironmentConfig{
AccessKey: config.AWSAccessKey,
Secret: config.AWSSecretAccess,
},
LogEnable: config.AwsLogEnable,
Region: config.AWSDefaultRegion,
OpensearchEndpoint: config.AWSOpensearchEndpoint,
BucketName: config.AWSBucket,
QueueName: config.AWSDevQueueName,
KeyID: config.AWSKeyID,
},
)
if err != nil {
baseLogger.Fatal().Err(err).Msg("error creating aws client")
}
}
indexManager := knowledge_base.NewIndexManager(awsClient, store, baseLogger)
kbManager := knowledge_base.NewKbManager(awsClient, store, baseLogger, &config, indexManager)
if err := kbManager.PrimePool(groupCtx); err != nil {
baseLogger.Fatal().Err(err).Msg("error prime pooling knowledge base and indexes")
}
group.Go(func() error {
err := indexManager.Run(groupCtx)
if err != nil {
baseLogger.Error().Err(err).Msg("index manager exited with error")
}
return err
})
group.Go(func() error {
err := kbManager.Run(groupCtx)
if err != nil {
baseLogger.Error().Err(err).Msg("knowledge base manager exited with error")
}
return err
})
err = group.Wait()
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) {
baseLogger.Error().Err(err).Msg("application exited due to error")
} else {
baseLogger.Info().Msg("application exiting gracefully")
}
baseLogger.Info().Msg("Bye :)")
}
This is how I worked around pre-provisioning AWS resources. I hope this helps.
Thanks :)