DEV Community

Cover image for Building AI's Flight Recorder: Cryptographic Audit Trails for Algorithmic Trading Under the EU AI Act

Building AI's Flight Recorder: Cryptographic Audit Trails for Algorithmic Trading Under the EU AI Act

When an aircraft crashes, investigators recover the flight data recorder—the "black box"—to reconstruct exactly what happened. Every input, every output, every system state is preserved with cryptographic precision. The recorder doesn't care what the pilot claims happened; it records what actually happened.

AI-driven trading systems need the same thing. And as of November 2025, the European Commission's Digital Omnibus proposal has made clear that they're going to require it.

This article walks through implementing the VeritasChain Protocol (VCP) v1.1—an open standard for cryptographic audit trails in algorithmic trading. We'll cover:

  • Why traditional logging fails the "Verify, Don't Trust" test
  • VCP's three-layer integrity architecture
  • Implementing Merkle trees for batch completeness
  • External anchoring strategies
  • Real code examples in Python and MQL5

Let's build an AI flight recorder.


Table of Contents

  1. The Problem: Why Traditional Logs Aren't Enough
  2. EU AI Act Article 12: What's Actually Required
  3. VCP's Three-Layer Architecture
  4. Layer 1: Event Integrity
  5. Layer 2: Collection Integrity (Merkle Trees)
  6. Layer 3: External Verifiability
  7. Implementation: Python SDK
  8. Implementation: MQL5 Bridge
  9. External Anchoring Strategies
  10. Compliance Tier Selection
  11. Putting It All Together

The Problem: Why Traditional Logs Aren't Enough {#the-problem}

Here's a standard trading log entry:

{
  "timestamp": "2025-01-14T10:30:00.123Z",
  "event": "ORDER",
  "symbol": "EURUSD",
  "side": "BUY",
  "quantity": 100000,
  "price": 1.0925
}
Enter fullscreen mode Exit fullscreen mode

Looks reasonable, right? Now consider these questions:

  1. How do you know this log wasn't modified after the fact?
  2. How do you prove no events were deleted?
  3. Can a third party verify this without trusting your database?

Traditional logging answers none of these. The database admin can modify records. The compliance team can delete inconvenient entries. When a regulator asks "prove your AI made this decision at this time with these inputs," you have... assertions.

The 2024-2025 prop trading crisis demonstrated this catastrophically. When firms collapsed and traders demanded their payouts, the firms' logs conveniently showed different execution prices than traders remembered. With no cryptographic proof, disputes became he-said-she-said arguments.

The fundamental problem: Traditional logs require trusting the log producer. VCP eliminates that requirement.


EU AI Act Article 12: What's Actually Required {#eu-ai-act-article-12}

The EU AI Act's Article 12 mandates that high-risk AI systems must have:

"...logging capabilities that enable the automatic recording of events ('logs') over the duration of the lifetime of the system."

But it's the implementation requirements that get interesting:

Requirement What It Means
Automatic recording No manual intervention in the logging path
Traceability to identify risks Reconstruct any decision chain
Facilitate post-market monitoring Continuous, not just at audit time
Enable verification Third parties can validate, not just view

The upcoming prEN ISO/IEC 24970 (AI System Logging) standard adds:

  • Timestamped events
  • Traceability markers
  • Data integrity checks
  • Interpretability for authorized reviewers

Notice what's missing from both: how to implement these requirements. That's where VCP comes in.


VCP's Three-Layer Architecture {#three-layer-architecture}

VCP v1.1 introduces a clean separation of concerns across three layers:

┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│  LAYER 3: External Verifiability                                    │
│  ─────────────────────────────────                                  │
│  Purpose: Third-party verification without trusting the producer    │
│                                                                     │
│  Components:                                                        │
│  ├─ Digital Signature (Ed25519): REQUIRED                          │
│  ├─ Timestamp (dual format): REQUIRED                               │
│  └─ External Anchor (Blockchain/TSA): REQUIRED                      │
│                                                                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  LAYER 2: Collection Integrity                                      │
│  ──────────────────────────────                                     │
│  Purpose: Prove completeness of event batches                       │
│                                                                     │
│  Components:                                                        │
│  ├─ Merkle Tree (RFC 6962): REQUIRED                               │
│  ├─ Merkle Root: REQUIRED                                          │
│  └─ Audit Path: REQUIRED                                           │
│                                                                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  LAYER 1: Event Integrity                                           │
│  ────────────────────────                                           │
│  Purpose: Individual event completeness                             │
│                                                                     │
│  Components:                                                        │
│  ├─ EventHash (SHA-256): REQUIRED                                  │
│  └─ PrevHash (chain link): OPTIONAL                                │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Each layer builds on the previous:

  • Layer 1 ensures individual events haven't been modified
  • Layer 2 ensures no events were omitted from a batch
  • Layer 3 ensures the batch existed at a specific time, verified by external parties

Let's implement each layer.


