Jimmy

Building My Smart 2nd Brain: Part 1 - Agentic AI with RAG

Overview

In my previous articles, I developed two LangGraph applications utilizing distinct personas for generating blog articles and images, with web search capabilities as tools. To complete this learning journey with LangGraph, I will now build a new application incorporating the following features:

  • Text embedding using a vector database for Retrieval-Augmented Generation (RAG).
  • Long-term memory using SQLite to store message history.
  • Human-in-the-Loop (HITL) functionality for user verification of generated results.

This new project, my “Smart 2nd Brain”, is a personal knowledge management system designed to make my life more organized and spark creativity. RAG, long-term memory and HITL aren’t groundbreaking, but combining them felt like building a custom toolbox for my brain. The vector DB made retrieving my scattered notes a breeze, SQLite gave the system a memory that rivals my own, and HITL kept me in control, letting me tweak outputs to feel just right.

This “Smart 2nd Brain” was a rewarding step in my LangGraph journey, blending AI smarts with personal flair. It’s more than a tool — it’s like an extension of my mind, helping me organize thoughts, rediscover ideas, and fuel curiosity.

The Knowledge State Object

Let's take a look at the master state object:

from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field

class KnowledgeState(BaseModel):
    messages: List[Dict[str, str]] = Field(
        default_factory=list,
        description="Conversation-style messages exchanged between user, AI, and system."
    )
    user_input: Optional[str] = Field(
        None, description="Raw input from user (query or new document)."
    )
    query_type: Optional[str] = Field(
        None, description="Either 'ingest' for new document or 'query' for retrieval."
    )  
    raw_document: Optional[str] = Field(
        None, description="Full raw text of the ingested document."
    )
    chunks: Optional[List[str]] = Field(
        None, description="Text chunks after splitting for embedding."
    )
    embeddings: Optional[List[List[float]]] = Field(
        None, description="Vector embeddings for the chunks."
    )
    source: Optional[str] = Field(
        None, description="Source of the document (e.g., filename, URL, transcript ID)."
    )
    categories: Optional[List[str]] = Field(
        None, description="One or more categories/tags for the document."
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Additional metadata (author, date, page, etc.)."
    )
    retrieved_docs: Optional[List[Dict[str, Any]]] = Field(
        None, description="Relevant documents retrieved from vector DB."
    )
    retrieved_chunks: Optional[List[str]] = Field(
        None, description="Raw chunks retrieved from the vector DB."
    )
    generated_answer: Optional[str] = Field(
        None, description="AI-generated draft answer or summary."
    )
    final_answer: Optional[str] = Field(
        None, description="Final, human-approved answer or summary."
    )
    human_feedback: Optional[str] = Field(
        None, description="Feedback on AI output: 'approved' | 'rejected' | 'edited'."
    )
    edits: Optional[str] = Field(
        None, description="Manual corrections provided by the human."
    )
    edited_answer: Optional[str] = Field(
        None, description="The edited version of the answer provided by human feedback."
    )
    knowledge_type: Optional[str] = Field(
        None, description="Type of knowledge: 'conversational' | 'reusable' | 'verified'."
    )
    conversation_history: Optional[List[Dict[str, str]]] = Field(
        None, description="Running log of user/AI/system messages for context."
    )

    user_preferences: Optional[Dict[str, Any]] = Field(
        None, description="User customization, e.g. summary style, tone, etc."
    )
    status: Optional[str] = Field(
        None, description="Pipeline status: 'pending', 'processing', 'done', 'error'."
    )
    logs: Optional[List[str]] = Field(
        default_factory=list,
        description="Debug logs collected during pipeline execution."
    )

The KnowledgeState class holds everything that is passed between nodes during graph execution:

  • messages: Conversation-style messages for chat-like interactions
  • user_input: Raw user input (query or document content)
  • query_type: Workflow type identifier ('ingest' or 'query')
  • raw_document: Full document text for ingestion workflows
  • chunks: Text chunks after document splitting
  • embeddings: Vector representations of text chunks
  • source: Document source identifier
  • categories: Document classification tags
  • metadata: Additional structured metadata
  • retrieved_docs: Retrieved documents from vector database
  • retrieved_chunks: Raw text chunks from retrieval
  • generated_answer: AI-generated response draft
  • final_answer: Human-approved final response
  • human_feedback: Human feedback on AI outputs
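  • edits: Manual corrections and edits
  • edited_answer: Edited version of the answer produced from human feedback
  • knowledge_type: Type of knowledge ('conversational', 'reusable' or 'verified')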
  • conversation_history: Complete conversation context
  • user_preferences: User customization settings
  • status: Current workflow status
  • logs: Debug and execution logs

Note that some properties of KnowledgeState are cumulative (e.g. conversation_history), while others hold only the latest value (e.g. generated_answer, final_answer); I will go into the details in later sections. Either way, this state object is what lets the Smart 2nd Brain pass all required information from node to node throughout the graph.
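
The difference between cumulative and latest-value properties can be sketched in plain Python. This is only an illustration, not LangGraph's own state-merging mechanism: merge_update and CUMULATIVE_FIELDS are hypothetical names, and treating logs as cumulative is an assumption based on how the nodes in this article append to it.

```python
from typing import Any, Dict

# Assumption: these fields accumulate across steps; every other field
# simply keeps the most recent value written to it.
CUMULATIVE_FIELDS = {"conversation_history", "logs"}


def merge_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new state dict with `update` applied on top of `state`."""
    merged = dict(state)
    for key, value in update.items():
        if key in CUMULATIVE_FIELDS:
            # Cumulative: extend the existing list instead of replacing it.
            merged[key] = list(merged.get(key) or []) + list(value)
        else:
            # Latest-value: overwrite whatever was there before.
            merged[key] = value
    return merged


state = {"logs": ["Chunked document into 3 chunks"], "generated_answer": "draft 1"}
state = merge_update(state, {"logs": ["Stored 3 chunks"], "generated_answer": "draft 2"})
print(state["logs"])              # both log entries are kept
print(state["generated_answer"])  # only the latest draft survives
```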

A Tale of Two Paths

The application graph supports two different paths: "ingest" and "query".

The Smart 2nd Brain supports ingesting PDF documents into a vector database as text embeddings; this is the "ingest" path. Once the documents are stored in the vector database, they can be retrieved to ground the LLM's answers to user questions; this is the "query" path.

In the MasterGraphBuilder class, this graph is created by the build method:

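# Assumed imports: from langgraph.graph import StateGraph, END
#                  from langgraph.graph.state import CompiledStateGraph
def build(self) -> CompiledStateGraph: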
        """
        Build and compile the complete LangGraph workflow.

        This method constructs the workflow graph by defining nodes,
        setting up conditional routing, and establishing the flow
        between different processing stages.

        Returns:
            CompiledStateGraph: Compiled and checkpointed workflow graph

        Graph Structure:
            Entry Point: router
            Conditional Routing: Based on query_type
            Ingestion Branch: router -> chunk -> embed -> store -> END
            Query Branch: router -> retriever -> answer -> review -> validated_store -> END

        Features:
            - Conditional edges for workflow routing
            - Checkpointing for state persistence
            - Error handling and logging throughout
            - Human-in-the-loop integration
        """
        # Create the base state graph
        graph = StateGraph(KnowledgeState)

        # =============================================================================
        # NODE DEFINITIONS
        # =============================================================================

        # Add all workflow nodes
        graph.add_node("router", self.input_router)           # Entry point and routing
        graph.add_node("chunk", self.chunk_doc_node)          # Document chunking
        graph.add_node("embed", self.embed_node)              # Embedding generation
        graph.add_node("store", self.store_node)              # Document storage
        graph.add_node("retriever", self.retriever_node)      # Document retrieval
        graph.add_node("answer", self.answer_gen_node)        # Answer generation
        graph.add_node("review", self.human_review_node)      # Human review
        graph.add_node("validated_store", self.validated_store_node)  # Validated storage

        # Set the entry point
        graph.set_entry_point("router")

        # =============================================================================
        # CONDITIONAL ROUTING
        # =============================================================================

        # Define routing logic based on query_type
        def route_condition(state):
            if hasattr(state, 'query_type'):
                if state.query_type == "ingest":
                    return "ingest"
                elif state.query_type == "query":
                    return "query"
                else:
                    return "__end__"
            return "query"

        # Add conditional edges from router
        graph.add_conditional_edges("router", route_condition, {
            "ingest": "chunk",      # Route to document ingestion
            "query": "retriever",   # Route to knowledge query
            "__end__": END,         # End workflow for invalid types
        })

        # =============================================================================
        # INGESTION BRANCH
        # =============================================================================

        # Document processing pipeline
        graph.add_edge("chunk", "embed")      # Chunk -> Embed
        graph.add_edge("embed", "store")     # Embed -> Store
        graph.add_edge("store", END)         # Store -> End

        # =============================================================================
        # QUERY BRANCH
        # =============================================================================

        # Knowledge query pipeline
        graph.add_edge("retriever", "answer")           # Retrieve -> Generate
        graph.add_edge("answer", "review")             # Generate -> Review
        graph.add_edge("review", "validated_store")    # Review -> Store
        graph.add_edge("validated_store", END)         # Store -> End

        # Compile the graph with checkpointing
        return graph.compile(checkpointer=self.checkpointer)
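
To see which nodes each query_type actually visits, the routing logic and edges from the build method can be replayed without LangGraph. This is a minimal sketch: trace_path, EDGES and ROUTES are hypothetical names introduced for illustration, END is a plain-string stand-in, and SimpleNamespace stands in for KnowledgeState.

```python
from types import SimpleNamespace
from typing import List, Optional

END = "__end__"  # stand-in for LangGraph's END marker in this sketch

# Unconditional edges copied from the build method.
EDGES = {
    "chunk": "embed", "embed": "store", "store": END,
    "retriever": "answer", "answer": "review",
    "review": "validated_store", "validated_store": END,
}
# Mapping used by the router's conditional edges.
ROUTES = {"ingest": "chunk", "query": "retriever", "__end__": END}


def route_condition(state) -> str:
    """Same decision logic as the router's conditional edge."""
    if hasattr(state, "query_type"):
        if state.query_type == "ingest":
            return "ingest"
        if state.query_type == "query":
            return "query"
        return "__end__"
    return "query"


def trace_path(query_type: Optional[str]) -> List[str]:
    """List the nodes visited for a given query_type, starting at the router."""
    state = SimpleNamespace(query_type=query_type)
    path = ["router"]
    node = ROUTES[route_condition(state)]
    while node != END:
        path.append(node)
        node = EDGES[node]
    return path


print(trace_path("ingest"))
print(trace_path("query"))
print(trace_path(None))
```

One detail worth noticing: because KnowledgeState always defines query_type (defaulting to None), an unset value ends the run right after the router. The final return "query" is reached only for a state object that lacks the attribute altogether.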

Feeding the Brain Chunk by Chunk

Let's look at the ingest branch first. When the graph is invoked with query_type set to 'ingest', the first node reached after the router is the chunk node.

# Assumed import: from langchain_text_splitters import RecursiveCharacterTextSplitter
def chunk_doc_node(self, state: KnowledgeState):
        if state.raw_document:
            # Use RecursiveCharacterTextSplitter for intelligent text segmentation
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,     # Maximum characters per chunk
                chunk_overlap=50    # Overlap between consecutive chunks for context
            )
            state.chunks = splitter.split_text(state.raw_document)
            state.logs = (state.logs or []) + [
                f"Chunked document into {len(state.chunks)} chunks"
            ]
        else:
            state.logs = (state.logs or []) + ["No raw_document found for chunking"]

        return state


Lengthy text must be divided into smaller chunks before it can be embedded and retrieved effectively. The chunk_doc_node function takes the raw_document and splits it into manageable segments for downstream tasks.

LangChain's RecursiveCharacterTextSplitter handles this by trying a hierarchy of separators in order (by default paragraph breaks, then line breaks, then spaces, then individual characters), so chunks break at natural boundaries where possible while staying within the size limit. This makes it well suited to preparing text for embedding and RAG.
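
To make the size and overlap parameters concrete, here is a deliberately simplified sketch. Unlike RecursiveCharacterTextSplitter, it ignores separators and cuts at fixed character offsets; sliding_window_chunks is a hypothetical helper, not part of LangChain or of this project.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Cut text into fixed-size windows that share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
        start += step
    return chunks


# 1,200 characters of varied sample text, generated for the demo.
sample = "".join(chr(ord("a") + (i * 7) % 26) for i in range(1200))
chunks = sliding_window_chunks(sample)
print([len(c) for c in chunks])  # [500, 500, 300]
```

Each window starts 450 characters after the previous one, so the last 50 characters of one chunk reappear at the start of the next. That shared tail is what keeps a sentence cut at a boundary readable from at least one side.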

In chunk_doc_node, chunk_size=500 and chunk_overlap=50 strike a balance between coherent chunks and minimal redundancy. Once the node returns the state with its chunks populated, the next step is generating embeddings.

def embed_node(self, state: KnowledgeState):
        # Use embedding model if available, otherwise fallback to placeholder
        if state.chunks:
            if self.embedding_model:
                try:
                    logger.info(f"🔤 Generating embeddings for {len(state.chunks)} chunks")
                    state.embeddings = self.embedding_model.embed_documents(state.chunks)
                    logger.info(f"✅ Generated {len(state.embeddings)} embeddings")
                except Exception as e:
                    logger.error(f"❌ Embedding generation failed: {e}")
                    # Fallback to placeholder embeddings for continued processing
                    state.embeddings = [[0.1, 0.2]] * len(state.chunks)
                    state.logs = (state.logs or []) + [f"Embedding error: {str(e)}"]
            else:
                logger.warning("⚠️ No embedding model provided, using placeholder embeddings")
                state.embeddings = [[0.1, 0.2]] * len(state.chunks)
        return state


This node transforms text chunks into vector embeddings that encode semantic meaning, which is what makes similarity search possible. The conversion is done by the embedding model, accessed here through LangChain’s AzureOpenAIEmbeddings class.

from langchain_openai import AzureOpenAIEmbeddings

# Initialize Azure embedding model for document vectorization.
# azure_endpoint and api_key are assumed to be loaded beforehand (e.g. from environment variables).
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-3-small",  # Your embedding deployment name
    openai_api_version="2024-12-01-preview",    # Azure OpenAI API version
    azure_endpoint=azure_endpoint,              # Azure service endpoint
    openai_api_key=api_key,                     # API key for authentication
)
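
Why do embeddings enable similarity search? A common way to compare two vectors is cosine similarity. The sketch below uses made-up 3-dimensional vectors (real embedding models produce far longer ones), and cosine_similarity is a hand-written helper, not a LangChain or Chroma call.

```python
import math
from typing import List


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Toy vectors, invented for illustration only.
query = [0.9, 0.1, 0.0]
chunk_vectors = {
    "notes on LangGraph routing": [0.8, 0.2, 0.1],
    "grocery list": [0.0, 0.1, 0.9],
}

# Rank chunks by how closely they point in the same direction as the query.
ranked = sorted(
    chunk_vectors,
    key=lambda name: cosine_similarity(query, chunk_vectors[name]),
    reverse=True,
)
print(ranked[0])

# Two identical placeholder vectors, as in embed_node's fallback branch.
placeholder_score = cosine_similarity([0.1, 0.2], [0.1, 0.2])
print(round(placeholder_score, 6))
```

The second result illustrates a caveat of the [[0.1, 0.2]] fallback in embed_node: if such placeholder vectors were ever used for retrieval, every chunk would score the same and the ranking would carry no information.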

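At the end of the ingest branch, the store_node function saves the chunks and their metadata into Chroma, a widely used open-source vector database. As written, add_texts computes the vectors itself through the embedding_function passed to Chroma, so state.embeddings acts as a guard that the embedding step ran rather than as the vectors that get stored.

Each chunk's metadata records its source, its position (chunk_id), and its categories, which are joined into a single comma-separated string. This metadata is meant to support more precise, context-aware retrieval on the query path.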

def store_node(self, state: KnowledgeState):

        if state.embeddings and state.chunks:
            try:
                # Initialize vectorstore if not provided
                if not self.vectorstore:
                    self.vectorstore = Chroma(
                        collection_name="knowledge_base",
                        embedding_function=self.embedding_model,
                        persist_directory=self.chromadb_dir
                    )

                # Prepare metadata for each chunk             
                metadatas = [
                    {
                        "source": state.source or "unknown",
                        "categories": ", ".join(state.categories) if state.categories else "general",
                        "chunk_id": i
                    }
                    for i in range(len(state.chunks))
                ]

                # Store chunks with metadata in ChromaDB
                self.vectorstore.add_texts(
                    texts=state.chunks,
                    metadatas=metadatas
                )

                # Data is automatically persisted when using persist_directory

                state.status = "stored"
                state.logs = (state.logs or []) + [
                    f"Stored {len(state.chunks)} chunks in ChromaDB with categories {state.categories or ['general']}"
                ]
            except Exception as e:
                state.status = "error"
                state.logs = (state.logs or []) + [f"Storing failed: {e}"]
        else:
            state.logs = (state.logs or []) + ["No embeddings/chunks to store"]

        return state

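
The per-chunk metadata built in store_node can be reproduced in isolation to see exactly what is attached to each chunk. In this sketch, build_metadatas and has_category are hypothetical helpers; the client-side category check is an assumption for illustration, not necessarily how the query path filters results.

```python
from typing import Any, Dict, List, Optional


def build_metadatas(
    num_chunks: int,
    source: Optional[str],
    categories: Optional[List[str]],
) -> List[Dict[str, Any]]:
    """Mirror store_node: one metadata dict per chunk, categories joined into a string."""
    joined = ", ".join(categories) if categories else "general"
    return [
        {"source": source or "unknown", "categories": joined, "chunk_id": i}
        for i in range(num_chunks)
    ]


def has_category(metadata: Dict[str, Any], category: str) -> bool:
    """Check membership by splitting the joined string back into tags."""
    return category in metadata["categories"].split(", ")


metas = build_metadatas(2, "notes.pdf", ["ai", "langgraph"])
print(metas[1])
print(has_category(metas[0], "langgraph"))
```

Splitting on ", " before comparing avoids false matches that a plain substring test would produce, for example "a" matching inside "ai".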

More Exciting Features Await

Rather than overwhelming you with every detail now, I’ll explore the query path, long-term memory, and Human-in-the-Loop capabilities in upcoming articles. I’ll also showcase how to integrate a FastAPI suite with a sleek user interface to seamlessly connect all components. Stay tuned for what’s next!

(Source code will be provided when concluding this series)
