Note: This post was written by my teammate Joel on the bitcrowd blog
A RAG for Elixir
Recently, we told you about our wonderful RAG for Elixir that you can use to chat with your codebase.
It has one single flaw. We built our RAG for Elixir using Python.
Don't get me wrong, Python is alright and there are reasons why as of today it's the most popular programming language on GitHub.
But there are also many reasons why Elixir is our programming language of choice.
So, wouldn't it be great if we could build our RAG for Elixir in Elixir?
The components
Let's start by revisiting the components we chose to build our original system.
It was based on LangChain.
We contributed a language parser for Elixir to the project to be able to chunk our code into tidy pieces.
To capture the semantic meaning of the code chunks, we used the renowned Jina embeddings model in version 2.
We persisted the resulting embeddings in a locally running chroma vector store.
To have an LLM to talk to, we employed llama3:8b locally with ollama.
Alright, let's have a look at what's available in Elixir:
- [x] Something to chunk Elixir code
- [x] Jina embeddings v2
- [x] Llama3
- [x] Chroma
As we can see, all of the components are available in Elixir, although we will run chroma using docker as we've done before.
There is one small change: we will replace Meta's Llama3 with Microsoft's Phi-3.5 mini instruct.
While we can run Llama3 in Elixir, you must sign up on Hugging Face, request access from Meta, wait for approval, and get an authentication token to use it.
We don't want this barrier for anyone who wants to follow this blog post (and run the resulting script that's waiting at the end).
With Phi-3.5 you won't need any of that. The only requirement is a machine with enough RAM.
With all that said, it's time to build a local Retrieval Augmented Generation system in Elixir.
Building a RAG system in Elixir
Our RAG system consists of three parts:
- An ingestion pipeline to read the codebase, chunk the code, and store it in Chroma
- A retrieval pipeline to accept an input query, find relevant pieces of code and retrieve them from Chroma
- A generation pipeline to feed the relevant information and the query into an LLM and generate a helpful response for the user
Servings
We will run two models using Bumblebee: an embedding model that we will configure as RagTime.EmbeddingServing and an LLM that we will configure as RagTime.LLMServing.
We create a module that contains two builder functions to build the servings.
You can add servings to your supervision tree as described in the Nx documentation.
defmodule RagTime.Serving do
  # Builds the serving that turns text into embedding vectors.
  def build_embedding_serving() do
    repo = {:hf, "jinaai/jina-embeddings-v2-base-code"}

    {:ok, model_info} =
      Bumblebee.load_model(repo,
        spec_overrides: [architecture: :base],
        params_filename: "model.safetensors"
      )

    {:ok, tokenizer} = Bumblebee.load_tokenizer(repo)

    Bumblebee.Text.TextEmbedding.text_embedding(model_info, tokenizer,
      compile: [batch_size: 64, sequence_length: 512],
      defn_options: [compiler: EXLA],
      output_attribute: :hidden_state,
      output_pool: :mean_pooling
    )
  end

  # Builds the serving that generates text with the LLM.
  def build_llm_serving() do
    repo = {:hf, "microsoft/phi-3.5-mini-instruct"}

    {:ok, model_info} = Bumblebee.load_model(repo)
    {:ok, tokenizer} = Bumblebee.load_tokenizer(repo)
    {:ok, generation_config} = Bumblebee.load_generation_config(repo)

    # Cap the length of the generated answer.
    generation_config = Bumblebee.configure(generation_config, max_new_tokens: 512)

    Bumblebee.Text.generation(model_info, tokenizer, generation_config,
      compile: [batch_size: 1, sequence_length: 6000],
      defn_options: [compiler: EXLA],
      stream: false
    )
  end
end
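To make the two serving names concrete, here is a minimal sketch of how the servings could be started under a supervisor. It follows the child spec shape from the Nx.Serving documentation, but RagTime.Application, RagTime.Supervisor, and the batch_timeout values are assumptions made for illustration and are not taken from our script. It also relies on the RagTime.Serving module above and on the Nx dependency being available.

```elixir
defmodule RagTime.Application do
  # Hypothetical application module. The module name, the supervisor name,
  # and the batch_timeout values are assumptions for illustration.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # batch_size mirrors the value passed under :compile in the builder.
      {Nx.Serving,
       serving: RagTime.Serving.build_embedding_serving(),
       name: RagTime.EmbeddingServing,
       batch_size: 64,
       batch_timeout: 100},
      {Nx.Serving,
       serving: RagTime.Serving.build_llm_serving(),
       name: RagTime.LLMServing,
       batch_size: 1,
       batch_timeout: 100}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: RagTime.Supervisor)
  end
end
```

Once the processes are running under these names, Nx.Serving.batched_run/2 can address them as RagTime.EmbeddingServing and RagTime.LLMServing, which is how the rest of this post uses them.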
Ingestion
In our ingestion pipeline, we want to accept a path to an Elixir codebase.
We find all Elixir files in the codebase using Path.wildcard/1 but ignore every file that's inside _build or deps.
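The file discovery step can be tried on its own with nothing but the standard library. The following is a small sketch, not the code from our script: DiscoverySketch and the throwaway directory layout are made up for the example. Note that the alternatives inside the braces are written without a space, since a space there would be taken as part of the pattern.

```elixir
defmodule DiscoverySketch do
  # Lists .ex and .exs files below root, skipping build artifacts and dependencies.
  def elixir_files(root) do
    root
    |> Path.join("**/*.{ex,exs}")
    |> Path.wildcard()
    |> Enum.reject(&String.contains?(&1, ["/_build/", "/deps/"]))
  end
end

# Build a throwaway project layout in the system temp directory.
root = Path.join(System.tmp_dir!(), "rag_time_discovery_#{System.unique_integer([:positive])}")

for file <- ["lib/a.ex", "test/a_test.exs", "deps/x/lib/x.ex", "_build/dev/y.ex", "README.md"] do
  path = Path.join(root, file)
  File.mkdir_p!(Path.dirname(path))
  File.write!(path, "")
end

found =
  root
  |> DiscoverySketch.elixir_files()
  |> Enum.map(&Path.relative_to(&1, root))
  |> Enum.sort()

# Clean up the temporary files again.
File.rm_rf!(root)

IO.inspect(found)
# prints ["lib/a.ex", "test/a_test.exs"]
```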
Then, for each file, we build a map that contains the path of the file at the source key and the content of the file at the content key.
We chunk the code into pieces, calculate embeddings which capture the semantics of the chunk of code, and store the chunk and the embedding in Chroma.
defmodule RagTime.Ingestion do
  def ingest(collection, input_path) do
    # Collect .ex and .exs files, skipping build artifacts and dependencies.
    files =
      Path.wildcard(input_path <> "/**/*.{ex,exs}")
      |> Enum.filter(fn path ->
        not String.contains?(path, ["/_build/", "/deps/"])
      end)

    files_content = for file <- files, do: File.read!(file)

    documents =
      Enum.zip_with(files, files_content, fn file, content ->
        %{content: content, source: file}
      end)

    chunks = chunk_with_metadata(documents, :elixir)
    embeddings = generate_embeddings(chunks)
    store_embeddings_and_chunks(collection, embeddings, chunks)
  end

  # Splits every document into chunks and keeps the source path with each chunk.
  def chunk_with_metadata(documents, format) do
    chunks = Enum.map(documents, &TextChunker.split(&1.content, format: format))
    sources = Enum.map(documents, & &1.source)

    Enum.zip(sources, chunks)
    |> Enum.flat_map(fn {source, source_chunks} ->
      for chunk <- source_chunks do
        %{
          source: source,
          start_byte: chunk.start_byte,
          end_byte: chunk.end_byte,
          text: chunk.text
        }
      end
    end)
  end

  # Computes one embedding per chunk and converts each tensor to a plain list.
  def generate_embeddings(chunks) do
    chunk_text_list = Enum.map(chunks, & &1.text)

    Nx.Serving.batched_run(RagTime.EmbeddingServing, chunk_text_list)
    |> Enum.map(fn %{embedding: embedding} -> Nx.to_list(embedding) end)
  end

  def store_embeddings_and_chunks(collection, embeddings, chunks) do
    documents = Enum.map(chunks, & &1.text)
    ids = Enum.map(chunks, &chunk_to_id(&1))

    Chroma.Collection.add(collection, %{documents: documents, ids: ids, embeddings: embeddings})
  end

  # Builds an ID of the form "path:start_line-end_line" from the chunk's byte offsets.
  defp chunk_to_id(%{source: path, start_byte: start_byte, end_byte: end_byte}) do
    file_content = File.read!(path)

    start_line =
      file_content
      |> String.byte_slice(0, start_byte)
      |> String.split("\n")
      |> Enum.count()

    end_line =
      file_content
      |> String.byte_slice(0, end_byte)
      |> String.split("\n")
      |> Enum.count()

    "#{path}:#{start_line}-#{end_line}"
  end
end
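The least obvious part of the module above is how a chunk's byte offsets become the line numbers in its ID. Here is a minimal sketch of that step in isolation. ChunkIdSketch is a made-up helper that takes the file content as an argument instead of reading it from disk, and it uses binary_part/3, which, unlike String.byte_slice/3, makes no adjustment when an offset falls inside a multi-byte character. It assumes start_byte and end_byte are byte offsets into the file content, as their names suggest.

```elixir
defmodule ChunkIdSketch do
  # Hypothetical helper mirroring chunk_to_id/1, but taking the file content
  # directly so it can be tried without touching the file system.
  def chunk_id(path, content, start_byte, end_byte) do
    "#{path}:#{line_at(content, start_byte)}-#{line_at(content, end_byte)}"
  end

  # The 1-based line of a byte offset is the number of newlines before that
  # offset plus one, which is what splitting on "\n" and counting yields.
  defp line_at(content, byte_offset) do
    content
    |> binary_part(0, byte_offset)
    |> String.split("\n")
    |> length()
  end
end

content = "defmodule A do\n  def a, do: 1\nend\n"

# Bytes 15 to 29 cover the second line, "  def a, do: 1".
IO.puts(ChunkIdSketch.chunk_id("lib/a.ex", content, 15, 29))
# prints lib/a.ex:2-2
```

Because the ID carries the path and the line range, the sources returned by the retrieval step point straight at the relevant place in the codebase.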
Retrieval
Alright, now that we've got the code chunks and corresponding embeddings in our Chroma collection, we want to retrieve relevant code chunks for a given query.
So, we'll build our retrieval pipeline next.
For that, we create a new module with a single retrieve/2 function that takes a Chroma collection and a query and returns relevant chunks and their sources.
We compute the embedding that corresponds to the query using the same embedding serving we employed for ingesting.
The embedding is an Nx.Tensor, so we convert it to a list and query Chroma with the list.
We limit the results to the 10 most relevant code chunks and return the code chunks and the sources.
defmodule RagTime.Retrieval do
  def retrieve(collection, query) do
    %{embedding: query_embedding} = Nx.Serving.batched_run(RagTime.EmbeddingServing, query)
    query_embedding = Nx.to_list(query_embedding)

    {:ok, results} =
      Chroma.Collection.query(collection,
        results: 10,
        query_embeddings: [query_embedding]
      )

    [code_chunks] = results["documents"]
    [sources] = results["ids"]

    {code_chunks, sources}
  end
end
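To build some intuition for what "most relevant" means here, the following sketch ranks a handful of stored vectors by cosine similarity to a query vector. It is a brute-force illustration of the idea only: Chroma uses its own index and distance metric, and NearestSketch, the two-dimensional vectors, and the IDs are invented for the example. It assumes non-zero vectors of equal length.

```elixir
defmodule NearestSketch do
  # Cosine similarity of two equally long, non-zero lists of numbers.
  def cosine(a, b) do
    dot = a |> Enum.zip(b) |> Enum.reduce(0.0, fn {x, y}, acc -> acc + x * y end)
    dot / (norm(a) * norm(b))
  end

  # Returns the ids of the k stored vectors most similar to the query.
  def top_k(query, stored, k) do
    stored
    |> Enum.sort_by(fn {_id, vector} -> cosine(query, vector) end, :desc)
    |> Enum.take(k)
    |> Enum.map(fn {id, _vector} -> id end)
  end

  # Euclidean length of a vector.
  defp norm(v), do: v |> Enum.reduce(0.0, fn x, acc -> acc + x * x end) |> :math.sqrt()
end

stored = [
  {"lib/a.ex:1-10", [1.0, 0.0]},
  {"lib/b.ex:5-20", [0.0, 1.0]},
  {"lib/c.ex:3-8", [0.7, 0.7]}
]

IO.inspect(NearestSketch.top_k([0.9, 0.1], stored, 2))
# prints ["lib/a.ex:1-10", "lib/c.ex:3-8"]
```

Real embeddings have hundreds of dimensions instead of two, but the principle is the same: the query and the code chunks live in the same vector space, and chunks whose vectors point in a similar direction are returned first.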
Generation
We have already arrived at the last piece of our system.
It's time to generate a helpful response.
Again, we'll define a new module.
This time it has a single generate_response/3 function that takes the query, context_documents, and context_sources.
We concatenate the code chunks (our context documents) and inject them into a prompt together with our query.
We pass the resulting prompt to an LLM and return the query, context, context sources, and response.
defmodule RagTime.Generation do
  def generate_response(query, context_documents, context_sources) do
    context =
      Enum.map(context_documents, fn code_chunk ->
        """
        [...]
        #{code_chunk}
        [...]
        """
      end)
      |> Enum.join("\n\n")

    prompt =
      """
      <|system|>
      You are a helpful assistant.</s>
      <|user|>
      Context information is below.
      ---------------------
      #{context}
      ---------------------
      Given the context information and no prior knowledge, answer the query.
      Query: #{query}
      Answer: </s>
      <|assistant|>
      """

    %{results: [result]} = Nx.Serving.batched_run(RagTime.LLMServing, prompt)

    %{
      query: query,
      context: context,
      context_sources: context_sources,
      response: result.text
    }
  end
end
To have a public interface for our RAG system, we define a RagTime module with ingest/2 and query/2 functions.
defmodule RagTime do
  def ingest(collection, path), do: RagTime.Ingestion.ingest(collection, path)

  def query(collection, query) do
    {context, sources} = RagTime.Retrieval.retrieve(collection, query)

    RagTime.Generation.generate_response(query, context, sources)
  end
end
And that's it, a fully local RAG system in Elixir.
Running the RAG system
For demo purposes, we built a simple UI using phoenix_playground; you can download the resulting file from here.
Remember that we need a running Chroma instance.
You can start it with docker run -p 8000:8000 chromadb/chroma.
Then you can run elixir rag_time.exs to start the RAG system.
In our previous blog post, we used the system to get help for a Phoenix PR.
We will ask our Elixir RAG the same question to see if it works.
We clone the Phoenix repository and check out the commit right before the PR got merged.
Then, we ingest the codebase into our RAG system using the UI.
Afterwards, we ask the exact same question as last time:
We: Please help me understand the following briefing:
Currently, if you set remote_ip during a test when building a conn, calling a dispatch call, e.g. get, will run the conn through recycle which will lose this information. Instead, we want the remote_ip maintained, similar to host. Can you see why remote_ip is not preserved in test cases where recycle is used?
Chatbot: The remote_ip is not preserved during the recycle process in test cases because the recycling mechanism is designed to emulate the behavior of browsers, which typically recycle connections by using cookies. When a connection is recycled, the response cookies, including the host header, are saved and sent in subsequent requests. However, the remote_ip is not automatically included in these cookies. The recycling process, as implemented in the provided context, primarily focuses on preserving and transferring response headers like "accept", "accept-language", and "authorization". It does not explicitly handle the remote_ip header. This is why when you set remote_ip during a test and then dispatch a request (e.g., using get), the recycle function will process the connection and potentially discard the remote_ip information. To maintain the remote_ip during tests, you would need to implement a custom solution that explicitly preserves and transfers this header during the recycling process. This could involve modifying the recycling function or creating a wrapper function that handles the remote_ip separately. Alternatively, you could avoid using the recycle function altogether and rely on Phoenix's built-in connection recycling, which might preserve the remote_ip by default. However, this approach might not be suitable for all test scenarios, especially if you need to test specific behaviors related to the remote_ip header. In summary, the reason remote_ip is not preserved during recycling is that the recycling mechanism is not designed to handle this header, and it does not automatically include it in the response cookies. To address this issue, you would need to implement a custom solution or adjust your testing approach accordingly.
As you can see, we get a different response, but remember that we're using a different LLM.
We also get slightly different sources, which might be due to different chunking.
You can play around with the RAG system and swap out the embeddings model or LLM.
If your machine does not have enough memory to run the RAG system, try using smaller models, such as thenlper/gte-small for embeddings and HuggingFaceTB/SmolLM2-135M-Instruct as the LLM.
Remember to adapt the prompt accordingly in that case.
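Adapting the prompt mostly means swapping the special markers around the system and user messages, while the instructions, context, and query stay the same. The sketch below separates the two concerns. PromptTemplateSketch and its function names are invented for this example, and the assumption that SmolLM2-Instruct expects ChatML-style markers should be checked against the model card before relying on it.

```elixir
defmodule PromptTemplateSketch do
  # Model-independent part of the prompt: instructions, context, and query.
  def user_message(query, context) do
    String.trim_trailing("""
    Context information is below.
    ---------------------
    #{context}
    ---------------------
    Given the context information and no prior knowledge, answer the query.
    Query: #{query}
    Answer:
    """)
  end

  # Markers as used in the prompt earlier in this post.
  def wrap(:post_style, system, user) do
    "<|system|>\n#{system}</s>\n<|user|>\n#{user} </s>\n<|assistant|>\n"
  end

  # ChatML-style markers. That SmolLM2-Instruct expects these is an
  # assumption; check the model card before relying on it.
  def wrap(:chatml, system, user) do
    "<|im_start|>system\n#{system}<|im_end|>\n" <>
      "<|im_start|>user\n#{user}<|im_end|>\n" <>
      "<|im_start|>assistant\n"
  end
end

prompt =
  PromptTemplateSketch.wrap(
    :chatml,
    "You are a helpful assistant.",
    PromptTemplateSketch.user_message("Why is remote_ip lost?", "def recycle(conn), do: conn")
  )

IO.puts(prompt)
```

With this split, trying another model only requires one more wrap/3 clause that matches its chat template.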
In any case, start a discussion with us on GitHub or reach out via Bluesky or Slack if you need help or just want to let us know what you think.
Tip:
If you haven't done so already, check the rest of our ongoing series on RAG!