A deep dive into implementing tamper-evident logging for algorithmic trading systems
When the SEC delayed Rule 13f-2 until 2028, most compliance teams breathed a sigh of relief. But for engineers building trading infrastructure, the delay signals something different: we have time to build this right.
This article walks through implementing a cryptographic audit trail system that satisfies multiple financial regulations—SEC Rule 13f-2, MiFID II RTS 25, and EU AI Act Article 12—through a unified architecture. We'll cover:
- Hash chain fundamentals and why they matter for audit trails
- Merkle tree structures for efficient verification
- Ed25519 signatures for event authentication
- Sidecar integration patterns for FIX and MT4/MT5
- Event schema design for multi-regulation compliance
Let's build something that actually works.
Table of Contents
- The Problem: Why Traditional Logging Fails
- Cryptographic Foundations
- The VCP Event Schema
- Hash Chain Implementation
- Merkle Tree for Batch Verification
- Digital Signatures with Ed25519
- Sidecar Architecture
- FIX Protocol Integration
- MT4/MT5 Bridge Implementation
- Multi-Regulation Mapping
- Putting It All Together
1. The Problem: Why Traditional Logging Fails
Traditional audit logging looks something like this:
# The "trust me bro" approach
from datetime import datetime

def log_trade(trade):
    with open('audit.log', 'a') as f:
        f.write(f"{datetime.now()},{trade.symbol},{trade.qty},{trade.price}\n")
This has several problems for regulatory compliance:
| Issue | Why It Matters |
|---|---|
| No tamper evidence | Anyone with file access can modify historical records |
| No sequencing proof | Can't prove event A happened before event B |
| No authentication | Can't prove who generated the log entry |
| No efficient verification | Must read entire log to verify any single entry |
When regulators ask "prove this trade happened at this time with these parameters," you need more than a CSV file.
2. Cryptographic Foundations
The VeritasChain Protocol (VCP) uses three cryptographic primitives:
Hash Chains (Tamper Evidence)
Each event includes a hash of the previous event. Modifying any historical event breaks the chain:
Event₁ → H(Event₁) → Event₂ → H(Event₂) → Event₃ → ...
             ↓                   ↓
         prev_hash           prev_hash
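A minimal sketch of that linkage, using only the standard library. The `link` and `is_intact` helpers and the dict-shaped records are illustrative assumptions, not part of VCP; the full implementation appears in section 4.

```python
import hashlib
import json

GENESIS = "0" * 64  # all-zero sentinel used as the first record's prev_hash


def link(prev_hash: str, payload: dict) -> dict:
    """Return a record whose hash covers both the payload and prev_hash."""
    body = json.dumps({"payload": payload, "prev_hash": prev_hash},
                      sort_keys=True, separators=(",", ":"))
    return {"payload": payload, "prev_hash": prev_hash,
            "hash": hashlib.sha256(body.encode("utf-8")).hexdigest()}


def is_intact(records: list) -> bool:
    """Recompute every hash and check each prev_hash pointer."""
    prev = GENESIS
    for rec in records:
        if rec["prev_hash"] != prev:
            return False
        if link(rec["prev_hash"], rec["payload"])["hash"] != rec["hash"]:
            return False
        prev = rec["hash"]
    return True


first = link(GENESIS, {"qty": 100})
second = link(first["hash"], {"qty": 50})
records = [first, second]
intact_before = is_intact(records)

first["payload"]["qty"] = 1000  # edit history after the fact
intact_after = is_intact(records)
```

Editing the first payload changes the hash that the second record points to, so the check fails even though the second record itself was never touched.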
Merkle Trees (Efficient Verification)
For batch verification, events are organized into a binary tree where each parent is the hash of its children:
             Root Hash
            /         \
       H(A+B)          H(C+D)
       /    \          /    \
    H(A)   H(B)     H(C)   H(D)
     |      |        |      |
 Event A Event B  Event C Event D
To prove Event B is in the tree, you only need: H(A), H(C+D), and the root. That's O(log n) instead of O(n).
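The four-leaf tree above can be worked through by hand. This is a sketch under simplifying assumptions: the event payloads are placeholder byte strings, and the hashing omits the domain-separation prefixes that the full implementation in section 5 adds.

```python
import hashlib


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


# Leaf hashes for the four events in the diagram (placeholder payloads)
leaf_a, leaf_b, leaf_c, leaf_d = (
    h(e) for e in (b"Event A", b"Event B", b"Event C", b"Event D")
)

# Full tree, as the log operator would build it
h_ab = h(leaf_a + leaf_b)
h_cd = h(leaf_c + leaf_d)
root = h(h_ab + h_cd)

# The auditor holds Event B, the proof [H(A), H(C+D)], and the published root
proof = [(leaf_a, "left"), (h_cd, "right")]


def verify(leaf: bytes, proof: list, expected_root: bytes) -> bool:
    """Fold the sibling hashes into the leaf and compare with the root."""
    current = leaf
    for sibling, side in proof:
        current = h(sibling + current) if side == "left" else h(current + sibling)
    return current == expected_root


b_included = verify(h(b"Event B"), proof, root)
forged_included = verify(h(b"Event X"), proof, root)
```

The auditor never sees Events A, C, or D, only two sibling hashes, yet a forged event cannot reproduce the root.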
Digital Signatures (Authentication)
Ed25519 signatures prove who generated each event without requiring shared secrets:
# Signing
signature = Ed25519.sign(private_key, event_bytes)
# Verification (anyone can do this)
valid = Ed25519.verify(public_key, event_bytes, signature)
3. The VCP Event Schema
VCP defines canonical event types that map to multiple regulations:
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import uuid
from datetime import datetime


class EventType(Enum):
    SIG = "SIG"  # Signal (AI/algorithm output)
    ORD = "ORD"  # Order submission
    ACK = "ACK"  # Order acknowledgment
    EXE = "EXE"  # Execution/fill
    REJ = "REJ"  # Rejection
    CXL = "CXL"  # Cancellation
    MOD = "MOD"  # Modification
    POS = "POS"  # Position snapshot


@dataclass
class VCPEvent:
    # Core fields (required)
    event_id: str          # UUID v7 (time-ordered)
    event_type: EventType
    timestamp: datetime    # Serialized as ISO 8601 with microseconds
    actor_id: str          # Entity generating the event

    # Trading fields (conditional)
    instrument_id: Optional[str] = None  # ISIN, CUSIP, or internal ID
    quantity: Optional[float] = None
    price: Optional[float] = None
    side: Optional[str] = None           # BUY, SELL, SHORT
    order_id: Optional[str] = None

    # AI/Algorithm fields (for EU AI Act)
    model_id: Optional[str] = None
    model_version: Optional[str] = None
    decision_factors: Optional[dict] = None
    confidence_score: Optional[float] = None

    # Chain fields (computed)
    prev_hash: Optional[str] = None
    event_hash: Optional[str] = None
    signature: Optional[str] = None
UUID v7: Time-Ordered Identifiers
VCP uses UUID v7 (RFC 9562) for event IDs because they're:
- Globally unique
- Time-ordered (first 48 bits are Unix timestamp in milliseconds)
- K-sortable (lexicographic order = chronological order)
import uuid
import time


def generate_uuid7() -> str:
    """Generate a UUID v7 with millisecond precision."""
    timestamp_ms = int(time.time() * 1000)
    # First 48 bits: timestamp
    # Next 4 bits: version (7)
    # Next 12 bits: random
    # Next 2 bits: variant
    # Last 62 bits: random
    uuid_int = (timestamp_ms & 0xFFFFFFFFFFFF) << 80
    uuid_int |= 0x7000 << 64                        # Version 7
    uuid_int |= (uuid.uuid4().int & 0x0FFF) << 64   # 12 random bits
    uuid_int |= 0x8000000000000000                  # Variant
    uuid_int |= uuid.uuid4().int & 0x3FFFFFFFFFFFFFFF  # 62 random bits
    return str(uuid.UUID(int=uuid_int))

# Example output: "018d5f3c-7a2b-7def-8123-456789abcdef"
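Because the timestamp sits in the top 48 bits, it can be recovered from the ID alone, which is handy when cross-checking an event's `event_id` against its `timestamp` field. A small sketch using the example ID above; `uuid7_timestamp` is a hypothetical helper name.

```python
import uuid
from datetime import datetime, timezone


def uuid7_timestamp(value: str) -> datetime:
    """Recover the embedded Unix-millisecond timestamp from a UUID v7 string."""
    u = uuid.UUID(value)
    if u.version != 7:
        raise ValueError("not a version 7 UUID")
    ms = u.int >> 80  # top 48 bits hold milliseconds since the Unix epoch
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


example_id = "018d5f3c-7a2b-7def-8123-456789abcdef"
embedded = uuid7_timestamp(example_id)

# An ID minted one millisecond later sorts after the first one as a plain string
later_id = "018d5f3c-7a2c-7000-8000-000000000000"
ordered = sorted([later_id, example_id])
```

The string sort only matches chronological order when every ID uses the same letter case, so it is worth normalizing IDs to lowercase on ingestion.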
4. Hash Chain Implementation
The hash chain links events cryptographically:
import hashlib
import json
from typing import List, Optional


class HashChain:
    def __init__(self):
        self.events: List[VCPEvent] = []
        # All-zero sentinel, same width as a SHA-256 hex digest
        self.genesis_hash = "0" * 64

    def _canonicalize(self, event: VCPEvent) -> bytes:
        """
        Deterministic serialization for hashing, in the spirit of
        RFC 8785 JSON Canonicalization Scheme (JCS).

        Note: json.dumps with sorted keys only approximates JCS. Full
        compliance also constrains number formatting and string escaping,
        so use a dedicated JCS library if other implementations must
        reproduce these hashes.
        """
        event_dict = {
            "event_id": event.event_id,
            "event_type": event.event_type.value,
            "timestamp": event.timestamp.isoformat(),
            "actor_id": event.actor_id,
        }
        # Add optional fields only if present. Every content field must be
        # listed here: a field left out of the hash can be altered undetected.
        optional_fields = (
            "instrument_id", "quantity", "price", "side", "order_id",
            "model_id", "model_version", "decision_factors", "confidence_score",
        )
        for name in optional_fields:
            value = getattr(event, name)
            if value is not None:
                event_dict[name] = value
        # Include prev_hash in the hash calculation
        event_dict["prev_hash"] = event.prev_hash
        # Sort keys for deterministic output
        return json.dumps(event_dict, sort_keys=True,
                          separators=(',', ':')).encode('utf-8')

    def _compute_hash(self, event: VCPEvent) -> str:
        """Compute SHA-256 hash of canonical event representation."""
        canonical = self._canonicalize(event)
        return hashlib.sha256(canonical).hexdigest()

    def append(self, event: VCPEvent) -> VCPEvent:
        """Add event to chain, computing prev_hash and event_hash."""
        # Set prev_hash from last event (or genesis)
        if self.events:
            event.prev_hash = self.events[-1].event_hash
        else:
            event.prev_hash = self.genesis_hash
        # Compute this event's hash
        event.event_hash = self._compute_hash(event)
        self.events.append(event)
        return event

    def verify_chain(self) -> bool:
        """Verify entire chain integrity."""
        return self.detect_tampering() is None

    def detect_tampering(self) -> Optional[int]:
        """Return index of first tampered event, or None if chain is valid."""
        for i, event in enumerate(self.events):
            # Check prev_hash linkage
            expected_prev = self.genesis_hash if i == 0 else self.events[i-1].event_hash
            if event.prev_hash != expected_prev:
                return i
            # Recompute and verify event_hash
            if event.event_hash != self._compute_hash(event):
                return i
        return None
Tamper Detection in Action
# Create chain and add events
chain = HashChain()
event1 = VCPEvent(
event_id=generate_uuid7(),
event_type=EventType.ORD,
timestamp=datetime.utcnow(),
actor_id="algo-001",
instrument_id="US0378331005", # AAPL ISIN
quantity=100,
price=150.25,
side="BUY"
)
chain.append(event1)
event2 = VCPEvent(
event_id=generate_uuid7(),
event_type=EventType.EXE,
timestamp=datetime.utcnow(),
actor_id="exchange-nyse",
instrument_id="US0378331005",
quantity=100,
price=150.25,
side="BUY",
order_id=event1.event_id
)
chain.append(event2)
# Verify chain
print(chain.verify_chain()) # True
# Simulate tampering
chain.events[0].quantity = 1000 # Someone changed the quantity!
# Detect tampering
tampered_idx = chain.detect_tampering()
print(f"Tampering detected at index: {tampered_idx}") # 0
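The chain is only as deterministic as its serialization, and numeric fields are the usual weak point. The demo above passes `quantity=100` as an `int`; if that event is later reloaded from storage as `100.0`, the JSON text changes and so does the hash, even though nobody tampered with anything. One way to sidestep this, sketched here as an assumption rather than as part of VCP, is to carry prices and quantities as `Decimal` and serialize them as strings. The `canonical_bytes` helper is hypothetical.

```python
import hashlib
import json
from decimal import Decimal


def canonical_bytes(fields: dict) -> bytes:
    """Serialize with sorted keys, rendering Decimal values as plain strings."""
    def encode(value):
        if isinstance(value, Decimal):
            return format(value, "f")  # fixed-point text, never exponent form
        return value
    normalized = {k: encode(v) for k, v in fields.items()}
    return json.dumps(normalized, sort_keys=True,
                      separators=(",", ":")).encode("utf-8")


# The same quantity serializes differently as int and as float
int_json = json.dumps({"quantity": 100})
float_json = json.dumps({"quantity": 100.0})
hashes_differ = (hashlib.sha256(int_json.encode()).hexdigest()
                 != hashlib.sha256(float_json.encode()).hexdigest())

# Binary floats also leak rounding error into the serialized text
float_sum_json = json.dumps(0.1 + 0.2)
decimal_sum_json = canonical_bytes(
    {"price": Decimal("0.1") + Decimal("0.2")}
).decode("utf-8")
```

Whatever encoding is chosen, it has to be applied identically by every component that computes or verifies a hash.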
5. Merkle Tree for Batch Verification
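For efficient auditing, we organize events into Merkle trees that borrow the internal-node prefix from RFC 6962. The tree below is not strictly RFC 6962-compliant: it uses event hashes directly as leaves and pads the leaf list to a power of two.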
from typing import List, Tuple


class MerkleTree:
    """
    Binary Merkle tree over event hashes.

    Internal nodes use RFC 6962's 0x01 prefix. Unlike RFC 6962, leaves are
    used as-is (no 0x00-prefixed leaf hash) and the leaf list is padded to
    a power of two by duplicating the last leaf, so roots are not
    interoperable with RFC 6962 logs.
    """

    def __init__(self, events: List[VCPEvent]):
        self.leaves = [e.event_hash for e in events]
        self.tree = self._build_tree()

    def _hash_pair(self, left: str, right: str) -> str:
        """Hash two nodes together (RFC 6962 style)."""
        # Prefix with 0x01 for internal nodes
        combined = bytes.fromhex("01") + bytes.fromhex(left) + bytes.fromhex(right)
        return hashlib.sha256(combined).hexdigest()

    def _build_tree(self) -> List[List[str]]:
        """Build tree bottom-up."""
        if not self.leaves:
            return [[]]
        # Pad to power of 2
        leaves = self.leaves.copy()
        while len(leaves) & (len(leaves) - 1):
            leaves.append(leaves[-1])  # Duplicate last leaf
        tree = [leaves]
        current_level = leaves
        while len(current_level) > 1:
            next_level = []
            for i in range(0, len(current_level), 2):
                parent = self._hash_pair(current_level[i], current_level[i+1])
                next_level.append(parent)
            tree.append(next_level)
            current_level = next_level
        return tree

    @property
    def root(self) -> str:
        """Return Merkle root hash."""
        return self.tree[-1][0] if self.tree[-1] else ""

    def get_proof(self, index: int) -> List[Tuple[str, str]]:
        """
        Generate inclusion proof for leaf at index.
        Returns list of (hash, direction) tuples.
        """
        if index >= len(self.leaves):
            raise IndexError("Leaf index out of range")
        proof = []
        idx = index
        for level in self.tree[:-1]:
            if idx % 2 == 0:
                sibling_idx = idx + 1
                direction = "right"
            else:
                sibling_idx = idx - 1
                direction = "left"
            if sibling_idx < len(level):
                proof.append((level[sibling_idx], direction))
            idx //= 2
        return proof

    @staticmethod
    def verify_proof(leaf_hash: str, proof: List[Tuple[str, str]],
                     root: str) -> bool:
        """Verify an inclusion proof."""
        current = leaf_hash
        for sibling_hash, direction in proof:
            if direction == "left":
                combined = bytes.fromhex("01") + bytes.fromhex(sibling_hash) + bytes.fromhex(current)
            else:
                combined = bytes.fromhex("01") + bytes.fromhex(current) + bytes.fromhex(sibling_hash)
            current = hashlib.sha256(combined).hexdigest()
        return current == root
# Usage example
events = [chain.events[0], chain.events[1]] # From previous example
tree = MerkleTree(events)
print(f"Merkle root: {tree.root}")
# Generate proof for first event
proof = tree.get_proof(0)
print(f"Proof for event 0: {proof}")
# Verify proof
is_valid = MerkleTree.verify_proof(
events[0].event_hash,
proof,
tree.root
)
print(f"Proof valid: {is_valid}") # True
Why Merkle Trees Matter
For regulatory examination:
- Without Merkle tree: Auditor must download and verify entire dataset
- With Merkle tree: Auditor verifies specific events with O(log n) proof
For a dataset of 1 million events:
- Full verification: 1,000,000 hash operations
- Merkle proof: ~20 hash operations
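The "~20" figure is the tree height for a leaf list padded to a power of two, as in the `MerkleTree` class above. A short sketch of the arithmetic; `proof_length` is a hypothetical helper.

```python
def proof_length(n_leaves: int) -> int:
    """Sibling hashes needed for one inclusion proof in a padded binary tree."""
    if n_leaves < 1:
        raise ValueError("tree must have at least one leaf")
    # ceil(log2(n)) computed with integers to avoid floating-point edge cases
    return (n_leaves - 1).bit_length()


per_million = proof_length(1_000_000)  # dataset size used in the text
per_batch = proof_length(1_000)        # a batch of 1,000 events
single_leaf = proof_length(1)          # the root is the leaf itself
```

Smaller batches mean shorter proofs but more roots to anchor, so the batch size is a trade-off between proof size and anchoring cost.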
6. Digital Signatures with Ed25519
VCP uses Ed25519 (RFC 8032) for event authentication:
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.hazmat.primitives import serialization
from typing import Optional
import base64


class VCPSigner:
    def __init__(self, private_key: Optional[Ed25519PrivateKey] = None):
        self.private_key = private_key or Ed25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()

    def get_public_key_bytes(self) -> bytes:
        """Export public key in raw format."""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def get_public_key_b64(self) -> str:
        """Export public key as base64."""
        return base64.b64encode(self.get_public_key_bytes()).decode('ascii')

    def sign_event(self, event: VCPEvent) -> str:
        """Sign event and return base64 signature."""
        # Sign the event_hash (which includes all content + prev_hash)
        message = bytes.fromhex(event.event_hash)
        signature = self.private_key.sign(message)
        return base64.b64encode(signature).decode('ascii')

    @staticmethod
    def verify_signature(public_key_b64: str, event_hash: str,
                         signature_b64: str) -> bool:
        """Verify event signature."""
        try:
            public_key_bytes = base64.b64decode(public_key_b64)
            public_key = Ed25519PublicKey.from_public_bytes(public_key_bytes)
            message = bytes.fromhex(event_hash)
            signature = base64.b64decode(signature_b64)
            public_key.verify(signature, message)
            return True
        except Exception:
            return False
# Usage
signer = VCPSigner()
# Sign an event
event = chain.events[0]
event.signature = signer.sign_event(event)
print(f"Public key: {signer.get_public_key_b64()}")
print(f"Signature: {event.signature}")
# Verify
is_valid = VCPSigner.verify_signature(
signer.get_public_key_b64(),
event.event_hash,
event.signature
)
print(f"Signature valid: {is_valid}") # True
Key Management Considerations
For production systems, keep the private key out of source control and out of logs. The example below loads it from an environment variable, which is the bare minimum; a hardware security module (HSM) or a managed key service is a stronger choice:
# Store keys securely (example using environment variables)
import os


def load_or_create_signer() -> VCPSigner:
    key_b64 = os.environ.get('VCP_PRIVATE_KEY')
    if key_b64:
        # Load existing key
        key_bytes = base64.b64decode(key_b64)
        private_key = Ed25519PrivateKey.from_private_bytes(key_bytes)
        return VCPSigner(private_key)
    else:
        # Generate new key (first run only!)
        signer = VCPSigner()
        print("WARNING: New key generated. Export and store securely:")
        private_bytes = signer.private_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption()
        )
        # Printing a private key is acceptable only for local bootstrapping;
        # never let this output reach shared logs
        print(f"VCP_PRIVATE_KEY={base64.b64encode(private_bytes).decode()}")
        return signer
7. Sidecar Architecture
The key to non-invasive integration is the sidecar pattern: VCP runs alongside your trading infrastructure without modifying it.
┌─────────────────────────────────────────────────────────────────┐
│ Trading Infrastructure │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ MT4/MT5 │ │ FIX │ │ OMS │ │ EMS │ │
│ │ Terminal │ │ Engine │ │ │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ │ Message │ Message │ Event │ │
│ │ Stream │ Stream │ Stream │ │
│ └───────────────┼───────────────┼───────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ VCP Sidecar Adapter │ │
│ │ ┌───────────────────────────┐ │ │
│ │ │ Protocol Normalizers │ │ │
│ │ │ - FIX → VCP Event │ │ │
│ │ │ - MT4 → VCP Event │ │ │
│ │ │ - REST → VCP Event │ │ │
│ │ └───────────────────────────┘ │ │
│ └──────────────┬──────────────────┘ │
│ │ │
└─────────────────────────────┼────────────────────────────────────┘
│
▼
┌──────────────────┐
│ VCP Core │
│ ┌────────────┐ │
│ │ Hash Chain │ │
│ ├────────────┤ │
│ │ Merkle │ │
│ │ Tree │ │
│ ├────────────┤ │
│ │ Signer │ │
│ └────────────┘ │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Form SHO │ │ SLATE │ │ RTS 25 │
│ XML │ │ JSON │ │ Report │
└──────────┘ └──────────┘ └──────────┘
The Sidecar Adapter
from abc import ABC, abstractmethod
from typing import AsyncIterator, List
import asyncio


class ProtocolAdapter(ABC):
    """Base class for protocol-specific adapters."""

    @abstractmethod
    async def connect(self) -> None:
        """Establish connection to source system."""
        pass

    @abstractmethod
    async def stream_events(self) -> AsyncIterator[VCPEvent]:
        """Yield VCP events from source protocol."""
        pass


class VCPSidecar:
    """Main sidecar process coordinating adapters and chain."""

    def __init__(self, adapters: List[ProtocolAdapter],
                 signer: VCPSigner):
        self.adapters = adapters
        self.chain = HashChain()
        self.signer = signer
        self.merkle_batch: List[VCPEvent] = []
        self.merkle_interval = 1000  # Events per Merkle tree

    async def run(self):
        """Main event loop."""
        tasks = [self._process_adapter(adapter) for adapter in self.adapters]
        await asyncio.gather(*tasks)

    async def _process_adapter(self, adapter: ProtocolAdapter):
        """Process events from a single adapter."""
        await adapter.connect()
        async for event in adapter.stream_events():
            # Add to hash chain
            self.chain.append(event)
            # Sign the event
            event.signature = self.signer.sign_event(event)
            # Add to Merkle batch
            self.merkle_batch.append(event)
            # Build Merkle tree periodically
            if len(self.merkle_batch) >= self.merkle_interval:
                tree = MerkleTree(self.merkle_batch)
                await self._anchor_merkle_root(tree.root)
                self.merkle_batch = []
            # Persist event
            await self._persist_event(event)

    async def _anchor_merkle_root(self, root: str):
        """Anchor Merkle root to external system (blockchain, etc.)."""
        # Implementation depends on anchoring strategy
        print(f"Anchoring Merkle root: {root[:16]}...")

    async def _persist_event(self, event: VCPEvent):
        """Persist event to storage."""
        # Implementation depends on storage backend
        pass
8. FIX Protocol Integration
For institutional trading, the FIX Protocol is ubiquitous. Here's how to tap into a FIX engine:
import quickfix as fix
from datetime import datetime
from typing import AsyncIterator, Optional
import asyncio


class FIXAdapter(ProtocolAdapter):
    """Adapter for FIX Protocol message streams."""

    # FIX tag mappings
    TAG_MAPPINGS = {
        fix.MsgType: 'msg_type',
        fix.ClOrdID: 'order_id',
        fix.Symbol: 'symbol',
        fix.Side: 'side',
        fix.OrderQty: 'quantity',
        fix.Price: 'price',
        fix.OrdStatus: 'status',
        fix.ExecType: 'exec_type',
        fix.ExecID: 'exec_id',
        fix.TransactTime: 'transact_time',
    }

    SIDE_MAP = {'1': 'BUY', '2': 'SELL', '5': 'SHORT'}

    def __init__(self, config_path: str):
        self.config_path = config_path
        self.event_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self) -> None:
        """Initialize FIX session."""
        settings = fix.SessionSettings(self.config_path)
        store_factory = fix.FileStoreFactory(settings)
        log_factory = fix.FileLogFactory(settings)
        self.application = FIXApplication(self.event_queue)
        self.initiator = fix.SocketInitiator(
            self.application, store_factory, settings, log_factory
        )
        self.initiator.start()

    async def stream_events(self) -> AsyncIterator[VCPEvent]:
        """Yield VCP events from FIX messages."""
        while True:
            fix_msg = await self.event_queue.get()
            vcp_event = self._convert_to_vcp(fix_msg)
            if vcp_event:
                yield vcp_event

    @staticmethod
    def _get_field(msg: fix.Message, field):
        """Return the field's value if it is set on the message, else None."""
        if msg.isSetField(field):
            msg.getField(field)
            return field.getValue()
        return None

    def _convert_to_vcp(self, msg: fix.Message) -> Optional[VCPEvent]:
        """Convert FIX message to VCP event."""
        msg_type = fix.MsgType()
        msg.getHeader().getField(msg_type)
        msg_type_value = msg_type.getValue()

        # Map FIX message type to VCP event type. ExecType is read only for
        # ExecutionReports, because other message types do not carry it.
        if msg_type_value == fix.MsgType_ExecutionReport:
            event_type = self._get_exec_event_type(msg)
        else:
            event_type = {
                fix.MsgType_NewOrderSingle: EventType.ORD,
                fix.MsgType_OrderCancelRequest: EventType.CXL,
                fix.MsgType_OrderCancelReplaceRequest: EventType.MOD,
            }.get(msg_type_value)
        if event_type is None:
            return None

        # Extract common fields; each is None when absent from the message
        symbol = self._get_field(msg, fix.Symbol())
        side = self._get_field(msg, fix.Side())
        qty = self._get_field(msg, fix.OrderQty())
        price = self._get_field(msg, fix.Price())
        order_id = self._get_field(msg, fix.ClOrdID())

        return VCPEvent(
            event_id=generate_uuid7(),
            event_type=event_type,
            timestamp=datetime.utcnow(),
            actor_id=self._get_sender_comp_id(msg),
            instrument_id=symbol,
            quantity=float(qty) if qty is not None else None,
            price=float(price) if price is not None else None,
            side=self.SIDE_MAP.get(side) if side is not None else None,
            order_id=order_id,
        )

    def _get_exec_event_type(self, msg: fix.Message) -> EventType:
        """Determine VCP event type from ExecutionReport."""
        exec_type = fix.ExecType()
        msg.getField(exec_type)
        exec_map = {
            fix.ExecType_NEW: EventType.ACK,
            fix.ExecType_FILL: EventType.EXE,
            fix.ExecType_PARTIAL_FILL: EventType.EXE,
            fix.ExecType_REJECTED: EventType.REJ,
            fix.ExecType_CANCELED: EventType.CXL,
        }
        return exec_map.get(exec_type.getValue(), EventType.ACK)

    def _get_sender_comp_id(self, msg: fix.Message) -> str:
        """Extract SenderCompID from message header."""
        sender = fix.SenderCompID()
        msg.getHeader().getField(sender)
        return sender.getValue()


class FIXApplication(fix.Application):
    """QuickFIX application callback handler."""

    def __init__(self, event_queue: asyncio.Queue):
        super().__init__()
        self.event_queue = event_queue
        # Capture the loop here: this constructor runs inside the asyncio
        # loop, while fromApp is called from a QuickFIX thread that has none
        self.loop = asyncio.get_running_loop()

    def fromApp(self, message: fix.Message, session_id: fix.SessionID):
        """Handle incoming application messages."""
        # Put message in queue for async processing
        self.loop.call_soon_threadsafe(
            self.event_queue.put_nowait, message
        )

    # Other required callbacks (onCreate, onLogon, etc.)
    def onCreate(self, session_id): pass
    def onLogon(self, session_id): pass
    def onLogout(self, session_id): pass
    def toAdmin(self, message, session_id): pass
    def fromAdmin(self, message, session_id): pass
    def toApp(self, message, session_id): pass
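For retail trading platforms, here's an MQL5 bridge for MT5. MT4 uses MQL4, which has no `OnTradeTransaction` handler or built-in socket functions, so it needs a separate implementation: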
//+------------------------------------------------------------------+
//| VCPBridge.mq5 |
//| VeritasChain Standards Organization |
//+------------------------------------------------------------------+
#property copyright "VSO"
#property version "1.00"
#include <Trade\Trade.mqh>
#include <Files\File.mqh>
// VCP Event structure matching Python schema
struct VCPEvent {
string event_id;
string event_type;
datetime timestamp;
string actor_id;
string instrument_id;
double quantity;
double price;
string side;
string order_id;
string prev_hash;
string event_hash;
};
// Global state
string g_prev_hash = "0000000000000000000000000000000000000000000000000000000000000000";
int g_socket_handle = INVALID_HANDLE;
string g_actor_id;
//+------------------------------------------------------------------+
//| Expert initialization function |
//+------------------------------------------------------------------+
int OnInit() {
// Generate actor ID from account info
g_actor_id = "MT5-" + IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN));
// Connect to VCP sidecar (local socket)
g_socket_handle = SocketCreate();
if(g_socket_handle == INVALID_HANDLE) {
Print("Failed to create socket");
return INIT_FAILED;
}
if(!SocketConnect(g_socket_handle, "127.0.0.1", 9999, 5000)) {
Print("Failed to connect to VCP sidecar");
return INIT_FAILED;
}
Print("VCP Bridge initialized for account: ", g_actor_id);
return INIT_SUCCEEDED;
}
//+------------------------------------------------------------------+
//| Generate UUID v7 (simplified for MQL5) |
//+------------------------------------------------------------------+
string GenerateUUID7() {
    // Approximate millisecond timestamp: TimeCurrent() has one-second
    // resolution, so the sub-second part from GetTickCount() is not wall-clock time
    datetime now = TimeCurrent();
    long ms = (long)now * 1000 + GetTickCount() % 1000;
    // Layout 8-4-4-4-12: tttttttt-tttt-7rrr-vrrr-rrrrrrrrrrrr
    string ts = StringFormat("%012llX", ms);
    string uuid = StringSubstr(ts, 0, 8);
    uuid += "-";
    uuid += StringSubstr(ts, 8, 4);
    uuid += "-7";
    uuid += StringFormat("%03X", MathRand() % 0x1000);
    uuid += "-";
    uuid += StringFormat("%04X", 0x8000 + MathRand() % 0x4000);
    uuid += "-";
    uuid += StringFormat("%012llX", (long)MathRand() << 32 | MathRand());
    return uuid;
}
//+------------------------------------------------------------------+
//| Compute SHA-256 hash (via sidecar) |
//+------------------------------------------------------------------+
string ComputeHash(string data) {
    // Send hash request to sidecar
    string request = "HASH:" + data;
    char req_bytes[];
    // StringToCharArray also copies the terminating null; exclude it from the payload
    int req_len = StringToCharArray(request, req_bytes, 0, WHOLE_ARRAY, CP_UTF8) - 1;
    if(req_len < 0 || SocketSend(g_socket_handle, req_bytes, req_len) < 0) {
        Print("Failed to send hash request");
        return "";
    }
    // Receive hash response (64 hex characters for SHA-256)
    char response[];
    int received = SocketRead(g_socket_handle, response, 64, 1000);
    if(received <= 0) {
        Print("Failed to receive hash");
        return "";
    }
    return CharArrayToString(response, 0, received, CP_UTF8);
}
//+------------------------------------------------------------------+
//| Create and send VCP event |
//+------------------------------------------------------------------+
void SendVCPEvent(string event_type, string symbol, double qty,
                  double price, string side, ulong ticket) {
    VCPEvent event;
    event.event_id = GenerateUUID7();
    event.event_type = event_type;
    event.timestamp = TimeCurrent();
    event.actor_id = g_actor_id;
    event.instrument_id = symbol;
    event.quantity = qty;
    event.price = price;
    event.side = side;
    event.order_id = IntegerToString(ticket);
    event.prev_hash = g_prev_hash;
    // Format the timestamp as ISO 8601 so Python's datetime.fromisoformat() can parse it
    MqlDateTime dt;
    TimeToStruct(event.timestamp, dt);
    string iso_ts = StringFormat("%04d-%02d-%02dT%02d:%02d:%02d",
                                 dt.year, dt.mon, dt.day, dt.hour, dt.min, dt.sec);
    // Create canonical JSON for hashing
    string json = StringFormat(
        "{\"actor_id\":\"%s\",\"event_id\":\"%s\",\"event_type\":\"%s\","
        "\"instrument_id\":\"%s\",\"order_id\":\"%s\",\"prev_hash\":\"%s\","
        "\"price\":%.5f,\"quantity\":%.2f,\"side\":\"%s\",\"timestamp\":\"%s\"}",
        event.actor_id, event.event_id, event.event_type,
        event.instrument_id, event.order_id, event.prev_hash,
        event.price, event.quantity, event.side,
        iso_ts
    );
    // Compute hash
    event.event_hash = ComputeHash(json);
    // Send to sidecar: drop the closing brace, append event_hash, then close the object
    string message = "EVENT:" + StringSubstr(json, 0, StringLen(json) - 1)
                     + ",\"event_hash\":\"" + event.event_hash + "\"}";
    char msg_bytes[];
    // Exclude the terminating null that StringToCharArray copies
    int msg_len = StringToCharArray(message, msg_bytes, 0, WHOLE_ARRAY, CP_UTF8) - 1;
    SocketSend(g_socket_handle, msg_bytes, msg_len);
    // Update chain state
    g_prev_hash = event.event_hash;
    Print("VCP Event sent: ", event.event_type, " ", symbol, " ", qty, " @ ", price);
}
//+------------------------------------------------------------------+
//| Trade transaction handler |
//+------------------------------------------------------------------+
void OnTradeTransaction(const MqlTradeTransaction& trans,
                        const MqlTradeRequest& request,
                        const MqlTradeResult& result) {
    // Map MT5 transaction types to VCP events
    switch(trans.type) {
        case TRADE_TRANSACTION_ORDER_ADD:
            SendVCPEvent("ORD", trans.symbol, trans.volume, trans.price,
                         IsBuyOrder(trans.order_type) ? "BUY" : "SELL",
                         trans.order);
            break;
        case TRADE_TRANSACTION_DEAL_ADD:
            SendVCPEvent("EXE", trans.symbol, trans.volume, trans.price,
                         trans.deal_type == DEAL_TYPE_BUY ? "BUY" : "SELL",
                         trans.deal);
            break;
        case TRADE_TRANSACTION_ORDER_DELETE:
            SendVCPEvent("CXL", trans.symbol, trans.volume, trans.price,
                         "", trans.order);
            break;
    }
}

// Pending buy orders (limit, stop, stop-limit) must not be reported as SELL
bool IsBuyOrder(ENUM_ORDER_TYPE order_type) {
    return order_type == ORDER_TYPE_BUY
        || order_type == ORDER_TYPE_BUY_LIMIT
        || order_type == ORDER_TYPE_BUY_STOP
        || order_type == ORDER_TYPE_BUY_STOP_LIMIT;
}
//+------------------------------------------------------------------+
//| Expert deinitialization function |
//+------------------------------------------------------------------+
void OnDeinit(const int reason) {
if(g_socket_handle != INVALID_HANDLE) {
SocketClose(g_socket_handle);
}
Print("VCP Bridge disconnected");
}
Python Sidecar for MT4/MT5
import asyncio
import hashlib
import json
from datetime import datetime


class MT5SidecarServer:
    """Socket server receiving events from MT5 bridge."""

    def __init__(self, host: str = '127.0.0.1', port: int = 9999):
        self.host = host
        self.port = port
        self.chain = HashChain()
        self.signer = VCPSigner()

    async def start(self):
        server = await asyncio.start_server(
            self.handle_client, self.host, self.port
        )
        print(f"MT5 Sidecar listening on {self.host}:{self.port}")
        async with server:
            await server.serve_forever()

    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        print("MT5 client connected")
        while True:
            try:
                # Assumes each read returns exactly one complete message;
                # TCP does not guarantee this, so production code needs framing
                data = await reader.read(4096)
                if not data:
                    break
                # Tolerate a trailing null terminator from the MQL5 side
                message = data.decode('utf-8').rstrip('\x00')
                if message.startswith('HASH:'):
                    # Hash request
                    content = message[5:]
                    hash_result = hashlib.sha256(content.encode()).hexdigest()
                    writer.write(hash_result.encode())
                    await writer.drain()
                elif message.startswith('EVENT:'):
                    # Event submission
                    event_json = message[6:]
                    await self.process_event(event_json)
            except Exception as e:
                print(f"Error: {e}")
                break
        writer.close()
        await writer.wait_closed()

    async def process_event(self, event_json: str):
        """Process incoming VCP event from MT5."""
        try:
            data = json.loads(event_json)
            event = VCPEvent(
                event_id=data['event_id'],
                event_type=EventType(data['event_type']),
                timestamp=datetime.fromisoformat(data['timestamp']),
                actor_id=data['actor_id'],
                instrument_id=data.get('instrument_id'),
                quantity=data.get('quantity'),
                price=data.get('price'),
                side=data.get('side'),
                order_id=data.get('order_id'),
                prev_hash=data.get('prev_hash'),
                event_hash=data.get('event_hash'),
            )
            # Verify hash chain consistency
            if self.chain.events:
                expected_prev = self.chain.events[-1].event_hash
                if event.prev_hash != expected_prev:
                    print("WARNING: Chain inconsistency detected!")
            # Add to our chain as received (the hash was computed over the
            # bridge's JSON, so it is not recomputed here) and sign
            self.chain.events.append(event)
            event.signature = self.signer.sign_event(event)
            print(f"Processed: {event.event_type.value} {event.instrument_id}")
        except Exception as e:
            print(f"Failed to process event: {e}")


# Run the sidecar
if __name__ == "__main__":
    sidecar = MT5SidecarServer()
    asyncio.run(sidecar.start())
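The server above treats every `read()` as one message, but TCP is a byte stream: two messages can arrive in one read, or one message can be split across two. A common remedy is delimiter-based framing. The sketch below assumes the bridge appends a newline to every message, which the MQL5 code above does not do; `split_frames` is a hypothetical helper.

```python
from typing import List, Tuple


def split_frames(buffer: bytes) -> Tuple[List[str], bytes]:
    """Split a receive buffer into complete newline-terminated messages.

    Returns the decoded complete messages and the unconsumed remainder,
    which the caller prepends to the next chunk read from the socket.
    """
    *complete, remainder = buffer.split(b"\n")
    return [m.decode("utf-8") for m in complete if m], remainder


# First read: two full messages plus the start of a third
chunk1 = b'HASH:{"a":1}\nEVENT:{"a":1}\nEVE'
messages1, pending = split_frames(chunk1)

# Second read completes the third message
chunk2 = b'NT:{"b":2}\n'
messages2, pending2 = split_frames(pending + chunk2)
```

With asyncio streams, `StreamReader.readline()` gives the same effect without manual buffering, as long as the sender terminates each message with a newline.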
10. Multi-Regulation Mapping
The real power of VCP is satisfying multiple regulations from a single event stream:
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import xml.etree.ElementTree as ET
class RegulatoryExporter(ABC):
"""Base class for regulation-specific exporters."""
@abstractmethod
def export(self, events: List[VCPEvent]) -> Any:
pass
class FormSHOExporter(RegulatoryExporter):
"""Export events to SEC Form SHO XML format."""
def export(self, events: List[VCPEvent]) -> str:
"""Generate Form SHO XML for short position reporting."""
# Filter to position events and calculate aggregates
positions = self._aggregate_positions(events)
root = ET.Element("formSHO")
root.set("xmlns", "http://www.sec.gov/edgar/formSHO")
header = ET.SubElement(root, "headerData")
ET.SubElement(header, "submissionType").text = "SHO"
ET.SubElement(header, "filerInfo") # Populated from config
info_table = ET.SubElement(root, "informationTable")
for isin, position_data in positions.items():
if position_data['gross_short'] >= 10_000_000 or \
position_data['pct_outstanding'] >= 0.025:
entry = ET.SubElement(info_table, "infoTableEntry")
ET.SubElement(entry, "issuerName").text = position_data['name']
ET.SubElement(entry, "issuerCusip").text = position_data['cusip']
ET.SubElement(entry, "grossShortPosition").text = \
str(int(position_data['gross_short']))
ET.SubElement(entry, "dollarValue").text = \
str(int(position_data['dollar_value']))
return ET.tostring(root, encoding='unicode', method='xml')
def _aggregate_positions(self, events: List[VCPEvent]) -> Dict:
"""Aggregate events into position snapshots."""
positions = {}
for event in events:
if event.event_type == EventType.POS and event.side == "SHORT":
isin = event.instrument_id
if isin not in positions:
positions[isin] = {
'gross_short': 0,
'dollar_value': 0,
'name': '',
'cusip': '',
'pct_outstanding': 0,
}
positions[isin]['gross_short'] = event.quantity
positions[isin]['dollar_value'] = event.quantity * (event.price or 0)
return positions
class MiFIDRTS25Exporter(RegulatoryExporter):
"""Export events to MiFID II RTS 25 format."""
def export(self, events: List[VCPEvent]) -> List[Dict]:
"""Generate RTS 25 execution records."""
records = []
for event in events:
if event.event_type == EventType.EXE:
record = {
"tradingVenueTransactionId": event.event_id,
"executingFirmId": event.actor_id,
"instrumentId": event.instrument_id,
"price": event.price,
"quantity": event.quantity,
"tradeDatetime": event.timestamp.isoformat() + "Z",
"buySellIndicator": "BUYI" if event.side == "BUY" else "SELL",
# Clock sync status from VCP metadata
"clockSyncStatus": "SYNC", # Or "NOSYNC" if degraded
}
records.append(record)
return records
class EUAIActExporter(RegulatoryExporter):
    """Export events to EU AI Act Article 12 format."""

    def export(self, events: List[VCPEvent]) -> List[Dict]:
        """Generate AI Act logging records for high-risk AI systems."""
        records = []
        for event in events:
            if event.event_type == EventType.SIG and event.model_id:
                record = {
                    "logId": event.event_id,
                    "timestamp": event.timestamp.isoformat() + "Z",
                    "aiSystemId": event.model_id,
                    "aiSystemVersion": event.model_version,
                    # Period of use (modeled on Article 12(3)(a))
                    "operationalPeriod": {
                        "start": event.timestamp.isoformat() + "Z",
                        "status": "ACTIVE"
                    },
                    # Input data reference (modeled on Article 12(3)(c))
                    "inputDataReference": event.prev_hash,
                    # Output and decisions (supports Article 12(2) traceability)
                    "decisionOutput": {
                        "action": event.decision_factors.get("action") if event.decision_factors else None,
                        "confidence": event.confidence_score,
                        "factors": event.decision_factors,
                    },
                    # Cryptographic verification
                    "integrityHash": event.event_hash,
                    "signature": event.signature,
                }
                records.append(record)
        return records
class MultiRegulationExporter:
    """Unified exporter for all supported regulations."""

    def __init__(self):
        self.exporters = {
            'form_sho': FormSHOExporter(),
            'mifid_rts25': MiFIDRTS25Exporter(),
            'eu_ai_act': EUAIActExporter(),
        }

    def export_all(self, events: List[VCPEvent]) -> Dict[str, Any]:
        """Export events to all regulation formats."""
        return {
            name: exporter.export(events)
            for name, exporter in self.exporters.items()
        }
# Usage
exporter = MultiRegulationExporter()
all_outputs = exporter.export_all(chain.events)
print("Form SHO XML:", all_outputs['form_sho'][:200])
print("MiFID RTS 25 records:", len(all_outputs['mifid_rts25']))
print("EU AI Act logs:", len(all_outputs['eu_ai_act']))
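The exporters above build timestamps with `event.timestamp.isoformat() + "Z"`. That is only correct for naive UTC datetimes such as those returned by `datetime.utcnow()`; a timezone-aware value would serialize as `...+00:00Z`. Here is a minimal sketch of a helper that handles both cases. The name `to_utc_z` is hypothetical (it is not part of VCP), and it assumes that naive datetimes are already in UTC.

```python
from datetime import datetime, timedelta, timezone


def to_utc_z(ts: datetime) -> str:
    """Format a datetime as ISO 8601 UTC with microseconds and a 'Z' suffix.

    Assumption: a naive datetime is already expressed in UTC.
    """
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    else:
        ts = ts.astimezone(timezone.utc)
    # isoformat() on an aware UTC datetime ends in "+00:00"; swap that for "Z"
    return ts.isoformat(timespec="microseconds").replace("+00:00", "Z")


# A naive UTC timestamp and the same instant expressed in UTC-5
naive = datetime(2025, 1, 15, 14, 30, 0, 123456)
aware = datetime(2025, 1, 15, 9, 30, 0, 123456, tzinfo=timezone(timedelta(hours=-5)))

print(to_utc_z(naive))  # 2025-01-15T14:30:00.123456Z
print(to_utc_z(aware))  # 2025-01-15T14:30:00.123456Z
```

Forcing `timespec="microseconds"` keeps the field width constant even when the microsecond component is zero, which makes downstream parsing and string comparison more predictable.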
Regulation Coverage Matrix
| VCP Field | Form SHO | 10c-1a SLATE | MiFID RTS 25 | EU AI Act Art. 12 |
|---|---|---|---|---|
| event_id | ✓ | ✓ | ✓ | ✓ |
| timestamp | ✓ | ✓ | ✓ (μs precision) | ✓ |
| actor_id | ✓ | ✓ | ✓ | ✓ |
| instrument_id | ✓ | ✓ | ✓ | — |
| quantity | ✓ | ✓ | ✓ | — |
| price | ✓ | ✓ | ✓ | — |
| side | ✓ | ✓ | ✓ | — |
| model_id | — | — | — | ✓ |
| decision_factors | — | — | — | ✓ |
| confidence_score | — | — | — | ✓ |
| prev_hash | ✓ | ✓ | ✓ | ✓ |
| signature | ✓ | ✓ | ✓ | ✓ |
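One way to put the matrix to work is a pre-export check that flags events missing a field that a given regulation relies on. The sketch below is illustrative only: `REQUIRED_FIELDS` and `missing_fields` are hypothetical names, the field lists are copied from two columns of the matrix, and events are modeled as plain dicts rather than `VCPEvent` objects.

```python
from typing import Any, Dict, List

# Field lists copied from the MiFID RTS 25 and EU AI Act columns of the matrix
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "mifid_rts25": [
        "event_id", "timestamp", "actor_id", "instrument_id",
        "quantity", "price", "side", "prev_hash", "signature",
    ],
    "eu_ai_act": [
        "event_id", "timestamp", "actor_id", "model_id",
        "decision_factors", "confidence_score", "prev_hash", "signature",
    ],
}


def missing_fields(event: Dict[str, Any], regulation: str) -> List[str]:
    """Return the required fields that are absent or None for this regulation."""
    return [name for name in REQUIRED_FIELDS[regulation] if event.get(name) is None]


# An execution event with placeholder values (not real identifiers or hashes)
execution = {
    "event_id": "evt-001",
    "timestamp": "2025-01-15T14:30:00.123456Z",
    "actor_id": "nasdaq-matching",
    "instrument_id": "US0378331005",
    "quantity": 1000,
    "price": 178.48,
    "side": "BUY",
    "prev_hash": "0" * 64,
    "signature": "placeholder-signature",
}

print(missing_fields(execution, "mifid_rts25"))  # []
print(missing_fields(execution, "eu_ai_act"))    # ['model_id', 'decision_factors', 'confidence_score']
```

Running a check like this before export surfaces gaps early, for example an execution event that was routed to the AI Act exporter without any model metadata.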
11. Putting It All Together
Here's a complete working example:
import asyncio
from datetime import datetime


async def main():
    # Initialize components
    signer = VCPSigner()
    chain = HashChain()
    print("VCP Node initialized")
    print(f"Public key: {signer.get_public_key_b64()[:32]}...")

    # Simulate trading events
    events_data = [
        # AI signal
        {
            "event_type": EventType.SIG,
            "actor_id": "quant-model-v3",
            "instrument_id": "US0378331005",
            "side": "BUY",
            "model_id": "alpha-momentum-v3.2.1",
            "model_version": "3.2.1",
            "decision_factors": {
                "momentum_score": 0.85,
                "mean_reversion": -0.12,
                "volume_signal": 0.67,
            },
            "confidence_score": 0.78,
        },
        # Order submission
        {
            "event_type": EventType.ORD,
            "actor_id": "oms-prod-1",
            "instrument_id": "US0378331005",
            "quantity": 1000,
            "price": 178.50,
            "side": "BUY",
        },
        # Order acknowledgment
        {
            "event_type": EventType.ACK,
            "actor_id": "nasdaq-matching",
            "instrument_id": "US0378331005",
            "quantity": 1000,
            "price": 178.50,
            "side": "BUY",
            "order_id": None,  # Will be linked
        },
        # Execution
        {
            "event_type": EventType.EXE,
            "actor_id": "nasdaq-matching",
            "instrument_id": "US0378331005",
            "quantity": 1000,
            "price": 178.48,
            "side": "BUY",
        },
    ]

    # Process events
    last_order_id = None  # event_id of the most recent ORD event
    for i, data in enumerate(events_data):
        event = VCPEvent(
            event_id=generate_uuid7(),
            event_type=data["event_type"],
            timestamp=datetime.utcnow(),
            actor_id=data["actor_id"],
            instrument_id=data.get("instrument_id"),
            quantity=data.get("quantity"),
            price=data.get("price"),
            side=data.get("side"),
            model_id=data.get("model_id"),
            model_version=data.get("model_version"),
            decision_factors=data.get("decision_factors"),
            confidence_score=data.get("confidence_score"),
        )
        # Link ACK and EXE events to the order they refer to
        # (not simply to the previous event, which for EXE would be the ACK)
        if data["event_type"] in (EventType.ACK, EventType.EXE) and last_order_id:
            event.order_id = last_order_id
        # Add to chain
        chain.append(event)
        if data["event_type"] == EventType.ORD:
            last_order_id = event.event_id
        # Sign
        event.signature = signer.sign_event(event)
        print(f"\n{'='*60}")
        print(f"Event {i+1}: {event.event_type.value}")
        print(f" ID: {event.event_id}")
        print(f" Hash: {event.event_hash[:32]}...")
        print(f" Prev: {event.prev_hash[:32]}...")
        print(f" Sig: {event.signature[:32]}...")

    # Verify chain integrity
    print(f"\n{'='*60}")
    print(f"Chain verification: {'✓ VALID' if chain.verify_chain() else '✗ INVALID'}")

    # Build Merkle tree
    tree = MerkleTree(chain.events)
    print(f"Merkle root: {tree.root[:32]}...")

    # Generate inclusion proof for first event
    proof = tree.get_proof(0)
    print(f"Proof for event 0: {len(proof)} nodes")

    # Verify proof
    valid = MerkleTree.verify_proof(chain.events[0].event_hash, proof, tree.root)
    print(f"Proof verification: {'✓ VALID' if valid else '✗ INVALID'}")

    # Export to regulatory formats
    exporter = MultiRegulationExporter()
    outputs = exporter.export_all(chain.events)
    print(f"\n{'='*60}")
    print("Regulatory Exports:")
    print(f" EU AI Act records: {len(outputs['eu_ai_act'])}")
    print(f" MiFID RTS 25 records: {len(outputs['mifid_rts25'])}")

    # Simulate tampering detection
    print(f"\n{'='*60}")
    print("Simulating tampering...")
    original_qty = chain.events[1].quantity
    chain.events[1].quantity = 10000  # Attacker changes quantity
    tampered_idx = chain.detect_tampering()
    print(f"Tampering detected at event index: {tampered_idx}")

    # Restore
    chain.events[1].quantity = original_qty


if __name__ == "__main__":
    asyncio.run(main())
Output
VCP Node initialized
Public key: MCowBQYDK2VwAyEA7Hp8j2mQ...
============================================================
Event 1: SIG
ID: 018d5f3c-7a2b-7def-8123-456789abcdef
Hash: a3f2e8c1d4b5a6978012345678...
Prev: 00000000000000000000000000...
Sig: MEUCIQDh8Kj2mNpQ3rS5tU6vW...
============================================================
Event 2: ORD
ID: 018d5f3c-7a2c-7abc-9234-567890abcdef
Hash: b4e3f9d2c5a6b7890123456789...
Prev: a3f2e8c1d4b5a6978012345678...
Sig: MEQCIDk9Lm3nOpR4sT6uV7wX...
...
============================================================
Chain verification: ✓ VALID
Merkle root: f8e7d6c5b4a3928170615243...
Proof for event 0: 2 nodes
Proof verification: ✓ VALID
============================================================
Regulatory Exports:
EU AI Act records: 1
MiFID RTS 25 records: 1
============================================================
Simulating tampering...
Tampering detected at event index: 1
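To see why the tampering step reports index 1, it helps to strip the idea down to a standalone sketch. The code below is not the article's `HashChain` class: `event_hash`, `build_chain`, and `first_tampered_index` are hypothetical names, events are plain dicts, and `json.dumps(..., sort_keys=True)` stands in for full RFC 8785 canonicalization, which is an assumption made to keep the example stdlib-only.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional

GENESIS = "0" * 64  # prev_hash used for the first entry


def event_hash(payload: Dict[str, Any], prev_hash: str) -> str:
    """SHA-256 over a deterministic JSON encoding of the payload and prev_hash."""
    body = json.dumps(
        {"payload": payload, "prev_hash": prev_hash},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


def build_chain(payloads: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Link each payload to the hash of the entry before it."""
    chain: List[Dict[str, Any]] = []
    prev = GENESIS
    for payload in payloads:
        digest = event_hash(payload, prev)
        chain.append({"payload": payload, "prev_hash": prev, "event_hash": digest})
        prev = digest
    return chain


def first_tampered_index(chain: List[Dict[str, Any]]) -> Optional[int]:
    """Return the index of the first inconsistent entry, or None if the chain is intact."""
    prev = GENESIS
    for i, entry in enumerate(chain):
        link_ok = entry["prev_hash"] == prev
        hash_ok = event_hash(entry["payload"], entry["prev_hash"]) == entry["event_hash"]
        if not (link_ok and hash_ok):
            return i
        prev = entry["event_hash"]
    return None


demo_chain = build_chain([
    {"type": "SIG", "side": "BUY"},
    {"type": "ORD", "quantity": 1000, "price": 178.50},
    {"type": "EXE", "quantity": 1000, "price": 178.48},
])
print(first_tampered_index(demo_chain))  # None

demo_chain[1]["payload"]["quantity"] = 10000  # attacker edits the order quantity
print(first_tampered_index(demo_chain))  # 1
```

If the attacker also recomputes the stored hash of the edited entry, the check fails one position later instead, because the next entry's `prev_hash` still points at the original hash. Rewriting history therefore means rewriting every subsequent entry, which is what the signatures and externally anchored Merkle roots are meant to make detectable.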
Conclusion
We've built a complete cryptographic audit trail system that:
- Provides tamper evidence through hash chains
- Enables efficient verification via Merkle trees
- Authenticates events with Ed25519 signatures
- Integrates non-invasively via sidecar architecture
- Supports multiple protocols (FIX, MT4/MT5)
- Satisfies multiple regulations from a single event stream
The SEC's Rule 13f-2 delay gives us time to build this right. The Fifth Circuit's ruling on cumulative economic impact creates regulatory demand for solutions that reduce aggregate compliance burden.
This is that solution.
Resources
- VCP Specification: github.com/veritaschain/vcp-spec
- IETF Internet-Draft: datatracker.ietf.org/doc/draft-kamimura-scitt-vcp
- RFC 6962: Certificate Transparency (Merkle trees)
- RFC 8032: Ed25519 signatures
- RFC 8785: JSON Canonicalization Scheme
- RFC 9562: UUID v7
Questions? Reach out at developers@veritaschain.org or open an issue on GitHub.