When a plane crashes, investigators have one invaluable resource: the flight recorder. This black box captures every decision, every input, every system state—creating an immutable record that tells us exactly what happened and why.
Now imagine your AI trading system makes a catastrophic decision. A flash crash. A rogue algorithm. A regulatory inquiry asking: "What did your AI decide, when, and why?"
Can you prove it?
If you're relying on traditional logging—database entries, text files, even append-only logs—the answer is probably "no." Not cryptographically. Not in a way that proves the logs weren't tampered with after the fact.
This is the problem the VeritasChain Protocol (VCP) solves. It's an open standard for creating cryptographic audit trails that can mathematically prove:
- What happened (event authenticity)
- When it happened (temporal integrity)
- That nothing was hidden (completeness guarantees)
- That nothing was changed (tamper evidence)
In this article, I'll walk you through the architecture, show you working code, and explain why this matters for the EU AI Act compliance deadline that's rapidly approaching.
Table of Contents
- Why Traditional Logging Fails
- The Three-Layer Architecture
- Building Your First VCP Event
- Hash Chains and Merkle Trees
- External Anchoring: The Trust Anchor
- EU AI Act Compliance Mapping
- Integration Patterns
- Production Considerations
- Resources and Next Steps
Why Traditional Logging Fails
Let's be real about what traditional logging gives you:
# Traditional logging
import logging
logger = logging.getLogger('trading')
def execute_trade(order):
logger.info(f"Executing order: {order.id} at {order.price}")
# ... trade logic ...
logger.info(f"Order {order.id} filled at {fill_price}")
This creates a record. But that record has problems:
Problem 1: Mutability
# Anyone with file access can do this
sed -i 's/filled at 150.00/filled at 145.00/' trading.log
Problem 2: Deletability
# Oops, those inconvenient entries are gone
grep -v "suspicious_trade" trading.log > clean.log && mv clean.log trading.log
Problem 3: No Completeness Proof
How do you prove that you logged everything? That there aren't gaps where events should be? Traditional logs can show what they contain, but they say nothing about what's missing.
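To make that concrete, here's a tiny illustration (not VCP code): any naive check you can run over a plain log, such as verifying that timestamps are in order, passes just as happily after entries have been silently dropped.
# Nothing in a plain log lets a verifier detect a missing entry:
# any subset with increasing timestamps still looks "complete".
def naive_verify(lines):
    timestamps = [line.split()[0] for line in lines]
    return timestamps == sorted(timestamps)

full_log = ["09:00:01 order A filled", "09:00:02 order B filled", "09:00:03 order C filled"]
pruned   = ["09:00:01 order A filled", "09:00:03 order C filled"]  # B silently removed

print(naive_verify(full_log))  # True
print(naive_verify(pruned))    # True - the gap is invisible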
Problem 4: Clock Manipulation
# Change the system clock, create backdated entries
import os
os.system('date -s "2025-01-15 14:30:00"')
logger.info("Totally legitimate entry from the past")
For regulatory compliance—especially under the EU AI Act—these aren't theoretical concerns. Article 12 requires "automatic recording of events (logs) over the lifetime of the system" with "appropriate level of traceability." Regulators increasingly expect more than trust-me-bro logging.
The Three-Layer Architecture
VCP v1.1 introduces a three-layer architecture that addresses each of these problems:
┌─────────────────────────────────────────────────────────────────┐
│ LAYER 3: EXTERNAL VERIFIABILITY │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ External Anchor (TSA / Blockchain / SCITT) ││
│ │ → Independent timestamp proves WHEN the log existed ││
│ │ → Third party can verify without trusting log producer ││
│ └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│ LAYER 2: LOCAL INTEGRITY (Collection Integrity) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Merkle Tree + Digital Signatures ││
│ │ → Batch integrity: any modification invalidates the root ││
│ │ → Completeness: missing events create detectable gaps ││
│ └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│ LAYER 1: EVENT GENERATION (Event Integrity) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ VCP Event Records ││
│ │ → Canonical format ensures consistent hashing ││
│ │ → Microsecond timestamps with sync attestation ││
│ │ → Full provenance metadata ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
Each layer provides specific guarantees:
| Layer | Guarantees | Attack Prevented |
|---|---|---|
| L1: Event Generation | Individual event authenticity | Event fabrication |
| L2: Local Integrity | Batch completeness, tamper detection | Selective deletion, modification |
| L3: External Verifiability | Temporal proof, third-party verification | Backdating, log rewriting |
Building Your First VCP Event
Let's build a real VCP event from scratch. I'll use Python, but the concepts apply to any language.
Event Structure
Every VCP event has three sections:
{
"header": { ... }, # Identity, timing, classification
"payload": { ... }, # Domain-specific data (trade, risk, governance)
"security": { ... } # Cryptographic elements
}
Minimal Valid Event
Here's the simplest possible VCP event:
import json
import hashlib
import uuid
from datetime import datetime, timezone
def generate_uuid_v7():
"""Generate a UUID v7 (timestamp-ordered)."""
# Simplified implementation - use uuid7 library in production
timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
timestamp_hex = format(timestamp_ms, '012x')
random_hex = uuid.uuid4().hex[12:]
uuid_str = f"{timestamp_hex[:8]}-{timestamp_hex[8:12]}-7{random_hex[:3]}-{random_hex[3:7]}-{random_hex[7:19]}"
return uuid_str
def create_minimal_event():
"""Create a minimal valid VCP heartbeat event."""
now = datetime.now(timezone.utc)
timestamp_ns = int(now.timestamp() * 1_000_000_000)
event = {
"header": {
"event_id": generate_uuid_v7(),
"trace_id": generate_uuid_v7(),
"timestamp_int": str(timestamp_ns), # Always string!
"timestamp_iso": now.isoformat(timespec='milliseconds').replace('+00:00', 'Z'),
"event_type": "HBT", # Heartbeat
"event_type_code": 98,
"timestamp_precision": "MILLISECOND",
"clock_sync_status": "NTP_SYNCED",
"hash_algo": "SHA256",
"venue_id": "MY-TRADING-SYSTEM",
"symbol": "SYSTEM",
"account_id": "system_monitor"
},
"payload": {},
"security": {
"event_hash": "", # Will compute
"prev_hash": "0" * 64 # Genesis event
}
}
# Compute event hash
event["security"]["event_hash"] = compute_event_hash(event)
return event
def compute_event_hash(event):
"""Compute SHA-256 hash of canonical event representation."""
# Create hashable representation (excluding event_hash itself)
hashable = {
"header": event["header"],
"payload": event["payload"],
"prev_hash": event["security"]["prev_hash"]
}
# Canonical JSON (sorted keys, no whitespace)
canonical = json.dumps(hashable, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
# Create and print event
event = create_minimal_event()
print(json.dumps(event, indent=2))
Output:
{
"header": {
"event_id": "01934f8a-3c21-7a8b-9d1e-4f2c6b8a0e12",
"trace_id": "01934f8a-3c21-7000-8000-1a2b3c4d5e6f",
"timestamp_int": "1738195200000000000",
"timestamp_iso": "2025-01-30T00:00:00.000Z",
"event_type": "HBT",
"event_type_code": 98,
"timestamp_precision": "MILLISECOND",
"clock_sync_status": "NTP_SYNCED",
"hash_algo": "SHA256",
"venue_id": "MY-TRADING-SYSTEM",
"symbol": "SYSTEM",
"account_id": "system_monitor"
},
"payload": {},
"security": {
"event_hash": "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd",
"prev_hash": "0000000000000000000000000000000000000000000000000000000000000000"
}
}
Trading Event with Full Payload
Now let's create a real trading signal event with AI governance metadata:
def create_signal_event(
symbol: str,
algo_id: str,
algo_version: str,
decision_factors: dict,
confidence_score: float,
prev_hash: str
) -> tuple[dict, str]:
"""Create a VCP SIG (Signal) event with governance metadata."""
now = datetime.now(timezone.utc)
timestamp_ns = int(now.timestamp() * 1_000_000_000)
trace_id = generate_uuid_v7()
event = {
"header": {
"event_id": generate_uuid_v7(),
"trace_id": trace_id,
"timestamp_int": str(timestamp_ns),
"timestamp_iso": now.isoformat(timespec='microseconds').replace('+00:00', 'Z'),
"event_type": "SIG",
"event_type_code": 1,
"timestamp_precision": "MICROSECOND",
"clock_sync_status": "NTP_SYNCED",
"hash_algo": "SHA256",
"venue_id": "MT5-BROKER-ALPHA",
"symbol": symbol,
"account_id": "acc_7f83b162a9c4e521"
},
"payload": {
"vcp_gov": {
"algo_id": algo_id,
"algo_version": algo_version,
"algo_type": "HYBRID", # RULE_BASED, ML, HYBRID
"model_hash": compute_model_hash(algo_id, algo_version),
"risk_classification": "MEDIUM",
"decision_factors": {
"features": decision_factors.get("features", []),
"confidence_score": str(confidence_score),
"explainability_method": "SHAP"
}
}
},
"security": {
"event_hash": "",
"prev_hash": prev_hash
}
}
event["security"]["event_hash"] = compute_event_hash(event)
return event, trace_id
def compute_model_hash(algo_id: str, algo_version: str) -> str:
"""Compute hash of algorithm model for versioning."""
# In production, this would hash the actual model weights/parameters
model_identifier = f"{algo_id}:{algo_version}"
return f"sha256:{hashlib.sha256(model_identifier.encode()).hexdigest()}"
# Example usage
decision_factors = {
"features": [
{"name": "rsi_14", "value": "28.5", "weight": "0.35", "contribution": "0.12"},
{"name": "macd_signal", "value": "-0.0023", "weight": "0.25", "contribution": "0.08"},
{"name": "sentiment_score", "value": "0.72", "weight": "0.40", "contribution": "0.29"}
]
}
signal_event, trace_id = create_signal_event(
symbol="XAUUSD",
algo_id="gold-momentum-v2",
algo_version="2.3.1",
decision_factors=decision_factors,
confidence_score=0.87,
prev_hash="0" * 64
)
print(json.dumps(signal_event, indent=2))
Event Types
VCP defines a comprehensive event taxonomy for trading:
| Code | Type | Description |
|---|---|---|
| 1 | SIG | Signal generation (AI decision) |
| 2 | ORD | Order submission |
| 3 | ACK | Order acknowledgment |
| 4 | EXE | Full execution |
| 5 | PRT | Partial fill |
| 6 | REJ | Order rejection |
| 7 | CXL | Order cancellation |
| 8 | MOD | Order modification |
| 20 | ALG | Algorithm update |
| 21 | RSK | Risk event |
| 22 | AUD | Audit marker |
| 98 | HBT | Heartbeat |
| 99 | ERR | Error |
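As a convenience (this lookup is my own sketch, not normative spec text), you can check that an event's type label and numeric code agree with the taxonomy above:
# Code-to-type lookup for the taxonomy above
EVENT_TYPES = {
    1: "SIG", 2: "ORD", 3: "ACK", 4: "EXE", 5: "PRT", 6: "REJ",
    7: "CXL", 8: "MOD", 20: "ALG", 21: "RSK", 22: "AUD", 98: "HBT", 99: "ERR",
}

def validate_event_type(header: dict) -> bool:
    """Check that event_type and event_type_code are consistent."""
    return EVENT_TYPES.get(header["event_type_code"]) == header["event_type"]

print(validate_event_type({"event_type": "HBT", "event_type_code": 98}))  # True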
Hash Chains and Merkle Trees
Individual events are linked into verifiable chains using two complementary mechanisms.
Hash Chains (Optional in v1.1)
Each event's prev_hash references the previous event:
class VCPEventChain:
"""Simple hash chain implementation."""
def __init__(self):
self.events = []
self.last_hash = "0" * 64 # Genesis
def add_event(self, event_data: dict) -> dict:
"""Add an event to the chain."""
event = {
"header": event_data["header"],
"payload": event_data["payload"],
"security": {
"event_hash": "",
"prev_hash": self.last_hash
}
}
event["security"]["event_hash"] = compute_event_hash(event)
self.last_hash = event["security"]["event_hash"]
self.events.append(event)
return event
def verify_chain(self) -> bool:
"""Verify the integrity of the entire chain."""
if not self.events:
return True
# Check genesis
if self.events[0]["security"]["prev_hash"] != "0" * 64:
return False
# Check each link
for i, event in enumerate(self.events):
# Recompute hash
expected_hash = compute_event_hash(event)
if event["security"]["event_hash"] != expected_hash:
print(f"Event {i}: Hash mismatch")
return False
# Check chain link (except genesis)
if i > 0:
if event["security"]["prev_hash"] != self.events[i-1]["security"]["event_hash"]:
print(f"Event {i}: Chain broken")
return False
return True
Why hash chains matter: Modifying any event invalidates all subsequent hashes:
# Attempt to tamper with an event
chain = VCPEventChain()
chain.add_event({"header": {...}, "payload": {...}})
chain.add_event({"header": {...}, "payload": {...}})
chain.add_event({"header": {...}, "payload": {...}})
print(chain.verify_chain()) # True
# Tamper with middle event
chain.events[1]["payload"]["trade_data"]["price"] = "999999.99"
print(chain.verify_chain()) # False - tampering detected!
Merkle Trees (Required in v1.1)
For batch verification, events are aggregated into Merkle trees:
import hashlib
from typing import List, Optional
class MerkleTree:
"""Simple Merkle tree implementation for VCP batches."""
def __init__(self, leaves: List[str]):
"""Initialize with leaf hashes (event hashes)."""
self.leaves = leaves
self.tree = self._build_tree(leaves)
self.root = self.tree[-1][0] if self.tree else None
def _hash_pair(self, left: str, right: str) -> str:
"""Hash two nodes together."""
combined = left + right
return hashlib.sha256(combined.encode()).hexdigest()
def _build_tree(self, leaves: List[str]) -> List[List[str]]:
"""Build the Merkle tree from leaves."""
if not leaves:
return []
tree = [leaves.copy()]
while len(tree[-1]) > 1:
level = tree[-1]
next_level = []
for i in range(0, len(level), 2):
left = level[i]
right = level[i + 1] if i + 1 < len(level) else left # Duplicate if odd
next_level.append(self._hash_pair(left, right))
tree.append(next_level)
return tree
def get_proof(self, index: int) -> List[dict]:
"""Get Merkle proof for a leaf at given index."""
if index >= len(self.leaves):
raise IndexError("Leaf index out of range")
        proof = []
        for level in self.tree[:-1]:  # Exclude root
            sibling_index = index ^ 1  # XOR to get sibling
            # If the sibling is missing (odd node count at this level), the node
            # was paired with a copy of itself during tree construction,
            # so the proof element is the node's own hash.
            sibling = level[sibling_index] if sibling_index < len(level) else level[index]
            proof.append({
                "hash": sibling,
                "position": "right" if index % 2 == 0 else "left"
            })
            index //= 2  # Move to parent index
        return proof
@staticmethod
def verify_proof(leaf_hash: str, proof: List[dict], root: str) -> bool:
"""Verify a Merkle proof."""
current = leaf_hash
for step in proof:
if step["position"] == "right":
current = hashlib.sha256((current + step["hash"]).encode()).hexdigest()
else:
current = hashlib.sha256((step["hash"] + current).encode()).hexdigest()
return current == root
# Example: Batch verification
event_hashes = [
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"d7a8fbb307d7809469ca9abcb0082e4f8d5651e46d3cdb762d02d0bf37c9e592",
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824",
"ef2d127de37b942baad06145e54b0c619a1f22327b2ebbcfbec78f5564afe39d"
]
tree = MerkleTree(event_hashes)
print(f"Merkle Root: {tree.root}")
# Generate proof for second event
proof = tree.get_proof(1)
print(f"Proof for event 1: {json.dumps(proof, indent=2)}")
# Verify the proof
is_valid = MerkleTree.verify_proof(event_hashes[1], proof, tree.root)
print(f"Proof valid: {is_valid}") # True
Why Merkle trees matter:
- Efficient verification: Prove inclusion of one event in O(log n) instead of O(n) (a quick check follows this list)
- Completeness guarantee: The root commits to ALL events in the batch
- Split-view resistance: Can't show different auditors different event sets
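The O(log n) claim is easy to check with the MerkleTree class above: for 1,024 events, an inclusion proof carries only 10 sibling hashes.
# Quick check of the O(log n) proof size using the MerkleTree class above
leaves = [hashlib.sha256(f"event-{i}".encode()).hexdigest() for i in range(1024)]
big_tree = MerkleTree(leaves)

proof = big_tree.get_proof(500)
print(len(proof))                                                   # 10 sibling hashes
print(MerkleTree.verify_proof(leaves[500], proof, big_tree.root))   # True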
External Anchoring: The Trust Anchor
Here's the key insight: hash chains and Merkle trees can detect tampering, but they can't prove WHEN you created the original log.
A malicious actor could:
- Modify the log
- Recompute all hashes
- Claim the new log is the original
External anchoring solves this by publishing the Merkle root to a trusted third party BEFORE any potential tampering:
import requests
from datetime import datetime, timezone
class ExternalAnchor:
"""External anchoring service abstraction."""
def __init__(self, anchor_type: str, config: dict):
self.anchor_type = anchor_type
self.config = config
def anchor(self, merkle_root: str, batch_metadata: dict) -> dict:
"""Anchor a Merkle root to external service."""
if self.anchor_type == "TSA":
return self._anchor_tsa(merkle_root, batch_metadata)
elif self.anchor_type == "BLOCKCHAIN":
return self._anchor_blockchain(merkle_root, batch_metadata)
elif self.anchor_type == "SCITT":
return self._anchor_scitt(merkle_root, batch_metadata)
else:
raise ValueError(f"Unknown anchor type: {self.anchor_type}")
def _anchor_tsa(self, merkle_root: str, metadata: dict) -> dict:
"""Anchor to RFC 3161 Timestamp Authority."""
# In production, use a real TSA like FreeTSA or DigiCert
# This is a simplified example
timestamp_request = {
"hash_algorithm": "SHA256",
"hashed_message": merkle_root,
"policy_id": "1.2.3.4.5" # TSA policy OID
}
# Simulated response
return {
"anchor_type": "TSA",
"anchor_timestamp": datetime.now(timezone.utc).isoformat(),
"anchor_proof": {
"tsa_url": self.config.get("tsa_url", "https://freetsa.org/tsr"),
"timestamp_token": "base64_encoded_token_here",
"certificate_chain": ["cert1", "cert2"]
},
"merkle_root": merkle_root,
"status": "SUCCESS"
}
def _anchor_blockchain(self, merkle_root: str, metadata: dict) -> dict:
"""Anchor to blockchain (Ethereum example)."""
# In production, use web3.py or similar
return {
"anchor_type": "BLOCKCHAIN",
"anchor_timestamp": datetime.now(timezone.utc).isoformat(),
"anchor_proof": {
"chain": self.config.get("chain", "ethereum"),
"network": self.config.get("network", "mainnet"),
"transaction_hash": "0x" + "a" * 64, # Simulated
"block_number": 12345678,
"contract_address": self.config.get("contract_address")
},
"merkle_root": merkle_root,
"status": "SUCCESS"
}
def _anchor_scitt(self, merkle_root: str, metadata: dict) -> dict:
"""Anchor to IETF SCITT transparency log."""
# SCITT provides supply chain transparency
# Ideal for cross-organizational verification
return {
"anchor_type": "SCITT",
"anchor_timestamp": datetime.now(timezone.utc).isoformat(),
"anchor_proof": {
"transparency_service": self.config.get("scitt_url"),
"receipt": {
"tree_alg": "CCF",
"inclusion_proof": ["proof_element_1", "proof_element_2"],
"leaf_components": {
"claim_hash": merkle_root
}
}
},
"merkle_root": merkle_root,
"status": "SUCCESS"
}
# Usage example
class VCPBatch:
"""VCP batch with Merkle tree and external anchoring."""
def __init__(self, events: List[dict], anchor_service: ExternalAnchor):
self.events = events
self.anchor_service = anchor_service
# Build Merkle tree from event hashes
event_hashes = [e["security"]["event_hash"] for e in events]
self.merkle_tree = MerkleTree(event_hashes)
self.anchor_result = None
def anchor(self) -> dict:
"""Anchor this batch to external service."""
metadata = {
"batch_size": len(self.events),
"first_event_id": self.events[0]["header"]["event_id"],
"last_event_id": self.events[-1]["header"]["event_id"],
"first_timestamp": self.events[0]["header"]["timestamp_iso"],
"last_timestamp": self.events[-1]["header"]["timestamp_iso"]
}
self.anchor_result = self.anchor_service.anchor(
self.merkle_tree.root,
metadata
)
return self.anchor_result
def get_verifiable_proof(self, event_index: int) -> dict:
"""Get complete verifiable proof for an event."""
return {
"event": self.events[event_index],
"merkle_proof": self.merkle_tree.get_proof(event_index),
"merkle_root": self.merkle_tree.root,
"anchor": self.anchor_result
}
# Create a batch and anchor it
anchor_service = ExternalAnchor("TSA", {"tsa_url": "https://freetsa.org/tsr"})
events = [
create_minimal_event(),
create_minimal_event(),
create_minimal_event()
]
batch = VCPBatch(events, anchor_service)
anchor_result = batch.anchor()
print(f"Batch anchored!")
print(f"Merkle Root: {batch.merkle_tree.root}")
print(f"Anchor Timestamp: {anchor_result['anchor_timestamp']}")
# Get verifiable proof for any event
proof = batch.get_verifiable_proof(1)
print(json.dumps(proof, indent=2))
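On the verification side, an auditor who receives such a bundle can check Layers 1 and 2 offline with a few lines. Here's a sketch; Layer 3 is represented only by a root comparison, since fully validating the TSA token or blockchain transaction depends on the chosen anchor service.
def verify_proof_bundle(bundle: dict) -> bool:
    """Verify an event against its Merkle proof and anchored root (sketch)."""
    event = bundle["event"]

    # Layer 1: the event hash must match the canonical event content
    if compute_event_hash(event) != event["security"]["event_hash"]:
        return False

    # Layer 2: the Merkle proof must lead from the event hash to the batch root
    if not MerkleTree.verify_proof(
        event["security"]["event_hash"], bundle["merkle_proof"], bundle["merkle_root"]
    ):
        return False

    # Layer 3 (simplified): the externally anchored root must be the same root.
    # A full check would also validate the TSA token or blockchain transaction
    # inside bundle["anchor"]["anchor_proof"].
    return bundle["anchor"]["merkle_root"] == bundle["merkle_root"]

print(verify_proof_bundle(proof))  # True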
Anchor Frequency by Tier
VCP v1.1 mandates external anchoring for ALL tiers, but with different frequencies:
| Tier | Anchor Frequency | Use Case |
|---|---|---|
| Platinum | Every 10 minutes | HFT, exchanges |
| Gold | Every 1 hour | Prop trading, institutional |
| Silver | Every 24 hours | Retail, MT4/MT5 |
The frequency balances two trade-offs (a small scheduling sketch follows this list):
- More frequent: Smaller batches, finer-grained temporal proof
- Less frequent: Lower cost, simpler infrastructure
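As a minimal sketch of how a sidecar might act on this table (tier names and intervals are taken from above; everything else is illustrative), a simple time-based trigger is usually enough:
from datetime import datetime, timedelta, timezone

# Anchor intervals per tier, from the table above
ANCHOR_INTERVAL = {
    "PLATINUM": timedelta(minutes=10),
    "GOLD": timedelta(hours=1),
    "SILVER": timedelta(hours=24),
}

def should_anchor(tier: str, last_anchor_time: datetime) -> bool:
    """Return True once the tier's anchoring interval has elapsed."""
    return datetime.now(timezone.utc) - last_anchor_time >= ANCHOR_INTERVAL[tier]

# Example: a Silver-tier sidecar that last anchored 25 hours ago should anchor now
print(should_anchor("SILVER", datetime.now(timezone.utc) - timedelta(hours=25)))  # True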
EU AI Act Compliance Mapping
The EU AI Act (Regulation 2024/1689) establishes record-keeping requirements for high-risk AI systems. Here's how VCP maps to the key articles:
Article 12: Record-Keeping
"High-risk AI systems shall technically allow for the automatic recording of events (logs) over the lifetime of the system."
VCP Response:
| Requirement | VCP Field | Implementation |
|---|---|---|
| Automatic recording | EventID (UUIDv7) | Every event automatically assigned unique identifier |
| Lifetime logging | TraceID | Links all events in a decision chain |
| Traceability | prev_hash + MerkleProof | Cryptographic chain enables full reconstruction |
| Start/end timestamps | timestamp_iso | Microsecond precision with sync attestation |
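To illustrate the traceability row (a sketch using only fields already defined earlier), reconstructing the full decision chain behind one AI decision is a filter on trace_id plus a numeric sort on timestamp_int:
def reconstruct_decision_chain(events: list, trace_id: str) -> list:
    """Return all events belonging to one decision chain, in temporal order."""
    chain = [e for e in events if e["header"]["trace_id"] == trace_id]
    # timestamp_int is a stringified nanosecond integer, so sort numerically
    return sorted(chain, key=lambda e: int(e["header"]["timestamp_int"]))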
Article 13: Transparency
"...sufficiently transparent to enable deployers to interpret a system's output"
VCP Response:
# VCP-GOV module captures AI decision transparency
vcp_gov = {
"algo_id": "gold-momentum-v2",
"algo_version": "2.3.1",
"algo_type": "HYBRID",
"decision_factors": {
"features": [
{"name": "rsi_14", "value": "28.5", "contribution": "0.12"},
{"name": "sentiment", "value": "0.72", "contribution": "0.29"}
],
"confidence_score": "0.87",
"explainability_method": "SHAP" # or LIME, attention weights, etc.
}
}
Article 14: Human Oversight
"...enable deployers to... intervene in the operation of the high-risk AI system"
VCP Response:
# Human oversight events are first-class citizens
human_override_event = {
"header": {
"event_type": "GOV",
"event_type_code": 22,
# ... other fields
},
"payload": {
"vcp_gov": {
"intervention_type": "MANUAL_OVERRIDE",
"override_reason": "Risk committee decision",
"authorized_by": "trader_id_hash",
"previous_state": "ACTIVE",
"new_state": "HALTED"
}
}
}
Article 15: Robustness
"...resilient against attempts by unauthorised third parties to alter their use, outputs or performance"
VCP Response:
| Attack Vector | VCP Defense |
|---|---|
| Log tampering | SHA-256 hash chains detect modification |
| Event deletion | Merkle tree completeness guarantees |
| Backdating | External TSA anchoring proves temporal order |
| Replay attacks | UUIDv7 monotonic ordering |
| Clock manipulation | ClockSyncStatus attestation |
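One concrete consequence of external anchoring (see the backdating row): the anchor timestamp is an independent upper bound for every event in the batch, since the batch provably existed when its root was anchored. A small consistency check, using the anchor result structure from earlier:
from datetime import datetime

def check_temporal_consistency(event: dict, anchor_result: dict) -> bool:
    """An event claiming a timestamp later than its batch anchor is inconsistent."""
    event_time = datetime.fromisoformat(
        event["header"]["timestamp_iso"].replace("Z", "+00:00")
    )
    anchor_time = datetime.fromisoformat(anchor_result["anchor_timestamp"])
    return event_time <= anchor_time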
Integration Patterns
VCP is designed for sidecar deployment—it adds cryptographic verification without modifying existing trading systems.
Pattern A: API Interception
┌─────────────────────┐
│ Trading System │──[REST/FIX]──> Broker
└─────────┬───────────┘
│ (copy)
▼
┌─────────────────────┐
│ VCP Sidecar │──> Audit Storage
└─────────────────────┘
# FastAPI sidecar example
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
vcp_chain = VCPEventChain()
@app.post("/proxy/order")
async def proxy_order(request: Request):
"""Intercept order, log to VCP, forward to broker."""
body = await request.json()
# Create VCP event BEFORE forwarding
order_event = create_order_event(body)
vcp_chain.add_event(order_event)
# Forward to broker
async with httpx.AsyncClient() as client:
response = await client.post(
"https://broker.example/api/order",
json=body,
headers=request.headers
)
    # Log the broker's response as well (ACK or REJ), linked by trace_id
    if response.status_code == 200:
        ack_event = create_ack_event(response.json(), order_event["header"]["trace_id"])
        vcp_chain.add_event(ack_event)
    else:
        rej_event = create_reject_event(response.json(), order_event["header"]["trace_id"])
        vcp_chain.add_event(rej_event)
    return response.json()
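The helpers create_order_event, create_ack_event and create_reject_event aren't shown above. Here's what a minimal create_order_event might look like, as a hypothetical sketch reusing the header/payload/security layout from earlier; the payload field names are illustrative, not spec-defined.
def create_order_event(order: dict, prev_hash: str = "0" * 64) -> dict:
    """Hypothetical sketch: wrap an intercepted order in a VCP ORD event."""
    now = datetime.now(timezone.utc)
    event = {
        "header": {
            "event_id": generate_uuid_v7(),
            "trace_id": order.get("trace_id", generate_uuid_v7()),
            "timestamp_int": str(int(now.timestamp() * 1_000_000_000)),
            "timestamp_iso": now.isoformat(timespec='microseconds').replace('+00:00', 'Z'),
            "event_type": "ORD",
            "event_type_code": 2,
            "timestamp_precision": "MICROSECOND",
            "clock_sync_status": "NTP_SYNCED",
            "hash_algo": "SHA256",
            "venue_id": "MT5-BROKER-ALPHA",
            "symbol": order.get("symbol", "UNKNOWN"),
            "account_id": order.get("account_id", "unknown")
        },
        "payload": {"order": order},   # illustrative; the spec defines the ORD payload
        "security": {"event_hash": "", "prev_hash": prev_hash}
    }
    event["security"]["event_hash"] = compute_event_hash(event)
    return event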
Pattern B: Message Queue Tap
Trading System ──> [Kafka] ──> Broker
│
└──> VCP Consumer
# Kafka consumer example
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'trading-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
vcp_chain = VCPEventChain()
batch_events = []
for message in consumer:
trading_event = message.value
# Convert to VCP event
vcp_event = convert_to_vcp(trading_event)
vcp_chain.add_event(vcp_event)
batch_events.append(vcp_event)
# Anchor batch periodically
if len(batch_events) >= 100: # Or time-based
batch = VCPBatch(batch_events, anchor_service)
batch.anchor()
batch_events = []
Pattern C: In-Process Hook (MT4/MT5)
For MetaTrader platforms, VCP provides an MQL bridge:
// MQL5 Expert Advisor with VCP integration
#include <Trade\Trade.mqh>
#include <vcp_bridge.mqh>

CTrade trade;        // Standard library trade object
int    rsi_handle;   // MQL5 indicators are accessed via handles

int OnInit()
{
   // Initialize VCP
   VCP_Init("Gold-Momentum-EA", "2.3.1", TIER_SILVER);
   // Create the RSI indicator handle once
   rsi_handle = iRSI(_Symbol, PERIOD_H1, 14, PRICE_CLOSE);
   if (rsi_handle == INVALID_HANDLE)
      return(INIT_FAILED);
   return(INIT_SUCCEEDED);
}

void OnTick()
{
   // Your trading logic: read the latest RSI value from the handle
   double rsi_buffer[1];
   if (CopyBuffer(rsi_handle, 0, 0, 1, rsi_buffer) < 1)
      return;
   double rsi = rsi_buffer[0];

   if (rsi < 30)
   {
      // Log signal BEFORE trading
      VCP_LogSignal(
         _Symbol,
         "RSI oversold signal",
         rsi,
         0.85   // confidence
      );
      // Execute trade
      if (trade.Buy(0.1))
      {
         VCP_LogExecution(trade.ResultOrder(), trade.ResultPrice());
      }
      else
      {
         VCP_LogRejection(GetLastError());
      }
   }
}

void OnDeinit(const int reason)
{
   // Flush and anchor remaining events
   VCP_Finalize();
}
Production Considerations
Clock Synchronization
Timestamp integrity is critical. VCP requires explicit attestation:
import subprocess
import re
def get_clock_sync_status() -> str:
"""Determine clock synchronization status."""
    # Check for PTP (query the local ptp4l daemon via pmc, part of linuxptp)
    try:
        result = subprocess.run(
            ['pmc', '-u', '-b', '0', 'GET CURRENT_DATA_SET'],
            capture_output=True, text=True
        )
        offset_match = re.search(r'offsetFromMaster\s+(-?[\d.]+)', result.stdout)
        if offset_match and abs(float(offset_match.group(1))) < 1000:  # < 1 µs (value in ns)
            return "PTP_LOCKED"
    except FileNotFoundError:
        pass
# Check for NTP
try:
result = subprocess.run(['ntpq', '-p'], capture_output=True, text=True)
if '*' in result.stdout: # Synchronized with a peer
return "NTP_SYNCED"
except FileNotFoundError:
pass
# Check chrony
try:
result = subprocess.run(['chronyc', 'tracking'], capture_output=True, text=True)
if 'System time' in result.stdout:
return "NTP_SYNCED"
except FileNotFoundError:
pass
return "BEST_EFFORT"
Key Management
Ed25519 signing keys must be protected:
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
import os
class VCPKeyManager:
"""Secure key management for VCP signing."""
def __init__(self, key_path: str = None):
self.key_path = key_path or os.environ.get('VCP_KEY_PATH', '/etc/vcp/signing.key')
self._private_key = None
def get_private_key(self) -> Ed25519PrivateKey:
"""Load or generate signing key."""
if self._private_key:
return self._private_key
if os.path.exists(self.key_path):
with open(self.key_path, 'rb') as f:
self._private_key = serialization.load_pem_private_key(
f.read(),
password=os.environ.get('VCP_KEY_PASSWORD', '').encode() or None
)
else:
# Generate new key
self._private_key = Ed25519PrivateKey.generate()
self._save_key()
return self._private_key
def _save_key(self):
"""Save key to secure storage."""
pem = self._private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(
os.environ.get('VCP_KEY_PASSWORD', '').encode()
) if os.environ.get('VCP_KEY_PASSWORD') else serialization.NoEncryption()
)
# Secure file permissions
os.makedirs(os.path.dirname(self.key_path), exist_ok=True)
with open(self.key_path, 'wb') as f:
f.write(pem)
os.chmod(self.key_path, 0o600)
def sign(self, data: bytes) -> bytes:
"""Sign data with Ed25519."""
return self.get_private_key().sign(data)
def get_public_key_pem(self) -> bytes:
"""Export public key for verification."""
return self.get_private_key().public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
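A usage sketch tying this into the event structure: sign the event hash and attach the signature and public key under the security section. The field names signature and public_key below are my assumptions; check the spec for the normative layout.
# Sketch: attach an Ed25519 signature to an event (field names assumed)
key_manager = VCPKeyManager(key_path="./vcp-signing.key")

def sign_event(event: dict) -> dict:
    signature = key_manager.sign(bytes.fromhex(event["security"]["event_hash"]))
    event["security"]["signature"] = signature.hex()
    event["security"]["public_key"] = key_manager.get_public_key_pem().decode()
    return event

signed = sign_event(create_minimal_event())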
Failure Handling
VCP sidecar failure must NEVER impact trading:
import asyncio
from contextlib import asynccontextmanager
from typing import Callable
class VCPCircuitBreaker:
"""Circuit breaker for VCP operations."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
    @asynccontextmanager
    async def protected(self, fallback: Callable = None):
        """Execute operation with circuit breaker protection."""
        # Move OPEN -> HALF_OPEN once the recovery window has elapsed
        if self.state == "OPEN":
            if asyncio.get_event_loop().time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
        try:
            # Caller executes their operation inside the with-block.
            # The yielded flag tells the caller whether the circuit is closed.
            yield self.state != "OPEN"
            # Success - reset state
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
            self.failures = 0
        except Exception as e:
            self.failures += 1
            self.last_failure_time = asyncio.get_event_loop().time()
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
            # Log but don't re-raise - trading must continue
            print(f"VCP operation failed (circuit {self.state}): {e}")
            if fallback:
                fallback()
# Usage
circuit_breaker = VCPCircuitBreaker()
async def execute_trade_with_logging(order):
"""Execute trade with protected VCP logging."""
# VCP logging is protected
async with circuit_breaker.protected(fallback=lambda: print("VCP unavailable")):
await vcp_service.log_order(order)
# Trading continues regardless of VCP status
result = await broker.execute(order)
async with circuit_breaker.protected():
await vcp_service.log_execution(result)
return result
Resources and Next Steps
Official Resources
- VCP Specification: github.com/veritaschain/vcp-spec
- IETF Internet-Draft: datatracker.ietf.org/doc/draft-kamimura-scitt-vcp
- VSO Website: veritaschain.org
SDK Libraries
| Language | Package | Status |
|---|---|---|
| Python | vcp-core-py | Available |
| TypeScript | @veritaschain/vcp-sdk | Available |
| MQL5 | vcp-mql-bridge | Available |
| C++ | vcp-core-cpp | Coming soon |
Regulatory References
- EU AI Act: Regulation (EU) 2024/1689
- MiFID II RTS 25: Algorithmic trading requirements
- ISO/IEC 42001: AI Management Systems
- ISO/IEC DIS 24970: AI System Logging (draft)
Getting Started
- Clone the spec: git clone https://github.com/veritaschain/vcp-spec
- Run examples: check /examples for working code
- Try the sandbox: VCP Explorer provides interactive verification
- Join the community: github.com/veritaschain/vcp-spec/discussions
Conclusion
The days of "trust me" logging are ending. As AI systems make increasingly consequential decisions—especially in financial markets—regulators and stakeholders demand proof, not promises.
VCP provides that proof through cryptographic guarantees:
- Hash chains detect tampering
- Merkle trees prove completeness
- External anchors establish temporal truth
- Three-layer architecture separates concerns
Whether you're building a prop trading algorithm, running an AI-powered exchange, or just want to prove your MT4 EA didn't cause that suspicious spike, VCP gives you the technical foundation for verifiable AI operations.
The EU AI Act compliance deadline is approaching. The time to implement is now.
Have questions or want to contribute? Open an issue on GitHub or reach out at standards@veritaschain.org.
VCP is developed by the VeritasChain Standards Organization (VSO), an independent, vendor-neutral standards body. Licensed under CC BY 4.0.
Tags: #ai #fintech #security #opensource #blockchain #compliance #trading #python