This article was written by caiyfc, a dedicated TiDB Cloud Serverless user and TiDB Community Moderator.
Introduction
Recently, I’ve been exploring the use of vector databases to build Retrieval-Augmented Generation (RAG) applications, successfully implementing a setup with Milvus, Llama 3, Ollama, and LangChain. After obtaining a TiDB Cloud Serverless credit through an event, I decided to migrate the vector data from Milvus to TiDB Cloud Serverless.
While researching, I found that existing migration tools do not support transferring data from Milvus to TiDB. A manual migration is still feasible, and this article outlines the approach I took.
For more information about obtaining free TiDB Cloud Serverless credit, visit the event page.
Migration Plan
The first step is to settle on a migration plan. The simplest migration has two steps: export the data from the source database, then import it into the target database.
However, this case is different. The RAG application utilizes LangChain, and based on research, the structure created by LangChain in Milvus differs from that in TiDB.
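In Milvus, the collection is named LangChainCollection. The fields used in the export script later in this article are pk, source, page, text, and vector.
In TiDB, however, the table is named langchain_vector, and its columns are id, embedding, document, meta, create_time, and update_time (the full table definition appears in the Data Preparation and Table Structure Adjustment section).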
The documentation for LangChain also provides details:
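Therefore, the migration requires two additional steps: data preparation and table structure adjustment. Because the two databases are heterogeneous, I chose CSV, a widely supported format, for the exported data.
The overall plan is: export the data from Milvus to CSV, prepare the data and adjust the table structure, import the CSV into TiDB Cloud Serverless, and validate the results.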
Exporting Data from Milvus
import csv
from pymilvus import connections, Collection

# Connect to Milvus
connections.connect("default", host="10.3.xx.xx", port="19530")

# Get the Collection
collection = Collection("LangChainCollection")

# Paginate through all the data
limit = 1000
offset = 0
all_results = []
while True:
    # An empty expr applies no filter; limit and offset page through the results
    results = collection.query(
        expr="",
        output_fields=["pk", "source", "page", "text", "vector"],
        limit=limit,
        offset=offset,
    )
    if not results:
        break
    all_results.extend(results)
    offset += limit

# Open the CSV file, prepare to write data
with open("milvus_data.csv", "w", newline="", encoding="utf-8") as csvfile:
    # Define the CSV column names
    fieldnames = ["pk", "source", "page", "text", "vector"]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    # Write the header
    writer.writeheader()
    # Write each record
    for result in all_results:
        # Flatten the vector into one comma-separated string so it fits in a single CSV cell
        vector_str = ",".join(map(str, result.get("vector", [])))
        writer.writerow({
            "pk": result.get("pk"),          # Primary key
            "source": result.get("source"),  # Source file
            "page": result.get("page"),      # Page number
            "text": result.get("text"),      # Chunk text
            "vector": vector_str,            # Vector data
        })

print(f"Total records written to CSV: {len(all_results)}")
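The offset loop above works for small collections, but the Milvus documentation describes an upper bound on offset + limit for a single query (16,384 at the time of writing), so a larger collection may need a different way to page. The following is a minimal sketch, assuming a pymilvus version that provides Collection.query_iterator; the helper names iter_batches and export_all are hypothetical and not part of the original script:

```python
from typing import Any, Dict, Iterator, List


def iter_batches(iterator: Any) -> Iterator[List[Dict[str, Any]]]:
    """Yield batches from an object that exposes next() and close().

    This is the shape of the iterator returned by pymilvus
    Collection.query_iterator (assumed available in your version).
    """
    try:
        while True:
            batch = iterator.next()
            if not batch:  # an empty batch signals the end of the data
                break
            yield batch
    finally:
        iterator.close()


def export_all(collection: Any, fields: List[str], batch_size: int = 1000) -> List[Dict[str, Any]]:
    """Collect every record of a collection without offset-based paging."""
    iterator = collection.query_iterator(
        batch_size=batch_size,
        expr="",  # assumption: an empty expression applies no filter
        output_fields=fields,
    )
    rows: List[Dict[str, Any]] = []
    for batch in iter_batches(iterator):
        rows.extend(batch)
    return rows
```

With a connected collection, all_results = export_all(collection, ["pk", "source", "page", "text", "vector"]) could stand in for the while loop, and the CSV-writing part of the script stays the same.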
The exported CSV file has the columns pk, source, page, text, and vector, with each vector stored as a single comma-separated string.
Data Preparation and Table Structure Adjustment
I converted a small amount of test data into vectors and used LangChain to load it into TiDB Cloud Serverless. This showed me the table structure and data format that LangChain uses in TiDB Cloud Serverless.
The table structure is as follows:
CREATE TABLE `langchain_vector` (
    `id` varchar(36) NOT NULL,
    `embedding` vector(512) NOT NULL COMMENT 'hnsw(distance=cosine)',
    `document` text DEFAULT NULL,
    `meta` json DEFAULT NULL,
    `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
Exported from TiDB as CSV, the test data looks like this (partial content omitted):
"id","embedding","document","meta","create_time","update_time"
"081574ec-b3e4-481b-b0c7-9a789080d160","[-0.4020949,-0.6850993,******,0.16776393,-0.049104385]","forecasting and disaster mitigation. The organization is committed to advancing scienti\0c\nknowledge and improving public safety and well-being through its work.\nFor further information, please contact:","{\"page\": 5, \"source\": \"./WMO report highlights growing shortfalls and stress in global water resources.pdf\"}","2024-10-22 02:41:49","2024-10-22 02:41:49"
Comparing this with the CSV file exported from Milvus makes the mapping clear: "embedding" corresponds to "vector," "document" corresponds to "text," and "meta" combines "page" and "source." The id, create_time, and update_time columns have no counterpart in the Milvus export, so the script generates them. A data preparation script based on this mapping is as follows:
import pandas as pd
import json
from uuid import uuid4
from datetime import datetime

# Read the CSV file exported from Milvus
input_csv = 'milvus_data.csv'  # Replace with your CSV file name
df = pd.read_csv(input_csv)

# Collect the converted rows
output_data = []
for _, row in df.iterrows():
    # Extract required fields
    id_value = str(uuid4())  # Generate a unique ID
    embedding = f"[{','.join(row['vector'].split(','))}]"  # Wrap the vector in brackets: "[v1,v2,...]"
    document = row['text']
    # Generate meta information
    meta_dict = {"page": row['page'], "source": row['source']}
    meta = json.dumps(meta_dict, ensure_ascii=False)  # First generate standard JSON
    # meta = meta.replace('"', '\\"')  # Escape double quotes if needed
    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    update_time = create_time  # Same time for update
    # Add to output data
    output_data.append({
        "id": id_value,
        "embedding": embedding,
        "document": document,
        "meta": meta,
        "create_time": create_time,
        "update_time": update_time
    })

# Convert to DataFrame
output_df = pd.DataFrame(output_data)

# Save as CSV file
output_csv = 'output.csv'  # Output file name
output_df.to_csv(output_csv, index=False, quoting=1)  # quoting=1 is csv.QUOTE_ALL: every field is quoted
print(f"Conversion completed, saved as {output_csv}")
Once the data preparation is complete, the data can be imported into TiDB Cloud Serverless.
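Before uploading, it can be worth checking that every prepared record will fit the target table. The sketch below is not part of the original workflow; it assumes the output.csv layout produced above and the vector(512) column from the table definition, and the names check_record and check_file are hypothetical:

```python
import csv
import json
from typing import Dict, List

EXPECTED_DIM = 512  # assumption: matches vector(512) in the table definition


def check_record(record: Dict[str, str], expected_dim: int = EXPECTED_DIM) -> List[str]:
    """Return a list of problems found in one prepared CSV record."""
    problems = []
    # The embedding should parse as a bracketed list with the expected length.
    try:
        embedding = json.loads(record.get("embedding") or "")
    except json.JSONDecodeError:
        problems.append("embedding is not a bracketed list of numbers")
    else:
        if not isinstance(embedding, list):
            problems.append("embedding is not a list")
        elif len(embedding) != expected_dim:
            problems.append(
                f"embedding has {len(embedding)} dimensions, expected {expected_dim}"
            )
    # The meta column should hold a JSON object.
    try:
        meta = json.loads(record.get("meta") or "")
    except json.JSONDecodeError:
        problems.append("meta is not valid JSON")
    else:
        if not isinstance(meta, dict):
            problems.append("meta is not a JSON object")
    return problems


def check_file(path: str, expected_dim: int = EXPECTED_DIM) -> Dict[int, List[str]]:
    """Map the 1-based record number to its problems, for failing records only."""
    bad = {}
    with open(path, newline="", encoding="utf-8") as f:
        for number, record in enumerate(csv.DictReader(f), start=1):
            problems = check_record(record, expected_dim)
            if problems:
                bad[number] = problems
    return bad
```

Calling check_file("output.csv") returns an empty dictionary when every record passes; any entries point at records to fix before the import.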
Importing Data to TiDB Cloud Serverless
TiDB Cloud Serverless provides three import methods:
In this case, we will use the "Upload a local file" method.
CSV files smaller than 50 MiB can be uploaded directly with the local file option. A file larger than 50 MiB needs to be split into smaller chunks with a script before uploading.
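One way to do the split is sketched below. This is a minimal example rather than the script used for this migration; the function name split_csv, the output directory, and the part-file naming are all assumptions. It splits on record boundaries instead of raw lines, because the document column can contain line breaks, and it repeats the header in every part so the column mapping works for each upload:

```python
import csv
import io
import os
from typing import List

MAX_BYTES = 50 * 1024 * 1024  # the 50 MiB upload limit mentioned above


def _row_bytes(record: List[str]) -> bytes:
    """Serialize one record as a fully quoted CSV line, encoded as UTF-8."""
    buf = io.StringIO()
    csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(record)
    return buf.getvalue().encode("utf-8")


def split_csv(src: str, out_dir: str, max_bytes: int = MAX_BYTES,
              prefix: str = "part") -> List[str]:
    """Split src into files of at most max_bytes each and return their paths.

    A single record larger than max_bytes still gets a file of its own.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths: List[str] = []
    out = None
    size = 0
    rows_in_part = 0
    with open(src, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:  # empty input: nothing to split
            return paths
        header_bytes = _row_bytes(header)
        try:
            for record in reader:
                data = _row_bytes(record)
                # Start a new part when the current one would exceed the limit.
                if out is None or (rows_in_part > 0 and size + len(data) > max_bytes):
                    if out is not None:
                        out.close()
                    path = os.path.join(out_dir, f"{prefix}_{len(paths) + 1:03d}.csv")
                    out = open(path, "wb")
                    out.write(header_bytes)
                    size, rows_in_part = len(header_bytes), 0
                    paths.append(path)
                out.write(data)
                size += len(data)
                rows_in_part += 1
        finally:
            if out is not None:
                out.close()
    return paths
```

For example, split_csv("output.csv", "chunks") would write chunks/part_001.csv, chunks/part_002.csv, and so on, each of which can then be uploaded separately.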
After uploading the file, select the previously created database and table, and click Define Table.
Adjust the mappings as needed, then click Start Import.
For more import methods, you can refer to the documentation: Migration and Import Overview.
Validation of Results
After successfully importing the data, the next step is to validate it. I modified the RAG application code to read vector data from both Milvus and TiDB, asked the LLM the same question through each retriever, and checked whether the answers were similar.
from langchain_community.llms import Ollama
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain import hub
from langchain.chains import RetrievalQA
from langchain.vectorstores.milvus import Milvus
from langchain_community.embeddings.jina import JinaEmbeddings
from langchain_community.vectorstores import TiDBVectorStore
import os

# Llama 3 served by Ollama, streaming tokens to stdout
llm = Ollama(
    model="llama3",
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
    stop=["<|eot_id|>"],
)

# The same embedding model is used for both vector stores
embeddings = JinaEmbeddings(jina_api_key="xxxx", model_name="jina-embeddings-v2-small-en")

# Source: Milvus
vector_store_milvus = Milvus(
    embedding_function=embeddings,
    connection_args={"uri": "http://10.3.xx.xx:19530"},
)

# Target: TiDB Cloud Serverless
TIDB_CONN_STR = "mysql+pymysql://xxxx.root:password@host:4000/test?ssl_ca=/Downloads/isrgrootx1.pem&ssl_verify_cert=true&ssl_verify_identity=true"
vector_store_tidb = TiDBVectorStore(
    connection_string=TIDB_CONN_STR,
    embedding_function=embeddings,
    table_name="langchain_vector",
)

os.environ["LANGCHAIN_API_KEY"] = "xxxx"
query = input("\nQuery: ")
prompt = hub.pull("rlm/rag-prompt")

# Answer the question with chunks retrieved from Milvus
qa_chain = RetrievalQA.from_chain_type(
    llm, retriever=vector_store_milvus.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
print("milvus")
result = qa_chain({"query": query})

print("\n--------------------------------------")

# Answer the same question with chunks retrieved from TiDB
print("tidb")
qa_chain = RetrievalQA.from_chain_type(
    llm, retriever=vector_store_tidb.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
result = qa_chain({"query": query})
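LLM answers can vary from run to run, so comparing the retrieved chunks directly may give a steadier signal than comparing generated text. The sketch below is an optional extra check, not part of the original validation; it assumes both stores expose LangChain's similarity_search(query, k=...) method and return documents with a page_content attribute, and the helper names are hypothetical:

```python
from typing import Any, List


def top_k_texts(store: Any, query: str, k: int = 4) -> List[str]:
    """Return the text of the k nearest chunks from a LangChain vector store."""
    return [doc.page_content for doc in store.similarity_search(query, k=k)]


def retrieval_overlap(store_a: Any, store_b: Any, query: str, k: int = 4) -> float:
    """Jaccard overlap (0.0 to 1.0) between the top-k chunks of two stores."""
    a = set(top_k_texts(store_a, query, k))
    b = set(top_k_texts(store_b, query, k))
    if not a and not b:  # both stores returned nothing: treat as identical
        return 1.0
    return len(a & b) / len(a | b)
```

For example, retrieval_overlap(vector_store_milvus, vector_store_tidb, query) returns 1.0 when both stores retrieve the same chunks. A lower value is not necessarily a migration error, since the two stores may use different distance metrics or approximate indexes, but a value near 0.0 suggests the data is worth a closer look.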
The connection string for TiDB can be obtained directly from TiDB Cloud Serverless:
After posing the question to the RAG application and reviewing the responses, I found that the answers from Milvus and TiDB were essentially consistent, indicating that the vector migration was successful. It is also advisable to compare the number of data entries in Milvus and TiDB tables; if they match, the migration should be considered successful.
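The count comparison can be scripted as well. The following is a minimal sketch under a few assumptions: collection is a connected pymilvus Collection (its num_entities property is used, which may still include deleted entities that have not been compacted), and conn is any Python DB-API connection to TiDB, for example one opened with pymysql. The function names are hypothetical:

```python
from typing import Any


def milvus_entity_count(collection: Any) -> int:
    """Entity count reported by a pymilvus Collection."""
    return int(collection.num_entities)


def sql_row_count(conn: Any, table: str) -> int:
    """Row count of a table through a DB-API connection."""
    # Only accept plain identifiers, because the name is placed into the SQL text.
    if not table.replace("_", "").isalnum():
        raise ValueError(f"unexpected table name: {table!r}")
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT COUNT(*) FROM `{table}`")
        return int(cur.fetchone()[0])
    finally:
        cur.close()


def counts_match(collection: Any, conn: Any, table: str = "langchain_vector") -> bool:
    """Print both counts and report whether they are equal."""
    source = milvus_entity_count(collection)
    target = sql_row_count(conn, table)
    print(f"Milvus entities: {source}, TiDB rows: {target}")
    return source == target
```

If the table already held rows before the import, such as the test data loaded earlier through LangChain, those rows need to be subtracted or removed before the two counts are compared.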
Summary
Data migration between different databases is fundamentally about converting the data into a universal format that all databases can recognize, including vector data. This migration from Milvus to TiDB Cloud Serverless differs from traditional relational database migrations. Although the RAG application utilizes LangChain, the table structures and data formats created by LangChain vary across different databases. Therefore, additional organization of the data and table structures is required to successfully migrate to the target database. Fortunately, TiDB Cloud Serverless offers various convenient data import methods, making the migration process relatively straightforward.