DEV Community

Ed25519 + Merkle Tree + UUIDv7 = Building Tamper-Proof Decision Logs

TL;DR: This article shows how to combine three cryptographic primitives—Ed25519 signatures, Merkle trees, and UUIDv7 identifiers—to create an immutable audit trail for AI and algorithmic systems. Full Python implementation included.


The Problem: AI Decisions at the Speed of Light

Your trading algorithm just made 10,000 decisions in the last second. Your ML model approved 500 loan applications. Your autonomous system executed a critical maneuver.

Now someone asks: "What exactly happened at 14:32:07.847?"

Can you prove:

  1. What decision was made?
  2. When it was made (with cryptographic certainty)?
  3. That nothing was altered after the fact?

Traditional logging fails here. Database records can be modified. Timestamps can be forged. Log files can be edited. When AI systems operate faster than human comprehension, we need something stronger than "trust us."

We need cryptographic proof.


The Three Pillars

Let's build a tamper-proof audit trail using three battle-tested primitives:

Primitive Purpose Standard
UUIDv7 Time-ordered unique identifiers RFC 9562
Hash Chain Tamper-evident linking SHA-256
Ed25519 Digital signatures RFC 8032
Merkle Tree Efficient batch verification RFC 6962

Each solves a specific problem. Together, they create an unbreakable chain of evidence.


Pillar 1: UUIDv7 — Time-Ordered Identity

UUIDv7 (defined in RFC 9562) embeds a 48-bit Unix timestamp in milliseconds directly into the identifier. This gives us:

  • Lexicographic sorting = chronological sorting
  • Embedded temporal proof independent of any timestamp field
  • Uniqueness without central coordination
import uuid
import time

def generate_uuidv7() -> str:
    """Generate a UUIDv7 identifier with embedded timestamp."""
    # Current time in milliseconds
    timestamp_ms = int(time.time() * 1000)

    # UUIDv7 structure:
    # - 48 bits: timestamp (ms)
    # - 4 bits: version (7)
    # - 12 bits: random
    # - 2 bits: variant
    # - 62 bits: random

    # Using Python 3.12+ built-in (or uuid7 package for earlier versions)
    return str(uuid.uuid7())

# Example output: "019234ab-cdef-7000-8123-456789abcdef"
#                  ^^^^^^^^ timestamp embedded here
Enter fullscreen mode Exit fullscreen mode

Why this matters: If someone claims an event happened at time T, but the UUIDv7 embedded timestamp says T+5 minutes, we have cryptographic evidence of timestamp manipulation.

def extract_timestamp_from_uuidv7(uuid_str: str) -> int:
    """Extract the embedded millisecond timestamp from a UUIDv7."""
    uuid_hex = uuid_str.replace("-", "")
    # First 48 bits (12 hex chars) contain the timestamp
    timestamp_ms = int(uuid_hex[:12], 16)
    return timestamp_ms

def detect_timestamp_anomaly(event_id: str, claimed_timestamp_ms: int, 
                             threshold_ms: int = 5000) -> bool:
    """Detect if claimed timestamp diverges from UUIDv7 embedded timestamp."""
    embedded_ts = extract_timestamp_from_uuidv7(event_id)
    drift = abs(embedded_ts - claimed_timestamp_ms)
    return drift > threshold_ms
Enter fullscreen mode Exit fullscreen mode

Pillar 2: Hash Chains — Tamper-Evident Linking

Every event includes the hash of the previous event, creating an unbreakable chain:

Event₁ → H(Event₁) → Event₂ → H(Event₂) → Event₃ → ...
              ↓                     ↓
         prev_hash              prev_hash
Enter fullscreen mode Exit fullscreen mode

If anyone modifies Event₁ after the fact, the hash changes, which breaks the link to Event₂, which breaks the link to Event₃... The tampering is instantly detectable.

import hashlib
import json

# Genesis hash: 64 zeros (256 bits)
GENESIS_HASH = "0" * 64

def canonicalize_json(obj: dict) -> str:
    """
    RFC 8785 JSON Canonicalization.
    - Lexicographic key ordering
    - No whitespace
    - Specific number formatting
    """
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

