TL;DR
On November 28, 2025, a cooling system failure at CME Group's data center froze global futures trading for 10+ hours. The incident exposed a fundamental problem: we can't cryptographically prove what happened when trading infrastructure fails.
This post shows you how to implement VCP (VeritasChain Protocol) v1.1—an open standard for tamper-evident audit trails that acts as a "flight recorder" for algorithmic trading systems.
What you'll learn:
- Why hash chains alone aren't enough (and what v1.1 fixes)
- Implementing the three-layer integrity architecture
- Building Merkle trees with RFC 6962 compliance
- External anchoring with OpenTimestamps (free!)
- Cross-party verification with VCP-XREF
The CME Wake-Up Call
Timeline (November 27-28, 2025):
03:40 CST - Maintenance skips drain procedure
04:19 CST - First alert to customers
10:19 CST - Second notification
17:00 CST - Trading halts (Asian markets opening)
18:19 CST - All chillers fail
07:30 CST - Markets resume (next day)
Duration: 10+ hours
Impact: Trillions in frozen positions
Root cause: Human error → No failover → No verifiable audit trail
The scary part? CME had a backup data center in New York. They chose not to fail over because "available information suggested short-term recovery."
There's no cryptographic proof of:
- What decisions were made and when
- What was communicated between CyrusOne and CME
- Which algorithms were affected during degradation
- Whether any trades executed during the incident were valid
This is the problem VCP solves.
Why v1.1? The Hash Chain Paradox
VCP v1.0 made external anchoring optional for Silver tier. Community feedback made clear this was a mistake.
The problem with hash chains alone:
# v1.0 approach: Hash chain provides ordering
event_1_hash = sha256(event_1)
event_2_hash = sha256(event_2 + event_1_hash) # Links to previous
event_3_hash = sha256(event_3 + event_2_hash) # Links to previous
This proves ordering and detects mid-chain tampering. But it doesn't prevent:
- Pre-anchoring modification: The log producer can rewrite the entire chain before anchoring
- Selective omission: Events can be dropped before they're ever chained
- Split-view attacks: Different parties can see different versions
v1.1 solution: External anchoring is REQUIRED for all tiers.
v1.0: "Trust me, here's my hash chain"
v1.1: "Here's the Merkle root. Verify it against the blockchain yourself."
The Three-Layer Architecture
VCP v1.1 separates concerns into three distinct layers:
┌─────────────────────────────────────────────────────┐
│ LAYER 3: External Verifiability │
│ ───────────────────────────────── │
│ • External Anchor (Blockchain/TSA): REQUIRED │
│ • Digital Signature (Ed25519): REQUIRED │
│ Purpose: Third-party proof of existence │
├─────────────────────────────────────────────────────┤
│ LAYER 2: Collection Integrity │
│ ───────────────────────────────── │
│ • Merkle Tree (RFC 6962): REQUIRED │
│ • Merkle Root: REQUIRED │
│ • Audit Path: REQUIRED │
│ Purpose: Prove batch completeness │
├─────────────────────────────────────────────────────┤
│ LAYER 1: Event Integrity │
│ ───────────────────────────────── │
│ • EventHash (SHA-256): REQUIRED │
│ • PrevHash (chain): OPTIONAL │
│ Purpose: Individual event verification │
└─────────────────────────────────────────────────────┘
Let's implement each layer.
Layer 1: Event Hashing
Every VCP event gets a canonical hash. The key is deterministic serialization—the same event must always produce the same hash.
import hashlib
import json
from typing import Any
def canonicalize_json(obj: Any) -> str:
"""
RFC 8785 JSON Canonicalization Scheme (JCS)
- Keys sorted alphabetically
- No whitespace
- Unicode escaping normalized
"""
return json.dumps(obj, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
def calculate_event_hash(header: dict, payload: dict, algo: str = "SHA256") -> str:
"""
Calculate VCP event hash
Args:
header: VCP event header (EventID, EventType, Timestamp, etc.)
payload: Event-specific data (order details, risk params, etc.)
algo: Hash algorithm (SHA256, SHA3_256, BLAKE3)
Returns:
Hex-encoded hash string
"""
canonical_header = canonicalize_json(header)
canonical_payload = canonicalize_json(payload)
hash_input = canonical_header + canonical_payload
if algo == "SHA256":
return hashlib.sha256(hash_input.encode('utf-8')).hexdigest()
elif algo == "SHA3_256":
return hashlib.sha3_256(hash_input.encode('utf-8')).hexdigest()
else:
raise ValueError(f"Unsupported algorithm: {algo}")
# Example usage
header = {
"EventID": "019abc12-3456-7890-abcd-ef1234567890",
"EventType": "ORD",
"Timestamp": 1735520400000000,
"TimestampISO": "2025-12-30T00:00:00.000000Z"
}
payload = {
"Symbol": "EURUSD",
"Side": "BUY",
"Quantity": 100000,
"Price": 1.0850,
"OrderType": "LIMIT"
}
event_hash = calculate_event_hash(header, payload)
print(f"EventHash: {event_hash}")
# EventHash: 7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069
Optional: Hash Chain Linking
If you want real-time tamper detection (recommended for HFT):
class VCPEventChain:
def __init__(self):
self.prev_hash = "0" * 64 # Genesis
self.events = []
def add_event(self, header: dict, payload: dict) -> dict:
"""Add event with hash chain linking"""
# Calculate base hash
base_hash = calculate_event_hash(header, payload)
# Link to previous
chain_input = base_hash + self.prev_hash
event_hash = hashlib.sha256(chain_input.encode()).hexdigest()
event = {
"header": header,
"payload": payload,
"EventHash": event_hash,
"PrevHash": self.prev_hash
}
self.prev_hash = event_hash
self.events.append(event)
return event
Layer 2: Merkle Tree Construction
This is where v1.1 gets serious. The Merkle tree must follow RFC 6962 to prevent second preimage attacks.
from typing import List, Tuple
def merkle_hash_leaf(data: bytes) -> bytes:
"""RFC 6962: Leaf nodes get 0x00 prefix"""
return hashlib.sha256(b'\x00' + data).digest()
def merkle_hash_node(left: bytes, right: bytes) -> bytes:
"""RFC 6962: Internal nodes get 0x01 prefix"""
return hashlib.sha256(b'\x01' + left + right).digest()
def build_merkle_tree(event_hashes: List[str]) -> Tuple[str, List[List[bytes]]]:
"""
Build RFC 6962 compliant Merkle tree
Args:
event_hashes: List of hex-encoded event hashes
Returns:
(merkle_root, tree_levels) for audit path generation
"""
if not event_hashes:
raise ValueError("Cannot build tree from empty list")
# Convert hex strings to bytes and create leaf nodes
leaves = [merkle_hash_leaf(bytes.fromhex(h)) for h in event_hashes]
tree = [leaves]
current_level = leaves
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
# Handle odd number of nodes by duplicating last
right = current_level[i + 1] if i + 1 < len(current_level) else left
next_level.append(merkle_hash_node(left, right))
tree.append(next_level)
current_level = next_level
merkle_root = current_level[0].hex()
return merkle_root, tree
# Example: Build tree from 4 events
event_hashes = [
"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069",
"a3f2b8c9e4d5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1",
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"fedcba0987654321fedcba0987654321fedcba0987654321fedcba0987654321"
]
merkle_root, tree = build_merkle_tree(event_hashes)
print(f"Merkle Root: {merkle_root}")
print(f"Tree depth: {len(tree)} levels")
Generating Audit Paths
To verify a single event without the full dataset:
def generate_audit_path(tree: List[List[bytes]], leaf_index: int) -> List[dict]:
"""
Generate inclusion proof for a specific event
Args:
tree: Merkle tree levels from build_merkle_tree()
leaf_index: Index of the event to prove
Returns:
List of sibling hashes with positions
"""
path = []
index = leaf_index
for level in tree[:-1]: # Exclude root level
sibling_index = index ^ 1 # XOR to get sibling
if sibling_index < len(level):
path.append({
"hash": level[sibling_index].hex(),
"position": "left" if sibling_index < index else "right"
})
index //= 2
return path
def verify_inclusion(event_hash: str, audit_path: List[dict], merkle_root: str) -> bool:
"""
Verify event inclusion using audit path
"""
current = merkle_hash_leaf(bytes.fromhex(event_hash))
for step in audit_path:
sibling = bytes.fromhex(step["hash"])
if step["position"] == "left":
current = merkle_hash_node(sibling, current)
else:
current = merkle_hash_node(current, sibling)
return current.hex() == merkle_root
Layer 3: External Anchoring
Here's where we make the audit trail externally verifiable. The key insight: you don't need to run a blockchain node or pay for expensive timestamping.
Free Option: OpenTimestamps
OpenTimestamps is a free, Bitcoin-backed timestamping service perfect for Silver tier.
import requests
import time
def anchor_opentimestamps(merkle_root: str) -> dict:
"""
Anchor Merkle root to Bitcoin via OpenTimestamps
Free tier: Perfect for Silver compliance
"""
# Submit hash for timestamping
digest = bytes.fromhex(merkle_root)
response = requests.post(
'https://a.pool.opentimestamps.org/digest',
data=digest,
headers={'Content-Type': 'application/octet-stream'}
)
if response.status_code == 200:
return {
"type": "PUBLIC_SERVICE",
"identifier": "opentimestamps.org",
"proof": response.content.hex(),
"status": "PENDING", # Confirmed after ~2 hours (Bitcoin block)
"timestamp": int(time.time() * 1_000_000)
}
else:
raise Exception(f"OpenTimestamps failed: {response.status_code}")
RFC 3161 TSA (Gold/Platinum)
For regulatory-grade timestamps:
from asn1crypto import tsp, core
import requests
def anchor_rfc3161(merkle_root: str, tsa_url: str) -> dict:
"""
RFC 3161 Time Stamp Authority anchoring
Recommended TSAs:
- FreeTSA: https://freetsa.org/tsr
- DigiCert: https://timestamp.digicert.com
"""
# Build TimeStampReq
message_imprint = tsp.MessageImprint({
'hash_algorithm': {'algorithm': 'sha256'},
'hashed_message': bytes.fromhex(merkle_root)
})
ts_request = tsp.TimeStampReq({
'version': 1,
'message_imprint': message_imprint,
'cert_req': True
})
response = requests.post(
tsa_url,
data=ts_request.dump(),
headers={'Content-Type': 'application/timestamp-query'}
)
ts_response = tsp.TimeStampResp.load(response.content)
return {
"type": "TSA",
"identifier": tsa_url,
"proof": response.content.hex(),
"timestamp": int(ts_response['time_stamp_token']['tst_info']['gen_time'].native.timestamp() * 1_000_000)
}
Putting It Together: Anchor Record
from dataclasses import dataclass
from typing import Optional
import nacl.signing
import base64
@dataclass
class AnchorRecord:
merkle_root: str
signature: str
sign_algo: str
timestamp: int
anchor_target: dict
event_count: int
first_event_id: str
last_event_id: str
policy_id: str
def create_anchor_record(
events: List[dict],
private_key: bytes,
policy_id: str,
anchor_func: callable
) -> AnchorRecord:
"""
Create complete VCP anchor record
"""
# Extract event hashes
event_hashes = [e["EventHash"] for e in events]
# Build Merkle tree
merkle_root, tree = build_merkle_tree(event_hashes)
# Sign the Merkle root (Ed25519)
signing_key = nacl.signing.SigningKey(private_key)
signature = signing_key.sign(bytes.fromhex(merkle_root))
signature_b64 = base64.b64encode(signature.signature).decode()
# External anchoring
anchor_result = anchor_func(merkle_root)
return AnchorRecord(
merkle_root=merkle_root,
signature=signature_b64,
sign_algo="ED25519",
timestamp=anchor_result["timestamp"],
anchor_target=anchor_result,
event_count=len(events),
first_event_id=events[0]["header"]["EventID"],
last_event_id=events[-1]["header"]["EventID"],
policy_id=policy_id
)
VCP-XREF: Dual Logging (New in v1.1)
This is the killer feature for prop trading and broker relationships. Both parties log independently, and discrepancies are cryptographically detectable.
Trader Broker
│ │
│─────── Order Request ───────────▶│
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ VCP Event (INIT) │ │ VCP Event (RECV) │
│ XREF-ID: abc123 │ │ XREF-ID: abc123 │
│ Role: INITIATOR │ │ Role: COUNTERPTY │
│ Hash: 0x7f83... │ │ Hash: 0x7f83... │
└──────────────────┘ └──────────────────┘
│ │
└──────────┬───────────────┘
▼
┌─────────────────┐
│ Cross-Reference │
│ Verification │
│ MATCH ✓ │
└─────────────────┘
Implementation
import uuid
from enum import Enum
class XrefRole(Enum):
INITIATOR = "INITIATOR"
COUNTERPARTY = "COUNTERPARTY"
OBSERVER = "OBSERVER"
class ReconciliationStatus(Enum):
PENDING = "PENDING"
MATCHED = "MATCHED"
DISCREPANCY = "DISCREPANCY"
TIMEOUT = "TIMEOUT"
@dataclass
class VCPXref:
cross_reference_id: str
party_role: XrefRole
counterparty_id: str
order_id: str
timestamp: int
tolerance_ms: int = 100
expected_counterparty_hash: Optional[str] = None
reconciliation_status: ReconciliationStatus = ReconciliationStatus.PENDING
def create_initiator_xref(order_id: str, counterparty: str) -> VCPXref:
"""Create XREF for the party initiating a transaction"""
return VCPXref(
cross_reference_id=str(uuid.uuid4()),
party_role=XrefRole.INITIATOR,
counterparty_id=counterparty,
order_id=order_id,
timestamp=int(time.time() * 1_000_000_000)
)
def create_counterparty_xref(
initiator_xref_id: str,
initiator_id: str,
order_id: str,
expected_hash: str
) -> VCPXref:
"""Create XREF for the receiving party"""
return VCPXref(
cross_reference_id=initiator_xref_id,
party_role=XrefRole.COUNTERPARTY,
counterparty_id=initiator_id,
order_id=order_id,
timestamp=int(time.time() * 1_000_000_000),
expected_counterparty_hash=expected_hash
)
def verify_xref_pair(initiator: VCPXref, counterparty: VCPXref) -> ReconciliationStatus:
"""Verify that both parties logged consistent events"""
# Check XREF ID match
if initiator.cross_reference_id != counterparty.cross_reference_id:
return ReconciliationStatus.DISCREPANCY
# Check order ID match
if initiator.order_id != counterparty.order_id:
return ReconciliationStatus.DISCREPANCY
# Check timestamp within tolerance
time_diff_ns = abs(initiator.timestamp - counterparty.timestamp)
tolerance_ns = initiator.tolerance_ms * 1_000_000
if time_diff_ns > tolerance_ns:
return ReconciliationStatus.DISCREPANCY
return ReconciliationStatus.MATCHED
The key guarantee: If Party A claims an event occurred and Party B denies it, the presence or absence of VCP-XREF records provides non-repudiable evidence. Manipulation requires collusion between both parties AND compromise of external anchors.
Complete Sidecar Implementation
Here's a minimal but complete VCP sidecar that you can integrate with any trading system:
"""
VCP v1.1 Sidecar Implementation
Minimal example for Silver tier compliance
"""
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, asdict
from typing import List, Optional
from pathlib import Path
import nacl.signing
import base64
class VCPSidecar:
"""
Non-invasive VCP sidecar for trading systems
Design principles:
- Fail-safe: Sidecar failure never impacts trading
- Async-first: Event capture is asynchronous
- Idempotent: Duplicate events are handled safely
"""
def __init__(self,
policy_id: str,
private_key: bytes,
tier: str = "SILVER",
anchor_interval_hours: int = 24):
self.policy_id = policy_id
self.private_key = private_key
self.tier = tier
self.anchor_interval = anchor_interval_hours * 3600
self.event_buffer: List[dict] = []
self.anchor_records: List[dict] = []
self.last_anchor_time = 0
# Storage paths
self.storage_path = Path(f"./vcp_data/{policy_id}")
self.storage_path.mkdir(parents=True, exist_ok=True)
def capture_event(self, event_type: str, payload: dict) -> str:
"""
Capture trading event with VCP envelope
Returns: EventID
"""
event_id = str(uuid.uuid7()) if hasattr(uuid, 'uuid7') else str(uuid.uuid4())
timestamp = int(time.time() * 1_000_000)
header = {
"EventID": event_id,
"EventType": event_type,
"Timestamp": timestamp,
"TimestampISO": time.strftime("%Y-%m-%dT%H:%M:%S.000000Z", time.gmtime()),
"PolicyID": self.policy_id,
"ConformanceTier": self.tier
}
# Calculate event hash
event_hash = self._calculate_hash(header, payload)
event = {
"header": header,
"payload": payload,
"EventHash": event_hash
}
self.event_buffer.append(event)
# Check if anchoring needed
if time.time() - self.last_anchor_time >= self.anchor_interval:
self._perform_anchoring()
return event_id
def _calculate_hash(self, header: dict, payload: dict) -> str:
"""RFC 8785 canonical JSON hash"""
canonical = json.dumps(header, sort_keys=True, separators=(',', ':'))
canonical += json.dumps(payload, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
def _build_merkle_tree(self, event_hashes: List[str]) -> str:
"""RFC 6962 Merkle tree"""
def hash_leaf(data: bytes) -> bytes:
return hashlib.sha256(b'\x00' + data).digest()
def hash_node(left: bytes, right: bytes) -> bytes:
return hashlib.sha256(b'\x01' + left + right).digest()
leaves = [hash_leaf(bytes.fromhex(h)) for h in event_hashes]
current = leaves
while len(current) > 1:
next_level = []
for i in range(0, len(current), 2):
left = current[i]
right = current[i + 1] if i + 1 < len(current) else left
next_level.append(hash_node(left, right))
current = next_level
return current[0].hex()
def _perform_anchoring(self):
"""Anchor current buffer to external service"""
if not self.event_buffer:
return
# Build Merkle tree
event_hashes = [e["EventHash"] for e in self.event_buffer]
merkle_root = self._build_merkle_tree(event_hashes)
# Sign
signing_key = nacl.signing.SigningKey(self.private_key)
signature = signing_key.sign(bytes.fromhex(merkle_root))
# Anchor (OpenTimestamps for Silver tier)
anchor_result = self._anchor_opentimestamps(merkle_root)
anchor_record = {
"MerkleRoot": merkle_root,
"Signature": base64.b64encode(signature.signature).decode(),
"SignAlgo": "ED25519",
"Timestamp": int(time.time() * 1_000_000),
"AnchorTarget": anchor_result,
"EventCount": len(self.event_buffer),
"FirstEventID": self.event_buffer[0]["header"]["EventID"],
"LastEventID": self.event_buffer[-1]["header"]["EventID"],
"PolicyID": self.policy_id
}
# Persist
self._persist_batch(self.event_buffer, anchor_record)
self.anchor_records.append(anchor_record)
self.event_buffer = []
self.last_anchor_time = time.time()
def _anchor_opentimestamps(self, merkle_root: str) -> dict:
"""Free Bitcoin-backed timestamping"""
import requests
try:
response = requests.post(
'https://a.pool.opentimestamps.org/digest',
data=bytes.fromhex(merkle_root),
headers={'Content-Type': 'application/octet-stream'},
timeout=10
)
return {
"Type": "PUBLIC_SERVICE",
"Identifier": "opentimestamps.org",
"Proof": response.content.hex() if response.ok else "FAILED"
}
except Exception as e:
# Fail gracefully - don't break trading
return {
"Type": "PUBLIC_SERVICE",
"Identifier": "opentimestamps.org",
"Proof": f"ERROR:{str(e)}"
}
def _persist_batch(self, events: List[dict], anchor: dict):
"""Save to local storage"""
batch_id = anchor["FirstEventID"][:8]
batch_file = self.storage_path / f"batch_{batch_id}.json"
with open(batch_file, 'w') as f:
json.dump({
"events": events,
"anchor": anchor
}, f, indent=2)
# Usage example
if __name__ == "__main__":
# Generate key pair
private_key = nacl.signing.SigningKey.generate()
# Initialize sidecar
sidecar = VCPSidecar(
policy_id="com.example.trading:silver-demo",
private_key=bytes(private_key),
tier="SILVER",
anchor_interval_hours=24
)
# Capture trading events
sidecar.capture_event("ORD", {
"Symbol": "EURUSD",
"Side": "BUY",
"Quantity": 100000,
"Price": 1.0850
})
sidecar.capture_event("ACK", {
"OrderID": "ORD-001",
"Status": "ACCEPTED"
})
sidecar.capture_event("EXE", {
"OrderID": "ORD-001",
"FillPrice": 1.0851,
"FillQuantity": 100000
})
# Force anchor for demo
sidecar._perform_anchoring()
print(f"Anchored {len(sidecar.anchor_records)} batches")
print(f"Latest Merkle Root: {sidecar.anchor_records[-1]['MerkleRoot']}")
Tier Requirements Summary
| Requirement | Silver | Gold | Platinum |
|---|---|---|---|
| Clock Sync | Best-effort | NTP (<1ms) | PTP (<1µs) |
| Anchor Frequency | 24 hours | 1 hour | 10 minutes |
| Anchor Target | OpenTimestamps (free) | RFC 3161 TSA | Blockchain |
| Serialization | JSON | JSON | SBE/Binary |
| Signature | Ed25519 (delegated OK) | Ed25519 | Ed25519 (HSM) |
| Hash Chain | OPTIONAL | RECOMMENDED | REQUIRED |
What CME Could Have Had
If CME had deployed VCP sidecars, the November incident would have produced:
- Tamper-evident timeline of all cooling system events
- Externally verifiable decision timestamps (blockchain-anchored)
- Cross-referenced logs between CyrusOne and CME (VCP-XREF)
- Completeness proofs showing no events were omitted
Instead, we have competing narratives and no cryptographic proof.
Get Started
Specification: VCP v1.1 Full Spec
GitHub: github.com/veritaschain
IETF Draft: draft-kamimura-scitt-vcp
Questions? developers@veritaschain.org
VCP is an open standard (CC BY 4.0) developed by the VeritasChain Standards Organization. The protocol is vendor-neutral and implementation-agnostic.
"Verify, Don't Trust."
Discussion
Have you implemented audit trails for trading or financial systems? What challenges did you face with tamper-evidence and external verification? Let me know in the comments!
Top comments (0)