Building a Cryptographic Audit Trail for Financial Compliance: From Hash Chains to Multi-Regulation Coverage

A deep dive into implementing tamper-evident logging for algorithmic trading systems


When the SEC delayed Rule 13f-2 until 2028, most compliance teams breathed a sigh of relief. But for engineers building trading infrastructure, the delay signals something different: we have time to build this right.

This article walks through implementing a cryptographic audit trail system that satisfies multiple financial regulations—SEC Rule 13f-2, MiFID II RTS 25, and EU AI Act Article 12—through a unified architecture. We'll cover:

  • Hash chain fundamentals and why they matter for audit trails
  • Merkle tree structures for efficient verification
  • Ed25519 signatures for event authentication
  • Sidecar integration patterns for FIX and MT4/MT5
  • Event schema design for multi-regulation compliance

Let's build something that actually works.


Table of Contents

  1. The Problem: Why Traditional Logging Fails
  2. Cryptographic Foundations
  3. The VCP Event Schema
  4. Hash Chain Implementation
  5. Merkle Tree for Batch Verification
  6. Digital Signatures with Ed25519
  7. Sidecar Architecture
  8. FIX Protocol Integration
  9. MT4/MT5 Bridge Implementation
  10. Multi-Regulation Mapping
  11. Putting It All Together

1. The Problem: Why Traditional Logging Fails

Traditional audit logging looks something like this:

# The "trust me bro" approach
from datetime import datetime

def log_trade(trade):
    with open('audit.log', 'a') as f:
        f.write(f"{datetime.now()},{trade.symbol},{trade.qty},{trade.price}\n")

This has several problems for regulatory compliance:

| Issue | Why It Matters |
|---|---|
| No tamper evidence | Anyone with file access can modify historical records |
| No sequencing proof | Can't prove event A happened before event B |
| No authentication | Can't prove who generated the log entry |
| No efficient verification | Must read entire log to verify any single entry |

When regulators ask "prove this trade happened at this time with these parameters," you need more than a CSV file.


2. Cryptographic Foundations

The VeritasChain Protocol (VCP) uses three cryptographic primitives:

Hash Chains (Tamper Evidence)

Each event includes a hash of the previous event. Modifying any historical event breaks the chain:

Event₁ → H(Event₁) → Event₂ → H(Event₂) → Event₃ → ...
              ↓                    ↓
         prev_hash             prev_hash

Merkle Trees (Efficient Verification)

For batch verification, events are organized into a binary tree where each parent is the hash of its children:

                    Root Hash
                   /         \
            H(A+B)             H(C+D)
           /     \            /     \
        H(A)    H(B)       H(C)    H(D)
         |       |          |       |
      Event A  Event B   Event C  Event D

To prove Event B is in the tree, you only need: H(A), H(C+D), and the root. That's O(log n) instead of O(n).
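
As a concrete check of the four-leaf tree above, here is a minimal sketch that recomputes the root from Event B and its two proof hashes. It assumes H is SHA-256 and that H(A+B) means hashing the concatenation of the two child digests. The event payloads are placeholder byte strings, and `verify_b` is a hypothetical helper name. The implementation in section 5 adds a node prefix that this sketch leaves out.

```python
import hashlib

def h(data: bytes) -> bytes:
    """SHA-256 digest (the assumed hash function H)."""
    return hashlib.sha256(data).digest()

# Placeholder payloads standing in for Events A-D
leaves = [h(e) for e in (b"event A", b"event B", b"event C", b"event D")]

# Build the tree as drawn: pair the leaves, then pair the pairs
h_ab = h(leaves[0] + leaves[1])
h_cd = h(leaves[2] + leaves[3])
root = h(h_ab + h_cd)

def verify_b(event_b: bytes, h_a: bytes, h_cd: bytes, expected_root: bytes) -> bool:
    """Recompute the root from Event B and its two proof hashes."""
    node = h(h_a + h(event_b))  # B is the right child, so H(A) goes on the left
    node = h(node + h_cd)       # H(A+B) is the left child of the root
    return node == expected_root

print(verify_b(b"event B", leaves[0], h_cd, root))   # True
print(verify_b(b"event B!", leaves[0], h_cd, root))  # False
```

The verifier never sees Events A, C, or D, only two digests and the root it already trusts.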

Digital Signatures (Authentication)

Ed25519 signatures prove who generated each event without requiring shared secrets:

# Signing (pseudocode; section 6 has a working implementation)
signature = Ed25519.sign(private_key, event_bytes)

# Verification (anyone can do this)
valid = Ed25519.verify(public_key, event_bytes, signature)

3. The VCP Event Schema

VCP defines canonical event types that map to multiple regulations:

from enum import Enum
from dataclasses import dataclass
from typing import Optional
import uuid
from datetime import datetime

class EventType(Enum):
    SIG = "SIG"  # Signal (AI/algorithm output)
    ORD = "ORD"  # Order submission
    ACK = "ACK"  # Order acknowledgment
    EXE = "EXE"  # Execution/fill
    REJ = "REJ"  # Rejection
    CXL = "CXL"  # Cancellation
    MOD = "MOD"  # Modification
    POS = "POS"  # Position snapshot

@dataclass
class VCPEvent:
    # Core fields (required)
    event_id: str           # UUID v7 (time-ordered)
    event_type: EventType
    timestamp: datetime     # ISO 8601 with microseconds
    actor_id: str           # Entity generating the event

    # Trading fields (conditional)
    instrument_id: Optional[str] = None   # ISIN, CUSIP, or internal ID
    quantity: Optional[float] = None
    price: Optional[float] = None
    side: Optional[str] = None            # BUY, SELL, SHORT
    order_id: Optional[str] = None

    # AI/Algorithm fields (for EU AI Act)
    model_id: Optional[str] = None
    model_version: Optional[str] = None
    decision_factors: Optional[dict] = None
    confidence_score: Optional[float] = None

    # Chain fields (computed)
    prev_hash: Optional[str] = None
    event_hash: Optional[str] = None
    signature: Optional[str] = None

UUID v7: Time-Ordered Identifiers

VCP uses UUID v7 (RFC 9562) for event IDs because they're:

  • Globally unique
  • Time-ordered (first 48 bits are Unix timestamp in milliseconds)
  • K-sortable (lexicographic order matches chronological order, down to the millisecond)

import uuid
import time

def generate_uuid7() -> str:
    """Generate a UUID v7 with millisecond precision."""
    timestamp_ms = int(time.time() * 1000)

    # First 48 bits: timestamp
    # Next 4 bits: version (7)
    # Next 12 bits: random
    # Next 2 bits: variant
    # Last 62 bits: random

    uuid_int = (timestamp_ms & 0xFFFFFFFFFFFF) << 80
    uuid_int |= 0x7000 << 64  # Version 7
    uuid_int |= (uuid.uuid4().int & 0x0FFF) << 64
    uuid_int |= 0x8000000000000000  # Variant
    uuid_int |= uuid.uuid4().int & 0x3FFFFFFFFFFFFFFF

    return str(uuid.UUID(int=uuid_int))

# Example output: "018d5f3c-7a2b-7def-8123-456789abcdef"
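
To see the time-ordering property from the other direction, the sketch below reads the 48-bit timestamp back out of a UUID v7 string. The helper name `uuid7_timestamp_ms` is an assumption for illustration, not part of VCP. It relies only on the standard `uuid` module, which can parse a version 7 value even on Python versions that cannot generate one.

```python
import uuid
from datetime import datetime, timezone

def uuid7_timestamp_ms(value: str) -> int:
    """Return the Unix millisecond timestamp embedded in a UUID v7."""
    parsed = uuid.UUID(value)
    if parsed.version != 7:
        raise ValueError(f"expected UUID v7, got version {parsed.version}")
    return parsed.int >> 80  # the top 48 of 128 bits hold the timestamp

sample = "018d5f3c-7a2b-7def-8123-456789abcdef"  # the example ID shown above
ms = uuid7_timestamp_ms(sample)
print(ms)
print(datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat())
```

Only the millisecond prefix is ordered. Two IDs created in the same millisecond can sort either way, so the hash chain, not the ID, remains the authority on sequence.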

4. Hash Chain Implementation

The hash chain links events cryptographically:

import hashlib
import json
from typing import List, Optional

class HashChain:
    def __init__(self):
        self.events: List[VCPEvent] = []
        self.genesis_hash = "0" * 64  # SHA-256 zero hash

    def _canonicalize(self, event: VCPEvent) -> bytes:
        """
        Deterministic JSON serialization for hashing.

        Approximates RFC 8785 (JCS): sorted keys, no whitespace, UTF-8
        output. json.dumps does not follow the JCS number-formatting
        rules, so use a dedicated JCS library if strict conformance matters.
        """
        event_dict = {
            "event_id": event.event_id,
            "event_type": event.event_type.value,
            "timestamp": event.timestamp.isoformat(),
            "actor_id": event.actor_id,
            # prev_hash is part of the hashed content; it links the chain
            "prev_hash": event.prev_hash,
        }

        # Add every optional content field that is set, so none of them
        # (order_id included) can change without changing the hash
        optional_fields = (
            "instrument_id", "quantity", "price", "side", "order_id",
            "model_id", "model_version", "decision_factors", "confidence_score",
        )
        for name in optional_fields:
            value = getattr(event, name)
            if value is not None:
                event_dict[name] = value

        # Sort keys for deterministic output; emit non-ASCII as UTF-8
        return json.dumps(event_dict, sort_keys=True, separators=(',', ':'),
                          ensure_ascii=False).encode('utf-8')

    def _compute_hash(self, event: VCPEvent) -> str:
        """Compute SHA-256 hash of canonical event representation."""
        canonical = self._canonicalize(event)
        return hashlib.sha256(canonical).hexdigest()

    def append(self, event: VCPEvent) -> VCPEvent:
        """Add event to chain, computing prev_hash and event_hash."""
        # Set prev_hash from last event (or genesis)
        if self.events:
            event.prev_hash = self.events[-1].event_hash
        else:
            event.prev_hash = self.genesis_hash

        # Compute this event's hash
        event.event_hash = self._compute_hash(event)

        self.events.append(event)
        return event

    def verify_chain(self) -> bool:
        """Verify entire chain integrity."""
        for i, event in enumerate(self.events):
            # Check prev_hash linkage
            expected_prev = self.genesis_hash if i == 0 else self.events[i-1].event_hash
            if event.prev_hash != expected_prev:
                return False

            # Recompute and verify event_hash
            if event.event_hash != self._compute_hash(event):
                return False

        return True

    def detect_tampering(self) -> Optional[int]:
        """Return index of first tampered event, or None if chain is valid."""
        for i, event in enumerate(self.events):
            expected_prev = self.genesis_hash if i == 0 else self.events[i-1].event_hash
            if event.prev_hash != expected_prev:
                return i
            if event.event_hash != self._compute_hash(event):
                return i
        return None
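
The chain only works if serialization is deterministic, so it is worth checking that property in isolation. This is a small sketch using the same `sort_keys` and `separators` settings as `_canonicalize`. The record contents and the `canonical_bytes` helper are made up for illustration.

```python
import hashlib
import json

def canonical_bytes(record: dict) -> bytes:
    """Serialize with sorted keys and no whitespace."""
    return json.dumps(record, sort_keys=True, separators=(",", ":")).encode("utf-8")

a = {"event_id": "e-1", "quantity": 100, "price": 150.25}
b = {"price": 150.25, "quantity": 100, "event_id": "e-1"}  # same fields, different order

# Key order in the source dict does not affect the bytes or the hash
print(canonical_bytes(a) == canonical_bytes(b))  # True
print(hashlib.sha256(canonical_bytes(a)).hexdigest()
      == hashlib.sha256(canonical_bytes(b)).hexdigest())  # True

# Caveat: 100 and 100.0 compare equal in Python but serialize differently
print(canonical_bytes({"quantity": 100}) == canonical_bytes({"quantity": 100.0}))  # False
```

The last line is why producers in different languages need to agree on number formatting before their hashes can be compared.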

Tamper Detection in Action

# Create chain and add events
from datetime import datetime, timezone

chain = HashChain()

event1 = VCPEvent(
    event_id=generate_uuid7(),
    event_type=EventType.ORD,
    timestamp=datetime.now(timezone.utc),  # timezone-aware UTC; utcnow() is deprecated
    actor_id="algo-001",
    instrument_id="US0378331005",  # AAPL ISIN
    quantity=100,
    price=150.25,
    side="BUY"
)
chain.append(event1)

event2 = VCPEvent(
    event_id=generate_uuid7(),
    event_type=EventType.EXE,
    timestamp=datetime.now(timezone.utc),
    actor_id="exchange-nyse",
    instrument_id="US0378331005",
    quantity=100,
    price=150.25,
    side="BUY",
    order_id=event1.event_id
)
chain.append(event2)

# Verify chain
print(chain.verify_chain())  # True

# Simulate tampering
chain.events[0].quantity = 1000  # Someone changed the quantity!

# Detect tampering
tampered_idx = chain.detect_tampering()
print(f"Tampering detected at index: {tampered_idx}")  # 0

5. Merkle Tree for Batch Verification

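For efficient auditing, we organize events into Merkle trees that borrow the 0x01 internal-node prefix from RFC 6962. The construction below is not fully RFC 6962-compliant: it pads to a power of two by duplicating the last leaf and does not prefix leaf hashes with 0x00.

import hashlib
from typing import List, Tuple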

class MerkleTree:
    def __init__(self, events: List[VCPEvent]):
        self.leaves = [e.event_hash for e in events]
        self.tree = self._build_tree()

    def _hash_pair(self, left: str, right: str) -> str:
        """Hash two nodes together (RFC 6962 style)."""
        # Prefix with 0x01 for internal nodes
        combined = bytes.fromhex("01") + bytes.fromhex(left) + bytes.fromhex(right)
        return hashlib.sha256(combined).hexdigest()

    def _build_tree(self) -> List[List[str]]:
        """Build tree bottom-up."""
        if not self.leaves:
            return [[]]

        # Pad to power of 2
        leaves = self.leaves.copy()
        while len(leaves) & (len(leaves) - 1):
            leaves.append(leaves[-1])  # Duplicate last leaf

        tree = [leaves]
        current_level = leaves

        while len(current_level) > 1:
            next_level = []
            for i in range(0, len(current_level), 2):
                parent = self._hash_pair(current_level[i], current_level[i+1])
                next_level.append(parent)
            tree.append(next_level)
            current_level = next_level

        return tree

    @property
    def root(self) -> str:
        """Return Merkle root hash."""
        return self.tree[-1][0] if self.tree[-1] else ""

    def get_proof(self, index: int) -> List[Tuple[str, str]]:
        """
        Generate inclusion proof for leaf at index.
        Returns list of (hash, direction) tuples.
        """
        if index >= len(self.leaves):
            raise IndexError("Leaf index out of range")

        proof = []
        idx = index

        for level in self.tree[:-1]:
            if idx % 2 == 0:
                sibling_idx = idx + 1
                direction = "right"
            else:
                sibling_idx = idx - 1
                direction = "left"

            if sibling_idx < len(level):
                proof.append((level[sibling_idx], direction))

            idx //= 2

        return proof

    @staticmethod
    def verify_proof(leaf_hash: str, proof: List[Tuple[str, str]], 
                     root: str) -> bool:
        """Verify an inclusion proof."""
        current = leaf_hash

        for sibling_hash, direction in proof:
            if direction == "left":
                combined = bytes.fromhex("01") + bytes.fromhex(sibling_hash) + bytes.fromhex(current)
            else:
                combined = bytes.fromhex("01") + bytes.fromhex(current) + bytes.fromhex(sibling_hash)
            current = hashlib.sha256(combined).hexdigest()

        return current == root


# Usage example
events = [chain.events[0], chain.events[1]]  # From previous example
tree = MerkleTree(events)

print(f"Merkle root: {tree.root}")

# Generate proof for first event
proof = tree.get_proof(0)
print(f"Proof for event 0: {proof}")

# Verify proof
is_valid = MerkleTree.verify_proof(
    events[0].event_hash,
    proof,
    tree.root
)
print(f"Proof valid: {is_valid}")  # True

Why Merkle Trees Matter

For regulatory examination:

  • Without Merkle tree: Auditor must download and verify entire dataset
  • With Merkle tree: Auditor verifies specific events with O(log n) proof

For a dataset of 1 million events:

  • Full verification: 1,000,000 hash operations
  • Merkle proof: ~20 hash operations
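
The ~20 figure is just the depth of the tree. As a quick arithmetic sketch, assuming a tree padded to a power of two as in the implementation above, the number of sibling hashes in a proof is ceil(log2(n)). The `proof_length` helper is hypothetical.

```python
def proof_length(n_leaves: int) -> int:
    """Sibling hashes in an inclusion proof for a tree padded to a power of two."""
    if n_leaves < 1:
        raise ValueError("tree needs at least one leaf")
    # (n - 1).bit_length() equals ceil(log2(n)) without floating-point error
    return (n_leaves - 1).bit_length()

for n in (2, 1_000, 1_000_000):
    print(f"{n:,} events -> {proof_length(n)} hashes per proof")
```

Each thousandfold increase in events adds only about ten hashes to a proof.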

6. Digital Signatures with Ed25519

VCP uses Ed25519 (RFC 8032) for event authentication:

from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.hazmat.primitives import serialization
import base64

class VCPSigner:
    def __init__(self, private_key: Ed25519PrivateKey = None):
        self.private_key = private_key or Ed25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()

    def get_public_key_bytes(self) -> bytes:
        """Export public key in raw format."""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def get_public_key_b64(self) -> str:
        """Export public key as base64."""
        return base64.b64encode(self.get_public_key_bytes()).decode('ascii')

    def sign_event(self, event: VCPEvent) -> str:
        """Sign event and return base64 signature."""
        # Sign the event_hash (which includes all content + prev_hash)
        message = bytes.fromhex(event.event_hash)
        signature = self.private_key.sign(message)
        return base64.b64encode(signature).decode('ascii')

    @staticmethod
    def verify_signature(public_key_b64: str, event_hash: str, 
                         signature_b64: str) -> bool:
        """Verify event signature."""
        try:
            public_key_bytes = base64.b64decode(public_key_b64)
            public_key = Ed25519PublicKey.from_public_bytes(public_key_bytes)

            message = bytes.fromhex(event_hash)
            signature = base64.b64decode(signature_b64)

            public_key.verify(signature, message)
            return True
        except Exception:
            return False


# Usage
signer = VCPSigner()

# Sign an event
event = chain.events[0]
event.signature = signer.sign_event(event)

print(f"Public key: {signer.get_public_key_b64()}")
print(f"Signature: {event.signature}")

# Verify
is_valid = VCPSigner.verify_signature(
    signer.get_public_key_b64(),
    event.event_hash,
    event.signature
)
print(f"Signature valid: {is_valid}")  # True

Key Management Considerations

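Production systems should keep the signing key in an HSM or a managed key service. The loader below reads it from an environment variable as a development shortcut, and it prints a newly generated key to stdout, which a production deployment should never do:

# Development-only key loading via an environment variable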
import os

def load_or_create_signer() -> VCPSigner:
    key_b64 = os.environ.get('VCP_PRIVATE_KEY')

    if key_b64:
        # Load existing key
        key_bytes = base64.b64decode(key_b64)
        private_key = Ed25519PrivateKey.from_private_bytes(key_bytes)
        return VCPSigner(private_key)
    else:
        # Generate new key (first run only!)
        signer = VCPSigner()
        print("WARNING: New key generated. Export and store securely:")
        private_bytes = signer.private_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption()
        )
        print(f"VCP_PRIVATE_KEY={base64.b64encode(private_bytes).decode()}")
        return signer

7. Sidecar Architecture

The key to non-invasive integration is the sidecar pattern: VCP runs alongside your trading infrastructure without modifying it.

┌─────────────────────────────────────────────────────────────────┐
│                    Trading Infrastructure                        │
│                                                                  │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐  │
│  │  MT4/MT5 │    │   FIX    │    │   OMS    │    │   EMS    │  │
│  │ Terminal │    │  Engine  │    │          │    │          │  │
│  └────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘  │
│       │               │               │               │         │
│       │    Message    │    Message    │    Event      │         │
│       │    Stream     │    Stream     │    Stream     │         │
│       └───────────────┼───────────────┼───────────────┘         │
│                       │               │                          │
│                       ▼               ▼                          │
│              ┌─────────────────────────────────┐                │
│              │     VCP Sidecar Adapter         │                │
│              │  ┌───────────────────────────┐  │                │
│              │  │  Protocol Normalizers     │  │                │
│              │  │  - FIX → VCP Event       │  │                │
│              │  │  - MT4 → VCP Event       │  │                │
│              │  │  - REST → VCP Event      │  │                │
│              │  └───────────────────────────┘  │                │
│              └──────────────┬──────────────────┘                │
│                             │                                    │
└─────────────────────────────┼────────────────────────────────────┘
                              │
                              ▼
                    ┌──────────────────┐
                    │    VCP Core      │
                    │  ┌────────────┐  │
                    │  │ Hash Chain │  │
                    │  ├────────────┤  │
                    │  │  Merkle    │  │
                    │  │   Tree     │  │
                    │  ├────────────┤  │
                    │  │  Signer    │  │
                    │  └────────────┘  │
                    └────────┬─────────┘
                             │
              ┌──────────────┼──────────────┐
              │              │              │
              ▼              ▼              ▼
        ┌──────────┐  ┌──────────┐  ┌──────────┐
        │ Form SHO │  │  SLATE   │  │ RTS 25   │
        │   XML    │  │  JSON    │  │  Report  │
        └──────────┘  └──────────┘  └──────────┘

The Sidecar Adapter

from abc import ABC, abstractmethod
from typing import AsyncIterator
import asyncio

class ProtocolAdapter(ABC):
    """Base class for protocol-specific adapters."""

    @abstractmethod
    async def connect(self) -> None:
        """Establish connection to source system."""
        pass

    @abstractmethod
    async def stream_events(self) -> AsyncIterator[VCPEvent]:
        """Yield VCP events from source protocol."""
        pass


class VCPSidecar:
    """Main sidecar process coordinating adapters and chain."""

    def __init__(self, adapters: List[ProtocolAdapter], 
                 signer: VCPSigner):
        self.adapters = adapters
        self.chain = HashChain()
        self.signer = signer
        self.merkle_batch: List[VCPEvent] = []
        self.merkle_interval = 1000  # Events per Merkle tree

    async def run(self):
        """Main event loop."""
        tasks = [self._process_adapter(adapter) for adapter in self.adapters]
        await asyncio.gather(*tasks)

    async def _process_adapter(self, adapter: ProtocolAdapter):
        """Process events from a single adapter."""
        await adapter.connect()

        async for event in adapter.stream_events():
            # Add to hash chain
            self.chain.append(event)

            # Sign the event
            event.signature = self.signer.sign_event(event)

            # Add to Merkle batch
            self.merkle_batch.append(event)

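            # Build Merkle tree periodically. Swap the batch out before
            # awaiting, so events appended by other adapters while the
            # anchor call is in flight are not discarded.
            if len(self.merkle_batch) >= self.merkle_interval:
                batch, self.merkle_batch = self.merkle_batch, []
                tree = MerkleTree(batch)
                await self._anchor_merkle_root(tree.root)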

            # Persist event
            await self._persist_event(event)

    async def _anchor_merkle_root(self, root: str):
        """Anchor Merkle root to external system (blockchain, etc.)."""
        # Implementation depends on anchoring strategy
        print(f"Anchoring Merkle root: {root[:16]}...")

    async def _persist_event(self, event: VCPEvent):
        """Persist event to storage."""
        # Implementation depends on storage backend
        pass
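
`_persist_event` is left as a stub above. One possible backend, offered purely as an assumption for illustration, is an append-only JSON Lines file. The function names and the file name below are hypothetical, and the events are plain dicts so the sketch runs on its own.

```python
import json
import os
import tempfile

def persist_event_jsonl(path: str, record: dict) -> None:
    """Append one event as a single JSON line and flush it to disk."""
    line = json.dumps(record, sort_keys=True, separators=(",", ":"))
    with open(path, "a", encoding="utf-8") as f:
        f.write(line + "\n")
        f.flush()
        os.fsync(f.fileno())  # ask the OS to commit the write before returning

def load_events_jsonl(path: str) -> list:
    """Read every stored event back, in append order."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Usage with a throwaway file
path = os.path.join(tempfile.mkdtemp(), "vcp-events.jsonl")
persist_event_jsonl(path, {"event_id": "e-1", "event_hash": "ab" * 32})
persist_event_jsonl(path, {"event_id": "e-2", "event_hash": "cd" * 32})
print(len(load_events_jsonl(path)))  # 2
```

A synchronous write like this blocks the event loop, so inside the sidecar it would more likely be wrapped with `asyncio.to_thread` or replaced by an async storage client.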

8. FIX Protocol Integration

For institutional trading, the FIX Protocol is ubiquitous. Here's how to tap into a FIX engine:

import quickfix as fix
from datetime import datetime
from typing import AsyncIterator
import asyncio

class FIXAdapter(ProtocolAdapter):
    """Adapter for FIX Protocol message streams."""

    # FIX tag mappings
    TAG_MAPPINGS = {
        fix.MsgType: 'msg_type',
        fix.ClOrdID: 'order_id',
        fix.Symbol: 'symbol',
        fix.Side: 'side',
        fix.OrderQty: 'quantity',
        fix.Price: 'price',
        fix.OrdStatus: 'status',
        fix.ExecType: 'exec_type',
        fix.ExecID: 'exec_id',
        fix.TransactTime: 'transact_time',
    }

    SIDE_MAP = {'1': 'BUY', '2': 'SELL', '5': 'SHORT'}

    def __init__(self, config_path: str):
        self.config_path = config_path
        self.event_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self) -> None:
        """Initialize FIX session."""
        settings = fix.SessionSettings(self.config_path)
        store_factory = fix.FileStoreFactory(settings)
        log_factory = fix.FileLogFactory(settings)

        self.application = FIXApplication(self.event_queue)
        self.initiator = fix.SocketInitiator(
            self.application, store_factory, settings, log_factory
        )
        self.initiator.start()

    async def stream_events(self) -> AsyncIterator[VCPEvent]:
        """Yield VCP events from FIX messages."""
        while True:
            fix_msg = await self.event_queue.get()
            vcp_event = self._convert_to_vcp(fix_msg)
            if vcp_event:
                yield vcp_event

    def _convert_to_vcp(self, msg: fix.Message) -> Optional[VCPEvent]:
        """Convert FIX message to VCP event."""
        msg_type = fix.MsgType()
        msg.getHeader().getField(msg_type)
        msg_type_value = msg_type.getValue()

        # Map FIX message type to VCP event type. ExecType is read only
        # for ExecutionReports, since other messages do not carry it.
        if msg_type_value == fix.MsgType_ExecutionReport:
            event_type = self._get_exec_event_type(msg)
        else:
            event_type = {
                fix.MsgType_NewOrderSingle: EventType.ORD,
                fix.MsgType_OrderCancelRequest: EventType.CXL,
                fix.MsgType_OrderCancelReplaceRequest: EventType.MOD,
            }.get(msg_type_value)
        if event_type is None:
            return None

        def read(field):
            """Return the field's value, or None when the tag is absent."""
            if not msg.isSetField(field):
                return None
            msg.getField(field)
            return field.getValue()

        # Extract common fields
        symbol = read(fix.Symbol())
        side = read(fix.Side())
        qty = read(fix.OrderQty())
        price = read(fix.Price())
        order_id = read(fix.ClOrdID())

        return VCPEvent(
            event_id=generate_uuid7(),
            event_type=event_type,
            timestamp=datetime.utcnow(),
            actor_id=self._get_sender_comp_id(msg),
            instrument_id=symbol,
            quantity=float(qty) if qty is not None else None,
            price=float(price) if price is not None else None,
            side=self.SIDE_MAP.get(side) if side is not None else None,
            order_id=order_id,
        )

    def _get_exec_event_type(self, msg: fix.Message) -> EventType:
        """Determine VCP event type from ExecutionReport."""
        exec_type = fix.ExecType()
        msg.getField(exec_type)

        exec_map = {
            fix.ExecType_NEW: EventType.ACK,
            fix.ExecType_FILL: EventType.EXE,
            fix.ExecType_PARTIAL_FILL: EventType.EXE,
            fix.ExecType_REJECTED: EventType.REJ,
            fix.ExecType_CANCELED: EventType.CXL,
        }
        return exec_map.get(exec_type.getValue(), EventType.ACK)

    def _get_sender_comp_id(self, msg: fix.Message) -> str:
        """Extract SenderCompID from message header."""
        sender = fix.SenderCompID()
        msg.getHeader().getField(sender)
        return sender.getValue()


class FIXApplication(fix.Application):
    """QuickFIX application callback handler."""

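    def __init__(self, event_queue: asyncio.Queue):
        super().__init__()
        self.event_queue = event_queue
        # Capture the loop here: this constructor runs inside connect(),
        # on the asyncio thread, whereas fromApp runs on a QuickFIX thread
        # that has no event loop of its own
        self.loop = asyncio.get_running_loop()

    def fromApp(self, message: fix.Message, session_id: fix.SessionID):
        """Handle incoming application messages."""
        # Hand the message over to the asyncio loop for processing
        self.loop.call_soon_threadsafe(
            self.event_queue.put_nowait, message
        )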

    # Other required callbacks (onCreate, onLogon, etc.)
    def onCreate(self, session_id): pass
    def onLogon(self, session_id): pass
    def onLogout(self, session_id): pass
    def toAdmin(self, message, session_id): pass
    def fromAdmin(self, message, session_id): pass
    def toApp(self, message, session_id): pass
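
The same mapping can be exercised without a FIX engine. The sketch below parses a raw tag=value message with the standard library only. The tag numbers and MsgType codes are standard FIX values, while the sample message, the helper names, and the output field names are assumptions chosen to mirror the adapter above. It does not validate BodyLength or CheckSum.

```python
SOH = "\x01"  # FIX field delimiter

# Standard FIX tag numbers for the fields the adapter extracts
TAG_NAMES = {
    "35": "msg_type", "49": "actor_id", "11": "order_id",
    "55": "instrument_id", "54": "side", "38": "quantity", "44": "price",
}
SIDE_MAP = {"1": "BUY", "2": "SELL", "5": "SHORT"}
# MsgType: D = NewOrderSingle, F = OrderCancelRequest, G = OrderCancelReplaceRequest
EVENT_TYPES = {"D": "ORD", "F": "CXL", "G": "MOD"}

def parse_fix(raw: str) -> dict:
    """Split a raw tag=value message into the named fields we care about."""
    fields = {}
    for pair in raw.split(SOH):
        tag, sep, value = pair.partition("=")
        if sep and tag in TAG_NAMES:
            fields[TAG_NAMES[tag]] = value
    return fields

def to_event_fields(raw: str):
    """Return VCP-style fields, or None for message types that are not mapped."""
    fields = parse_fix(raw)
    event_type = EVENT_TYPES.get(fields.pop("msg_type", None))
    if event_type is None:
        return None
    fields["event_type"] = event_type
    if "side" in fields:
        fields["side"] = SIDE_MAP.get(fields["side"])
    for name in ("quantity", "price"):
        if name in fields:
            fields[name] = float(fields[name])
    return fields

# Hypothetical NewOrderSingle; "|" stands in for SOH, length and checksum fields omitted
sample = "8=FIX.4.4|35=D|49=ALGO1|56=BROKER|11=ORD-1|55=AAPL|54=1|38=100|44=150.25|".replace("|", SOH)
print(to_event_fields(sample))
```

Unmapped message types such as Heartbeat (35=0) return None, matching the adapter's behavior of skipping messages it has no event type for.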

9. MT4/MT5 Bridge Implementation

For retail trading platforms, here's an MQL5 bridge:

//+------------------------------------------------------------------+
//|                                              VCPBridge.mq5       |
//|                        VeritasChain Standards Organization       |
//+------------------------------------------------------------------+
#property copyright "VSO"
#property version   "1.00"

#include <Trade\Trade.mqh>
#include <Files\File.mqh>

// VCP Event structure matching Python schema
struct VCPEvent {
    string event_id;
    string event_type;
    datetime timestamp;
    string actor_id;
    string instrument_id;
    double quantity;
    double price;
    string side;
    string order_id;
    string prev_hash;
    string event_hash;
};

// Global state
string g_prev_hash = "0000000000000000000000000000000000000000000000000000000000000000";
int g_socket_handle = INVALID_HANDLE;
string g_actor_id;

//+------------------------------------------------------------------+
//| Expert initialization function                                    |
//+------------------------------------------------------------------+
int OnInit() {
    // Generate actor ID from account info
    g_actor_id = "MT5-" + IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN));

    // Connect to VCP sidecar (local socket)
    g_socket_handle = SocketCreate();
    if(g_socket_handle == INVALID_HANDLE) {
        Print("Failed to create socket");
        return INIT_FAILED;
    }

    if(!SocketConnect(g_socket_handle, "127.0.0.1", 9999, 5000)) {
        Print("Failed to connect to VCP sidecar");
        return INIT_FAILED;
    }

    Print("VCP Bridge initialized for account: ", g_actor_id);
    return INIT_SUCCEEDED;
}

//+------------------------------------------------------------------+
//| Generate UUID v7 (simplified for MQL5)                           |
//+------------------------------------------------------------------+
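string GenerateUUID7() {
    // Millisecond timestamp in UTC. This is approximate: the sub-second
    // part comes from the tick counter, not from a wall clock.
    datetime now = TimeGMT();
    long ms = (long)now * 1000 + GetTickCount() % 1000;

    // A UUID is 8-4-4-4-12 hex digits; the 48-bit timestamp fills the
    // first two groups. Lowercase hex matches Python's str(uuid.UUID).
    // Seed MathRand() with MathSrand() in OnInit, or the sequence repeats.
    string ts = StringFormat("%012llx", ms);
    string uuid = StringSubstr(ts, 0, 8) + "-" + StringSubstr(ts, 8, 4);
    uuid += "-7" + StringFormat("%03x", MathRand() & 0xFFF);            // version 7 + 12 random bits
    uuid += "-" + StringFormat("%04x", 0x8000 | (MathRand() & 0x3FFF)); // variant 10 + 14 random bits
    // MathRand() yields 15 bits per call, so each 4-digit chunk has its top bit clear
    uuid += "-" + StringFormat("%04x%04x%04x", MathRand(), MathRand(), MathRand());

    return uuid;
}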

//+------------------------------------------------------------------+
//| Compute SHA-256 hash (via sidecar)                               |
//+------------------------------------------------------------------+
string ComputeHash(string data) {
    // Send hash request to sidecar
    string request = "HASH:" + data;
    char req_bytes[];
    // StringToCharArray also copies the terminating zero; exclude it from the payload
    int req_len = StringToCharArray(request, req_bytes, 0, WHOLE_ARRAY, CP_UTF8) - 1;

    if(SocketSend(g_socket_handle, req_bytes, req_len) < 0) {
        Print("Failed to send hash request");
        return "";
    }

    // Receive hash response
    char response[];
    int received = SocketRead(g_socket_handle, response, 64, 1000);
    if(received <= 0) {
        Print("Failed to receive hash");
        return "";
    }

    return CharArrayToString(response, 0, received, CP_UTF8);
}

//+------------------------------------------------------------------+
//| Create and send VCP event                                         |
//+------------------------------------------------------------------+
void SendVCPEvent(string event_type, string symbol, double qty, 
                  double price, string side, ulong ticket) {
    VCPEvent event;
    event.event_id = GenerateUUID7();
    event.event_type = event_type;
    event.timestamp = TimeCurrent();
    event.actor_id = g_actor_id;
    event.instrument_id = symbol;
    event.quantity = qty;
    event.price = price;
    event.side = side;
    event.order_id = IntegerToString(ticket);
    event.prev_hash = g_prev_hash;

    // Create canonical JSON for hashing
    string json = StringFormat(
        "{\"actor_id\":\"%s\",\"event_id\":\"%s\",\"event_type\":\"%s\","
        "\"instrument_id\":\"%s\",\"order_id\":\"%s\",\"prev_hash\":\"%s\","
        "\"price\":%.5f,\"quantity\":%.2f,\"side\":\"%s\",\"timestamp\":\"%s\"}",
        event.actor_id, event.event_id, event.event_type,
        event.instrument_id, event.order_id, event.prev_hash,
        event.price, event.quantity, event.side,
        TimeToString(event.timestamp, TIME_DATE|TIME_SECONDS)
    );

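    // Compute hash; drop the event instead of chaining onto an empty hash
    event.event_hash = ComputeHash(json);
    if(event.event_hash == "") {
        Print("VCP event dropped: hash unavailable");
        return;
    }

    // Send to sidecar. Splice event_hash in before the closing brace so
    // the payload stays valid JSON.
    string message = "EVENT:" + StringSubstr(json, 0, StringLen(json) - 1)
                     + ",\"event_hash\":\"" + event.event_hash + "\"}";
    char msg_bytes[];
    int msg_len = StringToCharArray(message, msg_bytes, 0, WHOLE_ARRAY, CP_UTF8) - 1;  // exclude trailing zero
    SocketSend(g_socket_handle, msg_bytes, msg_len);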

    // Update chain state
    g_prev_hash = event.event_hash;

    Print("VCP Event sent: ", event.event_type, " ", symbol, " ", qty, " @ ", price);
}

//+------------------------------------------------------------------+
//| Trade transaction handler                                         |
//+------------------------------------------------------------------+
void OnTradeTransaction(const MqlTradeTransaction& trans,
                        const MqlTradeRequest& request,
                        const MqlTradeResult& result) {
    // Map MT5 transaction types to VCP events
    switch(trans.type) {
        case TRADE_TRANSACTION_ORDER_ADD:
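            // Pending buy orders (limit, stop, stop-limit) count as buys too
            SendVCPEvent("ORD", trans.symbol, trans.volume, trans.price,
                        (trans.order_type == ORDER_TYPE_BUY ||
                         trans.order_type == ORDER_TYPE_BUY_LIMIT ||
                         trans.order_type == ORDER_TYPE_BUY_STOP ||
                         trans.order_type == ORDER_TYPE_BUY_STOP_LIMIT) ? "BUY" : "SELL",
                        trans.order);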
            break;

        case TRADE_TRANSACTION_DEAL_ADD:
            SendVCPEvent("EXE", trans.symbol, trans.volume, trans.price,
                        trans.deal_type == DEAL_TYPE_BUY ? "BUY" : "SELL",
                        trans.deal);
            break;

        case TRADE_TRANSACTION_ORDER_DELETE:
            SendVCPEvent("CXL", trans.symbol, trans.volume, trans.price,
                        "", trans.order);
            break;
    }
}

//+------------------------------------------------------------------+
//| Expert deinitialization function                                  |
//+------------------------------------------------------------------+
void OnDeinit(const int reason) {
    if(g_socket_handle != INVALID_HANDLE) {
        SocketClose(g_socket_handle);
    }
    Print("VCP Bridge disconnected");
}
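Because the bridge builds its canonical JSON by hand, the sidecar can only re-check `event_hash` if it rebuilds exactly the same string. The sketch below is one way that could look in Python. It assumes the field order and number formatting of the `StringFormat` call above, and it assumes `ComputeHash` returns a hex SHA-256 digest of the UTF-8 bytes (as the sidecar's `HASH:` handler suggests). `canonical_json` and `verify_event_hash` are hypothetical helper names, not part of the original code.

```python
import hashlib

def canonical_json(event: dict) -> str:
    """Rebuild the string the MQL5 bridge hashes (keys in alphabetical order).

    Assumption: mirrors the bridge's StringFormat call, i.e. price with
    5 decimals, quantity with 2 decimals, and no string escaping.
    """
    return (
        '{"actor_id":"%s","event_id":"%s","event_type":"%s",'
        '"instrument_id":"%s","order_id":"%s","prev_hash":"%s",'
        '"price":%.5f,"quantity":%.2f,"side":"%s","timestamp":"%s"}'
    ) % (
        event["actor_id"], event["event_id"], event["event_type"],
        event["instrument_id"], event["order_id"], event["prev_hash"],
        event["price"], event["quantity"], event["side"], event["timestamp"],
    )

def verify_event_hash(event: dict) -> bool:
    """Return True if event_hash matches the SHA-256 of the canonical form."""
    expected = hashlib.sha256(canonical_json(event).encode("utf-8")).hexdigest()
    return expected == event.get("event_hash")
```

A check like this could run in `process_event` before the event is appended. It compares against the parsed values, so it only holds while both sides format numbers identically.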

Python Sidecar for MT4/MT5

import asyncio
import hashlib
import json
from datetime import datetime

class MT5SidecarServer:
    """Socket server receiving events from MT5 bridge."""

    def __init__(self, host: str = '127.0.0.1', port: int = 9999):
        self.host = host
        self.port = port
        self.chain = HashChain()
        self.signer = VCPSigner()

    async def start(self):
        server = await asyncio.start_server(
            self.handle_client, self.host, self.port
        )
        print(f"MT5 Sidecar listening on {self.host}:{self.port}")
        async with server:
            await server.serve_forever()

    async def handle_client(self, reader: asyncio.StreamReader, 
                           writer: asyncio.StreamWriter):
        print("MT5 client connected")

        while True:
            try:
                data = await reader.read(4096)
                if not data:
                    break

                message = data.decode('utf-8')

                if message.startswith('HASH:'):
                    # Hash request
                    content = message[5:]
                    hash_result = hashlib.sha256(content.encode()).hexdigest()
                    writer.write(hash_result.encode())
                    await writer.drain()

                elif message.startswith('EVENT:'):
                    # Event submission
                    event_json = message[6:]
                    await self.process_event(event_json)

            except Exception as e:
                print(f"Error: {e}")
                break

        writer.close()
        await writer.wait_closed()

    async def process_event(self, event_json: str):
        """Process incoming VCP event from MT5."""
        try:
            data = json.loads(event_json)

            event = VCPEvent(
                event_id=data['event_id'],
                event_type=EventType(data['event_type']),
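                # MT5's TimeToString() emits "YYYY.MM.DD HH:MM:SS", which
                # datetime.fromisoformat() cannot parse
                timestamp=datetime.strptime(data['timestamp'], '%Y.%m.%d %H:%M:%S'),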
                actor_id=data['actor_id'],
                instrument_id=data.get('instrument_id'),
                quantity=data.get('quantity'),
                price=data.get('price'),
                side=data.get('side'),
                order_id=data.get('order_id'),
                prev_hash=data.get('prev_hash'),
                event_hash=data.get('event_hash'),
            )

            # Verify hash chain consistency
            if self.chain.events:
                expected_prev = self.chain.events[-1].event_hash
                if event.prev_hash != expected_prev:
                    print(f"WARNING: chain inconsistency at {event.event_id}: "
                          f"expected prev_hash {expected_prev}, got {event.prev_hash}")

            # Add to our chain and sign
            self.chain.events.append(event)
            event.signature = self.signer.sign_event(event)

            print(f"Processed: {event.event_type.value} {event.instrument_id}")

        except Exception as e:
            print(f"Failed to process event: {e}")


# Run the sidecar
if __name__ == "__main__":
    sidecar = MT5SidecarServer()
    asyncio.run(sidecar.start())
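One caveat with the handler above: `reader.read(4096)` returns whatever bytes happen to be available, so over TCP a single call may deliver half a message or several messages at once. A minimal sketch of newline-delimited framing follows, assuming the bridge is changed to append `\n` to every message (the original MQL5 code does not do this). `split_frames` is a hypothetical helper.

```python
def split_frames(buffer: bytes) -> tuple[list[str], bytes]:
    """Split a receive buffer into complete newline-terminated messages.

    Returns the decoded complete messages plus any trailing partial bytes,
    which the caller should prepend to the next chunk it reads.
    """
    # Everything before the last newline is complete; the rest is partial.
    *complete, remainder = buffer.split(b"\n")
    messages = [frame.decode("utf-8") for frame in complete if frame]
    return messages, remainder


# Usage: accumulate chunks and dispatch only complete messages.
pending = b""
for chunk in (b'EVENT:{"a":1}\nHASH:ab', b'c\nEVENT:{"b"'):
    messages, pending = split_frames(pending + chunk)
    for message in messages:
        print("dispatch:", message)
```

If every message ends in a newline, `await reader.readline()` on the `asyncio.StreamReader` gives the same result without manual buffering.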

10. Multi-Regulation Mapping

The real power of VCP is satisfying multiple regulations from a single event stream:

from abc import ABC, abstractmethod
from typing import List, Dict, Any
import xml.etree.ElementTree as ET

class RegulatoryExporter(ABC):
    """Base class for regulation-specific exporters."""

    @abstractmethod
    def export(self, events: List[VCPEvent]) -> Any:
        pass


class FormSHOExporter(RegulatoryExporter):
    """Export events to SEC Form SHO XML format."""

    def export(self, events: List[VCPEvent]) -> str:
        """Generate Form SHO XML for short position reporting."""
        # Filter to position events and calculate aggregates
        positions = self._aggregate_positions(events)

        root = ET.Element("formSHO")
        root.set("xmlns", "http://www.sec.gov/edgar/formSHO")

        header = ET.SubElement(root, "headerData")
        ET.SubElement(header, "submissionType").text = "SHO"
        ET.SubElement(header, "filerInfo")  # Populated from config

        info_table = ET.SubElement(root, "informationTable")

        for isin, position_data in positions.items():
            # The 10,000,000 test applies to the position's dollar value, not its share count
            if position_data['dollar_value'] >= 10_000_000 or \
               position_data['pct_outstanding'] >= 0.025:

                entry = ET.SubElement(info_table, "infoTableEntry")
                ET.SubElement(entry, "issuerName").text = position_data['name']
                ET.SubElement(entry, "issuerCusip").text = position_data['cusip']
                ET.SubElement(entry, "grossShortPosition").text = \
                    str(int(position_data['gross_short']))
                ET.SubElement(entry, "dollarValue").text = \
                    str(int(position_data['dollar_value']))

        return ET.tostring(root, encoding='unicode', method='xml')

    def _aggregate_positions(self, events: List[VCPEvent]) -> Dict:
        """Aggregate events into position snapshots."""
        positions = {}
        for event in events:
            if event.event_type == EventType.POS and event.side == "SHORT":
                isin = event.instrument_id
                if isin not in positions:
                    positions[isin] = {
                        'gross_short': 0,
                        'dollar_value': 0,
                        'name': '',
                        'cusip': '',
                        'pct_outstanding': 0,
                    }
                positions[isin]['gross_short'] = event.quantity
                positions[isin]['dollar_value'] = event.quantity * (event.price or 0)
        return positions


class MiFIDRTS25Exporter(RegulatoryExporter):
    """Export events to MiFID II RTS 25 format."""

    def export(self, events: List[VCPEvent]) -> List[Dict]:
        """Generate RTS 25 execution records."""
        records = []

        for event in events:
            if event.event_type == EventType.EXE:
                record = {
                    "tradingVenueTransactionId": event.event_id,
                    "executingFirmId": event.actor_id,
                    "instrumentId": event.instrument_id,
                    "price": event.price,
                    "quantity": event.quantity,
                    "tradeDatetime": event.timestamp.isoformat() + "Z",
                    "buySellIndicator": "BUYI" if event.side == "BUY" else "SELL",
                    # Clock sync status from VCP metadata
                    "clockSyncStatus": "SYNC",  # Or "NOSYNC" if degraded
                }
                records.append(record)

        return records


class EUAIActExporter(RegulatoryExporter):
    """Export events to EU AI Act Article 12 format."""

    def export(self, events: List[VCPEvent]) -> List[Dict]:
        """Generate AI Act logging records for high-risk AI systems."""
        records = []

        for event in events:
            if event.event_type == EventType.SIG and event.model_id:
                record = {
                    "logId": event.event_id,
                    "timestamp": event.timestamp.isoformat() + "Z",
                    "aiSystemId": event.model_id,
                    "aiSystemVersion": event.model_version,

                    # Article 12(2)(a): periods of use
                    "operationalPeriod": {
                        "start": event.timestamp.isoformat() + "Z",
                        "status": "ACTIVE"
                    },

                    # Article 12(2)(b): input data reference
                    "inputDataReference": event.prev_hash,

                    # Article 12(2)(c): output and decisions
                    "decisionOutput": {
                        "action": event.decision_factors.get("action") if event.decision_factors else None,
                        "confidence": event.confidence_score,
                        "factors": event.decision_factors,
                    },

                    # Cryptographic verification
                    "integrityHash": event.event_hash,
                    "signature": event.signature,
                }
                records.append(record)

        return records


class MultiRegulationExporter:
    """Unified exporter for all supported regulations."""

    def __init__(self):
        self.exporters = {
            'form_sho': FormSHOExporter(),
            'mifid_rts25': MiFIDRTS25Exporter(),
            'eu_ai_act': EUAIActExporter(),
        }

    def export_all(self, events: List[VCPEvent]) -> Dict[str, Any]:
        """Export events to all regulation formats."""
        return {
            name: exporter.export(events)
            for name, exporter in self.exporters.items()
        }


# Usage
exporter = MultiRegulationExporter()
all_outputs = exporter.export_all(chain.events)

print("Form SHO XML:", all_outputs['form_sho'][:200])
print("MiFID RTS 25 records:", len(all_outputs['mifid_rts25']))
print("EU AI Act logs:", len(all_outputs['eu_ai_act']))
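Note that `_aggregate_positions` leaves `pct_outstanding` at 0, so the 2.5% test in `FormSHOExporter` can never fire as written. The sketch below shows how both threshold inputs might be derived for one position. It assumes the 10,000,000 figure is a US dollar value and that a shares-outstanding figure is available from reference data (a hypothetical input the original code does not supply). It also ignores any averaging over the reporting period.

```python
from dataclasses import dataclass

USD_THRESHOLD = 10_000_000   # value taken from FormSHOExporter
PCT_THRESHOLD = 0.025        # 2.5% of shares outstanding, also from the exporter

@dataclass
class ShortPositionSummary:
    dollar_value: float
    pct_outstanding: float
    reportable: bool

def summarize_short_position(gross_short_shares: float, price: float,
                             shares_outstanding: float) -> ShortPositionSummary:
    """Derive both threshold inputs for a single short position.

    shares_outstanding is a hypothetical reference-data input.
    """
    dollar_value = gross_short_shares * price
    pct_outstanding = gross_short_shares / shares_outstanding
    reportable = (dollar_value >= USD_THRESHOLD
                  or pct_outstanding >= PCT_THRESHOLD)
    return ShortPositionSummary(dollar_value, pct_outstanding, reportable)
```

Keeping the threshold logic in one small function makes it straightforward to unit-test separately from the XML generation.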

Regulation Coverage Matrix

VCP Field Form SHO 10c-1a SLATE MiFID RTS 25 EU AI Act Art. 12
event_id
timestamp ✓ (μs precision)
actor_id
instrument_id
quantity
price
side
model_id
decision_factors
confidence_score
prev_hash
signature

11. Putting It All Together

Here's a complete working example:

import asyncio
from datetime import datetime

async def main():
    # Initialize components
    signer = VCPSigner()
    chain = HashChain()

    print("VCP Node initialized")
    print(f"Public key: {signer.get_public_key_b64()[:32]}...")

    # Simulate trading events
    events_data = [
        # AI signal
        {
            "event_type": EventType.SIG,
            "actor_id": "quant-model-v3",
            "instrument_id": "US0378331005",
            "side": "BUY",
            "model_id": "alpha-momentum-v3.2.1",
            "model_version": "3.2.1",
            "decision_factors": {
                "momentum_score": 0.85,
                "mean_reversion": -0.12,
                "volume_signal": 0.67,
            },
            "confidence_score": 0.78,
        },
        # Order submission
        {
            "event_type": EventType.ORD,
            "actor_id": "oms-prod-1",
            "instrument_id": "US0378331005",
            "quantity": 1000,
            "price": 178.50,
            "side": "BUY",
        },
        # Order acknowledgment
        {
            "event_type": EventType.ACK,
            "actor_id": "nasdaq-matching",
            "instrument_id": "US0378331005",
            "quantity": 1000,
            "price": 178.50,
            "side": "BUY",
            "order_id": None,  # Will be linked
        },
        # Execution
        {
            "event_type": EventType.EXE,
            "actor_id": "nasdaq-matching",
            "instrument_id": "US0378331005",
            "quantity": 1000,
            "price": 178.48,
            "side": "BUY",
        },
    ]

    # Process events
    for i, data in enumerate(events_data):
        event = VCPEvent(
            event_id=generate_uuid7(),
            event_type=data["event_type"],
            timestamp=datetime.utcnow(),
            actor_id=data["actor_id"],
            instrument_id=data.get("instrument_id"),
            quantity=data.get("quantity"),
            price=data.get("price"),
            side=data.get("side"),
            model_id=data.get("model_id"),
            model_version=data.get("model_version"),
            decision_factors=data.get("decision_factors"),
            confidence_score=data.get("confidence_score"),
        )

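        # Link ACK/EXE events to the most recent ORD event, so an execution
        # points at the order itself rather than at the preceding ACK
        if data["event_type"] in (EventType.ACK, EventType.EXE):
            last_order = next(
                (e for e in reversed(chain.events) if e.event_type == EventType.ORD),
                None,
            )
            if last_order:
                event.order_id = last_order.event_id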

        # Add to chain
        chain.append(event)

        # Sign
        event.signature = signer.sign_event(event)

        print(f"\n{'='*60}")
        print(f"Event {i+1}: {event.event_type.value}")
        print(f"  ID: {event.event_id}")
        print(f"  Hash: {event.event_hash[:32]}...")
        print(f"  Prev: {event.prev_hash[:32]}...")
        print(f"  Sig:  {event.signature[:32]}...")

    # Verify chain integrity
    print(f"\n{'='*60}")
    print(f"Chain verification: {'✓ VALID' if chain.verify_chain() else '✗ INVALID'}")

    # Build Merkle tree
    tree = MerkleTree(chain.events)
    print(f"Merkle root: {tree.root[:32]}...")

    # Generate inclusion proof for first event
    proof = tree.get_proof(0)
    print(f"Proof for event 0: {len(proof)} nodes")

    # Verify proof
    valid = MerkleTree.verify_proof(chain.events[0].event_hash, proof, tree.root)
    print(f"Proof verification: {'✓ VALID' if valid else '✗ INVALID'}")

    # Export to regulatory formats
    exporter = MultiRegulationExporter()
    outputs = exporter.export_all(chain.events)

    print(f"\n{'='*60}")
    print("Regulatory Exports:")
    print(f"  EU AI Act records: {len(outputs['eu_ai_act'])}")
    print(f"  MiFID RTS 25 records: {len(outputs['mifid_rts25'])}")

    # Simulate tampering detection
    print(f"\n{'='*60}")
    print("Simulating tampering...")
    original_qty = chain.events[1].quantity
    chain.events[1].quantity = 10000  # Attacker changes quantity

    tampered_idx = chain.detect_tampering()
    print(f"Tampering detected at event index: {tampered_idx}")

    # Restore
    chain.events[1].quantity = original_qty

if __name__ == "__main__":
    asyncio.run(main())

Output

VCP Node initialized
Public key: MCowBQYDK2VwAyEA7Hp8j2mQ...

============================================================
Event 1: SIG
  ID: 018d5f3c-7a2b-7def-8123-456789abcdef
  Hash: a3f2e8c1d4b5a6978012345678...
  Prev: 00000000000000000000000000...
  Sig:  MEUCIQDh8Kj2mNpQ3rS5tU6vW...

============================================================
Event 2: ORD
  ID: 018d5f3c-7a2c-7abc-9234-567890abcdef
  Hash: b4e3f9d2c5a6b7890123456789...
  Prev: a3f2e8c1d4b5a6978012345678...
  Sig:  MEQCIDk9Lm3nOpR4sT6uV7wX...

...

============================================================
Chain verification: ✓ VALID
Merkle root: f8e7d6c5b4a3928170615243...
Proof for event 0: 2 nodes
Proof verification: ✓ VALID

============================================================
Regulatory Exports:
  EU AI Act records: 1
  MiFID RTS 25 records: 1

============================================================
Simulating tampering...
Tampering detected at event index: 1
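To see why the modified quantity is caught at index 1, here is a stripped-down, self-contained sketch of the same idea. It is not the article's `HashChain` class: the dict-based entries, the all-zero genesis hash, and the function names are assumptions made for illustration.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # assumed placeholder for the first event's prev_hash

def event_hash(payload: dict, prev_hash: str) -> str:
    """Hash the payload together with the previous hash (sorted keys)."""
    body = json.dumps({"payload": payload, "prev_hash": prev_hash},
                      sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(body.encode("utf-8")).hexdigest()

def build_chain(payloads: list) -> list:
    """Link each payload to its predecessor through prev_hash."""
    chain, prev = [], GENESIS_HASH
    for payload in payloads:
        digest = event_hash(payload, prev)
        chain.append({"payload": payload, "prev_hash": prev,
                      "event_hash": digest})
        prev = digest
    return chain

def first_tampered_index(chain: list):
    """Return the index of the first entry that fails verification, or None."""
    prev = GENESIS_HASH
    for i, entry in enumerate(chain):
        recomputed = event_hash(entry["payload"], entry["prev_hash"])
        if entry["prev_hash"] != prev or recomputed != entry["event_hash"]:
            return i
        prev = entry["event_hash"]
    return None

chain = build_chain([{"type": "SIG"}, {"type": "ORD", "quantity": 1000},
                     {"type": "EXE", "quantity": 1000}])
chain[1]["payload"]["quantity"] = 10000  # attacker edits a stored record
print(first_tampered_index(chain))
```

The edited entry no longer matches its stored hash, so verification stops there. To hide the change, an attacker would have to recompute that hash and every later one, which is what the signatures and Merkle roots are meant to expose.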

Conclusion

We've built a complete cryptographic audit trail system that:

  1. Provides tamper evidence through hash chains
  2. Enables efficient verification via Merkle trees
  3. Authenticates events with Ed25519 signatures
  4. Integrates non-invasively via sidecar architecture
  5. Supports multiple protocols (FIX, MT4/MT5)
  6. Satisfies multiple regulations from a single event stream

The SEC's Rule 13f-2 delay gives us time to build this right. The Fifth Circuit's ruling on cumulative economic impact creates regulatory demand for solutions that reduce aggregate compliance burden.

This is that solution.


Resources


Questions? Reach out at developers@veritaschain.org or open an issue on GitHub.

#fintech #cryptography #compliance #python #trading