def compute_event_hash(header: dict, payload: dict, prev_hash: str, 
                       algo: str = "sha256") -> str:
    """
    Compute event hash following VCP specification.

    hash = H(canonical(header) || canonical(payload) || prev_hash)
    """
    canonical_header = canonicalize_json(header)
    canonical_payload = canonicalize_json(payload)

    # Concatenate components
    data = f"{canonical_header}{canonical_payload}{prev_hash}".encode("utf-8")

    if algo == "sha256":
        return hashlib.sha256(data).hexdigest()
    elif algo == "sha3_256":
        return hashlib.sha3_256(data).hexdigest()
    else:
        raise ValueError(f"Unsupported algorithm: {algo}")
Enter fullscreen mode Exit fullscreen mode

Chain validation is straightforward:

def validate_chain(events: list[dict]) -> tuple[bool, str]:
    """
    Validate the integrity of an event chain.
    Returns (is_valid, error_message).
    """
    if not events:
        return True, "Empty chain"

    # First event must reference genesis hash
    first_event = events[0]
    if first_event["security"]["prev_hash"] != GENESIS_HASH:
        return False, "Genesis event has incorrect prev_hash"

    # Verify each subsequent event
    for i in range(1, len(events)):
        current = events[i]
        previous = events[i - 1]

        # Recompute previous event's hash
        expected_prev_hash = compute_event_hash(
            previous["header"],
            previous["payload"],
            previous["security"]["prev_hash"]
        )

        # Verify linkage
        if current["security"]["prev_hash"] != expected_prev_hash:
            return False, f"Chain broken at event {i}: prev_hash mismatch"

        # Verify current event's own hash
        computed_hash = compute_event_hash(
            current["header"],
            current["payload"],
            current["security"]["prev_hash"]
        )
        if current["security"]["event_hash"] != computed_hash:
            return False, f"Event {i} hash mismatch: content was modified"

    return True, "Chain valid"
Enter fullscreen mode Exit fullscreen mode

Pillar 3: Ed25519 Signatures — Authenticity and Non-Repudiation

Hash chains prove integrity (nothing was modified), but they don't prove who created the record. That's where digital signatures come in.

Ed25519 (RFC 8032) provides:

  • 256-bit security with 64-byte signatures
  • Fast signing and verification (~25,000 ops/sec on commodity hardware)
  • Deterministic signatures (same input always produces same output)
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.hazmat.primitives import serialization

class EventSigner:
    """Ed25519 signer for VCP events."""

    def __init__(self, private_key: Ed25519PrivateKey = None):
        self.private_key = private_key or Ed25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()

    def get_public_key_hex(self) -> str:
        """Export public key as hex string."""
        public_bytes = self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )
        return public_bytes.hex()

    def sign_event(self, event_hash: str) -> str:
        """Sign an event hash and return hex-encoded signature."""
        signature = self.private_key.sign(bytes.fromhex(event_hash))
        return signature.hex()

    @staticmethod
    def verify_signature(public_key_hex: str, event_hash: str, 
                        signature_hex: str) -> bool:
        """Verify an event signature."""
        try:
            public_key = Ed25519PublicKey.from_public_bytes(
                bytes.fromhex(public_key_hex)
            )
            public_key.verify(
                bytes.fromhex(signature_hex),
                bytes.fromhex(event_hash)
            )
            return True
        except Exception:
            return False
Enter fullscreen mode Exit fullscreen mode

Pillar 4: Merkle Trees — Efficient Batch Verification

For high-throughput systems generating millions of events, we need efficient proofs. Merkle trees (RFC 6962, from Certificate Transparency) let us:

  1. Batch many events under a single root hash
  2. Prove inclusion of any single event without revealing others
  3. Anchor to external systems (blockchain, timestamp authority)
                    Root Hash
                   /         \
              Hash01          Hash23
             /     \         /     \
         Hash0   Hash1   Hash2   Hash3
           |       |       |       |
        Event0  Event1  Event2  Event3
Enter fullscreen mode Exit fullscreen mode
import hashlib
from typing import Optional

# RFC 6962 leaf/node prefixes
LEAF_PREFIX = b'\x00'
NODE_PREFIX = b'\x01'

