As the adoption of Generative AI (GenAI) surges across industries, organizations are increasingly leveraging Retrieval-Augmented Generation (RAG) techniques to bolster their AI models with real-time, context-rich data. Managing the complex flow of information in such applications poses significant challenges, particularly when dealing with continuously generated data at scale. KubeMQ, a robust message broker, emerges as a solution to streamline the routing of multiple RAG processes, ensuring efficient data handling in GenAI applications.
To further enhance the efficiency and scalability of RAG workflows, integrating a high-performance database like FalkorDB is essential. FalkorDB provides a reliable and scalable storage solution for the dynamic knowledge bases that RAG systems depend on, ensuring rapid data retrieval and seamless integration with messaging systems like KubeMQ.
Understanding RAG in GenAI Workflows
RAG is a paradigm that enhances generative AI models by integrating a retrieval mechanism, allowing models to access external knowledge bases during inference. This approach significantly improves the accuracy, relevance, and timeliness of generated responses by grounding them in the most recent and pertinent information available.
In typical GenAI workflows employing RAG, the process involves multiple steps:
Query processing: Interpreting the user's input to understand intent and context
Retrieval: Fetching relevant documents or data from a dynamic knowledge base, such as FalkorDB
Generation: Producing a response using both the input and the retrieved data
Response delivery: Providing the final, enriched output back to the user
Scaling these steps, especially in environments where data is continuously generated and updated, necessitates an efficient and reliable mechanism for data flow between the various components of the RAG pipeline.
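The four steps listed above can be sketched in a few lines of Python. This is a minimal illustration, not the article's implementation: the in-memory `KNOWLEDGE_BASE`, the keyword-overlap scoring, and the function names are all assumptions made for the example, and `generate` is a placeholder where a real system would call a language model.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class Document:
    doc_id: str
    text: str


# Hypothetical in-memory knowledge base standing in for a real store.
KNOWLEDGE_BASE = [
    Document("d1", "The Matrix is a 1999 science fiction film."),
    Document("d2", "Speed is a 1994 action film set on a bus."),
]


def tokenize(text: str) -> Set[str]:
    """Lowercase the text and strip basic punctuation from each word."""
    return {w.strip("?.,!").lower() for w in text.split()}


def process_query(raw: str) -> Set[str]:
    """Query processing: keep only the longer words as keywords."""
    return {w for w in tokenize(raw) if len(w) > 3}


def retrieve(keywords: Set[str], top_k: int = 1) -> List[Document]:
    """Retrieval: rank documents by naive keyword overlap."""
    def score(doc: Document) -> int:
        return len(keywords & tokenize(doc.text))

    ranked = sorted(KNOWLEDGE_BASE, key=score, reverse=True)
    return [d for d in ranked[:top_k] if score(d) > 0]


def generate(question: str, context: List[Document]) -> str:
    """Generation: placeholder for the model call; it only echoes the context."""
    if not context:
        return "No relevant context found."
    return f"Based on {context[0].doc_id}: {context[0].text}"


def answer(question: str) -> str:
    """Response delivery: run the steps in order and return the final text."""
    return generate(question, retrieve(process_query(question)))
```

Calling `answer("When was The Matrix released?")` runs all four stages in sequence. In a production pipeline each stage would typically be a separate service, which is where a message broker comes in.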
The Critical Role of KubeMQ in RAG Processing
Handling Continuous Data Streams at Scale
In scenarios such as IoT networks, social media platforms, or real-time analytics systems, new data is incessantly produced, and AI models must adapt swiftly to incorporate this information. Traditional request-response architectures can become bottlenecks under high-throughput conditions, leading to latency issues and degraded performance.
KubeMQ manages high-throughput messaging scenarios by providing a scalable and robust infrastructure for efficient data routing between services. By integrating KubeMQ into the RAG pipeline, each new data point is published to a message queue or stream, ensuring that retrieval components have immediate access to the latest information without overwhelming the system. This real-time data handling capability is crucial for maintaining the relevance and accuracy of GenAI outputs.
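To make the decoupling concrete, here is a small sketch that uses Python's standard `queue` module as an in-process stand-in for a broker queue. It is not KubeMQ code; `publish` and `pull_batch` are hypothetical helpers that only mimic the idea of a producer enqueuing data points while a consumer pulls them in bounded batches.

```python
import queue

# In-process stand-in for a broker queue; a real deployment would use KubeMQ.
source_queue = queue.Queue()


def publish(data_point: str) -> None:
    """Producer side: enqueue the data point and return immediately."""
    source_queue.put(data_point)


def pull_batch(max_messages: int, wait_seconds: float) -> list:
    """Consumer side: wait for a first message, then drain up to max_messages."""
    batch = []
    try:
        # Block briefly for the first message so an idle consumer does not spin.
        batch.append(source_queue.get(timeout=wait_seconds))
        # Take whatever else is already waiting, without blocking.
        while len(batch) < max_messages:
            batch.append(source_queue.get_nowait())
    except queue.Empty:
        pass
    return batch
```

Because the producer never waits for the consumer, a burst of 25 published items is simply absorbed by the queue and worked off in batches of at most 10, which is the behavior a request-response design cannot offer.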
Serving as the Optimal Router
KubeMQ offers a variety of messaging patterns, including queues, streams, publish-subscribe (pub/sub), and Remote Procedure Calls (RPC), making it a versatile router within a RAG pipeline. Its low latency ensures prompt message delivery, which is essential for real-time GenAI applications where delays can significantly impact user experience and system efficacy.
Moreover, KubeMQ's ability to handle complex routing logic allows for sophisticated data distribution strategies. This ensures that different components of the AI system receive precisely the data they need, when they need it, without unnecessary duplication or delays.
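The practical difference between two of these patterns, pub/sub and queues, is who receives each message. The toy router below illustrates that difference in memory. It is an assumption-laden sketch for intuition only: `ToyRouter` and its methods are hypothetical and say nothing about how KubeMQ is implemented.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List, Optional


class ToyRouter:
    """Illustrative in-memory router; not how KubeMQ is implemented."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[str], None]]] = defaultdict(list)
        self._queues: Dict[str, Deque[str]] = defaultdict(deque)

    # Pub/sub: every subscriber on the channel sees every message.
    def subscribe(self, channel: str, handler: Callable[[str], None]) -> None:
        self._subscribers[channel].append(handler)

    def publish(self, channel: str, message: str) -> int:
        handlers = self._subscribers[channel]
        for handler in handlers:
            handler(message)
        return len(handlers)  # number of subscribers that received the message

    # Queue: each message is stored and handed to exactly one consumer.
    def enqueue(self, channel: str, message: str) -> None:
        self._queues[channel].append(message)

    def dequeue(self, channel: str) -> Optional[str]:
        pending = self._queues[channel]
        return pending.popleft() if pending else None
```

In a RAG pipeline, a queue suits ingestion work that should be processed once, while pub/sub suits updates that several components need to observe.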
Integrating FalkorDB for Enhanced Data Management
While KubeMQ efficiently routes messages between services, FalkorDB complements this by providing a scalable and high-performance graph database solution for storing and retrieving the vast amounts of data required by RAG processes. This integration ensures that as new data flows through KubeMQ, it is seamlessly stored in FalkorDB, making it readily available for retrieval operations without introducing latency or bottlenecks.
Enhancing Scalability and Reliability
As GenAI applications grow in both user base and data volume, scalability becomes a paramount concern. KubeMQ supports horizontal scaling to accommodate increased load, so the messaging infrastructure remains responsive as the number of RAG processes increases or as data generation accelerates.
Additionally, KubeMQ provides message persistence and fault tolerance. In the event of system failures or network disruptions, KubeMQ ensures that messages are not lost and that the system can recover gracefully. This reliability is critical in maintaining the integrity of AI applications that users depend on for timely and accurate information.
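One common way messaging systems avoid losing work is to treat a message as done only after the consumer has processed it successfully, and to redeliver it otherwise. The sketch below illustrates that idea in memory. It is an assumption for illustration, not a description of KubeMQ's internals; `consume_with_ack` and its `max_attempts` parameter are hypothetical.

```python
from collections import deque
from typing import Callable, Iterable, List


def consume_with_ack(
    messages: Iterable[str],
    handler: Callable[[str], None],
    max_attempts: int = 3,
) -> List[str]:
    """Process messages, redelivering failures up to max_attempts times.

    Returns the messages that exhausted their attempts.
    """
    pending = deque((message, 1) for message in messages)
    dead_letters = []
    while pending:
        message, attempt = pending.popleft()
        try:
            handler(message)  # returning normally acts as the acknowledgement
        except Exception:
            if attempt < max_attempts:
                # Not acknowledged: put it back for a later retry.
                pending.append((message, attempt + 1))
            else:
                dead_letters.append(message)
    return dead_letters
```

A handler that fails once on a transient error still gets its message processed on the retry, and messages that keep failing are set aside rather than silently dropped.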
Eliminating the Need for Dedicated Routing Services
Implementing custom routing services for data handling in RAG pipelines can be resource-intensive and complex. It often requires significant development effort to build, maintain, and scale these services, diverting focus from core AI application development.
By adopting KubeMQ, organizations eliminate the need to create bespoke routing solutions. KubeMQ provides out-of-the-box functionality that addresses the routing needs of RAG processes, including complex routing patterns, message filtering, and priority handling. This not only reduces development and maintenance overhead but also accelerates time-to-market for GenAI solutions.
Unified Access via REST and SDK
KubeMQ offers multiple interfaces for interacting with its message broker capabilities:
REST API: Enables language-agnostic integration, allowing services written in any programming language to send and receive messages over HTTP
SDKs: Provides client libraries for various programming languages (such as Python, Java, Go, and .NET), facilitating more efficient communication patterns and better performance through native integrations
This flexibility allows developers to choose the most appropriate method for their specific use case, simplifying the architecture and accelerating development cycles. A single touchpoint for data routing streamlines communication between different components of the RAG pipeline, enhancing overall system coherence.
Implementing KubeMQ in a RAG Pipeline: A Detailed Example
The following example builds a movie information retrieval system by integrating KubeMQ into a RAG pipeline. A server ingests Rotten Tomatoes movie URLs and builds a knowledge graph from them using GPT-4o. Users interact with the system through a chat client, sending movie-related queries and receiving AI-generated responses. The use case demonstrates continuous data ingestion and real-time query processing in a practical application, with KubeMQ handling messaging and inter-service communication.
Architecture Overview
Data ingestion service: Captures and publishes new data to KubeMQ streams as it becomes available
Retrieval service: Subscribes to the KubeMQ stream to receive updates and refresh the knowledge base
Generation service: Listens for query requests, interacts with the AI model, and generates responses
Response service: Sends the generated responses back to users through appropriate channels
Setting Up KubeMQ
Ensure that KubeMQ is operational, which can be achieved by deploying it using Docker:
docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token" \
  kubemq/kubemq-community:latest
This command starts KubeMQ with the necessary ports exposed for REST and gRPC communications.
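Before starting the services, it can help to confirm that the broker is actually accepting connections. The helper below is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical name, and the check only verifies that a TCP port is open, not that KubeMQ itself is healthy.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not ready yet; retry shortly
    return False
```

For example, `wait_for_port("localhost", 50000)` waits for the gRPC port that the Python clients in this article connect to.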
RAG Server Side
This code (GitHub repo) implements a RAG server that processes chat queries and manages knowledge sources using KubeMQ for message handling.
# server.py
import json
import threading
from typing import List
from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL


class RAGServer:
    def __init__(self):
        self.cq_client = CQClient(address="localhost:50000")
        self.queues_client = QueuesClient(address="localhost:50000")
        model = OpenAiGenerativeModel(model_name="gpt-4o")
        with open("ontology.json", "r") as f:
            ontology = json.load(f)
        ontology = Ontology.from_json(ontology)
        self.kg = KnowledgeGraph(
            name="movies",
            model_config=KnowledgeGraphModelConfig.with_model(model),
            ontology=ontology)
        self.chat = self.kg.chat_session()
        self.shutdown_event = threading.Event()
        self.threads: List[threading.Thread] = []

    def handle_chat(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            print(f"Received chat message: {message}")
            result = self.chat.send_message(message)
            answer = result.get("response", "No answer")
            print(f"Chat response: {answer}")
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=answer.encode('utf-8')
            )
            self.cq_client.send_response_message(response)
        except Exception as e:
            print(f"Error processing chat message: {str(e)}")
            self.cq_client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def pull_from_queue(self):
        while not self.shutdown_event.is_set():
            try:
                result = self.queues_client.pull("rag-sources-queue", 10, 1)
                if result.is_error:
                    print(f"Error pulling message from queue: {result.error}")
                    continue
                sources = []
                for message in result.messages:
                    source = message.body.decode('utf-8')
                    print(f"Received source: {source}, adding to knowledge graph")
                    sources.append(URL(message.body.decode('utf-8')))
                if sources:
                    self.kg.process_sources(sources)
            except Exception as e:
                if not self.shutdown_event.is_set():  # Only log if not shutting down
                    print(f"Error processing sources: {str(e)}")

    def subscribe_to_chat_queries(self):
        def on_error(err: str):
            if not self.shutdown_event.is_set():  # Only log if not shutting down
                print(f"Error: {err}")

        cancellation_token = CancellationToken()
        try:
            self.cq_client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="rag-chat-query",
                    on_receive_query_callback=self.handle_chat,
                    on_error_callback=on_error,
                ),
                cancel=cancellation_token
            )
            # Wait for shutdown signal
            while not self.shutdown_event.is_set():
                time.sleep(0.1)
            # Cancel subscription when shutdown is requested
            cancellation_token.cancel()
        except Exception as e:
            if not self.shutdown_event.is_set():
                print(f"Error in subscription thread: {str(e)}")

    def run(self):
        chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
        queue_thread = threading.Thread(target=self.pull_from_queue)
        self.threads.extend([chat_thread, queue_thread])
        for thread in self.threads:
            thread.daemon = True  # Make threads daemon so they exit when main thread exits
            thread.start()
        print("RAG server started")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutting down gracefully...")
            self.shutdown()
            self.cq_client.close()
            self.queues_client.close()

    def shutdown(self):
        print("Initiating shutdown sequence...")
        self.shutdown_event.set()  # Signal all threads to stop
        for thread in self.threads:
            thread.join(timeout=5.0)  # Wait up to 5 seconds for each thread
            if thread.is_alive():
                print(f"Warning: Thread {thread.name} did not shutdown cleanly")
        print("Shutdown complete")


if __name__ == "__main__":
    rag_server = RAGServer()
    rag_server.run()
The server runs two main threads: one subscribes to chat queries on a channel called "rag-chat-query" and answers them using a knowledge graph backed by GPT-4o, and the other continuously pulls from a queue called "rag-sources-queue" to add new sources to the knowledge graph. The knowledge graph is initialized with a custom ontology loaded from a JSON file and uses OpenAI's GPT-4o model for processing. The server also handles errors and shuts down gracefully: on interrupt it signals both threads to stop and waits up to five seconds for each to finish before closing the KubeMQ clients.
Sending Source Data to Ingest Into RAG Knowledge Graph
# sources_client.py
from kubemq.queues import *


class SourceClient:
    def __init__(self, address="localhost:50000"):
        self.client = Client(address=address)

    def send_source(self, message: str):
        send_result = self.client.send_queues_message(
            QueueMessage(
                channel="rag-sources-queue",
                body=message.encode("utf-8"),
            )
        )
        if send_result.is_error:
            print(f"message send error, error:{send_result.error}")


if __name__ == "__main__":
    client = SourceClient()
    urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
            "https://www.rottentomatoes.com/m/matrix",
            "https://www.rottentomatoes.com/m/matrix_revolutions",
            "https://www.rottentomatoes.com/m/matrix_reloaded",
            "https://www.rottentomatoes.com/m/speed_1994",
            "https://www.rottentomatoes.com/m/john_wick_chapter_4"]
    for url in urls:
        client.send_source(url)
    print("done")
This code implements a simple client that sends movie URLs to the RAG server through KubeMQ's queue system. Specifically, it creates a SourceClient class that connects to KubeMQ and sends messages to the "rag-sources-queue" channel, which is the same queue that the RAG server monitors. When run as a main program, it sends a list of Rotten Tomatoes movie URLs (including Matrix movies, John Wick, and Speed) to be processed and added to the knowledge graph by the RAG server.
Send and Receive Questions and Answers
# chat_client.py
from kubemq.cq import *


class ChatClient:
    def __init__(self, address="localhost:50000"):
        self.client = Client(address=address)

    def send_message(self, message: str) -> str:
        # Send the question as a query and block until the server replies or the timeout expires
        response = self.client.send_query_request(QueryMessage(
            channel="rag-chat-query",
            body=message.encode('utf-8'),
            timeout_in_seconds=30
        ))
        return response.body.decode('utf-8')


if __name__ == "__main__":
    client = ChatClient()
    print("Sending first question: Who is the director of the movie The Matrix?")
    response = client.send_message("Who is the director of the movie The Matrix?")
    print(f"Response: {response}")
    print("Sending second question: How is this director connected to Keanu Reeves?")
    response = client.send_message("How is this director connected to Keanu Reeves?")
    print(f"Response: {response}")
This code implements a chat client that communicates with the RAG server through KubeMQ's query system. The ChatClient class sends messages to the "rag-chat-query" channel and waits for responses, with a 30-second timeout for each query. When run as a main program, it demonstrates the client's functionality by sending two related questions about The Matrix's director and their connection to Keanu Reeves, printing each response as it arrives.
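When a query fails, the server in this article replies with `is_executed=False` and an `error` string, and in that case the reply carries no answer body. The guard below is a minimal sketch of how a client might handle this. It assumes the response object exposes `is_executed`, `error`, and `body` under the same names the server passes to `QueryResponseMessage`; `extract_answer` and the stand-in objects are hypothetical and exist only for illustration.

```python
from types import SimpleNamespace


def extract_answer(response) -> str:
    """Return the decoded answer, or raise if the server reported a failure."""
    if not getattr(response, "is_executed", False):
        reason = getattr(response, "error", None) or "unknown error"
        raise RuntimeError(f"Query failed: {reason}")
    body = getattr(response, "body", None) or b""
    return body.decode("utf-8")


# Stand-in objects for illustration only; real responses come from the SDK.
ok = SimpleNamespace(is_executed=True, error="", body="example answer".encode("utf-8"))
failed = SimpleNamespace(is_executed=False, error="model unavailable", body=b"")
```

With a guard like this, `send_message` could return `extract_answer(response)` so that a server-side error surfaces as an exception instead of an empty string.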
Code Repository
All code examples can be found in my fork of the original GitHub repository.
Conclusion
Integrating KubeMQ into RAG pipelines for GenAI applications provides a scalable, reliable, and efficient mechanism for handling continuous data streams and complex inter-process communications. By serving as a unified router with versatile messaging patterns, KubeMQ simplifies the overall architecture, reduces the need for custom routing solutions, and accelerates development cycles.
Furthermore, incorporating FalkorDB enhances data management by offering a high-performance knowledge base that seamlessly integrates with KubeMQ. This combination ensures optimized data retrieval and storage, supporting the dynamic requirements of RAG processes.
The ability to handle high-throughput scenarios, combined with features like persistence and fault tolerance, ensures that GenAI applications remain responsive and reliable, even under heavy loads or in the face of system disruptions.
By leveraging KubeMQ and FalkorDB, organizations can focus on enhancing their AI models and delivering valuable insights and services, confident that their data routing infrastructure is robust and capable of meeting the demands of modern AI workflows.