A hands-on guide to implementing cryptographic audit infrastructure that can survive regulatory scrutiny
TL;DR: When a hedge fund's model manipulation goes undetected for 22 months, when a fake tweet moves $2.4 trillion in 4 minutes, and when AI bots autonomously learn to collude—traditional logging isn't enough. This article walks through implementing VeritasChain Protocol (VCP) v1.1, an open standard for tamper-evident audit trails, with working code in Python and MQL5.
The Problem: "Trust Me" Doesn't Scale
In January 2025, the SEC announced a $90 million settlement with Two Sigma. The core failure? A researcher modified trading model parameters for 22 months without detection. The firm had logs. The firm had access controls. But the logs couldn't prove their own integrity.
This isn't a Two Sigma problem—it's an industry problem. Traditional server logs answer "what does the log say happened?" They can't answer "has this log been modified since the event occurred?"
The distinction matters when:
- Regulators demand proof of trading decisions (MiFID II RTS 25, EU AI Act Article 12)
- Prop firms dispute payout calculations with traders
- Exchanges need to prove order book state during flash crashes
- AI systems make decisions that require post-hoc explanation
VCP v1.1 addresses this through a three-layer architecture that transforms "trust me" logs into mathematically verifiable audit trails.
Architecture Overview: Three Layers of Integrity
┌─────────────────────────────────────────────────────────────────┐
│ Layer 3: External Verifiability │
│ (Public timestamping - Bitcoin, RFC 3161, OpenTimestamps) │
├─────────────────────────────────────────────────────────────────┤
│ Layer 2: Collection Integrity │
│ (Merkle Trees - proves batch completeness) │
├─────────────────────────────────────────────────────────────────┤
│ Layer 1: Event Integrity │
│ (Hash chains, signatures - proves individual event validity) │
└─────────────────────────────────────────────────────────────────┘
Layer 1 ensures each event hasn't been modified.
Layer 2 ensures no events have been omitted from a batch.
Layer 3 ensures the entire log state was committed at a specific time.
Let's implement each layer.
Layer 1: Event Integrity with Hash Chains
Every VCP event contains a cryptographic link to its predecessor. Modifying or deleting an event breaks the chain—and the break is detectable by anyone with the logs.
Python Implementation
import hashlib
import json
import time
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
from enum import Enum
class EventType(str, Enum):
SIG = "SIG" # Signal generated
ORD = "ORD" # Order submitted
ACK = "ACK" # Order acknowledged
EXE = "EXE" # Execution
REJ = "REJ" # Rejection
RSK = "RSK" # Risk event
ERR = "ERR" # Error
class TimestampPrecision(str, Enum):
NANOSECOND = "NANOSECOND"
MICROSECOND = "MICROSECOND"
MILLISECOND = "MILLISECOND"
class ClockSyncStatus(str, Enum):
PTP_LOCKED = "PTP_LOCKED"
NTP_SYNCED = "NTP_SYNCED"
BEST_EFFORT = "BEST_EFFORT"
@dataclass
class VCPHeader:
event_id: str
trace_id: str
timestamp_int: str
timestamp_iso: str
event_type: str
event_type_code: int
timestamp_precision: str
clock_sync_status: str
hash_algo: str
venue_id: str
symbol: str
account_id: str
policy_id: str # NEW in v1.1
@dataclass
class VCPTradePayload:
order_id: Optional[str] = None
side: Optional[str] = None
order_type: Optional[str] = None
price: Optional[str] = None
quantity: Optional[str] = None
execution_price: Optional[str] = None
slippage: Optional[str] = None
@dataclass
class VCPSecurity:
event_hash: str
prev_hash: str
@dataclass
class VCPEvent:
header: VCPHeader
payload: dict
security: VCPSecurity
UUID v7: Time-Ordered Identifiers
VCP mandates UUID v7 (RFC 9562) for event IDs. Unlike UUID v4, v7 embeds millisecond timestamps, enabling natural chronological ordering without separate timestamp comparisons.
def generate_uuid_v7() -> str:
"""Generate RFC 9562 compliant UUID v7."""
# Get current timestamp in milliseconds
timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
# 48 bits of timestamp
timestamp_hex = format(timestamp_ms, '012x')
# 4 bits version (7) + 12 bits random
rand_a = uuid.uuid4().hex[:3]
version_rand_a = '7' + rand_a
# 2 bits variant (10) + 62 bits random
rand_b_int = int(uuid.uuid4().hex[:16], 16)
rand_b_int = (rand_b_int & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000
rand_b = format(rand_b_int, '016x')
# Assemble: timestamp (48) + version (4) + rand_a (12) + variant (2) + rand_b (62)
uuid_hex = timestamp_hex + version_rand_a + rand_b
# Format as UUID string
return f"{uuid_hex[:8]}-{uuid_hex[8:12]}-{uuid_hex[12:16]}-{uuid_hex[16:20]}-{uuid_hex[20:]}"
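Because the first 48 bits are a big-endian Unix millisecond timestamp, v7 IDs sort chronologically as plain strings. A small helper (illustrative, not part of the spec) recovers the embedded time:

def uuid_v7_timestamp_ms(u: str) -> int:
    """Extract the embedded Unix millisecond timestamp from a UUID v7 string."""
    return int(u.replace('-', '')[:12], 16)

first = generate_uuid_v7()
second = generate_uuid_v7()
# Embedded timestamps are non-decreasing across successive calls
assert uuid_v7_timestamp_ms(first) <= uuid_v7_timestamp_ms(second)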
RFC 8785 JSON Canonicalization
Hash computation requires deterministic serialization. VCP uses RFC 8785 (JSON Canonicalization Scheme) to ensure identical hashes regardless of field ordering or whitespace.
def canonicalize_json(obj: dict) -> str:
    """
    Simplified RFC 8785 (JCS) style canonicalization.
    Key requirements:
    - Keys sorted lexicographically
    - No whitespace
    - Numbers without unnecessary precision
    - Consistent string escaping

    Note: this covers the ASCII keys and string-valued fields VCP uses.
    Full RFC 8785 compliance (UTF-16 code-unit key ordering, escaping of
    all control characters) calls for a vetted JCS library in production.
    """
def _canonicalize(value):
if isinstance(value, dict):
# Sort keys lexicographically
sorted_items = sorted(value.items(), key=lambda x: x[0])
return '{' + ','.join(
f'"{k}":{_canonicalize(v)}'
for k, v in sorted_items
if v is not None
) + '}'
elif isinstance(value, list):
return '[' + ','.join(_canonicalize(v) for v in value) + ']'
elif isinstance(value, str):
# Escape special characters
escaped = value.replace('\\', '\\\\').replace('"', '\\"')
escaped = escaped.replace('\n', '\\n').replace('\r', '\\r')
escaped = escaped.replace('\t', '\\t')
return f'"{escaped}"'
elif isinstance(value, bool):
return 'true' if value else 'false'
elif isinstance(value, (int, float)):
# Handle numeric precision per RFC 8785
if isinstance(value, float):
if value.is_integer():
return str(int(value))
return repr(value)
return str(value)
elif value is None:
return 'null'
else:
raise TypeError(f"Cannot canonicalize type: {type(value)}")
return _canonicalize(obj)
def compute_event_hash(header: dict, payload: dict, prev_hash: str) -> str:
"""
Compute SHA-256 hash of canonicalized event data.
Hash input: canonical(header) || canonical(payload) || prev_hash
"""
header_canonical = canonicalize_json(header)
payload_canonical = canonicalize_json(payload)
hash_input = f"{header_canonical}{payload_canonical}{prev_hash}"
return hashlib.sha256(hash_input.encode('utf-8')).hexdigest()
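The property that matters for hashing: logically identical objects always produce the same canonical form, no matter how they were constructed. A quick check:

# Field order doesn't affect the canonical form, so the hashes match
a = {"side": "BUY", "price": "2650.50"}
b = {"price": "2650.50", "side": "BUY"}
assert canonicalize_json(a) == canonicalize_json(b) == '{"price":"2650.50","side":"BUY"}'
assert (hashlib.sha256(canonicalize_json(a).encode()).hexdigest()
        == hashlib.sha256(canonicalize_json(b).encode()).hexdigest())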
Building a Hash Chain
class VCPLogger:
"""VCP-compliant event logger with hash chain."""
GENESIS_HASH = "0" * 64 # All zeros for chain start
def __init__(self,
venue_id: str,
policy_id: str,
precision: TimestampPrecision = TimestampPrecision.MILLISECOND,
clock_sync: ClockSyncStatus = ClockSyncStatus.BEST_EFFORT):
self.venue_id = venue_id
self.policy_id = policy_id
self.precision = precision
self.clock_sync = clock_sync
self.prev_hash = self.GENESIS_HASH
self.events: list[VCPEvent] = []
    def _get_timestamp(self) -> tuple[str, str]:
        """Return (timestamp_int, timestamp_iso) tuple."""
        now = datetime.now(timezone.utc)
        # time.time_ns() preserves integer nanoseconds; multiplying the
        # float timestamp() by 1e9 loses precision above 2**53
        ns = time.time_ns()
        if self.precision == TimestampPrecision.NANOSECOND:
            ts_int = str(ns)
        elif self.precision == TimestampPrecision.MICROSECOND:
            ts_int = str(ns // 1_000)
        else:  # MILLISECOND
            ts_int = str(ns // 1_000_000)
        ts_iso = now.strftime('%Y-%m-%dT%H:%M:%S.') + f'{now.microsecond:06d}Z'
        return ts_int, ts_iso
def log_event(self,
event_type: EventType,
symbol: str,
account_id: str,
payload: dict,
trace_id: Optional[str] = None) -> VCPEvent:
"""Create and log a VCP event with hash chain linking."""
event_id = generate_uuid_v7()
trace_id = trace_id or event_id
ts_int, ts_iso = self._get_timestamp()
header = VCPHeader(
event_id=event_id,
trace_id=trace_id,
timestamp_int=ts_int,
timestamp_iso=ts_iso,
event_type=event_type.value,
event_type_code=self._event_type_to_code(event_type),
timestamp_precision=self.precision.value,
clock_sync_status=self.clock_sync.value,
hash_algo="SHA256",
venue_id=self.venue_id,
symbol=symbol,
account_id=account_id,
policy_id=self.policy_id
)
# Compute hash with chain linking
header_dict = asdict(header)
event_hash = compute_event_hash(header_dict, payload, self.prev_hash)
security = VCPSecurity(
event_hash=event_hash,
prev_hash=self.prev_hash
)
event = VCPEvent(
header=header,
payload=payload,
security=security
)
# Update chain state
self.prev_hash = event_hash
self.events.append(event)
return event
def _event_type_to_code(self, event_type: EventType) -> int:
"""Map event type to numeric code."""
codes = {
EventType.SIG: 1, EventType.ORD: 2, EventType.ACK: 3,
EventType.EXE: 4, EventType.REJ: 6, EventType.RSK: 21,
EventType.ERR: 99
}
return codes.get(event_type, 0)
def verify_chain(self) -> tuple[bool, Optional[int]]:
"""
Verify the entire hash chain.
Returns:
(is_valid, first_invalid_index)
- (True, None) if chain is valid
- (False, index) if chain breaks at index
"""
prev_hash = self.GENESIS_HASH
for i, event in enumerate(self.events):
# Verify prev_hash link
if event.security.prev_hash != prev_hash:
return False, i
# Recompute and verify event hash
header_dict = asdict(event.header)
expected_hash = compute_event_hash(
header_dict,
event.payload,
prev_hash
)
if event.security.event_hash != expected_hash:
return False, i
prev_hash = event.security.event_hash
return True, None
Usage Example: Complete Trade Lifecycle
# Initialize logger
logger = VCPLogger(
venue_id="MT5-BROKER-DEMO",
policy_id="org.example.trading:gold-algo-v2",
precision=TimestampPrecision.MILLISECOND,
clock_sync=ClockSyncStatus.NTP_SYNCED
)
# 1. Signal Event - trading decision made
signal_event = logger.log_event(
event_type=EventType.SIG,
symbol="XAUUSD",
account_id="trader_001",
payload={
"vcp_gov": {
"algo_id": "momentum-crossover-v3",
"algo_version": "3.2.1",
"algo_type": "RULE_BASED",
"confidence": "0.87",
"decision_factors": {
"ema_20": "2645.30",
"ema_50": "2638.75",
"rsi_14": "62.4",
"signal": "BULLISH_CROSSOVER"
}
}
}
)
print(f"Signal logged: {signal_event.header.event_id}")
# 2. Order Event - order submitted
order_event = logger.log_event(
event_type=EventType.ORD,
symbol="XAUUSD",
account_id="trader_001",
trace_id=signal_event.header.trace_id, # Link to signal
payload={
"trade_data": {
"order_id": "ORD-2025-001234",
"side": "BUY",
"order_type": "MARKET",
"quantity": "1.00",
"price": "2650.50"
}
}
)
print(f"Order logged: {order_event.header.event_id}")
# 3. Execution Event - order filled
exec_event = logger.log_event(
event_type=EventType.EXE,
symbol="XAUUSD",
account_id="trader_001",
trace_id=signal_event.header.trace_id,
payload={
"trade_data": {
"order_id": "ORD-2025-001234",
"broker_order_id": "BRK-98765432",
"side": "BUY",
"execution_price": "2650.55",
"executed_qty": "1.00",
"slippage": "0.05",
"commission": "7.50"
}
}
)
print(f"Execution logged: {exec_event.header.event_id}")
# Verify chain integrity
is_valid, break_point = logger.verify_chain()
print(f"\nChain valid: {is_valid}")
# Demonstrate tamper detection
print("\n--- Simulating tampering ---")
original_price = logger.events[1].payload["trade_data"]["price"]
logger.events[1].payload["trade_data"]["price"] = "2600.00" # Tamper!
is_valid, break_point = logger.verify_chain()
print(f"Chain valid after tampering: {is_valid}")
print(f"Tampering detected at event index: {break_point}")
# Restore original
logger.events[1].payload["trade_data"]["price"] = original_price
Output:
Signal logged: 019479a8-1234-7def-8abc-123456789abc
Order logged: 019479a8-1235-7ef0-9bcd-234567890bcd
Execution logged: 019479a8-1236-7f01-acde-345678901cde
Chain valid: True
--- Simulating tampering ---
Chain valid after tampering: False
Tampering detected at event index: 1
Layer 2: Collection Integrity with Merkle Trees
Hash chains detect modification, but only if the verifier already holds a trusted copy of a later hash. An adversary who controls the log store can delete events from the middle of a chain and simply re-compute every subsequent hash.
Merkle trees address this by condensing an entire batch into one compact commitment: the root hash. Omitting, altering, or inserting a single event changes the root—and the root is small enough to publish externally (Layer 3).
Merkle Tree Implementation
from typing import List, Tuple
class MerkleTree:
"""RFC 6962 compliant Merkle tree for VCP event batches."""
def __init__(self, event_hashes: List[str]):
self.leaves = event_hashes
self.tree: List[List[str]] = []
self.root = self._build_tree()
def _hash_pair(self, left: str, right: str) -> str:
"""Hash two nodes together."""
combined = bytes.fromhex(left) + bytes.fromhex(right)
return hashlib.sha256(combined).hexdigest()
def _build_tree(self) -> str:
"""Build Merkle tree and return root hash."""
if not self.leaves:
return "0" * 64
# Start with leaves
current_level = self.leaves.copy()
self.tree.append(current_level)
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
# If odd number of nodes, duplicate last one
right = current_level[i + 1] if i + 1 < len(current_level) else left
parent = self._hash_pair(left, right)
next_level.append(parent)
self.tree.append(next_level)
current_level = next_level
return current_level[0]
def get_proof(self, leaf_index: int) -> List[Tuple[str, str]]:
"""
Generate Merkle proof for a leaf.
Returns list of (hash, direction) tuples where direction is 'L' or 'R'
indicating the sibling's position relative to the path.
"""
if leaf_index >= len(self.leaves):
raise ValueError(f"Leaf index {leaf_index} out of range")
proof = []
current_index = leaf_index
for level in range(len(self.tree) - 1):
current_level = self.tree[level]
# Find sibling
if current_index % 2 == 0:
# Current is left child, sibling is right
sibling_index = current_index + 1
direction = 'R'
else:
# Current is right child, sibling is left
sibling_index = current_index - 1
direction = 'L'
# Handle edge case: odd number of nodes
if sibling_index < len(current_level):
sibling_hash = current_level[sibling_index]
else:
sibling_hash = current_level[current_index]
proof.append((sibling_hash, direction))
current_index //= 2
return proof
@staticmethod
def verify_proof(leaf_hash: str, proof: List[Tuple[str, str]], root: str) -> bool:
"""
Verify a Merkle proof.
This can be done client-side without trusting the server.
"""
current = leaf_hash
for sibling_hash, direction in proof:
if direction == 'L':
# Sibling is on the left
combined = bytes.fromhex(sibling_hash) + bytes.fromhex(current)
else:
# Sibling is on the right
combined = bytes.fromhex(current) + bytes.fromhex(sibling_hash)
current = hashlib.sha256(combined).hexdigest()
return current == root
# Example: Build Merkle tree from logged events
def batch_events_to_merkle(events: List[VCPEvent]) -> MerkleTree:
"""Create Merkle tree from VCP event batch."""
event_hashes = [event.security.event_hash for event in events]
return MerkleTree(event_hashes)
# Usage
tree = batch_events_to_merkle(logger.events)
print(f"Merkle root: {tree.root}")
# Generate proof for first event
proof = tree.get_proof(0)
print(f"\nProof for event 0: {proof}")
# Verify proof (can be done by any third party)
is_valid = MerkleTree.verify_proof(
leaf_hash=logger.events[0].security.event_hash,
proof=proof,
root=tree.root
)
print(f"Proof valid: {is_valid}")
The Completeness Guarantee
Here's the key insight: if a batch of events produces Merkle root R, and that root is anchored externally, then:
- No event can be modified without changing R
- No event can be omitted without changing R
- No event can be inserted without changing R
The external anchor (Layer 3) prevents re-computation of R after the fact.
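To make that concrete, here's a sketch (destructive—run it on a copy in real code) showing that even an internally consistent rewrite fails against an anchored root:

anchored_root = tree.root  # assume this was committed externally at time T

# Adversary tampers with an event, then recomputes the entire chain
logger.events[1].payload["trade_data"]["price"] = "2600.00"
prev = VCPLogger.GENESIS_HASH
for ev in logger.events:
    ev.security.prev_hash = prev
    ev.security.event_hash = compute_event_hash(asdict(ev.header), ev.payload, prev)
    prev = ev.security.event_hash

# The rewritten chain verifies on its own...
assert logger.verify_chain() == (True, None)
# ...but its Merkle root no longer matches the anchored commitment
assert batch_events_to_merkle(logger.events).root != anchored_root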
Layer 3: External Anchoring
The final layer commits the Merkle root to an external, independent timestamping service. VCP v1.1 requires this for all tiers:
| Tier | Anchor Frequency | Acceptable Services |
|---|---|---|
| Platinum | Every 10 minutes | Bitcoin, Ethereum, RFC 3161 TSA |
| Gold | Every 1 hour | RFC 3161 TSA, Database with attestation |
| Silver | Every 24 hours | OpenTimestamps, FreeTSA |
OpenTimestamps Integration (Silver Tier)
OpenTimestamps is free and anchors to Bitcoin, making it ideal for Silver tier implementations.
import subprocess
import tempfile
import os
class OpenTimestampsAnchor:
"""
External anchor using OpenTimestamps (ots).
Requires: pip install opentimestamps-client
"""
def __init__(self, ots_binary: str = "ots"):
self.ots_binary = ots_binary
def create_timestamp(self, merkle_root: str) -> bytes:
"""
Submit Merkle root to OpenTimestamps.
Returns the .ots proof file contents.
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write(merkle_root)
temp_path = f.name
try:
# Create timestamp
result = subprocess.run(
[self.ots_binary, "stamp", temp_path],
capture_output=True,
text=True
)
if result.returncode != 0:
raise RuntimeError(f"OTS stamp failed: {result.stderr}")
# Read the .ots proof file
ots_path = temp_path + ".ots"
with open(ots_path, 'rb') as f:
proof = f.read()
# Cleanup
os.unlink(ots_path)
return proof
finally:
os.unlink(temp_path)
def verify_timestamp(self, merkle_root: str, proof: bytes) -> dict:
"""
Verify an OpenTimestamps proof.
Returns verification details or raises exception.
"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write(merkle_root)
data_path = f.name
with tempfile.NamedTemporaryFile(mode='wb', suffix='.ots', delete=False) as f:
f.write(proof)
proof_path = f.name
try:
            # -f points ots at the data file, since the temp file names
            # don't follow the default <file>.ots naming convention
            result = subprocess.run(
                [self.ots_binary, "verify", "-f", data_path, proof_path],
                capture_output=True,
                text=True
            )
# Parse verification result
if "Success!" in result.stdout or "Bitcoin block" in result.stdout:
# Extract block height and timestamp
return {
"verified": True,
"details": result.stdout.strip()
}
else:
return {
"verified": False,
"pending": "Pending confirmation in Bitcoin block" in result.stderr,
"details": result.stderr.strip()
}
finally:
os.unlink(data_path)
os.unlink(proof_path)
# Alternative: HTTP API for environments without CLI
class OpenTimestampsAPI:
"""HTTP-based OpenTimestamps interaction."""
OTS_CALENDARS = [
"https://a.pool.opentimestamps.org",
"https://b.pool.opentimestamps.org",
"https://alice.btc.calendar.opentimestamps.org",
]
async def submit_digest(self, digest: bytes) -> dict:
"""Submit hash digest to OTS calendars."""
import httpx
results = {}
async with httpx.AsyncClient() as client:
for calendar in self.OTS_CALENDARS:
try:
response = await client.post(
f"{calendar}/digest",
content=digest,
headers={"Content-Type": "application/octet-stream"}
)
if response.status_code == 200:
results[calendar] = response.content
except Exception as e:
results[calendar] = f"Error: {e}"
return results
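A minimal usage sketch (the calendar URLs are the public pools listed above; each receipt is a raw pending attestation that should be persisted until Bitcoin confirmation):

import asyncio

async def anchor_root(root_hex: str) -> None:
    api = OpenTimestampsAPI()
    receipts = await api.submit_digest(bytes.fromhex(root_hex))
    for calendar, receipt in receipts.items():
        status = "receipt received" if isinstance(receipt, bytes) else receipt
        print(f"{calendar}: {status}")

# asyncio.run(anchor_root(tree.root))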
RFC 3161 TSA Integration (Gold/Platinum Tier)
For production environments, RFC 3161 Time-Stamp Authority provides legally recognized timestamps.
import base64
import httpx
class RFC3161Anchor:
"""RFC 3161 Time-Stamp Authority integration."""
# Free TSA services
TSA_URLS = {
"freetsa": "https://freetsa.org/tsr",
"digicert": "http://timestamp.digicert.com",
"sectigo": "http://timestamp.sectigo.com",
}
def __init__(self, tsa_url: str = None):
self.tsa_url = tsa_url or self.TSA_URLS["freetsa"]
def create_timestamp_request(self, digest: bytes) -> bytes:
"""
Create RFC 3161 TimeStampReq.
        Uses the asn1crypto library for ASN.1 encoding.
"""
from asn1crypto import tsp, algos, core
# Build MessageImprint
message_imprint = tsp.MessageImprint({
'hash_algorithm': algos.DigestAlgorithm({
'algorithm': 'sha256'
}),
'hashed_message': digest
})
# Build TimeStampReq
ts_request = tsp.TimeStampReq({
'version': 1,
'message_imprint': message_imprint,
'cert_req': True
})
return ts_request.dump()
async def get_timestamp(self, merkle_root: str) -> dict:
"""
Request timestamp from TSA.
Returns timestamp token and metadata.
"""
        # The Merkle root is already a SHA-256 digest; just decode the hex
digest = bytes.fromhex(merkle_root)
# Create TSA request
ts_request = self.create_timestamp_request(digest)
async with httpx.AsyncClient() as client:
response = await client.post(
self.tsa_url,
content=ts_request,
headers={
"Content-Type": "application/timestamp-query"
}
)
if response.status_code != 200:
raise RuntimeError(f"TSA request failed: {response.status_code}")
return {
"tsa_url": self.tsa_url,
"timestamp_token": base64.b64encode(response.content).decode(),
"merkle_root": merkle_root
}
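A usage sketch tying this back to the Merkle layer (the receipt should be persisted alongside the event batch so an auditor can later re-verify it against the TSA's certificate):

import asyncio

async def anchor_batch(merkle_root: str) -> dict:
    anchor = RFC3161Anchor()  # defaults to FreeTSA
    receipt = await anchor.get_timestamp(merkle_root)
    # In production, persist the receipt next to the batch it covers
    return receipt

# receipt = asyncio.run(anchor_batch(tree.root))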
MQL5 Integration: The Sidecar Pattern
For MetaTrader 5 EAs, VCP uses a "sidecar" pattern—the EA logs events to VCP without modifying its core trading logic.
MQL5 Bridge Implementation
//+------------------------------------------------------------------+
//| VCP Integration Example for MT5 EA |
//+------------------------------------------------------------------+
#include "vcp_mql_bridge_v1_0.mqh"
// Global configuration
VCP_CONFIG g_config;
//+------------------------------------------------------------------+
//| Expert initialization |
//+------------------------------------------------------------------+
int OnInit()
{
// Configure VCP
g_config.api_key = "your-api-key";
g_config.endpoint = "https://api.veritaschain.org";
g_config.venue_id = "MT5-" + AccountInfoString(ACCOUNT_COMPANY);
g_config.tier = VCP_TIER_SILVER;
g_config.async_mode = true;
g_config.queue_max_size = 1000;
g_config.batch_size = 50;
// Initialize VCP logger
int result = VCP_Initialize(g_config);
if(result != 0)
{
Print("VCP initialization failed: ", result);
return INIT_FAILED;
}
// Set timer for async queue processing
EventSetMillisecondTimer(100);
Print("VCP initialized successfully");
return INIT_SUCCEEDED;
}
//+------------------------------------------------------------------+
//| Expert deinitialization |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
// Flush remaining events
while(VCP_GetQueueSize() > 0)
{
VCP_ProcessQueue();
Sleep(100);
}
VCP_Shutdown();
}
//+------------------------------------------------------------------+
//| Timer event - process VCP queue |
//+------------------------------------------------------------------+
void OnTimer()
{
VCP_ProcessQueue();
}
//+------------------------------------------------------------------+
//| Example: Log trading decision and order |
//+------------------------------------------------------------------+
void ExecuteTradeWithVCP(string symbol, ENUM_ORDER_TYPE order_type,
double volume, double price, string algo_id)
{
   // 1. Log the trading signal/decision
   // MQL5: iRSI returns an indicator handle; read the value via CopyBuffer
   int rsi_handle = iRSI(symbol, PERIOD_H1, 14, PRICE_CLOSE);
   double rsi_buf[1];
   double rsi_value = 0.0;
   if(CopyBuffer(rsi_handle, 0, 0, 1, rsi_buf) == 1)
      rsi_value = rsi_buf[0];
   IndicatorRelease(rsi_handle);
   string decision_factors = StringFormat(
      "[{\"indicator\":\"EMA_cross\",\"value\":\"bullish\"},"
      "{\"indicator\":\"RSI\",\"value\":\"%.1f\"}]",
      rsi_value
   );
string trace_id = ""; // Will be set by LogSignal
int sig_result = VCP_LogSignal(
symbol,
algo_id,
"1.0.0",
"0.85",
decision_factors
);
if(sig_result != 0)
Print("VCP LogSignal error: ", sig_result);
// Retrieve trace_id from last logged event
// (In production, modify LogSignal to return trace_id)
trace_id = g_VCPLogger.GetLastTraceID();
// 2. Execute the trade
MqlTradeRequest request = {};
MqlTradeResult result = {};
request.action = TRADE_ACTION_DEAL;
request.symbol = symbol;
request.volume = volume;
request.type = order_type;
request.price = price;
request.deviation = 10;
request.magic = 123456;
// 3. Log order submission
VCP_LogOrder(
symbol,
trace_id,
0, // ticket not yet assigned
order_type == ORDER_TYPE_BUY ? "BUY" : "SELL",
"MARKET",
DoubleToString(price, (int)SymbolInfoInteger(symbol, SYMBOL_DIGITS)),
DoubleToString(volume, 2)
);
// 4. Send order
if(OrderSend(request, result))
{
// 5. Log execution
VCP_LogExecution(
symbol,
trace_id,
result.order,
result.deal,
DoubleToString(result.price, (int)SymbolInfoInteger(symbol, SYMBOL_DIGITS)),
DoubleToString(result.volume, 2),
DoubleToString(result.price - price, (int)SymbolInfoInteger(symbol, SYMBOL_DIGITS))
);
Print("Trade executed and logged. Ticket: ", result.order);
}
else
{
// 6. Log rejection
VCP_LogReject(
symbol,
trace_id,
0,
result.comment,
IntegerToString(result.retcode)
);
Print("Trade rejected: ", result.comment);
}
}
//+------------------------------------------------------------------+
//| Main tick handler |
//+------------------------------------------------------------------+
void OnTick()
{
// Your trading logic here
string symbol = Symbol();
   // Example: Simple EMA crossover
   // MQL5: iMA returns an indicator handle, not a value; create the handles
   // once, then read the current and previous bars with CopyBuffer
   static int ema_fast_handle = INVALID_HANDLE;
   static int ema_slow_handle = INVALID_HANDLE;
   if(ema_fast_handle == INVALID_HANDLE)
   {
      ema_fast_handle = iMA(symbol, PERIOD_H1, 20, 0, MODE_EMA, PRICE_CLOSE);
      ema_slow_handle = iMA(symbol, PERIOD_H1, 50, 0, MODE_EMA, PRICE_CLOSE);
   }
   double fast[], slow[];
   ArraySetAsSeries(fast, true);
   ArraySetAsSeries(slow, true);
   if(CopyBuffer(ema_fast_handle, 0, 0, 2, fast) < 2 ||
      CopyBuffer(ema_slow_handle, 0, 0, 2, slow) < 2)
      return; // indicator data not ready yet
   double ema_fast = fast[0],  ema_fast_prev = fast[1];
   double ema_slow = slow[0],  ema_slow_prev = slow[1];
// Bullish crossover
if(ema_fast_prev < ema_slow_prev && ema_fast > ema_slow)
{
double price = SymbolInfoDouble(symbol, SYMBOL_ASK);
ExecuteTradeWithVCP(symbol, ORDER_TYPE_BUY, 0.1, price, "ema-crossover-v1");
}
// Bearish crossover
if(ema_fast_prev > ema_slow_prev && ema_fast < ema_slow)
{
double price = SymbolInfoDouble(symbol, SYMBOL_BID);
ExecuteTradeWithVCP(symbol, ORDER_TYPE_SELL, 0.1, price, "ema-crossover-v1");
}
}
VCP-XREF: Dual Logging for Dispute Resolution
One of v1.1's most powerful features is VCP-XREF—dual logging where both parties in a transaction maintain independent VCP chains that can be cross-referenced.
Use Case: Prop Firm Payout Disputes
@dataclass
class VCPXref:
"""VCP-XREF extension for cross-reference logging."""
cross_reference_id: str
party_role: str # INITIATOR | COUNTERPARTY | OBSERVER
counterparty_id: str
shared_event_key: dict
reconciliation_status: str # PENDING | MATCHED | DISCREPANCY | TIMEOUT
class DualLogger:
"""
Dual logging implementation for trader <-> prop firm scenarios.
"""
def __init__(self, party_id: str, role: str):
self.party_id = party_id
self.role = role
self.logger = VCPLogger(
venue_id=f"PROPFIRM-{party_id}",
policy_id=f"org.propfirm.{party_id}:dual-log-v1"
)
def log_trade_with_xref(self,
event_type: EventType,
symbol: str,
account_id: str,
trade_data: dict,
counterparty_id: str,
cross_ref_id: str = None) -> VCPEvent:
"""Log trade event with cross-reference metadata."""
# Generate cross-reference ID if not provided
if cross_ref_id is None:
cross_ref_id = generate_uuid_v7()
# Add XREF extension to payload
payload = {
"trade_data": trade_data,
"vcp_xref": {
"cross_reference_id": cross_ref_id,
"party_role": self.role,
"counterparty_id": counterparty_id,
"shared_event_key": {
"order_id": trade_data.get("order_id"),
"timestamp_int": str(int(datetime.now(timezone.utc).timestamp() * 1000)),
"tolerance_ms": 1000 # 1 second tolerance for matching
},
"reconciliation_status": "PENDING"
}
}
return self.logger.log_event(
event_type=event_type,
symbol=symbol,
account_id=account_id,
payload=payload
)
def reconcile_dual_logs(trader_events: List[VCPEvent],
propfirm_events: List[VCPEvent]) -> dict:
"""
Reconcile events from two independent VCP logs.
Finds matching events by cross_reference_id and flags discrepancies.
"""
results = {
"matched": [],
"discrepancies": [],
"trader_only": [],
"propfirm_only": []
}
# Index prop firm events by cross_reference_id
propfirm_index = {}
for event in propfirm_events:
xref = event.payload.get("vcp_xref", {})
xref_id = xref.get("cross_reference_id")
if xref_id:
propfirm_index[xref_id] = event
# Match trader events
matched_xrefs = set()
for trader_event in trader_events:
xref = trader_event.payload.get("vcp_xref", {})
xref_id = xref.get("cross_reference_id")
if xref_id and xref_id in propfirm_index:
propfirm_event = propfirm_index[xref_id]
matched_xrefs.add(xref_id)
# Compare trade data
trader_trade = trader_event.payload.get("trade_data", {})
propfirm_trade = propfirm_event.payload.get("trade_data", {})
discrepancies = []
for key in set(trader_trade.keys()) | set(propfirm_trade.keys()):
if trader_trade.get(key) != propfirm_trade.get(key):
discrepancies.append({
"field": key,
"trader_value": trader_trade.get(key),
"propfirm_value": propfirm_trade.get(key)
})
if discrepancies:
results["discrepancies"].append({
"cross_reference_id": xref_id,
"trader_event_id": trader_event.header.event_id,
"propfirm_event_id": propfirm_event.header.event_id,
"discrepancies": discrepancies
})
else:
results["matched"].append({
"cross_reference_id": xref_id,
"trader_event_id": trader_event.header.event_id,
"propfirm_event_id": propfirm_event.header.event_id
})
else:
results["trader_only"].append(trader_event.header.event_id)
# Find prop firm events without trader match
for xref_id, event in propfirm_index.items():
if xref_id not in matched_xrefs:
results["propfirm_only"].append(event.header.event_id)
return results
# Example usage
trader_logger = DualLogger("TRADER_001", "INITIATOR")
propfirm_logger = DualLogger("PROPFIRM_ABC", "COUNTERPARTY")
# Trader logs execution
xref_id = generate_uuid_v7()
trader_event = trader_logger.log_trade_with_xref(
event_type=EventType.EXE,
symbol="EURUSD",
account_id="ACC123",
trade_data={
"order_id": "ORD-001",
"execution_price": "1.0850",
"executed_qty": "100000",
"pnl_realized": "150.00"
},
counterparty_id="PROPFIRM_ABC",
cross_ref_id=xref_id
)
# Prop firm logs same execution (possibly with different values!)
propfirm_event = propfirm_logger.log_trade_with_xref(
event_type=EventType.EXE,
symbol="EURUSD",
account_id="ACC123",
trade_data={
"order_id": "ORD-001",
"execution_price": "1.0850",
"executed_qty": "100000",
"pnl_realized": "145.00" # Discrepancy!
},
counterparty_id="TRADER_001",
cross_ref_id=xref_id
)
# Reconcile
result = reconcile_dual_logs(
trader_logger.logger.events,
propfirm_logger.logger.events
)
print("Reconciliation result:")
print(f" Matched: {len(result['matched'])}")
print(f" Discrepancies: {len(result['discrepancies'])}")
if result['discrepancies']:
for d in result['discrepancies']:
print(f" XREF: {d['cross_reference_id']}")
for disc in d['discrepancies']:
print(f" {disc['field']}: trader={disc['trader_value']}, propfirm={disc['propfirm_value']}")
Production Deployment: Putting It Together
Here's a complete example of a production-ready VCP integration:
import asyncio
from dataclasses import dataclass
from typing import Optional, Callable
import httpx
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("vcp")
@dataclass
class VCPConfig:
api_key: str
endpoint: str
venue_id: str
policy_id: str
tier: str = "SILVER"
batch_size: int = 50
flush_interval_seconds: float = 1.0
anchor_interval_seconds: float = 86400 # 24 hours for Silver
class ProductionVCPLogger:
"""Production-grade VCP logger with batching, anchoring, and error handling."""
def __init__(self, config: VCPConfig):
self.config = config
self.event_logger = VCPLogger(
venue_id=config.venue_id,
policy_id=config.policy_id
)
self.pending_events: list[VCPEvent] = []
self.last_anchor_time = datetime.now(timezone.utc)
self.anchor_service = OpenTimestampsAPI()
self._running = False
self._flush_task: Optional[asyncio.Task] = None
async def start(self):
"""Start background tasks."""
self._running = True
self._flush_task = asyncio.create_task(self._flush_loop())
logger.info("VCP logger started")
async def stop(self):
"""Stop and flush remaining events."""
self._running = False
if self._flush_task:
self._flush_task.cancel()
try:
await self._flush_task
except asyncio.CancelledError:
pass
# Final flush
await self._flush_batch()
await self._anchor_if_needed(force=True)
logger.info("VCP logger stopped")
def log(self, event_type: EventType, symbol: str, account_id: str,
payload: dict, trace_id: Optional[str] = None) -> VCPEvent:
"""Log an event (non-blocking)."""
event = self.event_logger.log_event(
event_type=event_type,
symbol=symbol,
account_id=account_id,
payload=payload,
trace_id=trace_id
)
self.pending_events.append(event)
return event
    async def _flush_loop(self):
        """Background task to periodically flush events."""
        while self._running:
            await asyncio.sleep(self.config.flush_interval_seconds)
            # Flush whatever is pending each interval, not just full batches,
            # so low-volume periods don't leave events stranded in memory
            if self.pending_events:
                await self._flush_batch()
            await self._anchor_if_needed()
async def _flush_batch(self):
"""Send pending events to VCC."""
if not self.pending_events:
return
batch = self.pending_events[:self.config.batch_size]
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.config.endpoint}/v1/events/batch",
json={
"events": [self._event_to_dict(e) for e in batch]
},
headers={
"X-API-Key": self.config.api_key,
"Content-Type": "application/json"
},
timeout=30.0
)
if response.status_code in (200, 201):
self.pending_events = self.pending_events[len(batch):]
logger.info(f"Flushed {len(batch)} events")
else:
logger.error(f"Flush failed: {response.status_code} - {response.text}")
except Exception as e:
logger.error(f"Flush error: {e}")
async def _anchor_if_needed(self, force: bool = False):
"""Create external anchor if interval has passed."""
now = datetime.now(timezone.utc)
elapsed = (now - self.last_anchor_time).total_seconds()
if not force and elapsed < self.config.anchor_interval_seconds:
return
if not self.event_logger.events:
return
# Build Merkle tree
tree = batch_events_to_merkle(self.event_logger.events)
try:
# Submit to OpenTimestamps
digest = bytes.fromhex(tree.root)
result = await self.anchor_service.submit_digest(digest)
logger.info(f"Anchored Merkle root: {tree.root[:16]}...")
self.last_anchor_time = now
# Store anchor receipt
# (In production, persist this to database)
except Exception as e:
logger.error(f"Anchor error: {e}")
def _event_to_dict(self, event: VCPEvent) -> dict:
"""Convert event to JSON-serializable dict."""
return {
"header": asdict(event.header),
"payload": event.payload,
"security": asdict(event.security)
}
# Usage example
async def main():
config = VCPConfig(
api_key="your-api-key",
endpoint="https://api.veritaschain.org",
venue_id="MT5-DEMO-BROKER",
policy_id="org.example:trading-system-v1"
)
vcp = ProductionVCPLogger(config)
await vcp.start()
try:
# Simulate trading activity
for i in range(100):
vcp.log(
event_type=EventType.SIG,
symbol="EURUSD",
account_id="ACC001",
payload={
"vcp_gov": {
"algo_id": "momentum-v1",
"confidence": f"0.{80 + i % 20}"
}
}
)
await asyncio.sleep(0.1)
finally:
await vcp.stop()
if __name__ == "__main__":
asyncio.run(main())
Verification: The Auditor's Perspective
The real power of VCP is in verification. Here's how an auditor (or regulator, or dispute resolution service) can independently verify logs:
class VCPVerifier:
"""Independent verification of VCP logs without trusting the log producer."""
def __init__(self, events: List[dict], merkle_root: str, anchor_proof: bytes):
self.events = events
self.claimed_root = merkle_root
self.anchor_proof = anchor_proof
def verify_all(self) -> dict:
"""Complete verification pipeline."""
results = {
"hash_chain_valid": False,
"merkle_root_valid": False,
"anchor_valid": False,
"errors": []
}
# 1. Verify hash chain
chain_result = self._verify_hash_chain()
results["hash_chain_valid"] = chain_result["valid"]
if not chain_result["valid"]:
results["errors"].append(chain_result["error"])
# 2. Verify Merkle tree
merkle_result = self._verify_merkle_tree()
results["merkle_root_valid"] = merkle_result["valid"]
if not merkle_result["valid"]:
results["errors"].append(merkle_result["error"])
# 3. Verify external anchor
anchor_result = self._verify_anchor()
results["anchor_valid"] = anchor_result["valid"]
if not anchor_result["valid"]:
results["errors"].append(anchor_result["error"])
results["overall_valid"] = all([
results["hash_chain_valid"],
results["merkle_root_valid"],
results["anchor_valid"]
])
return results
def _verify_hash_chain(self) -> dict:
"""Verify event hash chain integrity."""
prev_hash = "0" * 64
for i, event in enumerate(self.events):
security = event.get("security", {})
# Check prev_hash link
if security.get("prev_hash") != prev_hash:
return {
"valid": False,
"error": f"Hash chain break at event {i}: prev_hash mismatch"
}
# Recompute event hash
expected_hash = compute_event_hash(
event.get("header", {}),
event.get("payload", {}),
prev_hash
)
if security.get("event_hash") != expected_hash:
return {
"valid": False,
"error": f"Hash mismatch at event {i}: computed {expected_hash[:16]}... != claimed {security.get('event_hash', '')[:16]}..."
}
prev_hash = security.get("event_hash")
return {"valid": True}
def _verify_merkle_tree(self) -> dict:
"""Verify Merkle root matches events."""
event_hashes = [
e.get("security", {}).get("event_hash", "")
for e in self.events
]
tree = MerkleTree(event_hashes)
if tree.root != self.claimed_root:
return {
"valid": False,
"error": f"Merkle root mismatch: computed {tree.root[:16]}... != claimed {self.claimed_root[:16]}..."
}
return {"valid": True}
def _verify_anchor(self) -> dict:
"""Verify external timestamp anchor."""
ots = OpenTimestampsAnchor()
try:
result = ots.verify_timestamp(self.claimed_root, self.anchor_proof)
return {
"valid": result.get("verified", False),
"details": result.get("details", "")
}
except Exception as e:
return {
"valid": False,
"error": f"Anchor verification failed: {e}"
}
# Usage
def audit_vcp_logs(log_file: str, anchor_file: str):
"""Audit a VCP log file."""
import json
with open(log_file) as f:
data = json.load(f)
with open(anchor_file, 'rb') as f:
anchor_proof = f.read()
verifier = VCPVerifier(
events=data["events"],
merkle_root=data["merkle_root"],
anchor_proof=anchor_proof
)
result = verifier.verify_all()
print("=== VCP Audit Report ===")
print(f"Events analyzed: {len(data['events'])}")
print(f"Hash chain valid: {result['hash_chain_valid']}")
print(f"Merkle root valid: {result['merkle_root_valid']}")
print(f"Anchor valid: {result['anchor_valid']}")
print(f"\nOVERALL: {'✓ VERIFIED' if result['overall_valid'] else '✗ FAILED'}")
if result['errors']:
print("\nErrors:")
for error in result['errors']:
print(f" - {error}")
What's Next
VCP v1.1 is production-ready, but the ecosystem is expanding:
- VCP v1.2 (draft): Enhanced recovery semantics, ERASURE event type for GDPR crypto-shredding
- VAP Framework: Extending VCP patterns to AI systems beyond trading (healthcare, automotive, public administration)
- IETF Standardization: draft-kamimura-scitt-vcp aligns VCP with IETF SCITT architecture
The full specification is available at github.com/veritaschain/vcp-spec. SDKs for Python, TypeScript, and MQL5 are in development.
Key Takeaways
- Hash chains detect modification but don't prevent omission—you need Merkle trees for completeness
- External anchoring is what makes logs verifiable by third parties without trusting the log producer
- UUID v7 enables time-ordered event IDs without separate timestamp comparisons
- RFC 8785 canonicalization is essential for deterministic hashing
- Dual logging (VCP-XREF) creates mutual accountability between trading parties
The shift from "trust me" to "verify this" isn't optional anymore. EU AI Act Article 12 mandates logging for high-risk AI. MiFID II RTS 25 requires business clocks synchronized to within 100μs of UTC for high-frequency trading. The Two Sigma case shows what happens when governance fails.
Build the infrastructure now. The regulators are coming.
Questions? Open an issue at github.com/veritaschain/vcp-spec or reach out at technical@veritaschain.org.
Tags: #python #mql5 #fintech #cryptography #trading #audit #blockchain #regulation #opensource