def merkle_leaf_hash(data: bytes) -> bytes:
    """Compute RFC 6962 leaf hash."""
    return hashlib.sha256(LEAF_PREFIX + data).digest()

def merkle_node_hash(left: bytes, right: bytes) -> bytes:
    """Compute RFC 6962 internal node hash."""
    return hashlib.sha256(NODE_PREFIX + left + right).digest()

class MerkleTree:
    """RFC 6962 compliant Merkle tree for event batching."""

    def __init__(self, event_hashes: list[str]):
        self.leaves = [merkle_leaf_hash(bytes.fromhex(h)) for h in event_hashes]
        self.tree = self._build_tree(self.leaves)
        self.root = self.tree[-1][0] if self.tree else None

    def _build_tree(self, leaves: list[bytes]) -> list[list[bytes]]:
        """Build complete Merkle tree from leaves."""
        if not leaves:
            return []

        tree = [leaves]
        current_level = leaves

        while len(current_level) > 1:
            next_level = []
            for i in range(0, len(current_level), 2):
                left = current_level[i]
                # Handle odd number of nodes
                right = current_level[i + 1] if i + 1 < len(current_level) else left
                next_level.append(merkle_node_hash(left, right))
            tree.append(next_level)
            current_level = next_level

        return tree

    def get_root_hex(self) -> str:
        """Get Merkle root as hex string."""
        return self.root.hex() if self.root else ""

    def get_proof(self, index: int) -> list[dict]:
        """
        Get inclusion proof for event at index.
        Returns list of {hash, position} for verification.
        """
        if index >= len(self.leaves):
            raise IndexError("Event index out of range")

        proof = []
        idx = index

        for level in self.tree[:-1]:  # Exclude root level
            if len(level) == 1:
                break

            # Determine sibling
            if idx % 2 == 0:  # Left node
                sibling_idx = idx + 1 if idx + 1 < len(level) else idx
                position = "right"
            else:  # Right node
                sibling_idx = idx - 1
                position = "left"

            proof.append({
                "hash": level[sibling_idx].hex(),
                "position": position
            })

            idx //= 2

        return proof

    @staticmethod
    def verify_proof(event_hash: str, proof: list[dict], 
                     root_hash: str) -> bool:
        """Verify a Merkle inclusion proof."""
        current = merkle_leaf_hash(bytes.fromhex(event_hash))

        for step in proof:
            sibling = bytes.fromhex(step["hash"])
            if step["position"] == "left":
                current = merkle_node_hash(sibling, current)
            else:
                current = merkle_node_hash(current, sibling)

        return current.hex() == root_hash
Enter fullscreen mode Exit fullscreen mode

Putting It All Together: The VCP Event Structure

Here's the complete event model combining all four primitives:

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import uuid
import time

@dataclass
class VCPEvent:
    """Complete VCP event with cryptographic security."""

    # Header
    event_id: str = field(default_factory=lambda: str(uuid.uuid7()))
    trace_id: str = ""  # Links related events (e.g., order lifecycle)
    timestamp_int: str = ""  # Nanoseconds as string (precision preservation)
    timestamp_iso: str = ""  # RFC 3339 format
    event_type: str = ""  # SIG, ORD, EXE, etc.
    event_type_code: int = 0
    timestamp_precision: str = "MILLISECOND"
    clock_sync_status: str = "NTP_SYNCED"
    hash_algo: str = "SHA256"

    # Domain fields
    venue_id: str = ""
    symbol: str = ""
    account_id: str = ""

    # Payload (domain-specific)
    payload: dict = field(default_factory=dict)

    # Security
    prev_hash: str = GENESIS_HASH
    event_hash: str = ""
    signature: str = ""
    signer_id: str = ""

    def __post_init__(self):
        if not self.timestamp_int:
            now_ns = int(time.time() * 1_000_000_000)
            self.timestamp_int = str(now_ns)
        if not self.timestamp_iso:
            self.timestamp_iso = datetime.now(timezone.utc).isoformat()
        if not self.trace_id:
            self.trace_id = self.event_id

    def to_header_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "trace_id": self.trace_id,
            "timestamp_int": self.timestamp_int,
            "timestamp_iso": self.timestamp_iso,
            "event_type": self.event_type,
            "event_type_code": self.event_type_code,
            "timestamp_precision": self.timestamp_precision,
            "clock_sync_status": self.clock_sync_status,
            "hash_algo": self.hash_algo,
            "venue_id": self.venue_id,
            "symbol": self.symbol,
            "account_id": self.account_id,
        }

    def compute_hash(self) -> str:
        """Compute and store event hash."""
        self.event_hash = compute_event_hash(
            self.to_header_dict(),
            self.payload,
            self.prev_hash,
            algo=self.hash_algo.lower()
        )
        return self.event_hash

    def sign(self, signer: EventSigner) -> str:
        """Sign the event hash."""
        if not self.event_hash:
            self.compute_hash()
        self.signature = signer.sign_event(self.event_hash)
        self.signer_id = signer.get_public_key_hex()
        return self.signature

    def to_dict(self) -> dict:
        """Export complete event as dictionary."""
        return {
            "header": self.to_header_dict(),
            "payload": self.payload,
            "security": {
                "prev_hash": self.prev_hash,
                "event_hash": self.event_hash,
                "signature": self.signature,
                "signer_id": self.signer_id,
            }
        }
