When an aircraft crashes, investigators recover the flight data recorder—the "black box"—to reconstruct exactly what happened. Every input, every output, every system state is preserved with cryptographic precision. The recorder doesn't care what the pilot claims happened; it records what actually happened.
AI-driven trading systems need the same thing. And as of November 2025, the European Commission's Digital Omnibus proposal has made clear that they're going to require it.
This article walks through implementing the VeritasChain Protocol (VCP) v1.1—an open standard for cryptographic audit trails in algorithmic trading. We'll cover:
- Why traditional logging fails the "Verify, Don't Trust" test
- VCP's three-layer integrity architecture
- Implementing Merkle trees for batch completeness
- External anchoring strategies
- Real code examples in Python and MQL5
Let's build an AI flight recorder.
Table of Contents
- The Problem: Why Traditional Logs Aren't Enough
- EU AI Act Article 12: What's Actually Required
- VCP's Three-Layer Architecture
- Layer 1: Event Integrity
- Layer 2: Collection Integrity (Merkle Trees)
- Layer 3: External Verifiability
- Implementation: Python SDK
- Implementation: MQL5 Bridge
- External Anchoring Strategies
- Compliance Tier Selection
- Putting It All Together
The Problem: Why Traditional Logs Aren't Enough {#the-problem}
Here's a standard trading log entry:
{
"timestamp": "2025-01-14T10:30:00.123Z",
"event": "ORDER",
"symbol": "EURUSD",
"side": "BUY",
"quantity": 100000,
"price": 1.0925
}
Looks reasonable, right? Now consider these questions:
- How do you know this log wasn't modified after the fact?
- How do you prove no events were deleted?
- Can a third party verify this without trusting your database?
Traditional logging answers none of these. The database admin can modify records. The compliance team can delete inconvenient entries. When a regulator asks "prove your AI made this decision at this time with these inputs," you have... assertions.
The 2024-2025 prop trading crisis demonstrated this catastrophically. When firms collapsed and traders demanded their payouts, the firms' logs conveniently showed different execution prices than traders remembered. With no cryptographic proof, disputes became he-said-she-said arguments.
The fundamental problem: Traditional logs require trusting the log producer. VCP eliminates that requirement.
EU AI Act Article 12: What's Actually Required {#eu-ai-act-article-12}
The EU AI Act's Article 12 mandates that high-risk AI systems must have:
"...logging capabilities that enable the automatic recording of events ('logs') over the duration of the lifetime of the system."
But it's the implementation requirements that get interesting:
| Requirement | What It Means |
|---|---|
| Automatic recording | No manual intervention in the logging path |
| Traceability to identify risks | Reconstruct any decision chain |
| Facilitate post-market monitoring | Continuous, not just at audit time |
| Enable verification | Third parties can validate, not just view |
The upcoming prEN ISO/IEC 24970 (AI System Logging) standard adds:
- Timestamped events
- Traceability markers
- Data integrity checks
- Interpretability for authorized reviewers
Notice what's missing from both: how to implement these requirements. That's where VCP comes in.
VCP's Three-Layer Architecture {#three-layer-architecture}
VCP v1.1 introduces a clean separation of concerns across three layers:
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ LAYER 3: External Verifiability │
│ ───────────────────────────────── │
│ Purpose: Third-party verification without trusting the producer │
│ │
│ Components: │
│ ├─ Digital Signature (Ed25519): REQUIRED │
│ ├─ Timestamp (dual format): REQUIRED │
│ └─ External Anchor (Blockchain/TSA): REQUIRED │
│ │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ LAYER 2: Collection Integrity │
│ ────────────────────────────── │
│ Purpose: Prove completeness of event batches │
│ │
│ Components: │
│ ├─ Merkle Tree (RFC 6962): REQUIRED │
│ ├─ Merkle Root: REQUIRED │
│ └─ Audit Path: REQUIRED │
│ │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ LAYER 1: Event Integrity │
│ ──────────────────────── │
│ Purpose: Individual event completeness │
│ │
│ Components: │
│ ├─ EventHash (SHA-256): REQUIRED │
│ └─ PrevHash (chain link): OPTIONAL │
│ │
└─────────────────────────────────────────────────────────────────────┘
Each layer builds on the previous:
- Layer 1 ensures individual events haven't been modified
- Layer 2 ensures no events were omitted from a batch
- Layer 3 ensures the batch existed at a specific time, verified by external parties
Let's implement each layer.
Layer 1: Event Integrity {#layer-1-event-integrity}
Every VCP event needs a cryptographic fingerprint. If any byte changes, the fingerprint changes completely.
Event Hash Calculation
import hashlib
import json
from typing import Any
def canonicalize_json(obj: Any) -> str:
"""
RFC 8785 JSON Canonicalization Scheme (JCS)
Ensures identical objects produce identical strings regardless
of key ordering or whitespace.
"""
return json.dumps(
obj,
separators=(',', ':'), # No whitespace
sort_keys=True, # Deterministic key order
ensure_ascii=False # UTF-8 support
)
def calculate_event_hash(
header: dict,
payload: dict,
algo: str = "SHA256"
) -> str:
"""
Calculate VCP event hash.
Args:
header: VCP header containing EventID, EventType, Timestamp
payload: Event-specific data (trade details, signals, etc.)
algo: Hash algorithm (SHA256, SHA3_256, BLAKE3)
Returns:
Hex-encoded hash string
Example:
>>> header = {
... "EventID": "019abc12-3456-7890-abcd-ef1234567890",
... "EventType": "ORD",
... "Timestamp": 1736848200123456
... }
>>> payload = {"Symbol": "EURUSD", "Side": "BUY", "Quantity": 100000}
>>> calculate_event_hash(header, payload)
'a7f3d2b1c4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1'
"""
# Step 1: Canonicalize both components
canonical_header = canonicalize_json(header)
canonical_payload = canonicalize_json(payload)
# Step 2: Concatenate
hash_input = (canonical_header + canonical_payload).encode('utf-8')
# Step 3: Apply hash function
if algo == "SHA256":
return hashlib.sha256(hash_input).hexdigest()
elif algo == "SHA3_256":
return hashlib.sha3_256(hash_input).hexdigest()
else:
raise ValueError(f"Unsupported algorithm: {algo}")
Optional: Hash Chain Linking
For real-time tamper detection (useful in HFT), you can chain events:
class VCPEventChain:
"""
Hash chain for real-time tamper detection.
OPTIONAL in VCP v1.1, but recommended for HFT systems.
"""
def __init__(self):
self.prev_hash = "0" * 64 # Genesis hash
self.events = []
def add_event(self, header: dict, payload: dict) -> dict:
"""Add event to chain, computing linked hash."""
# Include previous hash in calculation
canonical_header = canonicalize_json(header)
canonical_payload = canonicalize_json(payload)
hash_input = (
canonical_header +
canonical_payload +
self.prev_hash
).encode('utf-8')
event_hash = hashlib.sha256(hash_input).hexdigest()
# Create complete VCP event
vcp_event = {
"Header": {
**header,
"EventHash": event_hash,
"PrevHash": self.prev_hash,
"HashAlgo": "SHA256"
},
"Payload": payload
}
# Update chain state
self.prev_hash = event_hash
self.events.append(vcp_event)
return vcp_event
def verify_chain(self) -> bool:
"""Verify entire chain integrity."""
expected_prev = "0" * 64
for event in self.events:
header = {k: v for k, v in event["Header"].items()
if k not in ["EventHash", "PrevHash", "HashAlgo"]}
# Recalculate hash
hash_input = (
canonicalize_json(header) +
canonicalize_json(event["Payload"]) +
expected_prev
).encode('utf-8')
calculated = hashlib.sha256(hash_input).hexdigest()
if calculated != event["Header"]["EventHash"]:
return False
if event["Header"]["PrevHash"] != expected_prev:
return False
expected_prev = calculated
return True
Why PrevHash is Optional in v1.1
VCP v1.0 required hash chains. v1.1 made them optional because:
- Layer 2 + Layer 3 provides equivalent guarantees for batch-level integrity
- Simplifies Silver tier implementations (MT4/MT5 DLLs are complex enough)
- Backtesting scenarios generate events out of order
Use hash chains when you need real-time detection (HFT). Skip them when batch-level verification is sufficient.
Layer 2: Collection Integrity (Merkle Trees) {#layer-2-collection-integrity}
Individual event hashes prove events weren't modified. Merkle trees prove the collection is complete—no events were added or removed.
RFC 6962 Compliant Merkle Tree
VCP mandates RFC 6962 compliance to prevent second-preimage attacks:
from typing import List, Tuple
import hashlib
class VCPMerkleTree:
"""
RFC 6962 compliant Merkle tree implementation.
Domain separation prevents second-preimage attacks:
- Leaf nodes: 0x00 prefix
- Internal nodes: 0x01 prefix
"""
LEAF_PREFIX = b'\x00'
NODE_PREFIX = b'\x01'
def __init__(self, event_hashes: List[str]):
"""
Build Merkle tree from event hashes.
Args:
event_hashes: List of hex-encoded event hashes
"""
if not event_hashes:
raise ValueError("Cannot build tree from empty list")
self.leaves = [bytes.fromhex(h) for h in event_hashes]
self.tree = self._build_tree()
self.root = self.tree[-1][0]
def _hash_leaf(self, data: bytes) -> bytes:
"""Hash a leaf node with 0x00 prefix."""
return hashlib.sha256(self.LEAF_PREFIX + data).digest()
def _hash_node(self, left: bytes, right: bytes) -> bytes:
"""Hash an internal node with 0x01 prefix."""
return hashlib.sha256(self.NODE_PREFIX + left + right).digest()
def _build_tree(self) -> List[List[bytes]]:
"""Build complete Merkle tree, returning all levels."""
tree = []
# Level 0: Leaf hashes
current_level = [self._hash_leaf(leaf) for leaf in self.leaves]
tree.append(current_level)
# Build up to root
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
# If odd number, duplicate last node
right = current_level[i + 1] if i + 1 < len(current_level) else left
next_level.append(self._hash_node(left, right))
tree.append(next_level)
current_level = next_level
return tree
def get_root(self) -> str:
"""Get Merkle root as hex string."""
return self.root.hex()
def get_proof(self, index: int) -> List[Tuple[str, str]]:
"""
Get inclusion proof for event at index.
Returns:
List of (hash, position) tuples where position is 'L' or 'R'
"""
if index < 0 or index >= len(self.leaves):
raise IndexError(f"Index {index} out of range")
proof = []
current_index = index
for level in self.tree[:-1]: # Exclude root level
# Determine sibling
if current_index % 2 == 0:
# We're on the left, sibling is on right
sibling_index = current_index + 1
position = 'R'
else:
# We're on the right, sibling is on left
sibling_index = current_index - 1
position = 'L'
# Handle edge case where sibling doesn't exist
if sibling_index < len(level):
proof.append((level[sibling_index].hex(), position))
else:
proof.append((level[current_index].hex(), position))
current_index //= 2
return proof
@staticmethod
def verify_proof(
event_hash: str,
proof: List[Tuple[str, str]],
root: str
) -> bool:
"""
Verify an inclusion proof.
Args:
event_hash: Hash of the event to verify
proof: List of (sibling_hash, position) tuples
root: Expected Merkle root
Returns:
True if event is included in tree with given root
"""
current = hashlib.sha256(
VCPMerkleTree.LEAF_PREFIX + bytes.fromhex(event_hash)
).digest()
for sibling_hex, position in proof:
sibling = bytes.fromhex(sibling_hex)
if position == 'L':
current = hashlib.sha256(
VCPMerkleTree.NODE_PREFIX + sibling + current
).digest()
else:
current = hashlib.sha256(
VCPMerkleTree.NODE_PREFIX + current + sibling
).digest()
return current.hex() == root
Usage Example
# Simulate a batch of trading events
event_hashes = [
calculate_event_hash(
{"EventID": f"event-{i}", "EventType": "ORD", "Timestamp": 1736848200000000 + i},
{"Symbol": "EURUSD", "Side": "BUY", "Quantity": 100000}
)
for i in range(100)
]
# Build Merkle tree
tree = VCPMerkleTree(event_hashes)
print(f"Merkle Root: {tree.get_root()}")
# Generate proof for event 42
proof = tree.get_proof(42)
print(f"Proof for event 42: {len(proof)} nodes")
# Verify the proof
is_valid = VCPMerkleTree.verify_proof(
event_hashes[42],
proof,
tree.get_root()
)
print(f"Proof valid: {is_valid}")
# Try to verify a fake event
fake_hash = "0" * 64
is_fake_valid = VCPMerkleTree.verify_proof(fake_hash, proof, tree.get_root())
print(f"Fake proof valid: {is_fake_valid}") # False
Why Merkle Trees Matter for Compliance
A Merkle root is a single 32-byte value that represents an entire batch of events. If ANY event is added, removed, or modified, the root changes completely.
This means:
- Store the root externally (Layer 3)
- Keep the events locally
- Anyone can verify any event was in the batch by checking the inclusion proof against the anchored root
The regulator doesn't need access to your database. They just need the proof.
Layer 3: External Verifiability {#layer-3-external-verifiability}
Layers 1 and 2 can be computed locally. But a malicious operator could simply recompute them after modifying events. Layer 3 solves this by anchoring Merkle roots to external systems that the operator doesn't control.
Digital Signatures
Every VCP event batch must be signed:
from nacl.signing import SigningKey, VerifyKey
from nacl.encoding import HexEncoder
import time
class VCPSigner:
"""
Ed25519 digital signatures for VCP events.
Ed25519 is VCP's default because:
- Fast: ~71,000 signatures/second on commodity hardware
- Small: 64-byte signatures
- Secure: 128-bit security level
"""
def __init__(self, private_key_hex: str = None):
if private_key_hex:
self.signing_key = SigningKey(
bytes.fromhex(private_key_hex)
)
else:
self.signing_key = SigningKey.generate()
self.verify_key = self.signing_key.verify_key
def get_public_key(self) -> str:
"""Get public key for verification."""
return self.verify_key.encode(encoder=HexEncoder).decode()
def sign_merkle_root(
self,
merkle_root: str,
timestamp: int = None
) -> dict:
"""
Sign a Merkle root with timestamp.
Args:
merkle_root: Hex-encoded Merkle root
timestamp: Unix timestamp in microseconds (auto-generated if None)
Returns:
Signed anchor record
"""
timestamp = timestamp or int(time.time() * 1_000_000)
# Create message to sign
message = f"{merkle_root}:{timestamp}".encode('utf-8')
# Sign
signed = self.signing_key.sign(message)
return {
"MerkleRoot": merkle_root,
"Timestamp": timestamp,
"TimestampISO": self._format_timestamp(timestamp),
"Signature": signed.signature.hex(),
"SignerPublicKey": self.get_public_key(),
"SignAlgo": "ED25519"
}
@staticmethod
def verify_signature(anchor_record: dict) -> bool:
"""Verify a signed anchor record."""
try:
verify_key = VerifyKey(
bytes.fromhex(anchor_record["SignerPublicKey"])
)
message = (
f"{anchor_record['MerkleRoot']}:"
f"{anchor_record['Timestamp']}"
).encode('utf-8')
signature = bytes.fromhex(anchor_record["Signature"])
verify_key.verify(message, signature)
return True
except Exception:
return False
def _format_timestamp(self, ts_micros: int) -> str:
"""Format microsecond timestamp as ISO 8601."""
from datetime import datetime, timezone
dt = datetime.fromtimestamp(ts_micros / 1_000_000, tz=timezone.utc)
return dt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
External Anchoring
The signature proves who created the anchor. External anchoring proves when it existed.
Option 1: OpenTimestamps (Free, Decentralized)
import subprocess
import tempfile
import os
class OpenTimestampsAnchor:
"""
Anchor Merkle roots to Bitcoin blockchain via OpenTimestamps.
Free, decentralized, but confirmation takes ~1-2 hours.
Suitable for Silver tier (daily anchoring).
"""
def __init__(self):
# Check OTS is installed
try:
subprocess.run(
["ots", "--version"],
capture_output=True,
check=True
)
except FileNotFoundError:
raise RuntimeError(
"OpenTimestamps CLI not found. "
"Install: pip install opentimestamps-client"
)
def create_timestamp(self, merkle_root: str) -> bytes:
"""
Create OpenTimestamps proof for a Merkle root.
Returns:
OTS proof file contents (binary)
"""
# Write Merkle root to temp file
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.txt',
delete=False
) as f:
f.write(merkle_root)
temp_path = f.name
try:
# Create timestamp
subprocess.run(
["ots", "stamp", temp_path],
capture_output=True,
check=True
)
# Read proof file
proof_path = temp_path + ".ots"
with open(proof_path, 'rb') as f:
proof = f.read()
os.unlink(proof_path)
return proof
finally:
os.unlink(temp_path)
def verify_timestamp(self, merkle_root: str, proof: bytes) -> dict:
"""
Verify an OpenTimestamps proof.
Returns:
Verification result including Bitcoin block info
"""
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.txt',
delete=False
) as f:
f.write(merkle_root)
temp_path = f.name
proof_path = temp_path + ".ots"
with open(proof_path, 'wb') as f:
f.write(proof)
try:
result = subprocess.run(
["ots", "verify", proof_path],
capture_output=True,
text=True
)
return {
"verified": result.returncode == 0,
"output": result.stdout,
"error": result.stderr if result.returncode != 0 else None
}
finally:
os.unlink(temp_path)
os.unlink(proof_path)
Option 2: RFC 3161 Timestamping Authority
import requests
from asn1crypto import tsp, cms
import hashlib
class RFC3161Anchor:
"""
Anchor Merkle roots via RFC 3161 Time Stamp Authority.
Faster than blockchain (seconds), legally recognized in EU.
Suitable for Gold tier (hourly anchoring).
"""
# Free TSA servers (for development/testing)
FREE_TSA_URLS = [
"http://timestamp.digicert.com",
"http://timestamp.globalsign.com/tsa/r6advanced1",
"http://tsa.starfieldtech.com",
]
def __init__(self, tsa_url: str = None):
self.tsa_url = tsa_url or self.FREE_TSA_URLS[0]
def create_timestamp(self, merkle_root: str) -> bytes:
"""
Request RFC 3161 timestamp for Merkle root.
Returns:
DER-encoded timestamp response
"""
# Create timestamp request
hash_bytes = bytes.fromhex(merkle_root)
tsp_request = tsp.TimeStampReq({
'version': 1,
'message_imprint': {
'hash_algorithm': {'algorithm': 'sha256'},
'hashed_message': hash_bytes
},
'cert_req': True
})
# Send request
response = requests.post(
self.tsa_url,
data=tsp_request.dump(),
headers={'Content-Type': 'application/timestamp-query'}
)
if response.status_code != 200:
raise RuntimeError(f"TSA request failed: {response.status_code}")
return response.content
def parse_timestamp(self, response: bytes) -> dict:
"""
Parse RFC 3161 timestamp response.
Returns:
Parsed timestamp info including time and TSA identity
"""
tsp_response = tsp.TimeStampResp.load(response)
if tsp_response['status']['status'].native != 'granted':
raise RuntimeError(
f"Timestamp not granted: "
f"{tsp_response['status']['status'].native}"
)
token = tsp_response['time_stamp_token']
signed_data = cms.ContentInfo.load(token.dump())['content']
tst_info = tsp.TSTInfo.load(
signed_data['encap_content_info']['content'].native
)
return {
"timestamp": tst_info['gen_time'].native.isoformat(),
"serial_number": tst_info['serial_number'].native,
"tsa_name": str(tst_info['tsa']) if tst_info['tsa'] else None,
"hash_algorithm": tst_info['message_imprint']['hash_algorithm']['algorithm'].native
}
Option 3: AWS QLDB (Enterprise)
import boto3
from amazon.ion import simpleion
class QLDBAnchor:
"""
Anchor Merkle roots to AWS Quantum Ledger Database.
ACID-compliant, immutable, cryptographically verifiable.
Suitable for Platinum tier (10-minute anchoring).
"""
def __init__(self, ledger_name: str, region: str = "us-east-1"):
self.ledger_name = ledger_name
self.client = boto3.client('qldb-session', region_name=region)
def anchor(self, merkle_root: str, metadata: dict) -> dict:
"""
Anchor Merkle root to QLDB.
Returns:
QLDB document ID and commit digest
"""
from pyqldb.driver.qldb_driver import QldbDriver
driver = QldbDriver(self.ledger_name)
def insert_anchor(transaction_executor):
transaction_executor.execute_statement(
"INSERT INTO VCPAnchors ?",
{
"merkle_root": merkle_root,
"timestamp": metadata.get("timestamp"),
"policy_id": metadata.get("policy_id"),
"event_count": metadata.get("event_count")
}
)
result = driver.execute_lambda(insert_anchor)
# Get commit digest for verification
digest = self._get_digest()
return {
"ledger": self.ledger_name,
"document_id": result.get("documentId"),
"commit_digest": digest
}
def _get_digest(self) -> str:
"""Get current ledger digest for verification."""
qldb = boto3.client('qldb')
response = qldb.get_digest(Name=self.ledger_name)
return response['Digest'].hex()
Implementation: Python SDK {#implementation-python}
Let's put all three layers together into a complete VCP implementation:
"""
vcp_core.py - Complete VCP v1.1 implementation
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import uuid
import time
import json
import hashlib
from nacl.signing import SigningKey
class ConformanceTier(Enum):
SILVER = "SILVER"
GOLD = "GOLD"
PLATINUM = "PLATINUM"
class EventType(Enum):
SIG = "SIG" # Signal generated
ORD = "ORD" # Order sent
ACK = "ACK" # Order acknowledged
REJ = "REJ" # Order rejected
EXE = "EXE" # Execution (fill)
CXL = "CXL" # Cancellation
ERR_RISK = "ERR_RISK" # Risk limit breach
ERR_CONN = "ERR_CONN" # Connection error
@dataclass
class VCPEvent:
"""A single VCP event."""
event_id: str
event_type: EventType
timestamp: int # Microseconds since epoch
account_id: str
payload: Dict[str, Any]
# Layer 1
event_hash: str = ""
prev_hash: str = ""
# Policy
policy_id: str = ""
conformance_tier: ConformanceTier = ConformanceTier.SILVER
def __post_init__(self):
if not self.event_id:
self.event_id = str(uuid.uuid7())
if not self.timestamp:
self.timestamp = int(time.time() * 1_000_000)
def to_dict(self) -> dict:
return {
"Header": {
"EventID": self.event_id,
"EventType": self.event_type.value,
"Timestamp": self.timestamp,
"AccountID": self.account_id,
"EventHash": self.event_hash,
"PrevHash": self.prev_hash,
"HashAlgo": "SHA256"
},
"PolicyIdentification": {
"PolicyID": self.policy_id,
"ConformanceTier": self.conformance_tier.value,
"Version": "1.1"
},
"Payload": self.payload
}
class VCPLogger:
"""
Complete VCP v1.1 logging implementation.
Usage:
logger = VCPLogger(
policy_id="org.example.trading:algo-001",
tier=ConformanceTier.GOLD
)
# Log events
logger.log_signal("EURUSD", "BUY", 100000, 1.0925)
logger.log_order("ORD-001", "EURUSD", "BUY", 100000, 1.0925)
logger.log_execution("ORD-001", "EXE-001", 50000, 1.0925)
# Create anchor batch
anchor = logger.create_anchor_batch()
print(f"Merkle Root: {anchor['merkle_root']}")
"""
def __init__(
self,
policy_id: str,
tier: ConformanceTier = ConformanceTier.SILVER,
private_key: str = None,
use_hash_chain: bool = False
):
self.policy_id = policy_id
self.tier = tier
self.use_hash_chain = use_hash_chain
# Initialize signer
if private_key:
self.signing_key = SigningKey(bytes.fromhex(private_key))
else:
self.signing_key = SigningKey.generate()
# Event storage
self.events: List[VCPEvent] = []
self.prev_hash = "0" * 64
# Batch tracking
self.last_anchor_index = 0
def _calculate_hash(self, event: VCPEvent) -> str:
"""Calculate event hash (Layer 1)."""
header = {
"EventID": event.event_id,
"EventType": event.event_type.value,
"Timestamp": event.timestamp,
"AccountID": event.account_id
}
canonical = json.dumps(
{"header": header, "payload": event.payload},
separators=(',', ':'),
sort_keys=True
)
if self.use_hash_chain:
canonical += self.prev_hash
return hashlib.sha256(canonical.encode()).hexdigest()
def log_event(
self,
event_type: EventType,
account_id: str,
payload: Dict[str, Any]
) -> VCPEvent:
"""Log a generic VCP event."""
event = VCPEvent(
event_id="", # Auto-generated
event_type=event_type,
timestamp=0, # Auto-generated
account_id=account_id,
payload=payload,
policy_id=self.policy_id,
conformance_tier=self.tier
)
# Calculate hash
event.event_hash = self._calculate_hash(event)
if self.use_hash_chain:
event.prev_hash = self.prev_hash
self.prev_hash = event.event_hash
self.events.append(event)
return event
# Convenience methods for common event types
def log_signal(
self,
symbol: str,
side: str,
quantity: float,
price: float,
account_id: str = "default",
**kwargs
) -> VCPEvent:
"""Log a trading signal."""
return self.log_event(
EventType.SIG,
account_id,
{
"Symbol": symbol,
"Side": side,
"Quantity": quantity,
"Price": price,
**kwargs
}
)
def log_order(
self,
order_id: str,
symbol: str,
side: str,
quantity: float,
price: float,
account_id: str = "default",
**kwargs
) -> VCPEvent:
"""Log an order submission."""
return self.log_event(
EventType.ORD,
account_id,
{
"OrderID": order_id,
"Symbol": symbol,
"Side": side,
"Quantity": quantity,
"Price": price,
**kwargs
}
)
def log_execution(
self,
order_id: str,
execution_id: str,
quantity: float,
price: float,
account_id: str = "default",
**kwargs
) -> VCPEvent:
"""Log a trade execution."""
return self.log_event(
EventType.EXE,
account_id,
{
"OrderID": order_id,
"ExecutionID": execution_id,
"Quantity": quantity,
"Price": price,
**kwargs
}
)
def log_risk_breach(
self,
breach_type: str,
limit_value: float,
actual_value: float,
account_id: str = "default",
**kwargs
) -> VCPEvent:
"""Log a risk limit breach."""
return self.log_event(
EventType.ERR_RISK,
account_id,
{
"BreachType": breach_type,
"LimitValue": limit_value,
"ActualValue": actual_value,
"Severity": "CRITICAL",
**kwargs
}
)
def create_anchor_batch(self) -> dict:
"""
Create a signed anchor batch (Layers 2 & 3).
Returns:
Anchor record ready for external timestamping
"""
# Get events since last anchor
batch_events = self.events[self.last_anchor_index:]
if not batch_events:
raise ValueError("No new events to anchor")
# Build Merkle tree (Layer 2)
event_hashes = [e.event_hash for e in batch_events]
tree = VCPMerkleTree(event_hashes)
merkle_root = tree.get_root()
# Sign (Layer 3)
timestamp = int(time.time() * 1_000_000)
message = f"{merkle_root}:{timestamp}".encode()
signed = self.signing_key.sign(message)
# Create anchor record
anchor = {
"version": "1.1",
"policy_id": self.policy_id,
"conformance_tier": self.tier.value,
"merkle_root": merkle_root,
"event_count": len(batch_events),
"first_event_id": batch_events[0].event_id,
"last_event_id": batch_events[-1].event_id,
"timestamp": timestamp,
"signature": signed.signature.hex(),
"signer_public_key": self.signing_key.verify_key.encode().hex(),
"sign_algo": "ED25519"
}
# Update tracking
self.last_anchor_index = len(self.events)
return anchor
def get_inclusion_proof(self, event: VCPEvent) -> dict:
"""
Get Merkle inclusion proof for an event.
Use after create_anchor_batch() to generate proofs
for regulatory submission.
"""
# Find event index
try:
index = self.events.index(event)
except ValueError:
raise ValueError("Event not found in log")
# Build tree and get proof
event_hashes = [e.event_hash for e in self.events]
tree = VCPMerkleTree(event_hashes)
proof = tree.get_proof(index)
return {
"event_hash": event.event_hash,
"merkle_root": tree.get_root(),
"proof": proof,
"index": index
}
Implementation: MQL5 Bridge {#implementation-mql5}
For MetaTrader 5 integration, VCP uses a sidecar architecture—the logging runs alongside MT5 without modifying the trading logic:
//+------------------------------------------------------------------+
//| VCP_Bridge.mqh |
//| VeritasChain Protocol v1.1 |
//+------------------------------------------------------------------+
#property copyright "VeritasChain Standards Organization"
#property link "https://veritaschain.org"
#property version "1.10"
// Import DLL functions
#import "vcp_sidecar.dll"
int VCP_Initialize(string policy_id, string tier);
void VCP_Shutdown();
string VCP_LogSignal(string symbol, string side, double qty, double price);
string VCP_LogOrder(string order_id, string symbol, string side, double qty, double price);
string VCP_LogExecution(string order_id, string exec_id, double qty, double price);
string VCP_LogError(string error_type, string message, string severity);
int VCP_CreateAnchor();
string VCP_GetMerkleRoot();
#import
//+------------------------------------------------------------------+
//| VCP Logger class for MQL5 |
//+------------------------------------------------------------------+
class CVCPLogger
{
private:
bool m_initialized;
string m_policy_id;
string m_tier;
public:
//--- Constructor
CVCPLogger(void) : m_initialized(false) {}
//--- Destructor
~CVCPLogger(void)
{
if(m_initialized)
Shutdown();
}
//--- Initialize VCP logging
bool Initialize(string policy_id, string tier = "SILVER")
{
m_policy_id = policy_id;
m_tier = tier;
int result = VCP_Initialize(policy_id, tier);
if(result == 0)
{
m_initialized = true;
Print("[VCP] Initialized: ", policy_id, " (", tier, ")");
return true;
}
Print("[VCP] Initialization failed: ", result);
return false;
}
//--- Shutdown
void Shutdown(void)
{
if(m_initialized)
{
VCP_Shutdown();
m_initialized = false;
Print("[VCP] Shutdown complete");
}
}
//--- Log trading signal
string LogSignal(string symbol, ENUM_ORDER_TYPE side, double quantity, double price)
{
if(!m_initialized)
return "";
string side_str = (side == ORDER_TYPE_BUY) ? "BUY" : "SELL";
return VCP_LogSignal(symbol, side_str, quantity, price);
}
//--- Log order
string LogOrder(ulong ticket, string symbol, ENUM_ORDER_TYPE side, double quantity, double price)
{
if(!m_initialized)
return "";
string order_id = IntegerToString(ticket);
string side_str = (side == ORDER_TYPE_BUY) ? "BUY" : "SELL";
return VCP_LogOrder(order_id, symbol, side_str, quantity, price);
}
//--- Log execution
string LogExecution(ulong order_ticket, ulong deal_ticket, double quantity, double price)
{
if(!m_initialized)
return "";
string order_id = IntegerToString(order_ticket);
string exec_id = IntegerToString(deal_ticket);
return VCP_LogExecution(order_id, exec_id, quantity, price);
}
//--- Log risk breach
string LogRiskBreach(string breach_type, string details)
{
if(!m_initialized)
return "";
return VCP_LogError("ERR_RISK", breach_type + ": " + details, "CRITICAL");
}
//--- Create anchor batch
bool CreateAnchor(void)
{
if(!m_initialized)
return false;
int result = VCP_CreateAnchor();
if(result == 0)
{
string root = VCP_GetMerkleRoot();
Print("[VCP] Anchor created. Merkle Root: ", root);
return true;
}
Print("[VCP] Anchor creation failed: ", result);
return false;
}
};
//+------------------------------------------------------------------+
//| Example EA using VCP |
//+------------------------------------------------------------------+
/*
#include "VCP_Bridge.mqh"
CVCPLogger g_vcp;
int OnInit()
{
// Initialize VCP with your policy ID
if(!g_vcp.Initialize("org.mycompany.mt5:algo-001", "SILVER"))
{
Print("Failed to initialize VCP");
return INIT_FAILED;
}
return INIT_SUCCEEDED;
}
void OnDeinit(const int reason)
{
// Create final anchor before shutdown
g_vcp.CreateAnchor();
g_vcp.Shutdown();
}
void OnTick()
{
// Your trading logic...
// Log signal when generated
if(signal_generated)
{
g_vcp.LogSignal(_Symbol, ORDER_TYPE_BUY, 0.1, Ask);
}
}
void OnTradeTransaction(
const MqlTradeTransaction& trans,
const MqlTradeRequest& request,
const MqlTradeResult& result
)
{
// Log orders
if(trans.type == TRADE_TRANSACTION_ORDER_ADD)
{
g_vcp.LogOrder(
trans.order,
trans.symbol,
(ENUM_ORDER_TYPE)trans.order_type,
trans.volume,
trans.price
);
}
// Log executions
if(trans.type == TRADE_TRANSACTION_DEAL_ADD)
{
g_vcp.LogExecution(
trans.order,
trans.deal,
trans.volume,
trans.price
);
}
}
*/
Sidecar Architecture Benefits
┌─────────────────────────────────────────────────────────────┐
│ MetaTrader 5 │
│ ┌─────────────────┐ ┌─────────────────────────────┐ │
│ │ │ │ │ │
│ │ Trading EA │──────▶│ VCP_Bridge.mqh │ │
│ │ │ │ │ │
│ └─────────────────┘ └──────────────┬──────────────┘ │
│ │ │
└───────────────────────────────────────────│─────────────────┘
│
▼
┌─────────────────────────┐
│ │
│ vcp_sidecar.dll │
│ (Separate process) │
│ │
└────────────┬────────────┘
│
▼
┌─────────────────────────┐
│ │
│ VCP Anchor Service │
│ (Cloud/Local) │
│ │
└─────────────────────────┘
Key benefits:
- Non-invasive: Doesn't modify MT5 internals
- Fail-safe: VCP failure doesn't crash trading
- Async-first: Logging is non-blocking
- Recoverable: Missed events can be replayed
Compliance Tier Selection {#compliance-tiers}
VCP defines three tiers based on your requirements:
| Requirement | Silver | Gold | Platinum |
|---|---|---|---|
| Target Use | Development, retail trading | Prop firms, institutional | HFT, exchanges |
| Clock Sync | System clock | NTP (<1ms) | PTP (<1μs) |
| Anchor Frequency | 24 hours | 1 hour | 10 minutes |
| Throughput | 1K events/sec | 100K events/sec | 1M+ events/sec |
| MiFID II RTS 25 | ❌ Not compliant | ✅ Compliant | ✅ Exceeds |
| Recommended For | Backtesting, demos | Production trading | Regulatory-critical |
Choosing Your Tier
def recommend_tier(
is_regulated: bool,
events_per_second: int,
requires_dispute_resolution: bool,
trading_frequency: str # "hft", "intraday", "swing"
) -> ConformanceTier:
"""
Recommend appropriate VCP tier based on requirements.
"""
# HFT always needs Platinum
if trading_frequency == "hft":
return ConformanceTier.PLATINUM
# Regulated entities need at least Gold
if is_regulated:
if events_per_second > 100_000:
return ConformanceTier.PLATINUM
return ConformanceTier.GOLD
# High-stakes dispute resolution needs Gold
if requires_dispute_resolution:
return ConformanceTier.GOLD
# Development and testing
return ConformanceTier.SILVER
Putting It All Together {#putting-it-together}
Here's a complete example workflow:
from vcp_core import VCPLogger, ConformanceTier
from anchoring import RFC3161Anchor
import json
# 1. Initialize logger
logger = VCPLogger(
policy_id="org.example.trading:eurusd-scalper-v2",
tier=ConformanceTier.GOLD,
use_hash_chain=True # Enable for production
)
# 2. Simulate trading session
print("=== Trading Session ===\n")
# Log signal
signal = logger.log_signal(
symbol="EURUSD",
side="BUY",
quantity=100000,
price=1.0925,
model_id="ml-model-v2.3",
confidence=0.87
)
print(f"Signal logged: {signal.event_id[:8]}...")
# Log order
order = logger.log_order(
order_id="ORD-2025-001234",
symbol="EURUSD",
side="BUY",
quantity=100000,
price=1.0925
)
print(f"Order logged: {order.event_id[:8]}...")
# Log execution
execution = logger.log_execution(
order_id="ORD-2025-001234",
execution_id="EXE-2025-001234",
quantity=100000,
price=1.0925
)
print(f"Execution logged: {execution.event_id[:8]}...")
# 3. Create anchor batch (Layer 2)
print("\n=== Creating Anchor ===\n")
anchor = logger.create_anchor_batch()
print(f"Merkle Root: {anchor['merkle_root']}")
print(f"Event Count: {anchor['event_count']}")
print(f"Signature: {anchor['signature'][:32]}...")
# 4. External timestamping (Layer 3)
print("\n=== External Anchoring ===\n")
tsa = RFC3161Anchor()
timestamp_proof = tsa.create_timestamp(anchor['merkle_root'])
timestamp_info = tsa.parse_timestamp(timestamp_proof)
print(f"Anchored at: {timestamp_info['timestamp']}")
print(f"TSA: {timestamp_info['tsa_name']}")
# 5. Generate inclusion proof for specific event
print("\n=== Inclusion Proof ===\n")
proof = logger.get_inclusion_proof(execution)
print(f"Event Hash: {proof['event_hash'][:16]}...")
print(f"Proof Path: {len(proof['proof'])} nodes")
# 6. Verify proof (as a third party would)
is_valid = VCPMerkleTree.verify_proof(
proof['event_hash'],
proof['proof'],
anchor['merkle_root']
)
print(f"Proof Valid: {is_valid}")
# 7. Export for regulatory submission
print("\n=== Regulatory Export ===\n")
regulatory_package = {
"anchor": anchor,
"timestamp_proof": timestamp_proof.hex(),
"timestamp_info": timestamp_info,
"events": [e.to_dict() for e in logger.events],
"proof_for_execution": proof
}
with open("regulatory_submission.json", "w") as f:
json.dump(regulatory_package, f, indent=2)
print("Exported to regulatory_submission.json")
print("✅ Complete audit trail with third-party verifiable proofs")
Output:
=== Trading Session ===
Signal logged: 019abc12...
Order logged: 019abc13...
Execution logged: 019abc14...
=== Creating Anchor ===
Merkle Root: a7f3d2b1c4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1
Event Count: 3
Signature: 3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e...
=== External Anchoring ===
Anchored at: 2025-01-14T10:30:00.000Z
TSA: DigiCert Timestamp Authority
=== Inclusion Proof ===
Event Hash: f2a3b4c5d6e7f8a9...
Proof Path: 2 nodes
Proof Valid: True
=== Regulatory Export ===
Exported to regulatory_submission.json
✅ Complete audit trail with third-party verifiable proofs
What We've Built
Let's recap what VCP v1.1 provides:
| Layer | What It Proves | How |
|---|---|---|
| Layer 1 | Events weren't modified | SHA-256 hashes |
| Layer 2 | No events were omitted | Merkle trees with inclusion proofs |
| Layer 3 | Records existed at a specific time | External anchoring + signatures |
Together, these enable true third-party verification. A regulator doesn't need to trust your database. They can:
- Take your submitted Merkle root
- Verify it was anchored at a specific time (via TSA/blockchain)
- Verify any event you claim existed using inclusion proofs
- Detect if you've modified anything
This is the "Verify, Don't Trust" principle in action.
Next Steps
- Try the SDK: github.com/veritaschain/vcp-sdk-python
- Read the spec: VCP v1.1 Specification
- Join the discussion: IETF SCITT Working Group
- Get certified: VC-Certified Program
The EU AI Act's enforcement may be delayed to December 2027, but the firms that build verification infrastructure now will define the industry's standards.
Resources
- VCP Specification: https://github.com/veritaschain/vcp-spec
- IETF Draft: https://datatracker.ietf.org/doc/draft-kamimura-scitt-vcp/
- EU AI Act Article 12: https://artificialintelligenceact.eu/article/12/
- prEN ISO/IEC 24970: https://www.iso.org/standard/88723.html
- RFC 6962 (Merkle Trees): https://datatracker.ietf.org/doc/html/rfc6962
- RFC 8785 (JSON Canonicalization): https://datatracker.ietf.org/doc/html/rfc8785
Questions? Reach out: technical@veritaschain.org or open an issue on GitHub.
Top comments (0)