DEV Community

Alain Airom
Alain Airom

Posted on

Stop Managing Encryption Keys: Vault-as-a-Service in 5 Minutes

Bob was here to help me making an encryption service demonstration in less than 60 minutes!


Data security often feels like a trade-off between robust protection and developer velocity. Implementing a secure “Encryption-as-a-Service” model usually requires significant infrastructure overhead. However, by combining FastAPI for a high-performance REST interface and HashiCorp Vault for centralized key management, it is possible to deploy a production-ready encryption service in minutes.


The Architecture: Secure by Design

The core philosophy of this service is that encryption keys should never leave their secure provider. This project leverages the HashiCorp Vault Transit Secrets Engine, which handles cryptographic functions in-flight.

Key Components:

  • FastAPI Application: Acts as the orchestrator, exposing RESTful endpoints for encryption, decryption, and record management. Press enter or click to view image in full size

  • Vault Transit Engine: Performs the actual cryptographic operations. The application sends data to Vault, and Vault returns ciphertext; the raw encryption keys remain protected within Vault’s memory.

  • SQLite Database: Stores only the ciphertext and metadata (such as key version and context), ensuring that a database breach does not lead to data exposure.

Core Features


vault-encryption-service/
├── app/                    # Application source code
│   ├── main.py            # FastAPI application
│   ├── auth.py            # JWT/OAuth2 authentication
│   ├── vault_client.py    # Vault integration with TLS
│   ├── database.py        # SQLite database operations
│   └── models.py          # Data models
├── certs/                 # TLS certificates (auto-generated)
├── k8s/                   # Kubernetes manifests
├── scripts/               # Automation scripts
│   ├── start.sh          # Start application locally
│   ├── stop.sh           # Stop application
│   ├── generate-certs.sh # TLS certificate generation
│   └── push-to-github.sh # GitHub automation
├── docs/                  # Documentation
│   ├── ARCHITECTURE.md   # Architecture diagrams
│   ├── API_REFERENCE.md  # Complete API documentation
│   └── DEPLOYMENT_GUIDE.md # Deployment instructions
├── tests/                 # Test files
├── examples/              # Example usage scripts
├── Dockerfile            # Container image definition
├── requirements.txt      # Python dependencies
├── .env.example          # Environment configuration template
└── README.md            # This file
Enter fullscreen mode Exit fullscreen mode

This implementation is designed to handle the full lifecycle of sensitive data:

  • Encryption-as-a-Service: High-level API for data protection.
  • Key Rotation: Supports seamless rotation to new key versions without breaking existing ciphertext.
  • Context-Based Derivation: Adds a layer of security by requiring a specific context (like a user ID) to decrypt data.
  • Horizontal Scalability: Fully containerized and Kubernetes-ready with Horizontal Pod Autoscaler (HPA) support.

What is Technically Implemented


The code implementation establishes a production-ready Encryption-as-a-Service by integrating a FastAPI REST interface with HashiCorp Vault’s Transit Secrets Engine. The core logic is distributed across four specialized modules: main.py handles the API orchestration and lifecycle management, including endpoints for encryption, decryption, and key rotation; vault_client.py manages the direct interaction with Vault, ensuring the Transit engine is initialized and cryptographic operations are performed securely without ever exposing raw encryption keys to the application; database.py utilizes aiosqlite to provide an asynchronous persistence layer for storing encrypted ciphertext, metadata, and key versions; and models.py defines the Pydantic data structures for robust request validation and automated API documentation. This architecture ensures that sensitive data is protected both in transit and at rest, while maintaining high performance through asynchronous operations and automated key lifecycle management.

  • Encryption/Decryption Service: REST API for encrypting and decrypting sensitive data
  • JWT/OAuth2 Authentication: Secure API access with token-based authentication
  • TLS/HTTPS Support: Encrypted communication for all transit data
  • SQLite Database: Local storage for encrypted data records
  • Vault Transit Engine: Leverages Vault’s transit secrets engine for encryption operations
  • Kubernetes Ready: Complete K8s manifests for production deployment


# main.py
"""
FastAPI application for Vault Encryption-as-a-Service
Provides REST API endpoints for encrypting and decrypting data using HashiCorp Vault
"""
from fastapi import FastAPI, HTTPException, status, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordRequestForm
from contextlib import asynccontextmanager
import logging
from datetime import datetime, timedelta
from typing import List

