Migrating Vector Data from Milvus to TiDB

This article was written by caiyfc, a dedicated TiDB Cloud Serverless user and TiDB Community Moderator.

Introduction

Recently, I’ve been exploring the use of vector databases to build Retrieval-Augmented Generation (RAG) applications, successfully implementing a setup with Milvus, Llama 3, Ollama, and LangChain. After obtaining a TiDB Cloud Serverless credit through an event, I decided to migrate the vector data from Milvus to TiDB Cloud Serverless.

Upon researching, I discovered that no existing migration tool currently supports transferring data from Milvus to TiDB. That doesn't make migration impossible, though: a manual migration is entirely feasible, and this article outlines my approach.

For more information about obtaining free TiDB Cloud Serverless credit, visit the event page.

Migration Plan

To perform a data migration, the first step is to determine the plan. The simplest migration consists of two steps: export the data from the source database, then import it into the target database.

However, this case is different. The RAG application is built on LangChain, and the structure LangChain creates in Milvus differs from the one it creates in TiDB.

In Milvus, the collection is named LangChainCollection, and its structure is:

[Image: schema of the LangChainCollection collection in Milvus]
In TiDB, however, the table is named langchain_vector, and its structure is as follows:

[Image: schema of the langchain_vector table in TiDB]
The LangChain documentation also provides details:

[Image: excerpt from the LangChain documentation]
Therefore, the migration requires two additional steps: data preparation and table structure adjustment. Since these are heterogeneous databases, I chose CSV, a widely supported format, for the exported data.

The overall plan is as follows:

[Image: overall migration plan — export from Milvus to CSV, prepare the data, adjust the table structure, import into TiDB Cloud Serverless]

Exporting Data from Milvus

The following script connects to Milvus, pages through the collection with query(), and writes every record to a CSV file:

import csv
from pymilvus import connections, Collection

# Connect to Milvus
connections.connect("default", host="10.3.xx.xx", port="19530")

# Get the collection and load it into memory so it can be queried
collection = Collection("LangChainCollection")
collection.load()

# Paginate through all the data
limit = 1000
offset = 0
all_results = []

while True:
    # Pass expr parameter, using a simple condition to query all data
    results = collection.query(expr="", output_fields=["pk", "source", "page", "text", "vector"], limit=limit, offset=offset)
    if not results:
        break
    all_results.extend(results)
    offset += limit

# Open the CSV file, prepare to write data
with open("milvus_data.csv", "w", newline="", encoding='utf-8') as csvfile:
    # Define the CSV column names
    fieldnames = ["pk", "source", "page", "text", "vector"]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    # Write the header
    writer.writeheader()

    # Write each record
    for result in all_results:
        # Parse JSON data and extract fields
        vector_str = ','.join(map(str, result.get("vector", [])))  # Convert the vector array to a string
        writer.writerow({
            "pk": result.get("pk"),          # Get the primary key
            "source": result.get("source"),  # Get the source file
            "page": result.get("page"),      # Get the page number
            "text": result.get("text"),      # Get the text
            "vector": vector_str             # Write the vector data
        })

print(f"Total records written to CSV: {len(all_results)}")

The exported CSV data looks like this:

[Image: sample rows from milvus_data.csv]
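The vector column is the only unusual field: the export script joins the float list with commas, and the preparation step later wraps it in brackets to match TiDB's vector literal format. A minimal round-trip sketch (with a toy 3-dimensional vector standing in for the real 512-dimensional embeddings):

```python
# Toy vector standing in for a real 512-dimensional embedding
vec = [-0.4020949, -0.6850993, 0.16776393]

# Export side: serialize as a comma-joined string, as the export script does
vector_str = ",".join(map(str, vec))

# Preparation side: wrap in brackets to produce a TiDB-style vector literal
embedding = f"[{vector_str}]"
print(embedding)  # [-0.4020949,-0.6850993,0.16776393]

# Parsing the string back confirms the floats survive the round trip
restored = [float(x) for x in vector_str.split(",")]
assert restored == vec
```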

Data Preparation and Table Structure Adjustment

I converted a small amount of test data into vectors and used LangChain to load it into TiDB Cloud Serverless. This process facilitated obtaining the data structure and format within TiDB Cloud Serverless.

The table structure is as follows:

CREATE TABLE `langchain_vector` (
  `id` varchar(36) NOT NULL,
  `embedding` vector(512) NOT NULL COMMENT 'hnsw(distance=cosine)',
  `document` text DEFAULT NULL,
  `meta` json DEFAULT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

The exported data format in CSV is as follows (partial content omitted):

"id","embedding","document","meta","create_time","update_time"
"081574ec-b3e4-481b-b0c7-9a789080d160","[-0.4020949,-0.6850993,******,0.16776393,-0.049104385]","forecasting and disaster mitigation. The organization is committed to advancing scienti\0c\nknowledge and improving public safety and well-being through its work.\nFor further information, please contact:","{\"page\": 5, \"source\": \"./WMO report highlights growing shortfalls and stress in global water resources.pdf\"}","2024-10-22 02:41:49","2024-10-22 02:41:49"

Comparing this with the CSV exported from Milvus, the mapping is clear: "embedding" corresponds to "vector," "document" corresponds to "text," and "meta" combines "page" and "source." A data preparation script based on this mapping is as follows:

import pandas as pd
import json
from uuid import uuid4
from datetime import datetime

# Read the CSV file
input_csv = 'milvus_data.csv'  # Replace with your CSV file name
df = pd.read_csv(input_csv)

# Create a new DataFrame
output_data = []

for _, row in df.iterrows():
    # Extract required fields
    id_value = str(uuid4())  # Generate a unique ID
    embedding = f"[{row['vector']}]"  # Wrap the comma-joined vector in brackets (TiDB's vector literal format)
    document = row['text']

    # Generate meta information
    meta_dict = {"page": row['page'], "source": row['source']}
    meta = json.dumps(meta_dict, ensure_ascii=False)  # Serialize as standard JSON for the json column

    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    update_time = create_time  # Same time for update

    # Add to output data
    output_data.append({
        "id": id_value,
        "embedding": embedding,
        "document": document,
        "meta": meta,
        "create_time": create_time,
        "update_time": update_time
    })

# Convert to DataFrame
output_df = pd.DataFrame(output_data)

# Save as CSV file
output_csv = 'output.csv'  # Output file name
output_df.to_csv(output_csv, index=False, quoting=1)  # quoting=1 (csv.QUOTE_ALL) quotes every field

print(f"Conversion completed, saved as {output_csv}")

Once the data preparation is complete, the data can be imported into TiDB Cloud Serverless.

Importing Data to TiDB Cloud Serverless

TiDB Cloud Serverless provides three import methods:

[Image: the three import options in the TiDB Cloud console]

In this case, we will use the "Upload a local file" method. CSV files smaller than 50 MiB can be uploaded directly; if a file exceeds 50 MiB, a script can split it into smaller chunks before uploading:

[Image: the local file upload dialog]
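One way to produce those chunks is to split by row count as a simple proxy for file size, repeating the header in every chunk so each file can be imported on its own. This is a sketch (the function name and default chunk size are my own choices, not from the TiDB docs):

```python
import csv

def split_csv(input_path, rows_per_chunk=10000, out_prefix="chunk"):
    """Split a CSV into smaller files, repeating the header in each chunk."""
    paths = []
    out, writer = None, None
    with open(input_path, newline="", encoding="utf-8") as src:
        reader = csv.reader(src)
        header = next(reader)
        for i, row in enumerate(reader):
            if i % rows_per_chunk == 0:  # Start a new chunk file
                if out:
                    out.close()
                path = f"{out_prefix}_{len(paths)}.csv"
                paths.append(path)
                out = open(path, "w", newline="", encoding="utf-8")
                writer = csv.writer(out, quoting=csv.QUOTE_ALL)
                writer.writerow(header)  # Repeat the header in every chunk
            writer.writerow(row)
    if out:
        out.close()
    return paths
```

If a chunk still exceeds 50 MiB, lower rows_per_chunk; rows carrying 512-dimensional vectors are wide, so a few thousand rows per file is a reasonable starting point.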
After uploading the file, select the previously created database and table, and click Define Table:

[Image: the Define Table step]

Adjust the mappings as needed, then click Start Import:

[Image: column mapping and the Start Import button]

For more import methods, you can refer to the documentation: Migration and Import Overview.

Validation of Results

After successfully importing the data, the next step is to validate it. I modified the RAG application to read vector data from both Milvus and TiDB, asked the same question against each store, and checked whether the model's answers were similar.

from langchain_community.llms import Ollama
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain import hub
from langchain.chains import RetrievalQA
from langchain.vectorstores.milvus import Milvus
from langchain_community.embeddings.jina import JinaEmbeddings
from langchain_community.vectorstores import TiDBVectorStore
import os

llm = Ollama(
    model="llama3",
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
    stop=["<|eot_id|>"],
)


embeddings = JinaEmbeddings(jina_api_key="xxxx", model_name="jina-embeddings-v2-small-en")

vector_store_milvus = Milvus(
    embedding_function=embeddings,
    connection_args={"uri": "http://10.3.xx.xx:19530"},
)


TIDB_CONN_STR="mysql+pymysql://xxxx.root:password@host:4000/test?ssl_ca=/Downloads/isrgrootx1.pem&ssl_verify_cert=true&ssl_verify_identity=true"
vector_store_tidb = TiDBVectorStore(
    connection_string=TIDB_CONN_STR,
    embedding_function=embeddings,
    table_name="langchain_vector",
)


os.environ["LANGCHAIN_API_KEY"] = "xxxx"
query = input("\nQuery: ")
prompt = hub.pull("rlm/rag-prompt")   

qa_chain = RetrievalQA.from_chain_type(
    llm, retriever=vector_store_milvus.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
print("milvus")
result = qa_chain({"query": query})

print("\n--------------------------------------")
print("tidb")
qa_chain = RetrievalQA.from_chain_type(
    llm, retriever=vector_store_tidb.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
result = qa_chain({"query": query})

The connection string for TiDB can be obtained directly from the TiDB Cloud Serverless console:

[Image: the connection string dialog in the TiDB Cloud console]
After posing the question to the RAG application and reviewing the responses, I found that the answers from Milvus and TiDB were essentially consistent, indicating that the vector migration was successful. It is also advisable to compare the number of entries in the Milvus collection and the TiDB table; if they match, the migration can be considered complete.
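Counting rows on the database side requires live connections, but a quick local sanity check (a sketch; the file names assume the scripts above) is to confirm the prepared CSV has the same number of data rows as the Milvus export. Note that csv.reader must be used rather than a plain line count, because the quoted document text can contain embedded newlines:

```python
import csv

def data_rows(path):
    """Count data rows in a CSV, excluding the header row.

    Uses csv.reader rather than counting lines, because quoted fields
    (such as the document text) can contain embedded newlines.
    """
    with open(path, newline="", encoding="utf-8") as f:
        return sum(1 for _ in csv.reader(f)) - 1

# File names from the export and preparation scripts above
# print(data_rows("milvus_data.csv"), data_rows("output.csv"))
```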

[Image: answers returned using the Milvus and TiDB retrievers]

Summary

Data migration between different databases is fundamentally about converting the data into a universal format that all databases can recognize, including vector data. This migration from Milvus to TiDB Cloud Serverless differs from traditional relational database migrations. Although the RAG application utilizes LangChain, the table structures and data formats created by LangChain vary across different databases. Therefore, additional organization of the data and table structures is required to successfully migrate to the target database. Fortunately, TiDB Cloud Serverless offers various convenient data import methods, making the migration process relatively straightforward.
