

Building Tamper-Evident Audit Trails for Trading Systems: A Deep Dive into VCP v1.1

TL;DR: The EU AI Act is coming for algorithmic trading. Traditional logs can be tampered with. VCP v1.1 uses Merkle trees + external anchoring to create verifiable audit trails. Here's how to implement it.


🎯 The Problem

You're building a trading bot. Your compliance team says you need audit logs. Easy, right?

# "Audit logging" circa 2024
def execute_trade(order):
    result = broker.submit(order)
    logger.info(f"Trade executed: {order} -> {result}")
    db.insert("audit_log", {
        "timestamp": datetime.now(),
        "order": order,
        "result": result
    })
    return result

Ship it. ✅

Except... what happens when a regulator asks: "How do I know you didn't delete the embarrassing trades?"

Your answer: "Trust me bro."

That's not going to fly under the EU AI Act.


🔥 Why This Matters Now

In late 2025, three regulatory developments pointed the same way:

  1. EBA Factsheet (Nov 21): Confirmed AI credit scoring is high-risk. Algo trading classification pending EC guidance (February 2026).

  2. EU Parliament Resolution (Nov 25): Called for "tamper-resistant logs with audit integration" in financial AI systems.

  3. CEN-CENELEC Standards (Oct 23): Logging capabilities standard expected Q2 2026. Specifies cryptographic integrity requirements.

The regulatory direction is clear: logs need to be verifiable, not just stored.


🏗️ Enter VCP v1.1: The Architecture

VeritasChain Protocol (VCP) is an open specification for tamper-evident audit trails. Version 1.1 introduces a three-layer security architecture:

┌─────────────────────────────────────────────────────────┐
│  LAYER 3: EXTERNAL VERIFIABILITY                        │
│  • Digital Signature (Ed25519)        ─── REQUIRED      │
│  • External Anchor (blockchain/TSA)   ─── REQUIRED      │
│  → Third parties can verify without trusting you        │
├─────────────────────────────────────────────────────────┤
│  LAYER 2: COLLECTION INTEGRITY                          │
│  • Merkle Tree (RFC 6962)             ─── REQUIRED      │
│  • Merkle Root                        ─── REQUIRED      │
│  → Proves batch completeness (no omissions)             │
├─────────────────────────────────────────────────────────┤
│  LAYER 1: EVENT INTEGRITY                               │
│  • EventHash (SHA-256)                ─── REQUIRED      │
│  • PrevHash (hash chain)              ─── OPTIONAL      │
│  → Individual event integrity                           │
└─────────────────────────────────────────────────────────┘

Let's implement each layer.


💻 Layer 1: Event Integrity

Every event gets a cryptographic hash. Change one bit, the hash changes completely.
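To make that concrete, here is a minimal sketch that hashes two strings differing in a single character and counts how many hex digits of the digest change. The sample strings are invented for illustration and are not VCP events.

```python
import hashlib


def sha256_hex(text: str) -> str:
    """Return the SHA-256 digest of a UTF-8 string as hex."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def differing_hex_digits(a: str, b: str) -> int:
    """Count positions where two equal-length hex strings differ."""
    return sum(1 for x, y in zip(a, b) if x != y)


# Two illustrative inputs that differ only in the last digit of the price
original = sha256_hex('{"Price":1.0850,"Symbol":"EURUSD"}')
tampered = sha256_hex('{"Price":1.0851,"Symbol":"EURUSD"}')

print(original)
print(tampered)
print(f"{differing_hex_digits(original, tampered)} of 64 hex digits differ")
```

A one-character edit typically changes most of the digest, which is why a stored EventHash exposes even tiny modifications.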

EventHash Calculation

import hashlib
import json
from typing import Any

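def canonicalize_json(obj: Any) -> str:
    """
    Simplified approximation of RFC 8785 (JSON Canonicalization Scheme)

    - Sort keys
    - No insignificant whitespace
    - Non-ASCII characters emitted as-is (no \\u escaping)

    Note: full RFC 8785 also fixes number formatting (ECMAScript rules) and
    orders keys by UTF-16 code units. Use a dedicated JCS library when
    payloads contain floats or non-ASCII keys.
    """
    return json.dumps(obj, sort_keys=True, separators=(',', ':'), ensure_ascii=False)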


def calculate_event_hash(header: dict, payload: dict, algo: str = "sha256") -> str:
    """
    Calculate EventHash for a VCP event

    Args:
        header: Event header (EventID, Timestamp, EventType, etc.)
        payload: Event payload (trade data, risk parameters, etc.)
        algo: Hash algorithm ("sha256" or "sha3_256")

    Returns:
        Hex-encoded hash string
    """
    canonical_header = canonicalize_json(header)
    canonical_payload = canonicalize_json(payload)

    # Concatenate canonicalized components
    hash_input = (canonical_header + canonical_payload).encode('utf-8')

    if algo == "sha256":
        return hashlib.sha256(hash_input).hexdigest()
    elif algo == "sha3_256":
        return hashlib.sha3_256(hash_input).hexdigest()
    else:
        raise ValueError(f"Unsupported algorithm: {algo}")


# Example usage
header = {
    "EventID": "01958c5a-b2d0-7f12-8a4f-1234567890ab",  # UUID v7
    "TimestampISO": "2026-01-17T10:30:00.123456Z",
    "TimestampInt": 1768645800123456,  # Microseconds since epoch, same instant as TimestampISO
    "EventType": "ORDER_SUBMIT"
}

payload = {
    "Symbol": "EURUSD",
    "Side": "BUY",
    "Volume": 1.0,
    "Price": 1.0850,
    "AlgorithmID": "trend_follower_v2"
}

event_hash = calculate_event_hash(header, payload)
print(f"EventHash: {event_hash}")
# Prints "EventHash: " followed by a 64-character hex SHA-256 digest

Why Canonicalization Matters

Without canonicalization, the same data can produce different hashes:

# These are logically identical but hash differently
obj1 = {"b": 2, "a": 1}
obj2 = {"a": 1, "b": 2}

json.dumps(obj1)  # '{"b": 2, "a": 1}'
json.dumps(obj2)  # '{"a": 1, "b": 2}'

# Different strings → different hashes → verification fails

RFC 8785 solves this with deterministic serialization rules.
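The effect can be checked directly. The sketch below compares a naive hash (default json.dumps) with a key-sorted one. Note that json.dumps with sort_keys only approximates RFC 8785 for simple objects. The helper names canonical_hash and naive_hash are illustrative, not part of VCP.

```python
import hashlib
import json


def canonical_hash(obj: dict) -> str:
    """Hash an object after key-sorted, whitespace-free serialization."""
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def naive_hash(obj: dict) -> str:
    """Hash an object using json.dumps defaults (insertion order preserved)."""
    return hashlib.sha256(json.dumps(obj).encode("utf-8")).hexdigest()


obj1 = {"b": 2, "a": 1}
obj2 = {"a": 1, "b": 2}

print(naive_hash(obj1) == naive_hash(obj2))          # False
print(canonical_hash(obj1) == canonical_hash(obj2))  # True
```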

Optional: Hash Chains (PrevHash)

VCP v1.1 makes hash chains optional. But if you want real-time tamper detection between anchor points, you can still use them:

class HashChainLogger:
    def __init__(self):
        self.prev_hash = "0" * 64  # Genesis

    def log_event(self, header: dict, payload: dict) -> dict:
        # Copy the header so the caller's dict is not mutated, then link it to the chain
        header = {**header, "PrevHash": self.prev_hash}

        # Calculate hash including the chain
        event_hash = calculate_event_hash(header, payload)

        # Update chain
        self.prev_hash = event_hash

        return {
            "Header": header,
            "Payload": payload,
            "Security": {
                "EventHash": event_hash,
                "PrevHash": header["PrevHash"]
            }
        }

When to use hash chains:

  • ✅ HFT systems (real-time gap detection)
  • ✅ Regulatory submission (auditors expect it)
  • ❌ Backtesting (events generated out of order)
  • ❌ MT4/MT5 DLLs (unnecessary complexity)
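A chain only helps if something walks it. The following sketch shows one way a verifier might locate the first broken link in records shaped like the output of HashChainLogger.log_event. The function name find_chain_break and the sample events are assumptions made for illustration.

```python
import hashlib
import json
from typing import List, Optional

GENESIS = "0" * 64  # same genesis value as HashChainLogger


def _event_hash(header: dict, payload: dict) -> str:
    """SHA-256 over the key-sorted JSON of header and payload."""
    def canon(obj: dict) -> str:
        return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256((canon(header) + canon(payload)).encode("utf-8")).hexdigest()


def find_chain_break(records: List[dict]) -> Optional[int]:
    """Return the index of the first broken link, or None if the chain is intact."""
    expected_prev = GENESIS
    for i, rec in enumerate(records):
        header, payload = rec["Header"], rec["Payload"]
        if header.get("PrevHash") != expected_prev:
            return i  # an event was removed, reordered, or inserted before this one
        if _event_hash(header, payload) != rec["Security"]["EventHash"]:
            return i  # this event's content no longer matches its hash
        expected_prev = rec["Security"]["EventHash"]
    return None


# Build a three-event chain with made-up events
records, prev = [], GENESIS
for n in range(3):
    header = {"EventID": f"evt-{n}", "PrevHash": prev}
    payload = {"Seq": n}
    prev = _event_hash(header, payload)
    records.append({
        "Header": header,
        "Payload": payload,
        "Security": {"EventHash": prev, "PrevHash": header["PrevHash"]},
    })

print(find_chain_break(records))                   # None
print(find_chain_break([records[0], records[2]]))  # 1 (middle event deleted)
```

The check is linear in the number of events, so it can run continuously between anchor points.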

🌳 Layer 2: Collection Integrity (Merkle Trees)

Here's where VCP gets interesting. Individual hashes prove events weren't modified. But how do you prove events weren't deleted?

Merkle trees.

RFC 6962 Merkle Tree Construction

VCP mandates RFC 6962 compliance to prevent second preimage attacks. The key is domain separation:

from typing import List, Tuple
import hashlib


def merkle_leaf_hash(data: bytes) -> bytes:
    """Hash a leaf node with 0x00 prefix (domain separation)"""
    return hashlib.sha256(b'\x00' + data).digest()


def merkle_internal_hash(left: bytes, right: bytes) -> bytes:
    """Hash an internal node with 0x01 prefix (domain separation)"""
    return hashlib.sha256(b'\x01' + left + right).digest()


def build_merkle_tree(event_hashes: List[str]) -> Tuple[str, List[List[bytes]]]:
    """
    Build RFC 6962 compliant Merkle tree

    Args:
        event_hashes: List of hex-encoded event hashes

    Returns:
        (merkle_root, tree_levels) where tree_levels[0] is leaves
    """
    if not event_hashes:
        raise ValueError("Cannot build tree from empty list")

    # Convert to bytes and hash as leaves
    leaves = [bytes.fromhex(h) for h in event_hashes]
    current_level = [merkle_leaf_hash(leaf) for leaf in leaves]

    tree = [current_level]

    while len(current_level) > 1:
        next_level = []

        for i in range(0, len(current_level), 2):
            left = current_level[i]
            if i + 1 < len(current_level):
                next_level.append(merkle_internal_hash(left, current_level[i + 1]))
            else:
                # RFC 6962 never duplicates a node: an unpaired node moves up unchanged
                next_level.append(left)

        tree.append(next_level)
        current_level = next_level

    merkle_root = current_level[0].hex()
    return merkle_root, tree


# Example: 4 events (sample 32-byte digests derived from placeholder strings)
event_hashes = [hashlib.sha256(f"event-{i}".encode()).hexdigest() for i in range(4)]

merkle_root, tree = build_merkle_tree(event_hashes)
print(f"Merkle Root: {merkle_root}")
print(f"Tree depth: {len(tree)}")
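The 0x00 and 0x01 prefixes are what block the second preimage trick. As a rough illustration, the toy functions below (limited to one or two leaves, and not part of VCP) show that without prefixes a forged single "event" made of two concatenated leaf hashes yields the same root as the honest two-event batch, while the prefixed version does not.

```python
import hashlib


def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def naive_root(leaves: list) -> bytes:
    """Toy root for 1 or 2 leaves with NO domain separation."""
    hashed = [sha256(leaf) for leaf in leaves]
    return hashed[0] if len(hashed) == 1 else sha256(hashed[0] + hashed[1])


def rfc6962_root(leaves: list) -> bytes:
    """Toy root for 1 or 2 leaves using the 0x00 (leaf) and 0x01 (node) prefixes."""
    hashed = [sha256(b"\x00" + leaf) for leaf in leaves]
    return hashed[0] if len(hashed) == 1 else sha256(b"\x01" + hashed[0] + hashed[1])


honest = [b"trade-A", b"trade-B"]

# Forged one-leaf batch: the leaf's content is the two honest leaf hashes glued together
forged_naive = [sha256(b"trade-A") + sha256(b"trade-B")]
forged_prefixed = [sha256(b"\x00trade-A") + sha256(b"\x00trade-B")]

print(naive_root(honest) == naive_root(forged_naive))         # True: same root, different batch
print(rfc6962_root(honest) == rfc6962_root(forged_prefixed))  # False: prefixes keep them apart
```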

Visualizing the Tree

                    [Root]
                   /      \
              [H01]        [H23]
              /    \      /    \
          [H0]   [H1]  [H2]   [H3]
           |      |     |      |
        Event0 Event1 Event2 Event3

Key property: Change any event → the root changes. Remove any event → the root changes. Add any event → the root changes.

Generating Merkle Proofs

To prove a specific event is included in the root:

def generate_merkle_proof(tree: List[List[bytes]], leaf_index: int) -> List[dict]:
    """
    Generate audit path for a specific event

    Returns list of sibling hashes needed to reconstruct root
    """
    proof = []
    index = leaf_index

    for level in tree[:-1]:  # Exclude root level
        sibling_index = index ^ 1  # XOR to get sibling

        if sibling_index < len(level):
            proof.append({
                "hash": level[sibling_index].hex(),
                "position": "left" if sibling_index < index else "right"
            })

        index //= 2

    return proof


def verify_merkle_proof(
    event_hash: str,
    proof: List[dict],
    merkle_root: str
) -> bool:
    """Verify that an event is included in a Merkle root"""
    current = merkle_leaf_hash(bytes.fromhex(event_hash))

    for step in proof:
        sibling = bytes.fromhex(step["hash"])

        if step["position"] == "left":
            current = merkle_internal_hash(sibling, current)
        else:
            current = merkle_internal_hash(current, sibling)

    return current.hex() == merkle_root


# Generate proof for Event1
proof = generate_merkle_proof(tree, leaf_index=1)
print(f"Proof for Event1: {proof}")

# Verify
is_valid = verify_merkle_proof(event_hashes[1], proof, merkle_root)
print(f"Verification: {'✅ PASS' if is_valid else '❌ FAIL'}")

⚓ Layer 3: External Verifiability

This is the critical change in VCP v1.1: External anchoring is now REQUIRED for all tiers.

Why? Because without external anchoring, an operator can:

  1. Generate events with valid hashes
  2. Build a valid Merkle tree
  3. Realize some events are problematic
  4. Regenerate the entire tree without those events
  5. Present the sanitized version

No one would know because no one saw the original root.

External anchoring creates an irrevocable commitment.
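The attack and its detection can be sketched in a few lines. The example below assumes a hypothetical batch of four event hashes and a stand-alone merkle_root helper. It shows that a regenerated, sanitized tree is internally consistent but no longer matches the root that was published at anchor time.

```python
import hashlib
from typing import List


def merkle_root(event_hashes: List[str]) -> str:
    """RFC 6962-style root: prefixed hashes, unpaired nodes promoted unchanged."""
    if not event_hashes:
        raise ValueError("Cannot build tree from empty list")
    level = [hashlib.sha256(b"\x00" + bytes.fromhex(h)).digest() for h in event_hashes]
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level), 2):
            if i + 1 < len(level):
                nxt.append(hashlib.sha256(b"\x01" + level[i] + level[i + 1]).digest())
            else:
                nxt.append(level[i])
        level = nxt
    return level[0].hex()


# Hypothetical batch of four event hashes
batch = [hashlib.sha256(f"event-{i}".encode()).hexdigest() for i in range(4)]

anchored_root = merkle_root(batch)     # published externally at anchor time

sanitized = batch[:2] + batch[3:]      # operator quietly drops event 2
rebuilt_root = merkle_root(sanitized)  # a valid tree, but over a different batch

print(rebuilt_root == anchored_root)   # False: the anchored root exposes the rewrite
```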

Anchoring Options by Tier

| Tier | Frequency | Targets | Cost |
|------|-----------|---------|------|
| Platinum | 10 min | Ethereum, Bitcoin, RFC 3161 TSA | $$ |
| Gold | 1 hour | RFC 3161 TSA, attested database | $ |
| Silver | 24 hours | OpenTimestamps, FreeTSA | Free |
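In a scheduler, these frequencies reduce to a simple elapsed-time check. The sketch below is one way to express it. The ANCHOR_INTERVAL_SECONDS mapping and the anchor_due function are illustrative names, with the intervals taken from the tier table.

```python
import time
from typing import Optional

# Maximum anchoring interval per tier, in seconds (values from the tier table)
ANCHOR_INTERVAL_SECONDS = {
    "PLATINUM": 10 * 60,
    "GOLD": 60 * 60,
    "SILVER": 24 * 60 * 60,
}


def anchor_due(tier: str, last_anchor_time: Optional[float], now: Optional[float] = None) -> bool:
    """Return True when the tier's interval has elapsed since the last anchor."""
    if last_anchor_time is None:
        return True  # never anchored: anchor as soon as there are events
    now = time.time() if now is None else now
    return now - last_anchor_time >= ANCHOR_INTERVAL_SECONDS[tier.upper()]


# One hour after the last anchor
print(anchor_due("silver", last_anchor_time=0.0, now=3600.0))  # False
print(anchor_due("gold", last_anchor_time=0.0, now=3600.0))    # True
```

Passing now explicitly keeps the check deterministic in tests. Production code would normally leave it unset.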

Silver Tier: OpenTimestamps Implementation

OpenTimestamps is free and Bitcoin-backed. Perfect for getting started:

# pip install opentimestamps-client

import subprocess
import tempfile
import os


def anchor_with_opentimestamps(merkle_root: str) -> dict:
    """
    Anchor a Merkle root using OpenTimestamps

    Returns anchor proof that can be independently verified
    """
    # Write root to temp file
    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.txt') as f:
        f.write(bytes.fromhex(merkle_root))
        temp_path = f.name

    try:
        # Create timestamp (this calls the OTS servers)
        subprocess.run(
            ['ots', 'stamp', temp_path],
            check=True,
            capture_output=True
        )

        # Read the proof file
        proof_path = temp_path + '.ots'
        with open(proof_path, 'rb') as f:
            ots_proof = f.read()

        return {
            "type": "OPENTIMESTAMPS",
            "merkle_root": merkle_root,
            "proof": ots_proof.hex(),
            "status": "PENDING"  # Confirmed after Bitcoin block
        }

    finally:
        os.unlink(temp_path)
        if os.path.exists(temp_path + '.ots'):
            os.unlink(temp_path + '.ots')


def verify_opentimestamps(anchor: dict) -> bool:
    """Verify an OpenTimestamps anchor"""
    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.txt') as f:
        f.write(bytes.fromhex(anchor["merkle_root"]))
        data_path = f.name

    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.ots') as f:
        f.write(bytes.fromhex(anchor["proof"]))
        proof_path = f.name

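    try:
        # -f names the stamped file explicitly, because the temporary proof
        # is not stored next to it as <file>.ots
        result = subprocess.run(
            ['ots', 'verify', '-f', data_path, proof_path],
            capture_output=True,
            text=True
        )
        # Rely on the exit code rather than parsing the client's log output
        return result.returncode == 0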

    finally:
        os.unlink(data_path)
        os.unlink(proof_path)

Gold/Platinum Tier: RFC 3161 TSA

For production systems, use a qualified Time Stamp Authority:

import requests
from asn1crypto import tsp
from datetime import datetime, timezone


def anchor_with_tsa(merkle_root: str, tsa_url: str = "https://freetsa.org/tsr") -> dict:
    """
    Anchor using RFC 3161 Time Stamp Authority

    Args:
        merkle_root: Hex-encoded Merkle root
        tsa_url: TSA server URL

    Returns:
        Anchor record with TSA token
    """
    # The Merkle root is already a 32-byte SHA-256 output, so it is used
    # directly as the message imprint
    digest = bytes.fromhex(merkle_root)

    ts_request = tsp.TimeStampReq({
        'version': 1,
        'message_imprint': {
            'hash_algorithm': {'algorithm': 'sha256'},
            'hashed_message': digest
        },
        'cert_req': True
    })

    # Send to TSA
    response = requests.post(
        tsa_url,
        data=ts_request.dump(),
        headers={'Content-Type': 'application/timestamp-query'},
        timeout=30
    )

    if response.status_code != 200:
        raise RuntimeError(f"TSA request failed: {response.status_code}")

    # Parse response
    ts_response = tsp.TimeStampResp.load(response.content)

    if ts_response['status']['status'].native != 'granted':
        raise RuntimeError(f"TSA rejected request: {ts_response['status'].native}")

    # Keep the signed token; it carries the TSA's own authoritative timestamp
    token = ts_response['time_stamp_token']

    return {
        "type": "RFC3161_TSA",
        "merkle_root": merkle_root,
        "tsa_url": tsa_url,
        "token": token.dump().hex(),
        # Local receipt time, for bookkeeping only
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    }

🔧 Putting It All Together: Complete Implementation

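Here's a reference Silver tier implementation. The external anchor call is stubbed with a placeholder, so wire in OpenTimestamps or a TSA before using it in production: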

"""
VCP v1.1 Silver Tier Implementation
"""

import uuid
import time
import hashlib
import json
from datetime import datetime
from typing import List, Optional
from dataclasses import dataclass, field
from nacl.signing import SigningKey
import base64


@dataclass
class VCPEvent:
    header: dict
    payload: dict
    security: dict
    policy_identification: dict


@dataclass
class AnchorRecord:
    id: str
    merkle_root: str
    signature: str
    anchor_target: dict
    event_count: int
    first_event_id: str
    last_event_id: str
    timestamp: int


class VCPSilverTier:
    """
    VCP v1.1 Silver Tier Implementation

    Requirements:
    - EventHash: REQUIRED
    - Merkle Tree: REQUIRED
    - Digital Signature: REQUIRED
    - External Anchor: REQUIRED (24 hours)
    - PrevHash: OPTIONAL (not implemented here)
    """

    def __init__(self, private_key_hex: str, policy_id: str, issuer: str):
        """
        Initialize VCP logger

        Args:
            private_key_hex: Ed25519 private key (32 bytes hex)
            policy_id: Unique policy identifier
            issuer: Organization name
        """
        self.signing_key = SigningKey(bytes.fromhex(private_key_hex))
        self.verify_key = self.signing_key.verify_key
        self.policy_id = policy_id
        self.issuer = issuer

        self.pending_events: List[VCPEvent] = []
        self.anchored_batches: List[AnchorRecord] = []
        self.last_anchor_time: Optional[float] = None

    def _generate_uuid7(self) -> str:
        """Generate UUID v7 (time-ordered)"""
        # Simplified UUID v7 generation
        timestamp_ms = int(time.time() * 1000)
        random_bits = int.from_bytes(uuid.uuid4().bytes[6:], 'big')

        uuid_int = (timestamp_ms << 80) | (0b0111 << 76) | (random_bits & ((1 << 76) - 1))
        return str(uuid.UUID(int=uuid_int))

    def _canonicalize(self, obj: dict) -> str:
        """RFC 8785 canonicalization"""
        return json.dumps(obj, sort_keys=True, separators=(',', ':'), ensure_ascii=False)

    def _calculate_hash(self, *parts: str) -> str:
        """Calculate SHA-256 hash of concatenated parts"""
        combined = ''.join(parts).encode('utf-8')
        return hashlib.sha256(combined).hexdigest()

    def _sign(self, message: str) -> str:
        """Sign message with Ed25519, return base64"""
        signed = self.signing_key.sign(message.encode('utf-8'))
        return base64.b64encode(signed.signature).decode('ascii')

    def log_event(
        self,
        event_type: str,
        payload: dict,
        timestamp: Optional[datetime] = None
    ) -> VCPEvent:
        """
        Log a VCP event

        Args:
            event_type: Event type code (e.g., "ORDER_SUBMIT", "TRADE_EXECUTE")
            payload: Event payload data
            timestamp: Optional timestamp (defaults to now)

        Returns:
            VCPEvent object
        """
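        from datetime import timezone

        ts = timestamp or datetime.now(timezone.utc)
        if ts.tzinfo is None:
            # Treat naive datetimes as UTC; otherwise .timestamp() assumes local time
            ts = ts.replace(tzinfo=timezone.utc)
        ts_int = int(ts.timestamp() * 1_000_000)  # Microseconds since the Unix epoch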

        header = {
            "EventID": self._generate_uuid7(),
            "TimestampISO": ts.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z",
            "TimestampInt": ts_int,
            "EventType": event_type,
            "Version": "1.1"
        }

        # Calculate EventHash
        event_hash = self._calculate_hash(
            self._canonicalize(header),
            self._canonicalize(payload)
        )

        # Sign the hash
        signature = self._sign(event_hash)

        security = {
            "Version": "1.1",
            "EventHash": event_hash,
            "HashAlgo": "SHA256",
            "Signature": signature,
            "SignAlgo": "ED25519",
            "PublicKey": self.verify_key.encode().hex()
        }

        policy_identification = {
            "PolicyID": self.policy_id,
            "ConformanceTier": "SILVER",
            "RegistrationPolicy": {
                "Issuer": self.issuer,
                "IssuanceTimestamp": ts_int
            },
            "VerificationDepth": {
                "MerkleProofRequired": True,
                "ExternalAnchorRequired": True
            }
        }

        event = VCPEvent(
            header=header,
            payload=payload,
            security=security,
            policy_identification=policy_identification
        )

        self.pending_events.append(event)
        return event

    def anchor_batch(self) -> Optional[AnchorRecord]:
        """
        Anchor pending events to external service

        REQUIRED: Must be called at least daily for Silver tier

        Returns:
            AnchorRecord or None if no pending events
        """
        if not self.pending_events:
            return None

        # Collect event hashes
        event_hashes = [e.security["EventHash"] for e in self.pending_events]

        # Build Merkle tree
        merkle_root, tree = self._build_merkle_tree(event_hashes)

        # Sign Merkle root
        root_signature = self._sign(merkle_root)

        # External anchor (simplified - in production, call actual service)
        anchor_id = self._generate_uuid7()
        anchor_target = {
            "Type": "PUBLIC_SERVICE",
            "Identifier": "opentimestamps.org",
            "Proof": f"ots_pending_{anchor_id}"  # Placeholder
        }

        # Create anchor record
        anchor = AnchorRecord(
            id=anchor_id,
            merkle_root=merkle_root,
            signature=root_signature,
            anchor_target=anchor_target,
            event_count=len(self.pending_events),
            first_event_id=self.pending_events[0].header["EventID"],
            last_event_id=self.pending_events[-1].header["EventID"],
            timestamp=int(time.time() * 1_000_000)
        )

        # Update events with Merkle info
        for i, event in enumerate(self.pending_events):
            event.security["MerkleRoot"] = merkle_root
            event.security["MerkleIndex"] = i
            event.security["AnchorReference"] = anchor_id

            # Generate and store audit path
            event.security["AuditPath"] = self._generate_audit_path(tree, i)

        self.anchored_batches.append(anchor)
        self.pending_events = []
        self.last_anchor_time = time.time()

        return anchor

    def _build_merkle_tree(self, event_hashes: List[str]):
        """Build RFC 6962 Merkle tree"""
        leaves = [bytes.fromhex(h) for h in event_hashes]
        current = [hashlib.sha256(b'\x00' + leaf).digest() for leaf in leaves]

        tree = [current]

        while len(current) > 1:
            next_level = []
            for i in range(0, len(current), 2):
                left = current[i]
                if i + 1 < len(current):
                    next_level.append(hashlib.sha256(b'\x01' + left + current[i + 1]).digest())
                else:
                    # Unpaired node is promoted unchanged (RFC 6962 never duplicates)
                    next_level.append(left)
            tree.append(next_level)
            current = next_level

        return current[0].hex(), tree

    def _generate_audit_path(self, tree: List[List[bytes]], index: int) -> List[dict]:
        """Generate Merkle proof for event at index"""
        proof = []

        for level in tree[:-1]:
            sibling_idx = index ^ 1
            if sibling_idx < len(level):
                proof.append({
                    "hash": level[sibling_idx].hex(),
                    "position": "left" if sibling_idx < index else "right"
                })
            index //= 2

        return proof

    def verify_event(self, event: VCPEvent) -> dict:
        """
        Verify a VCP event's integrity

        Returns verification result with details
        """
        results = {
            "event_id": event.header["EventID"],
            "checks": {}
        }

        # Check 1: EventHash
        calculated_hash = self._calculate_hash(
            self._canonicalize(event.header),
            self._canonicalize(event.payload)
        )
        results["checks"]["event_hash"] = calculated_hash == event.security["EventHash"]

        # Check 2: Signature
        try:
            from nacl.signing import VerifyKey
            vk = VerifyKey(bytes.fromhex(event.security["PublicKey"]))
            sig = base64.b64decode(event.security["Signature"])
            vk.verify(event.security["EventHash"].encode(), sig)
            results["checks"]["signature"] = True
        except Exception:
            results["checks"]["signature"] = False

        # Check 3: Merkle proof (if anchored)
        if "MerkleRoot" in event.security and "AuditPath" in event.security:
            results["checks"]["merkle_proof"] = self._verify_merkle_proof(
                event.security["EventHash"],
                event.security["AuditPath"],
                event.security["MerkleRoot"]
            )

        results["valid"] = all(results["checks"].values())
        return results

    def _verify_merkle_proof(self, event_hash: str, proof: List[dict], root: str) -> bool:
        """Verify Merkle inclusion proof"""
        current = hashlib.sha256(b'\x00' + bytes.fromhex(event_hash)).digest()

        for step in proof:
            sibling = bytes.fromhex(step["hash"])
            if step["position"] == "left":
                current = hashlib.sha256(b'\x01' + sibling + current).digest()
            else:
                current = hashlib.sha256(b'\x01' + current + sibling).digest()

        return current.hex() == root

    def export_event(self, event: VCPEvent) -> dict:
        """Export event as JSON-serializable dict"""
        return {
            "Header": event.header,
            "Payload": event.payload,
            "Security": event.security,
            "PolicyIdentification": event.policy_identification
        }


# ========================================
# Usage Example
# ========================================

if __name__ == "__main__":
    # Generate a key (in production, load from secure storage)
    from nacl.signing import SigningKey
    key = SigningKey.generate()
    private_key_hex = key.encode().hex()

    # Initialize VCP logger
    vcp = VCPSilverTier(
        private_key_hex=private_key_hex,
        policy_id="DEMO-SILVER-2026-001",
        issuer="Demo Trading Co."
    )

    # Log some trading events
    events = []

    # Order submission
    events.append(vcp.log_event("ORDER_SUBMIT", {
        "Symbol": "EURUSD",
        "Side": "BUY",
        "Volume": 1.0,
        "Price": 1.0850,
        "OrderType": "LIMIT",
        "AlgorithmID": "trend_follower_v2"
    }))

    # Trade execution
    events.append(vcp.log_event("TRADE_EXECUTE", {
        "OrderID": events[0].header["EventID"],
        "Symbol": "EURUSD",
        "Side": "BUY",
        "Volume": 1.0,
        "ExecutionPrice": 1.0851,
        "Slippage": 0.0001
    }))

    # Risk check
    events.append(vcp.log_event("RISK_CHECK", {
        "AccountEquity": 10000.00,
        "OpenPositions": 1,
        "MarginUsed": 100.00,
        "DrawdownPercent": 0.5,
        "RiskStatus": "NORMAL"
    }))

    # Anchor the batch
    anchor = vcp.anchor_batch()

    print("=" * 60)
    print("VCP v1.1 Silver Tier Demo")
    print("=" * 60)
    print(f"\n📦 Logged {len(events)} events")
    print(f"🌳 Merkle Root: {anchor.merkle_root[:32]}...")
    print(f"✍️  Signature: {anchor.signature[:32]}...")
    print(f"⚓ Anchor ID: {anchor.id}")

    # Verify each event
    print("\n🔍 Verification Results:")
    for event in events:
        result = vcp.verify_event(event)
        status = "✅" if result["valid"] else "❌"
        print(f"  {status} {event.header['EventType']}: {result['checks']}")

    # Export as JSON
    print("\n📄 Sample Event JSON:")
    print(json.dumps(vcp.export_event(events[0]), indent=2))

🧪 Testing Your Implementation

VCP v1.1 defines critical conformance tests:

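import pytest
from nacl.signing import SigningKey

from vcp import VCPSilverTier  # assumes the implementation above is saved as vcp.py


@pytest.fixture
def vcp_logger():
    """Fresh logger with a throwaway key for each test"""
    key = SigningKey.generate()
    return VCPSilverTier(key.encode().hex(), "TEST-SILVER-001", "Test Co.")


class TestVCPConformance:
    """VCP v1.1 Conformance Test Suite"""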

    def test_event_hash_calculation(self, vcp_logger):
        """SCH-001: Event structure validation"""
        event = vcp_logger.log_event("TEST", {"data": "value"})
        assert "EventHash" in event.security
        assert len(event.security["EventHash"]) == 64  # SHA-256 hex

    def test_uuid_v7_format(self, vcp_logger):
        """UID-001: UUID v7 format validation"""
        event = vcp_logger.log_event("TEST", {})
        event_id = event.header["EventID"]

        # UUID v7 has version nibble = 7
        version = int(event_id[14], 16)
        assert version == 7

    def test_merkle_tree_construction(self, vcp_logger):
        """MKL-001: Merkle tree construction"""
        for i in range(5):
            vcp_logger.log_event("TEST", {"i": i})

        anchor = vcp_logger.anchor_batch()
        assert anchor.merkle_root is not None
        assert len(anchor.merkle_root) == 64

    def test_merkle_proof_verification(self, vcp_logger):
        """MKL-002: Merkle proof verification"""
        events = [vcp_logger.log_event("TEST", {"i": i}) for i in range(4)]
        anchor = vcp_logger.anchor_batch()

        for event in events:
            result = vcp_logger.verify_event(event)
            assert result["checks"]["merkle_proof"] is True

    def test_external_anchor_present(self, vcp_logger):
        """ANC-001: External anchor presence"""
        vcp_logger.log_event("TEST", {})
        anchor = vcp_logger.anchor_batch()

        assert anchor.anchor_target is not None
        assert "Type" in anchor.anchor_target
        assert "Identifier" in anchor.anchor_target

    def test_signature_verification(self, vcp_logger):
        """SIG-001: Signature algorithm compliance"""
        event = vcp_logger.log_event("TEST", {})
        result = vcp_logger.verify_event(event)

        assert result["checks"]["signature"] is True
        assert event.security["SignAlgo"] == "ED25519"

Run with: pytest test_vcp.py -v


📊 VCP Compliance Tiers Comparison

| Feature | Silver | Gold | Platinum |
|---------|--------|------|----------|
| Target | Retail, MT4/MT5 | Prop firms | HFT, Exchanges |
| Clock Sync | Best-effort | NTP (<1ms) | PTPv2 (<1μs) |
| Anchor Frequency | 24 hours | 1 hour | 10 minutes |
| Timestamp Precision | MILLISECOND | MICROSECOND | NANOSECOND |
| Signature | Ed25519 (delegated) | Ed25519 (client) | Ed25519 (HSM) |
| Serialization | JSON | JSON | SBE |
| Throughput | ~1K events/sec | ~100K events/sec | >1M events/sec |
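The table lists a timestamp precision per tier but not how a timestamp is reduced to it. One possible reading, shown here purely as an assumption and not as the spec's encoding, is to truncate an integer nanosecond timestamp to the tier's precision. PRECISION_DIVISOR and truncate_timestamp are hypothetical names.

```python
# Divisor that reduces an integer nanosecond timestamp to each tier's precision
# (assumed mapping, based on the precision row of the tier table)
PRECISION_DIVISOR = {
    "SILVER": 1_000_000,  # millisecond
    "GOLD": 1_000,        # microsecond
    "PLATINUM": 1,        # nanosecond
}


def truncate_timestamp(ts_ns: int, tier: str) -> int:
    """Drop digits finer than the tier's precision, keeping nanosecond units."""
    divisor = PRECISION_DIVISOR[tier.upper()]
    return (ts_ns // divisor) * divisor


ts_ns = 1_768_645_800_123_456_789  # 2026-01-17T10:30:00.123456789Z

print(truncate_timestamp(ts_ns, "silver"))    # 1768645800123000000
print(truncate_timestamp(ts_ns, "gold"))      # 1768645800123456000
print(truncate_timestamp(ts_ns, "platinum"))  # 1768645800123456789
```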

🚀 Quick Start Checklist

Ready to implement VCP v1.1? Here's your checklist:

  • [ ] Generate Ed25519 keypair (use the PyNaCl or cryptography library)
  • [ ] Define your PolicyID (unique identifier for your implementation)
  • [ ] Implement EventHash calculation (RFC 8785 canonicalization + SHA-256)
  • [ ] Implement Merkle tree (RFC 6962 with domain separation)
  • [ ] Set up external anchoring (OpenTimestamps for free, TSA for production)
  • [ ] Add daily anchor cron job (or more frequent for Gold/Platinum)
  • [ ] Run conformance tests (MKL-001, MKL-002, ANC-001, SIG-001)
  • [ ] Document your Policy (issuer, tier, verification depth)



🎯 Summary

Traditional audit logs are trust-based. VCP v1.1 makes them verification-based.

The three-layer architecture provides:

  1. Event Integrity (Layer 1): Individual events can't be modified without detection
  2. Collection Integrity (Layer 2): Events can't be silently deleted from a batch
  3. External Verifiability (Layer 3): Anyone can verify without trusting you

With the EU AI Act coming into force and CEN-CENELEC standards specifying "tamper-resistant logs," this isn't optional anymore. Start with Silver tier, scale to Platinum as needed.

The code is open. The spec is free. The deadline is 2026.


Have questions? Drop a comment below or open an issue on the VCP GitHub repo.


Tags: #cryptography #trading #python #fintech #compliance #merkletree #blockchain #security #opensource #euaiact
