To state the obvious: it is VERY IMPORTANT to frequently test the validity of snapshots - manually. Vector databases are not yet as battle-tested as good ol' SQL databases, so do not blindly trust your backups/snapshots.
The following manual steps walk you through (first) how to create snapshots, and then how to restore/recover from a snapshot.
(A) HOW-TO:- creating snapshots
OPTION 1: in an AUTOMATED manner
We need a new Lambda; see the sample code in section (E) at the bottom.
OPTION 2: Manually
If you want to MANUALLY trigger a snapshot of an entire collection, simply use Qdrant's built-in REST API:
POST /collections/${collection_name}/snapshots
Do this either via your REST API client of choice, or via Qdrant's Dashboard (behind an ALB, e.g. httpS://my-custom-alb-domain.mycompany.com/dashboard).
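The manual trigger above can also be scripted. A minimal Python sketch (the actual HTTP call is left commented out; the ALB hostname is the example from this article, and the API key is a placeholder):

```python
def snapshot_request(base_url: str, collection_name: str, api_key: str):
    """Build the URL and headers for Qdrant's create-snapshot endpoint:
    POST /collections/{collection_name}/snapshots"""
    url = f"{base_url}/collections/{collection_name}/snapshots"
    headers = {"api-key": api_key}
    return url, headers

url, headers = snapshot_request(
    "https://my-custom-alb-domain.mycompany.com",  # example ALB hostname from this article
    "test_collection",
    "MY-API-KEY")                                  # placeholder token
print(url)
# https://my-custom-alb-domain.mycompany.com/collections/test_collection/snapshots

# To actually trigger the snapshot (requires the `requests` package):
# import requests
# requests.post(url, headers=headers).raise_for_status()
```

Note that the snapshot is created server-side; the POST returns the new snapshot's metadata, not the snapshot file itself.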
(B) HOW-TO:- viewing snapshots
- Use the Qdrant-Dashboard:
  - Important: Use the API-Key/Token for security!!
  - Important: Use httpS!
- Invoke POST collections/${collection_name}/points/scroll
- Save the POST-response to a temporary local file, to help you track what you will do per this article.
- Confirm that you have a NEW snapshot, which we shall refer to as snapshot # 1 (a.k.a. baseline) of the collection. (You can list snapshots via GET /collections/${collection_name}/snapshots.)
- FYI: You can easily recognize snapshots by the TIMESTAMP that is part of each snapshot's name.
- If relying on automated snapshots, wait for the next new snapshot to be created.
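Since each snapshot's name ends in a timestamp, sorting snapshots chronologically can be scripted. A small helper, assuming the name format shown in the S3 restore example later in this article:

```python
import re
from datetime import datetime

def snapshot_timestamp(snapshot_name: str) -> datetime:
    """Extract the trailing YYYY-MM-DD-HH-MM-SS timestamp from a
    Qdrant snapshot file name, e.g.
    'test_collection-1046358132872257-2025-12-10-20-31-22.snapshot'."""
    m = re.search(r"(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})\.snapshot$", snapshot_name)
    if not m:
        raise ValueError(f"No timestamp found in {snapshot_name!r}")
    return datetime.strptime(m.group(1), "%Y-%m-%d-%H-%M-%S")

# Example name taken from the S3 restore example later in this article:
ts = snapshot_timestamp("test_collection-1046358132872257-2025-12-10-20-31-22.snapshot")
print(ts)  # 2025-12-10 20:31:22
```

Sorting a list of snapshot names by `snapshot_timestamp` gives you oldest-to-newest order, which makes it easy to pick the baseline vs. the latest.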
(C) HOW-TO:- prep for VERIFYING the snapshots
- Make a change to your vector database.
  - If you are using Qdrant's test_collection, then insert a new Point: PointStruct(id=6, vector=[0.30, 0.05, 0.10, 0.40], payload={"city": "Bengaluru"}) (Don't ask me about the numbers.)
- Trigger a new snapshot (manually or via scheduled automation).
- Invoke POST collections/${collection_name}/points/scroll (again), and COMPARE this response with the contents of the temporary local file from sub-section (B).
- Confirm you have a NEW snapshot (called snapshot # 2) of the collection (AFTER the above update).
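Rather than eyeballing the two scroll responses, the comparison can be scripted. A sketch that assumes the usual shape of Qdrant's scroll response ({"result": {"points": [...]}}); the point IDs mirror the test_collection example above:

```python
def new_point_ids(baseline_scroll: dict, current_scroll: dict) -> set:
    """Return point IDs present in the current scroll response but not in the
    baseline one. Assumes Qdrant's scroll response shape:
    {"result": {"points": [{"id": ...}, ...]}}"""
    def ids(resp):
        return {p["id"] for p in resp.get("result", {}).get("points", [])}
    return ids(current_scroll) - ids(baseline_scroll)

# Baseline = scroll output saved to the temporary local file earlier;
# current = the repeated scroll after inserting point id=6.
baseline = {"result": {"points": [{"id": 1}, {"id": 2}]}}
current = {"result": {"points": [{"id": 1}, {"id": 2}, {"id": 6}]}}
print(new_point_ids(baseline, current))  # {6}
```

After restoring snapshot # 1, running the same diff in reverse direction should show point 6 as missing again.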
(D) HOW-TO:- Restore/Recover
Snapshots stored locally inside the Qdrant-cluster
FYI: Qdrant stores ALL snapshots that you create/trigger under /qdrant/snapshots, but YOU should rely on Qdrant's Snapshots API (GET /collections/${collection_name}/snapshots) to list/access/download/upload snapshots.
Invoke the following PUT API (with a JSON body) to do a restore using the baseline, a.k.a. snapshot # 1:
PUT /collections/${collection_name}/snapshots/recover
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- Do a query to verify that the data change (that you made in sub-section (C)) is MISSING!
- Restore from the LATEST snapshot # 2.
- Do a query to verify that the data change (from sub-section (C)) is BACK!
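A helper to build the recover body for locally-stored snapshots. The /qdrant/storage/snapshots path prefix mirrors the example above; adjust it to your own deployment:

```python
def recover_payload(collection_name: str, timestamp: str) -> dict:
    """Build the JSON body for PUT /collections/{name}/snapshots/recover,
    pointing at a snapshot on the Qdrant node's local filesystem.
    The path prefix below matches this article's example deployment."""
    location = (f"file:///qdrant/storage/snapshots/{collection_name}/"
                f"{collection_name}-{timestamp}.snapshot")
    return {"location": location}

payload = recover_payload("test_collection", "2025-12-10-20-31-22")
print(payload["location"])
```

Send this dict as the JSON body of the PUT call shown above; Qdrant replaces the collection's contents with the snapshot's.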
From an S3-bucket
Warning: The Fargate-Task's (Container) role will need S3 access. That is out of scope for this article.
- Restore from the old snapshot # 1 / baseline using this example:
PUT /collections/${collection_name}/snapshots/recover
{
"location": "https://??????????.s3.us-east-2.amazonaws.com/${collection_name}-1046358132872257-2025-12-10-20-31-22.snapshot"
}
- Do a query to verify that the data change (that you made in sub-section (C)) is MISSING!
- Restore from the LATEST snapshot # 2.
- Do a query to verify that the data change (from sub-section (C)) is BACK!
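The same recover call, parameterized for an S3-hosted snapshot. The bucket name below is hypothetical; for a private bucket you would hand Qdrant a presigned URL instead (commented out, since it requires AWS credentials):

```python
import json

def s3_recover_request(collection_name: str, snapshot_url: str):
    """Build the method, path, and JSON body for recovering a collection
    from a snapshot whose URL must be reachable by the Qdrant node."""
    path = f"/collections/{collection_name}/snapshots/recover"
    body = json.dumps({"location": snapshot_url})
    return "PUT", path, body

method, path, body = s3_recover_request(
    "test_collection",
    "https://example-bucket.s3.us-east-2.amazonaws.com/"     # hypothetical bucket
    "test_collection-1046358132872257-2025-12-10-20-31-22.snapshot")
print(method, path)  # PUT /collections/test_collection/snapshots/recover

# For a PRIVATE bucket, generate a presigned URL first (needs AWS credentials):
# import boto3
# snapshot_url = boto3.client("s3").generate_presigned_url(
#     "get_object",
#     Params={"Bucket": "example-bucket", "Key": "test_collection-...snapshot"},
#     ExpiresIn=3600)
```

Qdrant downloads the snapshot from that URL itself, so the URL must be resolvable and readable from inside the Fargate-Task's network.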
(E) HOW-TO:- Lambda to create AUTOMATED-Snapshots
This Lambda will trigger snapshots periodically, and will also COPY those snapshots to an S3-bucket. FYI: I instead prefer mounting an EFS-FileSystem onto /qdrant/snapshots/ of the Container (running as a Fargate-Task), and having EFS replicated to a 2nd region.
import os
import json
import traceback
from datetime import datetime, timezone

import boto3
import requests
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.metrics import MetricUnit

logger = Logger()
tracer = Tracer()
metrics = Metrics()

s3_client = boto3.client('s3')
secrets_client = boto3.client('secretsmanager')

### -------------------------------------------
def get_api_key(secrets_manager_arn: str) -> str:
    """Retrieve API key from AWS Secrets Manager"""
    if not secrets_manager_arn or secrets_manager_arn.strip() == "":
        raise ValueError("Secrets Manager ARN not provided")
    logger.info(f"Retrieving API key from Secrets Manager ARN: {secrets_manager_arn}")
    response = secrets_client.get_secret_value(SecretId=secrets_manager_arn)
    api_key = response['SecretString'].strip()
    if not api_key:
        raise ValueError("API key not found in secret")
    logger.info("API key retrieved successfully ✅")
    return api_key

### -------------------------------------------
def get_qdrant_url(qdrant_fqdn: str) -> str:
    """Construct Qdrant URL using Service Discovery FQDN"""
    if not qdrant_fqdn:
        raise ValueError("QDRANT_FQDN environment variable not set")
    qdrant_url = f"http://{qdrant_fqdn}:6333"
    logger.info(f"Qdrant URL: {qdrant_url}")
    return qdrant_url

### -------------------------------------------
def determine_snapshot_frequency() -> str:
    """
    Determine which snapshot frequency to use based on current time
    Returns: '15min', 'hourly', 'daily', or 'monthly'
    """
    now = datetime.now(timezone.utc)
    # Monthly: 1st of month at 8 AM UTC
    if now.day == 1 and now.hour == 8 and now.minute < 15:
        return 'monthly'
    # Daily: Every day at 8 AM UTC
    if now.hour == 8 and now.minute < 15:
        return 'daily'
    # Hourly: Top of every hour
    if now.minute < 15:
        return 'hourly'
    # Default: 15-minute intervals
    return '15min'

### -------------------------------------------
def get_collections(qdrant_url: str, api_key: str) -> list:
    """Get list of all collections"""
    collections_url = f"{qdrant_url}/collections"
    headers = {"api-key": api_key}
    logger.info(f"Getting collections from {collections_url}")
    response = requests.get(collections_url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to get collections: {response.status_code} - {response.text}")
    collections_data = response.json()
    collections = [col["name"] for col in collections_data.get("result", {}).get("collections", [])]
    logger.info(f"Found collections: {collections}")
    return collections

### -------------------------------------------
def create_collection_snapshot(qdrant_url: str, collection_name: str, api_key: str) -> dict:
    """
    Create a snapshot for a specific collection
    Returns snapshot metadata
    """
    snapshot_url = f"{qdrant_url}/collections/{collection_name}/snapshots"
    headers = {"api-key": api_key}
    logger.info(f"Creating snapshot for collection '{collection_name}' at {snapshot_url}")
    response = requests.post(snapshot_url, headers=headers)
    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to create snapshot for collection '{collection_name}': {response.status_code} - {response.text}")
    snapshot_data = response.json()
    logger.info(f"Snapshot created for collection '{collection_name}': {json.dumps(snapshot_data)}")
    return snapshot_data

### -------------------------------------------
def download_collection_snapshot(qdrant_url: str, collection_name: str, snapshot_name: str, api_key: str) -> bytes:
    """Download collection snapshot from Qdrant"""
    download_url = f"{qdrant_url}/collections/{collection_name}/snapshots/{snapshot_name}"
    headers = {"api-key": api_key}
    logger.info(f"Downloading snapshot from {download_url}")
    response = requests.get(download_url, headers=headers, stream=True)
    if response.status_code != 200:
        raise Exception(f"Failed to download snapshot: {response.status_code}")
    return response.content

### -------------------------------------------
def upload_to_s3(bucket_name: str, s3_key: str, data: bytes) -> None:
    """Upload snapshot to S3"""
    logger.info(f"Uploading to s3://{bucket_name}/{s3_key}")
    s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=data)
    logger.info("Upload complete ✅")

### -------------------- main -----------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
    """
    Lambda function to create and manage Qdrant snapshots
    Implements RPO of 15 minutes with S3 lifecycle-based retention
    """
    logger.info('^' * 160)
    try:
        # Get environment variables
        secrets_manager_arn = os.getenv('AK_ARN')
        if not secrets_manager_arn:
            raise ValueError("AK_ARN environment-variable NOT set")
        qdrant_fqdn = os.getenv('QDRANT_FQDN')
        if not qdrant_fqdn:
            raise ValueError("QDRANT_FQDN environment-variable NOT set")
        tier = os.getenv('TIER')
        if not tier:
            raise ValueError("TIER environment-variable NOT set")
        cpu_arch = os.getenv('CPU_ARCH')
        if not cpu_arch:
            raise ValueError("CPU_ARCH environment-variable NOT set")
        # ecs_cluster_name = os.getenv('ECS_CLUSTER_NAME')
        # fargate_container_name = os.getenv('FARGATE_CONTAINER_NAME')

        # Validate required environment variables
        # if not all([secrets_manager_arn, qdrant_fqdn, tier, cpu_arch, ecs_cluster_name, fargate_container_name]):
        if not all([secrets_manager_arn, qdrant_fqdn, tier, cpu_arch]):
            missing = [k for k, v in {
                'AK_ARN': secrets_manager_arn,
                'QDRANT_FQDN': qdrant_fqdn,
                'TIER': tier,
                'CPU_ARCH': cpu_arch,
                # 'ECS_CLUSTER_NAME': ecs_cluster_name,
                # 'FARGATE_CONTAINER_NAME': fargate_container_name
            }.items() if not v]
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

        # Get API key and Qdrant URL
        api_key = get_api_key(secrets_manager_arn)
        qdrant_url = get_qdrant_url(qdrant_fqdn)

        # Determine snapshot frequency
        frequency = determine_snapshot_frequency()
        logger.info(f"Snapshot frequency: {frequency}")

        # Get collections
        collections = get_collections(qdrant_url, api_key)
        if not collections:
            logger.info("No collections found - skipping snapshot creation")
            return {
                'statusCode': 200,
                'headers': {'Content-Type': 'application/json'},
                'body': json.dumps({
                    'message': 'No collections found - no snapshots created',
                    'frequency': frequency
                })
            }

        # Get S3 bucket name
        bucket_name = os.getenv('SNAPSHOT_S3_BUCKET_NAME')
        if not bucket_name:
            raise ValueError("SNAPSHOT_S3_BUCKET_NAME environment-variable NOT set")

        # Create snapshots for each collection
        snapshots_created = []
        total_size = 0
        for collection_name in collections:
            try:
                # Create collection snapshot
                snapshot_data = create_collection_snapshot(qdrant_url, collection_name, api_key)
                snapshot_name = snapshot_data.get('result', {}).get('name')
                if not snapshot_name:
                    logger.error(f"Snapshot name not found for collection '{collection_name}'")
                    continue
                # Download snapshot
                snapshot_bytes = download_collection_snapshot(qdrant_url, collection_name, snapshot_name, api_key)
                logger.info(f"Downloaded snapshot for '{collection_name}': {len(snapshot_bytes)} bytes")
                # Upload to S3
                timestamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
                s3_key = f"{tier}-{cpu_arch}/{frequency}/{collection_name}/{snapshot_name}"
                # s3_key = f"{tier}-{cpu_arch}/{frequency}/{collection_name}/{timestamp}-{snapshot_name}"
                upload_to_s3(bucket_name, s3_key, snapshot_bytes)
                snapshots_created.append({
                    'collection': collection_name,
                    'snapshot_name': snapshot_name,
                    's3_location': f"s3://{bucket_name}/{s3_key}",
                    'size_bytes': len(snapshot_bytes)
                })
                total_size += len(snapshot_bytes)
            except Exception as e:
                logger.error(f"Failed to create snapshot for collection '{collection_name}': {str(e)}")
                continue

        # Emit metrics
        metrics.add_metric(name="SnapshotsCreated", unit=MetricUnit.Count, value=len(snapshots_created))
        metrics.add_metric(name="TotalSnapshotSize", unit=MetricUnit.Bytes, value=total_size)

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'message': 'Collection snapshots created successfully',
                'frequency': frequency,
                'snapshots_created': len(snapshots_created),
                'collections': collections,
                'snapshots': snapshots_created,
                'note': 'Retention managed by S3 lifecycle policies'
            })
        }
    except Exception as e:
        traceback.print_exc()
        logger.exception("❌ Error creating snapshot")
        metrics.add_metric(name="SnapshotError", unit=MetricUnit.Count, value=1)
        return {
            'statusCode': 500,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({'error': f'Error creating snapshot: {str(e)}'})
        }
APPENDIX
- Get-Started: article # 1
- Article # 2 has FULL DETAILS on the Critical-Design & Key-requirements that influenced/constrained/forced the final implementation.
- Article # 3 re: Snapshots.
- A separate GitLab-repo contains the full CDK-Construct.
- Assumption: You are OK to CUSTOM-build the Qdrant Container-IMAGE (using a custom Dockerfile) from Qdrant's GitHub. See article # 4 for a sensible/defensible Dockerfile.
/ End