Ingesting data from OneDrive into Milvus in order to use with generative ai.
Introduction and motivation
In a recent project we help our customer to populate a RAG vector database (Milvus in our case) with content ingestion from the company’s Sharepoint/OneDrive repositories, which would then be using a LLM fitting the best of the customer’s requirement on watsonx.ai platform, for instance a Granite model.
This article describes the code I prepare and test locally on my computer. This code will be adapted to be used on the customer’s platform, but as usual I like to do my research beforehand… (I’m a bit a geek I guess).
Local Milvus Preparation
In a previous article I wrote about the difficulties I encountered on setting-up a Milvus environment on my laptop, well I guess I finally found a very straightforward and simple way to do this. Hereafter are the steps.
In order to do this you need either Docker, or in my case I use Podman.
# fetch the startup script
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh
The next step and all you have to do is to run the script, although I have made an alias on my laptop which makes alias docker=”podman”, for the simplification in this case, I replace all ‘sudo docker’ commands to ‘sudo podman’ un the downloaded ‘standalone_embed.sh’ file which I renamed.
# the original call is 'bash standalone_embed.sh start'
bash standalone_podman_embed.sh start
And that’s it. Your Milvus instance should be up and running. But we’re going to be sure of it.
Test Milvus database connection and prepare the main application
As I write the application in Python, I prepare a virtual environment (a best practice I recommend to everyone which I learnt the hard way! 🫣
python3.11 -m venv sharepoint_milvus
source sharepoint_milvus/bin/activate
The next step is to install the very first package to work with Milvus.
pip install --upgrade pymilvus
And then testing the connection to the local vector db.
from pymilvus import connections
try:
connections.connect(host="127.0.0.1", port="19530")
print("Milvus connection successful!")
connections.disconnect("default")
except Exception as e:
print(f"Milvus connection failed: {e}")
The connection is OK, the second test I made using sample code from the Milvus site in order to make a vector database and testing it is the following.
from pymilvus import MilvusClient
import numpy as np
client = MilvusClient("./milvus_demo.db")
client.create_collection(
collection_name="demo_collection",
dimension=384 # The vectors we will use in this demo has 384 dimensions
)
docs = [
"Artificial intelligence was founded as an academic discipline in 1956.",
"Alan Turing was the first person to conduct substantial research in AI.",
"Born in Maida Vale, London, Turing was raised in southern England.",
]
vectors = [[ np.random.uniform(-1, 1) for _ in range(384) ] for _ in range(len(docs)) ]
data = [ {"id": i, "vector": vectors[i], "text": docs[i], "subject": "history"} for i in range(len(vectors)) ]
res = client.insert(
collection_name="demo_collection",
data=data
)
res = client.search(
collection_name="demo_collection",
data=[vectors[0]],
filter="subject == 'history'",
limit=2,
output_fields=["text", "subject"],
)
print(res)
res = client.query(
collection_name="demo_collection",
filter="subject == 'history'",
output_fields=["text", "subject"],
)
print(res)
res = client.delete(
collection_name="demo_collection",
filter="subject == 'history'",
)
print(res)
Once this works, we can move forward.
The main application (and challenges)
Writing the main application and testing it took me quite a time. Also there is a big disclaimer here. At IBM, we use OneDrive as one of our enterprise repositories, though we can access the drive with the internal SSO login system, however the strict IBM security policy blocks connection through an application like the one I wrote. So I tested the code against one on my own email addresses from ‘Outlook’ and my own drive to make sure that it could work (I also received some emails from MicroSoft regarding suspicious activities on my account 🚨.
Having said that, let’s go back to my application. First and as usual I created a “.env” file with my connection/configuration informations.
export SHAREPOINT_USERNAME="xxx@yyy.com"
export SHAREPOINT_PASSWORD="PASSWORD"
export SHAREPOINT_SITE_URL="THE_MAIN_URL"
export SHAREPOINT_DOCUMENT_LIBRARY="THE_MAIN_DOCUMENT_FOLDER"
export SHAREPOINT_SUBFOLDER_NAME="THE_TARGET_SUBFOLDER"
' specific to my environment
export TOKENIZERS_PARALLELISM="false"
I “source” this file in order to use it.
source .env
Then comes the code. First we take a look at dependencies required by the application.
import os
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import tiktoken
import hashlib
import time
After that, some debug stuff in order to be sure that the application has the right connection/credential information.
# Configuration
SHAREPOINT_USERNAME = os.environ.get("SHAREPOINT_USERNAME")
SHAREPOINT_PASSWORD = os.environ.get("SHAREPOINT_PASSWORD")
SHAREPOINT_SITE_URL = os.environ.get("SHAREPOINT_SITE_URL")
SHAREPOINT_DOCUMENT_LIBRARY = os.environ.get("SHAREPOINT_DOCUMENT_LIBRARY")
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip() # Trim spaces
print(f"SHAREPOINT_USERNAME: {os.environ.get('SHAREPOINT_USERNAME')}")
print(f"SHAREPOINT_PASSWORD: {os.environ.get('SHAREPOINT_PASSWORD')}")
print(f"SHAREPOINT_SITE_URL: {os.environ.get('SHAREPOINT_SITE_URL')}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY: {os.environ.get('SHAREPOINT_DOCUMENT_LIBRARY')}")
print(f"SHAREPOINT_SUBFOLDER_NAME: {os.environ.get('SHAREPOINT_SUBFOLDER_NAME')}")
print(f"Complete URL: "+ SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME)
C_URL = SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME
My Milvus is running locally, so the setup is hard-coded (OK, it’s ugly, I admit).
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
MILVUS_COLLECTION_NAME = "document_chunks"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 100
EMBEDDING_DIMENSION = 1536
ENCODING_NAME = "cl100k_base"
And with here is the full code.
import os
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import tiktoken
import hashlib
import time
# Configuration
SHAREPOINT_USERNAME = os.environ.get("SHAREPOINT_USERNAME")
SHAREPOINT_PASSWORD = os.environ.get("SHAREPOINT_PASSWORD")
SHAREPOINT_SITE_URL = os.environ.get("SHAREPOINT_SITE_URL")
SHAREPOINT_DOCUMENT_LIBRARY = os.environ.get("SHAREPOINT_DOCUMENT_LIBRARY")
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip() # Trim spaces
print(f"SHAREPOINT_USERNAME: {os.environ.get('SHAREPOINT_USERNAME')}")
print(f"SHAREPOINT_PASSWORD: {os.environ.get('SHAREPOINT_PASSWORD')}")
print(f"SHAREPOINT_SITE_URL: {os.environ.get('SHAREPOINT_SITE_URL')}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY: {os.environ.get('SHAREPOINT_DOCUMENT_LIBRARY')}")
print(f"SHAREPOINT_SUBFOLDER_NAME: {os.environ.get('SHAREPOINT_SUBFOLDER_NAME')}")
print(f"Complete URL: "+ SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME)
C_URL = SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
MILVUS_COLLECTION_NAME = "document_chunks"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 100
EMBEDDING_DIMENSION = 1536
ENCODING_NAME = "cl100k_base"
def authenticate_sharepoint(site_url, username, password):
"""Authenticates with SharePoint."""
ctx_auth = AuthenticationContext(site_url)
if ctx_auth.acquire_token_for_user(username, password):
ctx = ClientContext(site_url, ctx_auth)
print("SharePoint authentication successful.")
return ctx
else:
raise Exception(f"Failed to authenticate: {ctx_auth.get_last_error()}")
def download_file(ctx, server_relative_url):
"""Downloads a file from SharePoint."""
file = File.open_binary(ctx, server_relative_url)
ctx.execute_query()
print(f"Downloaded file: {server_relative_url}")
return file.content
def get_sharepoint_files(ctx, library_name, subfolder_path=None):
"""
Fetches files from a SharePoint doc library or a specific subfolder.
"""
try:
list_obj = ctx.web.lists.get_by_title(library_name)
ctx.load(list_obj.root_folder)
ctx.execute_query()
if subfolder_path:
full_folder_url = f"{list_obj.root_folder.serverRelativeUrl}/{subfolder_path}"
print(f"full_folder_url: {full_folder_url}")
try:
folder = ctx.web.get_folder_by_server_relative_url(full_folder_url)
ctx.load(folder)
ctx.execute_query()
print(f"Folder object: {folder}")
print(f"Folder.serverRelativeUrl: {folder.serverRelativeUrl}")
print(f"Folder.exists: {folder.exists}")
except Exception as folder_error:
print(f"Error during get_folder_by_server_relative_url(): {folder_error}")
return []
ctx.load(folder)
ctx.execute_query()
try:
items = folder.files.get_all().execute_query()
except Exception as query_error:
print(f"Error during folder.files.get_all().execute_query(): {query_error}")
return []
else:
items = list_obj.items.get_all().execute_query()
files = []
if items:
if isinstance(items, list):
for i, item in enumerate(items):
try:
print(f"Item {i}: {item}")
if hasattr(item, 'properties'):
print(f"Item {i} properties: {item.properties}")
else:
print(f"Item {i} has no properties.")
if hasattr(item, 'serverRelativeUrl'):
print(f"Item {i} serverRelativeUrl: {item.serverRelativeUrl}")
else:
print(f"Item {i} has no serverRelativeUrl.")
if hasattr(item, 'properties') and 'ServerRelativeUrl' in item.properties:
files.append(item.properties["ServerRelativeUrl"])
else:
print(f"Item {i} does not have properties or ServerRelativeUrl: {item}")
except Exception as item_error:
print(f"Error processing item {i}: {item_error}")
else:
print("Items is not a list.")
else:
print("Items list is empty.")
print(f"Found {len(files)} files in '{library_name}'. Subfolder: {subfolder_path}")
print(f"Files retrieved: {files}")
return files
except Exception as e:
print(f"Error in get_sharepoint_files: {e}")
return []
def chunk_text(text, chunk_size, chunk_overlap, encoding_name):
"""Splits text into chunks with overlap."""
encoder = tiktoken.get_encoding(encoding_name)
tokens = encoder.encode(text)
chunks = []
for i in range(0, len(tokens), chunk_size - chunk_overlap):
chunk = tokens[i : i + chunk_size]
chunks.append(encoder.decode(chunk))
print(f"Text chunked into {len(chunks)} chunks.")
return chunks
def create_milvus_collection(collection_name, dimension):
"""Creates a Milvus collection."""
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
if utility.has_collection(collection_name):
utility.drop_collection(collection_name)
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="chunk_id", dtype=DataType.INT64),
FieldSchema(name="chunk_text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension),
]
schema = CollectionSchema(fields, "Document Chunks")
collection = Collection(collection_name, schema)
index_params = {
"metric_type": "IP",
"index_type": "HNSW",
"params": {"M": 8, "efConstruction": 64},
}
collection.create_index(field_name="embedding", index_params=index_params)
print(f"Milvus collection '{collection_name}' created.")
return collection
def insert_chunks_to_milvus(collection, document_id, chunks, embeddings):
"""Inserts into Milvus."""
print(f"Chunks length: {len(chunks)}")
print(f"Embeddings length: {len(embeddings)}")
data = [
[document_id] * len(chunks),
list(range(len(chunks))),
chunks,
embeddings,
]
collection.insert(data)
print(f"Inserted {len(chunks)} chunks into Milvus.")
def generate_embeddings(chunks):
"""Generates embeddings for chunks """
embeddings = [[0.1] * EMBEDDING_DIMENSION for _ in chunks]
print(f"Generated embeddings for {len(chunks)} chunks.")
print(f"Embeddings: {embeddings}")
return embeddings
def process_sharepoint_documents():
"""Main process"""
try:
start_time = time.time()
ctx = authenticate_sharepoint(SHAREPOINT_SITE_URL, SHAREPOINT_USERNAME, SHAREPOINT_PASSWORD)
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()
print(f"SUBFOLDER_NAME (process_sharepoint_documents): {SUBFOLDER_NAME}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY (process_sharepoint_documents): {SHAREPOINT_DOCUMENT_LIBRARY}")
if SUBFOLDER_NAME:
files = get_sharepoint_files(ctx, SHAREPOINT_DOCUMENT_LIBRARY, SUBFOLDER_NAME)
else:
files = get_sharepoint_files(ctx, SHAREPOINT_DOCUMENT_LIBRARY)
if not files:
print("No files found in the specified SharePoint location.")
return
collection = create_milvus_collection(MILVUS_COLLECTION_NAME, EMBEDDING_DIMENSION)
for file_ref in files:
file_start_time = time.time()
try:
print(f"Processing file: {file_ref}")
file_content = download_file(ctx, file_ref).decode("utf-8", errors="ignore")
document_id = hashlib.md5(file_ref.encode()).hexdigest()
chunks = chunk_text(file_content, CHUNK_SIZE, CHUNK_OVERLAP, ENCODING_NAME)
embeddings = generate_embeddings(chunks)
if not embeddings:
print(f"No embeddings were generated for file: {file_ref}")
continue
insert_chunks_to_milvus(collection, document_id, chunks, embeddings)
print(f"File processed in {time.time() - file_start_time:.2f} seconds.")
except Exception as file_error:
print(f"Error processing file {file_ref}: {file_error}")
collection.flush()
print("SharePoint documents processed and indexed.")
print(f"Total processing time: {time.time() - start_time:.2f} seconds.")
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
process_sharepoint_documents()
OK, that’s the code. These chunkings would be used using the best fit LLM on “watsonx.ai” platform for the use case of the customer.
Conclusion
As mentioned above, this is a test application in order to prepare myself for the project we would realize with our client. As always, I want to get my hands dirty and be sure of what we propose as solution to our customers and business partners, and above and beyond all, I love this 😅.
Thanks for reading.
Links
- watsonx.ai: https://www.ibm.com/products/watsonx-ai
- Granite LLM family on Hugging Face: https://huggingface.co/ibm-granite
Top comments (0)