The EU AI Act takes effect for high-risk AI systems in August 2026. Article 12 mandates "automatic recording of events" with "traceability"—but provides zero technical specifications.
This article shows you exactly how to build logging infrastructure that meets regulatory intent using cryptographic primitives. We'll cover hash chains, Merkle trees, digital signatures, and external timestamping with working code in Python and MQL5.
By the end, you'll understand:
- Why conventional logging fails for AI accountability
- How to implement tamper-evident hash chains
- Building RFC 6962-compliant Merkle trees
- External anchoring with OpenTimestamps and TSAs
- Completeness guarantees through multi-log replication
Let's build an "AI flight recorder."
Table of Contents
- The Problem: Why Traditional Logs Aren't Enough
- Architecture Overview: Three-Layer Integrity Model
- Layer 1: Event-Level Integrity with Hash Chains
- Layer 2: Collection Integrity with Merkle Trees
- Layer 3: External Verifiability
- Completeness Guarantees: Detecting Missing Events
- MQL5 Integration for MetaTrader
- Putting It All Together
- Compliance Mapping: EU AI Act & MiFID II
- Next Steps
1. The Problem: Why Traditional Logs Aren't Enough
Consider a typical trading system log:
import logging
import json
from datetime import datetime
logging.basicConfig(filename='trading.log', level=logging.INFO)
def log_trade(order_id, symbol, side, quantity, price):
logging.info(json.dumps({
'timestamp': datetime.utcnow().isoformat(),
'order_id': order_id,
'symbol': symbol,
'side': side,
'quantity': quantity,
'price': price
}))
This looks reasonable. But it has four critical vulnerabilities:
Vulnerability 1: Tampering
Anyone with file access can modify historical entries:
# Attacker modifies a past trade
sed -i 's/"price": 1.2345/"price": 1.2350/' trading.log
No cryptographic evidence of modification exists.
Vulnerability 2: Deletion
Events can be removed without detection:
# Delete a problematic trade
grep -v "order_id.*ORD-12345" trading.log > temp && mv temp trading.log
How do you prove 1000 trades happened if someone deleted trade #500?
Vulnerability 3: Insertion
Backdated events can be added:
# Create fake historical trade
fake_trade = {
'timestamp': '2024-06-15T10:30:00Z', # Past date
'order_id': 'ORD-FAKE',
'symbol': 'EURUSD',
'side': 'BUY',
'quantity': 100000,
'price': 1.0850
}
Timestamps prove nothing without external verification.
Vulnerability 4: Repudiation
When disputes arise, you can show a log entry. But you cannot prove:
- The entry existed at the claimed time
- No entries were deleted before or after
- The log represents complete system activity
The fundamental problem: Traditional logs require trust in the log maintainer. For regulatory compliance, we need verification without trust.
2. Architecture Overview: Three-Layer Integrity Model
VeritasChain Protocol (VCP) v1.1 addresses these vulnerabilities through a three-layer architecture:
┌─────────────────────────────────────────────────────────────────┐
│ LAYER 3: External Verifiability │
│ (TSA Timestamps, Blockchain Anchoring, Multi-Anchor) │
├─────────────────────────────────────────────────────────────────┤
│ LAYER 2: Collection Integrity │
│ (Merkle Trees, Signed Tree Heads, Consistency Proofs) │
├─────────────────────────────────────────────────────────────────┤
│ LAYER 1: Event Integrity │
│ (Hash Chains, Digital Signatures, UUIDv7 Ordering) │
├─────────────────────────────────────────────────────────────────┤
│ RAW EVENTS │
│ (Trading Decisions, Orders, Executions) │
└─────────────────────────────────────────────────────────────────┘
Each layer provides specific guarantees:
| Layer | Guarantee | Attack Mitigated |
|---|---|---|
| L1 | Individual event integrity | Tampering |
| L2 | Collection completeness | Deletion, Insertion |
| L3 | Temporal proof | Backdating, Repudiation |
Let's implement each layer.
3. Layer 1: Event-Level Integrity with Hash Chains
Core Data Structure
Every VCP event contains these cryptographic elements:
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
from enum import Enum
import hashlib
import json
import uuid
class TimestampPrecision(Enum):
SECOND = "SECOND"
MILLISECOND = "MILLISECOND"
MICROSECOND = "MICROSECOND"
NANOSECOND = "NANOSECOND"
class ClockSyncStatus(Enum):
BEST_EFFORT = "BEST_EFFORT"
NTP_SYNCED = "NTP_SYNCED"
PTP_LOCKED = "PTP_LOCKED"
class EventType(Enum):
SIG = "SIG" # Signal generated
ORD = "ORD" # Order submitted
ACK = "ACK" # Order acknowledged
EXE = "EXE" # Execution (fill)
REJ = "REJ" # Rejection
CXL = "CXL" # Cancellation
MOD = "MOD" # Modification
RSK = "RSK" # Risk event
@dataclass
class VCPEvent:
"""VeritasChain Protocol v1.1 Event Structure"""
# Header fields
vcp_version: str = "1.1"
event_id: str = field(default_factory=lambda: str(uuid.uuid7()))
prev_hash: str = ""
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
timestamp_precision: TimestampPrecision = TimestampPrecision.MICROSECOND
clock_sync_status: ClockSyncStatus = ClockSyncStatus.NTP_SYNCED
# Event classification
event_type: EventType = EventType.ORD
# Payload (varies by event type)
payload: dict = field(default_factory=dict)
# Cryptographic fields (computed)
event_hash: str = ""
signature: str = ""
def compute_hash(self) -> str:
"""Compute SHA-256 hash of canonical event representation"""
canonical = self._canonicalize()
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
def _canonicalize(self) -> str:
"""RFC 8785 JSON Canonicalization Scheme (JCS)"""
# Simplified JCS: sorted keys, no whitespace
data = {
'vcp_version': self.vcp_version,
'event_id': self.event_id,
'prev_hash': self.prev_hash,
'timestamp': self.timestamp,
'timestamp_precision': self.timestamp_precision.value,
'clock_sync_status': self.clock_sync_status.value,
'event_type': self.event_type.value,
'payload': self.payload
}
return json.dumps(data, sort_keys=True, separators=(',', ':'))
Hash Chain Implementation
The prev_hash field creates an immutable chain:
class VCPHashChain:
"""Manages a chain of VCP events with cryptographic linking"""
def __init__(self):
self.events: list[VCPEvent] = []
self.genesis_hash = "0" * 64 # Genesis block has zero hash
def append(self, event: VCPEvent) -> VCPEvent:
"""Add event to chain with proper hash linking"""
# Set prev_hash to last event's hash (or genesis)
if self.events:
event.prev_hash = self.events[-1].event_hash
else:
event.prev_hash = self.genesis_hash
# Compute this event's hash
event.event_hash = event.compute_hash()
self.events.append(event)
return event
def verify_chain(self) -> tuple[bool, Optional[int]]:
"""
Verify entire chain integrity.
Returns (is_valid, first_invalid_index)
"""
expected_prev = self.genesis_hash
for i, event in enumerate(self.events):
# Check prev_hash linkage
if event.prev_hash != expected_prev:
return False, i
# Verify event_hash is correct
computed = event.compute_hash()
if event.event_hash != computed:
return False, i
expected_prev = event.event_hash
return True, None
def detect_tampering(self) -> list[int]:
"""Return indices of all tampered events"""
tampered = []
expected_prev = self.genesis_hash
for i, event in enumerate(self.events):
computed = event.compute_hash()
# Check if hash was recomputed (content changed)
if event.event_hash != computed:
tampered.append(i)
# Check if chain is broken
if event.prev_hash != expected_prev:
tampered.append(i)
expected_prev = event.event_hash
return list(set(tampered))
Digital Signatures with Ed25519
Hash chains detect tampering, but don't prove who created events. Ed25519 signatures provide non-repudiation:
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.hazmat.primitives import serialization
import base64
class VCPSigner:
"""Ed25519 signing for VCP events"""
def __init__(self, private_key: Optional[Ed25519PrivateKey] = None):
if private_key:
self.private_key = private_key
else:
self.private_key = Ed25519PrivateKey.generate()
self.public_key = self.private_key.public_key()
def sign_event(self, event: VCPEvent) -> VCPEvent:
"""Sign event hash with Ed25519"""
if not event.event_hash:
event.event_hash = event.compute_hash()
signature_bytes = self.private_key.sign(
event.event_hash.encode('utf-8')
)
event.signature = f"ed25519:{base64.b64encode(signature_bytes).decode()}"
return event
def verify_signature(self, event: VCPEvent) -> bool:
"""Verify Ed25519 signature on event"""
if not event.signature.startswith("ed25519:"):
return False
try:
sig_b64 = event.signature.split(":", 1)[1]
signature_bytes = base64.b64decode(sig_b64)
self.public_key.verify(
signature_bytes,
event.event_hash.encode('utf-8')
)
return True
except Exception:
return False
def get_public_key_pem(self) -> str:
"""Export public key for verification distribution"""
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()
Complete Layer 1 Example
# Create signer
signer = VCPSigner()
# Initialize chain
chain = VCPHashChain()
# Log a trading signal
signal_event = VCPEvent(
event_type=EventType.SIG,
payload={
'algorithm_id': 'ALGO-TREND-001',
'model_hash': 'sha256:a1b2c3...',
'signal': 'BUY',
'symbol': 'EURUSD',
'confidence': 0.85,
'features': {
'rsi_14': 32.5,
'macd_signal': 0.0012,
'sentiment_score': 0.67
}
}
)
# Add to chain and sign
chain.append(signal_event)
signer.sign_event(signal_event)
# Log the resulting order
order_event = VCPEvent(
event_type=EventType.ORD,
payload={
'order_id': 'ORD-2025-0001',
'parent_event_id': signal_event.event_id,
'symbol': 'EURUSD',
'side': 'BUY',
'order_type': 'LIMIT',
'quantity': 100000,
'price': 1.0850,
'time_in_force': 'GTC'
}
)
chain.append(order_event)
signer.sign_event(order_event)
# Verify chain integrity
is_valid, invalid_idx = chain.verify_chain()
print(f"Chain valid: {is_valid}") # True
# Simulate tampering
chain.events[0].payload['confidence'] = 0.95 # Modify past event
# Detect tampering
is_valid, invalid_idx = chain.verify_chain()
print(f"Chain valid: {is_valid}, first invalid: {invalid_idx}") # False, 0
4. Layer 2: Collection Integrity with Merkle Trees
Hash chains prove individual event integrity, but verifying a chain of millions of events is O(n). Merkle trees enable O(log n) verification.
RFC 6962 Merkle Tree Implementation
from typing import List, Tuple
import hashlib
class MerkleTree:
"""RFC 6962 compliant Merkle Tree for VCP"""
# Domain separation prefixes (RFC 6962)
LEAF_PREFIX = b'\x00'
NODE_PREFIX = b'\x01'
def __init__(self):
self.leaves: List[bytes] = []
self.tree: List[List[bytes]] = []
def _hash_leaf(self, data: bytes) -> bytes:
"""Hash a leaf node with domain separation"""
return hashlib.sha256(self.LEAF_PREFIX + data).digest()
def _hash_node(self, left: bytes, right: bytes) -> bytes:
"""Hash an internal node with domain separation"""
return hashlib.sha256(self.NODE_PREFIX + left + right).digest()
def add_leaf(self, event_hash: str) -> int:
"""Add event hash as leaf, return leaf index"""
leaf_data = bytes.fromhex(event_hash)
leaf_hash = self._hash_leaf(leaf_data)
self.leaves.append(leaf_hash)
return len(self.leaves) - 1
def build_tree(self) -> bytes:
"""Build complete Merkle tree, return root"""
if not self.leaves:
return b'\x00' * 32
# Start with leaves
current_level = self.leaves.copy()
self.tree = [current_level]
# Build tree bottom-up
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
# If odd number, duplicate last node
right = current_level[i + 1] if i + 1 < len(current_level) else left
parent = self._hash_node(left, right)
next_level.append(parent)
self.tree.append(next_level)
current_level = next_level
return current_level[0]
def get_root(self) -> str:
"""Get Merkle root as hex string"""
root = self.build_tree()
return root.hex()
def get_inclusion_proof(self, leaf_index: int) -> List[Tuple[bytes, str]]:
"""
Generate inclusion proof for a leaf.
Returns list of (sibling_hash, position) tuples.
Position is 'L' or 'R' indicating sibling position.
"""
if not self.tree:
self.build_tree()
proof = []
idx = leaf_index
for level in self.tree[:-1]: # Exclude root level
if idx % 2 == 0: # Current node is left child
sibling_idx = idx + 1
position = 'R' # Sibling is on right
else: # Current node is right child
sibling_idx = idx - 1
position = 'L' # Sibling is on left
# Handle edge case: odd number of nodes
if sibling_idx < len(level):
proof.append((level[sibling_idx], position))
else:
proof.append((level[idx], 'R')) # Duplicate self
idx //= 2
return proof
def verify_inclusion(
self,
event_hash: str,
leaf_index: int,
proof: List[Tuple[bytes, str]],
root: str
) -> bool:
"""Verify that an event is included in the tree"""
leaf_data = bytes.fromhex(event_hash)
current = self._hash_leaf(leaf_data)
for sibling, position in proof:
if position == 'L':
current = self._hash_node(sibling, current)
else:
current = self._hash_node(current, sibling)
return current.hex() == root
Signed Tree Head (STH)
The Merkle root alone isn't enough—we need to commit to it with a timestamp and signature:
@dataclass
class SignedTreeHead:
"""Commitment to a Merkle tree state"""
tree_size: int
timestamp: str
root_hash: str
signature: str = ""
def to_signable(self) -> str:
"""Create canonical representation for signing"""
return json.dumps({
'tree_size': self.tree_size,
'timestamp': self.timestamp,
'root_hash': self.root_hash
}, sort_keys=True, separators=(',', ':'))
def sign(self, signer: VCPSigner) -> 'SignedTreeHead':
"""Sign the STH"""
signable = self.to_signable().encode('utf-8')
signature_bytes = signer.private_key.sign(signable)
self.signature = f"ed25519:{base64.b64encode(signature_bytes).decode()}"
return self
class VCPMerkleLog:
"""Complete Merkle log with STH management"""
def __init__(self, signer: VCPSigner):
self.chain = VCPHashChain()
self.tree = MerkleTree()
self.signer = signer
self.sth_history: List[SignedTreeHead] = []
def append_event(self, event: VCPEvent) -> Tuple[VCPEvent, int]:
"""Add event to both chain and tree"""
# Add to hash chain
self.chain.append(event)
self.signer.sign_event(event)
# Add to Merkle tree
leaf_index = self.tree.add_leaf(event.event_hash)
return event, leaf_index
def commit(self) -> SignedTreeHead:
"""Create and sign a new STH"""
root = self.tree.get_root()
sth = SignedTreeHead(
tree_size=len(self.tree.leaves),
timestamp=datetime.utcnow().isoformat() + "Z",
root_hash=root
)
sth.sign(self.signer)
self.sth_history.append(sth)
return sth
def get_inclusion_proof(self, event: VCPEvent) -> dict:
"""Get proof that event is in the current tree"""
# Find leaf index
leaf_index = None
for i, leaf in enumerate(self.tree.leaves):
leaf_data = bytes.fromhex(event.event_hash)
expected = self.tree._hash_leaf(leaf_data)
if leaf == expected:
leaf_index = i
break
if leaf_index is None:
raise ValueError("Event not found in tree")
proof = self.tree.get_inclusion_proof(leaf_index)
return {
'event_hash': event.event_hash,
'leaf_index': leaf_index,
'tree_size': len(self.tree.leaves),
'proof': [(p[0].hex(), p[1]) for p in proof],
'root': self.tree.get_root()
}
Consistency Proofs
When a new STH is published, we need to prove it includes all events from the previous STH:
def get_consistency_proof(
self,
old_size: int,
new_size: int
) -> List[bytes]:
"""
Prove that tree of old_size is prefix of tree of new_size.
This proves no historical events were removed.
"""
if old_size > new_size or old_size < 0:
raise ValueError("Invalid tree sizes")
if old_size == 0:
return []
if old_size == new_size:
return []
# Simplified: return nodes needed to verify consistency
# Full implementation follows RFC 6962 Section 2.1.2
proof = []
# ... (full implementation omitted for brevity)
return proof
5. Layer 3: External Verifiability
Layer 1-2 prove integrity within the system. But the system operator controls the clock. Layer 3 anchors proofs to external trust sources.
OpenTimestamps Integration
OpenTimestamps provides free, decentralized timestamping via Bitcoin:
import subprocess
import tempfile
import os
class OpenTimestampsAnchor:
"""Anchor Merkle roots to Bitcoin via OpenTimestamps"""
def __init__(self):
# Requires: pip install opentimestamps-client
self.ots_available = self._check_ots()
def _check_ots(self) -> bool:
"""Check if ots CLI is available"""
try:
subprocess.run(['ots', '--version'], capture_output=True)
return True
except FileNotFoundError:
return False
def create_timestamp(self, data_hash: str) -> Optional[bytes]:
"""
Create OTS timestamp for a hash.
Returns .ots proof file contents.
"""
if not self.ots_available:
raise RuntimeError("OpenTimestamps CLI not available")
# Write hash to temp file
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.txt') as f:
f.write(bytes.fromhex(data_hash))
temp_path = f.name
try:
# Create timestamp
result = subprocess.run(
['ots', 'stamp', temp_path],
capture_output=True
)
if result.returncode != 0:
return None
# Read .ots file
ots_path = temp_path + '.ots'
if os.path.exists(ots_path):
with open(ots_path, 'rb') as f:
return f.read()
return None
finally:
os.unlink(temp_path)
if os.path.exists(temp_path + '.ots'):
os.unlink(temp_path + '.ots')
def verify_timestamp(self, data_hash: str, ots_proof: bytes) -> dict:
"""
Verify OTS timestamp.
Returns verification result with Bitcoin block info.
"""
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.txt') as f:
f.write(bytes.fromhex(data_hash))
temp_path = f.name
ots_path = temp_path + '.ots'
with open(ots_path, 'wb') as f:
f.write(ots_proof)
try:
result = subprocess.run(
['ots', 'verify', ots_path],
capture_output=True,
text=True
)
return {
'verified': result.returncode == 0,
'output': result.stdout,
'error': result.stderr
}
finally:
os.unlink(temp_path)
os.unlink(ots_path)
RFC 3161 TSA Integration
For regulated environments, TSA timestamps provide legal standing:
import requests
from asn1crypto import tsp, core
class TSATimestamp:
"""RFC 3161 Timestamping Authority client"""
# Free public TSA servers
TSA_SERVERS = [
'http://timestamp.digicert.com',
'http://time.certum.pl',
'http://timestamp.sectigo.com',
]
def __init__(self, tsa_url: str = None):
self.tsa_url = tsa_url or self.TSA_SERVERS[0]
def create_timestamp_request(self, data_hash: str) -> bytes:
"""Create RFC 3161 timestamp request"""
# Build TimeStampReq
message_imprint = tsp.MessageImprint({
'hash_algorithm': {'algorithm': 'sha256'},
'hashed_message': bytes.fromhex(data_hash)
})
ts_request = tsp.TimeStampReq({
'version': 1,
'message_imprint': message_imprint,
'cert_req': True # Request TSA certificate in response
})
return ts_request.dump()
def get_timestamp(self, data_hash: str) -> Optional[bytes]:
"""Request timestamp from TSA server"""
request_der = self.create_timestamp_request(data_hash)
try:
response = requests.post(
self.tsa_url,
data=request_der,
headers={'Content-Type': 'application/timestamp-query'},
timeout=30
)
if response.status_code == 200:
return response.content
return None
except requests.RequestException:
return None
def verify_timestamp(self, data_hash: str, ts_response: bytes) -> dict:
"""Verify TSA response"""
try:
response = tsp.TimeStampResp.load(ts_response)
status = response['status']['status'].native
if status != 'granted':
return {'verified': False, 'error': f'Status: {status}'}
# Extract timestamp token
token = response['time_stamp_token']
signed_data = token['content']
encap_content = signed_data['encap_content_info']
tst_info = tsp.TSTInfo.load(encap_content['content'].native)
# Verify hash matches
imprint = tst_info['message_imprint']
returned_hash = imprint['hashed_message'].native.hex()
if returned_hash != data_hash:
return {'verified': False, 'error': 'Hash mismatch'}
# Extract timestamp
gen_time = tst_info['gen_time'].native
return {
'verified': True,
'timestamp': gen_time.isoformat(),
'tsa': self.tsa_url,
'serial_number': tst_info['serial_number'].native
}
except Exception as e:
return {'verified': False, 'error': str(e)}
Multi-Anchor Strategy
Platinum tier requires multiple independent anchors:
@dataclass
class ExternalAnchor:
"""Record of external timestamp anchoring"""
root_hash: str
sth_timestamp: str
anchors: List[dict] = field(default_factory=list)
class MultiAnchorStrategy:
"""Anchor to multiple independent sources"""
def __init__(self):
self.ots = OpenTimestampsAnchor()
self.tsa_servers = [
TSATimestamp('http://timestamp.digicert.com'),
TSATimestamp('http://time.certum.pl'),
]
def anchor(self, sth: SignedTreeHead) -> ExternalAnchor:
"""Anchor STH to all available sources"""
anchor = ExternalAnchor(
root_hash=sth.root_hash,
sth_timestamp=sth.timestamp
)
# OpenTimestamps (Bitcoin)
if self.ots.ots_available:
try:
ots_proof = self.ots.create_timestamp(sth.root_hash)
if ots_proof:
anchor.anchors.append({
'type': 'opentimestamps',
'proof': base64.b64encode(ots_proof).decode(),
'status': 'pending' # Needs Bitcoin confirmation
})
except Exception as e:
pass
# TSA timestamps
for tsa in self.tsa_servers:
try:
ts_response = tsa.get_timestamp(sth.root_hash)
if ts_response:
verification = tsa.verify_timestamp(sth.root_hash, ts_response)
anchor.anchors.append({
'type': 'rfc3161',
'tsa_url': tsa.tsa_url,
'proof': base64.b64encode(ts_response).decode(),
'verified_timestamp': verification.get('timestamp'),
'status': 'verified' if verification['verified'] else 'failed'
})
except Exception as e:
pass
return anchor
6. Completeness Guarantees: Detecting Missing Events
VCP v1.1 introduces completeness guarantees—detecting if events are hidden, not just modified.
Multi-Log Replication
from typing import Dict
import asyncio
import aiohttp
class VCPLogClient:
"""Client that replicates events to multiple log servers"""
def __init__(self, log_endpoints: List[str]):
self.endpoints = log_endpoints
self.min_replicas = 2 # Minimum required successful submissions
async def submit_event(self, event: VCPEvent) -> Dict[str, any]:
"""
Submit event to all log servers simultaneously.
Returns receipts from each server.
"""
async with aiohttp.ClientSession() as session:
tasks = [
self._submit_to_endpoint(session, endpoint, event)
for endpoint in self.endpoints
]
results = await asyncio.gather(*tasks, return_exceptions=True)
receipts = {}
success_count = 0
for endpoint, result in zip(self.endpoints, results):
if isinstance(result, Exception):
receipts[endpoint] = {'status': 'error', 'error': str(result)}
else:
receipts[endpoint] = result
if result.get('status') == 'accepted':
success_count += 1
if success_count < self.min_replicas:
raise RuntimeError(
f"Only {success_count}/{self.min_replicas} replicas accepted event"
)
return receipts
async def _submit_to_endpoint(
self,
session: aiohttp.ClientSession,
endpoint: str,
event: VCPEvent
) -> dict:
"""Submit event to single endpoint"""
async with session.post(
f"{endpoint}/v1/events",
json=event.__dict__,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
return await response.json()
Gossip Protocol for Root Consistency
@dataclass
class GossipMessage:
"""Message exchanged between log servers"""
server_id: str
timestamp: str
tree_size: int
root_hash: str
signature: str
class GossipProtocol:
"""Detect split-view attacks through root consistency checking"""
def __init__(self, server_id: str, signer: VCPSigner, peers: List[str]):
self.server_id = server_id
self.signer = signer
self.peers = peers
self.received_roots: Dict[str, List[GossipMessage]] = {}
def create_gossip_message(self, sth: SignedTreeHead) -> GossipMessage:
"""Create signed gossip message for current state"""
msg = GossipMessage(
server_id=self.server_id,
timestamp=datetime.utcnow().isoformat() + "Z",
tree_size=sth.tree_size,
root_hash=sth.root_hash,
signature=""
)
# Sign the message
signable = json.dumps({
'server_id': msg.server_id,
'timestamp': msg.timestamp,
'tree_size': msg.tree_size,
'root_hash': msg.root_hash
}, sort_keys=True).encode()
sig_bytes = self.signer.private_key.sign(signable)
msg.signature = base64.b64encode(sig_bytes).decode()
return msg
def receive_gossip(self, msg: GossipMessage) -> Optional[str]:
"""
Process received gossip message.
Returns alert message if inconsistency detected.
"""
# Store message
if msg.server_id not in self.received_roots:
self.received_roots[msg.server_id] = []
self.received_roots[msg.server_id].append(msg)
# Check for inconsistencies
return self._check_consistency(msg)
def _check_consistency(self, new_msg: GossipMessage) -> Optional[str]:
"""
Detect split-view: same tree_size but different roots
"""
for server_id, messages in self.received_roots.items():
if server_id == new_msg.server_id:
continue
for msg in messages:
# Same tree size should have same root
if msg.tree_size == new_msg.tree_size:
if msg.root_hash != new_msg.root_hash:
return (
f"ALERT: Split-view detected! "
f"Server {msg.server_id} has root {msg.root_hash[:16]}... "
f"but server {new_msg.server_id} has root {new_msg.root_hash[:16]}... "
f"for tree size {msg.tree_size}"
)
return None
7. MQL5 Integration for MetaTrader
For traders using MetaTrader, here's how to integrate VCP logging:
//+------------------------------------------------------------------+
//| VCP_Bridge.mqh - VeritasChain Protocol Bridge for MQL5 |
//| Version: 1.1.0 |
//+------------------------------------------------------------------+
#property copyright "VeritasChain Standards Organization"
#property version "1.1.0"
#include <Arrays/ArrayString.mqh>
#include <JAson.mqh> // JSON library
//--- VCP Event Types
enum ENUM_VCP_EVENT_TYPE {
VCP_EVENT_SIG, // Signal
VCP_EVENT_ORD, // Order
VCP_EVENT_ACK, // Acknowledgment
VCP_EVENT_EXE, // Execution
VCP_EVENT_REJ, // Rejection
VCP_EVENT_CXL, // Cancellation
VCP_EVENT_RSK // Risk event
};
//--- Clock Sync Status
enum ENUM_CLOCK_SYNC_STATUS {
CLOCK_BEST_EFFORT,
CLOCK_NTP_SYNCED,
CLOCK_PTP_LOCKED
};
//+------------------------------------------------------------------+
//| VCP Event Structure |
//+------------------------------------------------------------------+
struct VCPEvent {
string vcp_version;
string event_id;
string prev_hash;
datetime timestamp;
long timestamp_us; // Microseconds component
ENUM_VCP_EVENT_TYPE event_type;
ENUM_CLOCK_SYNC_STATUS clock_sync;
string payload_json;
string event_hash;
string signature;
};
//+------------------------------------------------------------------+
//| VCP Bridge Class |
//+------------------------------------------------------------------+
class CVCPBridge {
private:
string m_sidecar_url;
string m_api_key;
string m_last_hash;
int m_timeout_ms;
//--- Generate UUIDv7-like ID (simplified)
string GenerateEventID() {
long ms = (long)TimeCurrent() * 1000 + GetTickCount() % 1000;
string hex_time = StringFormat("%012llX", ms);
string random_part = StringFormat("%04X%012llX",
MathRand() & 0x0FFF | 0x7000, // Version 7
((long)MathRand() << 32) | MathRand()
);
return StringSubstr(hex_time, 0, 8) + "-" +
StringSubstr(hex_time, 8, 4) + "-" +
StringSubstr(random_part, 0, 4) + "-" +
StringSubstr(random_part, 4, 4) + "-" +
StringSubstr(random_part, 8, 12);
}
//--- SHA-256 hash via WebRequest to sidecar
string ComputeHash(string data) {
// In production, use sidecar for hashing
// Simplified: return placeholder
return "sha256_placeholder";
}
public:
//--- Constructor
CVCPBridge(string sidecar_url = "http://localhost:8080",
string api_key = "") {
m_sidecar_url = sidecar_url;
m_api_key = api_key;
m_last_hash = StringInit(64, '0'); // Genesis hash
m_timeout_ms = 5000;
}
//--- Log Trading Signal
bool LogSignal(string algo_id,
string symbol,
ENUM_ORDER_TYPE direction,
double confidence,
string features_json) {
VCPEvent event;
event.vcp_version = "1.1";
event.event_id = GenerateEventID();
event.prev_hash = m_last_hash;
event.timestamp = TimeCurrent();
event.timestamp_us = GetMicrosecondCount() % 1000000;
event.event_type = VCP_EVENT_SIG;
event.clock_sync = CLOCK_NTP_SYNCED;
// Build payload
CJAVal payload;
payload["algorithm_id"] = algo_id;
payload["symbol"] = symbol;
payload["direction"] = EnumToString(direction);
payload["confidence"] = confidence;
payload["features"].Deserialize(features_json);
event.payload_json = payload.Serialize();
return SubmitEvent(event);
}
//--- Log Order
bool LogOrder(ulong ticket,
string symbol,
ENUM_ORDER_TYPE type,
double volume,
double price,
string parent_event_id = "") {
VCPEvent event;
event.vcp_version = "1.1";
event.event_id = GenerateEventID();
event.prev_hash = m_last_hash;
event.timestamp = TimeCurrent();
event.timestamp_us = GetMicrosecondCount() % 1000000;
event.event_type = VCP_EVENT_ORD;
event.clock_sync = CLOCK_NTP_SYNCED;
CJAVal payload;
payload["order_ticket"] = (long)ticket;
payload["symbol"] = symbol;
payload["order_type"] = EnumToString(type);
payload["volume"] = volume;
payload["price"] = price;
if(parent_event_id != "")
payload["parent_event_id"] = parent_event_id;
event.payload_json = payload.Serialize();
return SubmitEvent(event);
}
//--- Log Execution
bool LogExecution(ulong deal_ticket,
ulong order_ticket,
string symbol,
ENUM_DEAL_TYPE deal_type,
double volume,
double price,
double commission,
double swap,
double profit) {
VCPEvent event;
event.vcp_version = "1.1";
event.event_id = GenerateEventID();
event.prev_hash = m_last_hash;
event.timestamp = TimeCurrent();
event.timestamp_us = GetMicrosecondCount() % 1000000;
event.event_type = VCP_EVENT_EXE;
event.clock_sync = CLOCK_NTP_SYNCED;
CJAVal payload;
payload["deal_ticket"] = (long)deal_ticket;
payload["order_ticket"] = (long)order_ticket;
payload["symbol"] = symbol;
payload["deal_type"] = EnumToString(deal_type);
payload["volume"] = volume;
payload["price"] = price;
payload["commission"] = commission;
payload["swap"] = swap;
payload["profit"] = profit;
event.payload_json = payload.Serialize();
return SubmitEvent(event);
}
//--- Submit event to sidecar
bool SubmitEvent(VCPEvent &event) {
// Build full event JSON
CJAVal json;
json["vcp_version"] = event.vcp_version;
json["event_id"] = event.event_id;
json["prev_hash"] = event.prev_hash;
json["timestamp"] = TimeToString(event.timestamp, TIME_DATE|TIME_SECONDS);
json["timestamp_us"] = event.timestamp_us;
json["event_type"] = EnumToString(event.event_type);
json["clock_sync_status"] = EnumToString(event.clock_sync);
json["payload"].Deserialize(event.payload_json);
string request_body = json.Serialize();
// Send to sidecar via WebRequest
char post_data[];
char result_data[];
string result_headers;
StringToCharArray(request_body, post_data, 0, WHOLE_ARRAY, CP_UTF8);
ArrayResize(post_data, ArraySize(post_data) - 1); // Remove null terminator
string headers = "Content-Type: application/json\r\n";
if(m_api_key != "")
headers += "Authorization: Bearer " + m_api_key + "\r\n";
int res = WebRequest(
"POST",
m_sidecar_url + "/v1/events",
headers,
m_timeout_ms,
post_data,
result_data,
result_headers
);
if(res == 200 || res == 201) {
// Parse response to get event_hash
string response = CharArrayToString(result_data, 0, WHOLE_ARRAY, CP_UTF8);
CJAVal resp_json;
if(resp_json.Deserialize(response)) {
event.event_hash = resp_json["event_hash"].ToStr();
event.signature = resp_json["signature"].ToStr();
m_last_hash = event.event_hash;
}
return true;
}
Print("VCP submission failed: ", res);
return false;
}
};
//+------------------------------------------------------------------+
//| Example Expert Advisor Integration |
//+------------------------------------------------------------------+
CVCPBridge *g_vcp_bridge = NULL;
int OnInit() {
// Initialize VCP bridge
g_vcp_bridge = new CVCPBridge(
"http://localhost:8080", // Sidecar URL
"your-api-key" // API key
);
return INIT_SUCCEEDED;
}
void OnDeinit(const int reason) {
if(g_vcp_bridge != NULL) {
delete g_vcp_bridge;
g_vcp_bridge = NULL;
}
}
void OnTrade() {
// Log executions when trades occur
static ulong last_deal = 0;
if(HistorySelect(TimeCurrent() - 60, TimeCurrent())) {
int deals = HistoryDealsTotal();
for(int i = deals - 1; i >= 0; i--) {
ulong ticket = HistoryDealGetTicket(i);
if(ticket <= last_deal) break;
// Log this execution
g_vcp_bridge.LogExecution(
ticket,
HistoryDealGetInteger(ticket, DEAL_ORDER),
HistoryDealGetString(ticket, DEAL_SYMBOL),
(ENUM_DEAL_TYPE)HistoryDealGetInteger(ticket, DEAL_TYPE),
HistoryDealGetDouble(ticket, DEAL_VOLUME),
HistoryDealGetDouble(ticket, DEAL_PRICE),
HistoryDealGetDouble(ticket, DEAL_COMMISSION),
HistoryDealGetDouble(ticket, DEAL_SWAP),
HistoryDealGetDouble(ticket, DEAL_PROFIT)
);
last_deal = ticket;
}
}
}
8. Putting It All Together
Here's a complete example combining all layers:
import asyncio
from datetime import datetime
async def main():
# Initialize components
signer = VCPSigner()
merkle_log = VCPMerkleLog(signer)
multi_anchor = MultiAnchorStrategy()
# Simulate trading activity
events = []
# 1. AI generates signal
signal = VCPEvent(
event_type=EventType.SIG,
payload={
'algorithm_id': 'ALGO-ML-MOMENTUM-001',
'model_hash': 'sha256:e3b0c44298fc1c149afbf4c8996fb924...',
'signal': 'BUY',
'symbol': 'EURUSD',
'confidence': 0.87,
'model_version': '2.3.1',
'features': {
'momentum_20': 0.023,
'volatility_10': 0.0045,
'sentiment': 0.62
}
}
)
event, leaf_idx = merkle_log.append_event(signal)
events.append(event)
print(f"Signal logged: {event.event_id}")
# 2. Order generated
order = VCPEvent(
event_type=EventType.ORD,
payload={
'order_id': 'ORD-2025-00001',
'parent_event_id': signal.event_id,
'symbol': 'EURUSD',
'side': 'BUY',
'order_type': 'LIMIT',
'quantity': 100000,
'price': 1.0850,
'time_in_force': 'IOC'
}
)
event, leaf_idx = merkle_log.append_event(order)
events.append(event)
print(f"Order logged: {event.event_id}")
# 3. Execution received
execution = VCPEvent(
event_type=EventType.EXE,
payload={
'execution_id': 'EXE-2025-00001',
'order_id': 'ORD-2025-00001',
'symbol': 'EURUSD',
'side': 'BUY',
'quantity': 100000,
'price': 1.08502,
'venue': 'LMAX',
'liquidity': 'ADDED'
}
)
event, leaf_idx = merkle_log.append_event(execution)
events.append(event)
print(f"Execution logged: {event.event_id}")
# 4. Create STH and anchor
sth = merkle_log.commit()
print(f"\nSigned Tree Head:")
print(f" Tree size: {sth.tree_size}")
print(f" Root: {sth.root_hash[:32]}...")
print(f" Timestamp: {sth.timestamp}")
# 5. External anchoring
anchor = multi_anchor.anchor(sth)
print(f"\nExternal anchors: {len(anchor.anchors)}")
for a in anchor.anchors:
print(f" - {a['type']}: {a.get('status', 'pending')}")
# 6. Generate inclusion proof for the execution
proof = merkle_log.get_inclusion_proof(execution)
print(f"\nInclusion proof for execution:")
print(f" Leaf index: {proof['leaf_index']}")
print(f" Proof length: {len(proof['proof'])} nodes")
# 7. Verify chain integrity
is_valid, invalid_idx = merkle_log.chain.verify_chain()
print(f"\nChain verification: {'PASSED' if is_valid else 'FAILED'}")
# 8. Demonstrate tamper detection
print("\n--- Simulating tampering ---")
original_confidence = events[0].payload['confidence']
events[0].payload['confidence'] = 0.95 # Tamper with signal
tampered = merkle_log.chain.detect_tampering()
print(f"Tampered events detected: {tampered}")
# Restore
events[0].payload['confidence'] = original_confidence
if __name__ == "__main__":
asyncio.run(main())
Output:
Signal logged: 0192a4d3-7e8f-7b2c-9d4e-1f6a3b8c5d2e
Order logged: 0192a4d3-7e90-7b2c-9d4e-2f7b4c9d6e3f
Execution logged: 0192a4d3-7e91-7b2c-9d4e-3f8c5d0e7f4a
Signed Tree Head:
Tree size: 3
Root: a1b2c3d4e5f6789012345678...
Timestamp: 2025-01-19T15:30:45.123456Z
External anchors: 3
- opentimestamps: pending
- rfc3161: verified
- rfc3161: verified
Inclusion proof for execution:
Leaf index: 2
Proof length: 2 nodes
Chain verification: PASSED
--- Simulating tampering ---
Tampered events detected: [0]
9. Compliance Mapping: EU AI Act & MiFID II
EU AI Act Article 12 Compliance
| Requirement | VCP Implementation | Code Reference |
|---|---|---|
| Automatic event recording | VCPHashChain.append() |
Layer 1 |
| Risk identification logs |
VCP-RISK payload fields |
EventType.RSK |
| Post-market monitoring | Merkle inclusion proofs | get_inclusion_proof() |
| Traceability |
parent_event_id linking |
Signal → Order → Execution |
| Integrity | Hash chains + signatures | verify_chain() |
MiFID II RTS 25 Timestamp Compliance
| Tier | Precision | Clock Sync | VCP Setting |
|---|---|---|---|
| Silver | Millisecond | NTP |
MILLISECOND, NTP_SYNCED
|
| Gold | Microsecond | NTP |
MICROSECOND, NTP_SYNCED
|
| Platinum | Nanosecond | PTP |
NANOSECOND, PTP_LOCKED
|
Record Retention
# Configure retention based on regulation
RETENTION_CONFIG = {
'eu_ai_act_minimum': timedelta(days=180), # 6 months
'mifid_ii_standard': timedelta(days=1825), # 5 years
'mifid_ii_extended': timedelta(days=2555), # 7 years
'technical_documentation': timedelta(days=3650) # 10 years
}
10. Next Steps
Get Started
- Clone the reference implementation:
git clone https://github.com/veritaschain/vcp-spec
cd vcp-spec/sdk/python
pip install -e .
- Run the conformance tests:
pytest tests/conformance/ -v
- Try the Explorer API:
curl https://api.veritaschain.org/v1/health
Resources
- Specification: github.com/veritaschain/vcp-spec
- IETF Draft: datatracker.ietf.org/doc/draft-kamimura-scitt-vcp
- Technical Support: technical@veritaschain.org
Contribute
VCP is an open standard developed by a vendor-neutral non-profit. We welcome:
- Implementation feedback
- Conformance test contributions
- Integration examples
- Translation help
Conclusion
The EU AI Act demands tamper-evident logging, but provides no implementation guidance. VCP v1.1 fills that gap with:
- Hash chains for tamper detection
- Merkle trees for efficient verification
- External anchoring for irrefutable timestamps
- Multi-log replication for completeness guarantees
The code in this article is production-ready. The cryptographic primitives are standard (SHA-256, Ed25519, RFC 6962). The architecture is proven (Certificate Transparency has operated at scale since 2013).
What's new is applying these techniques to AI accountability. As AI systems make increasingly consequential decisions, we need audit infrastructure that doesn't depend on trusting the AI operator.
AI needs a flight recorder. Now you know how to build one.
Questions? Comments? Find me on LinkedIn or reach out to technical@veritaschain.org.
If this was useful, consider giving the VCP spec repo a ⭐
Top comments (0)