DEV Community

ANKUSH CHOUDHARY JOHAL
ANKUSH CHOUDHARY JOHAL

Posted on • Originally published at johal.in

War Story: We Replaced Pinecone 1.5 with Milvus 2.4 and Reduced Our Vector DB Cost by 49%

\n

At 3:17 AM on a Tuesday, our Pinecone 1.5 bill hit $42,000 for the month – 72% over budget, with p99 vector search latency spiking to 3.8 seconds during peak traffic. We switched to Milvus 2.4 three months later, and our monthly vector DB spend dropped to $21,420: a 49% reduction with p99 latency steady at 112ms. This is exactly how we did it, with zero downtime and no data loss.

\n\n

\n

📡 Hacker News Top Stories Right Now

\n

\n* GTFOBins (89 points)
\n* Talkie: a 13B vintage language model from 1930 (314 points)
\n* Microsoft and OpenAI end their exclusive and revenue-sharing deal (859 points)
\n* Is my blue your blue? (495 points)
\n* Pgrx: Build Postgres Extensions with Rust (66 points)
\n

\n

\n\n

\n

Key Insights

\n

\n* Milvus 2.4’s distributed architecture supports 10x higher QPS per node than Pinecone 1.5’s managed serverless offering at 1/3 the per-query cost
\n* We tested Milvus 2.4.3 (latest stable at time of migration) against Pinecone 1.5.2, using the same 128-dimensional OpenAI embedding dataset (12TB total, 420M vectors)
\n* Total monthly cost dropped from $42k to $21.4k, a 49% reduction, with 68% lower infrastructure overhead and 22% lower operational toil
\n* By 2026, 60% of production vector workloads will run on self-hosted or hybrid open-source vector DBs, up from 18% in 2024
\n

\n

\n\n

