Chaitrali Kakde

Build an AI Voice Agent Using a RAG Pipeline and VideoSDK

Language models are powerful, but their responses are limited to the information within their context window. Once that limit is reached, they often start guessing. Retrieval-Augmented Generation (RAG) helps overcome this by allowing the model to fetch relevant information from an external knowledge base before generating a response.
In this post, we’ll build an example RAG-powered voice agent using VideoSDK, ChromaDB, and OpenAI. This demo shows how you can combine real-time audio input, intelligent data retrieval, and natural voice responses to create a more reliable and context-aware conversational agent.

RAG Architecture Explained

The architecture below shows how VideoSDK brings together real-time voice communication and Retrieval-Augmented Generation (RAG) to create a smarter, context-aware AI assistant.
Everything starts inside the VideoSDK Room, where the user speaks. The User Voice Input is captured and passed into the Voice Processing pipeline.

  • Speech to Text (STT): The user’s audio is first converted into text using a speech recognition model such as Deepgram.
  • Embedding Model: The transcribed text is transformed into a numerical vector representation (an embedding).
  • Vector Database: That embedding is used to search a knowledge base for semantically relevant documents. This is where retrieval happens: the AI fetches real, factual context instead of guessing. (A standalone example of this step follows the list.)
  • LLM (Large Language Model): The retrieved context is passed to the LLM, which generates a grounded, accurate response.
  • Text to Speech (TTS): Finally, the generated text is converted back into natural-sounding speech with a TTS engine such as ElevenLabs and streamed back to the user as the agent’s voice output.
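
To make the retrieval step concrete before wiring it into the voice pipeline, here is a tiny standalone example using ChromaDB's built-in default embedding function (the full agent later in this post swaps in OpenAI embeddings instead):

# Minimal, standalone illustration of semantic retrieval with ChromaDB.
# Uses ChromaDB's default embedding function; the agent below uses OpenAI embeddings.
import chromadb

client = chromadb.Client()  # in-memory vector store
collection = client.create_collection(name="retrieval_demo")
collection.add(
    documents=[
        "VideoSDK provides real-time audio and video rooms.",
        "RAG retrieves relevant documents before the LLM answers.",
    ],
    ids=["doc_0", "doc_1"],
)

results = collection.query(query_texts=["How does retrieval work?"], n_results=1)
print(results["documents"][0])  # prints the semantically closest document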


Prerequisites

Install dependencies

pip install "videosdk-agents[deepgram,openai,elevenlabs,silero,turn_detector]"
pip install chromadb openai numpy

Set API Keys in .env

DEEPGRAM_API_KEY="Your Deepgram API Key"
OPENAI_API_KEY="Your OpenAI API Key"
ELEVENLABS_API_KEY="Your ElevenLabs API Key"
VIDEOSDK_AUTH_TOKEN="Your VideoSDK Auth Token"

Get your API keys from the Deepgram, OpenAI, and ElevenLabs dashboards, and your auth token from the VideoSDK Dashboard (follow the VideoSDK guide to generate a token).
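
Note that os.getenv does not read a .env file on its own. If your setup doesn't load it automatically, one common option (assuming you also pip install python-dotenv) is to load it explicitly at the top of your script:

# Load variables from .env into the process environment before anything reads them
from dotenv import load_dotenv

load_dotenv()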

Implementation

Step 1: Custom Voice Agent with RAG

Create a main.py file and add a custom agent class that extends Agent and adds retrieval capabilities:
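
The class itself is shown next; first, here is a sketch of the imports main.py will need. The videosdk module paths below are assumptions based on the videosdk-agents plugin packages and may differ between versions, so check them against your installed package:

import asyncio
import os
from typing import AsyncIterator

import chromadb
from openai import AsyncOpenAI

# Core agent framework and plugin classes used in this post (paths may differ by version)
from videosdk.agents import (
    Agent, AgentSession, CascadingPipeline, ConversationFlow,
    JobContext, RoomOptions, WorkerJob,
)
from videosdk.plugins.deepgram import DeepgramSTT
from videosdk.plugins.openai import OpenAILLM
from videosdk.plugins.elevenlabs import ElevenLabsTTS
from videosdk.plugins.silero import SileroVAD
from videosdk.plugins.turn_detector import TurnDetector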

class VoiceAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="""You are a helpful voice assistant that answers questions
            based on provided context. Use the retrieved documents to ground your answers.
            If no relevant context is found, say so. Be concise and conversational."""
        )

        # Initialize OpenAI client for embeddings
        self.openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

        # Define your knowledge base
        self.documents = [
            "What is VideoSDK? VideoSDK is a comprehensive video calling and live streaming platform...",
            "How do I authenticate with VideoSDK? Use JWT tokens generated with your API key...",
            # Add more documents
        ]

        # Set up ChromaDB
        self.chroma_client = chromadb.Client()  # In-memory
        # For persistence: chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma_client.create_collection(
            name="videosdk_faq_collection"
        )

        # Generate embeddings and populate database
        self._initialize_knowledge_base()

    def _initialize_knowledge_base(self):
        """Generate embeddings and store documents."""
        embeddings = [self._get_embedding_sync(doc) for doc in self.documents]
        self.collection.add(
            documents=self.documents,
            embeddings=embeddings,
            ids=[f"doc_{i}" for i in range(len(self.documents))]
        )

Step 2: Embedding Generation

Inside the VoiceAgent class, implement both a synchronous embedding method (used during initialization) and an asynchronous one (used for runtime queries):

    def _get_embedding_sync(self, text: str) -> list[float]:
        """Synchronous embedding for initialization."""
        import openai
        client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        response = client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        return response.data[0].embedding

    async def get_embedding(self, text: str) -> list[float]:
        """Async embedding for runtime queries."""
        response = await self.openai_client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )
        return response.data[0].embedding
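
Embedding documents one at a time is fine for a small FAQ, but each call is a network round trip. The embeddings endpoint also accepts a list of inputs, so initialization could be batched into a single request. A sketch of an alternative _initialize_knowledge_base:

    def _initialize_knowledge_base(self):
        """Embed all documents in one API call instead of one call per document."""
        import openai
        client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        response = client.embeddings.create(
            input=self.documents,  # list input returns one embedding per document, in order
            model="text-embedding-ada-002"
        )
        embeddings = [item.embedding for item in response.data]
        self.collection.add(
            documents=self.documents,
            embeddings=embeddings,
            ids=[f"doc_{i}" for i in range(len(self.documents))]
        )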

Step 3: Retrieval Method

Add semantic search capability to the VoiceAgent class:

    async def retrieve(self, query: str, k: int = 2) -> list[str]:
        """Retrieve top-k most relevant documents from vector database."""
        # Generate query embedding
        query_embedding = await self.get_embedding(query)

        # Query ChromaDB
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k
        )

        # Return matching documents
        return results["documents"][0] if results["documents"] else []
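
If you want to discard weak matches rather than always returning the top k, ChromaDB can also return distances. A sketch of a filtered variant (the 0.5 cutoff is an arbitrary value to tune for your embedding model and distance metric):

    async def retrieve_filtered(self, query: str, k: int = 2, max_distance: float = 0.5) -> list[str]:
        """Like retrieve(), but drops results whose distance exceeds a threshold."""
        query_embedding = await self.get_embedding(query)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k,
            include=["documents", "distances"],  # also return similarity distances
        )
        docs = results["documents"][0] if results["documents"] else []
        distances = results["distances"][0] if results["distances"] else []
        return [doc for doc, dist in zip(docs, distances) if dist <= max_distance]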

Step 4: Agent Lifecycle Hooks

Define the agent’s behavior on entry and exit, still inside the VoiceAgent class:

    async def on_enter(self) -> None:
        """Called when agent session starts."""
        await self.session.say("Hello! I'm your VideoSDK assistant. How can I help you today?")

    async def on_exit(self) -> None:
        """Called when agent session ends."""
        await self.session.say("Thank you for using VideoSDK. Goodbye!")

Step 5: Custom Conversation Flow

Override the conversation flow to inject retrieved context:

class RAGConversationFlow(ConversationFlow):
    async def run(self, transcript: str) -> AsyncIterator[str]:
        """
        Process user input with RAG pipeline.
        Args:
            transcript: User's speech transcribed to text
        Yields:
            Generated response chunks
        """
        # Step 1: Retrieve relevant documents
        context_docs = await self.agent.retrieve(transcript)

        # Step 2: Format context
        if context_docs:
            context_str = "\n\n".join([f"Document {i+1}: {doc}"
                                      for i, doc in enumerate(context_docs)])
        else:
            context_str = "No relevant context found."

        # Step 3: Inject context into conversation
        self.agent.chat_context.add_message(
            role="system",
            content=f"Retrieved Context:\n{context_str}\n\nUse this context to answer the user's question."
        )

        # Step 4: Generate response with LLM
        async for response_chunk in self.process_with_llm():
            yield response_chunk
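
One thing to keep in mind here: each turn appends another system message containing retrieved context, so it pays to keep the injected text small. A small guard like the sketch below (the character budget is a rough, arbitrary stand-in for real token counting) can be applied to context_str before the add_message call:

# Sketch: cap the injected context so repeated retrievals don't crowd the LLM's window.
MAX_CONTEXT_CHARS = 4000

def clamp_context(context_str: str, limit: int = MAX_CONTEXT_CHARS) -> str:
    """Truncate retrieved context to a rough character budget before injecting it."""
    if len(context_str) <= limit:
        return context_str
    return context_str[:limit] + "\n[context truncated]"

Calling context_str = clamp_context(context_str) just before injecting it keeps the prompt bounded.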

Step 6: Session and Pipeline Setup

Configure the agent session and start the job:

async def entrypoint(ctx: JobContext):
    agent = VoiceAgent()

    conversation_flow = RAGConversationFlow(
        agent=agent,
    )

    session = AgentSession(
        agent=agent,
        pipeline=CascadingPipeline(
            stt=DeepgramSTT(),
            llm=OpenAILLM(),
            tts=ElevenLabsTTS(),
            vad=SileroVAD(),
            turn_detector=TurnDetector()
        ),
        conversation_flow=conversation_flow,
    )

    # Register cleanup
    ctx.add_shutdown_callback(lambda: session.close())

    # Start agent
    try:
        await ctx.connect()
        print("Waiting for participant...")
        await ctx.room.wait_for_participant()
        print("Participant joined - starting session")
        await session.start()
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
    finally:
        await session.close()
        await ctx.shutdown()

def make_context() -> JobContext:
    room_options = RoomOptions(name="RAG Voice Assistant", playground=True)
    return JobContext(room_options=room_options)

if __name__ == "__main__":
    job = WorkerJob(entrypoint=entrypoint, jobctx=make_context)
    job.start()

Step 7: Run the Python Script

python main.py

You can also run the script in console mode:

python main.py console

Now that the full RAG pipeline is in place, the agent handles every stage, from capturing voice input to fetching relevant context and generating fact-grounded spoken responses. It’s a fully functional, end-to-end intelligent voice system powered by VideoSDK.

Best Practices

  • Document Quality: Use clear, well-structured documents with specific information
  • Chunk Size: Keep chunks between 300 and 800 words for optimal retrieval
  • Retrieval Count: Start with k=2-3 and adjust based on response quality and latency
  • Context Window: Ensure retrieved context fits within the LLM's token limits
  • Persistent Storage: Use PersistentClient in production to save embeddings (see the sketch after this list)
  • Error Handling: Always handle retrieval failures gracefully
  • Testing: Test with diverse queries to ensure good coverage
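
As a rough illustration of the persistence and error-handling points above (the path and fallback behaviour are assumptions to adapt, not part of the original example):

import chromadb

def build_collection(path: str = "./chroma_db"):
    """Persist embeddings on disk so they survive restarts."""
    client = chromadb.PersistentClient(path=path)
    # get_or_create_collection avoids "collection already exists" errors across runs
    return client.get_or_create_collection(name="videosdk_faq_collection")

async def safe_retrieve(agent, query: str, k: int = 2) -> list[str]:
    """Never let a retrieval failure take down the voice session."""
    try:
        return await agent.retrieve(query, k)
    except Exception as exc:
        print(f"Retrieval failed, answering without extra context: {exc}")
        return []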

Resources and Next Steps

👉 Share your thoughts, roadblocks, or success stories in the comments or join our Discord community. We’re excited to learn from your journey and help you build even better AI-powered communication tools!
