Building My Smart 2nd Brain, Part 3: API and UI Explained

In parts 1 and 2, we walked through the core LangChain graph behind our “Smart 2nd Brain” application. We covered document ingestion and retrieval, and showed how to implement a checkpointer, a vector database, and Human-In-The-Loop (HITL) controls to make the system more robust and trustworthy.

As a personal knowledge management tool, it’s not complete without an elegant, intuitive user interface. And to support easier testing, automation, and batch operations, it’s good practice to expose the graph behind a well-defined API layer. In this article, we’ll cover both: building a clean UI and designing a simple, reliable API to shield and orchestrate access to the graph.

FastAPI comes into action

For the API implementation, I debated whether to use Flask instead of FastAPI. In the end FastAPI won out: it handles concurrent I/O (LLM calls, vector DB queries, streaming) efficiently and integrates cleanly with Pydantic for strict schemas around our LangChain graph inputs and outputs.
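To give an idea of what those schemas look like, here is a minimal sketch of the request/response models used by the graph endpoints. The field names mirror how they are used later in this article, but the exact definitions are my simplification rather than the project's actual code:

from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel


class IngestRequest(BaseModel):
    document: str                              # Raw text to ingest
    source: Optional[str] = None               # Where the document came from
    categories: Optional[List[str]] = None     # Optional category tags
    metadata: Optional[Dict[str, Any]] = None  # Arbitrary extra metadata


class QueryRequest(BaseModel):
    query: str                                 # The user's question
    thread_id: Optional[str] = None            # Continue an existing conversation
    knowledge_type: Optional[str] = None       # e.g. "conversational", "reusable", "verified"
    require_human_review: bool = False         # Trigger the HITL interrupt


class WorkflowResponse(BaseModel):
    success: bool                              # Whether the workflow completed
    thread_id: str                             # LangGraph thread used for the run
    result: Dict[str, Any]                     # Final graph state returned by invoke()
    execution_time: float                      # Seconds taken end to end
    timestamp: datetime                        # When the response was produced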

Inside the project there is a dedicated /api tier, whose start-up script looks like this:

if __name__ == "__main__":
    try:
        import uvicorn
        # Start the development server with configuration from settings
        uvicorn.run(
            "api.main:app",                    # Application import string
            host=settings.host,                # Host binding (default: 0.0.0.0)
            port=settings.port,                # Port number (default: 8000)
            reload=settings.debug,             # Auto-reload on code changes in debug mode
            log_level=settings.log_level.lower(),  # Logging level from settings
        )
    except ImportError:
        # Provide helpful error message if uvicorn is not available
        print("uvicorn not available, run with: uvicorn api.main:app --reload")

We use Uvicorn, a modern ASGI server for Python. When the server starts, FastAPI executes the lifespan function, an async context manager that runs once when the app starts and once when it stops. During start-up, it lazy-imports the heavy modules, creates AzureOpenAIEmbeddings, AzureChatOpenAI, and Chroma if the required settings are available, builds the MasterGraphBuilder, and compiles the graph.

from fastapi import FastAPI
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware

from api.core.config import settings

# Initialize graph builder and compiled graph at app startup
@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        # Lazy import to avoid heavy imports at module load
        from agentic.workflows.master_graph_builder import MasterGraphBuilder
        from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
        from langchain_chroma import Chroma

        # Initialize models from settings
        if settings.openai_api_key and settings.azure_openai_endpoint_url:
            embedding_model = AzureOpenAIEmbeddings(
                azure_deployment="text-embedding-3-small",
                openai_api_version="2024-12-01-preview",
                azure_endpoint=settings.azure_openai_endpoint_url,
                openai_api_key=settings.openai_api_key,
            )
            llm = AzureChatOpenAI(
                azure_deployment="gpt-4o",
                openai_api_version="2024-12-01-preview",
                azure_endpoint=settings.azure_openai_endpoint_url,
                openai_api_key=settings.openai_api_key,
                temperature=0.1,
            )

            vectorstore = Chroma(
                collection_name="smart_second_brain",
                embedding_function=embedding_model,
                persist_directory="./chroma_db",
            )

            graph_builder = MasterGraphBuilder(
                llm=llm,
                embedding_model=embedding_model,
                vectorstore=vectorstore,
                chromadb_dir="./chroma_db",
            )
            compiled_graph = graph_builder.build()

            app.state.graph_builder = graph_builder
            app.state.compiled_graph = compiled_graph
        else:
            # Defer initialization to first request if credentials absent
            app.state.graph_builder = None
            app.state.compiled_graph = None
    except Exception:
        # Ensure attributes exist even if init fails
        app.state.graph_builder = None
        app.state.compiled_graph = None
    yield

# =============================================================================
# FASTAPI APPLICATION CONFIGURATION
# =============================================================================

# Create the main FastAPI application instance with metadata and configuration
app = FastAPI(
    title="Smart Second Brain API",
    description="An intelligent knowledge management system powered by LangGraph and AI",
    version="0.1.0",
    # Only show API docs in debug mode for security
    docs_url="/docs" if settings.debug else None,
    redoc_url="/redoc" if settings.debug else None,
    lifespan=lifespan,
)

# =============================================================================
# MIDDLEWARE CONFIGURATION
# =============================================================================

# Add CORS (Cross-Origin Resource Sharing) middleware to allow frontend communication
# This enables the Streamlit frontend to make API calls from different origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_hosts,  # Configured origins from settings
    allow_credentials=True,                # Allow cookies and authentication headers
    allow_methods=["*"],                   # Allow all HTTP methods (GET, POST, PUT, DELETE)
    allow_headers=["*"],                   # Allow all request headers
)
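The settings object imported above lives in api/core/config.py. I won't reproduce the real file, but a minimal pydantic-settings sketch covering the fields used in this article could look like this (the defaults and env file name are assumptions):

from typing import List, Optional
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Server options consumed by the Uvicorn start-up script
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    log_level: str = "INFO"

    # Azure OpenAI credentials checked in the lifespan handler
    openai_api_key: Optional[str] = None
    azure_openai_endpoint_url: Optional[str] = None

    # Origins allowed by the CORS middleware (e.g. the Streamlit frontend)
    allowed_hosts: List[str] = ["http://localhost:8501"]

    model_config = SettingsConfigDict(env_file=".env")


settings = Settings()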

This file also includes the basic setup for the /test and /health endpoints, along with the rest of the generic configuration. A health check can be as simple as the following sketch (my simplified version, not the project's exact code):
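from fastapi import APIRouter

router = APIRouter()


@router.get("/health")
async def health_check():
    # Lightweight liveness probe used by the frontend and by deployment checks
    return {"status": "ok", "version": "0.1.0"}

The endpoints specific to the graph operations are grouped in graph_api.py: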

# =============================================================================
# API ENDPOINTS
# =============================================================================

@router.post("/ingest", response_model=WorkflowResponse)
async def ingest_document(
    request: IngestRequest,
    graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
    http_request: Request = None
):

    start_time = datetime.utcnow()

    try:
        # Generate unique thread ID for this ingestion workflow
        thread_id = f"ingest_{int(start_time.timestamp())}"

        # Create knowledge state for the ingestion workflow
        state = KnowledgeState(
            query_type="ingest",                           # Workflow type identifier
            raw_document=request.document,                 # Original document content
            source=request.source or "api_ingest",         # Document source or default
            categories=request.categories or [],           # Document categories
            metadata=request.metadata or {}                # Additional metadata
        )

        # Ensure compiled graph is available
        compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
        if compiled_graph is None:
            compiled_graph = graph_builder.build()
            http_request.app.state.compiled_graph = compiled_graph

        # Execute the ingestion workflow with thread ID for tracking
        logger.info(f"🔄 Starting document ingestion workflow: {thread_id}")
        result = compiled_graph.invoke(
            state,
            config={"configurable": {"thread_id": thread_id}}
        )

        # Calculate execution time for performance monitoring
        execution_time = (datetime.utcnow() - start_time).total_seconds()

        logger.info(f"✅ Ingestion completed in {execution_time:.2f}s")
        return WorkflowResponse(
            success=True,
            thread_id=thread_id,
            result=result,
            execution_time=execution_time,
            timestamp=datetime.utcnow()
        )

    except Exception as e:
        logger.error(f"❌ Ingestion failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Document ingestion failed: {str(e)}"
        )


@router.post("/query", response_model=WorkflowResponse)
async def query_knowledge_base(
    request: QueryRequest,
    graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
    http_request: Request = None
):

    start_time = datetime.utcnow()

    try:
        # Use provided thread ID or generate new one for conversation continuity
        thread_id = request.thread_id or f"query_{int(start_time.timestamp())}"

        # Get conversation history from custom memory manager
        conversation_history = conversation_memory.get_conversation_history(thread_id)

        # Create knowledge state for the query workflow
        state = KnowledgeState(
            query_type="query",                            # Workflow type identifier
            user_input=request.query,                      # User's question
            categories=[],                                 # No categories for queries
            messages=conversation_history,                 # Use custom conversation memory
            knowledge_type=request.knowledge_type,
            require_human_review=request.require_human_review
        )

        # Ensure compiled graph is available
        compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
        if compiled_graph is None:
            compiled_graph = graph_builder.build()
            http_request.app.state.compiled_graph = compiled_graph

        # Execute the query workflow with thread ID for context
        logger.info(f"🔍 Starting query workflow: {thread_id}")
        result = compiled_graph.invoke(
            state,
            config={"configurable": {"thread_id": thread_id}}
        )

        # Save conversation to custom memory manager
        conversation_memory.add_message(thread_id, "user", request.query)
        if result.get("final_answer"):
            conversation_memory.add_message(thread_id, "assistant", result["final_answer"])

        # Calculate execution time for performance monitoring
        execution_time = (datetime.utcnow() - start_time).total_seconds()

        logger.info(f"✅ Query completed in {execution_time:.2f}s")
        return WorkflowResponse(
            success=True,
            thread_id=thread_id,
            result=result,
            execution_time=execution_time,
            timestamp=datetime.utcnow()
        )

    except Exception as e:
        logger.error(f"❌ Query failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Query execution failed: {str(e)}"
        )


@router.post("/ingest-pdfs", response_model=MultiIngestResponse)
async def ingest_multiple_pdfs(
    files: List[UploadFile] = File(...),
    source: str = Form(...),
    categories: Optional[str] = Form(None),
    author: Optional[str] = Form(None),
    metadata: Optional[str] = Form(None),
    graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
    http_request: Request = None
):

    start_time = datetime.utcnow()

    # Validate input
    if not files:
        raise HTTPException(
            status_code=400,
            detail="No files provided. Please upload at least one PDF file."
        )

    # Parse categories and metadata
    category_list = []
    if categories:
        category_list = [cat.strip() for cat in categories.split(",") if cat.strip()]

    metadata_dict = {}
    if metadata:
        try:
            metadata_dict = json.loads(metadata)
        except json.JSONDecodeError:
            logger.warning(f"Invalid JSON metadata provided: {metadata}")
            metadata_dict = {}

    # Add common metadata
    metadata_dict.update({
        "author": author or "Unknown",
        "source": source,
        "categories": category_list,
        "ingestion_date": datetime.utcnow().isoformat(),
        "batch_ingest": True
    })

    results = []
    processed_count = 0
    failed_count = 0

    # Ensure compiled graph is available
    compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
    if compiled_graph is None:
        compiled_graph = graph_builder.build()
        http_request.app.state.compiled_graph = compiled_graph

    logger.info(f"🔄 Starting batch PDF ingestion: {len(files)} files")

    # Process each PDF file
    for file in files:
        file_start_time = datetime.utcnow()

        try:
            # Validate file type
            if not file.filename or not file.filename.lower().endswith('.pdf'):
                results.append(PDFIngestResult(
                    filename=file.filename or "unknown",
                    success=False,
                    error="Invalid file type. Only PDF files are supported.",
                    processing_time=0.0
                ))
                failed_count += 1
                continue

            # Extract text from PDF
            pdf_content = await extract_pdf_text(file)

            # Create knowledge state for ingestion
            state = KnowledgeState(
                query_type="ingest",
                raw_document=pdf_content,
                source=f"{source}_{file.filename}",
                categories=category_list,
                metadata={**metadata_dict, "original_filename": file.filename}
            )

            # Generate unique thread ID for this PDF
            thread_id = f"pdf_ingest_{int(file_start_time.timestamp())}_{file.filename}"

            # Execute the ingestion workflow
            logger.info(f"📄 Processing PDF: {file.filename}")
            result = compiled_graph.invoke(
                state,
                config={"configurable": {"thread_id": thread_id}}
            )

            # Calculate processing time for this file
            file_processing_time = (datetime.utcnow() - file_start_time).total_seconds()

            # Count chunks created
            chunks_created = len(result.get("chunks", []))

            results.append(PDFIngestResult(
                filename=file.filename,
                success=True,
                chunks_created=chunks_created,
                thread_id=thread_id,
                processing_time=file_processing_time
            ))
            processed_count += 1

            logger.info(f"✅ PDF processed successfully: {file.filename} ({chunks_created} chunks)")

        except Exception as e:
            file_processing_time = (datetime.utcnow() - file_start_time).total_seconds()
            error_msg = str(e)

            logger.error(f"❌ Failed to process PDF {file.filename}: {error_msg}")

            results.append(PDFIngestResult(
                filename=file.filename or "unknown",
                success=False,
                error=error_msg,
                processing_time=file_processing_time
            ))
            failed_count += 1

    # Calculate total execution time
    execution_time = (datetime.utcnow() - start_time).total_seconds()

    logger.info(f"🏁 Batch PDF ingestion completed: {processed_count} successful, {failed_count} failed in {execution_time:.2f}s")

    return MultiIngestResponse(
        success=processed_count > 0,
        total_files=len(files),
        processed_files=processed_count,
        failed_files=failed_count,
        results=results,
        execution_time=execution_time,
        timestamp=datetime.utcnow()
    )


@router.post("/feedback", response_model=FeedbackResponse)
async def submit_feedback(
    request: FeedbackRequest,
    graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
    http_request: Request = None
):

    start_time = datetime.utcnow()

    try:
        # Validate feedback type
        valid_feedback_types = ["approved", "rejected", "edited"]
        if request.feedback not in valid_feedback_types:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid feedback type. Must be one of: {valid_feedback_types}"
            )

        # Validate edits are provided when feedback is "edited"
        if request.feedback == "edited" and not request.edits:
            raise HTTPException(
                status_code=400,
                detail="Edits must be provided when feedback type is 'edited'"
            )

        # Validate knowledge type if provided
        if request.knowledge_type:
            valid_knowledge_types = ["conversational", "reusable", "verified"]
            if request.knowledge_type not in valid_knowledge_types:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid knowledge type. Must be one of: {valid_knowledge_types}"
                )

        logger.info(f"📝 Processing feedback for thread {request.thread_id}: {request.feedback}")

        # Get the compiled graph
        compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
        if compiled_graph is None:
            compiled_graph = graph_builder.build()
            http_request.app.state.compiled_graph = compiled_graph

        # Create a new state with the feedback
        feedback_state = KnowledgeState(
            query_type="query",  # We're processing feedback on a query
            human_feedback=request.feedback,
            edits=request.edits,
            metadata={
                "feedback_comment": request.comment,
                "feedback_timestamp": start_time.isoformat(),
                "feedback_source": "api"
            }
        )

        # Process feedback through the workflow using LangGraph checkpointing
        try:
            # Get the current state from LangGraph checkpointing
            current_state = compiled_graph.get_state({"configurable": {"thread_id": request.thread_id}})

            if not current_state or not current_state.values:
                raise HTTPException(
                    status_code=404,
                    detail=f"No conversation found for thread_id: {request.thread_id}"
                )

            # Create a new state with the existing conversation data plus feedback
            existing_state = current_state.values
            logger.info(f"🔍 Creating feedback state with knowledge_type: {request.knowledge_type}")
            feedback_state = KnowledgeState(
                query_type=existing_state.get("query_type", "query"),
                user_input=existing_state.get("user_input"),
                messages=existing_state.get("messages", []),
                generated_answer=existing_state.get("generated_answer"),
                retrieved_docs=existing_state.get("retrieved_docs"),
                retrieved_chunks=existing_state.get("retrieved_chunks"),
                metadata=existing_state.get("metadata", {}),
                human_feedback=request.feedback,
                edits=request.edits,
                knowledge_type=request.knowledge_type,
                status=existing_state.get("status"),
                logs=existing_state.get("logs", [])
            )
            logger.info(f"🔍 Feedback state created with knowledge_type: {feedback_state.knowledge_type}")

            # Add feedback metadata
            if request.comment:
                feedback_state.metadata = feedback_state.metadata or {}
                feedback_state.metadata["feedback_comment"] = request.comment
                feedback_state.metadata["feedback_timestamp"] = start_time.isoformat()
                feedback_state.metadata["feedback_source"] = "api"

            # If feedback is "edited", update the edited_answer with the edits
            if request.feedback == "edited" and request.edits:
                feedback_state.edited_answer = request.edits
                logger.info(f"🔍 Updated edited_answer with edits: {request.edits[:100]}...")

            # Resume the graph from the interrupt by invoking with the updated state
            logger.info(f"🔄 Resuming workflow after human feedback")
            result = compiled_graph.invoke(
                feedback_state,
                config={"configurable": {"thread_id": request.thread_id}}
            )

            # Determine action taken based on feedback and knowledge type
            # Derive action from result
            if result.get("human_feedback") == "approved":
                if result.get("knowledge_type") in ("reusable", "verified"):
                    action_taken = f"Answer approved and stored as {result.get('knowledge_type')} knowledge in vector database"
                else:
                    action_taken = "Answer approved and stored in conversation history"
            elif result.get("human_feedback") == "rejected":
                action_taken = "Answer rejected, not stored in knowledge base"
            elif result.get("human_feedback") == "edited":
                if result.get("knowledge_type") in ("reusable", "verified"):
                    action_taken = f"Answer edited and stored as {result.get('knowledge_type')} knowledge in vector database"
                else:
                    action_taken = "Answer edited and stored in conversation history"
            else:
                action_taken = "Feedback processed"

            execution_time = (datetime.utcnow() - start_time).total_seconds()

            logger.info(f"✅ Feedback processed successfully in {execution_time:.2f}s")

            return FeedbackResponse(
                success=True,
                thread_id=request.thread_id,
                feedback=result.get("human_feedback", request.feedback),
                action_taken=action_taken,
                timestamp=datetime.utcnow(),
                message="Feedback processed successfully"
            )

        except Exception as e:
            logger.error(f"❌ Error processing feedback: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"Failed to process feedback: {str(e)}"
            )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Unexpected error in feedback submission: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )

@router.get("/feedback/{thread_id}", response_model=FeedbackStatusResponse)
async def get_feedback_status(
    thread_id: str,
    graph_builder: MasterGraphBuilder = Depends(get_graph_builder),
    http_request: Request = None
):

    try:
        logger.info(f"🔍 Checking feedback status for thread: {thread_id}")

        # Get the compiled graph
        compiled_graph = getattr(http_request.app.state, "compiled_graph", None)
        if compiled_graph is None:
            compiled_graph = graph_builder.build()
            http_request.app.state.compiled_graph = compiled_graph

        # Try to get the current state from LangGraph checkpointing
        try:
            # Get the current state from the compiled graph's checkpointing system
            current_state = compiled_graph.get_state({"configurable": {"thread_id": thread_id}})

            if not current_state or not current_state.values:
                raise HTTPException(
                    status_code=404,
                    detail=f"No conversation found for thread_id: {thread_id}"
                )

            # Extract the KnowledgeState from the checkpoint
            state_values = current_state.values

            # Check if there's a pending answer awaiting feedback
            has_pending_feedback = (
                state_values.get("generated_answer") is not None and 
                state_values.get("human_feedback") is None
            )

            # Get current answer if pending
            current_answer = state_values.get("generated_answer") if has_pending_feedback else None

            # Build feedback history from conversation metadata
            feedback_history = []
            metadata = state_values.get("metadata", {})
            if metadata and "feedback_history" in metadata:
                feedback_history = metadata["feedback_history"]

            # Add current feedback if it exists
            if state_values.get("human_feedback"):
                current_feedback = {
                    "feedback": state_values.get("human_feedback"),
                    "timestamp": datetime.utcnow().isoformat(),
                    "comment": metadata.get("feedback_comment") if metadata else None
                }
                feedback_history.append(current_feedback)

            # Determine current status
            if has_pending_feedback:
                status = "awaiting_feedback"
            elif state_values.get("human_feedback") == "approved":
                status = "approved"
            elif state_values.get("human_feedback") == "rejected":
                status = "rejected"
            elif state_values.get("human_feedback") == "edited":
                status = "edited"
            else:
                status = "unknown"

        except Exception as e:
            logger.warning(f"Could not retrieve state from LangGraph checkpointing: {e}")
            # Fallback: check if there's any conversation history in Redis
            conversation_history = conversation_memory.get_conversation_history(thread_id)
            if not conversation_history:
                raise HTTPException(
                    status_code=404,
                    detail=f"No conversation found for thread_id: {thread_id}"
                )

            # If we have conversation history but no checkpoint state, assume no pending feedback
            has_pending_feedback = False
            current_answer = None
            feedback_history = []
            status = "conversation_exists"

        logger.info(f"✅ Feedback status retrieved: {status}")

        return FeedbackStatusResponse(
            thread_id=thread_id,
            has_pending_feedback=has_pending_feedback,
            current_answer=current_answer,
            feedback_history=feedback_history,
            status=status
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error retrieving feedback status: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve feedback status: {str(e)}"
        )

The /ingest and /query endpoints execute the document ingestion and query workflows, respectively. The /feedback endpoints (POST, and GET by thread ID) handle human feedback and allow the system to resume after an interrupt using that feedback. There are also two ingestion endpoints, /ingest for plain text and /ingest-pdfs for bulk PDF uploads, so batch ingestion stays straightforward.
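To show how a client talks to these endpoints, here is a hedged example using httpx. The field names match the request models above; the base URL, and the assumption that the router is mounted at the application root, are mine rather than anything shown in this article:

import httpx

BASE_URL = "http://localhost:8000"  # assumed default host/port; adjust to your deployment


def ingest_text(document: str, source: str = "manual") -> dict:
    # Push a plain text document through the /ingest workflow
    response = httpx.post(
        f"{BASE_URL}/ingest",
        json={"document": document, "source": source, "categories": ["notes"]},
        timeout=120.0,
    )
    response.raise_for_status()
    return response.json()


def ask(query: str, thread_id: str | None = None) -> dict:
    # Ask a question via /query, optionally continuing an existing conversation thread
    payload = {"query": query}
    if thread_id:
        payload["thread_id"] = thread_id
    response = httpx.post(f"{BASE_URL}/query", json=payload, timeout=120.0)
    response.raise_for_status()
    return response.json()


def approve(thread_id: str) -> dict:
    # Approve a pending answer so the graph can resume past its HITL interrupt
    response = httpx.post(
        f"{BASE_URL}/feedback",
        json={"thread_id": thread_id, "feedback": "approved", "knowledge_type": "reusable"},
        timeout=120.0,
    )
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    answer = ask("What is Gundam?")
    print(answer["thread_id"], answer["result"].get("final_answer"))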

Finally, each API endpoint obtains the graph object through the get_graph_builder() dependency, building on the MasterGraphBuilder I showed in the previous article:

def get_graph_builder(request: Request) -> MasterGraphBuilder:

    # Use app.state; lazily initialize if missing
    graph_builder = getattr(request.app.state, "graph_builder", None)
    compiled_graph = getattr(request.app.state, "compiled_graph", None)

    if graph_builder is None:
        # Extract configuration from environment settings
        api_key = settings.openai_api_key
        azure_endpoint = settings.azure_openai_endpoint_url

        # Validate required configuration
        if not api_key:
            raise HTTPException(
                status_code=500,
                detail="OpenAI API key not configured"
            )

        try:
            # =============================================================================
            # MODEL INITIALIZATION
            # =============================================================================

            # Import Azure OpenAI models for embeddings and language generation
            from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI

            # Initialize Azure embedding model for document vectorization
            embedding_model = AzureOpenAIEmbeddings(
                azure_deployment="text-embedding-3-small",  # Your embedding deployment name
                openai_api_version="2024-12-01-preview",    # Azure OpenAI API version
                azure_endpoint=azure_endpoint,              # Azure service endpoint
                openai_api_key=api_key                      # API key for authentication
            )

            # Initialize Azure language model for text generation and reasoning
            llm = AzureChatOpenAI(
                azure_deployment="gpt-4o",                  # Your LLM deployment name
                openai_api_version="2024-12-01-preview",    # Azure OpenAI API version
                azure_endpoint=azure_endpoint,              # Azure service endpoint
                openai_api_key=api_key,                     # API key for authentication
                temperature=0.1                             # Low temperature for consistent outputs
            )

            # =============================================================================
            # VECTOR STORE INITIALIZATION
            # =============================================================================

            # Initialize ChromaDB vector store for document storage and retrieval
            from langchain_chroma import Chroma
            vectorstore = Chroma(
                collection_name="smart_second_brain",        # Collection name for documents
                embedding_function=embedding_model,          # Function to create embeddings
                persist_directory="./chroma_db"              # Local storage directory
            )

            # =============================================================================
            # GRAPH BUILDER INITIALIZATION
            # =============================================================================

            # Create and configure the main workflow orchestrator
            graph_builder = MasterGraphBuilder(
                llm=llm,                                    # Language model for reasoning
                embedding_model=embedding_model,             # Embedding model for vectorization
                vectorstore=vectorstore,                    # Vector database for storage
                chromadb_dir="./chroma_db"                  # ChromaDB storage directory
            )

            # Compile the workflow for execution
            compiled_graph = graph_builder.build()
            request.app.state.graph_builder = graph_builder
            request.app.state.compiled_graph = compiled_graph
            logger.info("✅ Graph builder initialized with all models")

        except Exception as e:
            logger.error(f"❌ Failed to initialize graph builder: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"Failed to initialize graph: {str(e)}"
            )

    return graph_builder

The API is now ready; next up is the UI.

Let's rock with Streamlit

I initially considered using Vue for the frontend, but as a backend developer I found Vue and React a bit overwhelming. In the end, I stuck with Streamlit.

Since we now have a dedicated API, the Streamlit frontend will interact with the graph exclusively through that API.

I won’t dive into the Streamlit code here—feel free to check it out in the repository.
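That said, the core interaction pattern is easy to sketch. Something along these lines drives the chat tab; this is a hypothetical, stripped-down version rather than the repository code:

import requests
import streamlit as st

API_URL = "http://localhost:8000"  # assumed address of the FastAPI backend

st.title("Smart 2nd Brain")

# Keep the conversation thread across reruns so the backend can reuse its memory
if "thread_id" not in st.session_state:
    st.session_state.thread_id = None

question = st.chat_input("Ask your 2nd Brain...")
if question:
    payload = {"query": question}
    if st.session_state.thread_id:
        payload["thread_id"] = st.session_state.thread_id

    data = requests.post(f"{API_URL}/query", json=payload, timeout=120).json()

    # Remember the thread ID returned by the API for follow-up questions
    st.session_state.thread_id = data.get("thread_id")

    st.chat_message("user").write(question)
    st.chat_message("assistant").write(
        data.get("result", {}).get("final_answer", "I don't know")
    )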

PDF Ingestion Tab

Text Ingestion Tab

Chat Program

As mentioned earlier, we support both batch PDF ingestion and generic text ingestion, each handled in its own tab.

Now let’s see a real chat in action with continuous knowledge updates.

As someone who grew up on a steady diet of Japanese animation, Gundam is one of my all-time favorites—so I’ll chat about it with my 2nd Brain!

Since there is no information about Gundam in the vector DB yet, the 2nd Brain replies "I don't know" (notice that a new conversation ID is created). I can then provide further information about what Gundam is:

I copied a quick description of Gundam from the web and confirmed it, and the 2nd Brain saved it back to the vector DB. When I ask the same question again, the 2nd Brain can now answer:

And if I start a new conversation with a new thread ID, the question is answered right away, since the information has already been stored:

In all honesty, though, building a user-friendly chat UI with a coherent, consistent flow of information, one that supports continuous user-provided knowledge updates, is a genuinely challenging task.

Ready to level up - what's our play?

The 2nd Brain is shaping up beautifully—so how do we make it even smarter and more powerful? Let’s sprinkle in a bit of automation to dial up the magic.

Stay tuned for the final chapter!

(The project source code will be provided when the entire series is concluded.)