from app.models import (
    EncryptRequest, EncryptResponse,
    DecryptRequest, DecryptResponse,
    EncryptedRecord, HealthResponse
)
from app.vault_client import VaultClient
from app.database import Database
from app.auth import (
    Token, User, authenticate_user, create_access_token,
    get_current_active_user, fake_users_db, ACCESS_TOKEN_EXPIRE_MINUTES
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Global instances
from typing import Optional as OptionalType
vault_client: OptionalType[VaultClient] = None
database: OptionalType[Database] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager"""
    global vault_client, database

    logger.info("Starting Vault Encryption Service...")

    # Initialize Vault client
    vault_client = VaultClient()
    await vault_client.initialize_transit_engine()
    logger.info("Vault client initialized")

    # Initialize database
    database = Database()
    await database.initialize()
    logger.info("Database initialized")

    yield

    logger.info("Shutting down Vault Encryption Service...")


# Create FastAPI application
app = FastAPI(
    title="Vault Encryption Service",
    description="Encryption-as-a-Service using HashiCorp Vault Transit Engine",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/token", response_model=Token, tags=["Authentication"])
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    OAuth2 compatible token login, get an access token for future requests

    - **username**: Your username
    - **password**: Your password
    """
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/", tags=["Root"])
async def root():
    """Root endpoint"""
    return {
        "service": "Vault Encryption Service",
        "version": "1.0.0",
        "status": "running",
        "authentication": "enabled"
    }


@app.get("/users/me", response_model=User, tags=["Authentication"])
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Get current user information"""
    return current_user


@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
    """Health check endpoint"""
    if vault_client is None or database is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service not initialized"
        )
    vault_healthy = vault_client.health_check()
    db_healthy = await database.health_check()

    return HealthResponse(
        status="healthy" if (vault_healthy and db_healthy) else "unhealthy",
        vault_connected=vault_healthy,
        database_connected=db_healthy,
        timestamp=datetime.utcnow().isoformat()
    )


@app.post("/encrypt", response_model=EncryptResponse, tags=["Encryption"])
async def encrypt_data(
    request: EncryptRequest,
    current_user: User = Depends(get_current_active_user)
):
    """
    Encrypt plaintext data using Vault transit engine (requires authentication)

    - **plaintext**: The data to encrypt
    - **context**: Optional context for key derivation
    """
    try:
        if vault_client is None or database is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service not initialized"
            )
        # Encrypt data using Vault
        ciphertext, key_version = vault_client.encrypt(
            plaintext=request.plaintext,
            context=request.context
        )

        # Store encrypted record in database
        record_id = await database.insert_record(
            ciphertext=ciphertext,
            key_version=key_version,
            context=request.context
        )

        logger.info(f"Encrypted data stored with record ID: {record_id}")

        return EncryptResponse(
            ciphertext=ciphertext,
            record_id=record_id,
            key_version=key_version
        )

    except Exception as e:
        logger.error(f"Encryption failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Encryption failed: {str(e)}"
        )


@app.post("/decrypt", response_model=DecryptResponse, tags=["Encryption"])
async def decrypt_data(
    request: DecryptRequest,
    current_user: User = Depends(get_current_active_user)
):
    """
    Decrypt ciphertext using Vault transit engine (requires authentication)

    - **ciphertext**: The encrypted data to decrypt
    - **context**: Optional context used during encryption
    """
    try:
        if vault_client is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service not initialized"
            )
        # Decrypt data using Vault
        plaintext = vault_client.decrypt(
            ciphertext=request.ciphertext,
            context=request.context
        )

        logger.info("Data decrypted successfully")

        return DecryptResponse(plaintext=plaintext)

    except Exception as e:
        logger.error(f"Decryption failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Decryption failed: {str(e)}"
        )


@app.get("/records", response_model=List[EncryptedRecord], tags=["Records"])
async def get_records(
    limit: int = 100,
    current_user: User = Depends(get_current_active_user)
):
    """
    Get all encrypted records from database (requires authentication)

    - **limit**: Maximum number of records to return (default: 100)
    """
    try:
        if database is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service not initialized"
            )
        records = await database.get_all_records(limit=limit)
        return records

    except Exception as e:
        logger.error(f"Failed to retrieve records: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve records: {str(e)}"
        )


@app.get("/records/{record_id}", response_model=EncryptedRecord, tags=["Records"])
async def get_record(
    record_id: int,
    current_user: User = Depends(get_current_active_user)
):
    """
    Get a specific encrypted record by ID (requires authentication)

    - **record_id**: The database record ID
    """
    try:
        if database is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service not initialized"
            )
        record = await database.get_record(record_id)

        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Record {record_id} not found"
            )

        return record

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to retrieve record: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve record: {str(e)}"
        )


@app.delete("/records/{record_id}", tags=["Records"])
async def delete_record(
    record_id: int,
    current_user: User = Depends(get_current_active_user)
):
    """
    Delete a specific encrypted record (requires authentication)

    - **record_id**: The database record ID
    """
    try:
        if database is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service not initialized"
            )
        deleted = await database.delete_record(record_id)

        if not deleted:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Record {record_id} not found"
            )

        return {"message": f"Record {record_id} deleted successfully"}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete record: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete record: {str(e)}"
        )


@app.post("/rotate-key", tags=["Key Management"])
async def rotate_encryption_key(current_user: User = Depends(get_current_active_user)):
    """
    Rotate the encryption key to a new version (requires authentication)

    Note: Existing encrypted data can still be decrypted with old key versions
    """
    try:
        if vault_client is None:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service not initialized"
            )
        vault_client.rotate_key()
        return {"message": "Encryption key rotated successfully"}

    except Exception as e:
        logger.error(f"Key rotation failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Key rotation failed: {str(e)}"
        )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Made with Bob
Enter fullscreen mode Exit fullscreen mode

# vault_client.py
"""
HashiCorp Vault client for encryption/decryption operations
Uses Vault's Transit Secrets Engine for encryption-as-a-service
"""
import hvac
import base64
import os
from typing import Optional, Tuple
import logging

logger = logging.getLogger(__name__)


class VaultClient:
    """Client for interacting with HashiCorp Vault Transit Engine"""

    def __init__(
        self,
        vault_addr: Optional[str] = None,
        vault_token: Optional[str] = None,
        transit_mount: str = "transit",
        key_name: str = "encryption-key",
        verify_tls: Optional[bool] = None,
        ca_cert_path: Optional[str] = None
    ):
        """
        Initialize Vault client

        Args:
            vault_addr: Vault server address (default: from VAULT_ADDR env)
            vault_token: Vault authentication token (default: from VAULT_TOKEN env)
            transit_mount: Transit engine mount point
            key_name: Name of the encryption key
            verify_tls: Whether to verify TLS certificates (default: from VAULT_VERIFY_TLS env)
            ca_cert_path: Path to CA certificate for TLS verification (default: from VAULT_CA_CERT env)
        """
        self.vault_addr = vault_addr or os.getenv("VAULT_ADDR", "http://localhost:8200")
        self.vault_token = vault_token or os.getenv("VAULT_TOKEN", "root")
        self.transit_mount = transit_mount
        self.key_name = key_name

        # TLS configuration
        if verify_tls is None:
            verify_tls_env = os.getenv("VAULT_VERIFY_TLS", "true").lower()
            verify_tls = verify_tls_env in ("true", "1", "yes")

        ca_cert = ca_cert_path or os.getenv("VAULT_CA_CERT")

        # Initialize Vault client with TLS support
        verify_param: Optional[str] = None

        # Configure TLS verification
        if self.vault_addr.startswith("https://"):
            if ca_cert and os.path.exists(ca_cert):
                verify_param = ca_cert
                logger.info(f"Using CA certificate: {ca_cert}")
            elif not verify_tls:
                verify_param = None  # Will use default verification
                logger.warning("TLS verification disabled - not recommended for production")
            else:
                logger.info("TLS verification enabled with system certificates")

        self.client = hvac.Client(
            url=self.vault_addr,
            token=self.vault_token,
            verify=verify_param if verify_param else True
        )

    async def initialize_transit_engine(self):
        """Initialize the transit secrets engine and create encryption key"""
        try:
            # Enable transit secrets engine if not already enabled
            if f"{self.transit_mount}/" not in self.client.sys.list_mounted_secrets_engines()["data"]:
                logger.info(f"Enabling transit secrets engine at {self.transit_mount}")
                self.client.sys.enable_secrets_engine(
                    backend_type="transit",
                    path=self.transit_mount
                )

            # Create encryption key if it doesn't exist
            try:
                self.client.secrets.transit.read_key(
                    name=self.key_name,
                    mount_point=self.transit_mount
                )
                logger.info(f"Encryption key '{self.key_name}' already exists")
            except Exception as read_error:
                # Key doesn't exist, create it
                if "invalid path" in str(read_error).lower():
                    logger.info(f"Creating encryption key '{self.key_name}'")
                    self.client.secrets.transit.create_key(
                        name=self.key_name,
                        mount_point=self.transit_mount
                    )

        except Exception as e:
            logger.error(f"Failed to initialize transit engine: {e}")
            raise

    def encrypt(self, plaintext: str, context: Optional[str] = None) -> Tuple[str, int]:
        """
        Encrypt plaintext using Vault transit engine

        Args:
            plaintext: Data to encrypt
            context: Optional context for key derivation

        Returns:
            Tuple of (ciphertext, key_version)
        """
        try:
            # Encode plaintext to base64 as required by Vault
            plaintext_b64 = base64.b64encode(plaintext.encode()).decode()

            # Prepare encryption parameters
            encrypt_params = {
                "plaintext": plaintext_b64,
                "mount_point": self.transit_mount,
                "name": self.key_name
            }

            # Add context if provided
            if context:
                context_b64 = base64.b64encode(context.encode()).decode()
                encrypt_params["context"] = context_b64

            # Encrypt data
            response = self.client.secrets.transit.encrypt_data(**encrypt_params)

            ciphertext = response["data"]["ciphertext"]
            key_version = response["data"]["key_version"]

            logger.info(f"Successfully encrypted data with key version {key_version}")
            return ciphertext, key_version

        except Exception as e:
            logger.error(f"Encryption failed: {e}")
            raise

    def decrypt(self, ciphertext: str, context: Optional[str] = None) -> str:
        """
        Decrypt ciphertext using Vault transit engine

        Args:
            ciphertext: Encrypted data
            context: Optional context used during encryption

        Returns:
            Decrypted plaintext
        """
        try:
            # Prepare decryption parameters
            decrypt_params = {
                "ciphertext": ciphertext,
                "mount_point": self.transit_mount,
                "name": self.key_name
            }

            # Add context if provided
            if context:
                context_b64 = base64.b64encode(context.encode()).decode()
                decrypt_params["context"] = context_b64

            # Decrypt data
            response = self.client.secrets.transit.decrypt_data(**decrypt_params)

            # Decode from base64
            plaintext_b64 = response["data"]["plaintext"]
            plaintext = base64.b64decode(plaintext_b64).decode()

            logger.info("Successfully decrypted data")
            return plaintext

        except Exception as e:
            logger.error(f"Decryption failed: {e}")
            raise

    def rotate_key(self):
        """Rotate the encryption key to a new version"""
        try:
            self.client.secrets.transit.rotate_key(
                name=self.key_name,
                mount_point=self.transit_mount
            )
            logger.info(f"Successfully rotated key '{self.key_name}'")
        except Exception as e:
            logger.error(f"Key rotation failed: {e}")
            raise

    def health_check(self) -> bool:
        """Check if Vault is accessible and authenticated"""
        try:
            return self.client.sys.is_initialized() and self.client.is_authenticated()
        except Exception as e:
            logger.error(f"Vault health check failed: {e}")
            return False

# Made with Bob
Enter fullscreen mode Exit fullscreen mode
  • Usage example;
# example_usage.py
"""
Example usage of the Vault Encryption Service API with Authentication

This script demonstrates how to use the encryption service
for encrypting and decrypting sensitive data with JWT authentication.
"""
import requests
import json
from typing import Optional

# Configuration
BASE_URL = "http://localhost:8000"
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "secret"


class EncryptionClient:
    """Client for interacting with the Vault Encryption Service"""

    def __init__(self, base_url: str = BASE_URL, username: str = DEFAULT_USERNAME, password: str = DEFAULT_PASSWORD):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = None

    def authenticate(self) -> str:
        """Authenticate and get access token"""
        response = requests.post(
            f"{self.base_url}/token",
            data={"username": self.username, "password": self.password}
        )
        response.raise_for_status()
        self.token = response.json()["access_token"]
        return self.token

    def _get_headers(self) -> dict:
        """Get headers with authentication token"""
        if not self.token:
            self.authenticate()
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def health_check(self) -> dict:
        """Check service health (no authentication required)"""
        response = requests.get(f"{self.base_url}/health")
        response.raise_for_status()
        return response.json()

    def get_current_user(self) -> dict:
        """Get current authenticated user information"""
        response = requests.get(
            f"{self.base_url}/users/me",
            headers=self._get_headers()
        )
        response.raise_for_status()
        return response.json()

    def encrypt(self, plaintext: str, context: Optional[str] = None) -> dict:
        """Encrypt plaintext data"""
        payload = {"plaintext": plaintext}
        if context:
            payload["context"] = context

        response = requests.post(
            f"{self.base_url}/encrypt",
            headers=self._get_headers(),
            json=payload
        )
        response.raise_for_status()
        return response.json()

    def decrypt(self, ciphertext: str, context: Optional[str] = None) -> str:
        """Decrypt ciphertext"""
        payload = {"ciphertext": ciphertext}
        if context:
            payload["context"] = context

        response = requests.post(
            f"{self.base_url}/decrypt",
            headers=self._get_headers(),
            json=payload
        )
        response.raise_for_status()
        return response.json()["plaintext"]

    def list_records(self, limit: int = 100) -> list:
        """List all encrypted records"""
        response = requests.get(
            f"{self.base_url}/records",
            headers=self._get_headers(),
            params={"limit": limit}
        )
        response.raise_for_status()
        return response.json()

    def get_record(self, record_id: int) -> dict:
        """Get a specific record"""
        response = requests.get(
            f"{self.base_url}/records/{record_id}",
            headers=self._get_headers()
        )
        response.raise_for_status()
        return response.json()

    def delete_record(self, record_id: int) -> dict:
        """Delete a record"""
        response = requests.delete(
            f"{self.base_url}/records/{record_id}",
            headers=self._get_headers()
        )
        response.raise_for_status()
        return response.json()

    def rotate_key(self) -> dict:
        """Rotate the encryption key"""
        response = requests.post(
            f"{self.base_url}/rotate-key",
            headers=self._get_headers()
        )
        response.raise_for_status()
        return response.json()


def example_authentication():
    """Example: Authentication"""
    print("\n=== Example 1: Authentication ===")

    client = EncryptionClient()

    # Authenticate
    print("Authenticating...")
    token = client.authenticate()
    print(f"✓ Access token obtained: {token[:20]}...")

    # Get current user info
    user = client.get_current_user()
    print(f"✓ Logged in as: {user['username']} ({user['full_name']})")
    print(f"  Email: {user['email']}")


def example_basic_encryption():
    """Example: Basic encryption and decryption"""
    print("\n=== Example 2: Basic Encryption ===")

    client = EncryptionClient()

    # Encrypt data
    plaintext = "This is my secret message"
    print(f"Plaintext: {plaintext}")

    encrypted = client.encrypt(plaintext)
    print(f"Ciphertext: {encrypted['ciphertext']}")
    print(f"Record ID: {encrypted['record_id']}")
    print(f"Key Version: {encrypted['key_version']}")

    # Decrypt data
    decrypted = client.decrypt(encrypted['ciphertext'])
    print(f"Decrypted: {decrypted}")

    assert plaintext == decrypted, "Decryption failed!"
    print("✓ Encryption/Decryption successful!")


def example_context_encryption():
    """Example: Encryption with context"""
    print("\n=== Example 3: Context-based Encryption ===")

    client = EncryptionClient()

    # Encrypt with context
    plaintext = "User's credit card number: 1234-5678-9012-3456"
    context = "user-12345"

    print(f"Plaintext: {plaintext}")
    print(f"Context: {context}")

    encrypted = client.encrypt(plaintext, context)
    print(f"Ciphertext: {encrypted['ciphertext']}")

    # Decrypt with correct context
    decrypted = client.decrypt(encrypted['ciphertext'], context)
    print(f"Decrypted (correct context): {decrypted}")

    # Try to decrypt with wrong context (will fail)
    try:
        wrong_decrypted = client.decrypt(encrypted['ciphertext'], "wrong-context")
        print(f"Decrypted (wrong context): {wrong_decrypted}")
    except requests.exceptions.HTTPError as e:
        print(f"✓ Decryption with wrong context failed as expected: {e}")


def example_multiple_encryptions():
    """Example: Encrypting multiple values"""
    print("\n=== Example 4: Multiple Encryptions ===")

    client = EncryptionClient()

    sensitive_data = [
        {"user": "alice", "data": "alice@example.com"},
        {"user": "bob", "data": "bob@example.com"},
        {"user": "charlie", "data": "charlie@example.com"},
    ]

    encrypted_records = []

    for item in sensitive_data:
        encrypted = client.encrypt(
            plaintext=item["data"],
            context=item["user"]
        )
        encrypted_records.append({
            "user": item["user"],
            "record_id": encrypted["record_id"],
            "ciphertext": encrypted["ciphertext"]
        })
        print(f"Encrypted {item['user']}'s data: Record ID {encrypted['record_id']}")

    # Decrypt all
    print("\nDecrypting all records:")
    for record in encrypted_records:
        decrypted = client.decrypt(record["ciphertext"], record["user"])
        print(f"  {record['user']}: {decrypted}")


def example_list_records():
    """Example: Listing all records"""
    print("\n=== Example 5: List Records ===")

    client = EncryptionClient()

    records = client.list_records(limit=10)
    print(f"Total records: {len(records)}")

    for record in records[:5]:  # Show first 5
        print(f"\nRecord ID: {record['id']}")
        print(f"  Ciphertext: {record['ciphertext'][:50]}...")
        print(f"  Context: {record['context']}")
        print(f"  Key Version: {record['key_version']}")
        print(f"  Created: {record['created_at']}")


def example_key_rotation():
    """Example: Key rotation"""
    print("\n=== Example 6: Key Rotation ===")

    client = EncryptionClient()

    # Encrypt with current key
    plaintext = "Data encrypted with old key"
    encrypted_old = client.encrypt(plaintext)
    print(f"Encrypted with key version: {encrypted_old['key_version']}")

    # Rotate key
    print("Rotating encryption key...")
    result = client.rotate_key()
    print(f"{result['message']}")

    # Encrypt with new key
    encrypted_new = client.encrypt(plaintext)
    print(f"Encrypted with key version: {encrypted_new['key_version']}")

    # Both can still be decrypted
    decrypted_old = client.decrypt(encrypted_old['ciphertext'])
    decrypted_new = client.decrypt(encrypted_new['ciphertext'])

    print(f"Old ciphertext decrypted: {decrypted_old}")
    print(f"New ciphertext decrypted: {decrypted_new}")
    print("✓ Both old and new encryptions can be decrypted!")


def example_health_check():
    """Example: Health check"""
    print("\n=== Example 7: Health Check ===")

    client = EncryptionClient()

    health = client.health_check()
    print(f"Service Status: {health['status']}")
    print(f"Vault Connected: {health['vault_connected']}")
    print(f"Database Connected: {health['database_connected']}")
    print(f"Timestamp: {health['timestamp']}")


def example_error_handling():
    """Example: Error handling"""
    print("\n=== Example 8: Error Handling ===")

    client = EncryptionClient()

    # Try to decrypt invalid ciphertext
    try:
        client.decrypt("invalid-ciphertext")
    except requests.exceptions.HTTPError as e:
        print(f"✓ Handled invalid ciphertext error: {e.response.status_code}")

    # Try to get non-existent record
    try:
        client.get_record(999999)
    except requests.exceptions.HTTPError as e:
        print(f"✓ Handled non-existent record error: {e.response.status_code}")


def main():
    """Run all examples"""
    print("=" * 60)
    print("Vault Encryption Service - Usage Examples with Authentication")
    print("=" * 60)

    try:
        # Check if service is running
        client = EncryptionClient()
        health = client.health_check()

        if health['status'] != 'healthy':
            print("⚠ Warning: Service is not fully healthy")
            print(f"Vault: {health['vault_connected']}")
            print(f"Database: {health['database_connected']}")
            return

        print("✓ Service is healthy and ready")
        print(f"✓ Authentication enabled: {health.get('authentication', 'N/A')}")

        # Run examples
        example_authentication()
        example_basic_encryption()
        example_context_encryption()
        example_multiple_encryptions()
        example_list_records()
        example_key_rotation()
        example_health_check()
        example_error_handling()

        print("\n" + "=" * 60)
        print("All examples completed successfully!")
        print("=" * 60)

    except requests.exceptions.ConnectionError:
        print("\n❌ Error: Cannot connect to the service")
        print("Make sure the service is running at", BASE_URL)
        print("Run: ./scripts/start.sh")
    except Exception as e:
        print(f"\n❌ Error: {e}")


if __name__ == "__main__":
    main()

# Made with Bob
Enter fullscreen mode Exit fullscreen mode

Conclusion

By decoupling encryption logic from the application and delegating it to HashiCorp Vault, teams can achieve a high level of security with minimal code complexity.


Thanks for reading and special thanks to “Bob” who helped me to build this demo in less than one hour! 💯

Links

Top comments (0)