How I Cut AWS RAG System Response Time from 30 Seconds to 100ms

A deep dive into transforming a painful user experience into seamless interactions using resource pooling and state machines

The Pain Point That Changed Everything

Picture this: You've just built what you think is an elegant RAG (Retrieval Augmented Generation) system using AWS Bedrock, OpenSearch Serverless, and S3. The architecture looks clean on paper. Then you hit the "create knowledge base" button and... wait. And wait. Thirty seconds later, your users are still staring at a loading spinner.

This isn't just poor UX—it's a business killer in today's instant-gratification world.

The Hidden AWS Reality

Here's what the AWS documentation doesn't make crystal clear:

OpenSearch Serverless indexes take 15-20 seconds to become operationally searchable after creation.

This single constraint cascaded into a nightmare of complexity:

  1. Create OpenSearch index → 15-20 seconds of mandatory wait time
  2. Create AWS Knowledge Base → Remote API call with potential failures
  3. Create Data Source → Another remote API call
  4. Configure S3 inclusion prefixes → State management complexity
  5. Handle failures at any step → Rollback hell

Each step could fail independently, creating inconsistent states between our database and AWS resources. The traditional synchronous approach was doomed from the start.

The "Aha" Moment: Think Like a Restaurant

The breakthrough came from an unexpected analogy. High-end restaurants don't start cooking your meal when you order—they prep ingredients in advance. They maintain pools of ready-to-use components.

What if we pre-provisioned AWS resources and maintained them in pools?

The Solution: Resource Pooling with State Machines

I implemented a multi-layered pool system that transforms the user experience:

  • Before: Click → Wait 30+ seconds → Maybe get a knowledge base
  • After: Click → Instant assignment from pre-warmed pool → Seamless experience

Architecture Overview

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   User Layer    │    │  Business Logic │    │  Resource Pools │
│                 │    │                 │    │                 │
│ Instant Response│◄──►│ Assign from Pool│◄──►│ Pre-warmed AWS  │
│                 │    │                 │    │    Resources    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                               ┌───────▼────────┐
                                               │ Background     │
                                               │ Reconciliation │
                                               │ Workers        │
                                               └────────────────┘

The Three-Layer Pool System

Layer 1: OpenSearch Index Pool

CREATE TABLE `opensearch_indexes` (
    `id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    `vector_index` VARCHAR(500) NOT NULL,
    `vector_field` VARCHAR(255) NOT NULL,
    `status` ENUM('PROVISIONING', 'AVAILABLE', 'ASSIGNED', 'DESTROYED', 'CLEANUP', 'FAILED') NOT NULL,
    `usable_at` TIMESTAMP NULL -- Critical for 15-20s warming period
);

Layer 2: AWS Knowledge Base Pool

CREATE TABLE `aws_knowledge_bases` (
    `id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    `aws_kb_name` VARCHAR(100) NOT NULL,
    `index_id` BIGINT UNSIGNED NOT NULL,
    `knowledge_base_resource_id` VARCHAR(255) NULL,
    `status` ENUM('PROVISIONING', 'AVAILABLE', 'ASSIGNED', 'DESTROYED', 'CLEANUP', 'FAILED') NOT NULL,
    FOREIGN KEY (`index_id`) REFERENCES `opensearch_indexes`(`id`)
);

Layer 3: User Assignment Layer

CREATE TABLE `user_knowledge_bases` (
    `id` BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    `name` VARCHAR(255) NOT NULL,
    `user_id` INT UNSIGNED NOT NULL,
    `aws_kb_id` BIGINT UNSIGNED NOT NULL,
    FOREIGN KEY (`aws_kb_id`) REFERENCES `aws_knowledge_bases`(`id`)
);

The State Machine That Changes Everything

Each resource follows a deterministic state flow:

PROVISIONING → AVAILABLE → ASSIGNED → CLEANUP → DESTROYED
      ↓             ↑
    FAILED    (Reconciliation)

This simple state machine eliminates the complexity of managing partial states and failed operations.

Each state has a clear meaning:

  • PROVISIONING: Resource is being created
  • AVAILABLE: Ready to be assigned to users
  • ASSIGNED: Currently in use by a user
  • CLEANUP: Marked for deletion
  • DESTROYED: Deleted (soft delete)
  • FAILED: Provisioning failed, needs cleanup

Because every resource sits in exactly one well-defined state, assignment, reconciliation, and cleanup can each be reasoned about independently.
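To make the allowed transitions explicit, here is a minimal sketch of how they could be encoded in Go and checked before any status update. This helper is not part of the actual codebase; the state names match the status ENUM above, everything else is illustrative.

// Hypothetical helper (not part of the original codebase): it encodes the
// allowed transitions of the pool state machine so a status update can be
// validated before it is written to the database.
type PoolStatus string

const (
    StatusProvisioning PoolStatus = "PROVISIONING"
    StatusAvailable    PoolStatus = "AVAILABLE"
    StatusAssigned     PoolStatus = "ASSIGNED"
    StatusCleanup      PoolStatus = "CLEANUP"
    StatusDestroyed    PoolStatus = "DESTROYED"
    StatusFailed       PoolStatus = "FAILED"
)

// allowedTransitions mirrors the flow diagram above; stuck PROVISIONING rows
// are also destroyed directly by the cleanup job, so that edge is included.
var allowedTransitions = map[PoolStatus][]PoolStatus{
    StatusProvisioning: {StatusAvailable, StatusFailed, StatusDestroyed},
    StatusAvailable:    {StatusAssigned, StatusCleanup},
    StatusAssigned:     {StatusCleanup},
    StatusCleanup:      {StatusDestroyed},
    StatusFailed:       {StatusDestroyed},
}

// CanTransition reports whether moving from one status to another is allowed.
func CanTransition(from, to PoolStatus) bool {
    for _, next := range allowedTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}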

Index Manager

The Index Manager is the provisioning manager that handles creating OpenSearch indexes in the collection.

Properties and Instantiation

Let's define the IndexManager struct, which describes the provision manager and its properties, along with a constructor to instantiate it.

The awsClient is a custom dependency injected to handle AWS operations. The store dependency handles database operations; anyone familiar with sqlc will find it easy to follow.

type IndexManager struct {
    awsClient                *aws_cloud.AWSCloudClients
    store                    database.Store
    baseLogger               *logger.Logger
    minSize                  int
    maxSize                  int
    maxConcurrentProvisioner int

    reconcileTrigger chan struct{}
}

func NewIndexManager(
    awsClient *aws_cloud.AWSCloudClients,
    store database.Store,
    baseLogger *logger.Logger,
) *IndexManager {
    return &IndexManager{
        awsClient:                awsClient,
        store:                    store,
        baseLogger:               baseLogger,
        minSize:                  globals.MinIndexPoolSize,
        maxSize:                  globals.MaxIndexPoolSize,
        maxConcurrentProvisioner: globals.MaxConcurrentProvisioner,

        reconcileTrigger: make(chan struct{}, 1),
    }
}

Provision a new index

provisionNewIndex is a method on IndexManager that creates a new index in OpenSearch and tracks it in the database.

func (m *IndexManager) provisionNewIndex(ctx context.Context) error {
    indexName := utils.GenerateUniqueIndexName(true)
    vectorField := globals.VectorFieldName

    result, err := m.store.CreateOpensearchIndex(ctx, database.CreateOpensearchIndexParams{
        VectorIndex: indexName,
        VectorField: vectorField,
    })
    if err != nil {
        return fmt.Errorf("error initiating opensearch indexes in database: %w", err)
    }

    lastId, err := result.LastInsertId()
    if err != nil {
        return fmt.Errorf("error fetching last inserted id in opensearch indexs: %w", err)
    }

    err = m.awsClient.CreateIndexAndWaitForReady(ctx, indexName, vectorField)
    if err != nil {
        updateErr := m.store.UpdateOpensearchIndexStatus(ctx, database.UpdateOpensearchIndexStatusParams{
            Status: database.OpensearchIndexesStatusFAILED,
            ID:     uint64(lastId),
        })
        if updateErr != nil {
            return fmt.Errorf("error updating opensearch index status to failed due to err (%w): %w", err, updateErr)
        }
        return fmt.Errorf("error creating opensearch index remotely: %w", err)
    }

    err = m.store.FinalizeOpensearchIndex(ctx, uint64(lastId))
    if err != nil {
        return fmt.Errorf("error finalizing opensearch indexes: %w", err)
    }

    return nil
}
  • GenerateUniqueIndexName is a utility function that generates a unique name for the index.
  • CreateOpensearchIndex is a database operation that records the new index in the database with PROVISIONING status.
  • CreateIndexAndWaitForReady is an AWS operation that creates the index in OpenSearch and waits until creation is confirmed.
    • If the remote operation fails, we mark the row we created earlier as FAILED using UpdateOpensearchIndexStatus.
  • FinalizeOpensearchIndex marks the index as AVAILABLE so it can be assigned, and fills the usable_at column so we can account for the warming period of OpenSearch indexes (both of these queries are sketched right after this list).
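Neither UpdateOpensearchIndexStatus nor FinalizeOpensearchIndex is shown in the post; assuming sqlc-style definitions consistent with the schema and the description above, they presumably look roughly like this:

-- Hypothetical sqlc definitions; the originals aren't shown in the post.
-- name: UpdateOpensearchIndexStatus :exec
update opensearch_indexes
set status = ?
where id = ?;

-- name: FinalizeOpensearchIndex :exec
update opensearch_indexes
set
    status = 'AVAILABLE',
    usable_at = now()
where id = ?;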

Reconciling Index Pool

func (m *IndexManager) reconcileIndexPool(ctx context.Context) (bool, bool, error) {
    readyCount, err := m.store.CountReadyAvailableIndexes(ctx)
    if err != nil {
        return false, false, fmt.Errorf("error counting indexes by status: %w", err)
    }

    warmingCount, err := m.store.CountWarmingAvailableIndexes(ctx)
    if err != nil {
        return false, false, fmt.Errorf("error counting warming indexes: %w", err)
    }

    provisioningCount, err := m.store.CountProvisioningIndexes(ctx)
    if err != nil {
        return false, false, fmt.Errorf("failed to count provisioning indexes: %w", err)
    }

    if readyCount >= int64(m.minSize) {
        m.baseLogger.Info().Msgf("Index pool is healthy: Ready=%d", readyCount)
        return true, false, nil
    }

    totalPotentialPool := readyCount + warmingCount + provisioningCount

    if totalPotentialPool >= int64(m.minSize) {
        return false, false, nil
    }

    needed := m.minSize - int(totalPotentialPool)

    m.baseLogger.Info().Msgf(
        "index pool state: Available=%d, Warming=%d, Provisioning=%d. Need to create %d",
        readyCount,
        warmingCount,
        provisioningCount,
        needed,
    )

    g, gCtx := errgroup.WithContext(ctx)

    g.SetLimit(m.maxConcurrentProvisioner)

    for i := 0; i < needed; i++ {
        g.Go(func() error {
            m.baseLogger.Info().Msg("dispatching index provisioner task")

            err := m.provisionNewIndex(gCtx)
            if err != nil {
                m.baseLogger.Error().Err(err).Msg("failed to provision new index")
                return err
            }
            m.baseLogger.Info().Msg("successfully provisioned index")
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return false, false, fmt.Errorf("reconcilation failed during provision of indexs: %w", err)
    }

    m.baseLogger.Info().Msg("index reconcilation cycle finished")

    finalAvailableCount, err := m.store.CountIndexesByStatus(ctx, database.OpensearchIndexesStatusAVAILABLE)
    if err != nil {
        return false, false, fmt.Errorf("error countintt available indexes after provisioning: %w", err)
    }

    return finalAvailableCount >= int64(m.minSize), true, nil
}
  • Using a database query, we count indexes that are in 'AVAILABLE' status and are no longer warming up. By warming up I mean the 15 to 20 seconds an index takes to become searchable, which is why the query also checks the usable_at column. The query (defined as CountReadyAvailableIndexes) looks like this:

        -- name: CountReadyAvailableIndexes :one
        select count(*) from opensearch_indexes
        where status = 'AVAILABLE'
            and usable_at is not null
            and usable_at < now() - interval 1 minute;
    
  • We also count available indexes that are still warming, using the following query:

        -- name: CountWarmingAvailableIndexes :one
        select count(*) from opensearch_indexes
        where status = 'AVAILABLE'
            and usable_at is not null
            and usable_at >= now() - interval 1 minute;
    
  • And we count indexes that are still provisioning, using the following query:

        -- name: CountProvisioningIndexes :one
        select count(*) from opensearch_indexes
        where status = 'PROVISIONING' and created_at > now() - interval 10 minute;
    
  • We calculate the total potential pool by adding all three counts; this keeps us from over-provisioning.

  • If the calculated pool already meets the minimum size, we return early.

  • Otherwise we calculate how many indexes are still needed.

  • To keep the concurrency simple, we use errgroup because:

    • if any goroutine in the group returns an error, the entire group is considered failed, and we can handle the error appropriately.
    • if one goroutine fails, the group context gets cancelled, signalling the other goroutines to stop their work early.
    • the Wait() method blocks until all goroutines complete.
  • We create a group and derived context with errgroup.WithContext(ctx), and additionally limit concurrency with the SetLimit method.

  • We loop until the needed count is met, starting one goroutine per missing index; each goroutine runs provisionNewIndex.

  • We wait for all goroutines to complete with Wait().

  • Finally, we recount the AVAILABLE indexes and make sure the pool is back above the minimum size.

Trigger Reconciliation

  • We reconcile and maintain the index pool periodically, but there are moments when we need to trigger a reconciliation on demand. That's why NewIndexManager creates a buffered channel of size 1: a single pending trigger is enough, extra triggers while one is already queued add nothing, and this balances manual triggering with the periodic cycle.
  • triggerReconcilation does exactly that.
func (m *IndexManager) triggerReconcilation() {
    select {
    case m.reconcileTrigger <- struct{}{}:
        m.baseLogger.Info().Msg("index reconcilation successfully trigger")
    default:
        m.baseLogger.Info().Msg("index reconcilation trigger ignored, reconcilation is already running")
    }
}

Reconciliation Worker

func (m *IndexManager) reconcilationWorker(ctx context.Context) error {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()

    if _, _, err := m.reconcileIndexPool(ctx); err != nil {
        m.baseLogger.Error().Err(err).Msg("initial reconcilation failed")
        return err
    }

    for {
        select {
        case <-ctx.Done():
            m.baseLogger.Info().Msg("index reconcilation worker shutting down")
            return ctx.Err()

        case <-ticker.C:
            m.baseLogger.Info().Msg("periodic trigger of reconcilation of indexes")
            if _, _, err := m.reconcileIndexPool(ctx); err != nil {
                m.baseLogger.Error().Err(err).Msg("periodic index reconcilation failed")
                return err
            }

        case <-m.reconcileTrigger:
            m.baseLogger.Info().Msg("event-driven index trigger: starting reconcilation")

            select {
            case <-m.reconcileTrigger:
            default:
            }

            if _, _, err := m.reconcileIndexPool(ctx); err != nil {
                m.baseLogger.Error().Err(err).Msg("event-driven index reconcilation failed")
            }
        }
    }
}
  • The worker periodically calls reconcileIndexPool by listening on the ticker channel.
  • For manual triggering it also listens on the reconcileTrigger channel we initialized earlier; before running, it drains any extra pending trigger so that back-to-back triggers collapse into a single reconciliation.

Claiming Indexes

  • We claim an index with a query that selects a row based on the current database state:
-- name: ClaimAvailableIndex :one
select
    id, vector_index, vector_field
from opensearch_indexes
    where status = 'AVAILABLE'
    and usable_at is not null
    and usable_at < now() - interval 1 minute
order by rand()
limit 1
for update skip locked;
  • The reason for for update skip locked is that we pick a random row matching the conditions; if that row is already locked by another transaction, we skip it and move on to the next candidate.
  • Once we have the record, we mark it as assigned within the same transaction using the following query:
-- name: MarkIndexAsAssigned :exec
update opensearch_indexes
set
    status = 'ASSIGNED', assigned_at = now()
where id = ?;
  • The whole transaction is defined as TxClaimAvailableIndex:
func (store *SqlStore) TxClaimAvailableIndex(
    ctx context.Context,
) (*ClaimAvailableIndexRow, error) {
    var response ClaimAvailableIndexRow
    err := store.execTx(ctx, func(q *Queries) error {
        index, err := q.ClaimAvailableIndex(ctx)
        if err != nil {
            return fmt.Errorf("failed to claim available index: %w", err)
        }

        err = q.MarkIndexAsAssigned(ctx, index.ID)
        if err != nil {
            return fmt.Errorf("failed to mark index as assigned: %w", err)
        }
        response = index

        return nil

    })

    if err != nil {
        return nil, err
    }

    return &response, nil

}
  • After claiming an index from the database, we manually trigger index reconciliation:
func (m *IndexManager) ClaimAvailableIndex(ctx context.Context) (*database.ClaimAvailableIndexRow, error) {
    index, err := m.store.TxClaimAvailableIndex(ctx)
    if err != nil {
        return nil, err
    }

    go m.triggerReconcilation()

    return index, nil
}

Cleaning up indexes

func (m *IndexManager) cleanupRemoteResources(ctx context.Context, indexName string) error {
    err := m.awsClient.DeleteIndex(ctx, indexName)
    if err != nil {
        return fmt.Errorf("error deleting opensearch index remotely: %w", err)
    }

    return nil
}

func (m *IndexManager) CleanupIndexes(ctx context.Context) error {
    indexesForCleanup, err := m.store.FetchIndexesForCleanup(ctx)
    if err != nil {
        return fmt.Errorf("error fetching indexes for clean up: %w", err)
    }

    if len(indexesForCleanup) == 0 {
        m.baseLogger.Info().Msg("no indexes to cleanup")
        return nil
    }

    m.baseLogger.Info().Msgf("found %d indexes to cleanup", len(indexesForCleanup))

    g, gCtx := errgroup.WithContext(ctx)
    g.SetLimit(m.maxConcurrentProvisioner)

    for _, idx := range indexesForCleanup {
        indexToCleanup := idx

        g.Go(func() error {
            if err := gCtx.Err(); err != nil {
                m.baseLogger.Warn().Msgf("cleanup cancelled for index %s due to earlier error", indexToCleanup.VectorField)
                return err
            }

            err := m.cleanupRemoteResources(gCtx, indexToCleanup.VectorIndex)
            if err != nil {
                m.baseLogger.Error().
                    Err(err).
                    Str("index", indexToCleanup.VectorIndex).
                    Msg("failed to cleanup remote resources for index")
            }

            _, err = m.store.DeleteOpensearchIndex(gCtx, indexToCleanup.ID)
            if err != nil {
                m.baseLogger.Error().
                    Err(err).
                    Uint64("id", indexToCleanup.ID).
                    Msg("failed to mark index as destroyed in database")
                return fmt.Errorf("failed to update database status: %w", err)
            }

            m.baseLogger.Info().
                Str("index", indexToCleanup.VectorIndex).
                Msg("successfully cleaned up index")

            return nil
        })
    }

    if err := g.Wait(); err != nil {
        m.baseLogger.Error().Err(err).Msg("cleanup cycle completed with one or more failures")
    }

    return nil
}
  • Remote index deletion in OpenSearch is handled by the cleanupRemoteResources method.
  • In CleanupIndexes we first fetch the indexes eligible for cleanup; the query looks like this:
-- name: FetchIndexesForCleanup :many
select
    oi.*
from opensearch_indexes oi
left join
    aws_knowledge_bases akb on oi.id = akb.index_id
where
    oi.status = 'FAILED'
    or (oi.status = 'PROVISIONING' and oi.created_at < now() - interval 10 minute)
    or (
        oi.status = 'CLEANUP'
        and (akb.id is null or akb.status = 'DESTROYED')
    );
  • We join against aws_knowledge_bases because an index marked for CLEANUP should only be removed once its knowledge base has been destroyed; otherwise we would create a cascading inconsistency. The database does not enforce this for us, so we have to take care of it ourselves, since soft deletes are purely a logical layer of deletion.
  • After fetching, we concurrently clean up the remote resources, and once a resource is cleaned up remotely we soft-delete its row in the database using this query:
-- name: DeleteOpensearchIndex :execresult
update opensearch_indexes
set
    status = 'DESTROYED',
    deleted_at = current_timestamp
where id = ?;

CRON based Cleanup Worker

  • Per our requirements we will never run multiple instances of this application; a single instance will run and we plan to scale vertically first, since we are in a beta phase and don't expect much traffic. That's why we schedule cleanup with a CRON job inside the application itself. This would be a drawback if we ever ran multiple instances, because coordinating the job would become complex, but that's a problem for another day.
func (m *IndexManager) CleanupIndexWorker(ctx context.Context) {
    location, err := time.LoadLocation("America/Chicago")
    if err != nil {
        m.baseLogger.Error().Err(err).Msg("cannot load the timezone")
        return
    }

    c := cron.New(cron.WithLocation(location))

    schedule := "30 3 * * *"
    _, err = c.AddFunc(schedule, func() {
        m.baseLogger.Info().
            Str("schedule", schedule).
            Str("timezone", location.String()).
            Msg("triggered scheduled index cleanup job")

        jobCtx, cancel := context.WithTimeout(context.Background(), 2*time.Hour)
        defer cancel()

        if err := m.CleanupIndexes(jobCtx); err != nil {
            m.baseLogger.Error().Err(err).Msg("scheduled index cleanup job failed")
        } else {
            m.baseLogger.Info().Msg("scheduled index cleanup job completed successfully")
        }
    })

    if err != nil {
        m.baseLogger.Error().Err(err).Msg("fatal: could not schedule cleanup job")
        return
    }

    m.baseLogger.Info().
        Str("schedule", schedule).
        Str("timezone", location.String()).
        Msg("index cleanup job scheduled successfully")

    c.Start()

    <-ctx.Done()

    m.baseLogger.Info().Msg("shutting down scheduled index cleanup worker")

    stopCtx := c.Stop()
    <-stopCtx.Done()

    m.baseLogger.Info().Msg("index cleanup scheduler stopped gracefully")
}

Run Index Manager

  • We wrap everything needed to run the Index Manager in a Run method:
func (m *IndexManager) Run(ctx context.Context) error {
    m.baseLogger.Info().Msg("starting index manager workers")

    g, gCtx := errgroup.WithContext(ctx)

    g.Go(func() error {
        err := m.reconcilationWorker(gCtx)
        return err
    })

    g.Go(func() error {
        m.CleanupIndexWorker(gCtx)
        return gCtx.Err()
    })

    m.baseLogger.Info().Msg("all index manager workers are running")
    err := g.Wait()
    if err != nil && !errors.Is(err, context.Canceled) {
        m.baseLogger.Error().Err(err).Msg("index manager stopped due to an error")
        return err
    }

    m.baseLogger.Info().Msg("index manager stopped gracefully")
    return nil
}
  • We start the reconcilationWorker and the CleanupIndexWorker as background goroutines.

Knowledge Base Manager

  • Let's define the Knowledge Base Manager struct:
type KbManager struct {
    awsClient                *aws_cloud.AWSCloudClients
    store                    database.Store
    config                   *utils.Config
    baseLogger               *logger.Logger
    indexManager             *IndexManager
    minSize                  int
    maxSize                  int
    maxConcurrentProvisioner int

    reconcileTrigger chan struct{}
}
  • And a constructor to instantiate it:
func NewKbManager(
    awsClient *aws_cloud.AWSCloudClients,
    store database.Store,
    baseLogger *logger.Logger,
    config *utils.Config,
    indexManager *IndexManager,
) *KbManager {
    return &KbManager{
        awsClient:                awsClient,
        store:                    store,
        baseLogger:               baseLogger,
        config:                   config,
        minSize:                  globals.MinKbPoolSize,
        maxSize:                  globals.MaxKbPoolSize,
        maxConcurrentProvisioner: globals.MaxConcurrentProvisioner,
        indexManager:             indexManager,

        reconcileTrigger: make(chan struct{}, 1),
    }
}

Provision New Knowledge Base

  • We inject the index manager into the knowledge base manager because we need to claim indexes when provisioning knowledge bases:
func (m *KbManager) provisionNewKnowledgeBase(ctx context.Context) error {
    m.baseLogger.Info().Msg("attempting to claim an available index for new knowledge base")
    claimedIndex, err := m.indexManager.ClaimAvailableIndex(ctx)
    if err != nil {
        if database.IsRecordNotFound(err) {
            m.baseLogger.Warn().Msg("no available index to claim")
            go m.indexManager.triggerReconcilation()
            return fmt.Errorf("no available index in the pool")
        }
        return fmt.Errorf("failed to claim available index: %w", err)
    }

    m.baseLogger.Info().Msg("successfully claimed indexed")

    dataSourceName := utils.GenerateUniqueIndexName(false)
    storagePrefix := utils.GenerateS3Prefix()
    kbName := utils.GenerateKnowledgeBaseName()

    inclusionPrefix := fmt.Sprintf("%s/%s/", m.config.AWSParentFolder, storagePrefix)

    result, err := m.store.CreateAwsKnowledgeBase(ctx, database.CreateAwsKnowledgeBaseParams{
        AwsKbName:      kbName,
        IndexID:        claimedIndex.ID,
        DataSourceName: dataSourceName,
        StoragePrefix:  inclusionPrefix,
    })
    if err != nil {
        return fmt.Errorf("error initiating knowledge base in database: %w", err)
    }

    lastId, err := result.LastInsertId()
    if err != nil {
        return fmt.Errorf("error fetching last inserted id in the knowledge base: %w", err)
    }

    kbResult, err := m.awsClient.CreateKnowledgeBase(ctx, aws_cloud.CreateKnowledgeBaseParams{
        Name:              &kbName,
        RoleARN:           &m.config.AWSIamRoleARN,
        S3Location:        &m.config.AWSDefaultS3Uri,
        EmbeddingModelARN: &m.config.AWSEmbeddingModelARN,
        CollectionARN:     &m.config.AWSCollectionARN,
        VectorIndexName:   &claimedIndex.VectorIndex,
        VectorField:       &claimedIndex.VectorField,
    })
    if err != nil {
        indexErr := m.store.MarkOpensearchIndexForCleanup(ctx, claimedIndex.ID)
        if indexErr != nil {
            return fmt.Errorf("error marking index for cleanup while rollbacking failure in knowledge base creation (%w): %w", err, indexErr)
        }

        _, rollbackErr := m.store.DeleteAwsKnowledgeBase(ctx, uint64(lastId))
        if rollbackErr != nil {
            return fmt.Errorf("error rollbacking knowledge base due to error(%w): %w", err, rollbackErr)
        }

        return fmt.Errorf("failed to create knowledge base: %w", err)
    }

    dataSourceResult, err := m.awsClient.CreateDataSource(
        ctx,
        dataSourceName,
        kbResult.KnowledgeBase.KnowledgeBaseId,
        &m.config.AWSBucketARN,
        &m.config.AWSBucketOwnerID,
        &inclusionPrefix,
    )
    if err != nil {
        _, kbErr := m.awsClient.DeleteKnowledgeBase(ctx, *kbResult.KnowledgeBase.KnowledgeBaseId)
        if kbErr != nil {
            return fmt.Errorf("error deleting knowledge base (due to - %w): %w", err, kbErr)
        }

        indexErr := m.store.MarkOpensearchIndexForCleanup(ctx, claimedIndex.ID)
        if indexErr != nil {
            return fmt.Errorf("error marking index for cleanup while rollbacking failure in knowledge base creation (%w): %w", err, indexErr)
        }

        _, rollbackErr := m.store.DeleteAwsKnowledgeBase(ctx, uint64(lastId))
        if rollbackErr != nil {
            return fmt.Errorf("error rollbacking knowledge base due to error(%w): %w", err, rollbackErr)
        }
        return fmt.Errorf("failed to create data source: %w", err)
    }

    err = m.store.FinalizedAwsKnowledgeBase(ctx, database.FinalizedAwsKnowledgeBaseParams{
        DataSourceID: sql.NullString{
            Valid:  true,
            String: *dataSourceResult.DataSource.DataSourceId,
        },
        KnowledgeBaseResourceID: sql.NullString{
            Valid:  true,
            String: *kbResult.KnowledgeBase.KnowledgeBaseId,
        },
        ID: uint64(lastId),
    })
    if err != nil {
        dataSourceErr := m.awsClient.DeleteDataSource(ctx, kbResult.KnowledgeBase.KnowledgeBaseId, dataSourceResult.DataSource.DataSourceId)
        if dataSourceErr != nil {
            return fmt.Errorf("error deleting data source (due to: %w): %w", err, dataSourceErr)
        }

        _, knowledgeBaseErr := m.awsClient.DeleteKnowledgeBase(ctx, *kbResult.KnowledgeBase.KnowledgeBaseId)
        if knowledgeBaseErr != nil {
            return fmt.Errorf("error deleting knowledge base (due to: %w): %w", err, knowledgeBaseErr)
        }

        indexErr := m.store.MarkOpensearchIndexForCleanup(ctx, claimedIndex.ID)
        if indexErr != nil {
            return fmt.Errorf("error marking index for cleanup while rollbacking failure in knowledge base creation (%w): %w", err, indexErr)
        }

        return fmt.Errorf("error finalizing knowledge base: %w", err)
    }

    return nil
}
  • First we claim an index from the index manager.
  • Then we record the knowledge base in our database using the following query:
-- name: CreateAwsKnowledgeBase :execresult
insert into aws_knowledge_bases (
    aws_kb_name,
    index_id,
    data_source_name,
    storage_prefix,
    status
) values (
    ?, ?, ?, ?, 'PROVISIONING'
);
  • Then we create the knowledge base in AWS itself; if that fails, we roll back by marking the index for cleanup and deleting the row we just created in the database.
  • After creating the knowledge base we create its data source, and then we finalize the knowledge base record:
-- name: FinalizedAwsKnowledgeBase :exec
update aws_knowledge_bases
set
    data_source_id = ?,
    knowledge_base_resource_id = ?,
    status = 'AVAILABLE'
where id = ?;
  • Again, if finalization fails we roll back by deleting the remote data source and knowledge base and marking the OpenSearch index for cleanup; a sketch of the MarkOpensearchIndexForCleanup query follows below.
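The MarkOpensearchIndexForCleanup query isn't shown in the post; given the state machine, it presumably just flips the index into the CLEANUP state, roughly like this:

-- Hypothetical sqlc definition; not shown in the post.
-- name: MarkOpensearchIndexForCleanup :exec
update opensearch_indexes
set status = 'CLEANUP'
where id = ?;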

Reconciling Knowledge Base Pool

  • Reconciliation of the knowledge base pool mirrors what we do in the index manager.
  • The only difference is that knowledge bases have no warming period.
func (m *KbManager) reconcileKnowledgeBasePool(ctx context.Context) error {
    availableCount, err := m.store.CountKnowledgeBaseByStatus(ctx, database.AwsKnowledgeBasesStatusAVAILABLE)
    if err != nil {
        return fmt.Errorf("failed to count available knowledge bases: %w", err)
    }

    provisioningCount, err := m.store.CountProvisioningKnowledgeBase(ctx)
    if err != nil {
        return fmt.Errorf("failed to count provisioning knowledge base: %w", err)
    }

    count := availableCount + provisioningCount

    if count >= int64(m.minSize) {
        return nil
    }

    needed := m.minSize - int(count)

    m.baseLogger.Info().Msgf(
        "knowledge base pool state: Available=%d, Provisioning=%d. Need to create %d",
        availableCount,
        provisioningCount,
        needed,
    )

    g, gCtx := errgroup.WithContext(ctx)

    g.SetLimit(m.maxConcurrentProvisioner)

    for i := 0; i < needed; i++ {
        g.Go(func() error {
            m.baseLogger.Info().Msg("dispatching knowledge base provisioner task")

            err := m.provisionNewKnowledgeBase(gCtx)
            if err != nil {
                m.baseLogger.Error().Err(err).Msg("failed to provision knowledge base")
                return err
            }
            m.baseLogger.Info().Msg("successfully provisioned a knowledge base")
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return fmt.Errorf("reconcilation failed during provisioning knowledge base: %w", err)
    }

    m.baseLogger.Info().Msg("knowledge base reconcilation cycle finished")

    return nil
}
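The knowledge base counting queries used above aren't shown in the post; assuming they mirror the index versions (including the 10-minute window for stale PROVISIONING rows, which is an assumption carried over from the index query), they would look roughly like this:

-- Hypothetical sqlc definitions, mirroring the index pool queries.
-- name: CountKnowledgeBaseByStatus :one
select count(*) from aws_knowledge_bases
where status = ?;

-- name: CountProvisioningKnowledgeBase :one
select count(*) from aws_knowledge_bases
where status = 'PROVISIONING' and created_at > now() - interval 10 minute;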
  • Any component that uses the knowledge base manager can then obtain an available knowledge base through the following method (a sketch of the underlying TxAssignKnowledgeBase transaction follows after this subsection):
func (m *KbManager) AssignedAvailableKnowledgeBase(
    ctx context.Context,
    args database.TxAssignKnowledgeBaseParams,
) (*int64, error) {
    knowledgeBaseId, err := m.store.TxAssignKnowledgeBase(ctx, args)
    if err != nil {
        return nil, err
    }

    go m.triggerReconcilation()

    return knowledgeBaseId, nil
}
  • And we trigger reconciliation, just like in the index manager:
func (m *KbManager) triggerReconcilation() {
    select {
    case m.reconcileTrigger <- struct{}{}:
        m.baseLogger.Info().Msg("knowledge base reconcilation successfully triggered")
    default:
        m.baseLogger.Info().Msg("knowledge base reconcilation trigger ignored, a reconcilation is already running")
    }
}
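TxAssignKnowledgeBase itself isn't shown in the post. Here is a minimal sketch of what it presumably does, assuming it claims an AVAILABLE knowledge base and links it to the user inside a single transaction; the query names (ClaimAvailableKnowledgeBase, MarkKnowledgeBaseAsAssigned, CreateUserKnowledgeBase), the parameter fields, and the returned id are illustrative, not the actual API:

func (store *SqlStore) TxAssignKnowledgeBase(
    ctx context.Context,
    args TxAssignKnowledgeBaseParams,
) (*int64, error) {
    var userKbId int64
    err := store.execTx(ctx, func(q *Queries) error {
        // Claim a pooled knowledge base that is ready to use
        // (conceptually: status = 'AVAILABLE' ... for update skip locked).
        kb, err := q.ClaimAvailableKnowledgeBase(ctx)
        if err != nil {
            return fmt.Errorf("failed to claim available knowledge base: %w", err)
        }

        // Mark it ASSIGNED so the reconciler replenishes the pool.
        if err := q.MarkKnowledgeBaseAsAssigned(ctx, kb.ID); err != nil {
            return fmt.Errorf("failed to mark knowledge base as assigned: %w", err)
        }

        // Record the user-facing knowledge base that points at the pooled AWS resource.
        result, err := q.CreateUserKnowledgeBase(ctx, CreateUserKnowledgeBaseParams{
            Name:    args.Name,
            UserID:  args.UserID,
            AwsKbID: kb.ID,
        })
        if err != nil {
            return fmt.Errorf("failed to create user knowledge base: %w", err)
        }

        // Assumed to return the id of the newly created user knowledge base row.
        userKbId, err = result.LastInsertId()
        return err
    })
    if err != nil {
        return nil, err
    }

    return &userKbId, nil
}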

Cleaning up Knowledge Bases

func (m *KbManager) cleanupRemoteResources(ctx context.Context, args cleanupRemoteResourcesParams) error {
    err := m.awsClient.DeleteDataSource(ctx, &args.kbResourceId, &args.dataSourceId)
    if err != nil {
        return fmt.Errorf("error deleting data source remotely: %w", err)
    }

    _, err = m.awsClient.DeleteKnowledgeBase(ctx, args.kbResourceId)
    if err != nil {
        return fmt.Errorf("error deleting knowledge base remotely: %w", err)
    }

    err = m.awsClient.DeleteMultipleObjects(ctx, args.inclusionPrefix)
    if err != nil {
        return fmt.Errorf("error deleting knowledge base documents remotely: %w", err)
    }

    return nil
}

func (m *KbManager) CleanupKnowledgeBases(
    ctx context.Context,
) error {
    kbForCleanup, err := m.store.FetchKnowledgeBasesForCleanup(ctx)
    if err != nil {
        return fmt.Errorf("error fetching knowledge base cleaning up:%w", err)
    }

    if len(kbForCleanup) == 0 {
        m.baseLogger.Info().Msg("no knowledge bases to cleanup")
        return nil
    }

    m.baseLogger.Info().Msgf("found %d knowledge bases to cleanup", len(kbForCleanup))

    g, gCtx := errgroup.WithContext(ctx)

    g.SetLimit(m.maxConcurrentProvisioner)

    for _, kb := range kbForCleanup {

        kbToCleanup := kb

        g.Go(func() error {
            if err := gCtx.Err(); err != nil {
                m.baseLogger.Warn().Msgf("cleanup cancelled for knowledge base due to earlier error")
                return err
            }

            cleanupParams := cleanupRemoteResourcesParams{
                kbResourceId:    kbToCleanup.KnowledgeBaseResourceID.String,
                dataSourceId:    kbToCleanup.DataSourceID.String,
                inclusionPrefix: kbToCleanup.StoragePrefix,
            }
            err := m.cleanupRemoteResources(gCtx, cleanupParams)
            if err != nil {
                m.baseLogger.Error().Err(err).Msg("failed to cleanup remote resources for knowledge bases")
            }

            _, err = m.store.DeleteAwsKnowledgeBase(gCtx, kbToCleanup.ID)
            if err != nil {
                msg := "failed to update database status for cleanup"
                m.baseLogger.Error().Err(err).Msg(msg)
                return fmt.Errorf("%s: %w", msg, err)
            }

            return nil

        })
    }

    if err := g.Wait(); err != nil {
        m.baseLogger.Error().Err(err).Msg("cleanup cycle completed with one or more failures")
        return fmt.Errorf("concurrent cleanup failed: %w", err)
    }

    return nil
}

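FetchKnowledgeBasesForCleanup isn't shown in the post; assuming it mirrors FetchIndexesForCleanup, it presumably selects knowledge bases that are FAILED, stuck in PROVISIONING, or explicitly marked CLEANUP:

-- Hypothetical sqlc definition, mirroring FetchIndexesForCleanup.
-- name: FetchKnowledgeBasesForCleanup :many
select * from aws_knowledge_bases
where
    status = 'FAILED'
    or (status = 'PROVISIONING' and created_at < now() - interval 10 minute)
    or status = 'CLEANUP';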
  • Similar to the index manager, we clean up the remote resources, mark the OpenSearch index for cleanup via the Index Manager, and then soft-delete the knowledge base row in the database.
  • We also set up a cron job for this:
func (m *KbManager) CleanupKnowledgeBaseWorker(ctx context.Context) {
    location, err := time.LoadLocation("America/Chicago")
    if err != nil {
        m.baseLogger.Error().Err(err).Msg("cannot load the timezone")
        return
    }

    c := cron.New(cron.WithLocation(location))

    schedule := "30 2 * * *"
    _, err = c.AddFunc(schedule, func() {
        m.baseLogger.Info().Str("schedule", schedule).Str("timezone", location.String()).Msg("triggered scheduled cleanup job")

        jobCtx, cancel := context.WithTimeout(context.Background(), 2*time.Hour)
        defer cancel()

        if err := m.CleanupKnowledgeBases(jobCtx); err != nil {
            m.baseLogger.Error().Err(err).Msg("scheduled cleanup job failed")
        } else {
            m.baseLogger.Info().Msg("scheduled cleanup job completed successfully")
        }
    })

    if err != nil {
        m.baseLogger.Error().Err(err).Msg("FATAL: could not schedule cleanup job")
        return
    }

    m.baseLogger.Info().Str("schedule", schedule).Str("timezone", location.String()).Msg("cleanup job scheduled successfully")
    c.Start()

    <-ctx.Done()

    m.baseLogger.Info().Msg("shutting down scheduled cleanup worker")

    stopCtx := c.Stop()
    <-stopCtx.Done()
    m.baseLogger.Info().Msg("cleanup scheduler stopped gracefully")

}

Running the Knowledge Base Manager

func (m *KbManager) Run(ctx context.Context) error {
    m.baseLogger.Info().Msg("starting knowledge base manager workers")

    g, gCtx := errgroup.WithContext(ctx)

    g.Go(func() error {
        err := m.reconcilationWorker(gCtx)
        return err
    })

    g.Go(func() error {
        m.CleanupKnowledgeBaseWorker(gCtx)
        return gCtx.Err()
    })

    m.baseLogger.Info().Msg("all knowledge base manager workers are running")
    err := g.Wait()
    if err != nil && !errors.Is(err, context.Canceled) {
        m.baseLogger.Error().Err(err).Msg("knowledge base manager stopped due to an err")
        return err
    }

    m.baseLogger.Info().Msg("knowledge base manager stopped gracefully")
    return nil
}
  • We start the reconciliation worker and the cleanup worker as background goroutines.

Prime Pooling

  • Prime pooling is an integral part of this orchestration: the very first time the application runs there are no pre-provisioned remote indexes or knowledge bases, so we want to provision the minimum pool for both resources before the application starts serving.
  • The PrimePool method does that.
  • It uses a local context with a 60-minute timeout, which is plenty for this.
  • It also makes sure the indexes are fully ready, not just warming up, before reconciling the knowledge base pool.
func (m *KbManager) PrimePool(ctx context.Context) error {
    m.baseLogger.Info().Msg("starting index pool priming...")

    indexReadyCtx, cancelIndex := context.WithTimeout(ctx, 60*time.Minute)
    defer cancelIndex()

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    indexReady := false
    for !indexReady {
        select {
        case <-indexReadyCtx.Done():
            return fmt.Errorf("timeout waiting for index pool to be ready: %w", indexReadyCtx.Err())
        case <-ticker.C:
            hasEnoughIndexes, indexesWereProvisioned, err := m.indexManager.reconcileIndexPool(ctx)
            if err != nil {
                return fmt.Errorf("error reconciling index pool: %w", err)
            }
            if hasEnoughIndexes {
                if indexesWereProvisioned {
                    m.baseLogger.Info().Msg("new indexes were provisioned, waiting before reconciling knowledge bases")
                    time.Sleep(30 * time.Second)
                }
                m.baseLogger.Info().Msg("index pool has sufficient available indexes")
                indexReady = true
            }
        }
    }
    err := m.reconcileKnowledgeBasePool(ctx)
    if err != nil {
        return fmt.Errorf("error reconciling prime knowledge base pool: %w", err)
    }
    m.baseLogger.Info().Msg("knowledge base pool priming successful")
    return nil
}


Symphony: How It All Comes Together

  • Here is main.go, the entry point of the application; this is how everything is wired together.
  • We create a context that is cancelled automatically on OS signals: Ctrl + C (syscall.SIGINT) or a termination signal (syscall.SIGTERM).
  • Then we create the logging object based on zerolog.
  • We load our configuration from a YAML file.
  • We create the database and AWS client instances for their corresponding operations.
  • Then we create a group context for handling the background goroutines.
  • Finally, before starting the workers, we synchronously make sure all the pooled resources are present using PrimePool.
func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    logConfig := logger.LoggerConfig{
        LogLevel: zerolog.DebugLevel,
        FileConfig: &logger.FileConfig{
            Path:       "../logs/chatbot_engine/chatbot_engine.log",
            MaxSize:    10,
            MaxBackups: 5,
            MaxAge:     30,
            Compress:   true,
        },
    }

    baseLogger := logger.NewLogger(logConfig)

    config, err := utils.LoadConfig("./chatbot_engine.yaml")
    if err != nil {
        baseLogger.Fatal().Err(err).Msg("cannot load configuration")
    }

    switch config.Environment {
    case globals.ProdEnv:
        gin.SetMode(gin.ReleaseMode)
        baseLogger.Info().Msg("Gin mode set to 'release'")
    default:
        gin.SetMode(gin.DebugMode)
        baseLogger.Info().Msg("Gin mode set to 'debug'")
    }

    db, err := sql.Open("mysql", config.DBSource)
    if err != nil {
        baseLogger.Fatal().Err(err).Msg("error opening database connection")
    }
    defer db.Close()

    pingCtx, cancelPing := context.WithTimeout(ctx, 5*time.Second)
    defer cancelPing()
    err = db.PingContext(pingCtx)
    if err != nil {
        baseLogger.Fatal().Err(err).Msg("failed to connect to database (ping failed)")
    }

    db.SetMaxOpenConns(config.DBMaxOpenConn)
    db.SetMaxIdleConns(config.DBMaxIdleConn)
    db.SetConnMaxLifetime(time.Duration(config.DBConnMaxLifetime))

    baseLogger.Info().Msg("database connection pool initialized successfully")

    store := database.NewSqlStore(db)

    group, groupCtx := errgroup.WithContext(ctx)

    var awsClient *aws_cloud.AWSCloudClients
    if config.Environment != globals.DevEnv {
        var err error
        awsClient, err = aws_cloud.NewAWSCloudClients(
            groupCtx,
            aws_cloud.AwsCloudClientArg{
                DevEnvironmentCfg:  nil,
                Region:             config.AWSDefaultRegion,
                OpensearchEndpoint: config.AWSOpensearchEndpoint,
                BucketName:         config.AWSBucket,
                QueueName:          config.AWSProdQueueName,
                KeyID:              config.AWSKeyID,
            },
        )
        if err != nil {
            baseLogger.Fatal().Err(err).Msg("error creating aws client")
        }
    } else {
        var err error
        awsClient, err = aws_cloud.NewAWSCloudClients(
            groupCtx,
            aws_cloud.AwsCloudClientArg{
                DevEnvironmentCfg: &aws_cloud.DevEnvironmentConfig{
                    AccessKey: config.AWSAccessKey,
                    Secret:    config.AWSSecretAccess,
                },
                LogEnable:          config.AwsLogEnable,
                Region:             config.AWSDefaultRegion,
                OpensearchEndpoint: config.AWSOpensearchEndpoint,
                BucketName:         config.AWSBucket,
                QueueName:          config.AWSDevQueueName,
                KeyID:              config.AWSKeyID,
            },
        )
        if err != nil {
            baseLogger.Fatal().Err(err).Msg("error creating aws client")
        }
    }

    indexManager := knowledge_base.NewIndexManager(awsClient, store, baseLogger)
    kbManager := knowledge_base.NewKbManager(awsClient, store, baseLogger, &config, indexManager)

    if err := kbManager.PrimePool(groupCtx); err != nil {
        baseLogger.Fatal().Err(err).Msg("error prime pooling knowledge base and indexes")
    }

    group.Go(func() error {
        err := indexManager.Run(groupCtx)
        if err != nil {
            baseLogger.Error().Err(err).Msg("index manager exited with error")
        }

        return err
    })

    group.Go(func() error {
        err := kbManager.Run(groupCtx)
        if err != nil {
            baseLogger.Error().Err(err).Msg("knowledge base manager exited with error")
        }

        return err
    })

    err = group.Wait()
    if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) {
        baseLogger.Error().Err(err).Msg("application exited due to error")
    } else {
        baseLogger.Info().Msg("application exiting gracefully")
    }

        baseLogger.Info().Msg("Bye :)")
}

This is how I worked around pre-provisioning AWS resources. I hope it helps.

Thanks :)
