Building Tamper-Evident Audit Trails for Algorithmic Trading: A Deep Dive into VCP v1.1

A hands-on guide to implementing cryptographic audit infrastructure that can survive regulatory scrutiny


TL;DR: When a hedge fund's model manipulation goes undetected for 22 months, when a fake tweet moves $2.4 trillion in 4 minutes, and when AI bots autonomously learn to collude—traditional logging isn't enough. This article walks through implementing VeritasChain Protocol (VCP) v1.1, an open standard for tamper-evident audit trails, with working code in Python and MQL5.


The Problem: "Trust Me" Doesn't Scale

In January 2025, the SEC announced a $90 million settlement with Two Sigma. The core failure? A researcher modified trading model parameters for 22 months without detection. The firm had logs. The firm had access controls. But the logs couldn't prove their own integrity.

This isn't a Two Sigma problem—it's an industry problem. Traditional server logs answer "what does the log say happened?" They can't answer "has this log been modified since the event occurred?"

The distinction matters when:

  • Regulators demand proof of trading decisions (MiFID II RTS 25, EU AI Act Article 12)
  • Prop firms dispute payout calculations with traders
  • Exchanges need to prove order book state during flash crashes
  • AI systems make decisions that require post-hoc explanation

VCP v1.1 addresses this through a three-layer architecture that transforms "trust me" logs into mathematically verifiable audit trails.


Architecture Overview: Three Layers of Integrity

┌─────────────────────────────────────────────────────────────────┐
│                    Layer 3: External Verifiability              │
│    (Public timestamping - Bitcoin, RFC 3161, OpenTimestamps)    │
├─────────────────────────────────────────────────────────────────┤
│                    Layer 2: Collection Integrity                │
│         (Merkle Trees - proves batch completeness)              │
├─────────────────────────────────────────────────────────────────┤
│                    Layer 1: Event Integrity                     │
│   (Hash chains, signatures - proves individual event validity)  │
└─────────────────────────────────────────────────────────────────┘

Layer 1 ensures each event hasn't been modified.

Layer 2 ensures no events have been omitted from a batch.

Layer 3 ensures the entire log state was committed at a specific time.

Let's implement each layer.


Layer 1: Event Integrity with Hash Chains

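Every VCP event contains a cryptographic link to its predecessor. Modifying or deleting an event in place breaks the chain, and anyone holding the logs can detect the break. An attacker who can also rewrite every later hash defeats this layer on its own, which is the gap Layers 2 and 3 close.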

Python Implementation

import hashlib
import json
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
from enum import Enum

class EventType(str, Enum):
    SIG = "SIG"  # Signal generated
    ORD = "ORD"  # Order submitted
    ACK = "ACK"  # Order acknowledged
    EXE = "EXE"  # Execution
    REJ = "REJ"  # Rejection
    RSK = "RSK"  # Risk event
    ERR = "ERR"  # Error

class TimestampPrecision(str, Enum):
    NANOSECOND = "NANOSECOND"
    MICROSECOND = "MICROSECOND"
    MILLISECOND = "MILLISECOND"

class ClockSyncStatus(str, Enum):
    PTP_LOCKED = "PTP_LOCKED"
    NTP_SYNCED = "NTP_SYNCED"
    BEST_EFFORT = "BEST_EFFORT"

@dataclass
class VCPHeader:
    event_id: str
    trace_id: str
    timestamp_int: str
    timestamp_iso: str
    event_type: str
    event_type_code: int
    timestamp_precision: str
    clock_sync_status: str
    hash_algo: str
    venue_id: str
    symbol: str
    account_id: str
    policy_id: str  # NEW in v1.1

@dataclass
class VCPTradePayload:
    order_id: Optional[str] = None
    side: Optional[str] = None
    order_type: Optional[str] = None
    price: Optional[str] = None
    quantity: Optional[str] = None
    execution_price: Optional[str] = None
    slippage: Optional[str] = None

@dataclass
class VCPSecurity:
    event_hash: str
    prev_hash: str

@dataclass
class VCPEvent:
    header: VCPHeader
    payload: dict
    security: VCPSecurity

UUID v7: Time-Ordered Identifiers

VCP mandates UUID v7 (RFC 9562) for event IDs. Unlike UUID v4, v7 embeds millisecond timestamps, enabling natural chronological ordering without separate timestamp comparisons.

def generate_uuid_v7() -> str:
    """Generate RFC 9562 compliant UUID v7."""
    # Get current timestamp in milliseconds
    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

    # 48 bits of timestamp
    timestamp_hex = format(timestamp_ms, '012x')

    # 4 bits version (7) + 12 bits random
    rand_a = uuid.uuid4().hex[:3]
    version_rand_a = '7' + rand_a

    # 2 bits variant (10) + 62 bits random
    rand_b_int = int(uuid.uuid4().hex[:16], 16)
    rand_b_int = (rand_b_int & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000
    rand_b = format(rand_b_int, '016x')

    # Assemble: timestamp (48) + version (4) + rand_a (12) + variant (2) + rand_b (62)
    uuid_hex = timestamp_hex + version_rand_a + rand_b

    # Format as UUID string
    return f"{uuid_hex[:8]}-{uuid_hex[8:12]}-{uuid_hex[12:16]}-{uuid_hex[16:20]}-{uuid_hex[20:]}"
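
Because the first 48 bits hold the Unix time in milliseconds, the creation time can be read back out of an ID, and plain string sorting of IDs follows creation order to millisecond granularity. A minimal sketch of both properties; `make_uuid7` and `uuid7_timestamp_ms` are illustrative helpers written for this example, not part of VCP:

```python
import os
import uuid

def make_uuid7(timestamp_ms: int) -> str:
    """Build a UUID v7 string for a given millisecond timestamp (illustrative)."""
    rand = int.from_bytes(os.urandom(10), "big")      # 80 random bits
    value = (timestamp_ms & 0xFFFFFFFFFFFF) << 80     # 48-bit timestamp
    value |= 0x7 << 76                                # version 7
    value |= ((rand >> 68) & 0xFFF) << 64             # rand_a, 12 bits
    value |= 0b10 << 62                               # RFC variant bits
    value |= rand & 0x3FFFFFFFFFFFFFFF                # rand_b, 62 bits
    return str(uuid.UUID(int=value))

def uuid7_timestamp_ms(uuid_str: str) -> int:
    """Recover the embedded millisecond timestamp from a UUID v7 string."""
    parsed = uuid.UUID(uuid_str)
    if parsed.version != 7:
        raise ValueError("not a UUID v7")
    return parsed.int >> 80

# IDs created out of order still sort chronologically as plain strings
ids = [make_uuid7(ts) for ts in (1_700_000_000_002,
                                 1_700_000_000_000,
                                 1_700_000_000_001)]
sorted_ts = [uuid7_timestamp_ms(i) for i in sorted(ids)]
print(sorted_ts)  # [1700000000000, 1700000000001, 1700000000002]
```

Two IDs generated within the same millisecond are ordered by their random bits, so the ID alone does not settle ordering inside one millisecond.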

RFC 8785 JSON Canonicalization

Hash computation requires deterministic serialization. VCP uses RFC 8785 (JSON Canonicalization Scheme) to ensure identical hashes regardless of field ordering or whitespace.

def canonicalize_json(obj: dict) -> str:
    """
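    Simplified RFC 8785-style JSON canonicalization.

    Key requirements of RFC 8785:
    - Keys sorted lexicographically (by UTF-16 code units)
    - No insignificant whitespace
    - Numbers serialized the way ECMAScript prints them
    - Strings escaped in one fixed way

    Limitations of this sketch: floats are formatted with Python's repr(),
    which differs from ECMAScript for some values, and None-valued object
    members are omitted rather than written as null.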
    """
    def _canonicalize(value):
        if isinstance(value, dict):
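            # Sort keys by UTF-16 code units, as RFC 8785 requires
            sorted_items = sorted(
                value.items(), key=lambda x: x[0].encode('utf-16-be')
            )
            # Keys get the same string escaping as values;
            # None-valued members are omitted
            return '{' + ','.join(
                f'{_canonicalize(k)}:{_canonicalize(v)}'
                for k, v in sorted_items
                if v is not None
            ) + '}'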
        elif isinstance(value, list):
            return '[' + ','.join(_canonicalize(v) for v in value) + ']'
        elif isinstance(value, str):
            # json.dumps applies JSON string escaping: quotes, backslashes and
            # all control characters are escaped, everything else is kept as-is
            return json.dumps(value, ensure_ascii=False)
        elif isinstance(value, bool):
            return 'true' if value else 'false'
        elif isinstance(value, (int, float)):
            # Caution: repr() only approximates RFC 8785 number formatting
            # (repr(1e-7) is '1e-07', while RFC 8785 expects '1e-7')
            if isinstance(value, float):
                if value.is_integer():
                    return str(int(value))
                return repr(value)
            return str(value)
        elif value is None:
            return 'null'
        else:
            raise TypeError(f"Cannot canonicalize type: {type(value)}")

    return _canonicalize(obj)


def compute_event_hash(header: dict, payload: dict, prev_hash: str) -> str:
    """
    Compute SHA-256 hash of canonicalized event data.

    Hash input: canonical(header) || canonical(payload) || prev_hash
    """
    header_canonical = canonicalize_json(header)
    payload_canonical = canonicalize_json(payload)

    hash_input = f"{header_canonical}{payload_canonical}{prev_hash}"

    return hashlib.sha256(hash_input.encode('utf-8')).hexdigest()
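
To see why canonicalization matters, the sketch below hashes the same logical object built with two different key orders. It uses `json.dumps` with sorted keys and compact separators as a stand-in canonicalizer; that is an approximation that matches RFC 8785 for the string-valued, ASCII-keyed data used in this article, not a full implementation. The helper names `canonical` and `digest` are made up for this example.

```python
import hashlib
import json

def canonical(obj) -> str:
    """Approximate canonical form: sorted keys, no insignificant whitespace."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False)

def digest(obj) -> str:
    return hashlib.sha256(canonical(obj).encode("utf-8")).hexdigest()

a = {"side": "BUY", "price": "2650.50", "quantity": "1.00"}
b = {"quantity": "1.00", "price": "2650.50", "side": "BUY"}

# Default serialization follows insertion order, so the bytes differ
naive_a = hashlib.sha256(json.dumps(a).encode("utf-8")).hexdigest()
naive_b = hashlib.sha256(json.dumps(b).encode("utf-8")).hexdigest()

print(naive_a == naive_b)      # False
print(digest(a) == digest(b))  # True
print(canonical(a))            # {"price":"2650.50","quantity":"1.00","side":"BUY"}

# Floats are a separate hazard: Python and ECMAScript print some values
# differently, and RFC 8785 follows ECMAScript ("1e-7").
print(repr(1e-7))              # 1e-07
```

The payloads in this article carry prices and quantities as strings, which sidesteps the float-formatting problem entirely.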

Building a Hash Chain

class VCPLogger:
    """VCP-compliant event logger with hash chain."""

    GENESIS_HASH = "0" * 64  # All zeros for chain start

    def __init__(self, 
                 venue_id: str,
                 policy_id: str,
                 precision: TimestampPrecision = TimestampPrecision.MILLISECOND,
                 clock_sync: ClockSyncStatus = ClockSyncStatus.BEST_EFFORT):
        self.venue_id = venue_id
        self.policy_id = policy_id
        self.precision = precision
        self.clock_sync = clock_sync
        self.prev_hash = self.GENESIS_HASH
        self.events: list[VCPEvent] = []

    def _get_timestamp(self) -> tuple[str, str]:
        """Return (timestamp_int, timestamp_iso) tuple."""
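        now = datetime.now(timezone.utc)

        # Build the integer timestamp with integer arithmetic so it always
        # agrees with timestamp_iso; multiplying the float timestamp loses
        # precision at microsecond and nanosecond scale.
        epoch_us = int(now.timestamp()) * 1_000_000 + now.microsecond

        if self.precision == TimestampPrecision.NANOSECOND:
            # datetime resolves microseconds only; use time.time_ns() with a
            # suitable clock source if real nanosecond readings are required
            ts_int = str(epoch_us * 1_000)
        elif self.precision == TimestampPrecision.MICROSECOND:
            ts_int = str(epoch_us)
        else:  # MILLISECOND
            ts_int = str(epoch_us // 1_000)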

        ts_iso = now.strftime('%Y-%m-%dT%H:%M:%S.') + f'{now.microsecond:06d}Z'

        return ts_int, ts_iso

    def log_event(self,
                  event_type: EventType,
                  symbol: str,
                  account_id: str,
                  payload: dict,
                  trace_id: Optional[str] = None) -> VCPEvent:
        """Create and log a VCP event with hash chain linking."""

        event_id = generate_uuid_v7()
        trace_id = trace_id or event_id
        ts_int, ts_iso = self._get_timestamp()

        header = VCPHeader(
            event_id=event_id,
            trace_id=trace_id,
            timestamp_int=ts_int,
            timestamp_iso=ts_iso,
            event_type=event_type.value,
            event_type_code=self._event_type_to_code(event_type),
            timestamp_precision=self.precision.value,
            clock_sync_status=self.clock_sync.value,
            hash_algo="SHA256",
            venue_id=self.venue_id,
            symbol=symbol,
            account_id=account_id,
            policy_id=self.policy_id
        )

        # Compute hash with chain linking
        header_dict = asdict(header)
        event_hash = compute_event_hash(header_dict, payload, self.prev_hash)

        security = VCPSecurity(
            event_hash=event_hash,
            prev_hash=self.prev_hash
        )

        event = VCPEvent(
            header=header,
            payload=payload,
            security=security
        )

        # Update chain state
        self.prev_hash = event_hash
        self.events.append(event)

        return event

    def _event_type_to_code(self, event_type: EventType) -> int:
        """Map event type to numeric code."""
        codes = {
            EventType.SIG: 1, EventType.ORD: 2, EventType.ACK: 3,
            EventType.EXE: 4, EventType.REJ: 6, EventType.RSK: 21,
            EventType.ERR: 99
        }
        return codes.get(event_type, 0)

    def verify_chain(self) -> tuple[bool, Optional[int]]:
        """
        Verify the entire hash chain.

        Returns:
            (is_valid, first_invalid_index)
            - (True, None) if chain is valid
            - (False, index) if chain breaks at index
        """
        prev_hash = self.GENESIS_HASH

        for i, event in enumerate(self.events):
            # Verify prev_hash link
            if event.security.prev_hash != prev_hash:
                return False, i

            # Recompute and verify event hash
            header_dict = asdict(event.header)
            expected_hash = compute_event_hash(
                header_dict, 
                event.payload, 
                prev_hash
            )

            if event.security.event_hash != expected_hash:
                return False, i

            prev_hash = event.security.event_hash

        return True, None

Usage Example: Complete Trade Lifecycle

# Initialize logger
logger = VCPLogger(
    venue_id="MT5-BROKER-DEMO",
    policy_id="org.example.trading:gold-algo-v2",
    precision=TimestampPrecision.MILLISECOND,
    clock_sync=ClockSyncStatus.NTP_SYNCED
)

# 1. Signal Event - trading decision made
signal_event = logger.log_event(
    event_type=EventType.SIG,
    symbol="XAUUSD",
    account_id="trader_001",
    payload={
        "vcp_gov": {
            "algo_id": "momentum-crossover-v3",
            "algo_version": "3.2.1",
            "algo_type": "RULE_BASED",
            "confidence": "0.87",
            "decision_factors": {
                "ema_20": "2645.30",
                "ema_50": "2638.75",
                "rsi_14": "62.4",
                "signal": "BULLISH_CROSSOVER"
            }
        }
    }
)
print(f"Signal logged: {signal_event.header.event_id}")

# 2. Order Event - order submitted
order_event = logger.log_event(
    event_type=EventType.ORD,
    symbol="XAUUSD",
    account_id="trader_001",
    trace_id=signal_event.header.trace_id,  # Link to signal
    payload={
        "trade_data": {
            "order_id": "ORD-2025-001234",
            "side": "BUY",
            "order_type": "MARKET",
            "quantity": "1.00",
            "price": "2650.50"
        }
    }
)
print(f"Order logged: {order_event.header.event_id}")

# 3. Execution Event - order filled
exec_event = logger.log_event(
    event_type=EventType.EXE,
    symbol="XAUUSD",
    account_id="trader_001",
    trace_id=signal_event.header.trace_id,
    payload={
        "trade_data": {
            "order_id": "ORD-2025-001234",
            "broker_order_id": "BRK-98765432",
            "side": "BUY",
            "execution_price": "2650.55",
            "executed_qty": "1.00",
            "slippage": "0.05",
            "commission": "7.50"
        }
    }
)
print(f"Execution logged: {exec_event.header.event_id}")

# Verify chain integrity
is_valid, break_point = logger.verify_chain()
print(f"\nChain valid: {is_valid}")

# Demonstrate tamper detection
print("\n--- Simulating tampering ---")
original_price = logger.events[1].payload["trade_data"]["price"]
logger.events[1].payload["trade_data"]["price"] = "2600.00"  # Tamper!

is_valid, break_point = logger.verify_chain()
print(f"Chain valid after tampering: {is_valid}")
print(f"Tampering detected at event index: {break_point}")

# Restore original
logger.events[1].payload["trade_data"]["price"] = original_price

Output (event IDs differ on every run):

Signal logged: 019479a8-1234-7def-8abc-123456789abc
Order logged: 019479a8-1235-7ef0-9bcd-234567890bcd
Execution logged: 019479a8-1236-7f01-acde-345678901cde

Chain valid: True

--- Simulating tampering ---
Chain valid after tampering: False
Tampering detected at event index: 1

Layer 2: Collection Integrity with Merkle Trees

Hash chains detect modification, but they don't prevent omission. An adversary could delete events from the middle of a chain and re-compute all subsequent hashes.

Merkle trees solve this by creating a compact commitment to all events in a batch. Omitting a single event changes the root hash.
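
The omission attack is easy to reproduce. The sketch below uses a stripped-down chain in which each link hashes the event text plus the previous hash, rather than the full VCPLogger; `link_hash`, `build_chain`, and `verify` are illustrative names. After the middle event is deleted and the chain rebuilt, verification passes. Only a commitment recorded earlier, outside the attacker's reach, exposes the change.

```python
import hashlib

GENESIS = "0" * 64

def link_hash(event: str, prev_hash: str) -> str:
    return hashlib.sha256(f"{event}{prev_hash}".encode("utf-8")).hexdigest()

def build_chain(events: list[str]) -> list[dict]:
    chain, prev = [], GENESIS
    for event in events:
        current = link_hash(event, prev)
        chain.append({"event": event, "prev_hash": prev, "event_hash": current})
        prev = current
    return chain

def verify(chain: list[dict]) -> bool:
    prev = GENESIS
    for entry in chain:
        if entry["prev_hash"] != prev:
            return False
        if entry["event_hash"] != link_hash(entry["event"], prev):
            return False
        prev = entry["event_hash"]
    return True

original = build_chain(["SIG", "ORD", "EXE"])
anchored_head = original[-1]["event_hash"]  # assume this was published earlier

# Dropping an entry without fixing the links is caught...
naive = [original[0], original[2]]
# ...but whoever controls the log can rebuild it without the event.
forged = build_chain(["SIG", "EXE"])

print(verify(naive))                              # False
print(verify(forged))                             # True
print(forged[-1]["event_hash"] == anchored_head)  # False
```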

Merkle Tree Implementation

from typing import List, Tuple

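class MerkleTree:
    """Binary SHA-256 Merkle tree for VCP event batches.

    Odd levels are padded by duplicating the last node. This is not the
    RFC 6962 construction, which adds 0x00/0x01 domain-separation prefixes
    and never duplicates nodes.
    """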

    def __init__(self, event_hashes: List[str]):
        self.leaves = event_hashes
        self.tree: List[List[str]] = []
        self.root = self._build_tree()

    def _hash_pair(self, left: str, right: str) -> str:
        """Hash two nodes together."""
        combined = bytes.fromhex(left) + bytes.fromhex(right)
        return hashlib.sha256(combined).hexdigest()

    def _build_tree(self) -> str:
        """Build Merkle tree and return root hash."""
        if not self.leaves:
            return "0" * 64

        # Start with leaves
        current_level = self.leaves.copy()
        self.tree.append(current_level)

        while len(current_level) > 1:
            next_level = []

            for i in range(0, len(current_level), 2):
                left = current_level[i]
                # If odd number of nodes, duplicate last one
                right = current_level[i + 1] if i + 1 < len(current_level) else left
                parent = self._hash_pair(left, right)
                next_level.append(parent)

            self.tree.append(next_level)
            current_level = next_level

        return current_level[0]

    def get_proof(self, leaf_index: int) -> List[Tuple[str, str]]:
        """
        Generate Merkle proof for a leaf.

        Returns list of (hash, direction) tuples where direction is 'L' or 'R'
        indicating the sibling's position relative to the path.
        """
        if leaf_index >= len(self.leaves):
            raise ValueError(f"Leaf index {leaf_index} out of range")

        proof = []
        current_index = leaf_index

        for level in range(len(self.tree) - 1):
            current_level = self.tree[level]

            # Find sibling
            if current_index % 2 == 0:
                # Current is left child, sibling is right
                sibling_index = current_index + 1
                direction = 'R'
            else:
                # Current is right child, sibling is left
                sibling_index = current_index - 1
                direction = 'L'

            # Handle edge case: odd number of nodes
            if sibling_index < len(current_level):
                sibling_hash = current_level[sibling_index]
            else:
                sibling_hash = current_level[current_index]

            proof.append((sibling_hash, direction))
            current_index //= 2

        return proof

    @staticmethod
    def verify_proof(leaf_hash: str, proof: List[Tuple[str, str]], root: str) -> bool:
        """
        Verify a Merkle proof.

        This can be done client-side without trusting the server.
        """
        current = leaf_hash

        for sibling_hash, direction in proof:
            if direction == 'L':
                # Sibling is on the left
                combined = bytes.fromhex(sibling_hash) + bytes.fromhex(current)
            else:
                # Sibling is on the right
                combined = bytes.fromhex(current) + bytes.fromhex(sibling_hash)

            current = hashlib.sha256(combined).hexdigest()

        return current == root


# Example: Build Merkle tree from logged events
def batch_events_to_merkle(events: List[VCPEvent]) -> MerkleTree:
    """Create Merkle tree from VCP event batch."""
    event_hashes = [event.security.event_hash for event in events]
    return MerkleTree(event_hashes)


# Usage
tree = batch_events_to_merkle(logger.events)
print(f"Merkle root: {tree.root}")

# Generate proof for first event
proof = tree.get_proof(0)
print(f"\nProof for event 0: {proof}")

# Verify proof (can be done by any third party)
is_valid = MerkleTree.verify_proof(
    leaf_hash=logger.events[0].security.event_hash,
    proof=proof,
    root=tree.root
)
print(f"Proof valid: {is_valid}")

The Completeness Guarantee

Here's the key insight: if a batch of events produces Merkle root R, and that root is anchored externally, then:

  1. No event can be modified without changing R
  2. No event can be omitted without changing R
  3. No event can be inserted without changing R

The external anchor (Layer 3) prevents re-computation of R after the fact.
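
One caveat on point 3: a tree that pads odd levels by duplicating the last node, as the MerkleTree class above does, produces the same root for [a, b, c] and [a, b, c, c]. Committing the leaf count next to the root closes that gap. So does the RFC 6962 construction, which prefixes leaves with 0x00 and interior nodes with 0x01 and splits at the largest power of two instead of duplicating. A sketch comparing the two; the function names are illustrative:

```python
import hashlib

def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

def duplicate_last_root(leaves: list[bytes]) -> bytes:
    """Root of a tree that duplicates the last node on odd levels."""
    if not leaves:
        raise ValueError("at least one leaf is required")
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        level = [sha256(level[i] + level[i + 1])
                 for i in range(0, len(level), 2)]
    return level[0]

def rfc6962_root(entries: list[bytes]) -> bytes:
    """Merkle Tree Hash as defined in RFC 6962, section 2.1."""
    n = len(entries)
    if n == 0:
        return sha256(b"")
    if n == 1:
        return sha256(b"\x00" + entries[0])       # leaf prefix
    k = 1
    while k * 2 < n:                              # largest power of two < n
        k *= 2
    left = rfc6962_root(entries[:k])
    right = rfc6962_root(entries[k:])
    return sha256(b"\x01" + left + right)         # interior-node prefix

a, b, c = (sha256(x) for x in (b"event-a", b"event-b", b"event-c"))

same_with_padding = duplicate_last_root([a, b, c]) == duplicate_last_root([a, b, c, c])
same_with_rfc6962 = rfc6962_root([a, b, c]) == rfc6962_root([a, b, c, c])
print(same_with_padding)   # True: an extra copy of the last event goes unnoticed
print(same_with_rfc6962)   # False
```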


Layer 3: External Anchoring

The final layer commits the Merkle root to an external, independent timestamping service. VCP v1.1 requires this for all tiers:

Tier     | Anchor Frequency | Acceptable Services
---------|------------------|----------------------------------------
Platinum | Every 10 minutes | Bitcoin, Ethereum, RFC 3161 TSA
Gold     | Every 1 hour     | RFC 3161 TSA, Database with attestation
Silver   | Every 24 hours   | OpenTimestamps, FreeTSA
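
The frequencies in the table translate directly into a scheduling check. A minimal sketch, assuming tier names as plain strings and a hypothetical `anchor_due` helper; neither is taken from the VCP specification:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Maximum time between anchors, per the tier table
ANCHOR_INTERVALS = {
    "PLATINUM": timedelta(minutes=10),
    "GOLD": timedelta(hours=1),
    "SILVER": timedelta(hours=24),
}

def anchor_due(tier: str,
               last_anchor: Optional[datetime],
               now: Optional[datetime] = None) -> bool:
    """Return True when the next Merkle root should be anchored."""
    now = now or datetime.now(timezone.utc)
    if last_anchor is None:  # nothing anchored yet
        return True
    return now - last_anchor >= ANCHOR_INTERVALS[tier]

t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(anchor_due("GOLD", t0, t0 + timedelta(minutes=59)))  # False
print(anchor_due("GOLD", t0, t0 + timedelta(hours=1)))     # True
print(anchor_due("SILVER", None))                          # True
```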

OpenTimestamps Integration (Silver Tier)

OpenTimestamps is free and anchors to Bitcoin, making it ideal for Silver tier implementations.

import subprocess
import tempfile
import os
from pathlib import Path

class OpenTimestampsAnchor:
    """
    External anchor using OpenTimestamps (ots).

    Requires: pip install opentimestamps-client
    """

    def __init__(self, ots_binary: str = "ots"):
        self.ots_binary = ots_binary

    def create_timestamp(self, merkle_root: str) -> bytes:
        """
        Submit Merkle root to OpenTimestamps.

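        Returns the .ots proof file contents. A fresh proof is pending
        until a Bitcoin block confirms it; run `ots upgrade` on the
        proof file later to complete it.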
        """
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write(merkle_root)
            temp_path = f.name

        try:
            # Create timestamp
            result = subprocess.run(
                [self.ots_binary, "stamp", temp_path],
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                raise RuntimeError(f"OTS stamp failed: {result.stderr}")

            # Read the .ots proof file
            ots_path = temp_path + ".ots"
            with open(ots_path, 'rb') as f:
                proof = f.read()

            # Cleanup
            os.unlink(ots_path)

            return proof

        finally:
            os.unlink(temp_path)

    def verify_timestamp(self, merkle_root: str, proof: bytes) -> dict:
        """
        Verify an OpenTimestamps proof.

        Returns verification details or raises exception.
        """
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write(merkle_root)
            data_path = f.name

        with tempfile.NamedTemporaryFile(mode='wb', suffix='.ots', delete=False) as f:
            f.write(proof)
            proof_path = f.name

        try:
            # Pass the target file explicitly (-f): the temp file names do
            # not follow the "<file>.ots" convention ots uses to locate it
            result = subprocess.run(
                [self.ots_binary, "verify", "-f", data_path, proof_path],
                capture_output=True,
                text=True
            )

            # The ots client may report on stderr, so check both streams
            output = (result.stdout + result.stderr).strip()

            if "Success!" in output:
                return {
                    "verified": True,
                    "details": output
                }
            else:
                return {
                    "verified": False,
                    "pending": "Pending confirmation in Bitcoin block" in output,
                    "details": output
                }
        finally:
            os.unlink(data_path)
            os.unlink(proof_path)


# Alternative: HTTP API for environments without CLI
class OpenTimestampsAPI:
    """HTTP-based OpenTimestamps interaction."""

    OTS_CALENDARS = [
        "https://a.pool.opentimestamps.org",
        "https://b.pool.opentimestamps.org",
        "https://alice.btc.calendar.opentimestamps.org",
    ]

    async def submit_digest(self, digest: bytes) -> dict:
        """Submit hash digest to OTS calendars."""
        import httpx

        results = {}
        async with httpx.AsyncClient() as client:
            for calendar in self.OTS_CALENDARS:
                try:
                    response = await client.post(
                        f"{calendar}/digest",
                        content=digest,
                        headers={"Content-Type": "application/octet-stream"}
                    )
                    if response.status_code == 200:
                        results[calendar] = response.content
                except Exception as e:
                    results[calendar] = f"Error: {e}"

        return results

RFC 3161 TSA Integration (Gold/Platinum Tier)

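For production environments, an RFC 3161 Time-Stamp Authority (TSA) returns a signed timestamp token over the digest you submit. How much legal weight a token carries depends on the TSA and the jurisdiction.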

import base64
from cryptography.hazmat.primitives import hashes
from cryptography.x509 import load_pem_x509_certificate
import httpx

class RFC3161Anchor:
    """RFC 3161 Time-Stamp Authority integration."""

    # Free TSA services
    TSA_URLS = {
        "freetsa": "https://freetsa.org/tsr",
        "digicert": "http://timestamp.digicert.com",
        "sectigo": "http://timestamp.sectigo.com",
    }

    def __init__(self, tsa_url: str = None):
        self.tsa_url = tsa_url or self.TSA_URLS["freetsa"]

    def create_timestamp_request(self, digest: bytes) -> bytes:
        """
        Create RFC 3161 TimeStampReq.

        Uses the asn1crypto library for ASN.1 encoding (pip install asn1crypto).
        """
        from asn1crypto import tsp, algos, core

        # Build MessageImprint
        message_imprint = tsp.MessageImprint({
            'hash_algorithm': algos.DigestAlgorithm({
                'algorithm': 'sha256'
            }),
            'hashed_message': digest
        })

        # Build TimeStampReq
        ts_request = tsp.TimeStampReq({
            'version': 1,
            'message_imprint': message_imprint,
            'cert_req': True
        })

        return ts_request.dump()

    async def get_timestamp(self, merkle_root: str) -> dict:
        """
        Request timestamp from TSA.

        Returns timestamp token and metadata.
        """
        # The Merkle root is already a SHA-256 digest; decode it from hex
        digest = bytes.fromhex(merkle_root)

        # Create TSA request
        ts_request = self.create_timestamp_request(digest)

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.tsa_url,
                content=ts_request,
                headers={
                    "Content-Type": "application/timestamp-query"
                }
            )

            if response.status_code != 200:
                raise RuntimeError(f"TSA request failed: {response.status_code}")

            return {
                "tsa_url": self.tsa_url,
                "timestamp_token": base64.b64encode(response.content).decode(),
                "merkle_root": merkle_root
            }

MQL5 Integration: The Sidecar Pattern

For MetaTrader 5 EAs, VCP uses a "sidecar" pattern—the EA logs events to VCP without modifying its core trading logic.

MQL5 Bridge Implementation

//+------------------------------------------------------------------+
//| VCP Integration Example for MT5 EA                                |
//+------------------------------------------------------------------+
#include "vcp_mql_bridge_v1_0.mqh"

// Global configuration
VCP_CONFIG g_config;

//+------------------------------------------------------------------+
//| Expert initialization                                             |
//+------------------------------------------------------------------+
int OnInit()
{
    // Configure VCP
    g_config.api_key = "your-api-key";
    g_config.endpoint = "https://api.veritaschain.org";
    g_config.venue_id = "MT5-" + AccountInfoString(ACCOUNT_COMPANY);
    g_config.tier = VCP_TIER_SILVER;
    g_config.async_mode = true;
    g_config.queue_max_size = 1000;
    g_config.batch_size = 50;

    // Initialize VCP logger
    int result = VCP_Initialize(g_config);
    if(result != 0)
    {
        Print("VCP initialization failed: ", result);
        return INIT_FAILED;
    }

    // Set timer for async queue processing
    EventSetMillisecondTimer(100);

    Print("VCP initialized successfully");
    return INIT_SUCCEEDED;
}

//+------------------------------------------------------------------+
//| Expert deinitialization                                           |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
    // Flush remaining events
    while(VCP_GetQueueSize() > 0)
    {
        VCP_ProcessQueue();
        Sleep(100);
    }

    VCP_Shutdown();
}

//+------------------------------------------------------------------+
//| Timer event - process VCP queue                                   |
//+------------------------------------------------------------------+
void OnTimer()
{
    VCP_ProcessQueue();
}

//+------------------------------------------------------------------+
//| Example: Log trading decision and order                           |
//+------------------------------------------------------------------+
void ExecuteTradeWithVCP(string symbol, ENUM_ORDER_TYPE order_type, 
                          double volume, double price, string algo_id)
{
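    // 1. Log the trading signal/decision
    // In MQL5, iRSI() returns an indicator handle; values are read with CopyBuffer().
    // In production, create the handle once in OnInit() and reuse it.
    double rsi[1] = {0.0};
    int rsi_handle = iRSI(symbol, PERIOD_H1, 14, PRICE_CLOSE);
    if(rsi_handle != INVALID_HANDLE)
        CopyBuffer(rsi_handle, 0, 0, 1, rsi);

    string decision_factors = StringFormat(
        "[{\"indicator\":\"EMA_cross\",\"value\":\"bullish\"},"
        "{\"indicator\":\"RSI\",\"value\":\"%.1f\"}]",
        rsi[0]
    );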

    string trace_id = "";  // Will be set by LogSignal
    int sig_result = VCP_LogSignal(
        symbol, 
        algo_id, 
        "1.0.0",
        "0.85",
        decision_factors
    );

    if(sig_result != 0)
        Print("VCP LogSignal error: ", sig_result);

    // Retrieve trace_id from last logged event
    // (In production, modify LogSignal to return trace_id)
    trace_id = g_VCPLogger.GetLastTraceID();

    // 2. Execute the trade
    MqlTradeRequest request = {};
    MqlTradeResult result = {};

    request.action = TRADE_ACTION_DEAL;
    request.symbol = symbol;
    request.volume = volume;
    request.type = order_type;
    request.price = price;
    request.deviation = 10;
    request.magic = 123456;

    // 3. Log order submission
    VCP_LogOrder(
        symbol,
        trace_id,
        0,  // ticket not yet assigned
        order_type == ORDER_TYPE_BUY ? "BUY" : "SELL",
        "MARKET",
        DoubleToString(price, (int)SymbolInfoInteger(symbol, SYMBOL_DIGITS)),
        DoubleToString(volume, 2)
    );

    // 4. Send order
    if(OrderSend(request, result))
    {
        // 5. Log execution
        VCP_LogExecution(
            symbol,
            trace_id,
            result.order,
            result.deal,
            DoubleToString(result.price, (int)SymbolInfoInteger(symbol, SYMBOL_DIGITS)),
            DoubleToString(result.volume, 2),
            DoubleToString(result.price - price, (int)SymbolInfoInteger(symbol, SYMBOL_DIGITS))
        );

        Print("Trade executed and logged. Ticket: ", result.order);
    }
    else
    {
        // 6. Log rejection
        VCP_LogReject(
            symbol,
            trace_id,
            0,
            result.comment,
            IntegerToString(result.retcode)
        );

        Print("Trade rejected: ", result.comment);
    }
}

//+------------------------------------------------------------------+
//| Main tick handler                                                 |
//+------------------------------------------------------------------+
void OnTick()
{
    // Your trading logic here
    string symbol = Symbol();

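    // Example: Simple EMA crossover
    // In MQL5, iMA() returns an indicator handle; values are read with CopyBuffer().
    // Handles are created once and reused (OnInit() is the usual place).
    static int fast_handle = INVALID_HANDLE;
    static int slow_handle = INVALID_HANDLE;
    if(fast_handle == INVALID_HANDLE)
        fast_handle = iMA(symbol, PERIOD_H1, 20, 0, MODE_EMA, PRICE_CLOSE);
    if(slow_handle == INVALID_HANDLE)
        slow_handle = iMA(symbol, PERIOD_H1, 50, 0, MODE_EMA, PRICE_CLOSE);

    // With series indexing, index 0 is the current bar and index 1 the previous bar
    double fast[], slow[];
    ArraySetAsSeries(fast, true);
    ArraySetAsSeries(slow, true);
    if(CopyBuffer(fast_handle, 0, 0, 2, fast) != 2 ||
       CopyBuffer(slow_handle, 0, 0, 2, slow) != 2)
        return;  // indicator data not ready yet

    double ema_fast = fast[0], ema_slow = slow[0];
    double ema_fast_prev = fast[1], ema_slow_prev = slow[1];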

    // Bullish crossover
    if(ema_fast_prev < ema_slow_prev && ema_fast > ema_slow)
    {
        double price = SymbolInfoDouble(symbol, SYMBOL_ASK);
        ExecuteTradeWithVCP(symbol, ORDER_TYPE_BUY, 0.1, price, "ema-crossover-v1");
    }

    // Bearish crossover
    if(ema_fast_prev > ema_slow_prev && ema_fast < ema_slow)
    {
        double price = SymbolInfoDouble(symbol, SYMBOL_BID);
        ExecuteTradeWithVCP(symbol, ORDER_TYPE_SELL, 0.1, price, "ema-crossover-v1");
    }
}
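
The queue settings in OnInit (queue_max_size, batch_size) and the timer-driven VCP_ProcessQueue() call suggest a bounded buffer drained in batches, so that logging never blocks the trading path. A Python analogue of that pattern follows. The class, its method names, and the overflow behaviour (reject and count) are assumptions made for illustration, not the bridge's actual implementation.

```python
from collections import deque
from typing import Callable

class SidecarQueue:
    """Bounded event buffer drained in batches (illustrative only)."""

    def __init__(self, send_batch: Callable[[list[dict]], None],
                 max_size: int = 1000, batch_size: int = 50):
        self._send_batch = send_batch
        self._max_size = max_size
        self._batch_size = batch_size
        self._queue = deque()
        self.dropped = 0

    def enqueue(self, event: dict) -> bool:
        """Called from the trading path: constant time, no I/O."""
        if len(self._queue) >= self._max_size:
            self.dropped += 1  # surface overflow instead of hiding it
            return False
        self._queue.append(event)
        return True

    def process(self) -> int:
        """Called from a timer: ship up to batch_size events."""
        count = min(self._batch_size, len(self._queue))
        batch = [self._queue.popleft() for _ in range(count)]
        if batch:
            self._send_batch(batch)
        return len(batch)

    def __len__(self) -> int:
        return len(self._queue)

sent: list[list[dict]] = []
queue = SidecarQueue(sent.append, max_size=3, batch_size=2)

accepted = [queue.enqueue({"n": i}) for i in range(4)]
shipped = queue.process()

print(accepted)                   # [True, True, True, False]
print(shipped)                    # 2
print(len(queue), queue.dropped)  # 1 1
```

For an audit trail, silently losing events defeats the purpose, so a real sidecar would more plausibly spill to disk or halt trading on overflow than drop; the counter here only makes the condition visible.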

VCP-XREF: Dual Logging for Dispute Resolution

One of v1.1's most powerful features is VCP-XREF—dual logging where both parties in a transaction maintain independent VCP chains that can be cross-referenced.

Use Case: Prop Firm Payout Disputes

@dataclass
class VCPXref:
    """VCP-XREF extension for cross-reference logging."""
    cross_reference_id: str
    party_role: str  # INITIATOR | COUNTERPARTY | OBSERVER
    counterparty_id: str
    shared_event_key: dict
    reconciliation_status: str  # PENDING | MATCHED | DISCREPANCY | TIMEOUT

class DualLogger:
    """
    Dual logging implementation for trader <-> prop firm scenarios.
    """

    def __init__(self, party_id: str, role: str):
        self.party_id = party_id
        self.role = role
        self.logger = VCPLogger(
            venue_id=f"PROPFIRM-{party_id}",
            policy_id=f"org.propfirm.{party_id}:dual-log-v1"
        )

    def log_trade_with_xref(self, 
                            event_type: EventType,
                            symbol: str,
                            account_id: str,
                            trade_data: dict,
                            counterparty_id: str,
                            cross_ref_id: str = None) -> VCPEvent:
        """Log trade event with cross-reference metadata."""

        # Generate cross-reference ID if not provided
        if cross_ref_id is None:
            cross_ref_id = generate_uuid_v7()

        # Add XREF extension to payload
        payload = {
            "trade_data": trade_data,
            "vcp_xref": {
                "cross_reference_id": cross_ref_id,
                "party_role": self.role,
                "counterparty_id": counterparty_id,
                "shared_event_key": {
                    "order_id": trade_data.get("order_id"),
                    "timestamp_int": str(int(datetime.now(timezone.utc).timestamp() * 1000)),
                    "tolerance_ms": 1000  # 1 second tolerance for matching
                },
                "reconciliation_status": "PENDING"
            }
        }

        return self.logger.log_event(
            event_type=event_type,
            symbol=symbol,
            account_id=account_id,
            payload=payload
        )


def reconcile_dual_logs(trader_events: List[VCPEvent], 
                        propfirm_events: List[VCPEvent]) -> dict:
    """
    Reconcile events from two independent VCP logs.

    Finds matching events by cross_reference_id and flags discrepancies.
    """
    results = {
        "matched": [],
        "discrepancies": [],
        "trader_only": [],
        "propfirm_only": []
    }

    # Index prop firm events by cross_reference_id
    propfirm_index = {}
    for event in propfirm_events:
        xref = event.payload.get("vcp_xref", {})
        xref_id = xref.get("cross_reference_id")
        if xref_id:
            propfirm_index[xref_id] = event

    # Match trader events
    matched_xrefs = set()
    for trader_event in trader_events:
        xref = trader_event.payload.get("vcp_xref", {})
        xref_id = xref.get("cross_reference_id")

        if xref_id and xref_id in propfirm_index:
            propfirm_event = propfirm_index[xref_id]
            matched_xrefs.add(xref_id)

            # Compare trade data
            trader_trade = trader_event.payload.get("trade_data", {})
            propfirm_trade = propfirm_event.payload.get("trade_data", {})

            discrepancies = []
            for key in set(trader_trade.keys()) | set(propfirm_trade.keys()):
                if trader_trade.get(key) != propfirm_trade.get(key):
                    discrepancies.append({
                        "field": key,
                        "trader_value": trader_trade.get(key),
                        "propfirm_value": propfirm_trade.get(key)
                    })

            if discrepancies:
                results["discrepancies"].append({
                    "cross_reference_id": xref_id,
                    "trader_event_id": trader_event.header.event_id,
                    "propfirm_event_id": propfirm_event.header.event_id,
                    "discrepancies": discrepancies
                })
            else:
                results["matched"].append({
                    "cross_reference_id": xref_id,
                    "trader_event_id": trader_event.header.event_id,
                    "propfirm_event_id": propfirm_event.header.event_id
                })
        else:
            results["trader_only"].append(trader_event.header.event_id)

    # Find prop firm events without trader match
    for xref_id, event in propfirm_index.items():
        if xref_id not in matched_xrefs:
            results["propfirm_only"].append(event.header.event_id)

    return results


# Example usage
trader_logger = DualLogger("TRADER_001", "INITIATOR")
propfirm_logger = DualLogger("PROPFIRM_ABC", "COUNTERPARTY")

# Trader logs execution
xref_id = generate_uuid_v7()
trader_event = trader_logger.log_trade_with_xref(
    event_type=EventType.EXE,
    symbol="EURUSD",
    account_id="ACC123",
    trade_data={
        "order_id": "ORD-001",
        "execution_price": "1.0850",
        "executed_qty": "100000",
        "pnl_realized": "150.00"
    },
    counterparty_id="PROPFIRM_ABC",
    cross_ref_id=xref_id
)

# Prop firm logs same execution (possibly with different values!)
propfirm_event = propfirm_logger.log_trade_with_xref(
    event_type=EventType.EXE,
    symbol="EURUSD",
    account_id="ACC123",
    trade_data={
        "order_id": "ORD-001",
        "execution_price": "1.0850",
        "executed_qty": "100000",
        "pnl_realized": "145.00"  # Discrepancy!
    },
    counterparty_id="TRADER_001",
    cross_ref_id=xref_id
)

# Reconcile
result = reconcile_dual_logs(
    trader_logger.logger.events,
    propfirm_logger.logger.events
)

print("Reconciliation result:")
print(f"  Matched: {len(result['matched'])}")
print(f"  Discrepancies: {len(result['discrepancies'])}")
if result['discrepancies']:
    for d in result['discrepancies']:
        print(f"    XREF: {d['cross_reference_id']}")
        for disc in d['discrepancies']:
            print(f"      {disc['field']}: trader={disc['trader_value']}, propfirm={disc['propfirm_value']}")
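
The payload above records `timestamp_int` and `tolerance_ms` inside `shared_event_key`, but `reconcile_dual_logs` matches on `cross_reference_id` alone. Here is a minimal sketch of how the tolerance window might be applied when comparing two keys. `shared_keys_match` is a hypothetical helper, not part of VCP, and using the stricter of the two tolerances is an assumption made for illustration.

```python
def shared_keys_match(key_a: dict, key_b: dict) -> bool:
    """Return True if two shared_event_key dicts plausibly describe one event.

    Hypothetical helper: order IDs must be equal and the two millisecond
    timestamps must differ by no more than the stricter tolerance.
    """
    if key_a.get("order_id") != key_b.get("order_id"):
        return False
    try:
        ts_a = int(key_a["timestamp_int"])
        ts_b = int(key_b["timestamp_int"])
        tolerance = min(int(key_a.get("tolerance_ms", 0)),
                        int(key_b.get("tolerance_ms", 0)))
    except (KeyError, TypeError, ValueError):
        return False  # missing or malformed field: treat as no match
    return abs(ts_a - ts_b) <= tolerance


trader_key = {"order_id": "ORD-001", "timestamp_int": "1735689600000", "tolerance_ms": 1000}
firm_key = {"order_id": "ORD-001", "timestamp_int": "1735689600450", "tolerance_ms": 1000}
late_key = {"order_id": "ORD-001", "timestamp_int": "1735689602500", "tolerance_ms": 1000}

print(shared_keys_match(trader_key, firm_key))  # timestamps 450 ms apart
print(shared_keys_match(trader_key, late_key))  # timestamps 2500 ms apart
```

A check like this could run inside the matching branch of the reconciler, so that two events sharing a cross-reference ID but logged far apart in time are flagged rather than silently matched.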

Production Deployment: Putting It Together

Here's an example of a production-oriented VCP integration that adds batching, periodic anchoring, and error handling on top of the logger built earlier:

import asyncio
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional

import httpx

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("vcp")

@dataclass
class VCPConfig:
    api_key: str
    endpoint: str
    venue_id: str
    policy_id: str
    tier: str = "SILVER"
    batch_size: int = 50
    flush_interval_seconds: float = 1.0
    anchor_interval_seconds: float = 86400  # 24 hours for Silver

class ProductionVCPLogger:
    """Production-grade VCP logger with batching, anchoring, and error handling."""

    def __init__(self, config: VCPConfig):
        self.config = config
        self.event_logger = VCPLogger(
            venue_id=config.venue_id,
            policy_id=config.policy_id
        )
        self.pending_events: list[VCPEvent] = []
        self.last_anchor_time = datetime.now(timezone.utc)
        self.anchor_service = OpenTimestampsAPI()
        self._running = False
        self._flush_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start background tasks."""
        self._running = True
        self._flush_task = asyncio.create_task(self._flush_loop())
        logger.info("VCP logger started")

    async def stop(self):
        """Stop and flush remaining events."""
        self._running = False
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass

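        # Final flush: drain every remaining batch, not just the first one
        while self.pending_events:
            remaining = len(self.pending_events)
            await self._flush_batch()
            if len(self.pending_events) == remaining:
                # No progress (endpoint unreachable or rejecting): stop retrying
                logger.error(f"{remaining} events still unflushed at shutdown")
                break
        await self._anchor_if_needed(force=True)
        logger.info("VCP logger stopped")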

    def log(self, event_type: EventType, symbol: str, account_id: str,
            payload: dict, trace_id: Optional[str] = None) -> VCPEvent:
        """Log an event (non-blocking)."""
        event = self.event_logger.log_event(
            event_type=event_type,
            symbol=symbol,
            account_id=account_id,
            payload=payload,
            trace_id=trace_id
        )
        self.pending_events.append(event)
        return event

    async def _flush_loop(self):
        """Background task to periodically flush events."""
        while self._running:
            await asyncio.sleep(self.config.flush_interval_seconds)

            # Flush on every interval so a partial batch is not held back
            # until batch_size events accumulate
            if self.pending_events:
                await self._flush_batch()

            await self._anchor_if_needed()

    async def _flush_batch(self):
        """Send pending events to VCC."""
        if not self.pending_events:
            return

        batch = self.pending_events[:self.config.batch_size]

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.config.endpoint}/v1/events/batch",
                    json={
                        "events": [self._event_to_dict(e) for e in batch]
                    },
                    headers={
                        "X-API-Key": self.config.api_key,
                        "Content-Type": "application/json"
                    },
                    timeout=30.0
                )

                if response.status_code in (200, 201):
                    self.pending_events = self.pending_events[len(batch):]
                    logger.info(f"Flushed {len(batch)} events")
                else:
                    logger.error(f"Flush failed: {response.status_code} - {response.text}")

        except Exception as e:
            logger.error(f"Flush error: {e}")

    async def _anchor_if_needed(self, force: bool = False):
        """Create external anchor if interval has passed."""
        now = datetime.now(timezone.utc)
        elapsed = (now - self.last_anchor_time).total_seconds()

        if not force and elapsed < self.config.anchor_interval_seconds:
            return

        if not self.event_logger.events:
            return

        # Build Merkle tree
        tree = batch_events_to_merkle(self.event_logger.events)

        try:
            # Submit to OpenTimestamps
            digest = bytes.fromhex(tree.root)
            result = await self.anchor_service.submit_digest(digest)

            logger.info(f"Anchored Merkle root: {tree.root[:16]}...")
            self.last_anchor_time = now

            # Store anchor receipt
            # (In production, persist this to database)

        except Exception as e:
            logger.error(f"Anchor error: {e}")

    def _event_to_dict(self, event: VCPEvent) -> dict:
        """Convert event to JSON-serializable dict."""
        return {
            "header": asdict(event.header),
            "payload": event.payload,
            "security": asdict(event.security)
        }


# Usage example
async def main():
    config = VCPConfig(
        api_key="your-api-key",
        endpoint="https://api.veritaschain.org",
        venue_id="MT5-DEMO-BROKER",
        policy_id="org.example:trading-system-v1"
    )

    vcp = ProductionVCPLogger(config)
    await vcp.start()

    try:
        # Simulate trading activity
        for i in range(100):
            vcp.log(
                event_type=EventType.SIG,
                symbol="EURUSD",
                account_id="ACC001",
                payload={
                    "vcp_gov": {
                        "algo_id": "momentum-v1",
                        "confidence": f"0.{80 + i % 20}"
                    }
                }
            )
            await asyncio.sleep(0.1)

    finally:
        await vcp.stop()


if __name__ == "__main__":
    asyncio.run(main())
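
The `_anchor_if_needed` method above leaves receipt storage as a comment. Here is one way that step might look, sketched with the standard-library `sqlite3` module. The table layout and the helper names (`open_receipt_store`, `save_receipt`, `load_receipt`) are assumptions for illustration, as is the premise that the anchor service returns the receipt as raw bytes.

```python
import sqlite3
import time
from typing import Optional


def open_receipt_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a small store for anchor receipts."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS anchor_receipts (
               merkle_root TEXT PRIMARY KEY,
               receipt     BLOB NOT NULL,
               event_count INTEGER NOT NULL,
               anchored_at REAL NOT NULL
           )"""
    )
    return conn


def save_receipt(conn: sqlite3.Connection, merkle_root: str,
                 receipt: bytes, event_count: int) -> None:
    """Persist one receipt; the 'with' block commits or rolls back atomically."""
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO anchor_receipts VALUES (?, ?, ?, ?)",
            (merkle_root, receipt, event_count, time.time()),
        )


def load_receipt(conn: sqlite3.Connection, merkle_root: str) -> Optional[bytes]:
    """Return the stored receipt for a Merkle root, or None if absent."""
    row = conn.execute(
        "SELECT receipt FROM anchor_receipts WHERE merkle_root = ?",
        (merkle_root,),
    ).fetchone()
    return row[0] if row else None


conn = open_receipt_store()  # in-memory for the demo; use a file path in practice
save_receipt(conn, "ab" * 32, b"\x00example-proof-bytes", event_count=100)
print(load_receipt(conn, "ab" * 32))
```

Keying the table on the Merkle root lets an auditor look up the proof for any root they are handed. A real deployment would likely also record which event range each root covers.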

Verification: The Auditor's Perspective

The real power of VCP is in verification. Here's how an auditor (or regulator, or dispute resolution service) can independently verify logs:

class VCPVerifier:
    """Independent verification of VCP logs without trusting the log producer."""

    def __init__(self, events: List[dict], merkle_root: str, anchor_proof: bytes):
        self.events = events
        self.claimed_root = merkle_root
        self.anchor_proof = anchor_proof

    def verify_all(self) -> dict:
        """Complete verification pipeline."""
        results = {
            "hash_chain_valid": False,
            "merkle_root_valid": False,
            "anchor_valid": False,
            "errors": []
        }

        # 1. Verify hash chain
        chain_result = self._verify_hash_chain()
        results["hash_chain_valid"] = chain_result["valid"]
        if not chain_result["valid"]:
            results["errors"].append(chain_result["error"])

        # 2. Verify Merkle tree
        merkle_result = self._verify_merkle_tree()
        results["merkle_root_valid"] = merkle_result["valid"]
        if not merkle_result["valid"]:
            results["errors"].append(merkle_result["error"])

        # 3. Verify external anchor
        anchor_result = self._verify_anchor()
        results["anchor_valid"] = anchor_result["valid"]
        if not anchor_result["valid"]:
            results["errors"].append(anchor_result["error"])

        results["overall_valid"] = all([
            results["hash_chain_valid"],
            results["merkle_root_valid"],
            results["anchor_valid"]
        ])

        return results

    def _verify_hash_chain(self) -> dict:
        """Verify event hash chain integrity."""
        prev_hash = "0" * 64

        for i, event in enumerate(self.events):
            security = event.get("security", {})

            # Check prev_hash link
            if security.get("prev_hash") != prev_hash:
                return {
                    "valid": False,
                    "error": f"Hash chain break at event {i}: prev_hash mismatch"
                }

            # Recompute event hash
            expected_hash = compute_event_hash(
                event.get("header", {}),
                event.get("payload", {}),
                prev_hash
            )

            if security.get("event_hash") != expected_hash:
                return {
                    "valid": False,
                    "error": f"Hash mismatch at event {i}: computed {expected_hash[:16]}... != claimed {security.get('event_hash', '')[:16]}..."
                }

            prev_hash = security.get("event_hash")

        return {"valid": True}

    def _verify_merkle_tree(self) -> dict:
        """Verify Merkle root matches events."""
        event_hashes = [
            e.get("security", {}).get("event_hash", "")
            for e in self.events
        ]

        tree = MerkleTree(event_hashes)

        if tree.root != self.claimed_root:
            return {
                "valid": False,
                "error": f"Merkle root mismatch: computed {tree.root[:16]}... != claimed {self.claimed_root[:16]}..."
            }

        return {"valid": True}

    def _verify_anchor(self) -> dict:
        """Verify external timestamp anchor."""
        ots = OpenTimestampsAnchor()

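        try:
            result = ots.verify_timestamp(self.claimed_root, self.anchor_proof)
        except Exception as e:
            return {
                "valid": False,
                "error": f"Anchor verification failed: {e}"
            }

        verified = bool(result.get("verified", False))
        outcome = {"valid": verified, "details": result.get("details", "")}
        if not verified:
            # verify_all() reads "error" whenever "valid" is False
            outcome["error"] = f"Anchor not verified: {outcome['details'] or 'no details returned'}"
        return outcome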


# Usage
def audit_vcp_logs(log_file: str, anchor_file: str):
    """Audit a VCP log file."""
    import json

    with open(log_file) as f:
        data = json.load(f)

    with open(anchor_file, 'rb') as f:
        anchor_proof = f.read()

    verifier = VCPVerifier(
        events=data["events"],
        merkle_root=data["merkle_root"],
        anchor_proof=anchor_proof
    )

    result = verifier.verify_all()

    print("=== VCP Audit Report ===")
    print(f"Events analyzed: {len(data['events'])}")
    print(f"Hash chain valid: {result['hash_chain_valid']}")
    print(f"Merkle root valid: {result['merkle_root_valid']}")
    print(f"Anchor valid: {result['anchor_valid']}")
    print(f"\nOVERALL: {'✓ VERIFIED' if result['overall_valid'] else '✗ FAILED'}")

    if result['errors']:
        print("\nErrors:")
        for error in result['errors']:
            print(f"  - {error}")
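
The verifier above recomputes the Merkle root from the full event list. An auditor who holds only one event can instead check an inclusion proof against the anchored root. The sketch below illustrates the idea under stated assumptions: a plain SHA-256 pairwise tree that duplicates the last node on odd-sized levels. The `MerkleTree` class used in this article may pad or prefix nodes differently, so proofs must be generated and checked with the same rules as the tree that produced the root. The function names here are hypothetical.

```python
import hashlib


def _hash_pair(left: bytes, right: bytes) -> bytes:
    return hashlib.sha256(left + right).digest()


def merkle_root_and_proof(leaves: list[bytes], index: int):
    """Return (root, proof); proof is a list of (sibling, sibling_is_left)."""
    if not leaves:
        raise ValueError("cannot build a tree with no leaves")
    level = list(leaves)
    proof = []
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])  # assumption: duplicate last node on odd levels
        sibling = index ^ 1          # neighbour within the same pair
        proof.append((level[sibling], sibling < index))
        level = [_hash_pair(level[i], level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return level[0], proof


def verify_inclusion(leaf: bytes, proof, root: bytes) -> bool:
    """Fold the leaf with each sibling up to the root and compare."""
    node = leaf
    for sibling, sibling_is_left in proof:
        node = _hash_pair(sibling, node) if sibling_is_left else _hash_pair(node, sibling)
    return node == root


leaves = [hashlib.sha256(f"event-{i}".encode()).digest() for i in range(5)]
root, proof = merkle_root_and_proof(leaves, index=3)

print(verify_inclusion(leaves[3], proof, root))  # the leaf the proof was built for
print(verify_inclusion(leaves[2], proof, root))  # a different leaf with the same proof
```

The proof has one entry per tree level, so its size grows with the logarithm of the batch size rather than with the number of events.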

What's Next

VCP v1.1 is production-ready, but the ecosystem is expanding:

  • VCP v1.2 (draft): Enhanced recovery semantics, ERASURE event type for GDPR crypto-shredding
  • VAP Framework: Extending VCP patterns to AI systems beyond trading (healthcare, automotive, public administration)
  • IETF Standardization: draft-kamimura-scitt-vcp aligns VCP with IETF SCITT architecture

The full specification is available at github.com/veritaschain/vcp-spec. SDKs for Python, TypeScript, and MQL5 are in development.


Key Takeaways

  1. Hash chains detect modification but don't prevent omission—you need Merkle trees for completeness
  2. External anchoring is what makes logs verifiable by third parties without trusting the log producer
  3. UUID v7 enables time-ordered event IDs without separate timestamp comparisons
  4. RFC 8785 canonicalization is essential for deterministic hashing
  5. Dual logging (VCP-XREF) creates mutual accountability between trading parties
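
Takeaway 4 can be seen in a few lines: two dicts holding the same data hash differently unless they are serialized the same way. The sketch below uses `json.dumps` with sorted keys and compact separators as a stand-in. That is only an approximation of RFC 8785, which also fixes number formatting, string escaping, and key ordering by UTF-16 code units. `approx_canonical_hash` is a hypothetical name.

```python
import hashlib
import json


def approx_canonical_hash(obj: dict) -> str:
    """SHA-256 over a sorted-key, whitespace-free JSON encoding.

    Approximation only: not a full RFC 8785 (JCS) implementation.
    """
    encoded = json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()


a = {"symbol": "EURUSD", "price": "1.0850", "qty": "100000"}
b = {"qty": "100000", "price": "1.0850", "symbol": "EURUSD"}  # same data, different key order

naive_a = hashlib.sha256(json.dumps(a).encode("utf-8")).hexdigest()
naive_b = hashlib.sha256(json.dumps(b).encode("utf-8")).hexdigest()

print(naive_a == naive_b)                                    # key order leaks into the hash
print(approx_canonical_hash(a) == approx_canonical_hash(b))  # stable once keys are sorted
```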

The shift from "trust me" to "verify this" isn't optional anymore. EU AI Act Article 12 mandates logging for high-risk AI. MiFID II RTS 25 requires clocks to stay within 100μs of UTC for high-frequency algorithmic trading. The Two Sigma case shows what happens when governance fails.

Build the infrastructure now. The regulators are coming.


Questions? Open an issue at github.com/veritaschain/vcp-spec or reach out at technical@veritaschain.org.


Tags: #python #mql5 #fintech #cryptography #trading #audit #blockchain #regulation #opensource
