The $565 Million Question
In July 2025, Jane Street—one of the world's most sophisticated trading firms—found itself facing India's largest-ever securities penalty: ₹4,843 crore ($565 million USD).
The alleged crime? Market manipulation through coordinated trading across cash and derivatives markets.
The defense? "Our logs show we didn't do it."
The problem? Two different divisions of the same regulator looked at the same logs and reached opposite conclusions.
This is the Split-View Problem. And it's not just India's problem—it's a fundamental flaw in how we build audit systems.
SEBI Surveillance Division (December 2024):
→ "No conclusive evidence of market manipulation"
→ Recommendation: "May not be pursued further"
SEBI Enforcement Division (July 2025):
→ "Deliberately devised scheme to manipulate market"
→ Action: $565M escrow + market access ban
Same data. Opposite conclusions.
How is this possible? Because traditional logs can be:
- Interpreted differently by different analysts
- Modified after creation (no tamper evidence)
- Incomplete (no proof all events were recorded)
- Disputed on timestamps (no independent verification)
This article shows you how to build audit systems that mathematically prevent these problems.
Table of Contents
- Four Incidents That Changed Everything
- The Three-Layer Solution
- Layer 1: Making Every Event Tamper-Evident
- Layer 2: Proving Nothing Was Omitted
- Layer 3: Independent Verification
- Solving the Split-View Problem
- Solving the Code Disclosure Problem
- Solving the GDPR Paradox
- Complete Implementation
- Getting Started
Four Incidents That Changed Everything
Incident 1: China's Nuclear Option (January 2026)
China's securities regulator (CSRC) wanted to verify algorithmic trading compliance. Firms wouldn't share proprietary code. CSRC's solution? Physically remove all high-frequency trading servers from exchange data centers.
Timeline:
2024-02-19: Lingjun Investment sells ¥2.57B in 42 seconds
2024-02-29: CSRC summons 28 quant funds for "compliance training"
2024-05-16: Program trading rules tightened
2026-01-15: Colocation servers ordered removed
The underlying problem: No way to verify algorithm behavior without seeing the code.
Incident 2: India's Split-View Disaster (2025)
We already covered this one. Same data, opposite regulatory conclusions, $565 million at stake.
The underlying problem: Different parties can interpret the same logs differently.
Incident 3: The Flash Crash Nobody Could Explain (August 2024)
On August 5, 2024, Japan's Nikkei index dropped 12.4%—the worst single-day decline since 1987. A ¥40 trillion carry trade unwound across equities, forex, and crypto simultaneously.
Post-mortem analysis was impossible because:
- Events happened faster than logging systems could capture
- Cross-asset correlations weren't tracked
- Different venues used different time sources
- Margin call cascades were invisible until they completed
The underlying problem: Traditional logs can't reconstruct millisecond-level cascades.
Incident 4: The EU's Impossible Demand (2024-2026)
The EU AI Act requires "automatic recording of events" for AI systems. GDPR requires the "right to erasure." These two requirements directly conflict for immutable audit trails.
The underlying problem: How do you have tamper-proof logs AND delete personal data?
The Three-Layer Solution
VCP (VeritasChain Protocol) addresses all four incidents through a three-layer cryptographic architecture:
┌─────────────────────────────────────────────────────────────┐
│ Layer 3: EXTERNAL VERIFIABILITY │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ RFC 3161 Timestamps → Proves WHEN records existed │ │
│ │ Ed25519 Signatures → Proves WHO created them │ │
│ │ External Anchoring → Independent verification │ │
│ └───────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Layer 2: COLLECTION INTEGRITY │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Merkle Trees → Proves completeness │ │
│ │ Inclusion Proofs → Selective disclosure │ │
│ │ Consistency Proofs → Append-only guarantee │ │
│ └───────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Layer 1: EVENT INTEGRITY │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ SHA-256 Hashing → Unique fingerprint per event │ │
│ │ RFC 8785 Canon. → Deterministic hashing │ │
│ │ UUIDv7 IDs → Time-ordered identifiers │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Let's build each layer.
Layer 1: Making Every Event Tamper-Evident
The goal: Any modification to any event should be immediately detectable.
Step 1: Define Your Event Model
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
import time
import os
class EventType(IntEnum):
"""VCP Event Types - these codes are immutable"""
SIGNAL = 1 # Trading signal generated
ORDER = 2 # Order submitted
ACKNOWLEDGE = 3 # Order acknowledged by venue
EXECUTE = 4 # Trade executed
REJECT = 6 # Order rejected
CANCEL = 7 # Order cancelled
CLOSE = 9 # Position closed
ALGORITHM = 20 # Algorithm state change
RISK = 21 # Risk event
@dataclass
class EventHeader:
"""
VCP Event Header
Contains all metadata needed to identify and sequence events.
"""
event_id: str # UUIDv7 - time-ordered unique ID
trace_id: str # Links related events (e.g., order lifecycle)
timestamp_ns: str # Nanoseconds since epoch (string for precision)
timestamp_iso: str # Human-readable ISO 8601
event_type: EventType
venue_id: str # Exchange/broker identifier
symbol: str # Trading instrument
account_id: str # Pseudonymized account
@dataclass
class TradePayload:
"""
Trade-specific event data
    Note: All numeric values are strings to avoid IEEE 754 floating-point precision loss.
"""
order_id: Optional[str] = None
side: Optional[str] = None # "BUY" or "SELL"
order_type: Optional[str] = None # "MARKET", "LIMIT", etc.
price: Optional[str] = None # String for precision
quantity: Optional[str] = None
execution_price: Optional[str] = None
commission: Optional[str] = None
slippage: Optional[str] = None
@dataclass
class SecurityBlock:
"""
Cryptographic security fields
"""
version: str = "1.1"
event_hash: str = ""
signature: str = ""
hash_algo: str = "SHA256"
sign_algo: str = "ED25519"
merkle_root: Optional[str] = None
merkle_index: Optional[int] = None
anchor_reference: Optional[str] = None
Step 2: Generate Time-Ordered UUIDs (UUIDv7)
UUIDv7 embeds a timestamp, ensuring events are naturally ordered:
def generate_uuid7() -> str:
"""
Generate RFC 9562 UUIDv7
Structure: tttttttt-tttt-7xxx-yxxx-xxxxxxxxxxxx
- First 48 bits: Unix timestamp in milliseconds
- Version 7 indicator
- Random bits for uniqueness
- Variant bits (RFC 4122)
Why UUIDv7?
- Time-ordered: Database indexes work efficiently
- Globally unique: No coordination needed
- Embeds creation time: Useful for debugging
"""
# Current time in milliseconds
timestamp_ms = int(time.time() * 1000)
# 6 bytes for timestamp (48 bits)
ts_bytes = timestamp_ms.to_bytes(6, 'big')
# 10 bytes of randomness
rand_bytes = os.urandom(10)
# Combine into 16-byte UUID
uuid_bytes = bytearray(16)
uuid_bytes[0:6] = ts_bytes
uuid_bytes[6:16] = rand_bytes
# Set version (7) in byte 6
uuid_bytes[6] = (uuid_bytes[6] & 0x0F) | 0x70
# Set variant (RFC 4122) in byte 8
uuid_bytes[8] = (uuid_bytes[8] & 0x3F) | 0x80
# Format as standard UUID string
h = uuid_bytes.hex()
return f"{h[:8]}-{h[8:12]}-{h[12:16]}-{h[16:20]}-{h[20:]}"
# Example output: "019478a3-b2c1-7d4e-8f01-234567890abc"
# ^^^^^^^^ ^^^^ timestamp portion
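A quick sanity check (a minimal sketch, not part of the protocol): because the timestamp occupies the most significant bits, UUIDv7 strings sort lexicographically in creation order.

first = generate_uuid7()
time.sleep(0.002)  # ensure a later millisecond timestamp
second = generate_uuid7()

# String comparison matches creation order: the zero-padded,
# hex-encoded 48-bit timestamp leads, and both strings have
# identical length and format.
assert first < second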
Step 3: Canonical JSON for Deterministic Hashing
The same data must always produce the same hash. JSON doesn't guarantee key ordering, so we need RFC 8785 canonicalization:
import json
import hashlib
def canonicalize(obj) -> str:
"""
    RFC 8785-style JSON canonicalization (simplified)
    Ensures identical objects produce identical strings:
    1. Keys sorted lexicographically (Unicode code point order)
    2. No insignificant whitespace
    3. Strings properly escaped
    Note: full RFC 8785 additionally mandates a specific number
    serialization and sorts keys by UTF-16 code units; because VCP
    stores all numeric values as strings, this simplified form
    suffices for VCP events.
Example:
{"b": 1, "a": 2} → '{"a":2,"b":1}'
"""
def _sort_recursively(item):
if isinstance(item, dict):
# Sort keys and recurse into values
return {k: _sort_recursively(v)
for k, v in sorted(item.items())}
elif isinstance(item, list):
return [_sort_recursively(i) for i in item]
else:
return item
sorted_obj = _sort_recursively(obj)
# Compact JSON: no whitespace, sorted keys
return json.dumps(
sorted_obj,
separators=(',', ':'),
ensure_ascii=False,
sort_keys=True
)
def calculate_event_hash(header: EventHeader, payload: TradePayload) -> str:
"""
Calculate SHA-256 hash of canonicalized event
This hash serves as the event's unique fingerprint.
Any modification - even a single bit - produces a completely
different hash.
"""
event_dict = {
"header": {
"event_id": header.event_id,
"trace_id": header.trace_id,
"timestamp_ns": header.timestamp_ns,
"timestamp_iso": header.timestamp_iso,
"event_type": header.event_type.value,
"venue_id": header.venue_id,
"symbol": header.symbol,
"account_id": header.account_id
},
"payload": {k: v for k, v in vars(payload).items() if v is not None}
}
canonical = canonicalize(event_dict)
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
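To see why this matters, here is a minimal check using the functions above: two objects that differ only in key order canonicalize to the same string and therefore hash to the same fingerprint.

a = {"b": "1", "a": {"y": "2", "x": "3"}}
b = {"a": {"x": "3", "y": "2"}, "b": "1"}

assert canonicalize(a) == canonicalize(b)  # '{"a":{"x":"3","y":"2"},"b":"1"}'

digest_a = hashlib.sha256(canonicalize(a).encode('utf-8')).hexdigest()
digest_b = hashlib.sha256(canonicalize(b).encode('utf-8')).hexdigest()
assert digest_a == digest_b  # same data, same fingerprint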
Step 4: Digital Signatures for Attribution
Every event must be signed to prove who created it:
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey, Ed25519PublicKey
)
import base64
class EventSigner:
"""
Ed25519 signing for VCP events
Why Ed25519?
- Fast: ~70,000 signatures/second on modern hardware
- Secure: 128-bit security level
- Compact: 64-byte signatures, 32-byte keys
- Deterministic: Same input always produces same signature
"""
    def __init__(self, private_key: Optional[Ed25519PrivateKey] = None):
if private_key is None:
private_key = Ed25519PrivateKey.generate()
self.private_key = private_key
self.public_key = private_key.public_key()
def sign(self, event_hash: str) -> str:
"""
Sign an event hash
Args:
event_hash: Hex-encoded SHA-256 hash
Returns:
Base64-encoded signature
"""
hash_bytes = bytes.fromhex(event_hash)
signature = self.private_key.sign(hash_bytes)
return base64.b64encode(signature).decode('ascii')
def verify(self, event_hash: str, signature: str) -> bool:
"""
Verify a signature
Returns True if signature is valid, False otherwise.
"""
try:
hash_bytes = bytes.fromhex(event_hash)
sig_bytes = base64.b64decode(signature)
self.public_key.verify(sig_bytes, hash_bytes)
return True
except Exception:
return False
def get_public_key_hex(self) -> str:
"""Export public key for sharing with verifiers"""
from cryptography.hazmat.primitives import serialization
public_bytes = self.public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
return public_bytes.hex()
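A round trip with the signer above (a sketch; the key pair here is ephemeral and generated in memory): sign an event hash, verify it, then confirm that flipping a single hex digit breaks verification.

signer = EventSigner()

event_hash = hashlib.sha256(b"example event").hexdigest()
sig = signer.sign(event_hash)
assert signer.verify(event_hash, sig)       # untampered: valid

tampered = ("1" if event_hash[0] == "0" else "0") + event_hash[1:]
assert not signer.verify(tampered, sig)     # any change: invalid

print(f"Public key for verifiers: {signer.get_public_key_hex()}")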
Layer 2: Proving Nothing Was Omitted
Individual event integrity isn't enough. An attacker could simply delete incriminating events. We need to prove completeness.
The Merkle Tree Solution
A Merkle tree creates a single hash (the "root") that represents an entire collection of events. If any event is added, removed, or modified, the root changes.
from typing import List, Tuple
class MerkleTree:
"""
    RFC 6962-style Merkle Tree (note: an unpaired node here is
    duplicated, whereas strict RFC 6962 promotes it to the next level)
Key insight: Domain separation prevents second preimage attacks.
- Leaf nodes: SHA256(0x00 || data)
- Internal nodes: SHA256(0x01 || left || right)
This ensures an attacker can't craft data that produces
a collision between leaf and internal nodes.
"""
LEAF_PREFIX = b'\x00'
NODE_PREFIX = b'\x01'
def __init__(self):
self.leaves: List[str] = []
self.tree: List[List[str]] = []
self._root: str = ""
def _hash_leaf(self, event_hash: str) -> str:
"""Hash a leaf with domain separation"""
data = bytes.fromhex(event_hash)
return hashlib.sha256(self.LEAF_PREFIX + data).hexdigest()
def _hash_node(self, left: str, right: str) -> str:
"""Hash an internal node with domain separation"""
combined = bytes.fromhex(left) + bytes.fromhex(right)
return hashlib.sha256(self.NODE_PREFIX + combined).hexdigest()
def add(self, event_hash: str) -> int:
"""
Add an event hash and return its index
Events are added in order. The index is needed later
for generating inclusion proofs.
"""
index = len(self.leaves)
self.leaves.append(event_hash)
return index
def build(self) -> str:
"""
Build the complete tree and return the root
This should be called after all events in a batch
have been added.
"""
if not self.leaves:
return ""
# Level 0: Hash all leaves
current_level = [self._hash_leaf(leaf) for leaf in self.leaves]
self.tree = [current_level]
# Build up until we have a single root
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
# Odd number of nodes: duplicate the last one
right = current_level[i + 1] if i + 1 < len(current_level) else left
next_level.append(self._hash_node(left, right))
self.tree.append(next_level)
current_level = next_level
self._root = self.tree[-1][0]
return self._root
@property
def root(self) -> str:
return self._root
def get_proof(self, index: int) -> List[Tuple[str, str]]:
"""
Generate inclusion proof for event at index
An inclusion proof allows anyone to verify that a specific
event is part of the tree without seeing all other events.
Returns:
List of (sibling_hash, direction) tuples
direction: 'L' = sibling on left, 'R' = sibling on right
"""
if not self.tree or index >= len(self.leaves):
raise ValueError(f"Invalid index: {index}")
proof = []
current_idx = index
for level in range(len(self.tree) - 1):
level_nodes = self.tree[level]
# Determine sibling position
if current_idx % 2 == 0:
sibling_idx = current_idx + 1
direction = 'R' # Sibling is on the right
else:
sibling_idx = current_idx - 1
direction = 'L' # Sibling is on the left
# Handle edge case of odd-length levels
sibling_hash = (level_nodes[sibling_idx]
if sibling_idx < len(level_nodes)
else level_nodes[current_idx])
proof.append((sibling_hash, direction))
current_idx //= 2
return proof
@staticmethod
def verify_proof(
event_hash: str,
proof: List[Tuple[str, str]],
expected_root: str
) -> bool:
"""
Verify an inclusion proof
This can be done by anyone with:
1. The original event hash
2. The proof (list of sibling hashes)
3. The expected Merkle root
No access to other events required!
"""
# Start with the hashed leaf
current = hashlib.sha256(
MerkleTree.LEAF_PREFIX + bytes.fromhex(event_hash)
).hexdigest()
# Walk up the tree using the proof
for sibling_hash, direction in proof:
if direction == 'L':
combined = bytes.fromhex(sibling_hash) + bytes.fromhex(current)
else:
combined = bytes.fromhex(current) + bytes.fromhex(sibling_hash)
current = hashlib.sha256(
MerkleTree.NODE_PREFIX + combined
).hexdigest()
return current == expected_root
Visualization: How Merkle Proofs Work
Tree structure for 4 events:
Root
/ \
H(01) H(23)
/ \ / \
H(E0) H(E1) H(E2) H(E3)
| | | |
E0 E1 E2 E3
Proof for E1:
To prove E1 is in the tree, provide:
1. H(E0) - sibling at level 0 (direction: L)
2. H(23) - sibling at level 1 (direction: R)
Verification:
1. Hash E1 → H(E1)
2. Combine H(E0) + H(E1) → H(01)
3. Combine H(01) + H(23) → Root
4. Compare with expected Root ✓
If ANY event changes, the Root changes.
If ANY event is omitted, the proof fails.
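The same flow in code, mirroring the diagram (a sketch): build a four-event tree, prove inclusion of E1, and confirm the proof fails against any other root.

tree = MerkleTree()
event_hashes = [
    hashlib.sha256(f"event-{i}".encode()).hexdigest() for i in range(4)
]
for h in event_hashes:
    tree.add(h)
root = tree.build()

# Prove E1 is included without revealing E0, E2, E3
proof = tree.get_proof(1)
assert MerkleTree.verify_proof(event_hashes[1], proof, root)

# Any other root rejects the proof
fake_root = hashlib.sha256(b"forged").hexdigest()
assert not MerkleTree.verify_proof(event_hashes[1], proof, fake_root)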
Layer 3: Independent Verification
Layers 1 and 2 prove integrity and completeness, but only relative to your own system. An attacker could theoretically:
- Modify events
- Rebuild the Merkle tree
- Generate new signatures
- Claim the new version is the "real" one
Layer 3 solves this with external anchoring: committing your Merkle root to an independent third party that you don't control.
Option 1: RFC 3161 Time Stamping Authority
import requests
from dataclasses import dataclass
from datetime import datetime, timezone
@dataclass
class TimestampToken:
"""Represents an RFC 3161 timestamp"""
timestamp: datetime
merkle_root: str
tsa_name: str
token_id: str
token_bytes: bytes
class RFC3161Anchor:
"""
RFC 3161 Time Stamping Authority client
TSAs are independent organizations that:
1. Receive your hash
2. Add their timestamp
3. Sign the combination
4. Return a cryptographic token
Popular TSAs:
- FreeTSA (free): https://freetsa.org/tsr
- DigiCert: https://timestamp.digicert.com
- Sectigo: http://timestamp.sectigo.com
"""
def __init__(self, tsa_url: str):
self.tsa_url = tsa_url
def timestamp(self, merkle_root: str) -> TimestampToken:
"""
Request a timestamp from the TSA
In production, this creates an ASN.1-encoded request
per RFC 3161. Simplified here for clarity.
"""
# Create timestamp request
request_body = self._create_request(merkle_root)
response = requests.post(
self.tsa_url,
data=request_body,
headers={'Content-Type': 'application/timestamp-query'}
)
if response.status_code != 200:
raise Exception(f"TSA error: {response.status_code}")
return self._parse_response(response.content, merkle_root)
def _create_request(self, merkle_root: str) -> bytes:
"""Create RFC 3161 timestamp request"""
# Actual implementation would use pyasn1
# This is a simplified placeholder
return bytes.fromhex(merkle_root)
def _parse_response(self, data: bytes, merkle_root: str) -> TimestampToken:
"""Parse RFC 3161 timestamp response"""
return TimestampToken(
            timestamp=datetime.now(timezone.utc),
merkle_root=merkle_root,
tsa_name=self.tsa_url,
token_id=hashlib.sha256(data).hexdigest()[:16],
token_bytes=data
)
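Usage, given the class above (a sketch; since the request encoding is a placeholder, treat the returned token as illustrative until a real ASN.1 request per RFC 3161 is wired in):

anchor = RFC3161Anchor("https://freetsa.org/tsr")

merkle_root = hashlib.sha256(b"batch-root").hexdigest()
token = anchor.timestamp(merkle_root)

print(f"Anchored {token.merkle_root[:16]}... at {token.timestamp}")
print(f"TSA: {token.tsa_name}, token_id: {token.token_id}")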
Option 2: OpenTimestamps (Free, Bitcoin-backed)
For lighter implementations, OpenTimestamps provides free anchoring to Bitcoin:
import subprocess
import tempfile
import base64
class OpenTimestampsAnchor:
"""
OpenTimestamps provides free, Bitcoin-backed timestamping.
How it works:
1. Your hash is combined with others in a Merkle tree
2. The root is embedded in a Bitcoin transaction
3. Bitcoin's proof-of-work secures the timestamp
Advantages:
- Free
- Decentralized (no single point of trust)
- Permanent (as long as Bitcoin exists)
Disadvantages:
- Confirmation takes 1-2 hours (Bitcoin block time)
- Not suitable for real-time verification
Install: pip install opentimestamps-client
"""
def stamp(self, merkle_root: str) -> str:
"""
Create an OpenTimestamps proof
Returns: Base64-encoded .ots proof file
"""
# Write hash to temp file
with tempfile.NamedTemporaryFile(
mode='wb', delete=False, suffix='.hash'
) as f:
f.write(bytes.fromhex(merkle_root))
hash_file = f.name
try:
# Run ots command
result = subprocess.run(
['ots', 'stamp', hash_file],
capture_output=True,
check=True
)
# Read the generated proof
ots_file = hash_file + '.ots'
with open(ots_file, 'rb') as f:
proof_bytes = f.read()
return base64.b64encode(proof_bytes).decode()
finally:
import os
os.unlink(hash_file)
if os.path.exists(hash_file + '.ots'):
os.unlink(hash_file + '.ots')
def verify(self, merkle_root: str, proof_b64: str) -> bool:
"""
Verify an OpenTimestamps proof
Returns True if the proof is valid AND confirmed on Bitcoin
"""
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as f:
f.write(bytes.fromhex(merkle_root))
hash_file = f.name
with tempfile.NamedTemporaryFile(
mode='wb', delete=False, suffix='.ots'
) as f:
f.write(base64.b64decode(proof_b64))
ots_file = f.name
try:
result = subprocess.run(
['ots', 'verify', '-f', hash_file, ots_file],
capture_output=True
)
return result.returncode == 0
finally:
import os
os.unlink(hash_file)
os.unlink(ots_file)
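Usage is straightforward once the `ots` CLI is available (an assumption: `pip install opentimestamps-client` puts it on your PATH). Fresh stamps stay pending until the aggregated batch lands in a Bitcoin block, so verifying a just-created proof will typically fail:

ots_anchor = OpenTimestampsAnchor()

merkle_root = hashlib.sha256(b"daily-batch-root").hexdigest()
proof_b64 = ots_anchor.stamp(merkle_root)   # base64-encoded .ots proof

# Hours later, once the anchoring transaction has confirmed:
if ots_anchor.verify(merkle_root, proof_b64):
    print("✓ Timestamp confirmed on Bitcoin")
else:
    print("⏳ Not yet confirmed (or invalid proof)")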
Solving the Split-View Problem
Remember the Jane Street case? Two divisions saw different versions of reality. VCP's Gossip Protocol makes this mathematically impossible.
The Gossip Protocol
from typing import Dict, List, Set
from dataclasses import dataclass
import time
@dataclass
class SignedRoot:
"""A Merkle root signed by a log server"""
server_id: str
timestamp: float
merkle_root: str
signature: str
class GossipProtocol:
"""
VCP Gossip Protocol for split-view detection
The idea: All log servers periodically exchange their
signed Merkle roots. If any server shows different
data to different parties, the inconsistency is
immediately detected.
This is similar to Certificate Transparency, where
multiple monitors ensure no certificate authority
can issue rogue certificates without detection.
"""
def __init__(self, server_id: str, signer: EventSigner):
self.server_id = server_id
self.signer = signer
self.peers: Set[str] = set()
self.known_roots: Dict[str, Dict[str, SignedRoot]] = {}
# known_roots[server_id][timestamp_key] = SignedRoot
def add_peer(self, peer_id: str):
"""Add a peer server to gossip with"""
self.peers.add(peer_id)
self.known_roots[peer_id] = {}
def create_signed_root(self, merkle_root: str) -> SignedRoot:
"""Create a signed root to broadcast to peers"""
timestamp = time.time()
# Sign the combination of server_id, timestamp, and root
message = f"{self.server_id}:{timestamp}:{merkle_root}"
message_hash = hashlib.sha256(message.encode()).hexdigest()
signature = self.signer.sign(message_hash)
return SignedRoot(
server_id=self.server_id,
timestamp=timestamp,
merkle_root=merkle_root,
signature=signature
)
def receive_root(self, signed_root: SignedRoot) -> bool:
"""
Process a signed root from a peer
Returns True if consistent, False if split-view detected
"""
server_id = signed_root.server_id
timestamp_key = str(int(signed_root.timestamp / 3600)) # Hourly buckets
# Verify signature
if not self._verify_root(signed_root):
print(f"⚠️ Invalid signature from {server_id}")
            return True  # Unverifiable roots are ignored, not treated as split-view evidence
# Check for split-view
if server_id in self.known_roots:
existing = self.known_roots[server_id].get(timestamp_key)
if existing and existing.merkle_root != signed_root.merkle_root:
# SPLIT VIEW DETECTED!
self._alert_split_view(existing, signed_root)
return False
# Store this root
if server_id not in self.known_roots:
self.known_roots[server_id] = {}
self.known_roots[server_id][timestamp_key] = signed_root
return True
def _verify_root(self, signed_root: SignedRoot) -> bool:
"""Verify the signature on a signed root"""
message = (f"{signed_root.server_id}:{signed_root.timestamp}:"
f"{signed_root.merkle_root}")
message_hash = hashlib.sha256(message.encode()).hexdigest()
# In production, lookup the peer's public key
return True # Simplified
def _alert_split_view(self, prev: SignedRoot, current: SignedRoot):
"""Alert on split-view detection"""
print(f"""
╔══════════════════════════════════════════════════════════════╗
║ 🚨 SPLIT-VIEW ATTACK DETECTED 🚨 ║
╠══════════════════════════════════════════════════════════════╣
║ Server: {prev.server_id:<50} ║
║ Time Window: {prev.timestamp:<46.0f} ║
║ Previous Root: {prev.merkle_root[:32]}... ║
║ Current Root: {current.merkle_root[:32]}... ║
╠══════════════════════════════════════════════════════════════╣
║ This server showed different data to different parties! ║
║ All associated audit evidence should be considered suspect. ║
╚══════════════════════════════════════════════════════════════╝
""")
How This Prevents Jane Street's Problem
With Gossip Protocol:
┌─────────────┐
│ VCP Server │
│ (Jane St.) │
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ SEBI │ │ SEBI │ │ Monitor │
│ Surveil. │ │ Enforce. │ │ Node │
└──────────┘ └──────────┘ └──────────┘
Every hour, all parties compare Merkle roots.
If Jane Street's server showed different data
to Surveillance vs. Enforcement, the Monitor
would detect the discrepancy IMMEDIATELY.
The split-view problem becomes mathematically impossible.
Solving the Code Disclosure Problem
China's CSRC wanted to verify algorithm behavior but firms wouldn't share code. VCP's Governance Module provides a middle ground.
Algorithm Metadata Without Source Code
from typing import Any, Dict, List

@dataclass
class AlgorithmGovernance:
"""
VCP-GOV: Algorithm governance metadata
This provides regulators with verifiable information about
algorithm behavior WITHOUT exposing proprietary code.
"""
# Identity
algorithm_id: str # Hash of algorithm binary
algorithm_version: str # Semantic version
model_hash: str # Hash of ML model weights (if applicable)
# Transparency
decision_factors: List[str] # What inputs influence decisions
# e.g., ["price_momentum", "order_flow", "volatility"]
# Risk controls
    risk_parameters: Dict[str, Any]
# e.g., {"max_position_pct": 0.05, "kill_switch_threshold": 0.02}
# Audit trail
last_audit_date: str
approved_by: str # Hash of approver identity
def create_governance_event(
algo: AlgorithmGovernance,
header_base: EventHeader
) -> Dict:
"""
Create a governance event for regulatory submission
This proves:
✓ Which algorithm version was running
✓ What inputs it considers
✓ What risk limits are in place
✓ When it was last audited
WITHOUT revealing:
✗ How the algorithm works
✗ Trading strategies
✗ Competitive intelligence
"""
return {
"header": {
"event_id": generate_uuid7(),
"trace_id": header_base.trace_id,
"timestamp_ns": str(int(time.time() * 1e9)),
"timestamp_iso": datetime.utcnow().isoformat(),
"event_type": EventType.ALGORITHM.value,
"venue_id": header_base.venue_id,
"symbol": "*", # Applies to all symbols
"account_id": header_base.account_id
},
"governance": {
"algorithm_id": algo.algorithm_id,
"algorithm_version": algo.algorithm_version,
"model_hash": algo.model_hash,
"decision_factors": algo.decision_factors,
"risk_parameters": algo.risk_parameters,
"last_audit_date": algo.last_audit_date,
"approved_by": algo.approved_by
}
}
# Example usage:
algo_metadata = AlgorithmGovernance(
algorithm_id=hashlib.sha256(open("algo.py", "rb").read()).hexdigest(),
algorithm_version="2.3.1",
model_hash="sha256:a1b2c3d4e5f6...",
decision_factors=["price_momentum", "order_flow", "volatility", "spread"],
risk_parameters={
"max_position_pct": 0.05,
"max_order_size_usd": 1000000,
"kill_switch_drawdown": 0.02,
"max_orders_per_second": 100
},
last_audit_date="2024-02-01",
approved_by=hashlib.sha256(b"Risk Committee 2024").hexdigest()[:16]
)
Solving the GDPR Paradox
How do you have immutable audit logs AND the right to erasure?
Crypto-shredding.
The Technique
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import secrets
class CryptoShredding:
"""
GDPR-compliant immutable audit trails
The insight: Encrypt personal data with per-subject keys.
To "erase" data, delete the key. The ciphertext remains
but is computationally unrecoverable.
This satisfies:
✓ GDPR Article 17 (right to erasure)
✓ EU AI Act Article 12 (automatic logging)
✓ MiFID II record-keeping (5-7 year retention)
    The EDPB (European Data Protection Board) discusses this
    approach in its April 2025 guidance on blockchains.
"""
def __init__(self):
# In production: Use HSM or secure key management
self.keys: Dict[str, bytes] = {}
self.shredded: Set[str] = set()
def get_key(self, data_subject_id: str) -> bytes:
"""Get or create encryption key for a data subject"""
if data_subject_id in self.shredded:
raise ValueError(f"Key for {data_subject_id} has been shredded")
if data_subject_id not in self.keys:
self.keys[data_subject_id] = secrets.token_bytes(32)
return self.keys[data_subject_id]
def encrypt_pii(
self,
data_subject_id: str,
plaintext: str
) -> Dict[str, str]:
"""
Encrypt personal data for storage in VCP event
"""
key = self.get_key(data_subject_id)
# AES-256-GCM encryption
nonce = secrets.token_bytes(12)
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
return {
"ciphertext": base64.b64encode(nonce + ciphertext).decode(),
"key_id": self._derive_key_id(data_subject_id),
"algorithm": "AES-256-GCM"
}
def decrypt_pii(self, encrypted: Dict[str, str]) -> str:
"""Decrypt personal data (for authorized access)"""
key_id = encrypted["key_id"]
data_subject_id = self._lookup_subject(key_id)
if data_subject_id in self.shredded:
raise ValueError("Data has been erased (key shredded)")
key = self.keys[data_subject_id]
raw = base64.b64decode(encrypted["ciphertext"])
nonce, ciphertext = raw[:12], raw[12:]
aesgcm = AESGCM(key)
return aesgcm.decrypt(nonce, ciphertext, None).decode()
def shred(self, data_subject_id: str) -> bool:
"""
GDPR Article 17: Right to erasure
Securely delete the encryption key.
The ciphertext remains in the audit log,
but is now computationally unrecoverable.
"""
if data_subject_id in self.keys:
            # Remove the key (in production: zeroize/destroy it in an HSM)
del self.keys[data_subject_id]
self.shredded.add(data_subject_id)
print(f"🗑️ Shredded key for subject: {data_subject_id[:8]}...")
print(" Associated data is now computationally unrecoverable.")
return True
return False
def _derive_key_id(self, data_subject_id: str) -> str:
"""Derive a stable key ID from subject ID"""
return hashlib.sha256(
f"keyid:{data_subject_id}".encode()
).hexdigest()[:16]
    def _lookup_subject(self, key_id: str) -> str:
        """Reverse lookup (in production: use a proper index)"""
        # Search shredded subjects too, so decrypt_pii reports
        # "erased" rather than "unknown key_id" after shredding
        for subject_id in [*self.keys, *self.shredded]:
            if self._derive_key_id(subject_id) == key_id:
                return subject_id
        raise ValueError(f"Unknown key_id: {key_id}")
The Magic: Hash Chain Integrity Preserved
Before shredding:
┌─────────────────────────────────────────────────────────────┐
│ Event: ORDER_SUBMITTED │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ account_id: "ENC:AES256:KeyID_abc123..." │ │
│ │ order_details: "ENC:AES256:KeyID_abc123..." │ │
│ │ amount: "1000000" (non-PII, plaintext) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ event_hash: "sha256:def456..." │
└─────────────────────────────────────────────────────────────┘
After shredding KeyID_abc123:
┌─────────────────────────────────────────────────────────────┐
│ Event: ORDER_SUBMITTED │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ account_id: "ENC:AES256:KeyID_abc123..." [UNREADABLE] │ │
│ │ order_details: "ENC:AES256:KeyID_abc123..." [UNREADABLE] │ │
│ │ amount: "1000000" (non-PII, still readable) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ event_hash: "sha256:def456..." [UNCHANGED!] │
└─────────────────────────────────────────────────────────────┘
✓ Personal data is unrecoverable (GDPR satisfied)
✓ Event hash unchanged (audit integrity preserved)
✓ Merkle tree unaffected (completeness proof still valid)
✓ Non-personal data still accessible (regulatory compliance)
Complete Implementation
Putting it all together:
from dataclasses import dataclass
from typing import List, Optional, Dict
from datetime import datetime, timezone
import time
@dataclass
class VCPEvent:
"""Complete VCP v1.1 compliant event"""
header: EventHeader
payload: TradePayload
security: SecurityBlock
governance: Optional[AlgorithmGovernance] = None
class VCPLogger:
"""
Production-ready VCP logger
Implements all three layers:
- Layer 1: Event hashing and signing
- Layer 2: Merkle tree batching
- Layer 3: External anchoring
"""
def __init__(
self,
signer: EventSigner,
policy_id: str,
tier: str = "SILVER", # SILVER, GOLD, or PLATINUM
anchor: Optional[RFC3161Anchor] = None,
batch_size: int = 100,
anchor_interval: int = 86400 # seconds
):
self.signer = signer
self.policy_id = policy_id
self.tier = tier
self.anchor = anchor
self.batch_size = batch_size
self.anchor_interval = anchor_interval
self.pending: List[VCPEvent] = []
self.tree = MerkleTree()
self.last_anchor = time.time()
self.crypto_shredder = CryptoShredding()
def log(
self,
event_type: EventType,
payload: TradePayload,
venue_id: str,
symbol: str,
account_id: str,
trace_id: Optional[str] = None
) -> VCPEvent:
"""
Log a trading event with full VCP compliance
"""
now = datetime.now(timezone.utc)
# Pseudonymize account (GDPR)
pseudo_account = self._pseudonymize(account_id)
header = EventHeader(
event_id=generate_uuid7(),
trace_id=trace_id or generate_uuid7(),
timestamp_ns=str(int(now.timestamp() * 1e9)),
timestamp_iso=now.isoformat(),
event_type=event_type,
venue_id=venue_id,
symbol=symbol,
account_id=pseudo_account
)
# Layer 1: Calculate hash and sign
event_hash = calculate_event_hash(header, payload)
signature = self.signer.sign(event_hash)
security = SecurityBlock(
version="1.1",
event_hash=event_hash,
signature=signature
)
event = VCPEvent(header=header, payload=payload, security=security)
# Layer 2: Add to Merkle tree
merkle_idx = self.tree.add(event_hash)
event.security.merkle_index = merkle_idx
self.pending.append(event)
# Check anchor conditions
if self._should_anchor():
self._anchor_batch()
return event
def _should_anchor(self) -> bool:
"""Determine if we should anchor the current batch"""
# Size threshold
if len(self.pending) >= self.batch_size:
return True
# Time threshold
if time.time() - self.last_anchor >= self.anchor_interval:
return True
return False
def _anchor_batch(self) -> Optional[str]:
"""Anchor the current batch"""
if not self.pending:
return None
# Layer 2: Build tree
merkle_root = self.tree.build()
# Layer 3: External anchor
anchor_ref = None
if self.anchor:
try:
token = self.anchor.timestamp(merkle_root)
anchor_ref = token.token_id
except Exception as e:
print(f"⚠️ Anchoring failed: {e}")
# Update all events
for event in self.pending:
event.security.merkle_root = merkle_root
event.security.anchor_reference = anchor_ref
# Archive
self._archive(self.pending, merkle_root, anchor_ref)
# Reset
self.pending = []
self.tree = MerkleTree()
self.last_anchor = time.time()
return merkle_root
def _pseudonymize(self, account_id: str) -> str:
"""GDPR-compliant pseudonymization"""
import os
        salt = os.environ.get('VCP_SALT', 'default')  # production: require a strong, secret salt
return hashlib.sha256(f"{salt}:{account_id}".encode()).hexdigest()[:16]
def _archive(
self,
events: List[VCPEvent],
root: str,
anchor: Optional[str]
):
"""Archive batch (implement your storage here)"""
print(f"📦 Archived {len(events)} events")
print(f" Root: {root[:16]}...")
if anchor:
print(f" Anchor: {anchor}")
def flush(self):
"""Force anchor pending events"""
if self.pending:
self._anchor_batch()
def close(self):
"""Close logger, ensuring all events are anchored"""
self.flush()
print("✅ Logger closed")
# Usage example
if __name__ == "__main__":
# Initialize
signer = EventSigner()
anchor = RFC3161Anchor("https://freetsa.org/tsr")
logger = VCPLogger(
signer=signer,
policy_id="org.example:demo-001",
tier="SILVER",
anchor=anchor,
batch_size=10,
anchor_interval=3600
)
# Log a trade lifecycle
trace = generate_uuid7()
# Signal
logger.log(
EventType.SIGNAL,
TradePayload(side="BUY", price="2650.50", quantity="1.0"),
"DEMO_EXCHANGE",
"XAUUSD",
"trader-123",
trace
)
# Order
logger.log(
EventType.ORDER,
TradePayload(
order_id="ORD-001",
side="BUY",
order_type="MARKET",
price="2650.50",
quantity="1.0"
),
"DEMO_EXCHANGE",
"XAUUSD",
"trader-123",
trace
)
# Execution
logger.log(
EventType.EXECUTE,
TradePayload(
order_id="ORD-001",
execution_price="2650.55",
quantity="1.0",
commission="7.50"
),
"DEMO_EXCHANGE",
"XAUUSD",
"trader-123",
trace
)
logger.close()
Getting Started
1. Choose Your Tier
| Tier | Target | Anchor Freq | Timestamp Precision | Use Case |
|---|---|---|---|---|
| Silver | Retail/Prop | Daily | ms | MetaTrader, small firms |
| Gold | Institutional | Hourly | μs | Asset managers |
| Platinum | HFT | Per-batch | ns | Exchanges, HFT |
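In code, a tier choice maps naturally onto the VCPLogger parameters from the previous section (the preset values below are illustrative assumptions, not normative figures from the spec):

# Illustrative presets only - consult the spec for normative values
TIER_PRESETS = {
    "SILVER":   {"batch_size": 1000, "anchor_interval": 86400},  # daily
    "GOLD":     {"batch_size": 500,  "anchor_interval": 3600},   # hourly
    "PLATINUM": {"batch_size": 100,  "anchor_interval": 60},     # per-batch
}

tier = "GOLD"
logger = VCPLogger(
    signer=EventSigner(),
    policy_id="org.example:prod-001",
    tier=tier,
    **TIER_PRESETS[tier]
)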
2. Install
# Python
pip install vcp-sdk
# TypeScript
npm install @veritaschain/vcp-sdk
# MQL5
# Download from github.com/veritaschain/vcp-mql-bridge
3. Run Conformance Tests
# Validate your implementation
vcp-test run --tier Silver
# 125 tests across 9 categories
# Required pass rate: 95% (Silver), 98% (Gold), 100% (Platinum)
4. Get Certified
Apply for VC-Certified status: certified.veritaschain.org
Resources
- GitHub: github.com/veritaschain/vcp-spec
- IETF Draft: datatracker.ietf.org/doc/draft-kamimura-scitt-vcp
- Specification: veritaschain.org/spec
- Discord: discord.gg/veritaschain
Conclusion
The four incidents we examined—China's colocation ban, India's split-view disaster, Japan's flash crash, and the EU's GDPR paradox—all stem from the same root cause: traditional audit systems rely on trust instead of verification.
VCP flips this model. Instead of asking "do you trust these logs?", it asks "can you verify this mathematical proof?"
The answer to the second question doesn't depend on who's asking.
"Verify, Don't Trust"
Have questions? Found a bug? Open an issue on GitHub or join our Discord.