

Building Cryptographic Audit Trails for AI Trading Systems: A Deep Dive into RFC 6962-Based Verification

How we're applying Certificate Transparency architecture to solve the "trust but can't verify" problem in algorithmic trading


This is a technical deep-dive into the VeritasChain Protocol (VCP), an open standard for cryptographically verifiable audit trails. If you're building trading systems, RegTech tools, or any application where audit integrity matters, this architecture might be relevant to your work.


The Problem: Audit Logs That Can't Prove Their Own Integrity

Here's a scenario every developer working on regulated systems has encountered:

```python
# The naive approach to audit logging
def log_trade_event(event: TradeEvent):
    timestamp = datetime.utcnow().isoformat()
    log_entry = {
        "timestamp": timestamp,
        "event_type": event.type,
        "order_id": event.order_id,
        "symbol": event.symbol,
        "quantity": event.quantity,
        "price": event.price
    }
    db.insert("audit_logs", log_entry)
    return log_entry
```

This code has a fundamental problem that no amount of database security can fix: there's no way to prove the log is complete and unaltered.

Consider what an attacker (or a desperate compliance officer) could do:

  • Delete embarrassing entries directly from the database
  • Modify timestamps to hide latency issues
  • Insert backdated entries to cover gaps
  • Present different logs to different auditors

Traditional solutions—database triggers, write-once storage, access controls—all rely on trusting the infrastructure. But in a world of insider threats, compromised credentials, and sophisticated attacks, trust isn't enough.

We need verification.


Enter Certificate Transparency: Proven Cryptographic Auditing

Before we dive into trading-specific solutions, let's look at a system that solved a similar problem at internet scale: Certificate Transparency (CT).

CT was created to solve a trust problem in the TLS certificate ecosystem: Certificate Authorities (CAs) occasionally mis-issued certificates, whether through compromise or error, and there was no reliable way to detect it until the damage was done.

The solution? Append-only logs with cryptographic proofs.

RFC 6962: The Foundation

RFC 6962 defines the Certificate Transparency protocol. Its key innovations:

  1. Merkle Trees: A binary tree of hashes where the root hash commits to all entries
  2. Inclusion Proofs: Prove a specific entry exists in the log in O(log n) time/space
  3. Consistency Proofs: Prove a log only appended entries (never modified or deleted)
  4. Signed Tree Heads (STH): Periodic commitments by the log operator

Here's the crucial insight: you can verify log integrity without trusting the log operator.

```
                    [Root Hash]
                    /          \
            [Hash 0-1]        [Hash 2-3]
            /        \        /        \
        [Hash 0]  [Hash 1] [Hash 2]  [Hash 3]
           |         |        |         |
        Entry 0   Entry 1  Entry 2   Entry 3
```

If someone modifies Entry 1, the root hash changes. If someone deletes Entry 2, the consistency proof fails. Mathematical certainty replaces institutional trust.
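To make the tamper-evidence property concrete, here's a minimal sketch (domain separation and proofs are omitted; a fuller RFC 6962-style implementation appears later in this article):

```python
import hashlib

def toy_root(entries: list) -> bytes:
    """Toy Merkle root: hash leaves, then pair up hashes level by level."""
    level = [hashlib.sha256(e).digest() for e in entries]
    while len(level) > 1:
        pairs = [level[i:i + 2] for i in range(0, len(level), 2)]
        # Hash each pair; promote a lone trailing node unchanged
        level = [hashlib.sha256(p[0] + p[1]).digest() if len(p) == 2 else p[0]
                 for p in pairs]
    return level[0]

entries = [b"Entry 0", b"Entry 1", b"Entry 2", b"Entry 3"]
root = toy_root(entries)

entries[1] = b"Entry 1 (tampered)"
assert toy_root(entries) != root  # any change to any entry changes the root
```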


Adapting CT for Trading: The VeritasChain Protocol

The challenge: CT was designed for certificates, not trading events. We needed to:

  1. Define event schemas for trading lifecycles
  2. Handle microsecond-precision timing requirements
  3. Integrate with existing trading infrastructure (FIX, MT5, etc.)
  4. Support regulatory requirements (EU AI Act, MiFID II, MAR)
  5. Enable GDPR-compliant data deletion without breaking audit integrity

The result is VCP (VeritasChain Protocol), an open standard (CC BY 4.0) that applies CT principles to trading audit trails.

Three-Layer Architecture

```
┌─────────────────────────────────────────────────────────────┐
│  Layer 3: External Verifiability                            │
│  ┌─────────────────────────────────────────────────────────┐│
│  │  External Anchors (Blockchain, RFC 3161 TSA, Witnesses) ││
│  └─────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────┤
│  Layer 2: Collection Integrity (Batch Level)                │
│  ┌─────────────────────────────────────────────────────────┐│
│  │  Merkle Trees + prev_hash Chain + Signed Batch Headers  ││
│  └─────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────┤
│  Layer 1: Event Integrity (Per-Event)                       │
│  ┌─────────────────────────────────────────────────────────┐│
│  │  Canonical JSON (RFC 8785) + SHA-256 + Ed25519 Sigs     ││
│  └─────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────┘
```

Let's implement each layer.


Layer 1: Event Integrity

Every trading event becomes a cryptographically signed, content-addressed record.

Event Schema

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import hashlib
import json
from datetime import datetime
import uuid

class EventType(Enum):
    SIG = "SIG"  # AI Signal Generation
    ORD = "ORD"  # Order Submission
    ACK = "ACK"  # Order Acknowledgment
    EXE = "EXE"  # Execution/Fill
    REJ = "REJ"  # Order Rejection
    CXL = "CXL"  # Order Cancellation
    MOD = "MOD"  # Order Modification

class ClockSyncStatus(Enum):
    PTP_LOCKED = "PTP_LOCKED"      # IEEE 1588 PTP synchronized
    NTP_SYNCED = "NTP_SYNCED"      # NTP synchronized
    FREE_RUNNING = "FREE_RUNNING"  # No external sync

@dataclass
class VCPEvent:
    event_id: str           # UUIDv7 (time-sortable)
    trace_id: str           # Correlation across related events
    event_type: EventType
    timestamp: str          # ISO 8601 with precision indicator
    clock_sync_status: ClockSyncStatus

    # Trading-specific fields
    symbol: Optional[str] = None
    order_id: Optional[str] = None
    quantity: Optional[float] = None
    price: Optional[float] = None
    side: Optional[str] = None

    # AI governance fields (EU AI Act compliance)
    model_hash: Optional[str] = None      # SHA-256 of model weights
    decision_factors: Optional[dict] = None
    confidence_score: Optional[float] = None
    operator_id: Optional[str] = None

    def to_canonical_json(self) -> str:
        """RFC 8785 canonical JSON serialization"""
        # Remove None values, sort keys, no whitespace
        data = {k: v for k, v in self.__dict__.items() 
                if v is not None}
        # Convert enums to strings
        if 'event_type' in data:
            data['event_type'] = data['event_type'].value
        if 'clock_sync_status' in data:
            data['clock_sync_status'] = data['clock_sync_status'].value
        return json.dumps(data, sort_keys=True, separators=(',', ':'))

    def compute_hash(self) -> str:
        """SHA-256 hash of canonical representation"""
        canonical = self.to_canonical_json()
        return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
```
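As a quick sanity check of the content-addressing property: the same fields always canonicalize to the same JSON and therefore the same digest. The field values below are illustrative:

```python
event = VCPEvent(
    event_id="0191d2f0-0000-7000-8000-000000000001",  # illustrative UUIDv7
    trace_id="trace-001",
    event_type=EventType.ORD,
    timestamp="2025-01-15T09:30:00.123456Z",
    clock_sync_status=ClockSyncStatus.NTP_SYNCED,
    symbol="EURUSD",
    order_id="ord-42",
    quantity=100000.0,
    price=1.0850,
    side="BUY",
)

# Hashing is deterministic: re-serializing yields the identical digest
assert event.compute_hash() == event.compute_hash()
print(event.to_canonical_json())
print(event.compute_hash())
```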

Why UUIDv7?

We use UUIDv7 (RFC 9562) for event IDs because they're:

  1. Time-sortable: The first 48 bits encode Unix timestamp in milliseconds
  2. Globally unique: No coordination required between distributed systems
  3. Database-friendly: Natural ordering improves index performance

```python
import time
import os

def generate_uuidv7() -> str:
    """Generate a UUIDv7 with millisecond precision"""
    # 48 bits of Unix timestamp (milliseconds)
    timestamp_ms = int(time.time() * 1000)
    timestamp_bytes = timestamp_ms.to_bytes(6, 'big')

    # 4 bits version (7) + 12 bits random
    rand_a = os.urandom(2)
    rand_a_int = int.from_bytes(rand_a, 'big')
    rand_a_int = (rand_a_int & 0x0FFF) | 0x7000  # Version 7

    # 2 bits variant (10) + 62 bits random
    rand_b = os.urandom(8)
    rand_b_int = int.from_bytes(rand_b, 'big')
    rand_b_int = (rand_b_int & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000

    # Combine
    uuid_bytes = (
        timestamp_bytes + 
        rand_a_int.to_bytes(2, 'big') + 
        rand_b_int.to_bytes(8, 'big')
    )

    # Format as UUID string
    hex_str = uuid_bytes.hex()
    return f"{hex_str[:8]}-{hex_str[8:12]}-{hex_str[12:16]}-{hex_str[16:20]}-{hex_str[20:]}"
```
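Here's a quick property check: because the timestamp occupies the most significant bits, IDs generated in different milliseconds sort lexicographically in creation order (within a single millisecond, ordering falls to the random bits):

```python
import time

ids = []
for _ in range(3):
    ids.append(generate_uuidv7())
    time.sleep(0.002)  # force distinct millisecond timestamps

assert ids == sorted(ids)  # lexicographic order == creation order
```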

Digital Signatures with Ed25519

Each event is signed using Ed25519, chosen for:

  • Fast signing (suitable for high-frequency events)
  • Small signatures (64 bytes)
  • Fewer implementation pitfalls than ECDSA (no per-signature random nonce to get wrong)
  • Deterministic signatures (same input always produces same output)

```python
from nacl.signing import SigningKey, VerifyKey
from nacl.encoding import HexEncoder

class VCPSigner:
    def __init__(self, private_key_hex: Optional[str] = None):
        if private_key_hex:
            self.signing_key = SigningKey(
                bytes.fromhex(private_key_hex)
            )
        else:
            self.signing_key = SigningKey.generate()
        self.verify_key = self.signing_key.verify_key

    def sign_event(self, event: VCPEvent) -> dict:
        """Sign an event and return signed envelope"""
        canonical = event.to_canonical_json()
        event_hash = event.compute_hash()

        # Sign the hash, not the full content
        signed = self.signing_key.sign(
            bytes.fromhex(event_hash),
            encoder=HexEncoder
        )

        return {
            "event": json.loads(canonical),
            "hash": event_hash,
            "signature": signed.signature.decode('utf-8'),
            "public_key": self.verify_key.encode(
                encoder=HexEncoder
            ).decode('utf-8')
        }

    def verify_signature(self, signed_event: dict) -> bool:
        """Verify a signed event"""
        try:
            verify_key = VerifyKey(
                bytes.fromhex(signed_event["public_key"])
            )
            verify_key.verify(
                bytes.fromhex(signed_event["hash"]),
                bytes.fromhex(signed_event["signature"])
            )

            # Also verify hash matches content
            canonical = json.dumps(
                signed_event["event"], 
                sort_keys=True, 
                separators=(',', ':')
            )
            computed_hash = hashlib.sha256(
                canonical.encode('utf-8')
            ).hexdigest()

            return computed_hash == signed_event["hash"]
        except Exception:
            return False
```
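Putting Layer 1 together, here's a minimal round trip, reusing the `event` from the schema sketch above. Note how tampering with any field breaks verification:

```python
signer = VCPSigner()
signed = signer.sign_event(event)

assert signer.verify_signature(signed)

# Tampering doesn't touch the signature, but the recomputed content
# hash no longer matches the signed hash, so verification fails
signed["event"]["price"] = 1.0851
assert not signer.verify_signature(signed)
```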

Layer 2: Collection Integrity with Merkle Trees

Individual signed events are good, but we need to prove:

  1. Inclusion: A specific event is part of the official log
  2. Consistency: The log only grows (no deletions or modifications)
  3. Completeness: All events are accounted for

This is where Merkle trees shine.

Merkle Tree Implementation

```python
from typing import List, Optional, Tuple
import hashlib

class MerkleTree:
    """RFC 6962-compliant Merkle Tree implementation"""

    # Domain separation prefixes (per RFC 6962)
    LEAF_PREFIX = b'\x00'
    NODE_PREFIX = b'\x01'

    def __init__(self, leaves: Optional[List[str]] = None):
        self.leaves = leaves or []
        self._tree = []
        if self.leaves:
            self._build_tree()

    def _hash_leaf(self, data: str) -> str:
        """Hash a leaf node with domain separation"""
        h = hashlib.sha256()
        h.update(self.LEAF_PREFIX)
        h.update(data.encode('utf-8'))
        return h.hexdigest()

    def _hash_node(self, left: str, right: str) -> str:
        """Hash an internal node with domain separation"""
        h = hashlib.sha256()
        h.update(self.NODE_PREFIX)
        h.update(bytes.fromhex(left))
        h.update(bytes.fromhex(right))
        return h.hexdigest()

    def _build_tree(self):
        """Build the complete Merkle tree"""
        if not self.leaves:
            self._tree = []
            return

        # Hash all leaves
        current_level = [self._hash_leaf(leaf) for leaf in self.leaves]
        self._tree = [current_level]

        # Build tree bottom-up
        while len(current_level) > 1:
            next_level = []
            for i in range(0, len(current_level), 2):
                if i + 1 < len(current_level):
                    node_hash = self._hash_node(
                        current_level[i], 
                        current_level[i + 1]
                    )
                else:
                    # Odd number of nodes: promote the last one
                    node_hash = current_level[i]
                next_level.append(node_hash)
            self._tree.append(next_level)
            current_level = next_level

    @property
    def root(self) -> str:
        """Get the Merkle root"""
        if not self._tree:
            return hashlib.sha256(b'').hexdigest()
        return self._tree[-1][0]

    def add_leaf(self, data: str):
        """Add a new leaf and rebuild tree"""
        self.leaves.append(data)
        self._build_tree()

    def get_inclusion_proof(self, index: int) -> List[Tuple[str, str]]:
        """
        Generate an inclusion proof for leaf at index.
        Returns list of (hash, position) tuples.
        Position is 'L' or 'R' indicating sibling position.
        """
        if index >= len(self.leaves):
            raise IndexError(f"Index {index} out of range")

        proof = []
        idx = index

        for level in range(len(self._tree) - 1):
            level_size = len(self._tree[level])

            if idx % 2 == 0:  # We're on the left
                if idx + 1 < level_size:
                    proof.append((self._tree[level][idx + 1], 'R'))
            else:  # We're on the right
                proof.append((self._tree[level][idx - 1], 'L'))

            idx //= 2

        return proof

    def verify_inclusion(
        self, 
        leaf_data: str, 
        index: int, 
        proof: List[Tuple[str, str]], 
        root: str
    ) -> bool:
        """Verify an inclusion proof"""
        current_hash = self._hash_leaf(leaf_data)

        for sibling_hash, position in proof:
            if position == 'L':
                current_hash = self._hash_node(sibling_hash, current_hash)
            else:
                current_hash = self._hash_node(current_hash, sibling_hash)

        return current_hash == root


# Usage example
def demo_merkle_tree():
    # Create tree with some event hashes
    event_hashes = [
        "a1b2c3d4e5f6...",  # Event 0 hash
        "b2c3d4e5f6a1...",  # Event 1 hash
        "c3d4e5f6a1b2...",  # Event 2 hash
        "d4e5f6a1b2c3...",  # Event 3 hash
    ]

    tree = MerkleTree(event_hashes)
    print(f"Merkle Root: {tree.root}")

    # Generate proof for event 1
    proof = tree.get_inclusion_proof(1)
    print(f"Inclusion proof for event 1: {proof}")

    # Verify
    is_valid = tree.verify_inclusion(
        event_hashes[1], 
        1, 
        proof, 
        tree.root
    )
    print(f"Proof valid: {is_valid}")
```

Batch Collection with Hash Chaining

Events are collected into batches. Each batch header links to the previous batch, creating an unbroken chain:

```python
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime
import time

@dataclass
class VCPBatch:
    batch_id: str
    prev_batch_hash: str
    events: List[dict] = field(default_factory=list)
    merkle_root: str = ""
    timestamp: str = ""

    def __post_init__(self):
        if not self.batch_id:
            self.batch_id = generate_uuidv7()
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat() + 'Z'

    def finalize(self, signer: VCPSigner) -> dict:
        """Compute Merkle root and sign batch header"""
        # Extract event hashes
        event_hashes = [e["hash"] for e in self.events]

        # Build Merkle tree
        tree = MerkleTree(event_hashes)
        self.merkle_root = tree.root

        # Create header
        header = {
            "batch_id": self.batch_id,
            "prev_batch_hash": self.prev_batch_hash,
            "merkle_root": self.merkle_root,
            "event_count": len(self.events),
            "timestamp": self.timestamp,
            "first_event_id": self.events[0]["event"]["event_id"] if self.events else None,
            "last_event_id": self.events[-1]["event"]["event_id"] if self.events else None,
        }

        # Hash and sign header
        header_canonical = json.dumps(header, sort_keys=True, separators=(',', ':'))
        header_hash = hashlib.sha256(header_canonical.encode()).hexdigest()

        signed = signer.signing_key.sign(
            bytes.fromhex(header_hash),
            encoder=HexEncoder
        )

        return {
            "header": header,
            "header_hash": header_hash,
            "signature": signed.signature.decode('utf-8'),
            "public_key": signer.verify_key.encode(encoder=HexEncoder).decode('utf-8'),
            "events": self.events
        }


class VCPCollector:
    """Collects events into batches with integrity guarantees"""

    def __init__(
        self, 
        signer: VCPSigner, 
        batch_size: int = 100,
        batch_timeout_seconds: float = 1.0
    ):
        self.signer = signer
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout_seconds

        self.current_batch = None
        self.last_batch_hash = "0" * 64  # Genesis
        self.batch_start_time = None
        self.finalized_batches = []

    def add_event(self, event: VCPEvent) -> Optional[dict]:
        """
        Add event to current batch.
        Returns finalized batch if batch is complete.
        """
        if self.current_batch is None:
            self.current_batch = VCPBatch(
                batch_id=generate_uuidv7(),
                prev_batch_hash=self.last_batch_hash
            )
            self.batch_start_time = time.time()

        # Sign and add event
        signed_event = self.signer.sign_event(event)
        self.current_batch.events.append(signed_event)

        # Check if batch should be finalized
        should_finalize = (
            len(self.current_batch.events) >= self.batch_size or
            (time.time() - self.batch_start_time) >= self.batch_timeout
        )

        if should_finalize:
            return self._finalize_batch()

        return None

    def _finalize_batch(self) -> dict:
        """Finalize current batch and prepare for next"""
        if not self.current_batch or not self.current_batch.events:
            return None

        finalized = self.current_batch.finalize(self.signer)
        self.last_batch_hash = finalized["header_hash"]
        self.finalized_batches.append(finalized)

        self.current_batch = None
        self.batch_start_time = None

        return finalized

    def flush(self) -> Optional[dict]:
        """Force finalize current batch"""
        return self._finalize_batch()
```
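Here's a minimal end-to-end sketch of the chain property, reusing the classes defined above, with the batch size shrunk so two batches finalize immediately:

```python
from datetime import datetime

signer = VCPSigner()
collector = VCPCollector(signer, batch_size=2, batch_timeout_seconds=60.0)

batches = []
for i in range(4):
    finalized = collector.add_event(VCPEvent(
        event_id=generate_uuidv7(),
        trace_id="trace-001",
        event_type=EventType.ORD,
        timestamp=datetime.utcnow().isoformat() + 'Z',
        clock_sync_status=ClockSyncStatus.NTP_SYNCED,
        symbol="EURUSD",
        order_id=f"ord-{i}",
        quantity=1000.0,
        price=1.0850,
    ))
    if finalized:
        batches.append(finalized)

# Headers chain like a blockchain: each commits to its predecessor's hash
assert batches[0]["header"]["prev_batch_hash"] == "0" * 64  # genesis
assert batches[1]["header"]["prev_batch_hash"] == batches[0]["header_hash"]
```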

Layer 3: External Verifiability

Here's where VCP diverges from pure CT: mandatory external anchoring.

In CT, logs are operated by trusted third parties. In trading, we can't assume the log operator is trustworthy. We need external, independent verification.

Anchoring Strategies by Tier

| Tier     | Mechanism                                    | Latency     | Cost   | Use Case             |
|----------|----------------------------------------------|-------------|--------|----------------------|
| Platinum | Private blockchain or distributed witnesses  | <1 second   | High   | HFT, exchanges       |
| Gold     | RFC 3161 TSA + redundant anchors             | <1 minute   | Medium | Institutional        |
| Silver   | OpenTimestamps (Bitcoin)                     | ~10 minutes | Free   | Retail, prop trading |

OpenTimestamps Integration (Silver Tier)

OpenTimestamps leverages Bitcoin's blockchain for free, decentralized timestamping:

```python
import subprocess
import tempfile
import os

class OpenTimestampsAnchor:
    """Anchor VCP batch hashes using OpenTimestamps"""

    def __init__(self, ots_binary: str = "ots"):
        self.ots_binary = ots_binary

    def create_timestamp(self, batch_hash: str) -> bytes:
        """
        Create an OpenTimestamps proof for a batch hash.
        Returns the .ots proof file contents.
        """
        # Write hash to temp file
        with tempfile.NamedTemporaryFile(
            mode='w', 
            suffix='.txt', 
            delete=False
        ) as f:
            f.write(batch_hash)
            hash_file = f.name

        try:
            # Create timestamp
            result = subprocess.run(
                [self.ots_binary, "stamp", hash_file],
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode != 0:
                raise Exception(f"OTS stamp failed: {result.stderr}")

            # Read the proof file
            proof_file = hash_file + ".ots"
            with open(proof_file, 'rb') as f:
                proof = f.read()

            return proof

        finally:
            # Cleanup
            os.unlink(hash_file)
            if os.path.exists(hash_file + ".ots"):
                os.unlink(hash_file + ".ots")

    def verify_timestamp(
        self, 
        batch_hash: str, 
        proof: bytes
    ) -> dict:
        """
        Verify an OpenTimestamps proof.
        Returns verification result with attestation details.
        """
        # Write hash and proof to temp files
        with tempfile.NamedTemporaryFile(
            mode='w', 
            suffix='.txt', 
            delete=False
        ) as f:
            f.write(batch_hash)
            hash_file = f.name

        proof_file = hash_file + ".ots"
        with open(proof_file, 'wb') as f:
            f.write(proof)

        try:
            # Verify
            result = subprocess.run(
                [self.ots_binary, "verify", proof_file],
                capture_output=True,
                text=True,
                timeout=60
            )

            return {
                "verified": result.returncode == 0,
                "output": result.stdout,
                "errors": result.stderr
            }

        finally:
            os.unlink(hash_file)
            os.unlink(proof_file)
```
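Usage is a pair of calls. This sketch assumes the `ots` CLI from the opentimestamps-client package is on your PATH, and note that Bitcoin attestations take hours to confirm, so verifying a freshly created stamp may not succeed immediately:

```python
anchor = OpenTimestampsAnchor()

# `batches` comes from the collector sketch above
batch_hash = batches[-1]["header_hash"]
proof = anchor.create_timestamp(batch_hash)

# Persist `proof` next to the batch; re-verify once the Bitcoin
# attestation has confirmed
result = anchor.verify_timestamp(batch_hash, proof)
print(result["verified"], result["output"])
```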

RFC 3161 Timestamp Authority (Gold Tier)

For institutional use, RFC 3161 TSAs provide legally recognized timestamps:

```python
import requests
from asn1crypto import tsp

class RFC3161Anchor:
    """Anchor using RFC 3161 Timestamp Authority"""

    def __init__(self, tsa_url: str):
        self.tsa_url = tsa_url

    def create_timestamp(self, batch_hash: str) -> bytes:
        """Request a timestamp from the TSA"""
        # Create TimeStampReq
        message_imprint = tsp.MessageImprint({
            'hash_algorithm': {'algorithm': 'sha256'},
            'hashed_message': bytes.fromhex(batch_hash)
        })

        ts_request = tsp.TimeStampReq({
            'version': 1,
            'message_imprint': message_imprint,
            'cert_req': True
        })

        # Send request
        response = requests.post(
            self.tsa_url,
            data=ts_request.dump(),
            headers={'Content-Type': 'application/timestamp-query'},
            timeout=30
        )

        if response.status_code != 200:
            raise Exception(f"TSA request failed: {response.status_code}")

        return response.content

    def verify_timestamp(self, batch_hash: str, token: bytes) -> dict:
        """Verify a timestamp token"""
        ts_response = tsp.TimeStampResp.load(token)

        status = ts_response['status']['status'].native
        if status != 'granted':
            return {"verified": False, "status": status}

        ts_token = ts_response['time_stamp_token']
        tst_info = ts_token['content']['encap_content_info']['content'].parsed

        # Verify hash matches
        imprint = tst_info['message_imprint']['hashed_message'].native
        if imprint.hex() != batch_hash:
            return {"verified": False, "error": "Hash mismatch"}

        return {
            "verified": True,
            "timestamp": tst_info['gen_time'].native.isoformat(),
            "serial": str(tst_info['serial_number'].native),
            "tsa": str(tst_info['tsa'].native) if tst_info['tsa'] else None
        }
```
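Usage is again two calls; the endpoint below is illustrative (any RFC 3161-compliant TSA works, here a free public one is assumed):

```python
import hashlib

# Illustrative endpoint; substitute your contracted TSA
tsa = RFC3161Anchor("https://freetsa.org/tsr")

batch_hash = hashlib.sha256(b"example batch header").hexdigest()
token = tsa.create_timestamp(batch_hash)

result = tsa.verify_timestamp(batch_hash, token)
print(result)
```

Note that `verify_timestamp` above only checks the message imprint and status; a production verifier must also validate the CMS signature on the token and the TSA's certificate chain.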

MQL5 Integration: Bringing VCP to MetaTrader

Many algorithmic traders use MetaTrader 5. Here's how to integrate VCP:

```mql5
//+------------------------------------------------------------------+
//| VCP Bridge for MetaTrader 5                                       |
//| Captures trading events and forwards to VCP Sidecar               |
//+------------------------------------------------------------------+
#property copyright "VeritasChain Standards Organization"
#property link      "https://veritaschain.org"
#property version   "1.0"

#include <Trade\Trade.mqh>
#include <JAson.mqh>  // JSON library

// Sidecar endpoint
input string VCP_SIDECAR_URL = "http://localhost:8080/events";
input string OPERATOR_ID = "trader_001";

//+------------------------------------------------------------------+
//| Create VCP event from trade request                               |
//+------------------------------------------------------------------+
string CreateVCPEvent(
    string eventType,
    string symbol,
    ENUM_ORDER_TYPE orderType,
    double volume,
    double price,
    ulong ticket = 0
)
{
    CJAVal json;

    // UUIDv7-like ID (simplified)
    string eventId = StringFormat("%I64u-%s",
        (ulong)TimeCurrent() * 1000 + GetTickCount() % 1000,
        IntegerToString(MathRand(), 8, '0'));

    json["event_id"] = eventId;
    json["trace_id"] = IntegerToString(ticket > 0 ? ticket : MathRand());
    json["event_type"] = eventType;
    json["timestamp"] = TimeToString(TimeCurrent(), TIME_DATE|TIME_SECONDS) + "Z";
    json["clock_sync_status"] = "NTP_SYNCED";  // MT5 uses broker time

    json["symbol"] = symbol;
    json["order_type"] = EnumToString(orderType);
    json["quantity"] = volume;
    json["price"] = price;
    json["ticket"] = IntegerToString(ticket);

    json["operator_id"] = OPERATOR_ID;
    json["platform"] = "MT5";
    json["account_id"] = IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN));

    return json.Serialize();
}

//+------------------------------------------------------------------+
//| Send event to VCP Sidecar                                         |
//+------------------------------------------------------------------+
bool SendToSidecar(string jsonEvent)
{
    char post[];
    char result[];
    string headers = "Content-Type: application/json\r\n";

    StringToCharArray(jsonEvent, post, 0, WHOLE_ARRAY, CP_UTF8);

    int res = WebRequest(
        "POST",
        VCP_SIDECAR_URL,
        headers,
        5000,
        post,
        result,
        headers
    );

    if(res == -1)
    {
        int error = GetLastError();
        PrintFormat("VCP Sidecar error: %d", error);
        return false;
    }

    return (res == 200 || res == 201);
}

//+------------------------------------------------------------------+
//| Expert initialization function                                    |
//+------------------------------------------------------------------+
int OnInit()
{
    // Log initialization event
    string initEvent = CreateVCPEvent(
        "INIT",
        _Symbol,
        ORDER_TYPE_BUY,  // placeholder
        0,
        0
    );
    SendToSidecar(initEvent);

    return(INIT_SUCCEEDED);
}

//+------------------------------------------------------------------+
//| Trade transaction handler                                         |
//+------------------------------------------------------------------+
void OnTradeTransaction(
    const MqlTradeTransaction& trans,
    const MqlTradeRequest& request,
    const MqlTradeResult& result
)
{
    string eventType = "";

    switch(trans.type)
    {
        case TRADE_TRANSACTION_ORDER_ADD:
            eventType = "ORD";
            break;
        case TRADE_TRANSACTION_ORDER_DELETE:
            eventType = "CXL";
            break;
        case TRADE_TRANSACTION_DEAL_ADD:
            eventType = "EXE";
            break;
        case TRADE_TRANSACTION_ORDER_UPDATE:
            eventType = "MOD";
            break;
        default:
            return;  // Ignore other transaction types
    }

    string event = CreateVCPEvent(
        eventType,
        trans.symbol,
        (ENUM_ORDER_TYPE)trans.order_type,
        trans.volume,
        trans.price,
        trans.order
    );

    if(!SendToSidecar(event))
    {
        PrintFormat("Failed to send %s event to VCP Sidecar", eventType);
    }
}
```

Python Sidecar Service

The MT5 EA sends events to a local sidecar that handles batching and anchoring:

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio

app = FastAPI(title="VCP Sidecar")

# Initialize VCP components
signer = VCPSigner()
collector = VCPCollector(signer, batch_size=50, batch_timeout_seconds=5.0)
anchor = OpenTimestampsAnchor()

class EventInput(BaseModel):
    event_id: str
    trace_id: str
    event_type: str
    timestamp: str
    clock_sync_status: str
    symbol: Optional[str] = None
    order_type: Optional[str] = None
    quantity: Optional[float] = None
    price: Optional[float] = None
    ticket: Optional[str] = None
    operator_id: Optional[str] = None
    platform: Optional[str] = None
    account_id: Optional[str] = None

@app.post("/events")
async def receive_event(event: EventInput):
    """Receive trading event from MT5 EA"""

    # Convert to VCPEvent
    vcp_event = VCPEvent(
        event_id=event.event_id,
        trace_id=event.trace_id,
        event_type=EventType(event.event_type) if event.event_type in [e.value for e in EventType] else EventType.ORD,
        timestamp=event.timestamp,
        clock_sync_status=ClockSyncStatus(event.clock_sync_status),
        symbol=event.symbol,
        order_id=event.ticket,
        quantity=event.quantity,
        price=event.price,
        operator_id=event.operator_id
    )

    # Add to collector
    finalized_batch = collector.add_event(vcp_event)

    if finalized_batch:
        # Anchor the batch asynchronously
        asyncio.create_task(anchor_batch(finalized_batch))

    return {"status": "accepted", "event_id": event.event_id}

async def anchor_batch(batch: dict):
    """Anchor a finalized batch to external timestamp service"""
    try:
        proof = anchor.create_timestamp(batch["header_hash"])

        # Store batch and proof
        await store_batch(batch, proof)

        print(f"Batch {batch['header']['batch_id']} anchored successfully")
    except Exception as e:
        print(f"Anchoring failed: {e}")
        # Implement retry logic here

async def store_batch(batch: dict, proof: bytes):
    """Persist batch and proof to storage"""
    # Implementation depends on your storage backend
    # Could be PostgreSQL, S3, IPFS, etc.
    pass

@app.get("/batches/{batch_id}")
async def get_batch(batch_id: str):
    """Retrieve a batch with its anchor proof"""
    # Fetch from storage
    pass

@app.get("/verify/{event_id}")
async def verify_event(event_id: str):
    """Generate and verify inclusion proof for an event"""
    # Find the batch containing this event
    # Generate Merkle inclusion proof
    # Verify against anchored root
    pass

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)
```
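To smoke-test the contract the EA relies on, here's a minimal client-side sketch posting a sample event while the sidecar is running (field values are illustrative):

```python
import requests

sample_event = {
    "event_id": generate_uuidv7(),
    "trace_id": "trace-001",
    "event_type": "ORD",
    "timestamp": "2025-01-15T09:30:00Z",
    "clock_sync_status": "NTP_SYNCED",
    "symbol": "EURUSD",
    "quantity": 0.10,
    "price": 1.0850,
    "ticket": "12345678",
    "operator_id": "trader_001",
    "platform": "MT5",
}

resp = requests.post("http://127.0.0.1:8080/events",
                     json=sample_event, timeout=5)
print(resp.status_code, resp.json())  # 200 {'status': 'accepted', ...}
```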

GDPR Compliance: Crypto-Shredding

Here's a challenge unique to EU markets: GDPR Article 17 gives users the right to erasure, but audit regulations require permanent records.

VCP solves this with crypto-shredding:

```python
from cryptography.fernet import Fernet
from typing import Dict, Optional

class CryptoShredder:
    """
    Encrypt PII with rotatable keys.
    Delete keys to cryptographically "shred" data.
    """

    def __init__(self, key_store: Dict[str, bytes] = None):
        self.key_store = key_store or {}

    def encrypt_pii(
        self, 
        data: str, 
        subject_id: str
    ) -> tuple[str, str]:
        """
        Encrypt PII for a data subject.
        Returns (encrypted_data, key_id).
        """
        # Get or create key for this subject
        if subject_id not in self.key_store:
            self.key_store[subject_id] = Fernet.generate_key()

        key = self.key_store[subject_id]
        f = Fernet(key)

        encrypted = f.encrypt(data.encode('utf-8'))
        return encrypted.decode('utf-8'), subject_id

    def decrypt_pii(
        self, 
        encrypted_data: str, 
        key_id: str
    ) -> Optional[str]:
        """Decrypt PII if key still exists"""
        if key_id not in self.key_store:
            return None  # Key was shredded

        key = self.key_store[key_id]
        f = Fernet(key)

        try:
            decrypted = f.decrypt(encrypted_data.encode('utf-8'))
            return decrypted.decode('utf-8')
        except Exception:
            return None

    def shred(self, subject_id: str) -> bool:
        """
        Cryptographically shred all data for a subject.
        After this, encrypted data is unrecoverable.
        """
        if subject_id in self.key_store:
            del self.key_store[subject_id]
            return True
        return False


# Usage in VCP events
class VCPEventWithPrivacy(VCPEvent):
    """VCP Event with GDPR-compliant PII handling.

    The shredder is passed per call rather than stored on the instance,
    so it never leaks into __dict__ and corrupts the canonical JSON.
    """

    def encrypt_operator_id(self, shredder: CryptoShredder, subject_id: str):
        """Replace operator_id with ciphertext (do this before hashing/signing)"""
        if self.operator_id:
            encrypted, key_id = shredder.encrypt_pii(
                self.operator_id,
                subject_id
            )
            self.operator_id = f"ENCRYPTED:{key_id}:{encrypted}"
```

The key insight: the Merkle tree and hash chain remain valid even after keys are destroyed. The audit trail proves integrity; the encryption ensures privacy.
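Here's a short sketch of that property end to end, using the classes above (the subject ID and email are illustrative): encrypt the PII before hashing, shred the key, and confirm the hash is untouched while the plaintext is gone.

```python
shredder = CryptoShredder()

event = VCPEventWithPrivacy(
    event_id=generate_uuidv7(),
    trace_id="trace-001",
    event_type=EventType.ORD,
    timestamp="2025-01-15T09:30:00Z",
    clock_sync_status=ClockSyncStatus.NTP_SYNCED,
    operator_id="alice@example.com",
)
event.encrypt_operator_id(shredder, subject_id="subject-alice")

hash_before = event.compute_hash()  # covers the ciphertext, never the PII

shredder.shred("subject-alice")     # GDPR Article 17 erasure request

assert event.compute_hash() == hash_before  # audit integrity untouched
_, key_id, ciphertext = event.operator_id.split(":", 2)
assert shredder.decrypt_pii(ciphertext, key_id) is None  # PII is gone
```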


Why This Matters for Developers

If you're building trading systems, RegTech tools, or any application where audit integrity is critical, VCP offers several advantages:

1. Open Standard, Not Vendor Lock-in

VCP is released under CC BY 4.0. You can implement it yourself, use our reference implementations, or build commercial products. No licensing fees, no API dependencies.

2. Built on Proven Cryptographic Foundations

We're not inventing new cryptography. VCP combines:

  • RFC 6962 (Certificate Transparency)
  • RFC 8785 (Canonical JSON)
  • RFC 9562 (UUIDv7)
  • Ed25519 (proven signature scheme)
  • SHA-256 (industry standard hash)

3. Regulatory Alignment by Design

VCP was designed with specific regulatory requirements in mind:

  • EU AI Act Article 12: Automatic event logging
  • MiFID II RTS 25: Timestamp precision tiers
  • MAR surveillance: Complete, tamper-evident audit trails
  • GDPR Article 17: Right to erasure via crypto-shredding

4. Production-Proven Integrations

VCP has been tested with:

  • FIX Protocol (institutional trading)
  • NASDAQ OUCH/ITCH (exchange integration)
  • MetaTrader 5 (retail trading)
  • cTrader (prop trading)
  • Interactive Brokers API

Getting Started

Quick Start with Python

```bash
pip install vcp-core  # Coming soon to PyPI
```
```python
from datetime import datetime
from vcp import (
    VCPSigner, VCPCollector, VCPEvent, EventType,
    ClockSyncStatus, generate_uuidv7,
)

# Initialize
signer = VCPSigner()
collector = VCPCollector(signer)

# Create and collect events
event = VCPEvent(
    event_id=generate_uuidv7(),
    trace_id="trade-001",
    event_type=EventType.ORD,
    timestamp=datetime.utcnow().isoformat() + 'Z',
    clock_sync_status=ClockSyncStatus.NTP_SYNCED,
    symbol="EURUSD",
    quantity=100000,
    price=1.0850,
    side="BUY"
)

batch = collector.add_event(event)  # returns a batch once size/timeout triggers
if batch is None:
    batch = collector.flush()       # force-finalize for this demo
print(f"Batch finalized: {batch['header']['merkle_root']}")
```

Full Specification

The complete VCP v1.1 specification is published by the VeritasChain Standards Organization at veritaschain.org.


What's Next

VCP v1.2 is in development with:

  • VCP-XREF: Cross-party verification for bilateral trades
  • Post-quantum signatures: Migration path from Ed25519 to Dilithium
  • Enhanced completeness proofs: Stronger guarantees against omission attacks

We're also working with CEN-CENELEC JTC 21 to align VCP with emerging EU AI Act harmonized standards.


Conclusion

The traditional approach to audit logging—trust the database, trust the operator, trust the auditor—is increasingly inadequate for AI-driven trading systems.

VCP offers a different paradigm: verify, don't trust.

By applying Certificate Transparency's proven cryptographic architecture to trading events, we can build audit trails that:

  • Prove their own integrity
  • Detect any modification or deletion
  • Guarantee completeness
  • Support regulatory requirements across multiple jurisdictions
  • Remain GDPR-compliant

The code in this article is simplified for clarity. The production specification includes additional features for edge cases, error handling, and high-availability deployment.

If you're working on trading systems, surveillance tools, or RegTech solutions, I'd love to hear how you're approaching audit integrity. Drop a comment below or reach out at technical@veritaschain.org.


VeritasChain Standards Organization (VSO) is a non-profit standards body developing open cryptographic audit standards. VCP has been submitted to 67 regulatory authorities across 50 jurisdictions.


Tags: #cryptography #trading #fintech #regtech #python #mql5 #audit #blockchain #euaiact #merkletree