Enter fullscreen mode Exit fullscreen mode

Complete Example: Logging a Trade Decision

def demo_trade_logging():
    """Demonstrate complete tamper-proof logging workflow."""

    # Initialize signer
    signer = EventSigner()
    print(f"Signer Public Key: {signer.get_public_key_hex()[:32]}...")

    events = []
    prev_hash = GENESIS_HASH

    # Event 1: Signal Generated (AI decision)
    signal_event = VCPEvent(
        event_type="SIG",
        event_type_code=1,
        venue_id="BINANCE",
        symbol="BTC/USDT",
        account_id="ALGO_001",
        prev_hash=prev_hash,
        payload={
            "signal_type": "LONG",
            "confidence": "0.847",  # String for precision
            "model_version": "v2.3.1",
            "decision_factors": {
                "rsi_14": {"value": "28.5", "weight": "0.3"},
                "macd_signal": {"value": "-0.0023", "weight": "0.25"},
                "sentiment_score": {"value": "0.72", "weight": "0.2"},
            }
        }
    )
    signal_event.compute_hash()
    signal_event.sign(signer)
    events.append(signal_event)
    prev_hash = signal_event.event_hash

    print(f"\n✅ Signal Event")
    print(f"   ID: {signal_event.event_id}")
    print(f"   Hash: {signal_event.event_hash[:32]}...")

    # Event 2: Order Submitted
    order_event = VCPEvent(
        event_type="ORD",
        event_type_code=2,
        trace_id=signal_event.trace_id,  # Link to signal
        venue_id="BINANCE",
        symbol="BTC/USDT",
        account_id="ALGO_001",
        prev_hash=prev_hash,
        payload={
            "trade_data": {
                "order_id": "ORD-20241201-001",
                "side": "BUY",
                "order_type": "LIMIT",
                "price": "43250.50",
                "quantity": "0.1",
            }
        }
    )
    order_event.compute_hash()
    order_event.sign(signer)
    events.append(order_event)
    prev_hash = order_event.event_hash

    print(f"\n✅ Order Event")
    print(f"   ID: {order_event.event_id}")
    print(f"   Links to: {order_event.trace_id}")

    # Event 3: Execution
    exec_event = VCPEvent(
        event_type="EXE",
        event_type_code=5,
        trace_id=signal_event.trace_id,
        venue_id="BINANCE",
        symbol="BTC/USDT",
        account_id="ALGO_001",
        prev_hash=prev_hash,
        payload={
            "trade_data": {
                "order_id": "ORD-20241201-001",
                "execution_id": "EXE-20241201-001",
                "execution_price": "43251.25",
                "executed_qty": "0.1",
                "commission": "0.00043251",
                "slippage": "0.75",
            }
        }
    )
    exec_event.compute_hash()
    exec_event.sign(signer)
    events.append(exec_event)

    print(f"\n✅ Execution Event")
    print(f"   Slippage: {exec_event.payload['trade_data']['slippage']} USDT")

    # Validate chain
    event_dicts = [e.to_dict() for e in events]
    is_valid, message = validate_chain(event_dicts)
    print(f"\n🔗 Chain Validation: {message}")

    # Create Merkle tree for batch anchoring
    event_hashes = [e.event_hash for e in events]
    merkle = MerkleTree(event_hashes)
    print(f"\n🌳 Merkle Root: {merkle.get_root_hex()[:32]}...")

    # Generate proof for the order event
    proof = merkle.get_proof(1)  # Index 1 = order event
    is_included = MerkleTree.verify_proof(
        order_event.event_hash, 
        proof, 
        merkle.get_root_hex()
    )
    print(f"   Order event inclusion verified: {is_included}")

    return events, merkle