Layer 1: Event Integrity {#layer-1-event-integrity}

Every VCP event needs a cryptographic fingerprint. If any byte changes, the fingerprint changes completely.

Event Hash Calculation

import hashlib
import json
from typing import Any

def canonicalize_json(obj: Any) -> str:
    """
    RFC 8785 JSON Canonicalization Scheme (JCS)

    Ensures identical objects produce identical strings regardless
    of key ordering or whitespace.
    """
    return json.dumps(
        obj,
        separators=(',', ':'),  # No whitespace
        sort_keys=True,          # Deterministic key order
        ensure_ascii=False       # UTF-8 support
    )


def calculate_event_hash(
    header: dict, 
    payload: dict, 
    algo: str = "SHA256"
) -> str:
    """
    Calculate VCP event hash.

    Args:
        header: VCP header containing EventID, EventType, Timestamp
        payload: Event-specific data (trade details, signals, etc.)
        algo: Hash algorithm (SHA256, SHA3_256, BLAKE3)

    Returns:
        Hex-encoded hash string

    Example:
        >>> header = {
        ...     "EventID": "019abc12-3456-7890-abcd-ef1234567890",
        ...     "EventType": "ORD",
        ...     "Timestamp": 1736848200123456
        ... }
        >>> payload = {"Symbol": "EURUSD", "Side": "BUY", "Quantity": 100000}
        >>> calculate_event_hash(header, payload)
        'a7f3d2b1c4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1'
    """
    # Step 1: Canonicalize both components
    canonical_header = canonicalize_json(header)
    canonical_payload = canonicalize_json(payload)

    # Step 2: Concatenate
    hash_input = (canonical_header + canonical_payload).encode('utf-8')

    # Step 3: Apply hash function
    if algo == "SHA256":
        return hashlib.sha256(hash_input).hexdigest()
    elif algo == "SHA3_256":
        return hashlib.sha3_256(hash_input).hexdigest()
    else:
        raise ValueError(f"Unsupported algorithm: {algo}")
Enter fullscreen mode Exit fullscreen mode

Optional: Hash Chain Linking

For real-time tamper detection (useful in HFT), you can chain events:

class VCPEventChain:
    """
    Hash chain for real-time tamper detection.

    OPTIONAL in VCP v1.1, but recommended for HFT systems.
    """

    def __init__(self):
        self.prev_hash = "0" * 64  # Genesis hash
        self.events = []

    def add_event(self, header: dict, payload: dict) -> dict:
        """Add event to chain, computing linked hash."""

        # Include previous hash in calculation
        canonical_header = canonicalize_json(header)
        canonical_payload = canonicalize_json(payload)

        hash_input = (
            canonical_header + 
            canonical_payload + 
            self.prev_hash
        ).encode('utf-8')

        event_hash = hashlib.sha256(hash_input).hexdigest()

        # Create complete VCP event
        vcp_event = {
            "Header": {
                **header,
                "EventHash": event_hash,
                "PrevHash": self.prev_hash,
                "HashAlgo": "SHA256"
            },
            "Payload": payload
        }

        # Update chain state
        self.prev_hash = event_hash
        self.events.append(vcp_event)

        return vcp_event

    def verify_chain(self) -> bool:
        """Verify entire chain integrity."""
        expected_prev = "0" * 64

        for event in self.events:
            header = {k: v for k, v in event["Header"].items() 
                     if k not in ["EventHash", "PrevHash", "HashAlgo"]}

            # Recalculate hash
            hash_input = (
                canonicalize_json(header) +
                canonicalize_json(event["Payload"]) +
                expected_prev
            ).encode('utf-8')

            calculated = hashlib.sha256(hash_input).hexdigest()

            if calculated != event["Header"]["EventHash"]:
                return False
            if event["Header"]["PrevHash"] != expected_prev:
                return False

            expected_prev = calculated

        return True
Enter fullscreen mode Exit fullscreen mode

Why PrevHash is Optional in v1.1

VCP v1.0 required hash chains. v1.1 made them optional because:

  1. Layer 2 + Layer 3 provides equivalent guarantees for batch-level integrity
  2. Simplifies Silver tier implementations (MT4/MT5 DLLs are complex enough)
  3. Backtesting scenarios generate events out of order

Use hash chains when you need real-time detection (HFT). Skip them when batch-level verification is sufficient.


Layer 2: Collection Integrity (Merkle Trees) {#layer-2-collection-integrity}

Individual event hashes prove events weren't modified. Merkle trees prove the collection is complete—no events were added or removed.

RFC 6962 Compliant Merkle Tree

VCP mandates RFC 6962 compliance to prevent second-preimage attacks:

from typing import List, Tuple
import hashlib

class VCPMerkleTree:
    """
    RFC 6962 compliant Merkle tree implementation.

    Domain separation prevents second-preimage attacks:
    - Leaf nodes: 0x00 prefix
    - Internal nodes: 0x01 prefix
    """

    LEAF_PREFIX = b'\x00'
    NODE_PREFIX = b'\x01'

    def __init__(self, event_hashes: List[str]):
        """
        Build Merkle tree from event hashes.

        Args:
            event_hashes: List of hex-encoded event hashes
        """
        if not event_hashes:
            raise ValueError("Cannot build tree from empty list")

        self.leaves = [bytes.fromhex(h) for h in event_hashes]
        self.tree = self._build_tree()
        self.root = self.tree[-1][0]

    def _hash_leaf(self, data: bytes) -> bytes:
        """Hash a leaf node with 0x00 prefix."""
        return hashlib.sha256(self.LEAF_PREFIX + data).digest()

    def _hash_node(self, left: bytes, right: bytes) -> bytes:
        """Hash an internal node with 0x01 prefix."""
        return hashlib.sha256(self.NODE_PREFIX + left + right).digest()

    def _build_tree(self) -> List[List[bytes]]:
        """Build complete Merkle tree, returning all levels."""
        tree = []

        # Level 0: Leaf hashes
        current_level = [self._hash_leaf(leaf) for leaf in self.leaves]
        tree.append(current_level)

        # Build up to root
        while len(current_level) > 1:
            next_level = []

            for i in range(0, len(current_level), 2):
                left = current_level[i]
                # If odd number, duplicate last node
                right = current_level[i + 1] if i + 1 < len(current_level) else left
                next_level.append(self._hash_node(left, right))

            tree.append(next_level)
            current_level = next_level

        return tree

    def get_root(self) -> str:
        """Get Merkle root as hex string."""
        return self.root.hex()

    def get_proof(self, index: int) -> List[Tuple[str, str]]:
        """
        Get inclusion proof for event at index.

        Returns:
            List of (hash, position) tuples where position is 'L' or 'R'
        """
        if index < 0 or index >= len(self.leaves):
            raise IndexError(f"Index {index} out of range")

        proof = []
        current_index = index

        for level in self.tree[:-1]:  # Exclude root level
            # Determine sibling
            if current_index % 2 == 0:
                # We're on the left, sibling is on right
                sibling_index = current_index + 1
                position = 'R'
            else:
                # We're on the right, sibling is on left
                sibling_index = current_index - 1
                position = 'L'

            # Handle edge case where sibling doesn't exist
            if sibling_index < len(level):
                proof.append((level[sibling_index].hex(), position))
            else:
                proof.append((level[current_index].hex(), position))

            current_index //= 2

        return proof

    @staticmethod
    def verify_proof(
        event_hash: str,
        proof: List[Tuple[str, str]],
        root: str
    ) -> bool:
        """
        Verify an inclusion proof.

        Args:
            event_hash: Hash of the event to verify
            proof: List of (sibling_hash, position) tuples
            root: Expected Merkle root

        Returns:
            True if event is included in tree with given root
        """
        current = hashlib.sha256(
            VCPMerkleTree.LEAF_PREFIX + bytes.fromhex(event_hash)
        ).digest()

        for sibling_hex, position in proof:
            sibling = bytes.fromhex(sibling_hex)

            if position == 'L':
                current = hashlib.sha256(
                    VCPMerkleTree.NODE_PREFIX + sibling + current
                ).digest()
            else:
                current = hashlib.sha256(
                    VCPMerkleTree.NODE_PREFIX + current + sibling
                ).digest()

        return current.hex() == root
Enter fullscreen mode Exit fullscreen mode

Usage Example

# Simulate a batch of trading events
event_hashes = [
    calculate_event_hash(
        {"EventID": f"event-{i}", "EventType": "ORD", "Timestamp": 1736848200000000 + i},
        {"Symbol": "EURUSD", "Side": "BUY", "Quantity": 100000}
    )
    for i in range(100)
]

# Build Merkle tree
tree = VCPMerkleTree(event_hashes)
print(f"Merkle Root: {tree.get_root()}")

# Generate proof for event 42
proof = tree.get_proof(42)
print(f"Proof for event 42: {len(proof)} nodes")

# Verify the proof
is_valid = VCPMerkleTree.verify_proof(
    event_hashes[42],
    proof,
    tree.get_root()
)
print(f"Proof valid: {is_valid}")

# Try to verify a fake event
fake_hash = "0" * 64
is_fake_valid = VCPMerkleTree.verify_proof(fake_hash, proof, tree.get_root())
print(f"Fake proof valid: {is_fake_valid}")  # False
Enter fullscreen mode Exit fullscreen mode

Why Merkle Trees Matter for Compliance

A Merkle root is a single 32-byte value that represents an entire batch of events. If ANY event is added, removed, or modified, the root changes completely.

This means:

  1. Store the root externally (Layer 3)
  2. Keep the events locally
  3. Anyone can verify any event was in the batch by checking the inclusion proof against the anchored root

The regulator doesn't need access to your database. They just need the proof.


Layer 3: External Verifiability {#layer-3-external-verifiability}

Layers 1 and 2 can be computed locally. But a malicious operator could simply recompute them after modifying events. Layer 3 solves this by anchoring Merkle roots to external systems that the operator doesn't control.

Digital Signatures

Every VCP event batch must be signed:

from nacl.signing import SigningKey, VerifyKey
from nacl.encoding import HexEncoder
import time

class VCPSigner:
    """
    Ed25519 digital signatures for VCP events.

    Ed25519 is VCP's default because:
    - Fast: ~71,000 signatures/second on commodity hardware
    - Small: 64-byte signatures
    - Secure: 128-bit security level
    """

    def __init__(self, private_key_hex: str = None):
        if private_key_hex:
            self.signing_key = SigningKey(
                bytes.fromhex(private_key_hex)
            )
        else:
            self.signing_key = SigningKey.generate()

        self.verify_key = self.signing_key.verify_key

    def get_public_key(self) -> str:
        """Get public key for verification."""
        return self.verify_key.encode(encoder=HexEncoder).decode()

    def sign_merkle_root(
        self, 
        merkle_root: str,
        timestamp: int = None
    ) -> dict:
        """
        Sign a Merkle root with timestamp.

        Args:
            merkle_root: Hex-encoded Merkle root
            timestamp: Unix timestamp in microseconds (auto-generated if None)

        Returns:
            Signed anchor record
        """
        timestamp = timestamp or int(time.time() * 1_000_000)

        # Create message to sign
        message = f"{merkle_root}:{timestamp}".encode('utf-8')

        # Sign
        signed = self.signing_key.sign(message)

        return {
            "MerkleRoot": merkle_root,
            "Timestamp": timestamp,
            "TimestampISO": self._format_timestamp(timestamp),
            "Signature": signed.signature.hex(),
            "SignerPublicKey": self.get_public_key(),
            "SignAlgo": "ED25519"
        }

    @staticmethod
    def verify_signature(anchor_record: dict) -> bool:
        """Verify a signed anchor record."""
        try:
            verify_key = VerifyKey(
                bytes.fromhex(anchor_record["SignerPublicKey"])
            )

            message = (
                f"{anchor_record['MerkleRoot']}:"
                f"{anchor_record['Timestamp']}"
            ).encode('utf-8')

            signature = bytes.fromhex(anchor_record["Signature"])

            verify_key.verify(message, signature)
            return True
        except Exception:
            return False

    def _format_timestamp(self, ts_micros: int) -> str:
        """Format microsecond timestamp as ISO 8601."""
        from datetime import datetime, timezone
        dt = datetime.fromtimestamp(ts_micros / 1_000_000, tz=timezone.utc)
        return dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
Enter fullscreen mode Exit fullscreen mode

External Anchoring

The signature proves who created the anchor. External anchoring proves when it existed.

Option 1: OpenTimestamps (Free, Decentralized)

import subprocess
import tempfile
import os

class OpenTimestampsAnchor:
    """
    Anchor Merkle roots to Bitcoin blockchain via OpenTimestamps.

    Free, decentralized, but confirmation takes ~1-2 hours.
    Suitable for Silver tier (daily anchoring).
    """

    def __init__(self):
        # Check OTS is installed
        try:
            subprocess.run(
                ["ots", "--version"], 
                capture_output=True, 
                check=True
            )
        except FileNotFoundError:
            raise RuntimeError(
                "OpenTimestamps CLI not found. "
                "Install: pip install opentimestamps-client"
            )

    def create_timestamp(self, merkle_root: str) -> bytes:
        """
        Create OpenTimestamps proof for a Merkle root.

        Returns:
            OTS proof file contents (binary)
        """
        # Write Merkle root to temp file
        with tempfile.NamedTemporaryFile(
            mode='w', 
            suffix='.txt', 
            delete=False
        ) as f:
            f.write(merkle_root)
            temp_path = f.name

        try:
            # Create timestamp
            subprocess.run(
                ["ots", "stamp", temp_path],
                capture_output=True,
                check=True
            )

            # Read proof file
            proof_path = temp_path + ".ots"
            with open(proof_path, 'rb') as f:
                proof = f.read()

            os.unlink(proof_path)
            return proof
        finally:
            os.unlink(temp_path)

    def verify_timestamp(self, merkle_root: str, proof: bytes) -> dict:
        """
        Verify an OpenTimestamps proof.

        Returns:
            Verification result including Bitcoin block info
        """
        with tempfile.NamedTemporaryFile(
            mode='w', 
            suffix='.txt', 
            delete=False
        ) as f:
            f.write(merkle_root)
            temp_path = f.name

        proof_path = temp_path + ".ots"
        with open(proof_path, 'wb') as f:
            f.write(proof)

        try:
            result = subprocess.run(
                ["ots", "verify", proof_path],
                capture_output=True,
                text=True
            )

            return {
                "verified": result.returncode == 0,
                "output": result.stdout,
                "error": result.stderr if result.returncode != 0 else None
            }
        finally:
            os.unlink(temp_path)
            os.unlink(proof_path)
Enter fullscreen mode Exit fullscreen mode

Option 2: RFC 3161 Timestamping Authority

import requests
from asn1crypto import tsp, cms
import hashlib

class RFC3161Anchor:
    """
    Anchor Merkle roots via RFC 3161 Time Stamp Authority.

    Faster than blockchain (seconds), legally recognized in EU.
    Suitable for Gold tier (hourly anchoring).
    """

    # Free TSA servers (for development/testing)
    FREE_TSA_URLS = [
        "http://timestamp.digicert.com",
        "http://timestamp.globalsign.com/tsa/r6advanced1",
        "http://tsa.starfieldtech.com",
    ]

    def __init__(self, tsa_url: str = None):
        self.tsa_url = tsa_url or self.FREE_TSA_URLS[0]

    def create_timestamp(self, merkle_root: str) -> bytes:
        """
        Request RFC 3161 timestamp for Merkle root.

        Returns:
            DER-encoded timestamp response
        """
        # Create timestamp request
        hash_bytes = bytes.fromhex(merkle_root)

        tsp_request = tsp.TimeStampReq({
            'version': 1,
            'message_imprint': {
                'hash_algorithm': {'algorithm': 'sha256'},
                'hashed_message': hash_bytes
            },
            'cert_req': True
        })

        # Send request
        response = requests.post(
            self.tsa_url,
            data=tsp_request.dump(),
            headers={'Content-Type': 'application/timestamp-query'}
        )

        if response.status_code != 200:
            raise RuntimeError(f"TSA request failed: {response.status_code}")

        return response.content

    def parse_timestamp(self, response: bytes) -> dict:
        """
        Parse RFC 3161 timestamp response.

        Returns:
            Parsed timestamp info including time and TSA identity
        """
        tsp_response = tsp.TimeStampResp.load(response)

        if tsp_response['status']['status'].native != 'granted':
            raise RuntimeError(
                f"Timestamp not granted: "
                f"{tsp_response['status']['status'].native}"
            )

        token = tsp_response['time_stamp_token']
        signed_data = cms.ContentInfo.load(token.dump())['content']
        tst_info = tsp.TSTInfo.load(
            signed_data['encap_content_info']['content'].native
        )

        return {
            "timestamp": tst_info['gen_time'].native.isoformat(),
            "serial_number": tst_info['serial_number'].native,
            "tsa_name": str(tst_info['tsa']) if tst_info['tsa'] else None,
            "hash_algorithm": tst_info['message_imprint']['hash_algorithm']['algorithm'].native
        }
Enter fullscreen mode Exit fullscreen mode

Option 3: AWS QLDB (Enterprise)

import boto3
from amazon.ion import simpleion

class QLDBAnchor:
    """
    Anchor Merkle roots to AWS Quantum Ledger Database.

    ACID-compliant, immutable, cryptographically verifiable.
    Suitable for Platinum tier (10-minute anchoring).
    """

    def __init__(self, ledger_name: str, region: str = "us-east-1"):
        self.ledger_name = ledger_name
        self.client = boto3.client('qldb-session', region_name=region)

    def anchor(self, merkle_root: str, metadata: dict) -> dict:
        """
        Anchor Merkle root to QLDB.

        Returns:
            QLDB document ID and commit digest
        """
        from pyqldb.driver.qldb_driver import QldbDriver

        driver = QldbDriver(self.ledger_name)

        def insert_anchor(transaction_executor):
            transaction_executor.execute_statement(
                "INSERT INTO VCPAnchors ?",
                {
                    "merkle_root": merkle_root,
                    "timestamp": metadata.get("timestamp"),
                    "policy_id": metadata.get("policy_id"),
                    "event_count": metadata.get("event_count")
                }
            )

        result = driver.execute_lambda(insert_anchor)

        # Get commit digest for verification
        digest = self._get_digest()

        return {
            "ledger": self.ledger_name,
            "document_id": result.get("documentId"),
            "commit_digest": digest
        }

    def _get_digest(self) -> str:
        """Get current ledger digest for verification."""
        qldb = boto3.client('qldb')
        response = qldb.get_digest(Name=self.ledger_name)
        return response['Digest'].hex()
Enter fullscreen mode Exit fullscreen mode

Implementation: Python SDK {#implementation-python}

Let's put all three layers together into a complete VCP implementation:

"""
vcp_core.py - Complete VCP v1.1 implementation
"""

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import uuid
import time
import json
import hashlib
from nacl.signing import SigningKey


class ConformanceTier(Enum):
    SILVER = "SILVER"
    GOLD = "GOLD"
    PLATINUM = "PLATINUM"


class EventType(Enum):
    SIG = "SIG"  # Signal generated
    ORD = "ORD"  # Order sent
    ACK = "ACK"  # Order acknowledged
    REJ = "REJ"  # Order rejected
    EXE = "EXE"  # Execution (fill)
    CXL = "CXL"  # Cancellation
    ERR_RISK = "ERR_RISK"  # Risk limit breach
    ERR_CONN = "ERR_CONN"  # Connection error


@dataclass
class VCPEvent:
    """A single VCP event."""

    event_id: str
    event_type: EventType
    timestamp: int  # Microseconds since epoch
    account_id: str
    payload: Dict[str, Any]

    # Layer 1
    event_hash: str = ""
    prev_hash: str = ""

    # Policy
    policy_id: str = ""
    conformance_tier: ConformanceTier = ConformanceTier.SILVER

    def __post_init__(self):
        if not self.event_id:
            self.event_id = str(uuid.uuid7())
        if not self.timestamp:
            self.timestamp = int(time.time() * 1_000_000)

    def to_dict(self) -> dict:
        return {
            "Header": {
                "EventID": self.event_id,
                "EventType": self.event_type.value,
                "Timestamp": self.timestamp,
                "AccountID": self.account_id,
                "EventHash": self.event_hash,
                "PrevHash": self.prev_hash,
                "HashAlgo": "SHA256"
            },
            "PolicyIdentification": {
                "PolicyID": self.policy_id,
                "ConformanceTier": self.conformance_tier.value,
                "Version": "1.1"
            },
            "Payload": self.payload
        }


class VCPLogger:
    """
    Complete VCP v1.1 logging implementation.

    Usage:
        logger = VCPLogger(
            policy_id="org.example.trading:algo-001",
            tier=ConformanceTier.GOLD
        )

        # Log events
        logger.log_signal("EURUSD", "BUY", 100000, 1.0925)
        logger.log_order("ORD-001", "EURUSD", "BUY", 100000, 1.0925)
        logger.log_execution("ORD-001", "EXE-001", 50000, 1.0925)

        # Create anchor batch
        anchor = logger.create_anchor_batch()
        print(f"Merkle Root: {anchor['merkle_root']}")
    """

    def __init__(
        self,
        policy_id: str,
        tier: ConformanceTier = ConformanceTier.SILVER,
        private_key: str = None,
        use_hash_chain: bool = False
    ):
        self.policy_id = policy_id
        self.tier = tier
        self.use_hash_chain = use_hash_chain

        # Initialize signer
        if private_key:
            self.signing_key = SigningKey(bytes.fromhex(private_key))
        else:
            self.signing_key = SigningKey.generate()

        # Event storage
        self.events: List[VCPEvent] = []
        self.prev_hash = "0" * 64

        # Batch tracking
        self.last_anchor_index = 0

    def _calculate_hash(self, event: VCPEvent) -> str:
        """Calculate event hash (Layer 1)."""
        header = {
            "EventID": event.event_id,
            "EventType": event.event_type.value,
            "Timestamp": event.timestamp,
            "AccountID": event.account_id
        }

        canonical = json.dumps(
            {"header": header, "payload": event.payload},
            separators=(',', ':'),
            sort_keys=True
        )

        if self.use_hash_chain:
            canonical += self.prev_hash

        return hashlib.sha256(canonical.encode()).hexdigest()

    def log_event(
        self,
        event_type: EventType,
        account_id: str,
        payload: Dict[str, Any]
    ) -> VCPEvent:
        """Log a generic VCP event."""
        event = VCPEvent(
            event_id="",  # Auto-generated
            event_type=event_type,
            timestamp=0,  # Auto-generated
            account_id=account_id,
            payload=payload,
            policy_id=self.policy_id,
            conformance_tier=self.tier
        )

        # Calculate hash
        event.event_hash = self._calculate_hash(event)

        if self.use_hash_chain:
            event.prev_hash = self.prev_hash
            self.prev_hash = event.event_hash

        self.events.append(event)
        return event

    # Convenience methods for common event types

    def log_signal(
        self,
        symbol: str,
        side: str,
        quantity: float,
        price: float,
        account_id: str = "default",
        **kwargs
    ) -> VCPEvent:
        """Log a trading signal."""
        return self.log_event(
            EventType.SIG,
            account_id,
            {
                "Symbol": symbol,
                "Side": side,
                "Quantity": quantity,
                "Price": price,
                **kwargs
            }
        )

    def log_order(
        self,
        order_id: str,
        symbol: str,
        side: str,
        quantity: float,
        price: float,
        account_id: str = "default",
        **kwargs
    ) -> VCPEvent:
        """Log an order submission."""
        return self.log_event(
            EventType.ORD,
            account_id,
            {
                "OrderID": order_id,
                "Symbol": symbol,
                "Side": side,
                "Quantity": quantity,
                "Price": price,
                **kwargs
            }
        )

    def log_execution(
        self,
        order_id: str,
        execution_id: str,
        quantity: float,
        price: float,
        account_id: str = "default",
        **kwargs
    ) -> VCPEvent:
        """Log a trade execution."""
        return self.log_event(
            EventType.EXE,
            account_id,
            {
                "OrderID": order_id,
                "ExecutionID": execution_id,
                "Quantity": quantity,
                "Price": price,
                **kwargs
            }
        )

    def log_risk_breach(
        self,
        breach_type: str,
        limit_value: float,
        actual_value: float,
        account_id: str = "default",
        **kwargs
    ) -> VCPEvent:
        """Log a risk limit breach."""
        return self.log_event(
            EventType.ERR_RISK,
            account_id,
            {
                "BreachType": breach_type,
                "LimitValue": limit_value,
                "ActualValue": actual_value,
                "Severity": "CRITICAL",
                **kwargs
            }
        )

    def create_anchor_batch(self) -> dict:
        """
        Create a signed anchor batch (Layers 2 & 3).

        Returns:
            Anchor record ready for external timestamping
        """
        # Get events since last anchor
        batch_events = self.events[self.last_anchor_index:]

        if not batch_events:
            raise ValueError("No new events to anchor")

        # Build Merkle tree (Layer 2)
        event_hashes = [e.event_hash for e in batch_events]
        tree = VCPMerkleTree(event_hashes)
        merkle_root = tree.get_root()

        # Sign (Layer 3)
        timestamp = int(time.time() * 1_000_000)
        message = f"{merkle_root}:{timestamp}".encode()
        signed = self.signing_key.sign(message)

        # Create anchor record
        anchor = {
            "version": "1.1",
            "policy_id": self.policy_id,
            "conformance_tier": self.tier.value,
            "merkle_root": merkle_root,
            "event_count": len(batch_events),
            "first_event_id": batch_events[0].event_id,
            "last_event_id": batch_events[-1].event_id,
            "timestamp": timestamp,
            "signature": signed.signature.hex(),
            "signer_public_key": self.signing_key.verify_key.encode().hex(),
            "sign_algo": "ED25519"
        }

        # Update tracking
        self.last_anchor_index = len(self.events)

        return anchor

    def get_inclusion_proof(self, event: VCPEvent) -> dict:
        """
        Get Merkle inclusion proof for an event.

        Use after create_anchor_batch() to generate proofs
        for regulatory submission.
        """
        # Find event index
        try:
            index = self.events.index(event)
        except ValueError:
            raise ValueError("Event not found in log")

        # Build tree and get proof
        event_hashes = [e.event_hash for e in self.events]
        tree = VCPMerkleTree(event_hashes)
        proof = tree.get_proof(index)

        return {
            "event_hash": event.event_hash,
            "merkle_root": tree.get_root(),
            "proof": proof,
            "index": index
        }
Enter fullscreen mode Exit fullscreen mode

Implementation: MQL5 Bridge {#implementation-mql5}

For MetaTrader 5 integration, VCP uses a sidecar architecture—the logging runs alongside MT5 without modifying the trading logic:

//+------------------------------------------------------------------+
//|                                           VCP_Bridge.mqh          |
//|                              VeritasChain Protocol v1.1           |
//+------------------------------------------------------------------+
#property copyright "VeritasChain Standards Organization"
#property link      "https://veritaschain.org"
#property version   "1.10"

// Import DLL functions
#import "vcp_sidecar.dll"
   int     VCP_Initialize(string policy_id, string tier);
   void    VCP_Shutdown();
   string  VCP_LogSignal(string symbol, string side, double qty, double price);
   string  VCP_LogOrder(string order_id, string symbol, string side, double qty, double price);
   string  VCP_LogExecution(string order_id, string exec_id, double qty, double price);
   string  VCP_LogError(string error_type, string message, string severity);
   int     VCP_CreateAnchor();
   string  VCP_GetMerkleRoot();
#import

//+------------------------------------------------------------------+
//| VCP Logger class for MQL5                                        |
//+------------------------------------------------------------------+
class CVCPLogger
{
private:
   bool           m_initialized;
   string         m_policy_id;
   string         m_tier;

public:
   //--- Constructor
   CVCPLogger(void) : m_initialized(false) {}

   //--- Destructor
   ~CVCPLogger(void)
   {
      if(m_initialized)
         Shutdown();
   }

   //--- Initialize VCP logging
   bool Initialize(string policy_id, string tier = "SILVER")
   {
      m_policy_id = policy_id;
      m_tier = tier;

      int result = VCP_Initialize(policy_id, tier);

      if(result == 0)
      {
         m_initialized = true;
         Print("[VCP] Initialized: ", policy_id, " (", tier, ")");
         return true;
      }

      Print("[VCP] Initialization failed: ", result);
      return false;
   }

   //--- Shutdown
   void Shutdown(void)
   {
      if(m_initialized)
      {
         VCP_Shutdown();
         m_initialized = false;
         Print("[VCP] Shutdown complete");
      }
   }

   //--- Log trading signal
   string LogSignal(string symbol, ENUM_ORDER_TYPE side, double quantity, double price)
   {
      if(!m_initialized)
         return "";

      string side_str = (side == ORDER_TYPE_BUY) ? "BUY" : "SELL";
      return VCP_LogSignal(symbol, side_str, quantity, price);
   }

   //--- Log order
   string LogOrder(ulong ticket, string symbol, ENUM_ORDER_TYPE side, double quantity, double price)
   {
      if(!m_initialized)
         return "";

      string order_id = IntegerToString(ticket);
      string side_str = (side == ORDER_TYPE_BUY) ? "BUY" : "SELL";
      return VCP_LogOrder(order_id, symbol, side_str, quantity, price);
   }

   //--- Log execution
   string LogExecution(ulong order_ticket, ulong deal_ticket, double quantity, double price)
   {
      if(!m_initialized)
         return "";

      string order_id = IntegerToString(order_ticket);
      string exec_id = IntegerToString(deal_ticket);
      return VCP_LogExecution(order_id, exec_id, quantity, price);
   }

   //--- Log risk breach
   string LogRiskBreach(string breach_type, string details)
   {
      if(!m_initialized)
         return "";

      return VCP_LogError("ERR_RISK", breach_type + ": " + details, "CRITICAL");
   }

   //--- Create anchor batch
   bool CreateAnchor(void)
   {
      if(!m_initialized)
         return false;

      int result = VCP_CreateAnchor();

      if(result == 0)
      {
         string root = VCP_GetMerkleRoot();
         Print("[VCP] Anchor created. Merkle Root: ", root);
         return true;
      }

      Print("[VCP] Anchor creation failed: ", result);
      return false;
   }
};

//+------------------------------------------------------------------+
//| Example EA using VCP                                             |
//+------------------------------------------------------------------+
/*
#include "VCP_Bridge.mqh"

CVCPLogger g_vcp;

int OnInit()
{
   // Initialize VCP with your policy ID
   if(!g_vcp.Initialize("org.mycompany.mt5:algo-001", "SILVER"))
   {
      Print("Failed to initialize VCP");
      return INIT_FAILED;
   }

   return INIT_SUCCEEDED;
}

void OnDeinit(const int reason)
{
   // Create final anchor before shutdown
   g_vcp.CreateAnchor();
   g_vcp.Shutdown();
}

void OnTick()
{
   // Your trading logic...

   // Log signal when generated
   if(signal_generated)
   {
      g_vcp.LogSignal(_Symbol, ORDER_TYPE_BUY, 0.1, Ask);
   }
}

void OnTradeTransaction(
   const MqlTradeTransaction& trans,
   const MqlTradeRequest& request,
   const MqlTradeResult& result
)
{
   // Log orders
   if(trans.type == TRADE_TRANSACTION_ORDER_ADD)
   {
      g_vcp.LogOrder(
         trans.order,
         trans.symbol,
         (ENUM_ORDER_TYPE)trans.order_type,
         trans.volume,
         trans.price
      );
   }

   // Log executions
   if(trans.type == TRADE_TRANSACTION_DEAL_ADD)
   {
      g_vcp.LogExecution(
         trans.order,
         trans.deal,
         trans.volume,
         trans.price
      );
   }
}
*/
Enter fullscreen mode Exit fullscreen mode

Sidecar Architecture Benefits

┌─────────────────────────────────────────────────────────────┐
│                     MetaTrader 5                            │
│  ┌─────────────────┐       ┌─────────────────────────────┐ │
│  │                 │       │                             │ │
│  │   Trading EA    │──────▶│      VCP_Bridge.mqh         │ │
│  │                 │       │                             │ │
│  └─────────────────┘       └──────────────┬──────────────┘ │
│                                           │                 │
└───────────────────────────────────────────│─────────────────┘
                                            │
                                            ▼
                               ┌─────────────────────────┐
                               │                         │
                               │   vcp_sidecar.dll       │
                               │   (Separate process)    │
                               │                         │
                               └────────────┬────────────┘
                                            │
                                            ▼
                               ┌─────────────────────────┐
                               │                         │
                               │   VCP Anchor Service    │
                               │   (Cloud/Local)         │
                               │                         │
                               └─────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Key benefits:

  1. Non-invasive: Doesn't modify MT5 internals
  2. Fail-safe: VCP failure doesn't crash trading
  3. Async-first: Logging is non-blocking
  4. Recoverable: Missed events can be replayed

Compliance Tier Selection {#compliance-tiers}

VCP defines three tiers based on your requirements:

Requirement Silver Gold Platinum
Target Use Development, retail trading Prop firms, institutional HFT, exchanges
Clock Sync System clock NTP (<1ms) PTP (<1μs)
Anchor Frequency 24 hours 1 hour 10 minutes
Throughput 1K events/sec 100K events/sec 1M+ events/sec
MiFID II RTS 25 ❌ Not compliant ✅ Compliant ✅ Exceeds
Recommended For Backtesting, demos Production trading Regulatory-critical

Choosing Your Tier

def recommend_tier(
    is_regulated: bool,
    events_per_second: int,
    requires_dispute_resolution: bool,
    trading_frequency: str  # "hft", "intraday", "swing"
) -> ConformanceTier:
    """
    Recommend appropriate VCP tier based on requirements.
    """

    # HFT always needs Platinum
    if trading_frequency == "hft":
        return ConformanceTier.PLATINUM

    # Regulated entities need at least Gold
    if is_regulated:
        if events_per_second > 100_000:
            return ConformanceTier.PLATINUM
        return ConformanceTier.GOLD

    # High-stakes dispute resolution needs Gold
    if requires_dispute_resolution:
        return ConformanceTier.GOLD

    # Development and testing
    return ConformanceTier.SILVER
Enter fullscreen mode Exit fullscreen mode

Putting It All Together {#putting-it-together}

Here's a complete example workflow:

from vcp_core import VCPLogger, ConformanceTier
from anchoring import RFC3161Anchor
import json

# 1. Initialize logger
logger = VCPLogger(
    policy_id="org.example.trading:eurusd-scalper-v2",
    tier=ConformanceTier.GOLD,
    use_hash_chain=True  # Enable for production
)

# 2. Simulate trading session
print("=== Trading Session ===\n")

# Log signal
signal = logger.log_signal(
    symbol="EURUSD",
    side="BUY",
    quantity=100000,
    price=1.0925,
    model_id="ml-model-v2.3",
    confidence=0.87
)
print(f"Signal logged: {signal.event_id[:8]}...")

# Log order
order = logger.log_order(
    order_id="ORD-2025-001234",
    symbol="EURUSD",
    side="BUY",
    quantity=100000,
    price=1.0925
)
print(f"Order logged: {order.event_id[:8]}...")

# Log execution
execution = logger.log_execution(
    order_id="ORD-2025-001234",
    execution_id="EXE-2025-001234",
    quantity=100000,
    price=1.0925
)
print(f"Execution logged: {execution.event_id[:8]}...")

# 3. Create anchor batch (Layer 2)
print("\n=== Creating Anchor ===\n")
anchor = logger.create_anchor_batch()
print(f"Merkle Root: {anchor['merkle_root']}")
print(f"Event Count: {anchor['event_count']}")
print(f"Signature: {anchor['signature'][:32]}...")

# 4. External timestamping (Layer 3)
print("\n=== External Anchoring ===\n")
tsa = RFC3161Anchor()
timestamp_proof = tsa.create_timestamp(anchor['merkle_root'])
timestamp_info = tsa.parse_timestamp(timestamp_proof)
print(f"Anchored at: {timestamp_info['timestamp']}")
print(f"TSA: {timestamp_info['tsa_name']}")

# 5. Generate inclusion proof for specific event
print("\n=== Inclusion Proof ===\n")
proof = logger.get_inclusion_proof(execution)
print(f"Event Hash: {proof['event_hash'][:16]}...")
print(f"Proof Path: {len(proof['proof'])} nodes")

# 6. Verify proof (as a third party would)
is_valid = VCPMerkleTree.verify_proof(
    proof['event_hash'],
    proof['proof'],
    anchor['merkle_root']
)
print(f"Proof Valid: {is_valid}")

# 7. Export for regulatory submission
print("\n=== Regulatory Export ===\n")
regulatory_package = {
    "anchor": anchor,
    "timestamp_proof": timestamp_proof.hex(),
    "timestamp_info": timestamp_info,
    "events": [e.to_dict() for e in logger.events],
    "proof_for_execution": proof
}

with open("regulatory_submission.json", "w") as f:
    json.dump(regulatory_package, f, indent=2)

print("Exported to regulatory_submission.json")
print("✅ Complete audit trail with third-party verifiable proofs")
Enter fullscreen mode Exit fullscreen mode

Output:

=== Trading Session ===

Signal logged: 019abc12...
Order logged: 019abc13...
Execution logged: 019abc14...

=== Creating Anchor ===

Merkle Root: a7f3d2b1c4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1
Event Count: 3
Signature: 3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e...

=== External Anchoring ===

Anchored at: 2025-01-14T10:30:00.000Z
TSA: DigiCert Timestamp Authority

=== Inclusion Proof ===

Event Hash: f2a3b4c5d6e7f8a9...
Proof Path: 2 nodes
Proof Valid: True

=== Regulatory Export ===

Exported to regulatory_submission.json
✅ Complete audit trail with third-party verifiable proofs
Enter fullscreen mode Exit fullscreen mode

What We've Built

Let's recap what VCP v1.1 provides:

Layer What It Proves How
Layer 1 Events weren't modified SHA-256 hashes
Layer 2 No events were omitted Merkle trees with inclusion proofs
Layer 3 Records existed at a specific time External anchoring + signatures

Together, these enable true third-party verification. A regulator doesn't need to trust your database. They can:

  1. Take your submitted Merkle root
  2. Verify it was anchored at a specific time (via TSA/blockchain)
  3. Verify any event you claim existed using inclusion proofs
  4. Detect if you've modified anything

This is the "Verify, Don't Trust" principle in action.


Next Steps

  1. Try the SDK: github.com/veritaschain/vcp-sdk-python
  2. Read the spec: VCP v1.1 Specification
  3. Join the discussion: IETF SCITT Working Group
  4. Get certified: VC-Certified Program

The EU AI Act's enforcement may be delayed to December 2027, but the firms that build verification infrastructure now will define the industry's standards.


Resources


Questions? Reach out: technical@veritaschain.org or open an issue on GitHub.

python #cryptography #fintech #regulation #opensource #ai #machinelearning #trading

Top comments (0)