Overview
In my previous articles, I developed two LangGraph applications utilizing distinct personas for generating blog articles and images, with web search capabilities as tools. To complete this learning journey with LangGraph, I will now build a new application incorporating the following features:
- Text embedding using a vector database for Retrieval-Augmented Generation (RAG).
- Long-term memory using SQLite to store message history.
- Human-in-the-Loop (HITL) functionality for user verification of generated results.
This new project, my “Smart 2nd Brain”, is a personal knowledge management system designed to make my life more organized and spark creativity. RAG, long-term memory and HITL aren’t groundbreaking, but combining them felt like building a custom toolbox for my brain. The vector DB made retrieving my scattered notes a breeze, SQLite gave the system a memory that rivals my own, and HITL kept me in control, letting me tweak outputs to feel just right.
This “Smart 2nd Brain” was a rewarding step in my LangGraph journey, blending AI smarts with personal flair. It’s more than a tool — it’s like an extension of my mind, helping me organize thoughts, rediscover ideas, and fuel curiosity.
The Knowledge State Object
Let's take a look at the master state object:
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class KnowledgeState(BaseModel):
    messages: List[Dict[str, str]] = Field(
        default_factory=list,
        description="Conversation-style messages exchanged between user, AI, and system."
    )
    user_input: Optional[str] = Field(
        None, description="Raw input from user (query or new document)."
    )
    query_type: Optional[str] = Field(
        None, description="Either 'ingest' for new document or 'query' for retrieval."
    )
    raw_document: Optional[str] = Field(
        None, description="Full raw text of the ingested document."
    )
    chunks: Optional[List[str]] = Field(
        None, description="Text chunks after splitting for embedding."
    )
    embeddings: Optional[List[List[float]]] = Field(
        None, description="Vector embeddings for the chunks."
    )
    source: Optional[str] = Field(
        None, description="Source of the document (e.g., filename, URL, transcript ID)."
    )
    categories: Optional[List[str]] = Field(
        None, description="One or more categories/tags for the document."
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Additional metadata (author, date, page, etc.)."
    )
    retrieved_docs: Optional[List[Dict[str, Any]]] = Field(
        None, description="Relevant documents retrieved from vector DB."
    )
    retrieved_chunks: Optional[List[str]] = Field(
        None, description="Raw chunks retrieved from the vector DB."
    )
    generated_answer: Optional[str] = Field(
        None, description="AI-generated draft answer or summary."
    )
    final_answer: Optional[str] = Field(
        None, description="Final, human-approved answer or summary."
    )
    human_feedback: Optional[str] = Field(
        None, description="Feedback on AI output: 'approved' | 'rejected' | 'edited'."
    )
    edits: Optional[str] = Field(
        None, description="Manual corrections provided by the human."
    )
    edited_answer: Optional[str] = Field(
        None, description="The edited version of the answer provided by human feedback."
    )
    knowledge_type: Optional[str] = Field(
        None, description="Type of knowledge: 'conversational' | 'reusable' | 'verified'."
    )
    conversation_history: Optional[List[Dict[str, str]]] = Field(
        None, description="Running log of user/AI/system messages for context."
    )
    user_preferences: Optional[Dict[str, Any]] = Field(
        None, description="User customization, e.g. summary style, tone, etc."
    )
    status: Optional[str] = Field(
        None, description="Pipeline status: 'pending', 'processing', 'done', 'error'."
    )
    logs: Optional[List[str]] = Field(
        default_factory=list,
        description="Debug logs collected during pipeline execution."
    )
The KnowledgeState class holds all the details that are passed along during graph execution:
- messages: Conversation-style messages for chat-like interactions
- user_input: Raw user input (query or document content)
- query_type: Workflow type identifier ('ingest' or 'query')
- raw_document: Full document text for ingestion workflows
- chunks: Text chunks after document splitting
- embeddings: Vector representations of text chunks
- source: Document source identifier
- categories: Document classification tags
- metadata: Additional structured metadata
- retrieved_docs: Retrieved documents from vector database
- retrieved_chunks: Raw text chunks from retrieval
- generated_answer: AI-generated response draft
- final_answer: Human-approved final response
- human_feedback: Human feedback on AI outputs
- edits: Manual corrections and edits
- edited_answer: Human-edited version of the answer
- knowledge_type: Knowledge classification ('conversational', 'reusable', or 'verified')
- conversation_history: Complete conversation context
- user_preferences: User customization settings
- status: Current workflow status
- logs: Debug and execution logs
Please note that some properties in this KnowledgeState object are cumulative (e.g. conversation_history), while others reflect only the latest values (e.g. generated_answer, final_answer); I will go into the details in later sections. In any case, this state object underpins the Smart 2nd Brain system, ensuring all required information can be passed throughout the graph.
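As a small illustration of that difference, here is a hypothetical node (not part of the actual graph) that appends to cumulative fields while simply overwriting a latest-value field:

# Hypothetical helper node illustrating cumulative vs. latest-value updates
def example_node(state: KnowledgeState) -> KnowledgeState:
    # Cumulative fields: append so earlier entries are preserved
    state.logs = (state.logs or []) + ["example_node executed"]
    state.conversation_history = (state.conversation_history or []) + [
        {"role": "ai", "content": "draft reply"}
    ]
    # Latest-value fields: simply overwritten on each pass
    state.generated_answer = "draft reply"
    return state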
A Tale of Two Paths
The application graph supports two different paths: "ingest" and "query".
The Smart 2nd Brain supports ingesting PDF documents into a vector database as text embeddings; this is the so-called "ingest" path. Once documents are stored in the vector database, they are available to enrich the LLM when answering user questions. We call this the "query" path.
In the MasterGraphBuilder class, this graph is created by the build method:
def build(self) -> CompiledStateGraph:
    """
    Build and compile the complete LangGraph workflow.

    This method constructs the workflow graph by defining nodes,
    setting up conditional routing, and establishing the flow
    between different processing stages.

    Returns:
        CompiledStateGraph: Compiled and checkpointed workflow graph

    Graph Structure:
        Entry Point: router
        Conditional Routing: Based on query_type
        Ingestion Branch: router -> chunk -> embed -> store -> END
        Query Branch: router -> retriever -> answer -> review -> validated_store -> END

    Features:
        - Conditional edges for workflow routing
        - Checkpointing for state persistence
        - Error handling and logging throughout
        - Human-in-the-loop integration
    """
    # Create the base state graph
    graph = StateGraph(KnowledgeState)

    # =============================================================================
    # NODE DEFINITIONS
    # =============================================================================
    # Add all workflow nodes
    graph.add_node("router", self.input_router)                   # Entry point and routing
    graph.add_node("chunk", self.chunk_doc_node)                  # Document chunking
    graph.add_node("embed", self.embed_node)                      # Embedding generation
    graph.add_node("store", self.store_node)                      # Document storage
    graph.add_node("retriever", self.retriever_node)              # Document retrieval
    graph.add_node("answer", self.answer_gen_node)                # Answer generation
    graph.add_node("review", self.human_review_node)              # Human review
    graph.add_node("validated_store", self.validated_store_node)  # Validated storage

    # Set the entry point
    graph.set_entry_point("router")

    # =============================================================================
    # CONDITIONAL ROUTING
    # =============================================================================
    # Define routing logic based on query_type
    def route_condition(state):
        if hasattr(state, 'query_type'):
            if state.query_type == "ingest":
                return "ingest"
            elif state.query_type == "query":
                return "query"
            else:
                return "__end__"
        return "query"

    # Add conditional edges from router
    graph.add_conditional_edges("router", route_condition, {
        "ingest": "chunk",      # Route to document ingestion
        "query": "retriever",   # Route to knowledge query
        "__end__": END,         # End workflow for invalid types
    })

    # =============================================================================
    # INGESTION BRANCH
    # =============================================================================
    # Document processing pipeline
    graph.add_edge("chunk", "embed")   # Chunk -> Embed
    graph.add_edge("embed", "store")   # Embed -> Store
    graph.add_edge("store", END)       # Store -> End

    # =============================================================================
    # QUERY BRANCH
    # =============================================================================
    # Knowledge query pipeline
    graph.add_edge("retriever", "answer")        # Retrieve -> Generate
    graph.add_edge("answer", "review")           # Generate -> Review
    graph.add_edge("review", "validated_store")  # Review -> Store
    graph.add_edge("validated_store", END)       # Store -> End

    # Compile the graph with checkpointing
    return graph.compile(checkpointer=self.checkpointer)
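To make the two paths concrete, here is a hedged usage sketch of invoking the compiled graph. The MasterGraphBuilder constructor arguments, the directory path, and the thread id are assumptions inferred from the attributes the nodes use, not the exact API of my project:

# Hedged usage sketch; constructor arguments are assumed from the attributes
# referenced in the node code (embedding_model, checkpointer, chromadb_dir).
builder = MasterGraphBuilder(
    embedding_model=embedding_model,   # e.g. the AzureOpenAIEmbeddings instance shown later
    checkpointer=checkpointer,         # e.g. a SQLite-backed LangGraph checkpointer
    chromadb_dir="./chroma_db"         # hypothetical local persistence path
)
app = builder.build()

# The checkpointer keys persisted state by thread_id
config = {"configurable": {"thread_id": "demo-session"}}

# Ingest path: router -> chunk -> embed -> store
app.invoke(
    {"query_type": "ingest", "raw_document": "full text of a PDF...", "source": "notes.pdf"},
    config=config,
)

# Query path: router -> retriever -> answer -> review -> validated_store
result = app.invoke(
    {"query_type": "query", "user_input": "What do my notes say about LangGraph?"},
    config=config,
)
# result holds the final state values, including generated_answer and final_answer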
Feeding the Brain Chunk by Chunk
Let's look at the ingest branch first. If the graph is invoked for ingestion, the first node encountered is the chunk node.
def chunk_doc_node(self, state: KnowledgeState):
    if state.raw_document:
        # Use RecursiveCharacterTextSplitter for intelligent text segmentation
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,    # Maximum characters per chunk
            chunk_overlap=50   # Overlap between consecutive chunks for context
        )
        state.chunks = splitter.split_text(state.raw_document)
        state.logs = (state.logs or []) + [
            f"Chunked document into {len(state.chunks)} chunks"
        ]
    else:
        state.logs = (state.logs or []) + ["No raw_document found for chunking"]
    return state
To ingest lengthy text, it must first be divided into smaller chunks for efficient processing. The chunk_doc_node function takes the raw_document and splits it into manageable segments for downstream tasks.
LangChain's RecursiveCharacterTextSplitter facilitates this by breaking text into semantically coherent chunks, ideal for NLP applications like embedding generation or RAG. It employs a hierarchical approach, using customizable separators (e.g., paragraphs, sentences, words) to prioritize natural boundaries and preserve context.
Here, we set chunk_size=500 and chunk_overlap=50, striking a balance between coherence and minimal redundancy. Once the chunks are back in the state, the next step is generating embeddings.
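Before moving on, here is a minimal standalone sketch of what these splitter settings do. The sample text is invented purely for illustration, and the import path may differ slightly depending on your LangChain version:

# Standalone illustration of the splitter settings used in chunk_doc_node
from langchain_text_splitters import RecursiveCharacterTextSplitter  # older releases: langchain.text_splitter

long_text = "LangGraph lets me compose stateful AI workflows as graphs of nodes. " * 40
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_text(long_text)

print(len(chunks))        # several chunks of roughly 500 characters each
print(chunks[0][-50:])    # the tail of one chunk roughly reappears at the head of the next (the overlap)
print(chunks[1][:50])

The embed_node below handles the embedding step: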
def embed_node(self, state: KnowledgeState):
    # Use embedding model if available, otherwise fallback to placeholder
    if state.chunks:
        if self.embedding_model:
            try:
                logger.info(f"🔤 Generating embeddings for {len(state.chunks)} chunks")
                state.embeddings = self.embedding_model.embed_documents(state.chunks)
                logger.info(f"✅ Generated {len(state.embeddings)} embeddings")
            except Exception as e:
                logger.error(f"❌ Embedding generation failed: {e}")
                # Fallback to placeholder embeddings for continued processing
                state.embeddings = [[0.1, 0.2]] * len(state.chunks)
                state.logs = (state.logs or []) + [f"Embedding error: {str(e)}"]
        else:
            logger.warning("⚠️ No embedding model provided, using placeholder embeddings")
            state.embeddings = [[0.1, 0.2]] * len(state.chunks)
    return state
This node transforms text chunks into high-dimensional vector embeddings that encode semantic meaning, enabling effective similarity searches. The embedding model handles this conversion; here, LangChain's AzureOpenAIEmbeddings class wraps the Azure OpenAI embedding deployment.
# Initialize Azure embedding model for document vectorization
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-3-small",   # Your embedding deployment name
    openai_api_version="2024-12-01-preview",     # Azure OpenAI API version
    azure_endpoint=azure_endpoint,               # Azure service endpoint
    openai_api_key=api_key                       # API key for authentication
)
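For context, the pipeline relies on two calls from LangChain's embeddings interface: embed_documents for chunks and embed_query for questions. A small sketch (the sample strings are just placeholders):

# Quick sketch of the embeddings interface used in embed_node above
vectors = embedding_model.embed_documents(["first chunk of text", "second chunk of text"])
print(len(vectors), len(vectors[0]))  # 2 vectors; text-embedding-3-small typically returns 1536 floats each

# Queries are embedded separately so they can be compared against stored chunks
query_vector = embedding_model.embed_query("What did I write about LangGraph?")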
At the conclusion of the pipeline, the store_node function saves the embedded chunks into a vector database, utilizing Chroma, a widely adopted solution for vector storage.
The metadata, which includes multiple categories, is designed to enhance the querying of these embedded chunks during the query path, enabling more precise and context-aware retrieval.
def store_node(self, state: KnowledgeState):
    if state.embeddings and state.chunks:
        try:
            # Initialize vectorstore if not provided
            if not self.vectorstore:
                self.vectorstore = Chroma(
                    collection_name="knowledge_base",
                    embedding_function=self.embedding_model,
                    persist_directory=self.chromadb_dir
                )
            # Prepare metadata for each chunk
            metadatas = [
                {
                    "source": state.source or "unknown",
                    "categories": ", ".join(state.categories) if state.categories else "general",
                    "chunk_id": i
                }
                for i in range(len(state.chunks))
            ]
            # Store chunks with metadata in ChromaDB
            self.vectorstore.add_texts(
                texts=state.chunks,
                metadatas=metadatas
            )
            # Data is automatically persisted when using persist_directory
            state.status = "stored"
            state.logs = (state.logs or []) + [
                f"Stored {len(state.chunks)} chunks in ChromaDB with categories {state.categories or ['general']}"
            ]
        except Exception as e:
            state.status = "error"
            state.logs = (state.logs or []) + [f"Storing failed: {e}"]
    else:
        state.logs = (state.logs or []) + ["No embeddings/chunks to store"]
    return state
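To show why the metadata matters, here is a hedged preview of how the stored chunks can later be queried with a metadata filter. This is a sketch of Chroma's similarity_search as exposed by LangChain, not the actual retriever_node, and the persistence path is a hypothetical example:

# Preview sketch (not the actual retriever_node): metadata-filtered retrieval from ChromaDB
vectorstore = Chroma(
    collection_name="knowledge_base",
    embedding_function=embedding_model,
    persist_directory="./chroma_db"   # hypothetical path; the app uses self.chromadb_dir
)

docs = vectorstore.similarity_search(
    "What did my notes say about LangGraph checkpoints?",
    k=4,
    filter={"source": "notes.pdf"}    # equality filter on the metadata stored above
)
for doc in docs:
    print(doc.metadata["categories"], doc.page_content[:80])

Because categories is stored as a comma-joined string, simple equality filters are most practical on fields like source; the real retrieval logic belongs to the query path covered later.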
More Exciting Features Await
Rather than overwhelming you with every detail now, I’ll explore the query path, long-term memory, and Human-in-the-Loop capabilities in upcoming articles. I’ll also showcase how to integrate a FastAPI suite with a sleek user interface to seamlessly connect all components. Stay tuned for what’s next!
(Source code will be provided when concluding this series)