
Building Tamper-Evident Audit Trails: A Developer's Guide to Cryptographic Logging for AI Systems

How to implement hash chains, Merkle trees, and external anchoring—with code examples in Python and TypeScript


The EU AI Act's Article 12 mandates "automatic recording of events (logs)" for high-risk AI systems. What it doesn't specify is how those logs should resist tampering. Traditional database audit tables fail this requirement by design—anyone with admin privileges can modify historical records without detection.

This guide walks through implementing cryptographic audit trails that make tampering mathematically detectable. We'll cover hash chain construction, Merkle tree aggregation, external anchoring, and the sidecar integration pattern that keeps audit logic isolated from production systems.

What you'll learn:

  • How hash chains create tamper-evident sequential logs
  • Why Merkle trees enable efficient batch verification
  • How external anchoring provides third-party verifiability
  • Practical integration patterns for existing trading systems
  • Performance considerations for high-throughput environments

The Problem with Traditional Audit Logs

Consider a typical SQL audit table:

CREATE TABLE audit_log (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    event_type VARCHAR(50),
    payload JSONB,
    user_id VARCHAR(100)
);

This approach has a fundamental flaw: mutability. A database administrator can execute:

UPDATE audit_log SET payload = '{"modified": true}' WHERE id = 12345;
DELETE FROM audit_log WHERE timestamp BETWEEN '2025-01-01' AND '2025-01-02';

No trace remains. The audit log's integrity depends entirely on trusting everyone with database access.

For regulatory compliance (EU AI Act, MiFID II, SEC Rule 17a-4), "trust me" isn't sufficient. You need mathematical proof that records haven't been altered.
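
The building block for that proof is a cryptographic hash function: change a single character of the input and the digest changes completely and unpredictably. A quick illustration:

import hashlib

a = hashlib.sha256(b'{"price": "1.08523"}').hexdigest()
b = hashlib.sha256(b'{"price": "1.08524"}').hexdigest()

print(a)  # two digests with no visible relationship,
print(b)  # even though the inputs differ by one character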


Layer 1: Hash Chain Construction

A hash chain links each record to its predecessor through cryptographic hashing. Any modification to a historical record breaks the chain—and the break is detectable.

The Core Concept

┌─────────────────────────────────────────────────────────────────────┐
│                         Hash Chain Structure                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   ┌─────────┐      ┌─────────┐      ┌─────────┐      ┌─────────┐   │
│   │ Event 1 │─────▶│ Event 2 │─────▶│ Event 3 │─────▶│ Event 4 │   │
│   │ prev: 0 │      │ prev: h₁│      │ prev: h₂│      │ prev: h₃│   │
│   │ hash: h₁│      │ hash: h₂│      │ hash: h₃│      │ hash: h₄│   │
│   └─────────┘      └─────────┘      └─────────┘      └─────────┘   │
│                                                                      │
│   If Event 2 is modified → h₂ changes → h₃ references wrong hash    │
│   Chain verification fails at Event 3                                │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Python Implementation

import hashlib
import json
import time
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class AuditEvent:
    event_id: str
    trace_id: str
    timestamp_iso: str
    timestamp_int: str  # Nanoseconds as string for precision
    event_type: str
    prev_hash: str
    payload: dict

    def to_canonical_json(self) -> str:
        """RFC 8785 JSON Canonicalization Scheme (JCS) simplified."""
        # For production, use a proper JCS library
        return json.dumps(asdict(self), sort_keys=True, separators=(',', ':'))

    def compute_hash(self) -> str:
        """SHA-256 hash of canonical JSON representation."""
        canonical = self.to_canonical_json()
        return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


class HashChainLogger:
    def __init__(self):
        self.events: list[AuditEvent] = []
        self.current_hash = "0" * 64  # Genesis: 64 zeros

    def generate_uuid_v7(self) -> str:
        """UUID v7: Time-ordered unique identifier."""
        # Simplified implementation - use uuid7 library in production
        timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        random_bits = uuid.uuid4().hex[12:]
        uuid_hex = f"{timestamp_ms:012x}{random_bits}"
        return f"{uuid_hex[:8]}-{uuid_hex[8:12]}-7{uuid_hex[13:16]}-{uuid_hex[16:20]}-{uuid_hex[20:32]}"

    def log_event(self, event_type: str, trace_id: str, payload: dict) -> AuditEvent:
        """Create and append a new event to the chain."""
        # Read the clock once at nanosecond resolution; deriving nanoseconds
        # from the float datetime.timestamp() would lose precision beyond
        # microseconds
        now_ns = time.time_ns()
        now = datetime.fromtimestamp(now_ns / 1e9, tz=timezone.utc)

        event = AuditEvent(
            event_id=self.generate_uuid_v7(),
            trace_id=trace_id,
            timestamp_iso=now.isoformat(),
            timestamp_int=str(now_ns),
            event_type=event_type,
            prev_hash=self.current_hash,
            payload=payload
        )

        self.current_hash = event.compute_hash()
        self.events.append(event)
        return event

    def verify_chain(self) -> tuple[bool, Optional[int]]:
        """Verify entire chain integrity. Returns (valid, first_broken_index)."""
        expected_prev = "0" * 64

        for i, event in enumerate(self.events):
            # Check prev_hash linkage
            if event.prev_hash != expected_prev:
                return False, i

            # Update expected for next iteration
            expected_prev = event.compute_hash()

        return True, None


# Usage example
logger = HashChainLogger()
trade_trace = logger.generate_uuid_v7()

# Log a trade lifecycle
signal = logger.log_event("SIG", trade_trace, {
    "symbol": "EURUSD",
    "direction": "BUY",
    "model_version": "v2.3.1"
})

order = logger.log_event("ORD", trade_trace, {
    "order_id": "ORD-123456",
    "quantity": "100000",  # String for precision
    "price": "1.08523"
})

execution = logger.log_event("EXE", trade_trace, {
    "execution_id": "EXE-789012",
    "fill_price": "1.08525",
    "fill_quantity": "100000"
})

# Verify the chain
is_valid, broken_at = logger.verify_chain()
print(f"Chain valid: {is_valid}")  # True
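
To see the tamper evidence in action, mutate one of the logged events in place and re-verify. The break surfaces at the event after the modified one, where the stored prev_hash no longer matches:

logger.events[1].payload["quantity"] = "999999"

is_valid, broken_at = logger.verify_chain()
print(f"Chain valid: {is_valid}, broken at index: {broken_at}")  # False, 2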

TypeScript Implementation

import { createHash, randomUUID } from 'crypto';

interface AuditEvent {
  event_id: string;
  trace_id: string;
  timestamp_iso: string;
  timestamp_int: string;
  event_type: string;
  prev_hash: string;
  payload: Record<string, unknown>;
}

class HashChainLogger {
  private events: AuditEvent[] = [];
  private currentHash: string = '0'.repeat(64);

  private toCanonicalJson(event: AuditEvent): string {
    // RFC 8785 JCS - simplified version. Keys are sorted recursively;
    // passing an array replacer to JSON.stringify would silently drop
    // nested payload keys that don't match top-level field names.
    const sortKeys = (value: unknown): unknown => {
      if (Array.isArray(value)) return value.map(sortKeys);
      if (value !== null && typeof value === 'object') {
        const obj = value as Record<string, unknown>;
        return Object.keys(obj).sort().reduce<Record<string, unknown>>(
          (acc, key) => {
            acc[key] = sortKeys(obj[key]);
            return acc;
          }, {});
      }
      return value;
    };
    return JSON.stringify(sortKeys(event));
  }

  private computeHash(event: AuditEvent): string {
    const canonical = this.toCanonicalJson(event);
    return createHash('sha256').update(canonical).digest('hex');
  }

  private generateUuidV7(): string {
    // Simplified - use uuid library with v7 support in production
    const timestampMs = Date.now();
    const randomHex = randomUUID().replace(/-/g, '').slice(12);
    const hex = timestampMs.toString(16).padStart(12, '0') + randomHex;
    return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-7${hex.slice(13, 16)}-${hex.slice(16, 20)}-${hex.slice(20, 32)}`;
  }

  logEvent(eventType: string, traceId: string, payload: Record<string, unknown>): AuditEvent {
    const now = new Date();

    const event: AuditEvent = {
      event_id: this.generateUuidV7(),
      trace_id: traceId,
      timestamp_iso: now.toISOString(),
      timestamp_int: (BigInt(now.getTime()) * 1_000_000n).toString(),
      event_type: eventType,
      prev_hash: this.currentHash,
      payload
    };

    this.currentHash = this.computeHash(event);
    this.events.push(event);
    return event;
  }

  verifyChain(): { valid: boolean; brokenAt?: number } {
    let expectedPrev = '0'.repeat(64);

    for (let i = 0; i < this.events.length; i++) {
      const event = this.events[i];

      if (event.prev_hash !== expectedPrev) {
        return { valid: false, brokenAt: i };
      }

      expectedPrev = this.computeHash(event);
    }

    return { valid: true };
  }
}

Critical Implementation Notes

1. Use String Types for Financial Values

IEEE 754 floating-point cannot precisely represent all decimal values:

# BAD: Floating point precision loss
{"price": 1.08523}  # May become 1.0852299999999999

# GOOD: String representation preserves exact value
{"price": "1.08523"}
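
On the consuming side, parse those string fields with decimal.Decimal rather than float so arithmetic stays exact. A minimal sketch:

from decimal import Decimal

price = Decimal("1.08523")
quantity = Decimal("100000")

print(f"{float('1.08523'):.20f}")  # shows the drift that default formatting hides
print(price * quantity)            # 108523.00000 (exact)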

2. Dual Timestamp Format

Store both human-readable and machine-precise timestamps:

{
    "timestamp_iso": "2025-01-16T14:30:00.123456789Z",  # Human readable
    "timestamp_int": "1737038400123456789"              # Nanoseconds since epoch
}
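
Reading the clock once and deriving both fields from the same nanosecond value keeps the pair consistent (the ISO field is for humans, so microsecond rounding there is acceptable). A sketch:

import time
from datetime import datetime, timezone

ns = time.time_ns()  # single clock read at nanosecond resolution
record = {
    "timestamp_iso": datetime.fromtimestamp(ns / 1e9, tz=timezone.utc).isoformat(),
    "timestamp_int": str(ns)  # full precision preserved as a string
}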

3. Genesis Event

The first event in any chain must have prev_hash set to 64 zeros:

GENESIS_PREV_HASH = "0" * 64  # 0000000000000000000000000000000000000000000000000000000000000000

Layer 2: Merkle Tree Aggregation

Hash chains prove sequential integrity, but verifying a chain of 1 million events requires 1 million hash computations. Merkle trees enable efficient batch verification and selective disclosure: an inclusion proof contains only ⌈log₂ n⌉ sibling hashes, so proving one event out of a million takes about 20 hash computations.

The Structure

                              ┌──────────────┐
                              │  Merkle Root │
                              │    (R)       │
                              └──────┬───────┘
                                     │
                    ┌────────────────┴────────────────┐
                    │                                 │
              ┌─────┴─────┐                     ┌─────┴─────┐
              │   H(AB)   │                     │   H(CD)   │
              └─────┬─────┘                     └─────┬─────┘
                    │                                 │
           ┌────────┴────────┐               ┌────────┴────────┐
           │                 │               │                 │
      ┌────┴────┐       ┌────┴────┐     ┌────┴────┐       ┌────┴────┐
      │  H(A)   │       │  H(B)   │     │  H(C)   │       │  H(D)   │
      └────┬────┘       └────┬────┘     └────┬────┘       └────┬────┘
           │                 │               │                 │
      ┌────┴────┐       ┌────┴────┐     ┌────┴────┐       ┌────┴────┐
      │ Event A │       │ Event B │     │ Event C │       │ Event D │
      └─────────┘       └─────────┘     └─────────┘       └─────────┘

To prove Event B exists:
- Provide: H(A), H(CD)
- Verifier computes: H(H(A) || H(B)) = H(AB), then H(H(AB) || H(CD)) = R
- Compare with published Root R

Python Implementation (RFC 6962-Style Hashing)

import hashlib
from typing import Optional


class MerkleTree:
    """RFC 6962 Certificate Transparency compatible Merkle tree."""

    def __init__(self):
        self.leaves: list[bytes] = []

    def _hash_leaf(self, data: bytes) -> bytes:
        """RFC 6962: Hash leaf with 0x00 prefix."""
        return hashlib.sha256(b'\x00' + data).digest()

    def _hash_node(self, left: bytes, right: bytes) -> bytes:
        """RFC 6962: Hash internal node with 0x01 prefix."""
        return hashlib.sha256(b'\x01' + left + right).digest()

    def add_leaf(self, data: bytes) -> int:
        """Add a leaf and return its index."""
        leaf_hash = self._hash_leaf(data)
        self.leaves.append(leaf_hash)
        return len(self.leaves) - 1

    def compute_root(self) -> Optional[bytes]:
        """Compute Merkle root using RFC 6962 algorithm."""
        if not self.leaves:
            return None

        # Copy leaves to avoid mutation
        current_level = list(self.leaves)

        while len(current_level) > 1:
            next_level = []

            for i in range(0, len(current_level), 2):
                left = current_level[i]
                # If odd number of nodes, duplicate the last one
                right = current_level[i + 1] if i + 1 < len(current_level) else left
                next_level.append(self._hash_node(left, right))

            current_level = next_level

        return current_level[0]

    def get_proof(self, leaf_index: int) -> list[tuple[bytes, str]]:
        """Generate inclusion proof for a leaf."""
        if leaf_index >= len(self.leaves):
            raise ValueError(f"Leaf index {leaf_index} out of range")

        proof = []
        current_level = list(self.leaves)
        index = leaf_index

        while len(current_level) > 1:
            next_level = []

            for i in range(0, len(current_level), 2):
                left = current_level[i]
                right = current_level[i + 1] if i + 1 < len(current_level) else left
                next_level.append(self._hash_node(left, right))

                # If this pair contains our target, record the sibling
                if i == index or i + 1 == index:
                    if i == index:
                        proof.append((right, 'right'))
                    else:
                        proof.append((left, 'left'))

            current_level = next_level
            index = index // 2

        return proof

    @staticmethod
    def verify_proof(
        leaf_data: bytes,
        proof: list[tuple[bytes, str]],
        expected_root: bytes
    ) -> bool:
        """Verify an inclusion proof."""
        # Hash the leaf with prefix
        current = hashlib.sha256(b'\x00' + leaf_data).digest()

        for sibling, direction in proof:
            if direction == 'left':
                current = hashlib.sha256(b'\x01' + sibling + current).digest()
            else:
                current = hashlib.sha256(b'\x01' + current + sibling).digest()

        return current == expected_root


# Integration with hash chain
class AuditChainWithMerkle:
    def __init__(self):
        self.chain_logger = HashChainLogger()
        self.merkle_tree = MerkleTree()
        self.event_to_leaf: dict[str, int] = {}

    def log_event(self, event_type: str, trace_id: str, payload: dict) -> AuditEvent:
        """Log event to both hash chain and Merkle tree."""
        event = self.chain_logger.log_event(event_type, trace_id, payload)

        # Add to Merkle tree
        event_bytes = event.to_canonical_json().encode('utf-8')
        leaf_index = self.merkle_tree.add_leaf(event_bytes)
        self.event_to_leaf[event.event_id] = leaf_index

        return event

    def get_merkle_root(self) -> Optional[str]:
        """Get current Merkle root as hex string."""
        root = self.merkle_tree.compute_root()
        return root.hex() if root else None

    def get_inclusion_proof(self, event_id: str) -> dict:
        """Generate Merkle inclusion proof for an event."""
        if event_id not in self.event_to_leaf:
            raise ValueError(f"Event {event_id} not found")

        leaf_index = self.event_to_leaf[event_id]
        proof = self.merkle_tree.get_proof(leaf_index)

        return {
            "event_id": event_id,
            "leaf_index": leaf_index,
            "merkle_root": self.get_merkle_root(),
            "proof": [
                {"hash": h.hex(), "direction": d}
                for h, d in proof
            ]
        }
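
A short usage example that ties the two layers together: log a batch of events, then produce and independently verify an inclusion proof for one of them.

audit = AuditChainWithMerkle()
trace = audit.chain_logger.generate_uuid_v7()

for i in range(8):
    audit.log_event("SIG", trace, {"seq": str(i)})

target = audit.chain_logger.events[3]
proof = audit.get_inclusion_proof(target.event_id)

verified = MerkleTree.verify_proof(
    target.to_canonical_json().encode('utf-8'),
    [(bytes.fromhex(p["hash"]), p["direction"]) for p in proof["proof"]],
    bytes.fromhex(proof["merkle_root"])
)
print(f"Inclusion proof verified: {verified}")  # True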

Layer 3: External Anchoring

Hash chains and Merkle trees prove integrity within your system. External anchoring proves integrity to third parties by publishing commitments to immutable external sources.

Anchoring Options

Method                     Immutability        Cost           Latency    Best For
OpenTimestamps (Bitcoin)   ~10 years proven    Free           ~2 hours   Most use cases
Ethereum                   ~8 years proven     $1-50/tx       ~15 sec    Smart contract integration
Public notary services     Legal guarantees    Varies         Minutes    Regulatory preference
Multiple chains            Defense in depth    Sum of above   Varies     Maximum assurance

OpenTimestamps Integration

import subprocess
import tempfile
from pathlib import Path


class OpenTimestampsAnchor:
    """Anchor Merkle roots to Bitcoin via OpenTimestamps."""

    def __init__(self, calendar_urls: Optional[list[str]] = None):
        self.calendars = calendar_urls or [
            "https://alice.btc.calendar.opentimestamps.org",
            "https://bob.btc.calendar.opentimestamps.org",
            "https://finney.calendar.opentimestamps.org"
        ]

    def create_timestamp(self, merkle_root: bytes) -> bytes:
        """Create OpenTimestamps proof for a Merkle root."""
        with tempfile.NamedTemporaryFile(delete=False, suffix='.bin') as f:
            f.write(merkle_root)
            input_path = f.name

        output_path = input_path + '.ots'

        try:
            # Create timestamp using ots CLI
            subprocess.run(
                ['ots', 'stamp', input_path],
                check=True,
                capture_output=True
            )

            with open(output_path, 'rb') as f:
                return f.read()
        finally:
            Path(input_path).unlink(missing_ok=True)
            Path(output_path).unlink(missing_ok=True)

    def verify_timestamp(self, merkle_root: bytes, proof: bytes) -> dict:
        """Verify an OpenTimestamps proof."""
        with tempfile.NamedTemporaryFile(delete=False, suffix='.bin') as f:
            f.write(merkle_root)
            input_path = f.name

        proof_path = input_path + '.ots'

        try:
            with open(proof_path, 'wb') as f:
                f.write(proof)

            result = subprocess.run(
                ['ots', 'verify', proof_path],
                capture_output=True,
                text=True
            )

            return {
                "verified": result.returncode == 0,
                "output": result.stdout,
                "error": result.stderr if result.returncode != 0 else None
            }
        finally:
            Path(input_path).unlink(missing_ok=True)
            Path(proof_path).unlink(missing_ok=True)


# Complete anchoring workflow
class AnchoredAuditSystem:
    def __init__(self):
        self.audit_chain = AuditChainWithMerkle()
        self.anchor = OpenTimestampsAnchor()
        self.anchor_records: list[dict] = []

    def perform_daily_anchor(self) -> dict:
        """Anchor current state to Bitcoin."""
        merkle_root = self.audit_chain.merkle_tree.compute_root()
        if not merkle_root:
            return {"error": "No events to anchor"}

        # Create timestamp
        proof = self.anchor.create_timestamp(merkle_root)

        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "merkle_root": merkle_root.hex(),
            "event_count": len(self.audit_chain.chain_logger.events),
            "proof": proof.hex()
        }

        self.anchor_records.append(record)
        return record
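
Usage is straightforward, assuming the ots command-line client (pip install opentimestamps-client) is on the PATH. Note the Bitcoin attestation only becomes verifiable after a calendar commits it, typically a couple of hours later:

system = AnchoredAuditSystem()
system.audit_chain.log_event("SIG", "trace-1", {"symbol": "EURUSD"})

record = system.perform_daily_anchor()
print(record["merkle_root"], record["event_count"])

# Hours later, once the calendar has committed to Bitcoin:
result = system.anchor.verify_timestamp(
    bytes.fromhex(record["merkle_root"]),
    bytes.fromhex(record["proof"])
)
print(result["verified"])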

The Sidecar Integration Pattern

For production systems, audit logging must not impact trading performance. The sidecar pattern runs audit infrastructure alongside your trading system without modifying it.

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                     Production Trading System                        │
│                                                                      │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   Signal     │───▶│    Order     │───▶│  Execution   │          │
│  │  Generator   │    │   Manager    │    │   Engine     │          │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘          │
│         │                   │                   │                   │
│         │ Drop-copy         │ Drop-copy         │ Drop-copy         │
│         ▼                   ▼                   ▼                   │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    Event Message Queue                        │  │
│  │              (Redis / Kafka / ZeroMQ / File)                  │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                │                                    │
└────────────────────────────────┼────────────────────────────────────┘
                                 │
                     ┌───────────┴───────────┐
                     │     VCP Sidecar       │
                     │  ┌─────────────────┐  │
                     │  │ Event Consumer  │  │
                     │  │ Hash Chain Bld  │  │
                     │  │ Merkle Tree Bld │  │
                     │  │ Signature Gen   │  │
                     │  │ Anchor Manager  │  │
                     │  └─────────────────┘  │
                     └───────────────────────┘
                                 │
                     ┌───────────┴───────────┐
                     │    VCP Event Store    │
                     │  (vcp_events.jsonl)   │
                     └───────────────────────┘

Key Benefits

  1. Zero latency impact: Trading path never touches audit logic
  2. Failure isolation: Sidecar crash doesn't affect trading
  3. Independent scaling: Audit processing scales separately
  4. Retrofittable: Works with existing systems via drop-copy

Python Sidecar Implementation

import asyncio
import aiofiles
import json
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SidecarConfig:
    input_queue: str  # Redis key, Kafka topic, or file path
    output_file: str
    anchor_interval_hours: int = 24
    batch_size: int = 100


class VCPSidecar:
    def __init__(self, config: SidecarConfig):
        self.config = config
        self.audit_chain = AuditChainWithMerkle()
        self.anchor = OpenTimestampsAnchor()
        self.running = False

    async def process_event(self, raw_event: dict) -> None:
        """Process a single event from the trading system."""
        # Map trading system event to VCP event type
        event_type = self._map_event_type(raw_event)
        trace_id = raw_event.get('order_id') or raw_event.get('trace_id')

        # Log to audit chain
        vcp_event = self.audit_chain.log_event(
            event_type=event_type,
            trace_id=trace_id,
            payload=raw_event
        )

        # Persist to JSONL file
        async with aiofiles.open(self.config.output_file, 'a') as f:
            await f.write(json.dumps({
                **vcp_event.__dict__,
                "hash": vcp_event.compute_hash()
            }) + '\n')

    def _map_event_type(self, raw_event: dict) -> str:
        """Map trading system event types to VCP codes."""
        type_map = {
            'signal': 'SIG',
            'order_new': 'ORD',
            'order_ack': 'ACK',
            'fill': 'EXE',
            'partial_fill': 'PRT',
            'reject': 'REJ',
            'cancel': 'CXL',
            'modify': 'MOD'
        }
        return type_map.get(raw_event.get('type', ''), 'UNK')

    async def run_anchor_scheduler(self) -> None:
        """Schedule periodic anchoring."""
        while self.running:
            await asyncio.sleep(self.config.anchor_interval_hours * 3600)

            merkle_root = self.audit_chain.merkle_tree.compute_root()
            if merkle_root:
                proof = self.anchor.create_timestamp(merkle_root)

                # Store anchor record
                anchor_record = {
                    "anchored_at": datetime.now(timezone.utc).isoformat(),
                    "merkle_root": merkle_root.hex(),
                    "event_count": len(self.audit_chain.chain_logger.events),
                    "ots_proof": proof.hex()
                }

                async with aiofiles.open(
                    self.config.output_file.replace('.jsonl', '_anchors.json'),
                    'a'
                ) as f:
                    await f.write(json.dumps(anchor_record) + '\n')

    async def start(self, event_source: Callable) -> None:
        """Start the sidecar with provided event source."""
        self.running = True

        # Start anchor scheduler
        anchor_task = asyncio.create_task(self.run_anchor_scheduler())

        try:
            async for raw_event in event_source():
                await self.process_event(raw_event)
        finally:
            self.running = False
            anchor_task.cancel()


# Example: File-based event source (for MT5/cTrader integration)
async def file_event_source(watch_path: str):
    """Watch a file for new events (simple polling implementation)."""
    last_position = 0

    while True:
        try:
            async with aiofiles.open(watch_path, 'r') as f:
                await f.seek(last_position)
                content = await f.read()
                last_position = await f.tell()

                for line in content.strip().split('\n'):
                    if line:
                        yield json.loads(line)
        except FileNotFoundError:
            pass

        await asyncio.sleep(0.1)  # 100ms polling interval


# Usage
async def main():
    config = SidecarConfig(
        input_queue="/tmp/trading_events.jsonl",
        output_file="/var/log/vcp/audit_chain.jsonl",
        anchor_interval_hours=24
    )

    sidecar = VCPSidecar(config)
    await sidecar.start(
        lambda: file_event_source(config.input_queue)
    )


if __name__ == "__main__":
    asyncio.run(main())
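
To exercise the sidecar end to end, anything that appends JSON lines to the watched file acts as a producer. A hypothetical drop-copy writer:

import json

event = {
    "type": "signal",
    "order_id": "ORD-123456",
    "symbol": "EURUSD",
    "direction": "BUY",
    "price": "1.08523",    # strings for financial values, as above
    "quantity": "100000"
}

with open("/tmp/trading_events.jsonl", "a") as f:
    f.write(json.dumps(event) + "\n")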

MQL5 Integration Example

For MetaTrader 5 Expert Advisors, the sidecar pattern uses file-based communication:

//+------------------------------------------------------------------+
//| VCP Event Logger for MT5                                          |
//+------------------------------------------------------------------+
#property copyright "Your Company"
#property strict

string VCP_SIGNAL_FILE = "signals.json";
string VCP_RESPONSE_FILE = "response.txt";

//+------------------------------------------------------------------+
//| Structure for VCP events                                          |
//+------------------------------------------------------------------+
struct VCPSignalEvent {
    string symbol;
    string direction;
    double price;
    double quantity;
    string model_version;
    datetime timestamp;
};

//+------------------------------------------------------------------+
//| Write signal event to file for sidecar pickup                     |
//+------------------------------------------------------------------+
bool LogSignalEvent(VCPSignalEvent &event) {
    // Open read/write and seek to the end: FILE_WRITE alone truncates the
    // file, which would break the sidecar's tail-from-last-position reader
    int handle = FileOpen(VCP_SIGNAL_FILE, FILE_READ|FILE_WRITE|FILE_TXT|FILE_ANSI|FILE_SHARE_READ);

    if(handle == INVALID_HANDLE) {
        Print("Error opening VCP signal file: ", GetLastError());
        return false;
    }
    FileSeek(handle, 0, SEEK_END);

    // Format as JSON
    string json = StringFormat(
        "{\"type\":\"signal\",\"symbol\":\"%s\",\"direction\":\"%s\","
        "\"price\":\"%s\",\"quantity\":\"%s\",\"model_version\":\"%s\","
        "\"timestamp\":\"%s\"}",
        event.symbol,
        event.direction,
        DoubleToString(event.price, 5),  // String for precision
        DoubleToString(event.quantity, 2),
        event.model_version,
        TimeToString(event.timestamp, TIME_DATE|TIME_SECONDS)
    );

    FileWriteString(handle, json + "\n");
    FileClose(handle);

    return true;
}

//+------------------------------------------------------------------+
//| Log order event                                                   |
//+------------------------------------------------------------------+
bool LogOrderEvent(ulong ticket, string action) {
    // Append rather than truncate (see LogSignalEvent)
    int handle = FileOpen(VCP_SIGNAL_FILE, FILE_READ|FILE_WRITE|FILE_TXT|FILE_ANSI|FILE_SHARE_READ);

    if(handle == INVALID_HANDLE) return false;
    FileSeek(handle, 0, SEEK_END);

    if(PositionSelectByTicket(ticket)) {
        string json = StringFormat(
            "{\"type\":\"%s\",\"order_id\":\"%d\",\"symbol\":\"%s\","
            "\"price\":\"%s\",\"volume\":\"%s\",\"timestamp\":\"%s\"}",
            action,
            ticket,
            PositionGetString(POSITION_SYMBOL),
            DoubleToString(PositionGetDouble(POSITION_PRICE_OPEN), 5),
            DoubleToString(PositionGetDouble(POSITION_VOLUME), 2),
            TimeToString(TimeCurrent(), TIME_DATE|TIME_SECONDS)
        );

        FileWriteString(handle, json + "\n");
    }

    FileClose(handle);
    return true;
}

//+------------------------------------------------------------------+
//| Example: Trading with VCP logging                                 |
//+------------------------------------------------------------------+
void OnTick() {
    // Your trading logic generates a signal
    bool signal_condition = false;  // replace with your actual signal logic
    if(signal_condition) {
        VCPSignalEvent signal;
        signal.symbol = Symbol();
        signal.direction = "BUY";
        signal.price = SymbolInfoDouble(Symbol(), SYMBOL_ASK);
        signal.quantity = 1.0;
        signal.model_version = "v1.0.0";
        signal.timestamp = TimeCurrent();

        // Log signal for VCP sidecar
        LogSignalEvent(signal);

        // Execute trade
        MqlTradeRequest request = {};
        MqlTradeResult result = {};

        request.action = TRADE_ACTION_DEAL;
        request.symbol = Symbol();
        request.volume = signal.quantity;
        request.type = ORDER_TYPE_BUY;
        request.price = signal.price;

        if(OrderSend(request, result)) {
            // Log order execution
            LogOrderEvent(result.order, "order_new");
        }
    }
}

Performance Considerations

Benchmark: Hash Chain Operations

Operation                  Time (μs)   Notes
JSON canonicalization      ~5-10       Depends on payload size
SHA-256 hash               ~0.5-1      Per 1KB of data
Ed25519 signature          ~50-100     CPU implementation
Ed25519 signature (HSM)    ~5-20       Hardware accelerated
Merkle proof generation    ~1-5        Log₂(n) operations
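
These figures vary with hardware and payload shape, so measure on your own stack. A minimal timeit harness for the first two rows:

import hashlib
import json
import timeit

payload = {"symbol": "EURUSD", "price": "1.08523", "quantity": "100000"}

def canonicalize():
    return json.dumps(payload, sort_keys=True, separators=(',', ':'))

def canonicalize_and_hash():
    return hashlib.sha256(canonicalize().encode('utf-8')).hexdigest()

n = 100_000
for name, fn in [("canonicalize", canonicalize), ("canonicalize+sha256", canonicalize_and_hash)]:
    microseconds = timeit.timeit(fn, number=n) / n * 1e6
    print(f"{name}: {microseconds:.2f} us/op")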

Throughput Targets by Tier

Tier       Target Events/sec   Architecture
Silver     1,000+              Async sidecar, batch writes
Gold       10,000+             Dedicated process, memory-mapped files
Platinum   100,000+            FPGA acceleration, kernel bypass

Optimization Strategies

import time

import aiofiles

# 1. Batch writes instead of per-event I/O
class BatchingWriter:
    def __init__(self, path: str, batch_size: int = 100):
        self.path = path
        self.batch_size = batch_size
        self.buffer: list[str] = []

    async def write(self, line: str):
        self.buffer.append(line)
        if len(self.buffer) >= self.batch_size:
            await self.flush()

    async def flush(self):
        if self.buffer:
            async with aiofiles.open(self.path, 'a') as f:
                await f.write('\n'.join(self.buffer) + '\n')
            self.buffer.clear()


# 2. Pre-compute static fields
STATIC_FIELDS = {
    "vcp_version": "1.1",
    "producer_id": "trading-system-01",
    "clock_sync_status": "NTP_SYNCED"
}

def create_event(event_type: str, payload: dict) -> dict:
    return {
        **STATIC_FIELDS,
        "event_type": event_type,
        "timestamp_int": str(time.time_ns()),
        "payload": payload
    }


# 3. Use memory-mapped files for high-throughput
import mmap

class MMapEventStore:
    def __init__(self, path: str, size_mb: int = 100):
        size = size_mb * 1024 * 1024
        try:
            self.fd = open(path, 'r+b')
        except FileNotFoundError:
            self.fd = open(path, 'w+b')   # create the file if it doesn't exist
        if self.fd.seek(0, 2) < size:     # mmap needs the file to span the mapping
            self.fd.truncate(size)
        self.mm = mmap.mmap(self.fd.fileno(), size)
        self.position = 0

    def append(self, data: bytes):
        if self.position + len(data) > len(self.mm):
            raise BufferError("memory-mapped store is full")
        self.mm[self.position:self.position + len(data)] = data
        self.position += len(data)

Testing Your Implementation

Conformance Test Categories

Category            Test Count   What It Validates
Schema Validation   25           JSON structure, field types
UUID v7             10           Format, time-ordering
Timestamp           12           Dual format, precision
Hash Chain          15           Genesis, continuity
Merkle Proof        8            Structure, verification
Integration         15           End-to-end flows

Example Test Suite

import hashlib

import pytest


class TestHashChainConformance:
    def test_genesis_event_has_zero_prev_hash(self):
        logger = HashChainLogger()
        event = logger.log_event("SIG", "trace-1", {"test": True})

        assert event.prev_hash == "0" * 64

    def test_chain_linkage_is_correct(self):
        logger = HashChainLogger()

        e1 = logger.log_event("SIG", "t1", {})
        e2 = logger.log_event("ORD", "t1", {})

        assert e2.prev_hash == e1.compute_hash()

    def test_modification_breaks_chain(self):
        logger = HashChainLogger()

        logger.log_event("SIG", "t1", {})
        logger.log_event("ORD", "t1", {})
        logger.log_event("EXE", "t1", {})

        # Tamper with middle event
        logger.events[1].payload["tampered"] = True

        is_valid, broken_at = logger.verify_chain()

        assert not is_valid
        assert broken_at == 2  # Chain breaks at event after tampered one

    def test_uuid_v7_ordering(self):
        logger = HashChainLogger()

        ids = [logger.generate_uuid_v7() for _ in range(100)]

        # UUID v7's 48-bit timestamp prefix is lexicographically orderable.
        # The simplified generator guarantees no ordering within a single
        # millisecond, so compare only the time component; a production
        # uuid7 library with a monotonic counter can assert full ordering.
        prefixes = [u.replace('-', '')[:12] for u in ids]
        assert prefixes == sorted(prefixes)


class TestMerkleTreeConformance:
    def test_rfc6962_leaf_prefix(self):
        tree = MerkleTree()

        # RFC 6962 requires 0x00 prefix for leaf hashing
        data = b"test"
        leaf_hash = tree._hash_leaf(data)
        expected = hashlib.sha256(b'\x00' + data).digest()

        assert leaf_hash == expected

    def test_inclusion_proof_verifies(self):
        tree = MerkleTree()

        events = [f"event-{i}".encode() for i in range(8)]
        for e in events:
            tree.add_leaf(e)

        root = tree.compute_root()
        proof = tree.get_proof(3)

        assert MerkleTree.verify_proof(events[3], proof, root)

    def test_tampered_leaf_fails_verification(self):
        tree = MerkleTree()

        events = [f"event-{i}".encode() for i in range(8)]
        for e in events:
            tree.add_leaf(e)

        root = tree.compute_root()
        proof = tree.get_proof(3)

        # Try to verify with different data
        tampered = b"event-TAMPERED"

        assert not MerkleTree.verify_proof(tampered, proof, root)

Conclusion: The Implementation Checklist

Before deploying to production, verify:

Hash Chain Layer:

  • [ ] Genesis event uses 64-zero prev_hash
  • [ ] All financial values stored as strings
  • [ ] Dual timestamp format (ISO + nanoseconds)
  • [ ] UUID v7 for event IDs (time-ordered)
  • [ ] RFC 8785 JSON canonicalization for hashing

Merkle Tree Layer:

  • [ ] RFC 6962 compatible (0x00 leaf prefix, 0x01 node prefix)
  • [ ] Inclusion proofs verify correctly
  • [ ] Handles odd numbers of leaves (duplicate last)

External Anchoring:

  • [ ] Regular anchoring schedule (at least daily)
  • [ ] Anchor proofs stored with audit records
  • [ ] Verification procedure documented

Integration:

  • [ ] Sidecar pattern (no trading path impact)
  • [ ] Failure doesn't affect production
  • [ ] Recovery procedure for gaps documented

Testing:

  • [ ] All conformance tests pass
  • [ ] Tampering detection verified
  • [ ] Performance benchmarks meet requirements
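
Beyond the unit tests, it helps to script an offline checker that replays the sidecar's JSONL output and validates both each stored hash and the linkage, so auditors can re-run it independently. A minimal sketch, assuming the output format shown earlier (each line holds an event's fields plus its own hash):

import hashlib
import json

def verify_jsonl_chain(path: str) -> tuple[bool, int]:
    """Return (valid, line_number_of_first_break); 0 means no break."""
    expected_prev = "0" * 64
    with open(path) as f:
        for line_no, line in enumerate(f, start=1):
            record = json.loads(line)
            stored_hash = record.pop("hash")
            if record["prev_hash"] != expected_prev:
                return False, line_no
            canonical = json.dumps(record, sort_keys=True, separators=(',', ':'))
            if hashlib.sha256(canonical.encode('utf-8')).hexdigest() != stored_hash:
                return False, line_no
            expected_prev = stored_hash
    return True, 0

valid, broken_at = verify_jsonl_chain("/var/log/vcp/audit_chain.jsonl")
print(f"Chain valid: {valid}")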

The complete reference implementation is available at github.com/veritaschain. The VCP v1.1 specification provides additional details on event type codes, regulatory mappings, and certification requirements.

Questions or issues? Open a GitHub issue or reach out at technical@veritaschain.org.


Tags: #cryptography #audittrail #python #typescript #blockchain #fintech #security #compliance #euaiact #mifidii


This article is part of the VeritasChain Protocol documentation series. VCP is an open standard for tamper-evident audit trails, developed by the VeritasChain Standards Organization (VSO) and published under Apache 2.0 / CC BY 4.0 licenses.