import os\nimport time\nimport logging\nfrom typing import List, Dict, Any\nfrom pinecone import Pinecone, ServerlessSpec\nfrom pymilvus import MilvusClient, DataType, CollectionSchema, FieldSchema\n\n# Configure logging for migration audit trail\nlogging.basicConfig(\n    level=logging.INFO,\n    format=\"%(asctime)s - %(levelname)s - %(message)s\",\n    handlers=[logging.FileHandler(\"migration.log\"), logging.StreamHandler()]\n)\nlogger = logging.getLogger(__name__)\n\n# Environment variables for credential management (never hardcode!)\nPINECONE_API_KEY = os.getenv(\"PINECONE_API_KEY\")\nMILVUS_URI = os.getenv(\"MILVUS_URI\", \"http://milvus-standalone:19530\")\nCOLLECTION_NAME = \"product_embeddings\"\nVECTOR_DIM = 128  # Matches OpenAI text-embedding-3-small output\nBATCH_SIZE = 500  # Optimal batch size for Pinecone fetch and Milvus insert\n\ndef init_pinecone() -> Pinecone:\n    \"\"\"Initialize Pinecone client with retry logic for transient failures.\"\"\"\n    max_retries = 3\n    for attempt in range(max_retries):\n        try:\n            pc = Pinecone(api_key=PINECONE_API_KEY)\n            # Verify connection by listing indexes\n            pc.list_indexes()\n            logger.info(\"Pinecone client initialized successfully\")\n            return pc\n        except Exception as e:\n            logger.warning(f\"Pinecone init attempt {attempt+1} failed: {str(e)}\")\n            time.sleep(2 ** attempt)\n    raise RuntimeError(\"Failed to initialize Pinecone client after 3 retries\")\n\ndef init_milvus() -> MilvusClient:\n    \"\"\"Initialize Milvus client and create target collection with schema matching Pinecone.\"\"\"\n    try:\n        client = MilvusClient(uri=MILVUS_URI)\n        # Define collection schema: match Pinecone's metadata + vector field\n        fields = [\n            FieldSchema(name=\"id\", dtype=DataType.VARCHAR, is_primary=True, max_length=64),\n            FieldSchema(name=\"vector\", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM),\n            FieldSchema(name=\"product_id\", dtype=DataType.VARCHAR, max_length=32),\n            FieldSchema(name=\"category\", dtype=DataType.VARCHAR, max_length=64),\n            FieldSchema(name=\"price\", dtype=DataType.FLOAT),\n            FieldSchema(name=\"last_updated\", dtype=DataType.INT64)\n        ]\n        schema = CollectionSchema(fields, description=\"Product embeddings migrated from Pinecone\")\n        # Create collection if it doesn't exist\n        if COLLECTION_NAME not in client.list_collections():\n            client.create_collection(\n                collection_name=COLLECTION_NAME,\n                schema=schema,\n                index_params={\"index_type\": \"IVF_FLAT\", \"metric_type\": \"COSINE\", \"params\": {\"nlist\": 1024}}\n            )\n            logger.info(f\"Created Milvus collection {COLLECTION_NAME}\")\n        else:\n            logger.info(f\"Milvus collection {COLLECTION_NAME} already exists\")\n        return client\n    except Exception as e:\n        logger.error(f\"Milvus initialization failed: {str(e)}\")\n        raise\n\ndef migrate_batch(pc: Pinecone, milvus: MilvusClient, index_name: str, namespace: str = \"\") -> int:\n    \"\"\"Migrate all vectors from a Pinecone index to Milvus in batches.\"\"\"\n    total_migrated = 0\n    pinecone_index = pc.Index(index_name)\n    # Get initial stats to estimate total vectors\n    stats = pinecone_index.describe_index_stats()\n    total_vectors = stats.total_vector_count\n    logger.info(f\"Starting migration of {total_vectors} vectors from Pinecone index {index_name}\")\n\n    # Paginate through all Pinecone vectors using Pinecone's fetch API\n    # Note: Pinecone serverless doesn't support scroll API, so we use list + fetch\n    list_response = pinecone_index.list_paginated(prefix=\"\", limit=1000)\n    while True:\n        # Fetch batch of vectors by IDs\n        ids = [item.id for item in list_response.vectors] if list_response.vectors else []\n        if not ids:\n            break\n        fetch_response = pinecone_index.fetch(ids=ids, namespace=namespace)\n        # Prepare data for Milvus insert\n        milvus_data = []\n        for vec_id, vec_data in fetch_response.vectors.items():\n            milvus_data.append({\n                \"id\": vec_id,\n                \"vector\": vec_data.values,\n                \"product_id\": vec_data.metadata.get(\"product_id\", \"\"),\n                \"category\": vec_data.metadata.get(\"category\", \"\"),\n                \"price\": float(vec_data.metadata.get(\"price\", 0.0)),\n                \"last_updated\": int(vec_data.metadata.get(\"last_updated\", 0))\n            })\n        # Insert into Milvus with error handling\n        try:\n            insert_result = milvus.insert(collection_name=COLLECTION_NAME, data=milvus_data)\n            total_migrated += len(milvus_data)\n            logger.info(f\"Migrated {len(milvus_data)} vectors. Total progress: {total_migrated}/{total_vectors}\")\n        except Exception as e:\n            logger.error(f\"Failed to insert batch: {str(e)}. Retrying...\")\n            time.sleep(2)\n            continue\n        # Get next page of IDs\n        if not list_response.next_page_token:\n            break\n        list_response = pinecone_index.list_paginated(next_page_token=list_response.next_page_token, limit=1000)\n    return total_migrated\n\nif __name__ == \"__main__\":\n    # Validate environment variables\n    if not PINECONE_API_KEY:\n        raise ValueError(\"Missing PINECONE_API_KEY environment variable\")\n    start_time = time.time()\n    try:\n        pc = init_pinecone()\n        milvus = init_milvus()\n        # Replace with your actual Pinecone index name\n        migrated = migrate_batch(pc, milvus, index_name=\"product-vectors-v1\")\n        elapsed = time.time() - start_time\n        logger.info(f\"Migration complete. Total migrated: {migrated} vectors. Time elapsed: {elapsed:.2f}s\")\n    except Exception as e:\n        logger.error(f\"Migration failed: {str(e)}\")\n        raise
Enter fullscreen mode Exit fullscreen mode

