Alain Airom
Populating a RAG with data from enterprise document repositories for Generative AI

Ingesting data from OneDrive into Milvus for use with generative AI.

Image courtesy Milvus.io

Introduction and motivation

In a recent project, we helped our customer populate a RAG vector database (Milvus in our case) with content ingested from the company’s SharePoint/OneDrive repositories. That content would then be used with the LLM best fitting the customer’s requirements on the watsonx.ai platform, for instance a Granite model.

This article describes the code I prepared and tested locally on my computer. The code will be adapted for use on the customer’s platform, but as usual I like to do my research beforehand… (I’m a bit of a geek, I guess).

Local Milvus Preparation

In a previous article I wrote about the difficulties I encountered setting up a Milvus environment on my laptop; I have finally found a very straightforward and simple way to do it. Here are the steps.

To do this you need either Docker or, as in my case, Podman.

# fetch the startup script
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh

The next step is simply to run the script. I have an alias docker=”podman” on my laptop, but for simplicity in this case I replaced all ‘sudo docker’ commands with ‘sudo podman’ in the downloaded ‘standalone_embed.sh’ file, which I then renamed.
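
A one-line sed substitution takes care of the replacement (producing the renamed file used in the call below):

sed 's/sudo docker/sudo podman/g' standalone_embed.sh > standalone_podman_embed.sh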

# the original call is 'bash standalone_embed.sh start'
bash standalone_podman_embed.sh start

And that’s it: your Milvus instance should be up and running. But let’s make sure of it.
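
As a quick first check (assuming the default port mapping of the standalone script), Milvus exposes a health endpoint that should answer “OK”:

curl -f http://localhost:9091/healthz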

Test Milvus database connection and prepare the main application

As I write the application in Python, I first prepared a virtual environment (a best practice I recommend to everyone, which I learnt the hard way! 🫣)

python3.11 -m venv sharepoint_milvus
source sharepoint_milvus/bin/activate

The next step is to install the first package needed to work with Milvus.

pip install --upgrade pymilvus

And then we test the connection to the local vector database.

from pymilvus import connections

try:
    connections.connect(host="127.0.0.1", port="19530")
    print("Milvus connection successful!")
    connections.disconnect("default")
except Exception as e:
    print(f"Milvus connection failed: {e}")

The connection is OK. The second test I made uses sample code from the Milvus site to create a vector collection and exercise it; it is the following.

from pymilvus import MilvusClient
import numpy as np

client = MilvusClient("./milvus_demo.db")  # Milvus Lite: stores data in a local file; pass "http://localhost:19530" to target the standalone instance instead
client.create_collection(
    collection_name="demo_collection",
    dimension=384  # The vectors we will use in this demo have 384 dimensions
)

docs = [
    "Artificial intelligence was founded as an academic discipline in 1956.",
    "Alan Turing was the first person to conduct substantial research in AI.",
    "Born in Maida Vale, London, Turing was raised in southern England.",
]

vectors = [[ np.random.uniform(-1, 1) for _ in range(384) ] for _ in range(len(docs)) ]
data = [ {"id": i, "vector": vectors[i], "text": docs[i], "subject": "history"} for i in range(len(vectors)) ]
res = client.insert(
    collection_name="demo_collection",
    data=data
)

res = client.search(
    collection_name="demo_collection",
    data=[vectors[0]],
    filter="subject == 'history'",
    limit=2,
    output_fields=["text", "subject"],
)
print(res)

res = client.query(
    collection_name="demo_collection",
    filter="subject == 'history'",
    output_fields=["text", "subject"],
)
print(res)

res = client.delete(
    collection_name="demo_collection",
    filter="subject == 'history'",
)
print(res)

Once this works, we can move forward.

The main application (and challenges)

Writing the main application and testing it took me quite some time. Also, there is a big disclaimer here. At IBM, we use OneDrive as one of our enterprise repositories, and while we can access the drive with the internal SSO login system, the strict IBM security policy blocks connections through an application like the one I wrote. So I tested the code against one of my own ‘Outlook’ email addresses and my own drive to make sure that it could work (I also received some emails from Microsoft regarding suspicious activities on my account 🚨).

Having said that, let’s go back to my application. First, as usual, I created a “.env” file with my connection/configuration information.

export SHAREPOINT_USERNAME="xxx@yyy.com"
export SHAREPOINT_PASSWORD="PASSWORD"
export SHAREPOINT_SITE_URL="THE_MAIN_URL"
export SHAREPOINT_DOCUMENT_LIBRARY="THE_MAIN_DOCUMENT_FOLDER"
export SHAREPOINT_SUBFOLDER_NAME="THE_TARGET_SUBFOLDER"

# specific to my environment
export TOKENIZERS_PARALLELISM="false"

I “source” this file in order to use it.

source .env

Then comes the code. First, let’s take a look at the dependencies required by the application.

import os
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import tiktoken
import hashlib
import time
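
The office365 imports come from the Office365-REST-Python-Client package; together with tiktoken (pymilvus is already installed in the virtual environment), it can be added with pip:

pip install --upgrade Office365-REST-Python-Client tiktoken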

After that, some debug output to make sure the application has the right connection/credential information.

# Configuration
SHAREPOINT_USERNAME = os.environ.get("SHAREPOINT_USERNAME")
SHAREPOINT_PASSWORD = os.environ.get("SHAREPOINT_PASSWORD")
SHAREPOINT_SITE_URL = os.environ.get("SHAREPOINT_SITE_URL")
SHAREPOINT_DOCUMENT_LIBRARY = os.environ.get("SHAREPOINT_DOCUMENT_LIBRARY")
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()  # Trim spaces

print(f"SHAREPOINT_USERNAME: {os.environ.get('SHAREPOINT_USERNAME')}")
print(f"SHAREPOINT_PASSWORD: {os.environ.get('SHAREPOINT_PASSWORD')}")
print(f"SHAREPOINT_SITE_URL: {os.environ.get('SHAREPOINT_SITE_URL')}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY: {os.environ.get('SHAREPOINT_DOCUMENT_LIBRARY')}")
print(f"SHAREPOINT_SUBFOLDER_NAME: {os.environ.get('SHAREPOINT_SUBFOLDER_NAME')}")

print(f"Complete URL: "+ SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME)
C_URL = SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME

My Milvus is running locally, so the setup is hard-coded (OK, it’s ugly, I admit).

MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
MILVUS_COLLECTION_NAME = "document_chunks"
CHUNK_SIZE = 512  # tokens per chunk
CHUNK_OVERLAP = 100  # each new chunk starts CHUNK_SIZE - CHUNK_OVERLAP = 412 tokens after the previous one
EMBEDDING_DIMENSION = 1536  # must match the output dimension of the embedding model
ENCODING_NAME = "cl100k_base"  # tiktoken encoding used for token counting

And here is the full code.

import os
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
import tiktoken
import hashlib
import time

# Configuration
SHAREPOINT_USERNAME = os.environ.get("SHAREPOINT_USERNAME")
SHAREPOINT_PASSWORD = os.environ.get("SHAREPOINT_PASSWORD")
SHAREPOINT_SITE_URL = os.environ.get("SHAREPOINT_SITE_URL")
SHAREPOINT_DOCUMENT_LIBRARY = os.environ.get("SHAREPOINT_DOCUMENT_LIBRARY")
SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()  # Trim spaces

print(f"SHAREPOINT_USERNAME: {os.environ.get('SHAREPOINT_USERNAME')}")
print(f"SHAREPOINT_PASSWORD: {os.environ.get('SHAREPOINT_PASSWORD')}")
print(f"SHAREPOINT_SITE_URL: {os.environ.get('SHAREPOINT_SITE_URL')}")
print(f"SHAREPOINT_DOCUMENT_LIBRARY: {os.environ.get('SHAREPOINT_DOCUMENT_LIBRARY')}")
print(f"SHAREPOINT_SUBFOLDER_NAME: {os.environ.get('SHAREPOINT_SUBFOLDER_NAME')}")

print(f"Complete URL: "+ SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME)
C_URL = SHAREPOINT_SITE_URL+ "/" + SHAREPOINT_DOCUMENT_LIBRARY + "/" + SUBFOLDER_NAME

MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
MILVUS_COLLECTION_NAME = "document_chunks"
CHUNK_SIZE = 512  # tokens per chunk
CHUNK_OVERLAP = 100  # each new chunk starts CHUNK_SIZE - CHUNK_OVERLAP = 412 tokens after the previous one
EMBEDDING_DIMENSION = 1536  # must match the output dimension of the embedding model
ENCODING_NAME = "cl100k_base"  # tiktoken encoding used for token counting

def authenticate_sharepoint(site_url, username, password):
    """Authenticates with SharePoint."""
    ctx_auth = AuthenticationContext(site_url)
    if ctx_auth.acquire_token_for_user(username, password):
        ctx = ClientContext(site_url, ctx_auth)
        print("SharePoint authentication successful.")
        return ctx
    else:
        raise Exception(f"Failed to authenticate: {ctx_auth.get_last_error()}")

def download_file(ctx, server_relative_url):
    """Downloads a file from SharePoint."""
    file = File.open_binary(ctx, server_relative_url)
    ctx.execute_query()
    print(f"Downloaded file: {server_relative_url}")
    return file.content

def get_sharepoint_files(ctx, library_name, subfolder_path=None):
    """
    Fetches files from a SharePoint doc library or a specific subfolder.
    """
    try:
        list_obj = ctx.web.lists.get_by_title(library_name)
        ctx.load(list_obj.root_folder)
        ctx.execute_query()

        if subfolder_path:
            full_folder_url = f"{list_obj.root_folder.serverRelativeUrl}/{subfolder_path}"
            print(f"full_folder_url: {full_folder_url}")
            try:
                folder = ctx.web.get_folder_by_server_relative_url(full_folder_url)
                ctx.load(folder)
                ctx.execute_query()
                print(f"Folder object: {folder}")
                print(f"Folder.serverRelativeUrl: {folder.serverRelativeUrl}")
                print(f"Folder.exists: {folder.exists}")
            except Exception as folder_error:
                print(f"Error during get_folder_by_server_relative_url(): {folder_error}")
                return []

            ctx.load(folder)
            ctx.execute_query()
            try:
                items = folder.files.get_all().execute_query()
            except Exception as query_error:
                print(f"Error during folder.files.get_all().execute_query(): {query_error}")
                return []

        else:
            items = list_obj.items.get_all().execute_query()

        files = []
        if items:
            if isinstance(items, list):
                for i, item in enumerate(items):
                    try:
                        print(f"Item {i}: {item}")
                        if hasattr(item, 'properties'):
                            print(f"Item {i} properties: {item.properties}")
                        else:
                            print(f"Item {i} has no properties.")
                        if hasattr(item, 'serverRelativeUrl'):
                            print(f"Item {i} serverRelativeUrl: {item.serverRelativeUrl}")
                        else:
                            print(f"Item {i} has no serverRelativeUrl.")

                        if hasattr(item, 'properties') and 'ServerRelativeUrl' in item.properties:
                            files.append(item.properties["ServerRelativeUrl"])
                        else:
                            print(f"Item {i} does not have properties or ServerRelativeUrl: {item}")
                    except Exception as item_error:
                        print(f"Error processing item {i}: {item_error}")
            else:
                print("Items is not a list.")
        else:
            print("Items list is empty.")

        print(f"Found {len(files)} files in '{library_name}'. Subfolder: {subfolder_path}")
        print(f"Files retrieved: {files}")
        return files

    except Exception as e:
        print(f"Error in get_sharepoint_files: {e}")
        return []

def chunk_text(text, chunk_size, chunk_overlap, encoding_name):
    """Splits text into chunks with overlap."""
    encoder = tiktoken.get_encoding(encoding_name)
    tokens = encoder.encode(text)
    chunks = []
    for i in range(0, len(tokens), chunk_size - chunk_overlap):
        chunk = tokens[i : i + chunk_size]
        chunks.append(encoder.decode(chunk))
    print(f"Text chunked into {len(chunks)} chunks.")
    return chunks

def create_milvus_collection(collection_name, dimension):
    """Creates a Milvus collection."""
    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="chunk_id", dtype=DataType.INT64),
        FieldSchema(name="chunk_text", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dimension),
    ]
    schema = CollectionSchema(fields, "Document Chunks")
    collection = Collection(collection_name, schema)
    index_params = {
        "metric_type": "IP",
        "index_type": "HNSW",
        "params": {"M": 8, "efConstruction": 64},
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    print(f"Milvus collection '{collection_name}' created.")
    return collection

def insert_chunks_to_milvus(collection, document_id, chunks, embeddings):
    """Inserts into Milvus."""
    print(f"Chunks length: {len(chunks)}")
    print(f"Embeddings length: {len(embeddings)}")
    data = [
        [document_id] * len(chunks),
        list(range(len(chunks))),
        chunks,
        embeddings,
    ]
    collection.insert(data)
    print(f"Inserted {len(chunks)} chunks into Milvus.")

def generate_embeddings(chunks):
    """Generates embeddings for chunks (placeholder)."""
    # Placeholder: constant vectors stand in for a real embedding model.
    embeddings = [[0.1] * EMBEDDING_DIMENSION for _ in chunks]
    print(f"Generated embeddings for {len(chunks)} chunks.")
    print(f"Embeddings: {embeddings}")
    return embeddings

def process_sharepoint_documents():
    """Main process"""
    try:
        start_time = time.time()
        ctx = authenticate_sharepoint(SHAREPOINT_SITE_URL, SHAREPOINT_USERNAME, SHAREPOINT_PASSWORD)
        SUBFOLDER_NAME = os.environ.get("SHAREPOINT_SUBFOLDER_NAME").strip()
        print(f"SUBFOLDER_NAME (process_sharepoint_documents): {SUBFOLDER_NAME}")  
        print(f"SHAREPOINT_DOCUMENT_LIBRARY (process_sharepoint_documents): {SHAREPOINT_DOCUMENT_LIBRARY}")
        if SUBFOLDER_NAME:
            files = get_sharepoint_files(ctx, SHAREPOINT_DOCUMENT_LIBRARY, SUBFOLDER_NAME)
        else:
            files = get_sharepoint_files(ctx, SHAREPOINT_DOCUMENT_LIBRARY)

        if not files:
            print("No files found in the specified SharePoint location.")
            return

        collection = create_milvus_collection(MILVUS_COLLECTION_NAME, EMBEDDING_DIMENSION)

        for file_ref in files:
            file_start_time = time.time()
            try:
                print(f"Processing file: {file_ref}")
                file_content = download_file(ctx, file_ref).decode("utf-8", errors="ignore")
                document_id = hashlib.md5(file_ref.encode()).hexdigest()

                chunks = chunk_text(file_content, CHUNK_SIZE, CHUNK_OVERLAP, ENCODING_NAME)
                embeddings = generate_embeddings(chunks)

                if not embeddings:
                    print(f"No embeddings were generated for file: {file_ref}")
                    continue

                insert_chunks_to_milvus(collection, document_id, chunks, embeddings)
                print(f"File processed in {time.time() - file_start_time:.2f} seconds.")

            except Exception as file_error:
                print(f"Error processing file {file_ref}: {file_error}")
        collection.flush()
        print("SharePoint documents processed and indexed.")
        print(f"Total processing time: {time.time() - start_time:.2f} seconds.")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    process_sharepoint_documents()

OK, that’s the code. Note that generate_embeddings above only returns constant placeholder vectors: on the customer’s platform, the chunks will be embedded for real and then used with the best-fitting LLM on the “watsonx.ai” platform for the customer’s use case.
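
For completeness, here is a minimal sketch of what a real generate_embeddings could look like, assuming the sentence-transformers package (which is what the TOKENIZERS_PARALLELISM variable in my “.env” hints at). Its all-MiniLM-L6-v2 model outputs 384-dimensional vectors, so EMBEDDING_DIMENSION would have to be set to 384 to match.

from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 outputs 384-dimensional embeddings;
# EMBEDDING_DIMENSION must be set to match the model.
model = SentenceTransformer("all-MiniLM-L6-v2")

def generate_embeddings(chunks):
    """Generates embeddings with a local sentence-transformers model."""
    embeddings = model.encode(chunks).tolist()
    print(f"Generated embeddings for {len(chunks)} chunks.")
    return embeddings

And at question time, retrieval would follow the same pattern as the earlier standalone test: embed the query and search the collection (the question text below is of course hypothetical).

from pymilvus import connections, Collection

connections.connect(host="localhost", port="19530")
collection = Collection("document_chunks")
collection.load()

# Embed the question and look up the closest chunks.
query_embedding = generate_embeddings(["What does the target subfolder describe?"])[0]
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param={"metric_type": "IP", "params": {"ef": 64}},
    limit=3,
    output_fields=["chunk_text", "document_id"],
)
for hit in results[0]:
    print(hit.entity.get("chunk_text"))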

Conclusion

As mentioned above, this is a test application to prepare myself for the project we will carry out with our client. As always, I want to get my hands dirty and be sure of what we propose as a solution to our customers and business partners, and above and beyond all, I love this 😅.

Thanks for reading.