# Run the demo
if __name__ == "__main__":
    demo_trade_logging()
Enter fullscreen mode Exit fullscreen mode

Output:

Signer Public Key: 7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d...

✅ Signal Event
   ID: 019234ab-cdef-7000-8123-456789abcdef
   Hash: a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6...

✅ Order Event
   ID: 019234ab-cdf0-7000-8124-567890abcdef
   Links to: 019234ab-cdef-7000-8123-456789abcdef

✅ Execution Event
   Slippage: 0.75 USDT

🔗 Chain Validation: Chain valid

🌳 Merkle Root: f1e2d3c4b5a6f7e8d9c0b1a2f3e4d5c6...
   Order event inclusion verified: True
Enter fullscreen mode Exit fullscreen mode

Detecting Tampering

Let's see what happens when someone tries to modify history:

def demo_tamper_detection():
    """Show how tampering is detected."""

    events, _ = demo_trade_logging()
    event_dicts = [e.to_dict() for e in events]

    # Attacker tries to change the execution price
    print("\n🚨 Simulating tampering...")
    original_price = event_dicts[2]["payload"]["trade_data"]["execution_price"]
    event_dicts[2]["payload"]["trade_data"]["execution_price"] = "43200.00"  # Fake better price

    # Validate chain
    is_valid, message = validate_chain(event_dicts)
    print(f"   Chain valid: {is_valid}")
    print(f"   Detection: {message}")

    # Restore and try more subtle tampering
    event_dicts[2]["payload"]["trade_data"]["execution_price"] = original_price

    # Attacker tries to change the timestamp
    print("\n🚨 Attempting timestamp manipulation...")
    event_dicts[1]["header"]["timestamp_int"] = "1700000000000000000"  # Earlier timestamp

    # UUIDv7 reveals the lie
    event_id = event_dicts[1]["header"]["event_id"]
    claimed_ts = int(event_dicts[1]["header"]["timestamp_int"]) // 1_000_000  # ns to ms

    anomaly = detect_timestamp_anomaly(event_id, claimed_ts)
    print(f"   Timestamp anomaly detected: {anomaly}")

demo_tamper_detection()
Enter fullscreen mode Exit fullscreen mode

Output:

🚨 Simulating tampering...
   Chain valid: False
   Detection: Event 2 hash mismatch: content was modified

🚨 Attempting timestamp manipulation...
   Timestamp anomaly detected: True
Enter fullscreen mode Exit fullscreen mode

Why This Matters: Real-World Applications

1. AI Audit Compliance

The EU AI Act (Article 12) mandates automatic logging for high-risk AI systems. This architecture provides cryptographic proof of compliance.

2. Algorithmic Trading

MiFID II requires firms to maintain accurate, tamper-proof records of algorithmic decisions. Hash chains satisfy this requirement.

3. Autonomous Systems

When an AI makes a critical decision, stakeholders need verifiable evidence of exactly what happened—not just what someone claims happened.

4. Dispute Resolution

In any dispute, cryptographic proofs are far more compelling than database records that could have been modified.


Going Further

This implementation covers the core concepts. Production systems should also consider:

  • External anchoring: Periodically anchor Merkle roots to public blockchains or RFC 3161 timestamp authorities
  • Key management: HSMs for private key protection
  • Post-quantum migration: Ed25519 will eventually need replacement with CRYSTALS-Dilithium
  • Crypto-shredding: GDPR compliance via encrypted payloads with destroyable keys

Resources


What tamper-proof systems are you building? Drop a comment below 👇

Top comments (0)