Table of Contents:
- Getting Started
- Data Ingestion
- Using gzipped archives
- Storing Embeddings with Qdrant
- Searching our Files
- Prompting and Embedding Models
- Application State
- Setting up an update queue
- Putting it all together
- Deploying
- Finishing up
Hey there! In today's article, we're going to talk about how you can add superpowers to your Obsidian.md vault by using Rust and Qdrant for Retrieval-Augmented Generation (RAG). By the end of this tutorial, you'll have a Rust application that can:
- Spin up a web server with a frontend at the home route
- Use an internal queue to re-embed your files when you push commits to GitHub (i.e. when you update your Obsidian vault)
- Embed your Obsidian repo into Qdrant using OpenAI
- Use OpenAI for embedding and prompting
Note that this article is extremely long. We'll dive into many concepts that should essentially cover everything you need to make your own RAG application and more, but don't feel pressured to do it all at once! You can find the repo here if you need code guidance.
Getting Started
Pre-requisites
Before we get started, you'll need the following:
- An Obsidian.md vault stored on GitHub that you have access to
- An OpenAI key (or an appropriate HuggingFace language model)
- The Rust programming language
- Docker for spinning up a local Qdrant instance for testing. Alternatively, install Qdrant.
Setup
Before we get started, you'll need to initialise your project. The project repo linked above uses Shuttle for deployment. If you'd like to do it 'as intended', you may find it easier to install cargo-shuttle (via `cargo install cargo-shuttle`) and use `cargo shuttle init` to create your project.
If you would like to deploy elsewhere (or don't need to use Shuttle), you can use `cargo init`. Bear in mind though that you'll need to create your own Qdrant instance, as well as load your secrets either from environment variables or from a `.env` file via dotenvy.
For secrets, you will need the following in a `.env` file if using dotenvy:
OPENAI_KEY=
GITHUB_PERSONAL_ACCESS_TOKEN=
GITHUB_USERNAME=
GITHUB_REPO=
QDRANT_URL=
QDRANT_API_KEY=
You can also use the format below if using Shuttle. You'll need to put these in a file called `Secrets.toml` in your project root.
OPENAI_KEY = ""
GITHUB_PERSONAL_ACCESS_TOKEN = ""
GITHUB_USERNAME = ""
GITHUB_REPO = ""
QDRANT_URL = ""
QDRANT_API_KEY = ""
Data Ingestion
Before we can start work on our main program, we'll look at data ingestion.
Introduction
Data ingestion is an extremely crucial part of RAG: the better your data is (ie the better it's formatted), the higher quality your LLM responses will tend to be. The general process for this depends on what kind of structure you're using for your files. A vault that has primarily bullet journaling diary entries, for example, will be much different from a long-form notes system. In this regard, this article is relatively unopinionated and will make a best effort to create a good general approach.
While you don't necessarily have to re-write your entire vault for good results, there are a couple pointers that may help:
- Keep each idea, point, or concept that you want to make to one paragraph. To illustrate: in programming, it's often said that "one function should do one thing". It's the same idea here - the goal is to avoid accidentally leaking semantic information about one topic into a passage that is mostly about something else.
- If you want longer answers from your model, you should use longer passages of text in your documents.
My personal Obsidian.md vault combines the Zettelkasten system with the idea of a "Second Brain". The Zettelkasten system is based around processing information and distilling it into a short, summarised list of your takeaways. The "Second Brain" part is primarily based around information management. While this does lead to relatively shorter answers because of how short the data points are, if I wanted to I could simply fetch several similar results from Qdrant later on to build larger responses if required.
This is what one of my average documents looks like which will be ingested into Qdrant.
Coding
Before we do anything, let's define a `File` struct to hold the contents of our file (as well as its chunked sentences) and a `FileState` enum that we will use for a state-machine-like pattern when parsing our files:
#[derive(Clone)]
pub struct File {
pub path: String,
pub contents: String,
pub sentences: Vec<String>,
}
enum FileState {
None,
CodeBlock,
Sentence,
Comments,
}
Next, we'll add a method for parsing the file, as well as a helper method for creating the `File` struct itself:
impl File {
fn new(path: String, contents: String) -> Self {
Self {
path,
contents,
sentences: Vec::new(),
}
}
pub fn parse(&mut self) {
let mut contents = Vec::new();
let mut state = FileState::None;
let mut sentence = String::new();
for line in self.contents.lines() {
match state {
// empty file state
FileState::None => {
// here, we encounter the start of a codeblock
if line.starts_with("```
") {
state = FileState::CodeBlock;
sentence = String::new();
sentence.push_str(line);
sentence.push('\n');
// here, we encounter the start of a comment block
} else if line.starts_with("---") {
state = FileState::Comments;
// if there's no hash and content exists, it's a sentence as long as the filestate is None
} else if !line.starts_with('#') && !line.is_empty() {
state = FileState::Sentence;
sentence = String::new();
sentence.push_str(line);
sentence.push('\n');
}
}
FileState::CodeBlock => {
sentence.push_str(line);
// a second triple backtick signals the end of a codeblock
if line.starts_with("
```") {
contents.push(format!("Code block: {sentence}"));
sentence = String::new();
state = FileState::None;
}
}
FileState::Comments => {
// comments are irrelevant for the knowledge base
// we can ignore them here
if line.starts_with("---") {
state = FileState::None;
}
}
FileState::Sentence => {
// if end of passage reached, push it to sentences
if line.is_empty() {
state = FileState::None;
contents.push(format!("Passage: {sentence}"));
sentence = String::new();
// else add it to the current passage string
} else {
sentence.push_str(line);
sentence.push('\n');
}
}
}
}
self.sentences = contents;
}
}
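To make the chunking behaviour concrete, here's a quick, hypothetical example (the note contents and file name are made up) of feeding a small note through `parse()`:
fn main() {
    // a tiny made-up note: one heading, one passage, one code block
    let contents = r#"# Rust notes

Ownership means every value has a single owner.
When the owner goes out of scope, the value is dropped.

```
let s = String::from("hello");
```
"#;

    let mut file = File::new("rust-notes.md".to_string(), contents.to_string());
    file.parse();

    // prints one "Passage: ..." chunk and one "Code block: ..." chunk;
    // the heading line is skipped entirely
    for sentence in &file.sentences {
        println!("---\n{sentence}");
    }
}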
Finally, to complete this part we'll add a function for fetching all of our files from any given Obsidian vault. We'll start with a function that takes a directory, a file ending (as a string slice), and a prefix (as a `PathBuf`):
pub fn load_files_from_dir(dir: PathBuf, ending: &str, prefix: &PathBuf) -> Result<Vec<File>> {
let mut files = Vec::new();
for entry in fs::read_dir(dir)? {
let path = entry?.path();
println!("{}", path.display());
// rest of the parsing loop goes here
}
Ok(files)
}
Next, we'll add the actual parsing logic to check the following:
- The file has the correct extension for files we want to parse (Markdown, i.e. `.md`)
- The file must not be in the `templates` directory - otherwise, skip it, as template files typically contain no relevant information (the `templates` directory is a well-known convention in Obsidian)
If the above two requirements are satisfied, we can then get the file contents and parse it!
if path.is_file() {
if let Some(ext) = path.extension() {
if ext.to_str().unwrap() == ending {
let contents = fs::read_to_string(&path)?;
let path = Path::new(&path).strip_prefix(prefix)?.to_owned();
let path_as_str = format!("{}", path.display());
// if the path is in the "templates" directory, skip it - template files typically have no relevant information
if path_as_str.to_lowercase().starts_with("templates") {
println!(
"File was skipped because it's in the Templates directory: {}",
path.display()
);
continue;
}
// attempt to turn the path to a string slice
let key = path
.to_str()
.ok_or(anyhow!("Could not get string slice from path"))?;
// create a File struct
let mut file = File::new(key.to_string(), contents);
// parse the file contents and push the File to the array
file.parse();
files.push(file);
}
}
}
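One caveat: the snippet above only handles regular files. An Obsidian vault (and the unpacked GitHub archive) will almost certainly contain subdirectories, so inside the same read_dir loop you'll likely also want a branch that recurses into them - a minimal sketch, placed before the `is_file()` check:
if path.is_dir() {
    // recurse into the subdirectory and collect its files too
    files.extend(load_files_from_dir(path, ending, prefix)?);
    continue;
}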
Using gzipped archives
Of course, we don't want to just use files - we want to use directories that we've downloaded! We can download our repo from GitHub using the `octocrab` library as a `.tar.gz` file, then get the directory path and do some processing work on it.
Before we define our methods, we'll define our struct:
use anyhow::Result;
use octocrab::{Octocrab, OctocrabBuilder};
use std::env;
#[derive(Clone)]
pub struct Octo {
crab: Octocrab,
user: String,
repo: String,
}
impl Octo {
pub fn new() -> Result<Self> {
let pat = env::var("GITHUB_PERSONAL_ACCESS_TOKEN")?;
let crab = OctocrabBuilder::new().personal_token(pat).build()?;
let user = env::var("GITHUB_USERNAME")?;
let repo = env::var("GITHUB_REPO")?;
Ok(Self { crab, user, repo })
}
}
Next, we'll want to define a few more methods to do the following:
- Getting the repo (according to the user/repo)
- Getting the latest commit from the repo
- Downloading the archive from GitHub and unpacking it
use anyhow::Result;
use flate2::read::GzDecoder;
use http_body_util::BodyExt;
use octocrab::{models::repos::RepoCommit, repos::RepoHandler};
use std::env;
use std::io::{Cursor, Read};
use std::path::PathBuf;
use tempfile::TempDir;
use tokio_tar::Archive;
impl Octo {
pub fn get_repo(&self) -> Result<RepoHandler<'_>> {
Ok(self.crab.repos(&self.user, &self.repo))
}
pub async fn get_latest_commit_from_repo(&self) -> Result<Option<RepoCommit>> {
let repo = self.get_repo()?;
let res = repo.list_commits().send().await?;
Ok(res.items.into_iter().next())
}
pub async fn download_repo(&self, dir: &TempDir) -> Result<PathBuf> {
let repo = self.get_repo()?;
let Some(commit) = self.get_latest_commit_from_repo().await? else {
return Err(anyhow::anyhow!("Could not find a commit from the repo :("));
};
let commit_sha = commit.sha;
// the folder name format typically follows user-repo-commit
let folder_name = format!("{}-{}-{}", self.user, self.repo, commit_sha);
let path = format!("{}/{}", dir.path().display(), folder_name);
let tarball = repo.download_tarball(commit_sha).await?;
// here we essentially wait for the download to finish and collect the bytes into an array
let tarball_bytes = tarball.into_body().collect().await?.to_bytes();
// here we use a gzip decoder to decode the gzip
// then use the tokio-tar library to read and unpack it
let mut gzip = GzDecoder::new(Cursor::new(tarball_bytes));
let mut decompressed_bytes = Vec::new();
gzip.read_to_end(&mut decompressed_bytes)?;
let mut ar = Archive::new(Cursor::new(decompressed_bytes));
ar.unpack(dir.path()).await?;
println!("{:?}", dir.path());
Ok(PathBuf::from(&path))
}
}
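As a quick illustration, here's some hypothetical usage of Octo inside an async context (assuming the tempfile crate is in scope):
// download the latest snapshot of the vault into a temporary directory
let octo = Octo::new()?;
let temp_dir = tempfile::tempdir()?;
let repo_path = octo.download_repo(&temp_dir).await?;
println!("Vault unpacked to {}", repo_path.display());
// the temporary directory (and the unpacked files) are removed when `temp_dir` is dropped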
Storing Embeddings with Qdrant
The next important piece of the puzzle is embedding and Qdrant. Embedding lets us turn a piece of text into a numerical vector representation by encoding it with a model. By turning words into numbers, we can compare the semantic meaning of sentences or passages, and check whether one sentence is semantically the same as another (for example, "What's the capital of England?" vs "capital of england").
Embeddings are typically used in semantic search - i.e. finding documents or materials that are semantically similar to a query. Compared to full-text search, embeddings are better for capturing semantic relationships between documents rather than checking whether a document contains a given phrase or word.
To start with, we'll implement a struct that holds the Qdrant client as well as a `u64` counter for when we upsert embeddings to Qdrant. We'll also add a couple of methods that allow instantiation from environment variables, as well as from a pre-existing Qdrant client:
#[derive(Clone)]
pub struct VectorDB {
client: Arc<QdrantClient>,
id: u64,
}
impl VectorDB {
pub fn new() -> Result<Self> {
let qdrant_url = env::var("QDRANT_URL").unwrap_or_else(|_| {
println!("No QDRANT_URL env var found! Defaulting to localhost:6334...");
"http://localhost:6334".to_string()
});
let qdrant_api_key = env::var("QDRANT_API_KEY");
let cfg = QdrantClientConfig::from_url(&qdrant_url).with_api_key(qdrant_api_key);
let client = QdrantClient::new(Some(cfg))?;
Ok(Self {
client: Arc::new(client),
id: 0,
})
}
pub fn from_qdrant_client(client: QdrantClient) -> Self {
Self {
client: Arc::new(client),
id: 0,
}
}
}
Next, we'll create a function for initialising our collection. If you don't need to scale and just want to get started, the most important details here are the `size` and `distance`. Note that because we're using OpenAI's text-embedding-ada-002 as the embedding model, the vector size is strictly set to 1536 (the dimensionality of the vectors it produces).
We'll also include a function for resetting our collection, in case we need to nuke it or want to start over.
static COLLECTION: &str = "ballista";
impl VectorDB {
pub async fn create_collection(&self) -> Result<()> {
self.client
.create_collection(&CreateCollection {
collection_name: COLLECTION.to_string(),
vectors_config: Some(VectorsConfig {
config: Some(Config::Params(VectorParams {
size: 1536,
distance: Distance::Cosine.into(),
..Default::default()
})),
}),
..Default::default()
})
.await?;
Ok(())
}
pub async fn reset_collection(&self) -> Result<()> {
self.client.delete_collection(COLLECTION).await?;
self.create_collection().await?;
Ok(())
}
}
Of course, we'll also want to talk about adding and searching for embeddings within our collection! To do so is fairly simple, although it should be noted that after we upsert each embedding we make sure to increment the internal counter by 1 to avoid conflicts. If you try to insert an embedding into Qdrant with the same ID, it'll overwrite whatever was there previously.
We'll create our `upsert_embedding` function first:
impl VectorDB {
pub async fn upsert_embedding(
&mut self,
embedding: Vec<f32>,
file: &File,
) -> Result<()> {
let payload: Payload = json!({
"id": file.path.clone(),
})
.try_into()
.unwrap();
println!("Embedded: {}", file.path);
let points = vec![PointStruct::new(self.id, embedding, payload)];
self.client
.upsert_points(COLLECTION, None, points, None)
.await?;
self.id += 1;
Ok(())
}
}
Next we'll implement our similarity search function:
impl VectorDB {
pub async fn search(&self, embedding: Vec<f32>) -> Result<ScoredPoint> {
let payload_selector = WithPayloadSelector {
selector_options: Some(SelectorOptions::Enable(true)),
};
let search_points = SearchPoints {
collection_name: COLLECTION.to_string(),
vector: embedding,
limit: 1,
with_payload: Some(payload_selector),
..Default::default()
};
// conduct similarity search according to the request
// here we will only look for one result
let search_result = self.client.search_points(&search_points).await?;
let result = search_result.result.into_iter().next();
// if there's a result, return it - otherwise return an error
match result {
Some(res) => Ok(res),
None => Err(anyhow::anyhow!("There were no results that matched :(")),
}
}
}
Searching our Files
Once we've actually found our embeddings, we should search through our array of stored `File`s to find the context we should insert into our prompt. To do this, we will define a trait called `Finder` that has two methods - one for finding a file and one for finding the contents of a file. This will serve two purposes:
- The `find` function will get a file by key if it exists
- The `get_contents` function will use a `ScoredPoint` (from Qdrant) to get the contents of a file
pub trait Finder {
fn find(&self, key: &str) -> Option<String>;
fn get_contents(&self, result: &ScoredPoint) -> Option<String>;
}
Once we've declared our trait, we can then implement it for `Vec<File>` by providing methods for the trait.
impl Finder for Vec<File> {
fn find(&self, key: &str) -> Option<String> {
for file in self {
if file.path == key {
return Some(file.contents.clone());
}
}
None
}
fn get_contents(&self, result: &ScoredPoint) -> Option<String> {
let text = result.payload.get("id")?;
let kind = text.kind.to_owned()?;
if let Kind::StringValue(value) = kind {
self.find(&value)
} else {
None
}
}
}
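As a small, hypothetical example of the `find` half of the trait (the file name and contents are made up):
let files = vec![File::new(
    "notes/rust.md".to_string(),
    "Ownership means every value has a single owner.".to_string(),
)];

// `find` returns the contents for a known path...
assert_eq!(
    files.find("notes/rust.md").as_deref(),
    Some("Ownership means every value has a single owner.")
);
// ...and None for anything it can't find
assert!(files.find("does-not-exist.md").is_none());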
Prompting and Embedding Models
While there are many different ways you could approach prompting and embedding, here we'll briefly cover OpenAI, as it's the easiest and most convenient option. To get started, we'll define a unit struct:
#[derive(Clone)]
pub struct OpenAIBackend;
impl OpenAIBackend {
pub fn new() -> Result<Self> {
let openai_key = env::var("OPENAI_KEY")?;
// this sets it globally so we can use it as required
openai::set_key(openai_key);
Ok(Self)
}
}
Next, we need to add a function for prompting an OpenAI model (that returns a chat stream), as well as one for creating embeddings. The reason we return a chat stream is twofold: the user starts seeing a result on screen sooner, and it uses less memory on the server.
impl OpenAIBackend {
pub async fn chat_stream(&self, prompt: &str, contents: &str) -> Result<Conversation> {
let content = format!("{}\n Context: {}\n Be concise", prompt, contents);
let stream = ChatCompletionBuilder::default()
// although we use gpt-3.5-turbo here
// feel free to use gpt-4o!
.model("gpt-3.5-turbo")
.temperature(0.0)
.user("josh")
.messages(vec![ChatCompletionMessage {
role: openai::chat::ChatCompletionMessageRole::User,
content: Some(content),
name: Some("josh".to_string()),
function_call: None,
}])
.create_stream()
.await?;
Ok(stream)
}
// this gets sent to Qdrant later
pub async fn embed_file(&self, file: &File) -> Result<EmbeddingsResult> {
// gather sentences into references
let sentence_as_str: Vec<&str> = file.sentences.iter().map(|s| s.as_str()).collect();
println!("Embedding: {:?}", file.path);
let embeddings = Embeddings::create("text-embedding-ada-002", sentence_as_str, "josh")
.await
.inspect_err(|x| println!("Failed to embed: {x:?}"))?;
Ok(EmbeddingsResult::OpenAIEmbeddings(embeddings))
}
// the resulting embedding from this gets sent to Qdrant later
pub async fn embed_sentence(&self, prompt: &str) -> Result<Embedding> {
let embedding = Embedding::create("text-embedding-ada-002", prompt, "josh").await?;
Ok(embedding)
}
}
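One thing worth flagging: the `Conversation` return type used above isn't defined anywhere in this article. Assuming it's just shorthand for the stream handle the openai crate returns from `create_stream()`, a plausible definition would be:
use openai::chat::ChatCompletionDelta;
use tokio::sync::mpsc::Receiver;

// assumed type alias - the original code presumably defines something equivalent,
// since the chat_completion_stream helper later on takes a Receiver<ChatCompletionDelta>
pub type Conversation = Receiver<ChatCompletionDelta>;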
Application State
In order to store anything on the web server, we need shared mutable state, which gets passed around between handlers. Typically, application state either needs to be stored in an `Arc` or must implement the `Clone` trait so that it can be cloned.
To start with, our struct looks like this:
pub struct AppState {
pub files: Arc<RwLock<Vec<File>>>,
pub notify: Arc<Notify>,
pub db: VectorDB,
pub octo: Octo,
pub llm: OpenAIBackend,
}
impl AppState {
pub fn new(db: VectorDB, llm: OpenAIBackend) -> Result<Self> {
Ok(Self {
files: Arc::new(RwLock::new(Vec::new())),
notify: Arc::new(Notify::new()),
db,
octo: Octo::new()?,
llm,
})
}
}
Don't be afraid of the type signatures! `Arc<RwLock<T>>` is simply a pattern for making something thread-safe: the `RwLock` lets either many readers or exactly one writer access the value at a time, and wrapping it in an `Arc` lets it be shared between threads.
For now we're basically done here, but we'll be using the `Arc<Notify>` in just a little bit to make a rudimentary queue.
Setting up an update queue
Before we get started, we'll want to set up an update queue. An internal queue serves a few purposes:
- It stops requests from timing out, since the whole update process no longer runs inside a single HTTP request
- It lets the application run updates in the background while still receiving HTTP requests
- We can extend the queue functionality however we want
For this application, we can create a rudimentary queue function that will live in the application state struct by using `Arc<Notify>`. The `Notify` struct is simply a way to send a notification to a task or another thread without sending a full message with data in it.
pub struct AppState {
pub files: Arc<RwLock<Vec<File>>>,
pub notify: Arc<Notify>,
pub db: VectorDB,
pub octo: Octo,
pub llm: OpenAIBackend,
}
impl AppState {
pub fn new(db: VectorDB, llm: OpenAIBackend) -> Result<Self> {
Ok(Self {
files: Arc::new(RwLock::new(Vec::new())),
notify: Arc::new(Notify::new()),
db,
octo: Octo::new()?,
llm,
})
}
}
Next, we'll set up the function for running the update queue. Note that the `notified()` call will wait indefinitely until it receives a notification. If there's no notification, the loop never progresses - the task simply sleeps instead of busy-looping, so it costs essentially nothing while idle.
impl AppState {
pub async fn run_update_queue(&self) {
loop {
self.notify.notified().await;
let _ = self
.update()
.await
.inspect_err(|x| println!("Error while updating application state: {x}"));
}
}
}
Finally, we will add the `update` function! This updates the struct by running the whole embedding process:
impl AppState {
pub async fn update(&self) -> Result<()> {
let temp_dir = tempdir()?;
let path = self.octo.download_repo(&temp_dir).await?;
let mut files = load_files_from_dir(temp_dir.path().to_path_buf(), "md", &path)?;
let mut db = VectorDB::new()?;
db.reset_collection().await?;
embed_documentation(&mut files, &mut db, &self.llm).await?;
let mut lock = self.files.write().await;
*lock = files;
println!("All files have been embedded!");
Ok(())
}
}
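Note that embed_documentation isn't shown in this article. As a rough sketch - assuming `EmbeddingsResult` has an `OpenAIEmbeddings` variant wrapping the openai crate's `Embeddings` response, whose `data` field holds one embedding per input sentence - it might look something like this:
// a sketch only: the EmbeddingsResult variant and the `data`/`vec` field names
// are assumptions based on the signatures used elsewhere in this article
pub async fn embed_documentation(
    files: &mut Vec<File>,
    vector_db: &mut VectorDB,
    llm: &OpenAIBackend,
) -> Result<()> {
    for file in files.iter() {
        match llm.embed_file(file).await? {
            EmbeddingsResult::OpenAIEmbeddings(embeddings) => {
                for embedding in embeddings.data {
                    // OpenAI returns f64 values; Qdrant expects f32s
                    let vector: Vec<f32> = embedding.vec.iter().map(|&x| x as f32).collect();
                    vector_db.upsert_embedding(vector, file).await?;
                }
            }
        }
    }
    Ok(())
}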
During initialisation, we use `Arc::clone()` on our application state and put it in a Tokio task to run the update queue. `Arc::clone()` creates a new reference-counted handle to the same underlying value, so changes made through one handle are visible through the others. Note that `Arc`-cloned values are generally not mutable by themselves. For the `Vec<File>`, we use an `Arc<RwLock<T>>` pattern, allowing us to mutate the inner value while still keeping thread safety!
To extend this queue, you can use whatever triggers you want - GitHub webhooks, scheduled tasks, and more. The essential part is calling `state.notify.notify_one()` to notify the `Arc<Notify>`, which sends an event to the loop and kickstarts the update process.
For example, you could use a GitHub webhook to notify the event loop on a new push to a branch.
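A bare-bones Axum handler for that only needs to poke the queue. This is a sketch: the route name is just an example, and a production handler should also verify GitHub's X-Hub-Signature-256 header before trusting the request.
use axum::{extract::State, http::StatusCode, response::IntoResponse};
use std::sync::Arc;

// hypothetical webhook endpoint: point a GitHub push webhook at it
// and it wakes the update queue
pub async fn github_webhook(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    state.notify.notify_one();
    StatusCode::OK
}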
Putting it all together
Now onto the good part: putting everything together!
Before we create our prompt endpoint, we'll quickly write a function that converts the OpenAI `Receiver<ChatCompletionDelta>` type into a `Stream`:
fn chat_completion_stream(
chat_completion: Receiver<ChatCompletionDelta>,
) -> impl Stream<Item = String> {
ReceiverStream::new(chat_completion)
.map(|completion| completion.choices)
.map(|choices| {
choices
.into_iter()
.map(|choice| choice.delta.content.unwrap_or("\n".to_string()))
.collect()
})
}
Of course, if there's something wrong with the request, we'll also want to stream a simple response telling the user that their prompt failed:
fn error_stream() -> impl Stream<Item = String> {
futures::stream::once(async move { "Error with your prompt".to_string() })
}
We'll also want to create a function to combine everything we've done previously so that we don't need to keep referencing the whole workflow again:
async fn get_contents(
prompt: &str,
state: &Arc<AppState>,
) -> Result<Conversation> {
let embedding = state.llm.embed_sentence(prompt).await?;
let embedding = embedding.vec.iter().map(|&x| x as f32).collect();
let result = state.db.search(embedding).await?;
let contents = state
.files
.read()
.await
.get_contents(&result)
.ok_or(anyhow::anyhow!("There was a prompt error :("))?;
state.llm.chat_stream(prompt, contents.as_str()).await
}
After that, we can combine everything we've done into a single prompting endpoint!
pub async fn prompt(
State(app_state): State<Arc<AppState>>,
Json(Prompt { prompt }): Json<Prompt>,
) -> impl IntoResponse {
let chat_completion = crate::get_contents(&prompt, &app_state).await;
match chat_completion {
Ok(chat) => axum_streams::StreamBodyAs::text(chat_completion_stream(chat)),
Err(e) => {
println!("Something went wrong while prompting: {e}");
axum_streams::StreamBodyAs::text(error_stream())
}
}
}
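The `Prompt` type used in the extractor above isn't shown elsewhere; it's just a small serde struct matching the JSON body, something along these lines:
use serde::Deserialize;

// the JSON body for the /prompt endpoint: {"prompt": "..."}
#[derive(Deserialize)]
pub struct Prompt {
    pub prompt: String,
}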
Finally, we need to tie everything together by doing the following:
- Initialising all of the things we need
- Creating application state, wrapping it in an Arc and cloning it
- Move the cloned application state into a Tokio task and run the update queue
- Send a notification to the queue to trigger an update in the background
- Set up the router and return it
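The code below lives inside our main function. The article doesn't show its signature, but assuming you went with shuttle-runtime, shuttle-axum and shuttle-qdrant, it looks roughly like this (the attribute syntax varies slightly between Shuttle versions, so treat this as a sketch rather than gospel):
#[shuttle_runtime::main]
async fn main(
    // shuttle-qdrant provisions Qdrant and hands us a ready-made client
    #[shuttle_qdrant::Qdrant(
        cloud_url = "{secrets.QDRANT_URL}",
        api_key = "{secrets.QDRANT_API_KEY}"
    )]
    qdrant: QdrantClient,
    #[shuttle_runtime::Secrets] secrets: shuttle_runtime::SecretStore,
) -> shuttle_axum::ShuttleAxum {
    // expose the secrets as environment variables so the env::var() calls
    // used throughout the code keep working
    secrets
        .into_iter()
        .for_each(|(key, val)| std::env::set_var(key, val));

    // ...the initialisation code below goes here, ending with Ok(rtr.into())...
}
With that in place, the body of main starts by wiring up the Qdrant client and the OpenAI backend: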
let vector_db = VectorDB::from_qdrant_client(qdrant);
println!("VectorDB created!");
vector_db.create_collection().await?;
let llm_backend = OpenAIBackend::new()?;
println!("OpenAI backend created!");
let state = AppState::new(vector_db, llm_backend)?;
let state = Arc::new(state);
Note that the clone below is mandatory. Without it, the compiler will tell us we have an error related to a moved variable: `async move` moves any variables used within the closure into it, so `state` would no longer be usable afterwards. Cloning the `Arc` gets around this, and since cloning an `Arc` only bumps a reference count, it's cheap and we're not losing much by doing so.
let cloned_state: Arc<AppState> = Arc::clone(&state);
tokio::spawn(async move {
cloned_state.run_update_queue().await;
});
state.notify.notify_one();
Now that we're at the end, we just need to initialise our Axum router and return it!
let rtr = Router::new()
.route("/prompt", post(prompt))
.with_state(state);
// if not using Shuttle, replace this with setting up a TcpListener and starting your Axum server
Ok(rtr.into())
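If you're not deploying on Shuttle, the equivalent startup (assuming axum 0.7 and a #[tokio::main] runtime) is roughly:
// bind a listener ourselves and serve the router instead of returning it to Shuttle
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await?;
axum::serve(listener, rtr).await?;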
To test our application, we can start it with `cargo shuttle run` and then use curl:
curl http://localhost:8000/prompt -H 'Content-Type: application/json' -d '{"prompt":"Hello world!"}' > response.txt
This curl one-liner sends a POST request to our prompt endpoint and stores the streamed response in a file called `response.txt` in the directory where we execute the command.
Deploying
Interested in deploying? If you're using a Shuttle project (ie you initialised the project using `cargo shuttle init`), you can use `cargo shuttle deploy --ad` to deploy and watch the magic happen.
If not, you will need a Dockerfile to deploy. I would highly recommend using `cargo-chef` in your Dockerfile - it's quite useful and has personally saved me a lot of time between Dockerfile deployments.
Finishing up
Thanks for reading! With the power of Qdrant, OpenAI and Rust, anything is possible.