Trading systems increasingly rely on AI for decision-making, but how do you prove what your AI actually decidedβand when? Traditional logs can be modified after the fact. VCP v1.1 (VeritasChain Protocol) solves this with cryptographic tamper-evidence.
This guide walks you through implementing VCP v1.1 in a live MT5 environment using a non-invasive sidecar architecture.
π― What You'll Build
A complete audit trail system that:
- β Captures AI trading decisions in real-time
- β Creates tamper-evident event chains (SHA-256 + Ed25519)
- β Builds RFC 6962 Merkle Trees for completeness proofs
- β Anchors to external timestamps (OpenTimestamps/Bitcoin)
- β Enables third-party verification without trusting you
ποΈ Architecture Overview
VCP uses a sidecar patternβthe audit system runs alongside MT5, not inside it:
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β MT5 Terminal ββββββΆβ Python Sidecar ββββββΆβ Evidence Pack β
β (MQL5 EA) β β (VCP Logger) β β (JSONL + JSON) β
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β β β
β File-based β Cryptographic β External
β Communication β Signing β Anchoring
βΌ βΌ βΌ
signal.json vcp_events.jsonl OpenTimestamps
Why sidecar?
- Zero impact on trading execution
- MT5/broker infrastructure unchanged
- Failures in logging don't affect trades
- Easier regulatory compliance
π Prerequisites
- MetaTrader 5 terminal
- Python 3.8+
- Basic MQL5 knowledge
# Python dependencies
pip install cryptography
π§ Step 1: MQL5 Event Emitter
First, create an Expert Advisor that exports trading events to files:
// VCPEventEmitter.mqh
#property copyright "Your Company"
#property version "1.00"
#include <Files\FileTxt.mqh>
class CVCPEventEmitter {
private:
string m_output_path;
public:
CVCPEventEmitter(string path = "VCP_Events") {
m_output_path = path;
}
// Emit AI Signal Event
bool EmitSignalEvent(string signal_id,
string direction,
double confidence,
int buy_votes,
int sell_votes,
int abstain_votes) {
string filename = m_output_path + "\\signal_" + signal_id + ".json";
string json = "{";
json += "\"event_type\": \"SIGNAL\",";
json += "\"signal_id\": \"" + signal_id + "\",";
json += "\"timestamp\": \"" + TimeToString(TimeCurrent(), TIME_DATE|TIME_SECONDS) + "\",";
json += "\"direction\": \"" + direction + "\",";
json += "\"confidence\": " + DoubleToString(confidence, 4) + ",";
json += "\"consensus\": {";
json += "\"buy_votes\": " + IntegerToString(buy_votes) + ",";
json += "\"sell_votes\": " + IntegerToString(sell_votes) + ",";
json += "\"abstain_votes\": " + IntegerToString(abstain_votes);
json += "}}";
return WriteFile(filename, json);
}
// Emit Order Execution Event
bool EmitExecutionEvent(string signal_id,
ulong ticket,
string action,
double price,
double volume) {
string filename = m_output_path + "\\exec_" + signal_id + ".json";
string json = "{";
json += "\"event_type\": \"EXECUTION\",";
json += "\"signal_ref\": \"" + signal_id + "\",";
json += "\"timestamp\": \"" + TimeToString(TimeCurrent(), TIME_DATE|TIME_SECONDS) + "\",";
json += "\"ticket\": " + IntegerToString(ticket) + ",";
json += "\"action\": \"" + action + "\",";
json += "\"price\": " + DoubleToString(price, 5) + ",";
json += "\"volume\": " + DoubleToString(volume, 2);
json += "}";
return WriteFile(filename, json);
}
private:
bool WriteFile(string filename, string content) {
int handle = FileOpen(filename, FILE_WRITE|FILE_TXT|FILE_ANSI);
if(handle == INVALID_HANDLE) return false;
FileWriteString(handle, content);
FileClose(handle);
return true;
}
};
π Step 2: Python VCP Sidecar
The sidecar watches for events and builds the cryptographic chain:
# vcp_sidecar.py
import json
import hashlib
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
import uuid
class VCPSidecar:
"""VCP v1.1 Compliant Sidecar for MT5"""
def __init__(self, watch_dir: str, output_file: str):
self.watch_dir = Path(watch_dir)
self.output_file = Path(output_file)
self.events = []
self.sequence = 0
self.prev_hash = "0" * 64 # Genesis
# Generate or load signing key
self.private_key = Ed25519PrivateKey.generate()
self.public_key = self.private_key.public_key()
# Policy identification (VCP v1.1 required)
self.policy_id = "org.veritaschain.rta:silver-reference-v1"
self.agent_id = "VCP-MT5-Sidecar"
def process_event(self, event_data: dict, event_type: str) -> dict:
"""Transform raw event into VCP v1.1 compliant format"""
self.sequence += 1
now = datetime.now(timezone.utc)
event_id = str(uuid.uuid4())
# Build Header (VCP v1.1 schema)
header = {
"EventID": event_id,
"EventType": "SIG" if event_type == "SIGNAL" else "EXE",
"TimestampISO": now.isoformat(),
"TimestampInt": int(now.timestamp() * 1e9), # Nanoseconds
"Sequence": self.sequence,
"AgentID": self.agent_id
}
# Build Payload (anonymize sensitive data)
payload = self._build_payload(event_data, event_type)
# Compute EventHash (canonical JSON)
hash_input = json.dumps(header, sort_keys=True) + json.dumps(payload, sort_keys=True)
event_hash = hashlib.sha256(hash_input.encode()).hexdigest()
# Sign the event
signature = self.private_key.sign(bytes.fromhex(event_hash)).hex()
# Build Security object (VCP v1.1 required fields)
security = {
"Version": "1.1",
"EventHash": event_hash,
"HashAlgo": "SHA256",
"SignAlgo": "ED25519",
"Signature": signature,
"PrevHash": self.prev_hash,
"MerkleRoot": "", # Computed at finalization
"MerkleIndex": self.sequence - 1,
"AnchorReference": "" # Set at anchoring
}
# Build PolicyIdentification (VCP v1.1 required)
policy = {
"PolicyID": self.policy_id,
"ConformanceTier": "SILVER",
"VerificationDepth": {
"HashChainValidation": True,
"MerkleProofRequired": True,
"ExternalAnchorRequired": True
}
}
# Complete VCP v1.1 event
vcp_event = {
"Header": header,
"Payload": payload,
"Security": security,
"PolicyIdentification": policy
}
# Update chain state
self.prev_hash = event_hash
self.events.append(vcp_event)
return vcp_event
def _build_payload(self, data: dict, event_type: str) -> dict:
"""Build payload with anonymization"""
if event_type == "SIGNAL":
return {
"SignalRef": hashlib.sha256(
data.get("signal_id", "").encode()
).hexdigest()[:16],
"Direction": data.get("direction", ""),
"Confidence": data.get("confidence", 0),
"ConsensusSummary": {
"ModelCount": sum([
data.get("consensus", {}).get("buy_votes", 0),
data.get("consensus", {}).get("sell_votes", 0),
data.get("consensus", {}).get("abstain_votes", 0)
]),
"BuyVotes": data.get("consensus", {}).get("buy_votes", 0),
"SellVotes": data.get("consensus", {}).get("sell_votes", 0),
"AbstainVotes": data.get("consensus", {}).get("abstain_votes", 0)
}
}
else: # EXECUTION
return {
"SignalRef": hashlib.sha256(
data.get("signal_ref", "").encode()
).hexdigest()[:16],
"TicketHash": hashlib.sha256(
str(data.get("ticket", "")).encode()
).hexdigest()[:16],
"Action": data.get("action", ""),
"Symbol": "USDJPY" # Configurable
}
def compute_merkle_root(self) -> str:
"""Compute RFC 6962 compliant Merkle root"""
if not self.events:
return hashlib.sha256(b"").hexdigest()
# Extract event hashes
hashes = [bytes.fromhex(e["Security"]["EventHash"]) for e in self.events]
# RFC 6962: Leaf nodes get 0x00 prefix
leaves = [hashlib.sha256(b'\x00' + h).digest() for h in hashes]
# Build tree
current_level = leaves
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
if i + 1 < len(current_level):
# RFC 6962: Internal nodes get 0x01 prefix
combined = hashlib.sha256(
b'\x01' + current_level[i] + current_level[i + 1]
).digest()
else:
combined = hashlib.sha256(
b'\x01' + current_level[i] + current_level[i]
).digest()
next_level.append(combined)
current_level = next_level
return current_level[0].hex()
def finalize(self) -> dict:
"""Finalize evidence pack with Merkle root"""
merkle_root = self.compute_merkle_root()
anchor_id = str(uuid.uuid4())
# Update all events with final Merkle root
for event in self.events:
event["Security"]["MerkleRoot"] = merkle_root
event["Security"]["AnchorReference"] = anchor_id
# Write events to JSONL
with open(self.output_file, 'w') as f:
for event in self.events:
f.write(json.dumps(event) + '\n')
# Create security object
security_object = {
"MerkleRoot": merkle_root,
"EventCount": len(self.events),
"Algorithm": "RFC6962",
"Signature": self.private_key.sign(
bytes.fromhex(merkle_root)
).hex(),
"PublicKey": self.public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
).hex()
}
return {
"events_file": str(self.output_file),
"event_count": len(self.events),
"merkle_root": merkle_root,
"security_object": security_object
}
def watch(self, interval: float = 1.0):
"""Watch directory for new events"""
processed = set()
print(f"Watching {self.watch_dir} for events...")
while True:
for filepath in self.watch_dir.glob("*.json"):
if filepath.name in processed:
continue
try:
with open(filepath, 'r') as f:
data = json.load(f)
event_type = data.get("event_type", "UNKNOWN")
if event_type in ["SIGNAL", "EXECUTION"]:
vcp_event = self.process_event(data, event_type)
print(f"[VCP] Processed: {event_type} seq={vcp_event['Header']['Sequence']}")
processed.add(filepath.name)
except Exception as e:
print(f"Error processing {filepath}: {e}")
time.sleep(interval)
# Usage
if __name__ == "__main__":
import sys
watch_dir = sys.argv[1] if len(sys.argv) > 1 else "./mt5_events"
output_file = sys.argv[2] if len(sys.argv) > 2 else "./vcp_events.jsonl"
sidecar = VCPSidecar(watch_dir, output_file)
try:
sidecar.watch()
except KeyboardInterrupt:
print("\nFinalizing evidence pack...")
result = sidecar.finalize()
print(f"Evidence pack created: {result['event_count']} events")
print(f"Merkle Root: {result['merkle_root']}")
π Step 3: Three-Layer Architecture
VCP v1.1 requires three layers of integrity:
Layer 1: Event Integrity
Each event has its own hash and signature:
# Already implemented in vcp_sidecar.py
event_hash = hashlib.sha256(header + payload).hexdigest()
signature = private_key.sign(event_hash)
Layer 2: Collection Integrity
RFC 6962 Merkle Tree proves completeness:
Merkle Root
β
βββββββββββββ΄ββββββββββββ
β β
H(0x01 || H0 || H1) H(0x01 || H2 || H3)
β β
βββββββ΄ββββββ βββββββ΄ββββββ
H(0x00||E0) H(0x00||E1) H(0x00||E2) H(0x00||E3)
Layer 3: External Verifiability
Anchor to Bitcoin via OpenTimestamps:
# anchor_to_bitcoin.py
import subprocess
def anchor_merkle_root(merkle_root: str):
"""Submit Merkle root to OpenTimestamps"""
# Write root to file
with open("merkle_root.txt", "w") as f:
f.write(merkle_root)
# Stamp it (requires ots client)
subprocess.run(["ots", "stamp", "merkle_root.txt"])
print(f"Anchored to Bitcoin: merkle_root.txt.ots")
β Step 4: Verification
Create a verifier that third parties can run:
# vcp_verifier.py
import json
import hashlib
def verify_chain(events_file: str, expected_merkle: str = None) -> dict:
"""Verify VCP v1.1 event chain"""
results = {
"valid": True,
"events_checked": 0,
"errors": []
}
events = []
with open(events_file, 'r') as f:
for line in f:
events.append(json.loads(line))
prev_hash = "0" * 64
event_hashes = []
for i, event in enumerate(events):
results["events_checked"] += 1
# Check sequence
if event["Header"]["Sequence"] != i + 1:
results["errors"].append(f"Sequence error at event {i}")
results["valid"] = False
# Verify event hash
header = event["Header"]
payload = event["Payload"]
hash_input = json.dumps(header, sort_keys=True) + json.dumps(payload, sort_keys=True)
computed_hash = hashlib.sha256(hash_input.encode()).hexdigest()
if computed_hash != event["Security"]["EventHash"]:
results["errors"].append(f"Hash mismatch at event {i}")
results["valid"] = False
# Check PrevHash chain
if event["Security"]["PrevHash"] != prev_hash:
results["errors"].append(f"PrevHash mismatch at event {i}")
results["valid"] = False
prev_hash = computed_hash
event_hashes.append(bytes.fromhex(computed_hash))
# Verify Merkle root
if expected_merkle:
computed_merkle = compute_rfc6962_merkle(event_hashes)
if computed_merkle != expected_merkle:
results["errors"].append("Merkle root mismatch")
results["valid"] = False
return results
def compute_rfc6962_merkle(hashes: list) -> str:
"""RFC 6962 compliant Merkle root computation"""
leaves = [hashlib.sha256(b'\x00' + h).digest() for h in hashes]
current = leaves
while len(current) > 1:
next_level = []
for i in range(0, len(current), 2):
if i + 1 < len(current):
node = hashlib.sha256(b'\x01' + current[i] + current[i+1]).digest()
else:
node = hashlib.sha256(b'\x01' + current[i] + current[i]).digest()
next_level.append(node)
current = next_level
return current[0].hex() if current else ""
# Run verification
if __name__ == "__main__":
import sys
result = verify_chain(sys.argv[1])
print(f"Valid: {result['valid']}")
print(f"Events: {result['events_checked']}")
if result['errors']:
print(f"Errors: {result['errors']}")
π¦ Complete File Structure
your_mt5_project/
βββ MQL5/
β βββ Include/
β β βββ VCPEventEmitter.mqh
β βββ Experts/
β βββ YourTradingEA.mq5
βββ vcp_sidecar/
β βββ vcp_sidecar.py
β βββ vcp_verifier.py
β βββ anchor_to_bitcoin.py
βββ evidence/
β βββ vcp_events.jsonl
β βββ security_object.json
β βββ merkle_root.ots
βββ README.md
π Running the System
Terminal 1: Start MT5 with your EA
Your EA emits events via CVCPEventEmitter.
Terminal 2: Run VCP Sidecar
python vcp_sidecar.py "C:\Users\You\AppData\Roaming\MetaQuotes\Terminal\...\MQL5\Files\VCP_Events" ./evidence/vcp_events.jsonl
Terminal 3: Verify (anytime)
python vcp_verifier.py ./evidence/vcp_events.jsonl
π Key Takeaways
- Sidecar = Safety: Never modify MT5 internals
- Hash Everything: SHA-256 for events, Ed25519 for signatures
- Merkle Trees: RFC 6962 proves no events were omitted
- External Anchors: Bitcoin timestamps make logs third-party verifiable
- Anonymize: Hash ticket numbers, exclude trade amounts
π Resources
- VCP v1.1 Specification
- VCP Reference Implementation
- RFC 6962 - Certificate Transparency
- OpenTimestamps
π€ Contributing
VCP is open source. Contributions welcome:
- Report issues on GitHub
- Submit PRs for improvements
- Share your implementation experiences
Building trust in AI trading, one hash at a time. π
Tags: #trading #blockchain #python #fintech #mt5 #cryptography #audit #ai #regtech #opensource
Top comments (0)