This guide translates abstract concepts of agentic RAG into a concrete, end-to-end implementation.
The problem
I recently had to interpret a user-provided "change request" string and match it against the API field or fields that must be modified to fulfill the request. This is a classic example of "semantic classification" in AI engineering.
The most obvious approach is a simple one-shot inference call: ask the LLM to select the correct field(s) from a provided list. This is easy to implement as you simply enumerate the fields in the prompt with enough detail for the LLM to make a decision. However, this approach does not scale as the list of fields can grow to hundreds or even thousands of entries. This floods the context window, increases latency (processing more tokens takes longer), and ultimately sacrifices accuracy due to the "lost in the middle" phenomenon.
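To make the scaling problem concrete, here is a minimal sketch of the one-shot approach. The `apiField` type, the `buildOneShotPrompt` helper and the prompt wording are illustrative assumptions, not part of the implementation described later.

```go
package main

import (
	"fmt"
	"strings"
)

// apiField is a hypothetical, simplified field used only for this illustration.
type apiField struct {
	Path        string
	Description string
}

// buildOneShotPrompt enumerates every field in the prompt, so the prompt
// grows linearly with the number of fields.
func buildOneShotPrompt(request string, fields []apiField) string {
	var b strings.Builder
	b.WriteString("Select the field(s) that must change to fulfill the request.\n")
	fmt.Fprintf(&b, "Request: %s\nFields:\n", request)
	for _, f := range fields {
		fmt.Fprintf(&b, "- %s: %s\n", f.Path, f.Description)
	}
	return b.String()
}

func main() {
	small := []apiField{{Path: "allow_forking", Description: "Allow private forks"}}

	// Simulate a large schema with made-up field names.
	var large []apiField
	for i := 0; i < 1000; i++ {
		large = append(large, apiField{Path: fmt.Sprintf("field_%d", i), Description: "Some description"})
	}

	fmt.Println("prompt size with 1 field:", len(buildOneShotPrompt("allow forks", small)))
	fmt.Println("prompt size with 1000 fields:", len(buildOneShotPrompt("allow forks", large)))
}
```

Every extra field adds a line to the prompt, whether or not it has anything to do with the request.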
Asking the LLM to filter through large blocks of input text is known as the "needle in a haystack" challenge. When a piece of information is buried in a massive context window, models tend to recall details at the beginning and end best while overlooking details located in the middle. This is the "lost in the middle" phenomenon mentioned above. One way to mitigate it is to use a search engine to do the filtering first (finding the potential needles), so that the LLM works directly with the subset of most relevant data. This approach is known as Retrieval-Augmented Generation (RAG).
RAG is not always the best solution. It depends on the nature of the data: when processing large paragraphs of text, RAG can be limited in its ability to reason over distant parts of the data because chunks are retrieved in isolation rather than as a coherent whole. For my use case, however, API fields are discrete pieces of information that are already clumped by domain (API schema and endpoint), making RAG a strong fit.
RAG alone, however, is often not enough. Retrieval typically returns entries that score highly by similarity, which can include results that look relevant but don't best match the user's intent. Reranking is a crucial technique to re-evaluate retrieved candidates in light of the user's request and select the most contextually appropriate entries.
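The two-stage idea (retrieve by similarity, then rerank against the request) can be sketched as follows. This is only an illustration: the `candidate` type and the `rerank` helper are hypothetical, and `overlapScore` is a toy word-overlap scorer standing in for the LLM (or a cross-encoder) that would do the real re-evaluation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// candidate is a hypothetical retrieved entry with its similarity score.
type candidate struct {
	Path       string
	Text       string
	Similarity float64
}

// rerank re-scores the candidates against the request with the given scorer
// and returns the topN best ones. The input slice is left untouched.
func rerank(request string, cands []candidate, score func(string, candidate) float64, topN int) []candidate {
	out := append([]candidate(nil), cands...)
	sort.SliceStable(out, func(i, j int) bool {
		return score(request, out[i]) > score(request, out[j])
	})
	if len(out) > topN {
		out = out[:topN]
	}
	return out
}

// overlapScore is a toy stand-in scorer: it counts how many words of the
// request appear in the candidate text.
func overlapScore(request string, c candidate) float64 {
	text := strings.ToLower(c.Text)
	n := 0
	for _, w := range strings.Fields(strings.ToLower(request)) {
		if strings.Contains(text, w) {
			n++
		}
	}
	return float64(n)
}

func main() {
	// The similarity search ranked the merge-commit field first...
	cands := []candidate{
		{Path: "allow_merge_commit", Text: "allow merging pull requests with a merge commit", Similarity: 0.82},
		{Path: "allow_squash_merge", Text: "allow squash-merging pull requests", Similarity: 0.80},
	}
	// ...but reranking against the request prefers the squash-merge field.
	best := rerank("disable squash merging", cands, overlapScore, 1)
	fmt.Println(best[0].Path) // prints "allow_squash_merge"
}
```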
Anatomy of a RAG
Let's go through the steps required to build and consume a RAG system to find the field(s) that match a user's change request.
1. Prepare the schemas: Collect the schemas and fields that are supported. The field details are important: field name, path, description, type, additional context and the source schema.
2. Generate embeddings for each field: Iterate over all fields and generate a vector embedding (numerical representation) for each field's string representation (a concatenation of name, description, context, etc).
3. Store the fields in a vector DB: Insert each field's vector embedding in a database that supports vector similarity search, like DuckDB 😎.
4. Wire up your app to access the DB: Set up your app so that it can connect to the vector DB instance.
5. Generate an embedding for the user request: Using the same "embedder" from step 2, generate a vector representation of the user query. This vector is used to query the DB.
6. Set up a RAG retriever: The retriever is a function that executes the "similarity search" against the DB, relying on the components from steps 4 and 5. It returns a list of candidate "documents" (fields) that are mathematically similar to the query. LangChain supports this 😎.
7. Set up the RAG chain: The chain puts together the retriever from step 6 along with the instructions for the LLM on how to choose the appropriate fields. This is where the final "reasoning" happens. LangChain is great at this 😎.
Steps 1 through 3 are "build" concerns (often called the Ingestion Pipeline). Steps 4 through 7 are "runtime" concerns performed by the app to process live user requests.
The implementation
Let's put it all together with DuckDB and LangChain Go. For the API fields I'm going to use the GitHub API. For the LLM and embedder I'm going to use GCP's Vertex models.
1. Prepare the schemas:
Collect each API field into a list of SchemaField instances that we can iterate over later on to generate the embeddings.
The SchemaField type:
// SchemaField represents the data we want to embed
type SchemaField struct {
// The path to the field in dot notation
Path string // E.g.: pull_request.required_approving_review_count
// The "humanized" context
Context string // E.g.: required pull request reviews
// The field name
FieldName string // E.g.: required approving review count
// The intent of the field
Description string // E.g.: Number of reviewers required to approve pull requests (0-6, where 0 means not required)
// The type to disambiguate from fields with similar names
Type string // E.g.: integer
// The schema source of the field
Schema string // E.g.: "classic_branch_protection" or "ruleset_branch_protection"
// The aliases of the field for cases where the field is referred to by a different name
Aliases []string // E.g.: ["approvals", "approving reviews", "review approvals", "review approvals count"]
}
A list of "repo" fields:
I'm only providing a few fields to keep it readable. Just as there's a getRepoFields() function to capture all the "repo" fields, there would be other functions to collect "branch protection" fields, "rulesets" fields, etc.
func getRepoFields() []SchemaField {
return []SchemaField{
{
Path: "allow_auto_merge",
Context: "repository",
FieldName: "allow auto merge",
Description: "Either true to allow auto-merge on pull requests, or false to disallow auto-merge. Default: false",
Type: "boolean",
Schema: "repo_fields",
},
{
Path: "allow_forking",
Context: "repository",
FieldName: "allow forking",
Description: "Either true to allow private forks, or false to prevent private forks. Default: false",
Type: "boolean",
Schema: "repo_fields",
},
{
Path: "allow_merge_commit",
Context: "repository",
FieldName: "allow merge commit",
Description: "Either true to allow merging pull requests with a merge commit, or false to prevent merging pull requests with merge commits. Default: true",
Type: "boolean",
Schema: "repo_fields",
},
{
Path: "allow_rebase_merge",
Context: "repository",
FieldName: "allow rebase merge",
Description: "Either true to allow rebase-merging pull requests, or false to prevent rebase-merging. Default: true",
Type: "boolean",
Schema: "repo_fields",
},
{
Path: "allow_squash_merge",
Context: "repository",
FieldName: "allow squash merge",
Description: "Either true to allow squash-merging pull requests, or false to prevent squash-merging. Default: true",
Type: "boolean",
Schema: "repo_fields",
},
// More fields go here
// ...
}
}
2. Generate embeddings for each field:
To generate the embedding we need a string representation of each field. We can do so by adding a toEmbed method to the SchemaField type like this:
func (s SchemaField) toEmbed() string {
aliases := ""
if len(s.Aliases) > 0 {
aliases = fmt.Sprintf(" (also known as: %s)", strings.Join(s.Aliases, ", "))
}
// Natural language format:
// "Field <Name> <Aliases> is a <Type> used to <Description>."
return fmt.Sprintf(
"Field '%s'%s is a %s used to %s [Context: %s]",
s.FieldName, aliases, humanizeType(s.Type), s.Description, s.Context,
)
}
func humanizeType(t string) string {
switch t {
case "boolean":
return "Toggle switch (True/False)"
case "integer":
return "Integer count (e.g. 1, 2, 5)"
}
return t
}
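To see what the embedder actually receives, here is a small usage sketch. It repeats the definitions above so that it runs on its own, with the struct trimmed to the fields that toEmbed reads. The sample field values and aliases are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// SchemaField is trimmed to the fields that toEmbed reads.
type SchemaField struct {
	Context     string
	FieldName   string
	Description string
	Type        string
	Aliases     []string
}

// toEmbed serializes the field as a natural language sentence.
func (s SchemaField) toEmbed() string {
	aliases := ""
	if len(s.Aliases) > 0 {
		aliases = fmt.Sprintf(" (also known as: %s)", strings.Join(s.Aliases, ", "))
	}
	return fmt.Sprintf(
		"Field '%s'%s is a %s used to %s [Context: %s]",
		s.FieldName, aliases, humanizeType(s.Type), s.Description, s.Context,
	)
}

// humanizeType maps a schema type to a more descriptive phrase.
func humanizeType(t string) string {
	switch t {
	case "boolean":
		return "Toggle switch (True/False)"
	case "integer":
		return "Integer count (e.g. 1, 2, 5)"
	}
	return t
}

func main() {
	field := SchemaField{
		Context:     "repository",
		FieldName:   "allow auto merge",
		Description: "allow auto-merge on pull requests",
		Type:        "boolean",
		Aliases:     []string{"auto-merge", "automerge"}, // hypothetical aliases
	}
	fmt.Println(field.toEmbed())
}
```

This prints: Field 'allow auto merge' (also known as: auto-merge, automerge) is a Toggle switch (True/False) used to allow auto-merge on pull requests [Context: repository]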
A few best-practice considerations for the embed string:
- Frontload high-signal information: Place the most semantically rich fields at the beginning of the string, as they'll most closely match the terminology used by the user.
- Use natural language serialization: Many embedding models are optimized for natural language sentences. Framing the data as a coherent statement can yield better results than a robotic list of key/value pairs.
- Encode aliases: Users may refer to some fields with alternative names. Explicitly encode those to increase the chances of matching.
- Iterate!: I found variations of the format to yield subtle differences in the similarity search. You should rely on integration tests to control variance and account for the subtleties of your data as you try out new formats.
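One possible way to compare embed-string formats in an integration test is to track a hit rate: the share of test requests whose expected field shows up in the top k results. The sketch below is an assumption about how such a check could look. `evalCase`, `hitRateAtK` and the fake search function are hypothetical names, and the search function would be replaced by the real similarity search.

```go
package main

import "fmt"

// evalCase pairs a user request with the field path we expect to be retrieved.
type evalCase struct {
	Query        string
	ExpectedPath string
}

// hitRateAtK returns the fraction of cases whose expected path appears
// in the first k results returned by search.
func hitRateAtK(cases []evalCase, search func(query string) []string, k int) float64 {
	if len(cases) == 0 {
		return 0
	}
	hits := 0
	for _, c := range cases {
		results := search(c.Query)
		if len(results) > k {
			results = results[:k]
		}
		for _, p := range results {
			if p == c.ExpectedPath {
				hits++
				break
			}
		}
	}
	return float64(hits) / float64(len(cases))
}

func main() {
	// fakeSearch stands in for the real similarity search and always
	// returns the same ranking.
	fakeSearch := func(query string) []string {
		return []string{"allow_squash_merge", "allow_merge_commit", "allow_rebase_merge"}
	}
	cases := []evalCase{
		{Query: "squash merging", ExpectedPath: "allow_squash_merge"},
		{Query: "rebase merging", ExpectedPath: "allow_rebase_merge"},
	}
	fmt.Printf("hit rate@1: %.2f\n", hitRateAtK(cases, fakeSearch, 1)) // 0.50
	fmt.Printf("hit rate@3: %.2f\n", hitRateAtK(cases, fakeSearch, 3)) // 1.00
}
```

Running the same cases against each candidate format gives a number to compare instead of eyeballing individual results.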
3. Store the fields in a vector DB:
In order to perform similarity search against the fields, let's create a DuckDB instance and table with vector embedding support:
import (
"database/sql"
"fmt"
// Import the DuckDB driver
_ "github.com/duckdb/duckdb-go/v2"
)
const (
FieldEmbeddingsTableDDL = `
-- Enable the VSS extension for vector similarity search
INSTALL vss;
LOAD vss;
-- Set the experimental flag to allow persisting the instance to disk.
SET hnsw_enable_experimental_persistence = true;
CREATE TABLE schema_fields (
path TEXT, -- E.g.: "required_pull_request_reviews.count"
context TEXT, -- E.g.: "required pull request reviews"
field_name TEXT, -- E.g.: "count"
description TEXT, -- E.g.: "Number of reviewers..."
field_type TEXT, -- E.g.: "integer" (renamed from 'type' to avoid keyword conflict)
schema TEXT, -- E.g.: "ruleset_branch_protection"
-- The Vector Column
-- Size 768 is specific to Google's "text-embedding-004" model
embedding FLOAT[768]
);
-- Since we rely on matching meaning (which is mathematically "direction" in vector space),
-- we must use Cosine Similarity.
CREATE INDEX idx_schema_fields_embedding
ON schema_fields
USING HNSW (embedding)
WITH (metric = 'cosine');
`
FieldEmbeddingDML = `
INSERT INTO schema_fields (path, context, field_name, description, field_type, schema, embedding)
VALUES (?,?,?,?,?,?,?);
`
)
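For intuition on why cosine is the metric of choice here, the following standalone sketch computes cosine similarity by hand. It is only an illustration of the math and is not how DuckDB implements it: two vectors pointing in the same direction score close to 1 regardless of their magnitude, and perpendicular vectors score 0.

```go
package main

import (
	"fmt"
	"math"
)

// cosineSimilarity returns the cosine of the angle between a and b.
// It returns 0 when the lengths differ or either vector has zero magnitude.
func cosineSimilarity(a, b []float32) float64 {
	if len(a) != len(b) {
		return 0
	}
	var dot, normA, normB float64
	for i := range a {
		x, y := float64(a[i]), float64(b[i])
		dot += x * y
		normA += x * x
		normB += y * y
	}
	if normA == 0 || normB == 0 {
		return 0
	}
	return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}

func main() {
	a := []float32{1, 2, 3}
	scaled := []float32{2, 4, 6}      // same direction as a, twice the magnitude
	orthogonal := []float32{3, 0, -1} // perpendicular to a: 1*3 + 2*0 + 3*(-1) = 0

	fmt.Printf("same direction: %.3f\n", cosineSimilarity(a, scaled))
	fmt.Printf("perpendicular:  %.3f\n", cosineSimilarity(a, orthogonal))
}
```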
And now the actual logic to write the DB instance to a file and insert the embeddings:
type FieldEmbeddingsDB struct {
db *sql.DB
}
func NewFieldEmbeddingsDB() (*FieldEmbeddingsDB, error) {
// create in-file DuckDB instance
db, err := sql.Open("duckdb", "dist/embeddings/schema-fields.duckdb")
if err != nil {
return nil, fmt.Errorf("failed to open in-file duckdb: %w", err)
}
// create embeddings table schema
if _, err := db.Exec(FieldEmbeddingsTableDDL); err != nil {
db.Close()
return nil, fmt.Errorf("failed to create field embeddings table schema: %w", err)
}
return &FieldEmbeddingsDB{db: db}, nil
}
func (f *FieldEmbeddingsDB) StoreFieldEmbeddings(embeddings []float32, field SchemaField) error {
_, err := f.db.Exec(FieldEmbeddingDML,
field.Path,
field.Context,
field.FieldName,
field.Description,
field.Type,
field.Schema,
embeddings,
)
if err != nil {
return fmt.Errorf("failed to insert field embedding in DuckDB: %w", err)
}
return nil
}
func (f *FieldEmbeddingsDB) Close() error {
return f.db.Close()
}
Lastly, iterate over the fields, generate the embedding and use the FieldEmbeddingsDB client to insert them:
// 1. Initialize the Embedder
embedder, err := vertex.NewGoogleEmbedder(ctx)
if err != nil {
log.Fatalf("Failed to create embedder: %v", err)
}
// 2. Define the schema fields to embed
schemaFields := getRepoFields()
// 3. Prepare the text to be embedded
var textsToEmbed []string
for _, field := range schemaFields {
textsToEmbed = append(textsToEmbed, field.toEmbed())
}
// 4. Generate Embeddings (Batch Call)
vectors, err := embedder.EmbedDocuments(ctx, textsToEmbed)
if err != nil {
log.Fatalf("Failed to embed fields: %v", err)
}
// 5. Generate the DuckDB embeddings instance
db, err := NewFieldEmbeddingsDB()
if err != nil {
log.Fatalf("Failed to create DuckDB embeddings instance: %v", err)
}
defer db.Close()
// 6. Store the vectors in a DuckDB instance
for i, field := range schemaFields {
fieldVector := vectors[i]
err := db.StoreFieldEmbeddings(fieldVector, field)
if err != nil {
log.Fatalf("Failed to store field embeddings: %v", err)
}
}
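The loop above indexes vectors[i] on the assumption that the embedder returned exactly one vector per field, each with the 768 dimensions the table column expects. A small guard between steps 4 and 6 could make that assumption explicit. The `validateVectors` helper below is a hypothetical addition, not part of the original pipeline.

```go
package main

import "fmt"

// validateVectors checks that there is one vector per input text and that
// every vector has the dimension the embedding column expects.
func validateVectors(vectors [][]float32, wantCount, wantDim int) error {
	if len(vectors) != wantCount {
		return fmt.Errorf("expected %d vectors, got %d", wantCount, len(vectors))
	}
	for i, v := range vectors {
		if len(v) != wantDim {
			return fmt.Errorf("vector %d has dimension %d, expected %d", i, len(v), wantDim)
		}
	}
	return nil
}

func main() {
	const dim = 768 // must match the FLOAT[768] column

	ok := [][]float32{make([]float32, dim), make([]float32, dim)}
	fmt.Println(validateVectors(ok, 2, dim)) // <nil>

	short := [][]float32{make([]float32, dim), make([]float32, 10)}
	fmt.Println(validateVectors(short, 2, dim)) // vector 1 has dimension 10, expected 768
}
```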
This concludes the "ingestion phase". Let's now move on to the runtime processing phase. Note that I didn't include the NewGoogleEmbedder function here. It is included in the next steps, since we also need it during runtime query processing.
4. Wire up your app to access the DB:
Let's now set up the app with a client to access and query the "fields" DuckDB instance written by step #3.
import (
"context"
"database/sql"
"fmt"
// Import the DuckDB driver
_ "github.com/duckdb/duckdb-go/v2"
)
// FieldsDB is a readonly DuckDB instance to find fields that match a vector embedding
type FieldsDB struct {
db *sql.DB
}
func NewFieldsDB(fieldsDBDirectoryPath string) (*FieldsDB, error) {
// ?access_mode=READ_ONLY is crucial for concurrency safety
dsn := fmt.Sprintf("%s/schema-fields.duckdb?access_mode=READ_ONLY", fieldsDBDirectoryPath)
db, err := sql.Open("duckdb", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open in-file duckdb with fields embeddings: %w", err)
}
return &FieldsDB{db: db}, nil
}
func (f *FieldsDB) Close() error {
return f.db.Close()
}
type MatchedField struct {
FieldName string `json:"fieldName"`
Context string `json:"context"`
Path string `json:"path"`
Description string `json:"description"`
FieldType string `json:"fieldType"`
Schema string `json:"schema"`
Score float64 `json:"score"`
}
// FindFields finds the fields most similar to the given embedding
func (f *FieldsDB) FindFields(
ctx context.Context,
embedding []float32,
) ([]MatchedField, error) {
query := `
LOAD vss;
SELECT field_name,
context,
path,
description,
field_type,
schema,
array_cosine_similarity(embedding, ?::FLOAT[768]) AS similarity_score
FROM schema_fields
ORDER BY similarity_score DESC
LIMIT 10
`
rows, err := f.db.QueryContext(
ctx,
query,
embedding,
)
if err != nil {
return nil, fmt.Errorf("failed to query fields: %w", err)
}
defer rows.Close()
fields := []MatchedField{}
for rows.Next() {
var fieldName string
var context string
var path string
var description string
var fieldType string
var schema string
var similarityScore float64
err := rows.Scan(
&fieldName,
&context,
&path,
&description,
&fieldType,
&schema,
&similarityScore,
)
if err != nil {
return nil, fmt.Errorf("failed to scan field: %w", err)
}
fields = append(fields, MatchedField{
FieldName: fieldName,
Context: context,
Path: path,
Description: description,
FieldType: fieldType,
Schema: schema,
Score: similarityScore,
})
}
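// Surface any error encountered while iterating over the rows
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over matched fields: %w", err)
}
return fields, nil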
}
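Because the query uses a fixed LIMIT, it returns the top-ranked rows even when none of them is a close match. One optional refinement is to drop weak candidates before they reach the LLM. The sketch below assumes a trimmed-down stand-in for MatchedField and an arbitrary cutoff of 0.6; a sensible threshold depends on the embedding model and the data, so it would need to be tuned.

```go
package main

import "fmt"

// scoredField is a hypothetical, trimmed-down stand-in for MatchedField.
type scoredField struct {
	Path  string
	Score float64
}

// filterByScore keeps only the fields whose score is at least minScore,
// preserving the original order.
func filterByScore(fields []scoredField, minScore float64) []scoredField {
	kept := make([]scoredField, 0, len(fields))
	for _, f := range fields {
		if f.Score >= minScore {
			kept = append(kept, f)
		}
	}
	return kept
}

func main() {
	fields := []scoredField{
		{Path: "allow_squash_merge", Score: 0.83},
		{Path: "allow_merge_commit", Score: 0.71},
		{Path: "allow_forking", Score: 0.42},
	}
	// 0.6 is an arbitrary cutoff chosen for illustration only.
	for _, f := range filterByScore(fields, 0.6) {
		fmt.Println(f.Path, f.Score)
	}
}
```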
5. Generate an embedding for the user request & 6. Set up a RAG retriever
I'm bundling steps 5 and 6 since the RAG retriever in my app takes care of generating the embedding for the user request.
⚠️ Note ⚠️: You should not pass the raw user provided string directly to the embedder. You should normalize it before generating the embedding by removing action verbs, stop words and any irrelevant characters to ensure high quality matching.
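A minimal sketch of such a normalization step is shown below. The `normalizeRequest` helper and the `dropWords` list are assumptions made for illustration. The list is deliberately tiny, and a real one would be tuned to the vocabulary of your users.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// dropWords is an illustrative, deliberately small list of action verbs
// and stop words to remove from the request.
var dropWords = map[string]bool{
	"please": true, "enable": true, "disable": true, "set": true,
	"change": true, "update": true, "the": true, "a": true, "an": true,
	"to": true, "on": true, "for": true, "of": true, "in": true,
}

// normalizeRequest lowercases the input, replaces every character that is
// not a letter or digit with a space, and removes the words in dropWords.
func normalizeRequest(raw string) string {
	cleaned := strings.Map(func(r rune) rune {
		if unicode.IsLetter(r) || unicode.IsDigit(r) {
			return unicode.ToLower(r)
		}
		return ' '
	}, raw)

	var kept []string
	for _, w := range strings.Fields(cleaned) {
		if !dropWords[w] {
			kept = append(kept, w)
		}
	}
	return strings.Join(kept, " ")
}

func main() {
	fmt.Println(normalizeRequest("Please enable auto-merge on the repository!"))
	// prints "auto merge repository"
}
```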
Let's now take a look at the embedder that must be used during both the ingestion and runtime processing phases:
import (
"context"
"fmt"
"github.com/tmc/langchaingo/embeddings"
"github.com/tmc/langchaingo/llms/googleai"
"github.com/tmc/langchaingo/llms/googleai/vertex"
)
// NewGoogleEmbedder initializes the Google AI client specifically for embeddings
func NewGoogleEmbedder(ctx context.Context) (embeddings.Embedder, error) {
llm, err := vertex.New(ctx,
googleai.WithDefaultEmbeddingModel("text-embedding-004"),
googleai.WithCloudProject("your-gcp-project"),
googleai.WithCloudLocation("us-central1"),
)
if err != nil {
return nil, fmt.Errorf("failed to create googleai client: %w", err)
}
// Create the Embedder wrapper
// This interface unifies calls across OpenAI, Google, Ollama, etc.
embedder, err := embeddings.NewEmbedder(llm)
if err != nil {
return nil, fmt.Errorf("failed to create embedder wrapper: %w", err)
}
return embedder, nil
}
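For tests that should not call GCP, a deterministic stand-in can be handy. The sketch below is an assumption, not part of the original app: `textEmbedder` is a local interface mirroring the two methods this guide calls on the embedder (EmbedDocuments and EmbedQuery), and `fakeEmbedder` hashes words into buckets. Its vectors carry no semantic meaning. They only make the plumbing testable.

```go
package main

import (
	"context"
	"fmt"
	"hash/fnv"
	"strings"
)

// textEmbedder mirrors the two embedder methods used in this guide.
type textEmbedder interface {
	EmbedDocuments(ctx context.Context, texts []string) ([][]float32, error)
	EmbedQuery(ctx context.Context, text string) ([]float32, error)
}

// fakeEmbedder is a hypothetical test double: it hashes each lowercased
// word into one of dim buckets and counts the occurrences.
type fakeEmbedder struct{ dim int }

func (f fakeEmbedder) EmbedQuery(_ context.Context, text string) ([]float32, error) {
	vec := make([]float32, f.dim)
	for _, w := range strings.Fields(strings.ToLower(text)) {
		h := fnv.New32a()
		h.Write([]byte(w))
		vec[h.Sum32()%uint32(f.dim)]++
	}
	return vec, nil
}

func (f fakeEmbedder) EmbedDocuments(ctx context.Context, texts []string) ([][]float32, error) {
	out := make([][]float32, len(texts))
	for i, t := range texts {
		v, err := f.EmbedQuery(ctx, t)
		if err != nil {
			return nil, err
		}
		out[i] = v
	}
	return out, nil
}

func main() {
	var e textEmbedder = fakeEmbedder{dim: 768}
	ctx := context.Background()

	vecs, err := e.EmbedDocuments(ctx, []string{"allow squash merge", "allow forking"})
	if err != nil {
		panic(err)
	}
	q, err := e.EmbedQuery(ctx, "allow squash merge")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(vecs), len(vecs[0]), len(q)) // prints "2 768 768"
}
```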
And the LangChain retriever:
import (
"context"
"fmt"
"log/slog"
"github.com/tmc/langchaingo/embeddings"
"github.com/tmc/langchaingo/schema"
)
type FieldRetriever struct {
fieldsDB *database.FieldsDB
embedder embeddings.Embedder
}
func NewFieldRetriever(
fieldsDB *database.FieldsDB,
embedder embeddings.Embedder,
) *FieldRetriever {
return &FieldRetriever{
fieldsDB: fieldsDB,
embedder: embedder,
}
}
// matchFields matches a change description to one or more fields using
// similarity search against the schema embeddings database.
// The change description is expected to be already normalized.
func (t *FieldRetriever) matchFields(ctx context.Context, changeDescription string) ([]database.MatchedField, error) {
embedding, err := t.embedder.EmbedQuery(ctx, changeDescription)
if err != nil {
return nil, fmt.Errorf("failed to embed change description: %w", err)
}
fields, err := t.fieldsDB.FindFields(ctx, embedding)
if err != nil {
return nil, fmt.Errorf("failed to match fields in schema embeddings database: %w", err)
}
return fields, nil
}
// GetRelevantDocuments implements schema.Retriever.
func (r *FieldRetriever) GetRelevantDocuments(
ctx context.Context,
query string,
) ([]schema.Document, error) {
matchedFields, err := r.matchFields(ctx, query)
if err != nil {
slog.Error("failed to match fields via the RAG retriever", "error", err)
return nil, fmt.Errorf("failed to match fields via the RAG retriever: %w", err)
}
docs := make([]schema.Document, len(matchedFields))
for i, field := range matchedFields {
content := fmt.Sprintf(
"Field: %s\nContext: %s\nPath: %s\nDescription: %s\nFieldType: %s\nSchema: %s\nScore: %f",
field.FieldName,
field.Context,
field.Path,
field.Description,
field.FieldType,
field.Schema,
field.Score,
)
docs[i] = schema.Document{
PageContent: content,
Metadata: map[string]any{
"field_name": field.FieldName,
"path": field.Path,
"type": field.FieldType,
"score": field.Score, // similarity score from the vector DB
},
}
}
slog.Debug("matched fields via the RAG retriever", "count", len(docs))
return docs, nil
}
7. Set up the RAG chain
Lastly, set up the RAG chain with the retriever and instructions to select the fields:
⚠️ Note ⚠️: Notice how the agent method MatchFields accepts both a changeDescription with the raw user query and a normalizedChangeDescription with the normalized string. normalizedChangeDescription drives the retrieval, while changeDescription provides the full context to perform the final reasoning.
type FieldMatcherAgent struct {
llm llms.Model
fieldRetriever *FieldRetriever
}
// MatchFields matches a change description to the fields that must be modified to fulfill the change.
func (a *FieldMatcherAgent) MatchFields(
ctx context.Context,
changeDescription string,
normalizedChangeDescription string,
) (MatchedFieldsResponse, error) {
// Prepare the instructions for the RAG chain
outputParser, err := outputparser.NewDefined(MatchedFieldsResponse{})
if err != nil {
slog.Error("failed to create output parser for the field matcher RAG chain", "error", err)
return MatchedFieldsResponse{}, fmt.Errorf("failed to create output parser for the field matcher RAG chain: %w", err)
}
systemPrompt := prompts.NewSystemMessagePromptTemplate(`You are an expert in GitHub API configuration.
You will receive a "Change Description" and a list of "Candidate Fields".
Your task:
1. Analyze the Change Description.
2. Review the Candidate Fields provided in the Context.
3. Select ALL fields that must be modified to fulfill the change.
4. Copy the exact metadata (FieldName, Path, Score, etc.) from the candidate text into the response.
If no fields are a good match for the change description, return an empty list.`, []string{})
// Interpolate the change description directly as it can't be passed via the RAG chain
userPromptTemplate := fmt.Sprintf(`Match the change description to the fields that must be modified to fulfill the change.
Change Description: %s
Candidate Fields (Context):
{{.context}}
Format instructions:
`+outputParser.GetFormatInstructions(), changeDescription)
userPrompt := prompts.NewHumanMessagePromptTemplate(userPromptTemplate, []string{"question", "context"})
agentPrompt := prompts.NewChatPromptTemplate([]prompts.MessageFormatter{
systemPrompt,
userPrompt,
})
// Build the LLM chain that selects the best matching fields among the candidates
llmChain := chains.NewLLMChain(a.llm, agentPrompt)
// Build StuffDocuments Chain
// This chain concatenates all retrieved docs into the "{{.context}}" variable
stuffChain := chains.NewStuffDocuments(llmChain)
// Build the RAG chain
rag := chains.NewRetrievalQA(
stuffChain,
a.fieldRetriever,
)
rag.InputKey = "query" // Key of the input map entry that holds the retriever query
ragResult, err := chains.Call(ctx, rag, map[string]any{
// Passed to the retriever (GetRelevantDocuments) as the query string
"query": normalizedChangeDescription,
})
if err != nil {
slog.Error("failed to run RAG chain to find the matching fields", "error", err, "change_description", changeDescription)
return MatchedFieldsResponse{}, fmt.Errorf("failed to run RAG chain: %w", err)
}
// The result is now a single JSON string containing all matches
textResult, ok := ragResult["text"].(string)
if !ok {
return MatchedFieldsResponse{}, fmt.Errorf("unexpected output type from RAG chain")
}
matches, err := outputParser.Parse(textResult)
if err != nil {
slog.Error("failed to parse matched fields", "error", err)
return MatchedFieldsResponse{}, fmt.Errorf("failed to parse matched fields: %w", err)
}
return matches, nil
}
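The MatchedFieldsResponse type is not shown above. As a hedged sketch, the block below assumes a hypothetical shape for it and adds a defensive check that is not part of the original chain: since the model is asked to copy metadata from the candidates, any returned path that was not among the retrieved candidates can be dropped. All names here (`matchedFieldsResponse`, `matchedField`, `keepKnownFields`) are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// matchedFieldsResponse is a hypothetical shape for the parsed LLM output.
type matchedFieldsResponse struct {
	Fields []matchedField `json:"fields"`
}

// matchedField is a hypothetical, trimmed-down matched field.
type matchedField struct {
	FieldName string  `json:"fieldName"`
	Path      string  `json:"path"`
	Score     float64 `json:"score"`
}

// keepKnownFields parses the model output and drops any field whose path
// was not among the retrieved candidate paths.
func keepKnownFields(modelOutput string, candidatePaths []string) (matchedFieldsResponse, error) {
	var resp matchedFieldsResponse
	if err := json.Unmarshal([]byte(modelOutput), &resp); err != nil {
		return matchedFieldsResponse{}, fmt.Errorf("failed to parse model output: %w", err)
	}

	known := make(map[string]bool, len(candidatePaths))
	for _, p := range candidatePaths {
		known[p] = true
	}

	kept := make([]matchedField, 0, len(resp.Fields))
	for _, f := range resp.Fields {
		if known[f.Path] {
			kept = append(kept, f)
		}
	}
	return matchedFieldsResponse{Fields: kept}, nil
}

func main() {
	output := `{"fields":[
		{"fieldName":"allow forking","path":"allow_forking","score":0.9},
		{"fieldName":"made up","path":"not_a_field","score":0.8}
	]}`
	resp, err := keepKnownFields(output, []string{"allow_forking", "allow_auto_merge"})
	if err != nil {
		panic(err)
	}
	for _, f := range resp.Fields {
		fmt.Println(f.Path) // prints "allow_forking"
	}
}
```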
That's all!