DEV Community

Sarma

Snapshots & Data-RESTORE: Vector-Database: Qdrant-Cluster

To state the obvious, it is VERY IMPORTANT to TEST the validity of snapshots frequently and manually. Vector databases are generally not as robust as good ol' SQL databases, so their backups/snapshots should not be fully trusted until you have restored from them.

The following manual steps walk you through, first, how to take snapshots and then how to restore/recover from a snapshot.

(A) HOW-TO:- creating snapshots

OPTION 1: in an AUTOMATED manner

This needs a new Lambda. See the sample code in sub-section (E) at the bottom.

OPTION 2: MANUALLY

If you want to MANUALLY trigger a snapshot of an entire collection, simply use Qdrant's built-in REST API:

POST /collections/${collection_name}/snapshots

Do this either via your REST API client of choice, or via Qdrant's Dashboard (reached via an ALB, e.g. httpS://my-custom-alb-domain.mycompany.com/dashboard).
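
If you would rather script the manual trigger, the call above can be sketched with Python's standard library alone. This is a minimal sketch, not the article's tooling: the helper names `build_snapshot_request` and `create_snapshot`, the base URL you pass in, and the 300-second timeout are all hypothetical choices. The `api-key` header is the same one the Lambda in sub-section (E) sends.

```python
import json
import urllib.request


def build_snapshot_request(base_url: str, collection_name: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) the POST that asks Qdrant to snapshot one collection."""
    url = f"{base_url.rstrip('/')}/collections/{collection_name}/snapshots"
    # Hypothetical helper: the endpoint and header come from this article.
    return urllib.request.Request(url, method="POST", headers={"api-key": api_key})


def create_snapshot(base_url: str, collection_name: str, api_key: str, timeout: float = 300.0) -> dict:
    """Send the request and return Qdrant's JSON response as a dict."""
    req = build_snapshot_request(base_url, collection_name, api_key)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())
```

Calling `create_snapshot("https://my-custom-alb-domain.mycompany.com", "test_collection", api_key)` would then return the same JSON you see in a REST client, with the new snapshot's name under `result`.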

(B) HOW-TO:- viewing snapshots

  1. Use the Qdrant-Dashboard:
    1. Important: Use the API-Key/Token for security!!
    2. Important: Use httpS !
  2. Invoke POST /collections/${collection_name}/points/scroll
    • Save the response to a temporary local file. It records the collection's current Points, and later steps in this article compare against it.
  3. Invoke GET /collections/${collection_name}/snapshots and confirm (using the response) that you have a NEW snapshot, which we shall refer to as snapshot # 1 (a.k.a. baseline) of the collection.
    • FYI: You can easily recognize snapshots by the TIMESTAMP that is part of each snapshot's name.
  4. If relying on automated snapshots, wait for the next new snapshot to be created.
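
Since the timestamp is embedded in each snapshot's name, telling the baseline from a newer snapshot can be automated. The sketch below assumes the name ends in `YYYY-MM-DD-HH-MM-SS.snapshot`, as in the examples in this article; the function names are hypothetical, and the list of names is assumed to come from the `name` field of each entry in the snapshot listing.

```python
import re
from datetime import datetime
from typing import List, Optional

# Assumed naming pattern: "<anything>-YYYY-MM-DD-HH-MM-SS.snapshot"
_TIMESTAMP = re.compile(r"(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})\.snapshot$")


def snapshot_time(name: str) -> Optional[datetime]:
    """Parse the timestamp out of a snapshot name, or return None if there is none."""
    match = _TIMESTAMP.search(name)
    if match is None:
        return None
    return datetime.strptime(match.group(1), "%Y-%m-%d-%H-%M-%S")


def sort_snapshots(names: List[str]) -> List[str]:
    """Return snapshot names oldest first; names without a timestamp are dropped."""
    dated = [(snapshot_time(n), n) for n in names]
    return [n for _, n in sorted((t, n) for t, n in dated if t is not None)]
```

The first element of `sort_snapshots(names)` is then the baseline candidate and the last element is the latest snapshot.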

(C) HOW-TO:- prep for VERIFYING the snapshots

  1. Make a change to your vector database.
    • If you are using Qdrant's test_collection, then insert a new Point: PointStruct(id=6, vector=[0.30, 0.05, 0.10, 0.40], payload={"city": "Bengaluru"}) (Don't ask me about the numbers)
  2. Trigger a new snapshot (manually or via scheduled automation).
  3. Repeat POST /collections/${collection_name}/points/scroll
    • COMPARE this response with the contents of the temporary local file saved in sub-section (B). The new Point should be the only difference.
  4. Confirm you have a NEW snapshot (called snapshot # 2) of the collection, taken AFTER the above update.
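
Comparing two scroll responses by eye gets tedious, so here is a small sketch that diffs them by Point ID. It assumes each response has the shape `{"result": {"points": [{"id": ...}, ...]}}` and that the whole collection fits in one scroll page (scroll is paginated, so a larger collection would need several calls). The function names are hypothetical.

```python
from typing import Set, Tuple


def point_ids(scroll_response: dict) -> Set:
    """Collect the Point IDs from one scroll response (assumed shape: result.points[].id)."""
    points = scroll_response.get("result", {}).get("points", [])
    return {p["id"] for p in points}


def diff_scrolls(before: dict, after: dict) -> Tuple[Set, Set]:
    """Return (added_ids, removed_ids) going from the 'before' response to the 'after' one."""
    old, new = point_ids(before), point_ids(after)
    return new - old, old - new
```

With the saved file loaded via `json.load` as `before` and the fresh response as `after`, a successful step (C) should report the new Point's ID as added and nothing as removed.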

(D) HOW-TO:- Restore/Recover

From snapshots stored locally inside the Qdrant-cluster

FYI: Qdrant stores ALL snapshots that you create/trigger under /qdrant/snapshots, but YOU should rely on Qdrant's snapshot API (../snapshots) to list/download/upload snapshots.

Invoke the following PUT API (with a JSON body) to restore from the baseline, a.k.a. snapshot # 1 (adjust the file path to match where your cluster actually stores its snapshots):-

PUT /collections/${collection_name}/snapshots/recover
{
  "location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
  1. Run a query to verify that the data change you made in sub-section (C) is MISSING!
  2. Restore from the LATEST snapshot, i.e. snapshot # 2.
  3. Run a query to verify that the data change you made in sub-section (C) is BACK!
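
The recover call can also be scripted. The following is a minimal sketch using only the standard library; `build_recover_request` is a hypothetical helper, and the `location` you pass in is whatever `file://` or `https://` URL applies to your own snapshot.

```python
import json
import urllib.request


def build_recover_request(base_url: str, collection_name: str, location: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) the PUT that restores a collection from a snapshot."""
    url = f"{base_url.rstrip('/')}/collections/{collection_name}/snapshots/recover"
    body = json.dumps({"location": location}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"api-key": api_key, "Content-Type": "application/json"},
    )
```

Passing the returned object to `urllib.request.urlopen(req, timeout=...)` sends it. Building the request separately makes it easy to print and double-check the URL and body before touching a live cluster.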

From s3-bucket

Warning: The Fargate Task's (container's) role will need S3 access. Setting that up is out of scope for this article.

  1. Restore from an old snapshot # 1/baseline using this example:-
PUT /collections/${collection_name}/snapshots/recover
{
    "location": "https://??????????.s3.us-east-2.amazonaws.com/${collection_name}-1046358132872257-2025-12-10-20-31-22.snapshot"
}
  2. Run a query to verify that the data change you made in sub-section (C) is MISSING!
  3. Restore from the LATEST snapshot, i.e. snapshot # 2.
  4. Run a query to verify that the data change you made in sub-section (C) is BACK!
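
The "is it MISSING / is it BACK" checks can be reduced to one lookup of the Point added in sub-section (C). The sketch below assumes Qdrant's retrieve-points call (POST /collections/${collection_name}/points with a list of `ids`) and a response whose `result` is a list of the Points found; both helper names are hypothetical.

```python
import json
import urllib.request


def build_retrieve_request(base_url: str, collection_name: str, point_id, api_key: str) -> urllib.request.Request:
    """Build (but do not send) a request that fetches a single Point by ID."""
    url = f"{base_url.rstrip('/')}/collections/{collection_name}/points"
    body = json.dumps({"ids": [point_id], "with_payload": True}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"api-key": api_key, "Content-Type": "application/json"},
    )


def point_present(retrieve_response: dict, point_id) -> bool:
    """True if the response lists the Point (assumed shape: result[].id)."""
    return any(p.get("id") == point_id for p in retrieve_response.get("result") or [])
```

After restoring the baseline, `point_present(response, 6)` should be False. After restoring snapshot # 2, it should be True.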

(E) HOW-TO:- Lambda to create AUTOMATED-Snapshots

This Lambda will trigger snapshots periodically, and will also COPY those snapshots to an S3-bucket. FYI: I instead prefer mounting an EFS-FileSystem onto /qdrant/snapshots/ of the Container (running as a Fargate-Task), and having EFS replicated to a 2nd region.

import os
import json
import boto3
import traceback
from datetime import datetime, timezone

import requests

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.metrics import MetricUnit

logger = Logger()
tracer = Tracer()
metrics = Metrics()

s3_client = boto3.client('s3')
secrets_client = boto3.client('secretsmanager')

### -------------------------------------------
def get_api_key(secrets_manager_arn: str) -> str:
    """Retrieve API key from AWS Secrets Manager"""
    if not secrets_manager_arn or secrets_manager_arn.strip() == "":
        raise ValueError("Secrets Manager ARN not provided")

    logger.info(f"Retrieving API key from Secrets Manager ARN: {secrets_manager_arn}")
    response = secrets_client.get_secret_value(SecretId=secrets_manager_arn)
    api_key = response['SecretString'].strip()

    if not api_key:
        raise ValueError("API key not found in secret")

    logger.info("API key retrieved successfully ✅")
    return api_key

### -------------------------------------------
def get_qdrant_url(qdrant_fqdn: str) -> str:
    """Construct Qdrant URL using Service Discovery FQDN"""
    if not qdrant_fqdn:
        raise ValueError("QDRANT_FQDN environment variable not set")

    qdrant_url = f"http://{qdrant_fqdn}:6333"
    logger.info(f"Qdrant URL: {qdrant_url}")
    return qdrant_url

### -------------------------------------------
def determine_snapshot_frequency() -> str:
    """
    Determine which snapshot frequency to use based on current time
    Returns: '15min', 'hourly', 'daily', or 'monthly'
    """
    now = datetime.now(timezone.utc)

    # Monthly: 1st of month at 8 AM UTC
    if now.day == 1 and now.hour == 8 and now.minute < 15:
        return 'monthly'

    # Daily: Every day at 8 AM UTC
    if now.hour == 8 and now.minute < 15:
        return 'daily'

    # Hourly: Top of every hour
    if now.minute < 15:
        return 'hourly'

    # Default: 15-minute intervals
    return '15min'

### -------------------------------------------
def get_collections(qdrant_url: str, api_key: str) -> list:
    """Get list of all collections"""
    collections_url = f"{qdrant_url}/collections"
    headers = {"api-key": api_key}

    logger.info(f"Getting collections from {collections_url}")
    response = requests.get(collections_url, headers=headers)

    if response.status_code != 200:
        raise Exception(f"Failed to get collections: {response.status_code} - {response.text}")

    collections_data = response.json()
    collections = [col["name"] for col in collections_data.get("result", {}).get("collections", [])]
    logger.info(f"Found collections: {collections}")
    return collections

### -------------------------------------------
def create_collection_snapshot(qdrant_url: str, collection_name: str, api_key: str) -> dict:
    """
    Create a snapshot for a specific collection
    Returns snapshot metadata
    """
    snapshot_url = f"{qdrant_url}/collections/{collection_name}/snapshots"
    headers = {"api-key": api_key}

    logger.info(f"Creating snapshot for collection '{collection_name}' at {snapshot_url}")
    response = requests.post(snapshot_url, headers=headers)

    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to create snapshot for collection '{collection_name}': {response.status_code} - {response.text}")

    snapshot_data = response.json()
    logger.info(f"Snapshot created for collection '{collection_name}': {json.dumps(snapshot_data)}")

    return snapshot_data

### -------------------------------------------
def download_collection_snapshot(qdrant_url: str, collection_name: str, snapshot_name: str, api_key: str) -> bytes:
    """Download collection snapshot from Qdrant"""
    download_url = f"{qdrant_url}/collections/{collection_name}/snapshots/{snapshot_name}"
    headers = {"api-key": api_key}

    logger.info(f"Downloading snapshot from {download_url}")
    response = requests.get(download_url, headers=headers, stream=True)

    if response.status_code != 200:
        raise Exception(f"Failed to download snapshot: {response.status_code}")

    # NOTE: .content reads the whole snapshot into memory, so size the Lambda's memory accordingly.
    return response.content

### -------------------------------------------
def upload_to_s3(bucket_name: str, s3_key: str, data: bytes) -> None:
    """Upload snapshot to S3"""
    logger.info(f"Uploading to s3://{bucket_name}/{s3_key}")
    s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=data)
    logger.info("Upload complete ✅")

### -------------------- main -----------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
    """
    Lambda function to create and manage Qdrant snapshots
    Implements RPO of 15 minutes with S3 lifecycle-based retention
    """

    logger.info('^' * 160)

    try:
        # Read the required environment variables and report ALL missing ones in a single error
        required_env = {name: os.getenv(name) for name in ('AK_ARN', 'QDRANT_FQDN', 'TIER', 'CPU_ARCH')}
        missing = [name for name, value in required_env.items() if not value]
        if missing:
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

        secrets_manager_arn = required_env['AK_ARN']
        qdrant_fqdn = required_env['QDRANT_FQDN']
        tier = required_env['TIER']
        cpu_arch = required_env['CPU_ARCH']

        # Get API key and Qdrant URL
        api_key = get_api_key(secrets_manager_arn)
        qdrant_url = get_qdrant_url(qdrant_fqdn)

        # Determine snapshot frequency
        frequency = determine_snapshot_frequency()
        logger.info(f"Snapshot frequency: {frequency}")

        # Get collections
        collections = get_collections(qdrant_url, api_key)

        if not collections:
            logger.info("No collections found - skipping snapshot creation")
            return {
                'statusCode': 200,
                'headers': {'Content-Type': 'application/json'},
                'body': json.dumps({
                    'message': 'No collections found - no snapshots created',
                    'frequency': frequency
                })
            }

        # Get S3 bucket name
        bucket_name = os.getenv('SNAPSHOT_S3_BUCKET_NAME')
        if not bucket_name:
            raise ValueError("SNAPSHOT_S3_BUCKET_NAME environment-variable NOT set")

        # Create snapshots for each collection
        snapshots_created = []
        total_size = 0

        for collection_name in collections:
            try:
                # Create collection snapshot
                snapshot_data = create_collection_snapshot(qdrant_url, collection_name, api_key)
                snapshot_name = snapshot_data.get('result', {}).get('name')

                if not snapshot_name:
                    logger.error(f"Snapshot name not found for collection '{collection_name}'")
                    continue

                # Download snapshot
                snapshot_bytes = download_collection_snapshot(qdrant_url, collection_name, snapshot_name, api_key)
                logger.info(f"Downloaded snapshot for '{collection_name}': {len(snapshot_bytes)} bytes")

                # Upload to S3
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
                s3_key = f"{tier}-{cpu_arch}/{frequency}/{collection_name}/{snapshot_name}"
                # s3_key = f"{tier}-{cpu_arch}/{frequency}/{collection_name}/{timestamp}-{snapshot_name}"
                upload_to_s3(bucket_name, s3_key, snapshot_bytes)

                snapshots_created.append({
                    'collection': collection_name,
                    'snapshot_name': snapshot_name,
                    's3_location': f"s3://{bucket_name}/{s3_key}",
                    'size_bytes': len(snapshot_bytes)
                })
                total_size += len(snapshot_bytes)

            except Exception as e:
                logger.error(f"Failed to create snapshot for collection '{collection_name}': {str(e)}")
                continue

        # Emit metrics
        metrics.add_metric(name="SnapshotsCreated", unit=MetricUnit.Count, value=len(snapshots_created))
        metrics.add_metric(name="TotalSnapshotSize", unit=MetricUnit.Bytes, value=total_size)

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'message': 'Collection snapshots created successfully',
                'frequency': frequency,
                'snapshots_created': len(snapshots_created),
                'collections': collections,
                'snapshots': snapshots_created,
                'note': 'Retention managed by S3 lifecycle policies'
            })
        }

    except Exception as e:
        traceback.print_exc()
        logger.exception("❌ Error creating snapshot")
        metrics.add_metric(name="SnapshotError", unit=MetricUnit.Count, value=1)

        return {
            'statusCode': 500,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({'error': f'Error creating snapshot: {str(e)}'})
        }
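
Because the frequency label depends on the wall clock, it is awkward to test as written. One option, sketched here under the assumption that the Lambda is scheduled every 15 minutes, is a variant that accepts the time as a parameter. The name `snapshot_frequency` is hypothetical; the rules are the same as in `determine_snapshot_frequency` above.

```python
from datetime import datetime, timezone
from typing import Optional


def snapshot_frequency(now: Optional[datetime] = None) -> str:
    """Same rules as determine_snapshot_frequency, but with an injectable clock."""
    now = now or datetime.now(timezone.utc)
    first_quarter = now.minute < 15  # the run that fires at the top of the hour

    # 8 AM UTC run: monthly on the 1st, otherwise daily
    if first_quarter and now.hour == 8:
        return 'monthly' if now.day == 1 else 'daily'

    # Any other top-of-the-hour run is hourly; everything else is a 15-minute snapshot
    return 'hourly' if first_quarter else '15min'
```

Calling it with fixed datetimes lets you check the S3 prefix each scheduled run will use without waiting for the clock.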

APPENDIX

  1. Get-Started article # 1
  2. Article # 2 has FULL DETAILS on the Critical-Design & Key-requirements that influenced/constrained/forced the final implementation.
  3. Article # 3 re: Snapshots.
  4. A separate GitLab-repo contains the full CDK-Construct.
  5. Assumption: You're OK with CUSTOM-building the Qdrant Container-IMAGE (using a custom Dockerfile) from Qdrant's GitHub. See article # 4 for a sensible/defensible Dockerfile.

/ End