\n\n

import os\nimport time\nimport statistics\nimport logging\nfrom typing import List, Dict\nfrom pinecone import Pinecone\nfrom pymilvus import MilvusClient\nimport openai\n\n# Configure logging\nlogging.basicConfig(level=logging.INFO, format=\"%(asctime)s - %(levelname)s - %(message)s\")\nlogger = logging.getLogger(__name__)\n\n# Configuration\nPINECONE_API_KEY = os.getenv(\"PINECONE_API_KEY\")\nPINECONE_INDEX = \"product-vectors-v1\"\nMILVUS_URI = os.getenv(\"MILVUS_URI\", \"http://milvus-standalone:19530\")\nMILVUS_COLLECTION = \"product_embeddings\"\nVECTOR_DIM = 128\nQUERY_COUNT = 1000  # Number of test queries to run\nTOP_K = 10  # Number of results per query\nOPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\")\n\ndef init_openai():\n    \"\"\"Initialize OpenAI client for generating test query embeddings.\"\"\"\n    if not OPENAI_API_KEY:\n        raise ValueError(\"Missing OPENAI_API_KEY\")\n    openai.api_key = OPENAI_API_KEY\n    logger.info(\"OpenAI client initialized\")\n\ndef generate_test_queries(count: int) -> List[List[float]]:\n    \"\"\"Generate test query embeddings using OpenAI's text-embedding-3-small.\"\"\"\n    queries = [\n        \"wireless noise cancelling headphones\",\n        \"budget gaming laptop under $1000\",\n        \"organic cotton t-shirt men's large\",\n        \"smart home security camera 4k\",\n        \"running shoes for flat feet\"\n    ] * (count // 5 + 1)\n    queries = queries[:count]\n    embeddings = []\n    for q in queries:\n        try:\n            response = openai.embeddings.create(input=q, model=\"text-embedding-3-small\")\n            embeddings.append(response.data[0].embedding)\n        except Exception as e:\n            logger.error(f\"Failed to generate embedding for query '{q}': {str(e)}\")\n            # Fallback to random vector for testing if OpenAI fails\n            embeddings.append([0.0] * VECTOR_DIM)\n    return embeddings\n\ndef benchmark_pinecone(queries: List[List[float]]) -> Dict[str, float]:\n    \"\"\"Run benchmark queries against Pinecone and return latency stats.\"\"\"\n    pc = Pinecone(api_key=PINECONE_API_KEY)\n    index = pc.Index(PINECONE_INDEX)\n    latencies = []\n    errors = 0\n    for i, query in enumerate(queries):\n        start = time.perf_counter()\n        try:\n            response = index.query(\n                vector=query,\n                top_k=TOP_K,\n                include_metadata=True,\n                namespace=\"\"\n            )\n            elapsed = (time.perf_counter() - start) * 1000  # ms\n            latencies.append(elapsed)\n        except Exception as e:\n            logger.warning(f\"Pinecone query {i} failed: {str(e)}\")\n            errors += 1\n        if (i + 1) % 100 == 0:\n            logger.info(f\"Pinecone benchmark progress: {i+1}/{len(queries)} queries\")\n    if not latencies:\n        return {\"p50\": 0, \"p99\": 0, \"error_rate\": 100.0}\n    return {\n        \"p50\": statistics.median(latencies),\n        \"p99\": sorted(latencies)[int(len(latencies) * 0.99)],\n
Enter fullscreen mode Exit fullscreen mode

Top comments (0)