TL;DR: The EU AI Act is coming for algorithmic trading. Traditional logs can be tampered with. VCP v1.1 uses Merkle trees + external anchoring to create verifiable audit trails. Here's how to implement it.
π― The Problem
You're building a trading bot. Your compliance team says you need audit logs. Easy, right?
# "Audit logging" circa 2024
def execute_trade(order):
result = broker.submit(order)
logger.info(f"Trade executed: {order} -> {result}")
db.insert("audit_log", {
"timestamp": datetime.now(),
"order": order,
"result": result
})
return result
Ship it. β
Except... what happens when a regulator asks: "How do I know you didn't delete the embarrassing trades?"
Your answer: "Trust me bro."
That's not going to fly under the EU AI Act.
π₯ Why This Matters Now
In November 2025, three things happened:
EBA Factsheet (Nov 21): Confirmed AI credit scoring is high-risk. Algo trading classification pending EC guidance (February 2026).
EU Parliament Resolution (Nov 25): Called for "tamper-resistant logs with audit integration" in financial AI systems.
CEN-CENELEC Standards (Oct 23): Logging capabilities standard expected Q2 2026. Specifies cryptographic integrity requirements.
The regulatory direction is clear: logs need to be verifiable, not just stored.
ποΈ Enter VCP v1.1: The Architecture
VeritasChain Protocol (VCP) is an open specification for tamper-evident audit trails. Version 1.1 introduces a three-layer security architecture:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β LAYER 3: EXTERNAL VERIFIABILITY β
β β’ Digital Signature (Ed25519) βββ REQUIRED β
β β’ External Anchor (blockchain/TSA) βββ REQUIRED β
β β Third parties can verify without trusting you β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β LAYER 2: COLLECTION INTEGRITY β
β β’ Merkle Tree (RFC 6962) βββ REQUIRED β
β β’ Merkle Root βββ REQUIRED β
β β Proves batch completeness (no omissions) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β LAYER 1: EVENT INTEGRITY β
β β’ EventHash (SHA-256) βββ REQUIRED β
β β’ PrevHash (hash chain) βββ OPTIONAL β
β β Individual event integrity β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Let's implement each layer.
π» Layer 1: Event Integrity
Every event gets a cryptographic hash. Change one bit, the hash changes completely.
EventHash Calculation
import hashlib
import json
from typing import Any
def canonicalize_json(obj: Any) -> str:
"""
RFC 8785 JSON Canonicalization Scheme (simplified)
- Sort keys alphabetically
- No whitespace
- Unicode normalization
"""
return json.dumps(obj, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
def calculate_event_hash(header: dict, payload: dict, algo: str = "sha256") -> str:
"""
Calculate EventHash for a VCP event
Args:
header: Event header (EventID, Timestamp, EventType, etc.)
payload: Event payload (trade data, risk parameters, etc.)
algo: Hash algorithm (sha256, sha3_256, blake3)
Returns:
Hex-encoded hash string
"""
canonical_header = canonicalize_json(header)
canonical_payload = canonicalize_json(payload)
# Concatenate canonicalized components
hash_input = (canonical_header + canonical_payload).encode('utf-8')
if algo == "sha256":
return hashlib.sha256(hash_input).hexdigest()
elif algo == "sha3_256":
return hashlib.sha3_256(hash_input).hexdigest()
else:
raise ValueError(f"Unsupported algorithm: {algo}")
# Example usage
header = {
"EventID": "01958c5a-b2d0-7f12-8a4f-1234567890ab", # UUID v7
"TimestampISO": "2026-01-17T10:30:00.123456Z",
"TimestampInt": 1768738200123456,
"EventType": "ORDER_SUBMIT"
}
payload = {
"Symbol": "EURUSD",
"Side": "BUY",
"Volume": 1.0,
"Price": 1.0850,
"AlgorithmID": "trend_follower_v2"
}
event_hash = calculate_event_hash(header, payload)
print(f"EventHash: {event_hash}")
# EventHash: 3a7f2c1b9e8d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a
Why Canonicalization Matters
Without canonicalization, the same data can produce different hashes:
# These are logically identical but hash differently
obj1 = {"b": 2, "a": 1}
obj2 = {"a": 1, "b": 2}
json.dumps(obj1) # '{"b": 2, "a": 1}'
json.dumps(obj2) # '{"a": 1, "b": 2}'
# Different strings β different hashes β verification fails
RFC 8785 solves this with deterministic serialization rules.
Optional: Hash Chains (PrevHash)
VCP v1.1 makes hash chains optional. But if you want real-time tamper detection between anchor points, you can still use them:
class HashChainLogger:
def __init__(self):
self.prev_hash = "0" * 64 # Genesis
def log_event(self, header: dict, payload: dict) -> dict:
# Include prev_hash in header
header["PrevHash"] = self.prev_hash
# Calculate hash including the chain
event_hash = calculate_event_hash(header, payload)
# Update chain
self.prev_hash = event_hash
return {
"Header": header,
"Payload": payload,
"Security": {
"EventHash": event_hash,
"PrevHash": header["PrevHash"]
}
}
When to use hash chains:
- β HFT systems (real-time gap detection)
- β Regulatory submission (auditors expect it)
- β Backtesting (events generated out of order)
- β MT4/MT5 DLLs (unnecessary complexity)
π³ Layer 2: Collection Integrity (Merkle Trees)
Here's where VCP gets interesting. Individual hashes prove events weren't modified. But how do you prove events weren't deleted?
Merkle trees.
RFC 6962 Merkle Tree Construction
VCP mandates RFC 6962 compliance to prevent second preimage attacks. The key is domain separation:
from typing import List, Tuple
import hashlib
def merkle_leaf_hash(data: bytes) -> bytes:
"""Hash a leaf node with 0x00 prefix (domain separation)"""
return hashlib.sha256(b'\x00' + data).digest()
def merkle_internal_hash(left: bytes, right: bytes) -> bytes:
"""Hash an internal node with 0x01 prefix (domain separation)"""
return hashlib.sha256(b'\x01' + left + right).digest()
def build_merkle_tree(event_hashes: List[str]) -> Tuple[str, List[List[bytes]]]:
"""
Build RFC 6962 compliant Merkle tree
Args:
event_hashes: List of hex-encoded event hashes
Returns:
(merkle_root, tree_levels) where tree_levels[0] is leaves
"""
if not event_hashes:
raise ValueError("Cannot build tree from empty list")
# Convert to bytes and hash as leaves
leaves = [bytes.fromhex(h) for h in event_hashes]
current_level = [merkle_leaf_hash(leaf) for leaf in leaves]
tree = [current_level]
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
# If odd number of nodes, duplicate the last one
right = current_level[i + 1] if i + 1 < len(current_level) else current_level[i]
next_level.append(merkle_internal_hash(left, right))
tree.append(next_level)
current_level = next_level
merkle_root = current_level[0].hex()
return merkle_root, tree
# Example: 4 events
event_hashes = [
"a1b2c3d4e5f6789012345678901234567890123456789012345678901234567890",
"b2c3d4e5f67890123456789012345678901234567890123456789012345678901a",
"c3d4e5f678901234567890123456789012345678901234567890123456789012ab",
"d4e5f6789012345678901234567890123456789012345678901234567890123abc"
]
merkle_root, tree = build_merkle_tree(event_hashes)
print(f"Merkle Root: {merkle_root}")
print(f"Tree depth: {len(tree)}")
Visualizing the Tree
[Root]
/ \
[H01] [H23]
/ \ / \
[H0] [H1] [H2] [H3]
| | | |
Event0 Event1 Event2 Event3
Key property: Change any event β the root changes. Remove any event β the root changes. Add any event β the root changes.
Generating Merkle Proofs
To prove a specific event is included in the root:
def generate_merkle_proof(tree: List[List[bytes]], leaf_index: int) -> List[dict]:
"""
Generate audit path for a specific event
Returns list of sibling hashes needed to reconstruct root
"""
proof = []
index = leaf_index
for level in tree[:-1]: # Exclude root level
sibling_index = index ^ 1 # XOR to get sibling
if sibling_index < len(level):
proof.append({
"hash": level[sibling_index].hex(),
"position": "left" if sibling_index < index else "right"
})
index //= 2
return proof
def verify_merkle_proof(
event_hash: str,
proof: List[dict],
merkle_root: str
) -> bool:
"""Verify that an event is included in a Merkle root"""
current = merkle_leaf_hash(bytes.fromhex(event_hash))
for step in proof:
sibling = bytes.fromhex(step["hash"])
if step["position"] == "left":
current = merkle_internal_hash(sibling, current)
else:
current = merkle_internal_hash(current, sibling)
return current.hex() == merkle_root
# Generate proof for Event1
proof = generate_merkle_proof(tree, leaf_index=1)
print(f"Proof for Event1: {proof}")
# Verify
is_valid = verify_merkle_proof(event_hashes[1], proof, merkle_root)
print(f"Verification: {'β
PASS' if is_valid else 'β FAIL'}")
β Layer 3: External Verifiability
This is the critical change in VCP v1.1: External anchoring is now REQUIRED for all tiers.
Why? Because without external anchoring, an operator can:
- Generate events with valid hashes
- Build a valid Merkle tree
- Realize some events are problematic
- Regenerate the entire tree without those events
- Present the sanitized version
No one would know because no one saw the original root.
External anchoring creates an irrevocable commitment.
Anchoring Options by Tier
| Tier | Frequency | Targets | Cost |
|---|---|---|---|
| Platinum | 10 min | Ethereum, Bitcoin, RFC 3161 TSA | $$ |
| Gold | 1 hour | RFC 3161 TSA, attested database | $ |
| Silver | 24 hours | OpenTimestamps, FreeTSA | Free |
Silver Tier: OpenTimestamps Implementation
OpenTimestamps is free and Bitcoin-backed. Perfect for getting started:
# pip install opentimestamps-client
import subprocess
import tempfile
import os
def anchor_with_opentimestamps(merkle_root: str) -> dict:
"""
Anchor a Merkle root using OpenTimestamps
Returns anchor proof that can be independently verified
"""
# Write root to temp file
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.txt') as f:
f.write(bytes.fromhex(merkle_root))
temp_path = f.name
try:
# Create timestamp (this calls the OTS servers)
subprocess.run(
['ots', 'stamp', temp_path],
check=True,
capture_output=True
)
# Read the proof file
proof_path = temp_path + '.ots'
with open(proof_path, 'rb') as f:
ots_proof = f.read()
return {
"type": "OPENTIMESTAMPS",
"merkle_root": merkle_root,
"proof": ots_proof.hex(),
"status": "PENDING" # Confirmed after Bitcoin block
}
finally:
os.unlink(temp_path)
if os.path.exists(temp_path + '.ots'):
os.unlink(temp_path + '.ots')
def verify_opentimestamps(anchor: dict) -> bool:
"""Verify an OpenTimestamps anchor"""
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.txt') as f:
f.write(bytes.fromhex(anchor["merkle_root"]))
data_path = f.name
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.ots') as f:
f.write(bytes.fromhex(anchor["proof"]))
proof_path = f.name
try:
result = subprocess.run(
['ots', 'verify', proof_path],
capture_output=True,
text=True
)
return "Success!" in result.stdout or result.returncode == 0
finally:
os.unlink(data_path)
os.unlink(proof_path)
Gold/Platinum Tier: RFC 3161 TSA
For production systems, use a qualified Time Stamp Authority:
import requests
import hashlib
from asn1crypto import tsp, cms
from datetime import datetime
def anchor_with_tsa(merkle_root: str, tsa_url: str = "https://freetsa.org/tsr") -> dict:
"""
Anchor using RFC 3161 Time Stamp Authority
Args:
merkle_root: Hex-encoded Merkle root
tsa_url: TSA server URL
Returns:
Anchor record with TSA token
"""
# Create timestamp request
digest = bytes.fromhex(merkle_root)
ts_request = tsp.TimeStampReq({
'version': 1,
'message_imprint': {
'hash_algorithm': {'algorithm': 'sha256'},
'hashed_message': digest
},
'cert_req': True
})
# Send to TSA
response = requests.post(
tsa_url,
data=ts_request.dump(),
headers={'Content-Type': 'application/timestamp-query'}
)
if response.status_code != 200:
raise Exception(f"TSA request failed: {response.status_code}")
# Parse response
ts_response = tsp.TimeStampResp.load(response.content)
if ts_response['status']['status'].native != 'granted':
raise Exception(f"TSA rejected request: {ts_response['status']}")
token = ts_response['time_stamp_token']
return {
"type": "RFC3161_TSA",
"merkle_root": merkle_root,
"tsa_url": tsa_url,
"token": response.content.hex(),
"timestamp": datetime.utcnow().isoformat() + "Z"
}
π§ Putting It All Together: Complete Implementation
Here's a production-ready Silver tier implementation:
"""
VCP v1.1 Silver Tier Implementation
"""
import uuid
import time
import hashlib
import json
from datetime import datetime
from typing import List, Optional
from dataclasses import dataclass, field
from nacl.signing import SigningKey
import base64
@dataclass
class VCPEvent:
header: dict
payload: dict
security: dict
policy_identification: dict
@dataclass
class AnchorRecord:
id: str
merkle_root: str
signature: str
anchor_target: dict
event_count: int
first_event_id: str
last_event_id: str
timestamp: int
class VCPSilverTier:
"""
VCP v1.1 Silver Tier Implementation
Requirements:
- EventHash: REQUIRED
- Merkle Tree: REQUIRED
- Digital Signature: REQUIRED
- External Anchor: REQUIRED (24 hours)
- PrevHash: OPTIONAL (not implemented here)
"""
def __init__(self, private_key_hex: str, policy_id: str, issuer: str):
"""
Initialize VCP logger
Args:
private_key_hex: Ed25519 private key (32 bytes hex)
policy_id: Unique policy identifier
issuer: Organization name
"""
self.signing_key = SigningKey(bytes.fromhex(private_key_hex))
self.verify_key = self.signing_key.verify_key
self.policy_id = policy_id
self.issuer = issuer
self.pending_events: List[VCPEvent] = []
self.anchored_batches: List[AnchorRecord] = []
self.last_anchor_time: Optional[float] = None
def _generate_uuid7(self) -> str:
"""Generate UUID v7 (time-ordered)"""
# Simplified UUID v7 generation
timestamp_ms = int(time.time() * 1000)
random_bits = int.from_bytes(uuid.uuid4().bytes[6:], 'big')
uuid_int = (timestamp_ms << 80) | (0b0111 << 76) | (random_bits & ((1 << 76) - 1))
return str(uuid.UUID(int=uuid_int))
def _canonicalize(self, obj: dict) -> str:
"""RFC 8785 canonicalization"""
return json.dumps(obj, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
def _calculate_hash(self, *parts: str) -> str:
"""Calculate SHA-256 hash of concatenated parts"""
combined = ''.join(parts).encode('utf-8')
return hashlib.sha256(combined).hexdigest()
def _sign(self, message: str) -> str:
"""Sign message with Ed25519, return base64"""
signed = self.signing_key.sign(message.encode('utf-8'))
return base64.b64encode(signed.signature).decode('ascii')
def log_event(
self,
event_type: str,
payload: dict,
timestamp: Optional[datetime] = None
) -> VCPEvent:
"""
Log a VCP event
Args:
event_type: Event type code (e.g., "ORDER_SUBMIT", "TRADE_EXECUTE")
payload: Event payload data
timestamp: Optional timestamp (defaults to now)
Returns:
VCPEvent object
"""
ts = timestamp or datetime.utcnow()
ts_int = int(ts.timestamp() * 1_000_000) # Microseconds
header = {
"EventID": self._generate_uuid7(),
"TimestampISO": ts.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z",
"TimestampInt": ts_int,
"EventType": event_type,
"Version": "1.1"
}
# Calculate EventHash
event_hash = self._calculate_hash(
self._canonicalize(header),
self._canonicalize(payload)
)
# Sign the hash
signature = self._sign(event_hash)
security = {
"Version": "1.1",
"EventHash": event_hash,
"HashAlgo": "SHA256",
"Signature": signature,
"SignAlgo": "ED25519",
"PublicKey": self.verify_key.encode().hex()
}
policy_identification = {
"PolicyID": self.policy_id,
"ConformanceTier": "SILVER",
"RegistrationPolicy": {
"Issuer": self.issuer,
"IssuanceTimestamp": ts_int
},
"VerificationDepth": {
"MerkleProofRequired": True,
"ExternalAnchorRequired": True
}
}
event = VCPEvent(
header=header,
payload=payload,
security=security,
policy_identification=policy_identification
)
self.pending_events.append(event)
return event
def anchor_batch(self) -> Optional[AnchorRecord]:
"""
Anchor pending events to external service
REQUIRED: Must be called at least daily for Silver tier
Returns:
AnchorRecord or None if no pending events
"""
if not self.pending_events:
return None
# Collect event hashes
event_hashes = [e.security["EventHash"] for e in self.pending_events]
# Build Merkle tree
merkle_root, tree = self._build_merkle_tree(event_hashes)
# Sign Merkle root
root_signature = self._sign(merkle_root)
# External anchor (simplified - in production, call actual service)
anchor_id = self._generate_uuid7()
anchor_target = {
"Type": "PUBLIC_SERVICE",
"Identifier": "opentimestamps.org",
"Proof": f"ots_pending_{anchor_id}" # Placeholder
}
# Create anchor record
anchor = AnchorRecord(
id=anchor_id,
merkle_root=merkle_root,
signature=root_signature,
anchor_target=anchor_target,
event_count=len(self.pending_events),
first_event_id=self.pending_events[0].header["EventID"],
last_event_id=self.pending_events[-1].header["EventID"],
timestamp=int(time.time() * 1_000_000)
)
# Update events with Merkle info
for i, event in enumerate(self.pending_events):
event.security["MerkleRoot"] = merkle_root
event.security["MerkleIndex"] = i
event.security["AnchorReference"] = anchor_id
# Generate and store audit path
event.security["AuditPath"] = self._generate_audit_path(tree, i)
self.anchored_batches.append(anchor)
self.pending_events = []
self.last_anchor_time = time.time()
return anchor
def _build_merkle_tree(self, event_hashes: List[str]):
"""Build RFC 6962 Merkle tree"""
leaves = [bytes.fromhex(h) for h in event_hashes]
current = [hashlib.sha256(b'\x00' + leaf).digest() for leaf in leaves]
tree = [current]
while len(current) > 1:
next_level = []
for i in range(0, len(current), 2):
left = current[i]
right = current[i + 1] if i + 1 < len(current) else current[i]
next_level.append(hashlib.sha256(b'\x01' + left + right).digest())
tree.append(next_level)
current = next_level
return current[0].hex(), tree
def _generate_audit_path(self, tree: List[List[bytes]], index: int) -> List[dict]:
"""Generate Merkle proof for event at index"""
proof = []
for level in tree[:-1]:
sibling_idx = index ^ 1
if sibling_idx < len(level):
proof.append({
"hash": level[sibling_idx].hex(),
"position": "left" if sibling_idx < index else "right"
})
index //= 2
return proof
def verify_event(self, event: VCPEvent) -> dict:
"""
Verify a VCP event's integrity
Returns verification result with details
"""
results = {
"event_id": event.header["EventID"],
"checks": {}
}
# Check 1: EventHash
calculated_hash = self._calculate_hash(
self._canonicalize(event.header),
self._canonicalize(event.payload)
)
results["checks"]["event_hash"] = calculated_hash == event.security["EventHash"]
# Check 2: Signature
try:
from nacl.signing import VerifyKey
vk = VerifyKey(bytes.fromhex(event.security["PublicKey"]))
sig = base64.b64decode(event.security["Signature"])
vk.verify(event.security["EventHash"].encode(), sig)
results["checks"]["signature"] = True
except Exception:
results["checks"]["signature"] = False
# Check 3: Merkle proof (if anchored)
if "MerkleRoot" in event.security and "AuditPath" in event.security:
results["checks"]["merkle_proof"] = self._verify_merkle_proof(
event.security["EventHash"],
event.security["AuditPath"],
event.security["MerkleRoot"]
)
results["valid"] = all(results["checks"].values())
return results
def _verify_merkle_proof(self, event_hash: str, proof: List[dict], root: str) -> bool:
"""Verify Merkle inclusion proof"""
current = hashlib.sha256(b'\x00' + bytes.fromhex(event_hash)).digest()
for step in proof:
sibling = bytes.fromhex(step["hash"])
if step["position"] == "left":
current = hashlib.sha256(b'\x01' + sibling + current).digest()
else:
current = hashlib.sha256(b'\x01' + current + sibling).digest()
return current.hex() == root
def export_event(self, event: VCPEvent) -> dict:
"""Export event as JSON-serializable dict"""
return {
"Header": event.header,
"Payload": event.payload,
"Security": event.security,
"PolicyIdentification": event.policy_identification
}
# ========================================
# Usage Example
# ========================================
if __name__ == "__main__":
# Generate a key (in production, load from secure storage)
from nacl.signing import SigningKey
key = SigningKey.generate()
private_key_hex = key.encode().hex()
# Initialize VCP logger
vcp = VCPSilverTier(
private_key_hex=private_key_hex,
policy_id="DEMO-SILVER-2026-001",
issuer="Demo Trading Co."
)
# Log some trading events
events = []
# Order submission
events.append(vcp.log_event("ORDER_SUBMIT", {
"Symbol": "EURUSD",
"Side": "BUY",
"Volume": 1.0,
"Price": 1.0850,
"OrderType": "LIMIT",
"AlgorithmID": "trend_follower_v2"
}))
# Trade execution
events.append(vcp.log_event("TRADE_EXECUTE", {
"OrderID": events[0].header["EventID"],
"Symbol": "EURUSD",
"Side": "BUY",
"Volume": 1.0,
"ExecutionPrice": 1.0851,
"Slippage": 0.0001
}))
# Risk check
events.append(vcp.log_event("RISK_CHECK", {
"AccountEquity": 10000.00,
"OpenPositions": 1,
"MarginUsed": 100.00,
"DrawdownPercent": 0.5,
"RiskStatus": "NORMAL"
}))
# Anchor the batch
anchor = vcp.anchor_batch()
print("=" * 60)
print("VCP v1.1 Silver Tier Demo")
print("=" * 60)
print(f"\nπ¦ Logged {len(events)} events")
print(f"π³ Merkle Root: {anchor.merkle_root[:32]}...")
print(f"βοΈ Signature: {anchor.signature[:32]}...")
print(f"β Anchor ID: {anchor.id}")
# Verify each event
print("\nπ Verification Results:")
for event in events:
result = vcp.verify_event(event)
status = "β
" if result["valid"] else "β"
print(f" {status} {event.header['EventType']}: {result['checks']}")
# Export as JSON
print("\nπ Sample Event JSON:")
print(json.dumps(vcp.export_event(events[0]), indent=2))
π§ͺ Testing Your Implementation
VCP v1.1 defines critical conformance tests:
import pytest
class TestVCPConformance:
"""VCP v1.1 Conformance Test Suite"""
def test_event_hash_calculation(self, vcp_logger):
"""SCH-001: Event structure validation"""
event = vcp_logger.log_event("TEST", {"data": "value"})
assert "EventHash" in event.security
assert len(event.security["EventHash"]) == 64 # SHA-256 hex
def test_uuid_v7_format(self, vcp_logger):
"""UID-001: UUID v7 format validation"""
event = vcp_logger.log_event("TEST", {})
event_id = event.header["EventID"]
# UUID v7 has version nibble = 7
version = int(event_id[14], 16)
assert version == 7
def test_merkle_tree_construction(self, vcp_logger):
"""MKL-001: Merkle tree construction"""
for _ in range(5):
vcp_logger.log_event("TEST", {"i": _})
anchor = vcp_logger.anchor_batch()
assert anchor.merkle_root is not None
assert len(anchor.merkle_root) == 64
def test_merkle_proof_verification(self, vcp_logger):
"""MKL-002: Merkle proof verification"""
events = [vcp_logger.log_event("TEST", {"i": i}) for i in range(4)]
anchor = vcp_logger.anchor_batch()
for event in events:
result = vcp_logger.verify_event(event)
assert result["checks"]["merkle_proof"] is True
def test_external_anchor_present(self, vcp_logger):
"""ANC-001: External anchor presence"""
vcp_logger.log_event("TEST", {})
anchor = vcp_logger.anchor_batch()
assert anchor.anchor_target is not None
assert "Type" in anchor.anchor_target
assert "Identifier" in anchor.anchor_target
def test_signature_verification(self, vcp_logger):
"""SIG-001: Signature algorithm compliance"""
event = vcp_logger.log_event("TEST", {})
result = vcp_logger.verify_event(event)
assert result["checks"]["signature"] is True
assert event.security["SignAlgo"] == "ED25519"
Run with: pytest test_vcp.py -v
π VCP Compliance Tiers Comparison
| Feature | Silver | Gold | Platinum |
|---|---|---|---|
| Target | Retail, MT4/MT5 | Prop firms | HFT, Exchanges |
| Clock Sync | Best-effort | NTP (<1ms) | PTPv2 (<1ΞΌs) |
| Anchor Frequency | 24 hours | 1 hour | 10 minutes |
| Timestamp Precision | MILLISECOND | MICROSECOND | NANOSECOND |
| Signature | Ed25519 (delegated) | Ed25519 (client) | Ed25519 (HSM) |
| Serialization | JSON | JSON | SBE |
| Throughput | ~1K events/sec | ~100K events/sec | >1M events/sec |
π Quick Start Checklist
Ready to implement VCP v1.1? Here's your checklist:
- [ ] Generate Ed25519 keypair (use
naclorcryptographylibrary) - [ ] Define your PolicyID (unique identifier for your implementation)
- [ ] Implement EventHash calculation (RFC 8785 canonicalization + SHA-256)
- [ ] Implement Merkle tree (RFC 6962 with domain separation)
- [ ] Set up external anchoring (OpenTimestamps for free, TSA for production)
- [ ] Add daily anchor cron job (or more frequent for Gold/Platinum)
- [ ] Run conformance tests (MKL-001, MKL-002, ANC-001, SIG-001)
- [ ] Document your Policy (issuer, tier, verification depth)
π Resources
- VCP Specification: github.com/veritaschain/vcp-spec
- IETF Draft: datatracker.ietf.org/doc/draft-kamimura-scitt-vcp/
- RFC 6962 (Certificate Transparency): tools.ietf.org/html/rfc6962
- RFC 8785 (JSON Canonicalization): tools.ietf.org/html/rfc8785
- OpenTimestamps: opentimestamps.org
π― Summary
Traditional audit logs are trust-based. VCP v1.1 makes them verification-based.
The three-layer architecture provides:
- Event Integrity (Layer 1): Individual events can't be modified
- Collection Integrity (Layer 2): Events can't be deleted
- External Verifiability (Layer 3): Anyone can verify without trusting you
With EU AI Act coming into force and CEN-CENELEC standards specifying "tamper-resistant logs," this isn't optional anymore. Start with Silver tier, scale to Platinum as needed.
The code is open. The spec is free. The deadline is 2026.
Have questions? Drop a comment below or open an issue on the VCP GitHub repo.
Tags: #cryptography #trading #python #fintech #compliance #merkletree #blockchain #security #opensource #euaiact
Top comments (0)