In parts 1 and 2, we walked through the core LangChain graph behind our “Smart 2nd Brain” application. We covered document ingestion and retrieval, and showed how to implement a checkpointer, a vector database, and Human-In-The-Loop (HITL) controls to make the system more robust and trustworthy.
As a personal knowledge management tool, the application isn’t complete without an elegant, intuitive user interface. And to support easier testing, automation, and batch operations, it’s good practice to expose the graph behind a well-defined API layer. In this article we’ll cover both: building a clean UI and designing a simple, reliable API to shield and orchestrate access to the graph.
FastAPI comes into action
For the API implementation, I debated whether to use Flask instead of FastAPI. In the end FastAPI won out because it handles concurrent I/O (LLM calls, vector DB queries, streaming) efficiently and integrates cleanly with Pydantic for strict schemas around our LangChain graph inputs and outputs.
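Those strict schemas are plain Pydantic models. Their exact fields live in the repository; the following is only a minimal sketch of the three models used most often below, inferred from how the endpoints consume them:

from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel


class IngestRequest(BaseModel):
    document: str                              # raw text to ingest
    source: Optional[str] = None               # where the document came from
    categories: Optional[List[str]] = None     # user-assigned categories
    metadata: Optional[Dict[str, Any]] = None  # arbitrary extra metadata


class QueryRequest(BaseModel):
    query: str                                 # the user's question
    thread_id: Optional[str] = None            # reuse for conversation continuity
    knowledge_type: Optional[str] = None       # e.g. "conversational" | "reusable" | "verified"
    require_human_review: bool = False         # pause for HITL before finalizing


class WorkflowResponse(BaseModel):
    success: bool
    thread_id: str
    result: Dict[str, Any]                     # final graph state returned by the workflow
    execution_time: float                      # seconds
    timestamp: datetime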
Inside the project there is a dedicated /api tier, and its start-up script looks like this:
if __name__ == "__main__":
try:
import uvicorn
# Start the development server with configuration from settings
uvicorn.run(
"api.main:app", # Application import string
host=settings.host, # Host binding (default: 0.0.0.0)
port=settings.port, # Port number (default: 8000)
reload=settings.debug, # Auto-reload on code changes in debug mode
log_level=settings.log_level.lower(), # Logging level from settings
)
except ImportError:
# Provide helpful error message if uvicorn is not available
print("uvicorn not available, run with: uvicorn api.main:app --reload")
We use Uvicorn, a modern ASGI server for Python. When the server starts, FastAPI executes the lifespan handler, an async context manager whose code before the yield runs once at startup and whose code after it runs once at shutdown. During startup it lazy-imports the heavy modules, creates AzureOpenAIEmbeddings, AzureChatOpenAI, and Chroma if the required settings are available, then builds the MasterGraphBuilder and compiles the graph.
from fastapi import FastAPI
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
from api.core.config import settings
# Initialize graph builder and compiled graph at app startup
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
# Lazy import to avoid heavy imports at module load
from agentic.workflows.master_graph_builder import MasterGraphBuilder
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
from langchain_chroma import Chroma
# Initialize models from settings
if settings.openai_api_key and settings.azure_openai_endpoint_url:
embedding_model = AzureOpenAIEmbeddings(
azure_deployment="text-embedding-3-small",
openai_api_version="2024-12-01-preview",
azure_endpoint=settings.azure_openai_endpoint_url,
openai_api_key=settings.openai_api_key,
)
llm = AzureChatOpenAI(
azure_deployment="gpt-4o",
openai_api_version="2024-12-01-preview",
azure_endpoint=settings.azure_openai_endpoint_url,
openai_api_key=settings.openai_api_key,
temperature=0.1,
)
vectorstore = Chroma(
collection_name="smart_second_brain",
embedding_function=embedding_model,
persist_directory="./chroma_db",
)
graph_builder = MasterGraphBuilder(
llm=llm,
embedding_model=embedding_model,
vectorstore=vectorstore,
chromadb_dir="./chroma_db",
)
compiled_graph = graph_builder.build()
app.state.graph_builder = graph_builder
app.state.compiled_graph = compiled_graph
else:
# Defer initialization to first request if credentials absent
app.state.graph_builder = None
app.state.compiled_graph = None
except Exception:
# Ensure attributes exist even if init fails
app.state.graph_builder = None
app.state.compiled_graph = None
yield
# =============================================================================
# FASTAPI APPLICATION CONFIGURATION
# =============================================================================
# Create the main FastAPI application instance with metadata and configuration
app = FastAPI(
title="Smart Second Brain API",
description="An intelligent knowledge management system powered by LangGraph and AI",
version="0.1.0",
# Only show API docs in debug mode for security
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
lifespan=lifespan,
)
# =============================================================================
# MIDDLEWARE CONFIGURATION
# =============================================================================
# Add CORS (Cross-Origin Resource Sharing) middleware to allow frontend communication
# This enables the Streamlit frontend to make API calls from different origins
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_hosts, # Configured origins from settings
allow_credentials=True, # Allow cookies and authentication headers
allow_methods=["*"], # Allow all HTTP methods (GET, POST, PUT, DELETE)
allow_headers=["*"], # Allow all request headers
)
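Because the lifespan handler may leave app.state.compiled_graph as None (missing credentials or a failed initialization), a readiness-style health check is useful. Here is a hypothetical sketch of what it could report; the project’s actual /health endpoint may differ:

@app.get("/health")
async def health_check():
    # Report liveness plus whether the graph was compiled during startup
    return {
        "status": "ok",
        "graph_ready": app.state.compiled_graph is not None,
    }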
The real main.py includes the basic setup for the /test and /health endpoints along with the rest of the generic configuration. The routes specific to graph operations are grouped in graph_api.py:
# =============================================================================
# API ENDPOINTS
# =============================================================================
@router.post("/ingest", response_model=WorkflowResponse)
async def ingest_document(
request: IngestRequest,
graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
http_request: Request = None
):
start_time = datetime.utcnow()
try:
# Generate unique thread ID for this ingestion workflow
thread_id = f"ingest_{int(start_time.timestamp())}"
# Create knowledge state for the ingestion workflow
state = KnowledgeState(
query_type="ingest", # Workflow type identifier
raw_document=request.document, # Original document content
source=request.source or "api_ingest", # Document source or default
categories=request.categories or [], # Document categories
metadata=request.metadata or {} # Additional metadata
)
# Ensure compiled graph is available
compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
if compiled_graph is None:
compiled_graph = graph_builder.build()
http_request.app.state.compiled_graph = compiled_graph
# Execute the ingestion workflow with thread ID for tracking
logger.info(f"🔄 Starting document ingestion workflow: {thread_id}")
result = compiled_graph.invoke(
state,
config={"configurable": {"thread_id": thread_id}}
)
# Calculate execution time for performance monitoring
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"✅ Ingestion completed in {execution_time:.2f}s")
return WorkflowResponse(
success=True,
thread_id=thread_id,
result=result,
execution_time=execution_time,
timestamp=datetime.utcnow()
)
except Exception as e:
logger.error(f"❌ Ingestion failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Document ingestion failed: {str(e)}"
)
@router.post("/query", response_model=WorkflowResponse)
async def query_knowledge_base(
request: QueryRequest,
graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
http_request: Request = None
):
start_time = datetime.utcnow()
try:
# Use provided thread ID or generate new one for conversation continuity
thread_id = request.thread_id or f"query_{int(start_time.timestamp())}"
# Get conversation history from custom memory manager
conversation_history = conversation_memory.get_conversation_history(thread_id)
# Create knowledge state for the query workflow
state = KnowledgeState(
query_type="query", # Workflow type identifier
user_input=request.query, # User's question
categories=[], # No categories for queries
messages=conversation_history, # Use custom conversation memory
knowledge_type=request.knowledge_type,
require_human_review=request.require_human_review
)
# Ensure compiled graph is available
compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
if compiled_graph is None:
compiled_graph = graph_builder.build()
http_request.app.state.compiled_graph = compiled_graph
# Execute the query workflow with thread ID for context
logger.info(f"🔍 Starting query workflow: {thread_id}")
result = compiled_graph.invoke(
state,
config={"configurable": {"thread_id": thread_id}}
)
# Save conversation to custom memory manager
conversation_memory.add_message(thread_id, "user", request.query)
if result.get("final_answer"):
conversation_memory.add_message(thread_id, "assistant", result["final_answer"])
# Calculate execution time for performance monitoring
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"✅ Query completed in {execution_time:.2f}s")
return WorkflowResponse(
success=True,
thread_id=thread_id,
result=result,
execution_time=execution_time,
timestamp=datetime.utcnow()
)
except Exception as e:
logger.error(f"❌ Query failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Query execution failed: {str(e)}"
)
@router.post("/ingest-pdfs", response_model=MultiIngestResponse)
async def ingest_multiple_pdfs(
files: List[UploadFile] = File(...),
source: str = Form(...),
categories: Optional[str] = Form(None),
author: Optional[str] = Form(None),
metadata: Optional[str] = Form(None),
graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
http_request: Request = None
):
start_time = datetime.utcnow()
# Validate input
if not files:
raise HTTPException(
status_code=400,
detail="No files provided. Please upload at least one PDF file."
)
# Parse categories and metadata
category_list = []
if categories:
category_list = [cat.strip() for cat in categories.split(",") if cat.strip()]
metadata_dict = {}
if metadata:
try:
metadata_dict = json.loads(metadata)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON metadata provided: {metadata}")
metadata_dict = {}
# Add common metadata
metadata_dict.update({
"author": author or "Unknown",
"source": source,
"categories": category_list,
"ingestion_date": datetime.utcnow().isoformat(),
"batch_ingest": True
})
results = []
processed_count = 0
failed_count = 0
# Ensure compiled graph is available
compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
if compiled_graph is None:
compiled_graph = graph_builder.build()
http_request.app.state.compiled_graph = compiled_graph
logger.info(f"🔄 Starting batch PDF ingestion: {len(files)} files")
# Process each PDF file
for file in files:
file_start_time = datetime.utcnow()
try:
# Validate file type
if not file.filename or not file.filename.lower().endswith('.pdf'):
results.append(PDFIngestResult(
filename=file.filename or "unknown",
success=False,
error="Invalid file type. Only PDF files are supported.",
processing_time=0.0
))
failed_count += 1
continue
# Extract text from PDF
pdf_content = await extract_pdf_text(file)
# Create knowledge state for ingestion
state = KnowledgeState(
query_type="ingest",
raw_document=pdf_content,
source=f"{source}_{file.filename}",
categories=category_list,
metadata={**metadata_dict, "original_filename": file.filename}
)
# Generate unique thread ID for this PDF
thread_id = f"pdf_ingest_{int(file_start_time.timestamp())}_{file.filename}"
# Execute the ingestion workflow
logger.info(f"📄 Processing PDF: {file.filename}")
result = compiled_graph.invoke(
state,
config={"configurable": {"thread_id": thread_id}}
)
# Calculate processing time for this file
file_processing_time = (datetime.utcnow() - file_start_time).total_seconds()
# Count chunks created
chunks_created = len(result.get("chunks", []))
results.append(PDFIngestResult(
filename=file.filename,
success=True,
chunks_created=chunks_created,
thread_id=thread_id,
processing_time=file_processing_time
))
processed_count += 1
logger.info(f"✅ PDF processed successfully: {file.filename} ({chunks_created} chunks)")
except Exception as e:
file_processing_time = (datetime.utcnow() - file_start_time).total_seconds()
error_msg = str(e)
logger.error(f"❌ Failed to process PDF {file.filename}: {error_msg}")
results.append(PDFIngestResult(
filename=file.filename or "unknown",
success=False,
error=error_msg,
processing_time=file_processing_time
))
failed_count += 1
# Calculate total execution time
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"🏁 Batch PDF ingestion completed: {processed_count} successful, {failed_count} failed in {execution_time:.2f}s")
return MultiIngestResponse(
success=processed_count > 0,
total_files=len(files),
processed_files=processed_count,
failed_files=failed_count,
results=results,
execution_time=execution_time,
timestamp=datetime.utcnow()
)
@router.post("/feedback", response_model=FeedbackResponse)
async def submit_feedback(
request: FeedbackRequest,
graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
http_request: Request = None
):
start_time = datetime.utcnow()
try:
# Validate feedback type
valid_feedback_types = ["approved", "rejected", "edited"]
if request.feedback not in valid_feedback_types:
raise HTTPException(
status_code=400,
detail=f"Invalid feedback type. Must be one of: {valid_feedback_types}"
)
# Validate edits are provided when feedback is "edited"
if request.feedback == "edited" and not request.edits:
raise HTTPException(
status_code=400,
detail="Edits must be provided when feedback type is 'edited'"
)
# Validate knowledge type if provided
if request.knowledge_type:
valid_knowledge_types = ["conversational", "reusable", "verified"]
if request.knowledge_type not in valid_knowledge_types:
raise HTTPException(
status_code=400,
detail=f"Invalid knowledge type. Must be one of: {valid_knowledge_types}"
)
logger.info(f"📝 Processing feedback for thread {request.thread_id}: {request.feedback}")
# Get the compiled graph
compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
if compiled_graph is None:
compiled_graph = graph_builder.build()
http_request.app.state.compiled_graph = compiled_graph
# Create a new state with the feedback
feedback_state = KnowledgeState(
query_type="query", # We're processing feedback on a query
human_feedback=request.feedback,
edits=request.edits,
metadata={
"feedback_comment": request.comment,
"feedback_timestamp": start_time.isoformat(),
"feedback_source": "api"
}
)
# Process feedback through the workflow using LangGraph checkpointing
try:
# Get the current state from LangGraph checkpointing
current_state = compiled_graph.get_state({"configurable": {"thread_id": request.thread_id}})
if not current_state or not current_state.values:
raise HTTPException(
status_code=404,
detail=f"No conversation found for thread_id: {request.thread_id}"
)
# Create a new state with the existing conversation data plus feedback
existing_state = current_state.values
logger.info(f"🔍 Creating feedback state with knowledge_type: {request.knowledge_type}")
feedback_state = KnowledgeState(
query_type=existing_state.get("query_type", "query"),
user_input=existing_state.get("user_input"),
messages=existing_state.get("messages", []),
generated_answer=existing_state.get("generated_answer"),
retrieved_docs=existing_state.get("retrieved_docs"),
retrieved_chunks=existing_state.get("retrieved_chunks"),
metadata=existing_state.get("metadata", {}),
human_feedback=request.feedback,
edits=request.edits,
knowledge_type=request.knowledge_type,
status=existing_state.get("status"),
logs=existing_state.get("logs", [])
)
logger.info(f"🔍 Feedback state created with knowledge_type: {feedback_state.knowledge_type}")
# Add feedback metadata
if request.comment:
feedback_state.metadata = feedback_state.metadata or {}
feedback_state.metadata["feedback_comment"] = request.comment
feedback_state.metadata["feedback_timestamp"] = start_time.isoformat()
feedback_state.metadata["feedback_source"] = "api"
# If feedback is "edited", update the edited_answer with the edits
if request.feedback == "edited" and request.edits:
feedback_state.edited_answer = request.edits
logger.info(f"🔍 Updated edited_answer with edits: {request.edits[:100]}...")
# Resume the graph from the interrupt by invoking with the updated state
logger.info(f"🔄 Resuming workflow after human feedback")
result = compiled_graph.invoke(
feedback_state,
config={"configurable": {"thread_id": request.thread_id}}
)
# Determine action taken based on feedback and knowledge type
# Derive action from result
if result.get("human_feedback") == "approved":
if result.get("knowledge_type") in ("reusable", "verified"):
action_taken = f"Answer approved and stored as {result.get('knowledge_type')} knowledge in vector database"
else:
action_taken = "Answer approved and stored in conversation history"
elif result.get("human_feedback") == "rejected":
action_taken = "Answer rejected, not stored in knowledge base"
elif result.get("human_feedback") == "edited":
if result.get("knowledge_type") in ("reusable", "verified"):
action_taken = f"Answer edited and stored as {result.get('knowledge_type')} knowledge in vector database"
else:
action_taken = "Answer edited and stored in conversation history"
else:
action_taken = "Feedback processed"
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(f"✅ Feedback processed successfully in {execution_time:.2f}s")
return FeedbackResponse(
success=True,
thread_id=request.thread_id,
feedback=result.get("human_feedback", request.feedback),
action_taken=action_taken,
timestamp=datetime.utcnow(),
message="Feedback processed successfully"
)
except Exception as e:
logger.error(f"❌ Error processing feedback: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to process feedback: {str(e)}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Unexpected error in feedback submission: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/feedback/{thread_id}", response_model=FeedbackStatusResponse)
async def get_feedback_status(
thread_id: str,
graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
http_request: Request = None
):
try:
logger.info(f"🔍 Checking feedback status for thread: {thread_id}")
# Get the compiled graph
compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
if compiled_graph is None:
compiled_graph = graph_builder.build()
http_request.app.state.compiled_graph = compiled_graph
# Try to get the current state from LangGraph checkpointing
try:
# Get the current state from the compiled graph's checkpointing system
current_state = compiled_graph.get_state({"configurable": {"thread_id": thread_id}})
if not current_state or not current_state.values:
raise HTTPException(
status_code=404,
detail=f"No conversation found for thread_id: {thread_id}"
)
# Extract the KnowledgeState from the checkpoint
state_values = current_state.values
# Check if there's a pending answer awaiting feedback
has_pending_feedback = (
state_values.get("generated_answer") is not None and
state_values.get("human_feedback") is None
)
# Get current answer if pending
current_answer = state_values.get("generated_answer") if has_pending_feedback else None
# Build feedback history from conversation metadata
feedback_history = []
metadata = state_values.get("metadata", {})
if metadata and "feedback_history" in metadata:
feedback_history = metadata["feedback_history"]
# Add current feedback if it exists
if state_values.get("human_feedback"):
current_feedback = {
"feedback": state_values.get("human_feedback"),
"timestamp": datetime.utcnow().isoformat(),
"comment": metadata.get("feedback_comment") if metadata else None
}
feedback_history.append(current_feedback)
# Determine current status
if has_pending_feedback:
status = "awaiting_feedback"
elif state_values.get("human_feedback") == "approved":
status = "approved"
elif state_values.get("human_feedback") == "rejected":
status = "rejected"
elif state_values.get("human_feedback") == "edited":
status = "edited"
else:
status = "unknown"
except Exception as e:
logger.warning(f"Could not retrieve state from LangGraph checkpointing: {e}")
# Fallback: check if there's any conversation history in Redis
conversation_history = conversation_memory.get_conversation_history(thread_id)
if not conversation_history:
raise HTTPException(
status_code=404,
detail=f"No conversation found for thread_id: {thread_id}"
)
# If we have conversation history but no checkpoint state, assume no pending feedback
has_pending_feedback = False
current_answer = None
feedback_history = []
status = "conversation_exists"
logger.info(f"✅ Feedback status retrieved: {status}")
return FeedbackStatusResponse(
thread_id=thread_id,
has_pending_feedback=has_pending_feedback,
current_answer=current_answer,
feedback_history=feedback_history,
status=status
)
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error retrieving feedback status: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve feedback status: {str(e)}"
)
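Two helpers appear throughout these endpoints without being defined in this article: conversation_memory (Redis-backed, as the fallback path suggests) and extract_pdf_text. Below are minimal hypothetical sketches of what they could look like; the actual implementations in the repository may differ:

import io
import json

import redis
from fastapi import UploadFile
from pypdf import PdfReader


class ConversationMemory:
    # Tiny Redis-backed store for per-thread chat history (hypothetical sketch)

    def __init__(self, url: str = "redis://localhost:6379/0"):
        self.client = redis.Redis.from_url(url, decode_responses=True)

    def get_conversation_history(self, thread_id: str) -> list:
        raw = self.client.get(f"conversation:{thread_id}")
        return json.loads(raw) if raw else []

    def add_message(self, thread_id: str, role: str, content: str) -> None:
        history = self.get_conversation_history(thread_id)
        history.append({"role": role, "content": content})
        self.client.set(f"conversation:{thread_id}", json.dumps(history))


conversation_memory = ConversationMemory()


async def extract_pdf_text(file: UploadFile) -> str:
    # Read an uploaded PDF and concatenate the text of all pages (hypothetical sketch)
    content = await file.read()
    reader = PdfReader(io.BytesIO(content))
    return "\n".join(page.extract_text() or "" for page in reader.pages)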
The /ingest and /query endpoints execute the document ingestion and query workflows, respectively. The /feedback endpoints (POST, plus GET by thread ID) handle human feedback and let the system resume an interrupted workflow using that feedback. There are also two ingestion flavours: /ingest for raw text and /ingest-pdfs to make bulk PDF ingestion straightforward.
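To see the whole loop, including the HITL feedback round trip, a client can drive the API with plain requests. The base URL below assumes the graph router is mounted without an extra prefix; adjust it to match how graph_api.py is included in your app:

import requests

BASE_URL = "http://localhost:8000"  # assumed mount point; adjust if the router has a prefix

# 1. Ingest a piece of raw text
ingest = requests.post(
    f"{BASE_URL}/ingest",
    json={"document": "Gundam is a Japanese mecha media franchise...", "source": "manual_note"},
).json()
print("Ingestion thread:", ingest["thread_id"])

# 2. Ask a question and keep the thread ID for the follow-up feedback call
query = requests.post(
    f"{BASE_URL}/query",
    json={"query": "What is Gundam?", "require_human_review": True},
).json()
thread_id = query["thread_id"]
print("Draft answer:", query["result"].get("generated_answer"))

# 3. Check whether the thread is waiting on human feedback ...
status = requests.get(f"{BASE_URL}/feedback/{thread_id}").json()
print("Status:", status["status"])

# 4. ... and approve the answer so it is stored as reusable knowledge
feedback = requests.post(
    f"{BASE_URL}/feedback",
    json={"thread_id": thread_id, "feedback": "approved", "knowledge_type": "reusable"},
).json()
print("Action taken:", feedback["action_taken"])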
Finally, each API endpoint depends on get_graph_builder() to obtain the graph builder (the MasterGraphBuilder I showed in the previous article):
def get_graph_builder(request: Request) -> MasterGraphBuilder:
# Use app.state; lazily initialize if missing
graph_builder = getattr(request.app.state, "graph_builder", None)
compiled_graph = getattr(request.app.state, "compiled_graph", None)
if graph_builder is None:
# Extract configuration from environment settings
api_key = settings.openai_api_key
azure_endpoint = settings.azure_openai_endpoint_url
# Validate required configuration
if not api_key:
raise HTTPException(
status_code=500,
detail="OpenAI API key not configured"
)
try:
# =============================================================================
# MODEL INITIALIZATION
# =============================================================================
# Import Azure OpenAI models for embeddings and language generation
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
# Initialize Azure embedding model for document vectorization
embedding_model = AzureOpenAIEmbeddings(
azure_deployment="text-embedding-3-small", # Your embedding deployment name
openai_api_version="2024-12-01-preview", # Azure OpenAI API version
azure_endpoint=azure_endpoint, # Azure service endpoint
openai_api_key=api_key # API key for authentication
)
# Initialize Azure language model for text generation and reasoning
llm = AzureChatOpenAI(
azure_deployment="gpt-4o", # Your LLM deployment name
openai_api_version="2024-12-01-preview", # Azure OpenAI API version
azure_endpoint=azure_endpoint, # Azure service endpoint
openai_api_key=api_key, # API key for authentication
temperature=0.1 # Low temperature for consistent outputs
)
# =============================================================================
# VECTOR STORE INITIALIZATION
# =============================================================================
# Initialize ChromaDB vector store for document storage and retrieval
from langchain_chroma import Chroma
vectorstore = Chroma(
collection_name="smart_second_brain", # Collection name for documents
embedding_function=embedding_model, # Function to create embeddings
persist_directory="./chroma_db" # Local storage directory
)
# =============================================================================
# GRAPH BUILDER INITIALIZATION
# =============================================================================
# Create and configure the main workflow orchestrator
graph_builder = MasterGraphBuilder(
llm=llm, # Language model for reasoning
embedding_model=embedding_model, # Embedding model for vectorization
vectorstore=vectorstore, # Vector database for storage
chromadb_dir="./chroma_db" # ChromaDB storage directory
)
# Compile the workflow for execution
compiled_graph = graph_builder.build()
request.app.state.graph_builder = graph_builder
request.app.state.compiled_graph = compiled_graph
logger.info("✅ Graph builder initialized with all models")
except Exception as e:
logger.error(f"❌ Failed to initialize graph builder: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to initialize graph: {str(e)}"
)
return graph_builder
The API is now ready; next up is the UI.
Let's rock with Streamlit
I initially considered Vue for the frontend, but as a backend developer I found Vue and React a bit overwhelming. In the end, I went with Streamlit.
Since we now have a dedicated API, the Streamlit frontend will interact with the graph exclusively through that API.
I won’t dive into the Streamlit code here—feel free to check it out in the repository.
As mentioned earlier, we support both batch PDF ingestion and generic text ingestion, each handled in its own tab.
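To give a flavour of how thin this layer is, here is a hypothetical sketch of the two ingestion tabs calling the API above (the real UI in the repository is more elaborate, and the base URL is an assumption):

import requests
import streamlit as st

API_URL = "http://localhost:8000"  # assumed mount point of the graph router

pdf_tab, text_tab = st.tabs(["📄 PDF batch ingestion", "📝 Text ingestion"])

with pdf_tab:
    uploaded = st.file_uploader("Upload PDFs", type="pdf", accept_multiple_files=True)
    source = st.text_input("Source", value="streamlit_upload")
    if st.button("Ingest PDFs") and uploaded:
        # Send all files in one multipart request to /ingest-pdfs
        files = [("files", (f.name, f.getvalue(), "application/pdf")) for f in uploaded]
        resp = requests.post(f"{API_URL}/ingest-pdfs", files=files, data={"source": source})
        st.json(resp.json())

with text_tab:
    document = st.text_area("Paste any text to remember")
    if st.button("Ingest text") and document:
        resp = requests.post(f"{API_URL}/ingest", json={"document": document, "source": "streamlit_text"})
        st.json(resp.json())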
Now let’s see a real chat in action with continuous knowledge updates.
As someone who grew up on a steady diet of Japanese animation, Gundam is one of my all-time favorites—so I’ll chat about it with my 2nd Brain!
Since there is no information about Gundam in the vector DB yet, the 2nd Brain replies "I don't know" (notice that a new conversation ID is created). I can then provide further information about what Gundam is:
I copied a quick description of Gundam from the web and confirmed it, and the 2nd Brain saved it back to the vector DB. When I ask the same question again, the 2nd Brain can now answer:
And if I start a new conversation with a new thread ID, the question can be answered right away, since the information has already been stored:
But honestly speaking, building a user-friendly chat UI with a coherent and consistent flow of information, one that supports continuous updates from user-provided knowledge, is a genuinely challenging task.
Ready to level up - what's our play?
The 2nd Brain is shaping up beautifully—so how do we make it even smarter and more powerful? Let’s sprinkle in a bit of automation to dial up the magic.
Stay tuned for the final chapter!
(The project source code will be provided when the entire series is concluded.)