
61 Privacy Regulators Just Demanded AI 'Safeguard Proof.' Here's How to Build It with Cryptographic Audit Trails.

Yesterday, 61 data protection authorities from around the world published a joint statement demanding that AI companies prove their safeguards actually work. Not claim they work. Prove it.

The statement, coordinated through the Global Privacy Assembly, calls for "robust safeguards," "meaningful transparency," "effective mechanisms for individuals to request removal of harmful content," and "enhanced protections for children." Signatories include Canada's OPC, the EU's EDPB (Chair Anu Talus), Hong Kong's PCPD, Singapore's PDPC, the UK's ICO, and 56 other authorities spanning Albania to Uruguay.

Notably absent: the United States and Japan's PPC.

The statement deliberately avoids prescribing how to prove compliance. No mention of cryptographic audit trails, C2PA, watermarking, or any specific technical standard. It operates at a principles-and-policy level—telling organizations what they must achieve without prescribing how.

This is the gap. And it's the same gap that let the Grok crisis happen.

The Problem: "Trust Us" Doesn't Scale

When 35 U.S. state attorneys general sent their letter about Grok generating 4.4 million images in nine days, at least 41% of them sexualized, the core enforcement problem was this: there was no verifiable way to confirm that safety measures actually worked beyond xAI's own assurances.

The EU was demanding "operational evidence, not screenshots and declarations." The UK ICO launched a formal investigation. The Irish DPC opened proceedings. But every single regulator had to take the platform at its word about what the AI refused to generate.

This is what we call the negative evidence problem:

  • C2PA can prove: "This content was generated by this system at this time."
  • SynthID can prove: "This content has a Google watermark."
  • Nothing currently deployed can prove: "This system refused to generate this content at this time."

The GPA's statement demands "meaningful transparency" about safeguards. Let's build a system that actually delivers it.

Architecture: The Completeness Invariant

The core concept is deceptively simple. Every generation request must produce exactly one cryptographically recorded outcome:

GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR

The critical architectural insight: GEN_ATTEMPT is logged before the safety evaluation runs, creating an unforgeable commitment that a request existed regardless of what follows. If the equation doesn't balance, the audit trail is provably compromised.
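In count form, the invariant reduces to a one-line arithmetic check. A minimal sketch (the counts here are made-up illustrative numbers, not data from any real system):

```python
# Sketch: the Completeness Invariant as an arithmetic check over event counts.
# A balanced ledger means no request vanished without a recorded outcome.
counts = {"GEN_ATTEMPT": 1000, "GEN": 900, "GEN_DENY": 95, "GEN_ERROR": 5}

balanced = counts["GEN_ATTEMPT"] == (
    counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
)
print(balanced)  # True: every attempt has exactly one outcome
```

If the sum ever disagrees with the attempt count, events were either dropped or fabricated; there is no third explanation.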

This addresses five specific threats regulators face:

| Threat | Attack | Mitigation |
| --- | --- | --- |
| Selective Logging | Log only favorable outcomes | Completeness Invariant |
| Log Modification | Alter historical records | Hash chain integrity |
| Backdating | Create records with false timestamps | External anchoring (RFC 3161) |
| Split-View | Show different logs to different parties | Merkle proofs |
| Fabrication | Create false refusal records | Attempt-outcome pairing |

Let's implement this from scratch.

Step 1: Project Setup

# Create project directory
mkdir cap-srp-demo && cd cap-srp-demo

# Install dependencies
pip install "cryptography>=42.0.0"

# That's it. No external services needed for the core.

Step 2: Event Data Model

Every AI interaction becomes a cryptographically signed event. Here's the complete data model:

# models.py
"""CAP-SRP Event Data Model.

Implements the event types defined in the CAP-SRP specification v1.0.
Reference: https://github.com/veritaschain/cap-spec
"""

from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
import uuid


class EventType(str, Enum):
    """All event types in the CAP-SRP lifecycle."""
    GEN_ATTEMPT = "GEN_ATTEMPT"   # Logged BEFORE safety evaluation
    GEN = "GEN"                    # Content was generated
    GEN_DENY = "GEN_DENY"         # Request was refused
    GEN_ERROR = "GEN_ERROR"       # System error during generation


class RiskCategory(str, Enum):
    """Risk categories for GEN_DENY events.

    Aligned with the GPA joint statement's key concern areas.
    """
    NCII_RISK = "NCII_RISK"           # Non-consensual intimate imagery
    CSAM_RISK = "CSAM_RISK"           # Child sexual abuse material
    HATE_CONTENT = "HATE_CONTENT"     # Hate speech / discrimination
    VIOLENCE = "VIOLENCE"             # Graphic violence
    PRIVACY_VIOLATION = "PRIVACY_VIOLATION"  # Real person depiction without consent
    DECEPTION = "DECEPTION"           # Deepfakes / impersonation
    OTHER = "OTHER"


class DenyReason(str, Enum):
    """Why a generation request was denied."""
    POLICY_VIOLATION = "POLICY_VIOLATION"
    CONTENT_FILTER = "CONTENT_FILTER"
    RATE_LIMIT = "RATE_LIMIT"
    USER_RESTRICTION = "USER_RESTRICTION"
    LEGAL_COMPLIANCE = "LEGAL_COMPLIANCE"


import os
import time


def generate_event_id() -> str:
    """Generate a UUIDv7-style event ID.

    UUIDv7 embeds a Unix timestamp in the most significant bits,
    providing natural chronological ordering. uuid.uuid7() only
    exists on Python 3.14+, so we fall back to building one by hand.
    """
    if hasattr(uuid, "uuid7"):
        return str(uuid.uuid7())
    # Fallback: 48-bit millisecond timestamp + version/variant + random bits
    ts_ms = int(time.time() * 1000) & ((1 << 48) - 1)
    value = (ts_ms << 80) | int.from_bytes(os.urandom(10), "big")
    value = (value & ~(0xF << 76)) | (0x7 << 76)   # set version to 7
    value = (value & ~(0x3 << 62)) | (0x2 << 62)   # set RFC 4122 variant
    return str(uuid.UUID(int=value))


def now_iso() -> str:
    """Return current UTC time in ISO 8601 format."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class CAPEvent:
    """Base CAP-SRP event.

    Every event in the audit chain shares these fields.
    The EventHash and Signature are computed after creation.
    """
    EventID: str = field(default_factory=generate_event_id)
    EventType: str = ""
    ChainID: str = ""
    PrevHash: str = ""
    Timestamp: str = field(default_factory=now_iso)

    # These are set during signing
    EventHash: str = ""
    Signature: str = ""

    def to_dict(self) -> dict:
        """Convert to dictionary, excluding empty fields."""
        return {k: v for k, v in asdict(self).items() if v}


@dataclass
class GenAttemptEvent(CAPEvent):
    """Logged BEFORE safety evaluation.

    This is the critical event—it creates an unforgeable commitment
    that a generation request exists. The PromptHash preserves
    privacy while enabling verification.
    """
    EventType: str = "GEN_ATTEMPT"
    PromptHash: str = ""         # SHA-256 of the original prompt
    ModelID: str = ""            # Which model received the request
    SessionID: str = ""          # Anonymized session identifier
    Endpoint: str = ""           # e.g., "/v1/images/generations"
    RequestMetadata: dict = field(default_factory=dict)


@dataclass
class GenDenyEvent(CAPEvent):
    """Logged when a generation request is REFUSED.

    This is what the GPA statement demands proof of:
    verifiable evidence that harmful content was actually blocked.
    """
    EventType: str = "GEN_DENY"
    AttemptID: str = ""          # Links back to GEN_ATTEMPT
    RiskCategory: str = ""       # What kind of harm was detected
    DenyReason: str = ""         # Why it was denied
    PolicyVersion: str = ""      # Which policy version caught it
    ConfidenceScore: float = 0.0 # Safety classifier confidence
    ModelID: str = ""


@dataclass
class GenEvent(CAPEvent):
    """Logged when content IS generated (passed safety checks)."""
    EventType: str = "GEN"
    AttemptID: str = ""
    OutputHash: str = ""         # SHA-256 of generated content
    ModelID: str = ""
    C2PAManifestHash: str = ""   # Link to C2PA content credential


@dataclass
class GenErrorEvent(CAPEvent):
    """Logged when a system error prevents generation."""
    EventType: str = "GEN_ERROR"
    AttemptID: str = ""
    ErrorCode: str = ""
    ErrorMessage: str = ""
    ModelID: str = ""
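To see how these records pair up before any cryptography is involved, here is a dependency-free sketch of an attempt/denial pair, with plain dicts standing in for the dataclasses above and an illustrative prompt:

```python
import hashlib
import uuid
from datetime import datetime, timezone

# A GEN_ATTEMPT stores only the prompt's hash, never the prompt itself
attempt = {
    "EventID": str(uuid.uuid4()),
    "EventType": "GEN_ATTEMPT",
    "PromptHash": "sha256:" + hashlib.sha256(b"example prompt").hexdigest(),
    "Timestamp": datetime.now(timezone.utc).isoformat(),
}

# The outcome links back via AttemptID, forming the verifiable pair
deny = {
    "EventID": str(uuid.uuid4()),
    "EventType": "GEN_DENY",
    "AttemptID": attempt["EventID"],
    "RiskCategory": "PRIVACY_VIOLATION",
}

assert deny["AttemptID"] == attempt["EventID"]
```

The `AttemptID` back-reference is what turns two isolated log lines into a claim a third party can check: "this specific request was received, and this is what happened to it."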

Step 3: Cryptographic Integrity Layer

The hash chain and signature system that makes the audit trail tamper-evident:

# crypto.py
"""Cryptographic integrity layer for CAP-SRP.

Implements:
- SHA-256 hash chains (tamper evidence)
- Ed25519 digital signatures (non-repudiation)
- Merkle tree construction (efficient verification)

All per RFC 8032 (Ed25519) and RFC 8785 (JSON Canonicalization).
"""

import hashlib
import json
import base64
from typing import List, Tuple, Optional
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)


def json_canonicalize(obj: dict) -> str:
    """Canonicalize JSON per RFC 8785 (simplified).

    RFC 8785 defines deterministic serialization so that
    the same logical object always produces the same bytes.
    This is essential—without it, hash verification fails
    if fields are reordered.
    """
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def compute_event_hash(event: dict) -> str:
    """Compute SHA-256 hash of canonicalized event.

    The Signature field is excluded before hashing to avoid
    a circular dependency (hash → sign → include signature → rehash).

    Args:
        event: Event dictionary

    Returns:
        Hash string in format "sha256:{hex_digest}"
    """
    # Remove mutable fields before hashing
    event_copy = {k: v for k, v in event.items() 
                  if k not in ("Signature", "EventHash")}

    canonical = json_canonicalize(event_copy)
    hash_bytes = hashlib.sha256(canonical.encode("utf-8")).digest()

    return f"sha256:{hash_bytes.hex()}"


def compute_prompt_hash(prompt: str) -> str:
    """Hash a prompt for privacy-preserving storage.

    The actual prompt text is NEVER stored. Only the hash.
    This satisfies GDPR requirements while enabling verification:
    a regulator with the original prompt can verify it matches.
    """
    return f"sha256:{hashlib.sha256(prompt.encode('utf-8')).hexdigest()}"


def generate_keypair() -> Tuple[Ed25519PrivateKey, Ed25519PublicKey]:
    """Generate an Ed25519 signing keypair.

    In production (Gold level), the private key would live
    in an HSM (Hardware Security Module). For Bronze/Silver,
    file-based key storage with appropriate permissions.
    """
    private_key = Ed25519PrivateKey.generate()
    public_key = private_key.public_key()
    return private_key, public_key


def sign_event(event: dict, private_key: Ed25519PrivateKey) -> dict:
    """Compute hash and sign an event.

    Flow:
    1. Compute SHA-256 of canonicalized event (excluding Signature)
    2. Store hash in EventHash field
    3. Sign the hash bytes with Ed25519
    4. Store signature in Signature field

    Args:
        event: Event dictionary (EventHash and Signature will be set)
        private_key: Ed25519 signing key

    Returns:
        Event dictionary with EventHash and Signature populated
    """
    # Step 1: Compute hash
    event_hash = compute_event_hash(event)
    event["EventHash"] = event_hash

    # Step 2: Sign the hash bytes
    hash_bytes = bytes.fromhex(event_hash[7:])  # Remove "sha256:" prefix
    signature = private_key.sign(hash_bytes)
    event["Signature"] = f"ed25519:{base64.b64encode(signature).decode()}"

    return event


def verify_signature(event: dict, public_key: Ed25519PublicKey) -> bool:
    """Verify Ed25519 signature on an event.

    Args:
        event: Signed event dictionary
        public_key: Ed25519 verification key

    Returns:
        True if signature is valid
    """
    sig_str = event.get("Signature", "")
    if not sig_str.startswith("ed25519:"):
        return False

    try:
        signature = base64.b64decode(sig_str[8:])
        hash_bytes = bytes.fromhex(event["EventHash"][7:])
        public_key.verify(signature, hash_bytes)
        return True
    except Exception:
        return False


class MerkleTree:
    """Merkle tree for batch verification and inclusion proofs.

    Enables efficient verification of individual events within
    large batches. A regulator can verify a specific refusal
    exists in a batch of millions without processing every event.
    """

    def __init__(self, leaves: List[str]):
        """Build tree from event hashes.

        Args:
            leaves: List of event hash strings (sha256:...)
        """
        self.leaves = [self._to_bytes(h) for h in leaves]
        self.tree = self._build()

    @staticmethod
    def _to_bytes(hash_str: str) -> bytes:
        """Convert 'sha256:hex' to raw bytes."""
        return bytes.fromhex(hash_str[7:]) if hash_str.startswith("sha256:") else bytes.fromhex(hash_str)

    @staticmethod
    def _hash_pair(left: bytes, right: bytes) -> bytes:
        """Hash two nodes together. Always hash in sorted order
        to make the tree order-independent."""
        if left > right:
            left, right = right, left
        return hashlib.sha256(left + right).digest()

    def _build(self) -> List[List[bytes]]:
        """Build the Merkle tree bottom-up."""
        if not self.leaves:
            return [[hashlib.sha256(b"empty").digest()]]

        tree = [self.leaves[:]]
        current = self.leaves[:]

        while len(current) > 1:
            next_level = []
            for i in range(0, len(current), 2):
                if i + 1 < len(current):
                    next_level.append(self._hash_pair(current[i], current[i + 1]))
                else:
                    # Odd node: promote to next level
                    next_level.append(current[i])
            tree.append(next_level)
            current = next_level

        return tree

    @property
    def root(self) -> str:
        """Get the Merkle root hash."""
        return f"sha256:{self.tree[-1][0].hex()}"

    def get_proof(self, index: int) -> List[dict]:
        """Generate inclusion proof for a leaf.

        The proof is a list of sibling hashes that, combined with
        the target leaf, reproduce the root. This lets a verifier
        confirm an event exists in the batch without seeing all events.
        """
        if index >= len(self.leaves):
            raise IndexError(f"Leaf index {index} out of range")

        proof = []
        idx = index

        for level in self.tree[:-1]:
            if idx % 2 == 0:
                sibling_idx = idx + 1
                position = "right"
            else:
                sibling_idx = idx - 1
                position = "left"

            if sibling_idx < len(level):
                proof.append({
                    "hash": f"sha256:{level[sibling_idx].hex()}",
                    "position": position,
                })

            idx //= 2

        return proof

    def verify_proof(self, leaf_hash: str, proof: List[dict]) -> bool:
        """Verify a Merkle inclusion proof against the root."""
        current = self._to_bytes(leaf_hash)

        for step in proof:
            sibling = self._to_bytes(step["hash"])
            if step["position"] == "left":
                current = self._hash_pair(sibling, current)
            else:
                current = self._hash_pair(current, sibling)

        return current == self.tree[-1][0]
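One property worth convincing yourself of: canonicalization makes the hash insensitive to key order. A quick stdlib-only check, mirroring the settings used in `json_canonicalize` above:

```python
import hashlib
import json

def canon(obj: dict) -> str:
    # Same settings as json_canonicalize: sorted keys, no extra whitespace
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

def digest(obj: dict) -> str:
    return hashlib.sha256(canon(obj).encode("utf-8")).hexdigest()

a = {"EventType": "GEN_DENY", "EventID": "evt-1"}
b = {"EventID": "evt-1", "EventType": "GEN_DENY"}  # same fields, reordered

assert digest(a) == digest(b)  # identical hash despite different key order
```

Without this step, an event serialized by two different JSON libraries could produce two different hashes, and verification would fail for entirely innocent reasons.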

Step 4: The Audit Chain

This is where everything comes together—the append-only chain that enforces the Completeness Invariant:

# chain.py
"""CAP-SRP Audit Chain.

The chain is the core of the system: an append-only, hash-linked
sequence of events where every generation attempt must have exactly
one recorded outcome.

Think of it as a flight recorder for AI content decisions.
"""

from datetime import datetime, timezone
from typing import List, Optional, Tuple
from dataclasses import asdict

from models import (
    CAPEvent, GenAttemptEvent, GenDenyEvent, GenEvent, GenErrorEvent,
    EventType, RiskCategory, DenyReason, generate_event_id, now_iso,
)
from crypto import (
    compute_event_hash, compute_prompt_hash, sign_event,
    verify_signature, generate_keypair, MerkleTree,
    Ed25519PrivateKey, Ed25519PublicKey,
)


class CompletenessViolation(Exception):
    """Raised when the Completeness Invariant is broken."""
    pass


class ChainIntegrityError(Exception):
    """Raised when the hash chain is tampered with."""
    pass


class CAPChain:
    """Append-only audit chain with Completeness Invariant enforcement.

    Usage:
        chain = CAPChain("my-platform")

        # Every request follows this pattern:
        attempt_id = chain.log_attempt(prompt, model_id, endpoint)

        # Then exactly ONE of:
        chain.log_deny(attempt_id, risk_category, reason)
        chain.log_generate(attempt_id, output_hash)
        chain.log_error(attempt_id, error_code, error_message)
    """

    def __init__(self, chain_id: str, private_key: Optional[Ed25519PrivateKey] = None):
        """Initialize a new audit chain.

        Args:
            chain_id: Unique identifier for this chain (e.g., platform name)
            private_key: Ed25519 signing key (generated if not provided)
        """
        self.chain_id = chain_id
        self.events: List[dict] = []

        if private_key:
            self.private_key = private_key
            self.public_key = private_key.public_key()
        else:
            self.private_key, self.public_key = generate_keypair()

        # Track attempt→outcome mapping for invariant
        self._pending_attempts: dict = {}  # attempt_id → event
        self._completed_attempts: set = set()

    @property
    def prev_hash(self) -> str:
        """Hash of the last event in the chain (or genesis marker)."""
        if self.events:
            return self.events[-1]["EventHash"]
        return "sha256:" + "0" * 64  # Genesis

    def _append(self, event: CAPEvent) -> dict:
        """Append a signed event to the chain.

        Internal method that handles:
        1. Setting chain linkage (PrevHash)
        2. Computing the event hash
        3. Signing with Ed25519
        4. Appending to the chain
        """
        event.ChainID = self.chain_id
        event.PrevHash = self.prev_hash

        event_dict = event.to_dict()
        signed = sign_event(event_dict, self.private_key)
        self.events.append(signed)

        return signed

    def log_attempt(
        self,
        prompt: str,
        model_id: str,
        endpoint: str = "/v1/images/generations",
        session_id: str = "",
    ) -> str:
        """Log a generation attempt BEFORE safety evaluation.

        This is the most critical operation. By recording the attempt
        before the safety check runs, we create an unforgeable commitment
        that cannot be retroactively erased.

        Args:
            prompt: The user's prompt (will be hashed, never stored raw)
            model_id: Which model received the request
            endpoint: API endpoint (important: /v1/images/edits is high-risk)
            session_id: Anonymized session ID

        Returns:
            The attempt's EventID (needed for outcome logging)
        """
        event = GenAttemptEvent(
            PromptHash=compute_prompt_hash(prompt),
            ModelID=model_id,
            Endpoint=endpoint,
            SessionID=session_id or generate_event_id(),
        )

        signed = self._append(event)
        self._pending_attempts[signed["EventID"]] = signed

        return signed["EventID"]

    def log_deny(
        self,
        attempt_id: str,
        risk_category: RiskCategory,
        deny_reason: DenyReason = DenyReason.CONTENT_FILTER,
        policy_version: str = "1.0.0",
        confidence: float = 0.95,
        model_id: str = "",
    ) -> dict:
        """Log a content refusal.

        This is what the GPA statement demands proof of.
        The event cryptographically links back to the attempt,
        creating verifiable evidence that:
        1. A request was received (GEN_ATTEMPT exists)
        2. It was evaluated against a specific policy version
        3. It was denied for a specific reason
        4. The denial is timestamped and signed

        Args:
            attempt_id: The GEN_ATTEMPT EventID
            risk_category: What kind of harm was detected
            deny_reason: Why it was denied
            policy_version: Which safety policy version
            confidence: Safety classifier confidence score
        """
        if attempt_id not in self._pending_attempts:
            if attempt_id in self._completed_attempts:
                raise CompletenessViolation(
                    f"Attempt {attempt_id} already has an outcome. "
                    "The Completeness Invariant requires exactly ONE outcome per attempt."
                )
            raise CompletenessViolation(
                f"No pending attempt found for {attempt_id}. "
                "You must log GEN_ATTEMPT before logging an outcome."
            )

        event = GenDenyEvent(
            AttemptID=attempt_id,
            RiskCategory=risk_category.value,
            DenyReason=deny_reason.value,
            PolicyVersion=policy_version,
            ConfidenceScore=confidence,
            ModelID=model_id,
        )

        signed = self._append(event)

        # Update tracking
        del self._pending_attempts[attempt_id]
        self._completed_attempts.add(attempt_id)

        return signed

    def log_generate(
        self,
        attempt_id: str,
        output_hash: str,
        model_id: str = "",
        c2pa_manifest_hash: str = "",
    ) -> dict:
        """Log successful content generation."""
        if attempt_id not in self._pending_attempts:
            if attempt_id in self._completed_attempts:
                raise CompletenessViolation(
                    f"Attempt {attempt_id} already has an outcome."
                )
            raise CompletenessViolation(
                f"No pending attempt found for {attempt_id}."
            )

        event = GenEvent(
            AttemptID=attempt_id,
            OutputHash=output_hash,
            ModelID=model_id,
            C2PAManifestHash=c2pa_manifest_hash,
        )

        signed = self._append(event)
        del self._pending_attempts[attempt_id]
        self._completed_attempts.add(attempt_id)

        return signed

    def log_error(
        self,
        attempt_id: str,
        error_code: str,
        error_message: str,
        model_id: str = "",
    ) -> dict:
        """Log a system error during generation."""
        if attempt_id not in self._pending_attempts:
            if attempt_id in self._completed_attempts:
                raise CompletenessViolation(
                    f"Attempt {attempt_id} already has an outcome."
                )
            raise CompletenessViolation(
                f"No pending attempt found for {attempt_id}."
            )

        event = GenErrorEvent(
            AttemptID=attempt_id,
            ErrorCode=error_code,
            ErrorMessage=error_message,
            ModelID=model_id,
        )

        signed = self._append(event)
        del self._pending_attempts[attempt_id]
        self._completed_attempts.add(attempt_id)

        return signed

    def get_pending_attempts(self) -> List[str]:
        """Return attempt IDs that don't yet have outcomes.

        In a healthy system, this should be near-zero or contain
        only very recent attempts still being processed. A large
        pending count is a red flag.
        """
        return list(self._pending_attempts.keys())

    def build_merkle_tree(self) -> MerkleTree:
        """Build a Merkle tree over all events for batch verification."""
        hashes = [e["EventHash"] for e in self.events]
        return MerkleTree(hashes)
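The linkage that `_append` maintains is simple enough to sketch standalone. This toy version (stdlib only, no signatures) shows how each event commits to its predecessor:

```python
import hashlib
import json

def link(events: list) -> list:
    """Toy hash chain: each event's PrevHash is the prior EventHash."""
    prev = "0" * 64  # genesis marker, as in CAPChain.prev_hash
    out = []
    for e in events:
        e = dict(e, PrevHash=prev)
        # Hash is computed before EventHash is attached, as in sign_event
        prev = hashlib.sha256(
            json.dumps(e, sort_keys=True).encode("utf-8")
        ).hexdigest()
        e["EventHash"] = prev
        out.append(e)
    return out

chain = link([{"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_DENY"}])
assert chain[1]["PrevHash"] == chain[0]["EventHash"]
```

Because each `PrevHash` is derived from everything before it, deleting or editing any earlier event changes every subsequent hash, which is exactly what makes selective logging detectable.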

Step 5: The Verification Engine

This is what regulators actually run. An independent verifier that checks the entire chain:

# verifier.py
"""CAP-SRP Verification Engine.

This module is designed to be run by THIRD PARTIES—regulators,
auditors, researchers—who receive an Evidence Pack and need to
independently verify its integrity without trusting the AI provider.

Verification checks:
1. Hash chain integrity (no tampering)
2. Signature validity (non-repudiation)
3. Completeness Invariant (no missing events)
4. Temporal ordering (no backdating within chain)
5. Merkle proof verification (batch consistency)
"""

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple
from collections import Counter

from crypto import compute_event_hash, verify_signature, MerkleTree, Ed25519PublicKey


@dataclass
class VerificationResult:
    """Comprehensive verification report."""
    valid: bool
    chain_integrity: bool = True
    signatures_valid: bool = True
    completeness_valid: bool = True
    temporal_order_valid: bool = True

    total_events: int = 0
    total_attempts: int = 0
    total_gen: int = 0
    total_deny: int = 0
    total_error: int = 0

    unmatched_attempts: Optional[List[str]] = None
    orphan_outcomes: Optional[List[str]] = None
    broken_links: Optional[List[int]] = None
    invalid_signatures: Optional[List[int]] = None
    temporal_violations: Optional[List[int]] = None

    # Statistics for the regulatory report
    refusal_rate: float = 0.0
    refusal_by_category: Optional[dict] = None

    def __post_init__(self):
        self.unmatched_attempts = self.unmatched_attempts or []
        self.orphan_outcomes = self.orphan_outcomes or []
        self.broken_links = self.broken_links or []
        self.invalid_signatures = self.invalid_signatures or []
        self.temporal_violations = self.temporal_violations or []
        self.refusal_by_category = self.refusal_by_category or {}


def verify_chain_integrity(events: List[dict]) -> Tuple[bool, List[int]]:
    """Verify hash chain linkage.

    Each event's PrevHash must match the previous event's EventHash.
    If any link is broken, the chain has been tampered with.
    """
    broken = []

    for i, event in enumerate(events):
        # Verify hash computation
        computed = compute_event_hash(event)
        if event.get("EventHash") != computed:
            broken.append(i)
            continue

        # Verify chain linkage
        if i > 0:
            expected_prev = events[i - 1]["EventHash"]
            if event.get("PrevHash") != expected_prev:
                broken.append(i)

    return len(broken) == 0, broken


def verify_all_signatures(events: List[dict], public_key: Ed25519PublicKey) -> Tuple[bool, List[int]]:
    """Verify Ed25519 signatures on all events."""
    invalid = []

    for i, event in enumerate(events):
        if not verify_signature(event, public_key):
            invalid.append(i)

    return len(invalid) == 0, invalid


def verify_temporal_order(events: List[dict]) -> Tuple[bool, List[int]]:
    """Verify events are in chronological order.

    While external anchoring (RFC 3161) provides absolute time
    guarantees, basic temporal ordering within the chain catches
    obvious backdating within a single chain.
    """
    violations = []

    for i in range(1, len(events)):
        try:
            t_prev = datetime.fromisoformat(events[i - 1]["Timestamp"])
            t_curr = datetime.fromisoformat(events[i]["Timestamp"])
            if t_curr < t_prev:
                violations.append(i)
        except (KeyError, ValueError):
            violations.append(i)

    return len(violations) == 0, violations


def verify_completeness_invariant(events: List[dict]) -> Tuple[bool, List[str], List[str]]:
    """Verify the Completeness Invariant.

    GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR

    Every attempt must have exactly one outcome.
    No outcome should exist without a corresponding attempt.

    Returns:
        (is_valid, unmatched_attempt_ids, orphan_outcome_ids)
    """
    attempts = {}
    outcomes = {}

    for event in events:
        etype = event.get("EventType", "")

        if etype == "GEN_ATTEMPT":
            attempts[event["EventID"]] = event

        elif etype in ("GEN", "GEN_DENY", "GEN_ERROR"):
            attempt_id = event.get("AttemptID", "")

            if attempt_id in outcomes:
                # CRITICAL: Duplicate outcome for same attempt
                # This is a completeness violation
                return False, [], [event["EventID"]]

            outcomes[attempt_id] = event

    # Find unmatched attempts (attempts without outcomes)
    unmatched = [aid for aid in attempts if aid not in outcomes]

    # Find orphan outcomes (outcomes without attempts)
    orphans = [
        outcomes[aid]["EventID"]
        for aid in outcomes
        if aid not in attempts
    ]

    is_valid = len(unmatched) == 0 and len(orphans) == 0
    return is_valid, unmatched, orphans


def compute_refusal_statistics(events: List[dict]) -> dict:
    """Compute refusal statistics for regulatory reporting."""
    deny_events = [e for e in events if e.get("EventType") == "GEN_DENY"]
    attempt_events = [e for e in events if e.get("EventType") == "GEN_ATTEMPT"]

    if not attempt_events:
        return {"refusal_rate": 0.0, "by_category": {}}

    categories = Counter(e.get("RiskCategory", "OTHER") for e in deny_events)

    return {
        "refusal_rate": len(deny_events) / len(attempt_events),
        "by_category": dict(categories),
        "total_denials": len(deny_events),
        "total_attempts": len(attempt_events),
    }


def full_verification(
    events: List[dict],
    public_key: Ed25519PublicKey,
) -> VerificationResult:
    """Run complete verification suite.

    This is the function a regulator would call with an
    Evidence Pack to independently verify an AI provider's
    content moderation claims.
    """
    result = VerificationResult(valid=True, total_events=len(events))

    # 1. Chain integrity
    result.chain_integrity, result.broken_links = verify_chain_integrity(events)

    # 2. Signature verification
    result.signatures_valid, result.invalid_signatures = verify_all_signatures(events, public_key)

    # 3. Temporal ordering
    result.temporal_order_valid, result.temporal_violations = verify_temporal_order(events)

    # 4. Completeness Invariant
    completeness_ok, unmatched, orphans = verify_completeness_invariant(events)
    result.completeness_valid = completeness_ok
    result.unmatched_attempts = unmatched
    result.orphan_outcomes = orphans

    # 5. Event counts
    type_counts = Counter(e.get("EventType") for e in events)
    result.total_attempts = type_counts.get("GEN_ATTEMPT", 0)
    result.total_gen = type_counts.get("GEN", 0)
    result.total_deny = type_counts.get("GEN_DENY", 0)
    result.total_error = type_counts.get("GEN_ERROR", 0)

    # 6. Refusal statistics
    stats = compute_refusal_statistics(events)
    result.refusal_rate = stats["refusal_rate"]
    result.refusal_by_category = stats.get("by_category", {})

    # Overall validity
    result.valid = all([
        result.chain_integrity,
        result.signatures_valid,
        result.completeness_valid,
        result.temporal_order_valid,
    ])

    return result


def print_verification_report(result: VerificationResult, chain_id: str = ""):
    """Print a human-readable verification report.

    Modeled after the CAP-SRP spec's reference output format.
    """
    print("=" * 60)
    print("CAP-SRP Evidence Verification Report")
    print("=" * 60)
    if chain_id:
        print(f"Chain ID:        {chain_id}")
    print(f"Total Events:    {result.total_events}")
    print()

    # Chain integrity
    status = "✓ VALID" if result.chain_integrity else "✗ INVALID"
    print(f"CHAIN INTEGRITY:       {status}")
    if result.broken_links:
        print(f"  Broken links at:     {result.broken_links}")

    # Signatures
    status = "✓ VALID" if result.signatures_valid else "✗ INVALID"
    print(f"SIGNATURES:            {status}")
    if result.invalid_signatures:
        print(f"  Invalid at:          {result.invalid_signatures}")

    # Temporal order
    status = "✓ VALID" if result.temporal_order_valid else "✗ INVALID"
    print(f"TEMPORAL ORDER:        {status}")

    # Completeness Invariant
    status = "✓ VALID" if result.completeness_valid else "✗ INVALID"
    print(f"COMPLETENESS:          {status}")
    print(f"  GEN_ATTEMPT:         {result.total_attempts}")
    print(f"  GEN (generated):     {result.total_gen}")
    print(f"  GEN_DENY (refused):  {result.total_deny}")
    print(f"  GEN_ERROR (errors):  {result.total_error}")

    invariant_sum = result.total_gen + result.total_deny + result.total_error
    eq = "=" if invariant_sum == result.total_attempts else "≠"
    print(f"  {result.total_attempts} {eq} {result.total_gen} + {result.total_deny} + {result.total_error}")

    if result.unmatched_attempts:
        print(f"  ⚠ Unmatched attempts: {len(result.unmatched_attempts)}")
    if result.orphan_outcomes:
        print(f"  ⚠ Orphan outcomes:    {len(result.orphan_outcomes)}")

    # Refusal statistics
    print()
    print(f"REFUSAL RATE:          {result.refusal_rate:.1%}")
    if result.refusal_by_category:
        print("  By category:")
        for cat, count in sorted(result.refusal_by_category.items(), key=lambda x: -x[1]):
            pct = count / result.total_deny * 100 if result.total_deny > 0 else 0
            print(f"    {cat:<25s} {count:>5d} ({pct:.1f}%)")

    # Overall
    print()
    overall = "✓ VALID" if result.valid else "✗ INVALID"
    print(f"OVERALL STATUS:        {overall}")
    print("=" * 60)

Step 6: Putting It All Together—GPA Compliance Demo

Now let's simulate what should have been possible during the Grok crisis:

# demo_gpa_compliance.py
"""
Demonstration: Proving Compliance with the GPA Joint Statement

The GPA's February 23, 2026 joint statement demands:
1. Robust safeguards against non-consensual intimate imagery
2. Meaningful transparency about AI capabilities and safeguards
3. Effective mechanisms for content removal
4. Enhanced protections for children

This demo shows how CAP-SRP provides CRYPTOGRAPHIC EVIDENCE
for demands #1 and #2—proving that safeguards actually work,
not just claiming they do.

Usage:
    python demo_gpa_compliance.py
"""

import json
import hashlib
from datetime import datetime, timezone

from models import RiskCategory, DenyReason
from chain import CAPChain, CompletenessViolation
from verifier import full_verification, print_verification_report
from crypto import MerkleTree


def main():
    print("=" * 60)
    print("CAP-SRP: GPA Joint Statement Compliance Demo")
    print("=" * 60)
    print()
    print("Simulating an AI image generation platform with")
    print("cryptographic proof of content moderation decisions.")
    print()

    # ─── Initialize the audit chain ───────────────────────────
    chain = CAPChain(chain_id="demo-platform-2026")

    print(f"Chain ID:  {chain.chain_id}")
    print(f"Key Type:  Ed25519")
    print()

    # ─── Scenario 1: NCII request → DENIED ────────────────────
    # This is the GPA's primary concern: non-consensual intimate
    # imagery of real people. The Grok crisis showed what happens
    # when these requests are NOT properly blocked.

    print("─" * 60)
    print("Scenario 1: NCII request (GPA Primary Concern)")
    print("─" * 60)

    prompt_1 = "Generate a realistic nude image of [celebrity name]"

    # Step 1: Log the attempt BEFORE safety evaluation
    attempt_1 = chain.log_attempt(
        prompt=prompt_1,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f"  GEN_ATTEMPT logged: {attempt_1[:36]}...")
    print(f"  PromptHash: sha256:{hashlib.sha256(prompt_1.encode()).hexdigest()[:16]}...")

    # Step 2: Safety filter catches it → DENY
    deny_1 = chain.log_deny(
        attempt_id=attempt_1,
        risk_category=RiskCategory.NCII_RISK,
        deny_reason=DenyReason.CONTENT_FILTER,
        policy_version="2.1.0",
        confidence=0.98,
        model_id="imagen-v3",
    )
    print(f"  GEN_DENY logged:    {deny_1['EventID'][:36]}...")
    print(f"  Risk: NCII_RISK | Confidence: 98%")
    print(f"  → Verifiable proof this request was blocked.")
    print()

    # ─── Scenario 2: CSAM attempt → DENIED ─────────────────────
    # The GPA specifically calls for "enhanced safeguards" for children.

    print("─" * 60)
    print("Scenario 2: CSAM-risk request (GPA Child Protection)")
    print("─" * 60)

    prompt_2 = "image of underage person in compromising situation"

    attempt_2 = chain.log_attempt(
        prompt=prompt_2,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f"  GEN_ATTEMPT logged: {attempt_2[:36]}...")

    deny_2 = chain.log_deny(
        attempt_id=attempt_2,
        risk_category=RiskCategory.CSAM_RISK,
        deny_reason=DenyReason.LEGAL_COMPLIANCE,
        policy_version="2.1.0",
        confidence=0.99,
        model_id="imagen-v3",
    )
    print(f"  GEN_DENY logged:    {deny_2['EventID'][:36]}...")
    print(f"  Risk: CSAM_RISK | Confidence: 99%")
    print(f"  → Criminal offence in many jurisdictions per GPA statement.")
    print()

    # ─── Scenario 3: Privacy violation via image editing endpoint ──
    # The /v1/images/edits endpoint is where "digital undressing"
    # attacks commonly occur. This is a critical attack surface.

    print("─" * 60)
    print("Scenario 3: Image editing endpoint (high-risk surface)")
    print("─" * 60)

    prompt_3 = "Remove clothing from this person's photo"

    attempt_3 = chain.log_attempt(
        prompt=prompt_3,
        model_id="imagen-v3",
        endpoint="/v1/images/edits",  # HIGH RISK endpoint
    )
    print(f"  GEN_ATTEMPT logged: {attempt_3[:36]}...")
    print(f"  Endpoint: /v1/images/edits (⚠ high-risk)")

    deny_3 = chain.log_deny(
        attempt_id=attempt_3,
        risk_category=RiskCategory.NCII_RISK,
        deny_reason=DenyReason.CONTENT_FILTER,
        policy_version="2.1.0",
        confidence=0.97,
        model_id="imagen-v3",
    )
    print(f"  GEN_DENY logged:    {deny_3['EventID'][:36]}...")
    print(f"  → Digital undressing attempt blocked and recorded.")
    print()

    # ─── Scenario 4: Legitimate request → GENERATED ────────────
    # Not everything is denied. Safe content is generated and
    # tracked for completeness.

    print("─" * 60)
    print("Scenario 4: Legitimate request (safe content)")
    print("─" * 60)

    prompt_4 = "A beautiful sunset over Mount Fuji with cherry blossoms"

    attempt_4 = chain.log_attempt(
        prompt=prompt_4,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f"  GEN_ATTEMPT logged: {attempt_4[:36]}...")

    gen_4 = chain.log_generate(
        attempt_id=attempt_4,
        output_hash="sha256:" + hashlib.sha256(b"<generated image bytes>").hexdigest(),
        model_id="imagen-v3",
    )
    print(f"  GEN logged:         {gen_4['EventID'][:36]}...")
    print(f"  → Content generated, output hash recorded.")
    print()

    # ─── Scenario 5: System error during generation ────────────

    print("─" * 60)
    print("Scenario 5: System error (GPU timeout)")
    print("─" * 60)

    prompt_5 = "Detailed panoramic cityscape of Tokyo at night"

    attempt_5 = chain.log_attempt(
        prompt=prompt_5,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f"  GEN_ATTEMPT logged: {attempt_5[:36]}...")

    error_5 = chain.log_error(
        attempt_id=attempt_5,
        error_code="GPU_TIMEOUT",
        error_message="Generation timed out after 30s",
        model_id="imagen-v3",
    )
    print(f"  GEN_ERROR logged:   {error_5['EventID'][:36]}...")
    print(f"  → Error recorded. Attempt still accounted for.")
    print()

    # ─── Scenario 6: Demonstrate Completeness Invariant enforcement ──

    print("─" * 60)
    print("Scenario 6: Completeness Invariant enforcement")
    print("─" * 60)

    # Try to log a second outcome for an already-completed attempt
    print("  Attempting to log duplicate outcome...")
    try:
        chain.log_deny(
            attempt_id=attempt_1,  # Already denied above!
            risk_category=RiskCategory.NCII_RISK,
            deny_reason=DenyReason.CONTENT_FILTER,
        )
        print("  ✗ ERROR: Should have raised CompletenessViolation!")
    except CompletenessViolation as e:
        print(f"  ✓ Correctly rejected: {e}")

    print()

    # Try to log outcome for nonexistent attempt
    print("  Attempting to log outcome for fake attempt...")
    try:
        chain.log_deny(
            attempt_id="nonexistent-attempt-id",
            risk_category=RiskCategory.OTHER,
            deny_reason=DenyReason.POLICY_VIOLATION,
        )
        print("  ✗ ERROR: Should have raised CompletenessViolation!")
    except CompletenessViolation as e:
        print(f"  ✓ Correctly rejected: {e}")

    print()

    # ─── Build Merkle tree for batch verification ──────────────

    print("─" * 60)
    print("Merkle Tree Construction")
    print("─" * 60)

    tree = chain.build_merkle_tree()
    print(f"  Leaves:       {len(chain.events)}")
    print(f"  Merkle Root:  {tree.root[:40]}...")

    # Generate and verify inclusion proof for the first denial
    proof = tree.get_proof(1)  # Index of first GEN_DENY
    is_valid = tree.verify_proof(chain.events[1]["EventHash"], proof)
    print(f"  Proof for denial event: {'✓ VALID' if is_valid else '✗ INVALID'}")
    print(f"  Proof size: {len(proof)} nodes (vs {len(chain.events)} total events)")
    print()

    # ─── Run full third-party verification ─────────────────────

    print("─" * 60)
    print("Third-Party Verification (what a regulator would run)")
    print("─" * 60)
    print()

    result = full_verification(chain.events, chain.public_key)
    print_verification_report(result, chain.chain_id)

    # ─── Export sample event for inspection ────────────────────

    print()
    print("─" * 60)
    print("Sample Event (GEN_DENY for NCII)")
    print("─" * 60)
    # Show the first denial event, with long fields truncated for display
    deny_event = chain.events[1]
    display = dict(deny_event)
    display["Signature"] = display["Signature"][:30] + "..."
    display["EventHash"] = display["EventHash"][:30] + "..."
    display["PrevHash"] = display["PrevHash"][:30] + "..."
    print(json.dumps(display, indent=2))

    # ─── What this proves to the GPA ──────────────────────────

    print()
    print("─" * 60)
    print("What This Proves to Regulators")
    print("─" * 60)
    print()
    print("1. ROBUST SAFEGUARDS: 3 harmful requests blocked with")
    print("   cryptographic proof (not just internal logs).")
    print()
    print("2. MEANINGFUL TRANSPARENCY: Every decision is signed,")
    print("   timestamped, and independently verifiable.")
    print()
    print("3. COMPLETENESS: The invariant mathematically proves")
    print("   no requests were silently dropped or retroactively")
    print("   erased from the audit trail.")
    print()
    print("4. NON-REPUDIATION: Ed25519 signatures prevent the")
    print("   platform from denying it made these decisions.")
    print()
    print(f"Pending attempts (should be 0): {len(chain.get_pending_attempts())}")
    print()


if __name__ == "__main__":
    main()

Running the Demo

python demo_gpa_compliance.py

Expected output:

============================================================
CAP-SRP Evidence Verification Report
============================================================
Chain ID:        demo-platform-2026
Total Events:    10

CHAIN INTEGRITY:       ✓ VALID
SIGNATURES:            ✓ VALID
TEMPORAL ORDER:        ✓ VALID
COMPLETENESS:          ✓ VALID
  GEN_ATTEMPT:         5
  GEN (generated):     1
  GEN_DENY (refused):  3
  GEN_ERROR (errors):  1
  5 = 1 + 3 + 1

REFUSAL RATE:          60.0%
  By category:
    NCII_RISK                     2 (66.7%)
    CSAM_RISK                     1 (33.3%)

OVERALL STATUS:        ✓ VALID
============================================================
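Why does tampering always surface in a report like this? A minimal, self-contained sketch of the hash-chain property, using only hashlib and illustrative field names rather than the actual CAP-SRP modules:

```python
import hashlib
import json

def event_hash(event: dict) -> str:
    # Hash the canonical JSON form, excluding the hash field itself
    body = {k: v for k, v in event.items() if k != "EventHash"}
    return "sha256:" + hashlib.sha256(
        json.dumps(body, sort_keys=True).encode()
    ).hexdigest()

# Build a tiny three-event chain
events, prev = [], "sha256:" + "0" * 64
for etype in ["GEN_ATTEMPT", "GEN_DENY", "GEN_ATTEMPT"]:
    e = {"EventType": etype, "PrevHash": prev}
    e["EventHash"] = event_hash(e)
    events.append(e)
    prev = e["EventHash"]

def broken_links(events: list) -> list:
    # A link is broken if an event's PrevHash no longer matches
    # the recomputed hash of its predecessor
    return [i for i in range(1, len(events))
            if events[i]["PrevHash"] != event_hash(events[i - 1])]

print(broken_links(events))  # -> []  (chain intact)

# Retroactively alter the denial, e.g. to hide a refusal decision
events[1]["EventType"] = "GEN"
print(broken_links(events))  # -> [2]  (tampering detected)
```

Because each event's PrevHash commits to the full content of its predecessor, rewriting any past event invalidates every later link, and the signatures prevent silently recomputing the chain.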

The Grok Counterfactual

What could regulators have verified if Grok had deployed this system?

# grok_counterfactual.py
"""
Grok Crisis Counterfactual Analysis

Timeline:
  Dec 25, 2025: Grok deployed with image generation
  Dec 26-Jan 5: 4.4M images generated, 41%+ sexualized
  Jan 6, 2026: xAI claims "fix deployed"
  Jan 10+: Regulators demand evidence

Without CAP-SRP: "Trust us, we fixed it."
With CAP-SRP: Cryptographic proof at every step.
"""

from datetime import datetime, timezone, timedelta
from collections import defaultdict

from models import RiskCategory, DenyReason
from chain import CAPChain
from verifier import full_verification, print_verification_report


def simulate_day(chain: CAPChain, date: datetime, 
                 total_requests: int, ncii_deny_rate: float):
    """Simulate one day of platform operations.

    Args:
        chain: The audit chain
        date: Simulation date
        total_requests: Total generation requests
        ncii_deny_rate: Fraction (0.0 to 1.0) of NCII-flagged requests that are denied
    """
    import random
    random.seed(int(date.timestamp()))  # Reproducible

    stats = {"gen": 0, "deny": 0, "error": 0}

    for _ in range(total_requests):
        is_ncii_attempt = random.random() < 0.15  # 15% are NCII-related

        attempt_id = chain.log_attempt(
            prompt=f"simulated-prompt-{random.randint(0, 999999)}",
            model_id="grok-image-v1",
            endpoint="/v1/images/generations",
        )

        if is_ncii_attempt:
            if random.random() < ncii_deny_rate:
                chain.log_deny(
                    attempt_id=attempt_id,
                    risk_category=RiskCategory.NCII_RISK,
                    deny_reason=DenyReason.CONTENT_FILTER,
                    policy_version="1.0.0" if ncii_deny_rate < 0.5 else "2.0.0",
                    confidence=random.uniform(0.85, 0.99),
                )
                stats["deny"] += 1
            else:
                chain.log_generate(
                    attempt_id=attempt_id,
                    output_hash=f"sha256:{'0' * 64}",
                )
                stats["gen"] += 1
        else:
            # Non-NCII: mostly generates fine
            if random.random() < 0.02:  # 2% general errors
                chain.log_error(
                    attempt_id=attempt_id,
                    error_code="GENERAL_ERROR",
                    error_message="Simulated error",
                )
                stats["error"] += 1
            else:
                chain.log_generate(
                    attempt_id=attempt_id,
                    output_hash=f"sha256:{'0' * 64}",
                )
                stats["gen"] += 1

    return stats


def main():
    chain = CAPChain(chain_id="grok-counterfactual")

    print("Grok Crisis Counterfactual: What Regulators Could Have Seen")
    print("=" * 65)
    print()
    print(f"{'Date':<14} {'Requests':>9} {'Generated':>10} {'Denied':>8} {'Deny %':>8}")
    print("-" * 65)

    base = datetime(2025, 12, 25, tzinfo=timezone.utc)

    daily_stats = []

    for day in range(20):
        date = base + timedelta(days=day)

        # Simulate the actual timeline:
        # Dec 25-Jan 5: Low NCII denial rate (broken safeguards)
        # Jan 6+: High denial rate (fix deployed)
        if day < 12:  # Dec 25 - Jan 5
            ncii_deny_rate = 0.10  # Only 10% of NCII caught!
        else:  # Jan 6+
            ncii_deny_rate = 0.95  # 95% caught after fix

        # Real crisis volume: ~490K requests/day (4.4M over 9 days).
        # Using 100/day here for demo speed.
        stats = simulate_day(chain, date, 100, ncii_deny_rate)
        daily_stats.append((date, stats))

        total = stats["gen"] + stats["deny"] + stats["error"]
        deny_pct = stats["deny"] / total * 100 if total > 0 else 0

        marker = " ← FIX DEPLOYED" if day == 12 else ""
        anomaly = " ⚠ LOW" if deny_pct < 5.0 and day < 12 else ""

        print(f"  {date.strftime('%Y-%m-%d')}"
              f"  {total:>7d}"
              f"  {stats['gen']:>9d}"
              f"  {stats['deny']:>7d}"
              f"  {deny_pct:>6.1f}%"
              f"{marker}{anomaly}")

    print()

    # Run full verification
    result = full_verification(chain.events, chain.public_key)
    print_verification_report(result, chain.chain_id)

    print()
    print("KEY INSIGHT:")
    print("─" * 65)
    print("With CAP-SRP, a regulator would have detected the anomalous")
    print("drop in NCII denial rate on December 26—within hours of")
    print("deployment—instead of waiting for journalists and public")
    print("outrage to surface the problem days later.")
    print()
    print("The Completeness Invariant proves the data is complete.")
    print("The hash chain proves it wasn't altered after the fact.")
    print("The signatures prove who recorded each decision.")


if __name__ == "__main__":
    main()
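The "⚠ LOW" flag in the simulation is nothing more than a threshold check on the verified daily denial rate. A sketch of the kind of monitor a regulator could run over chain statistics (the baseline and floor values are illustrative assumptions, not part of any spec):

```python
def flag_denial_anomalies(daily_rates, baseline=0.15, floor=0.5):
    """Flag days whose overall denial rate drops below floor x baseline.

    daily_rates: list of (date_str, denial_rate) tuples computed from a
    verified event chain. With ~15% of traffic NCII-flagged and healthy
    safeguards denying ~95% of it, the expected overall rate is ~14%;
    a rate under half the baseline suggests the filter is not working.
    """
    threshold = baseline * floor
    return [day for day, rate in daily_rates if rate < threshold]

rates = [
    ("2025-12-25", 0.015), ("2025-12-26", 0.012),  # broken safeguards
    ("2026-01-06", 0.145), ("2026-01-07", 0.150),  # after the fix
]
print(flag_denial_anomalies(rates))  # -> ['2025-12-25', '2025-12-26']
```

Because the rates come from a chain whose Completeness Invariant holds, the platform cannot make the anomaly disappear by simply not logging the unblocked requests.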

How This Maps to the GPA's Four Demands

| GPA Demand | CAP-SRP Implementation | Verification |
| --- | --- | --- |
| Robust safeguards | Every refusal is a signed GEN_DENY event with risk category, confidence score, and policy version | Regulator verifies signature + checks refusal rate trends |
| Meaningful transparency | Complete audit chain with Completeness Invariant; Merkle tree enables selective disclosure | Third party runs full_verification() on Evidence Pack |
| Effective removal mechanisms | Events are immutable, but crypto-shredding allows GDPR-compliant deletion of personal data while preserving audit integrity | CRYPTO_SHRED events recorded in chain |
| Enhanced child protection | CSAM_RISK category with mandatory LEGAL_COMPLIANCE deny reason; highest confidence threshold required | Filter by RiskCategory.CSAM_RISK, verify 100% denial rate |
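The "effective removal" row deserves a concrete illustration. The idea behind crypto-shredding is to store only ciphertext in the signed chain and keep the per-event keys elsewhere; deleting a key erases the personal data without touching a single hashed byte. The sketch below uses a toy keystream for self-containment (the XOR construction and field names are illustrative only; a real system would use AES-GCM with managed keys):

```python
import hashlib
import os

key_store = {}  # per-event keys, held OUTSIDE the signed chain

def xor_stream(key: bytes, data: bytes) -> bytes:
    # Illustrative keystream cipher -- NOT production crypto
    out, counter = bytearray(), 0
    while len(out) < len(data):
        out += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(a ^ b for a, b in zip(data, out))

def seal(event_id: str, plaintext: bytes) -> str:
    key = os.urandom(32)
    key_store[event_id] = key
    return xor_stream(key, plaintext).hex()

def crypto_shred(event_id: str) -> None:
    del key_store[event_id]  # ciphertext in the chain is now unrecoverable

# The signed event stores only ciphertext, so its hash never changes
event = {"EventID": "evt-001",
         "SealedPrompt": seal("evt-001", b"prompt naming a real person")}
hash_before = hashlib.sha256(event["SealedPrompt"].encode()).hexdigest()

recovered = xor_stream(key_store["evt-001"],
                       bytes.fromhex(event["SealedPrompt"]))
assert recovered == b"prompt naming a real person"  # readable while key exists

crypto_shred("evt-001")  # honor a GDPR erasure request
hash_after = hashlib.sha256(event["SealedPrompt"].encode()).hexdigest()
assert hash_before == hash_after  # chain integrity fully preserved
```

The chain's hashes and signatures commit to the ciphertext, so verification still passes after shredding; only the ability to read the personal data is destroyed.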

What This Is—And What It Isn't

Let me be direct about the current state. The CAP-SRP specification (v1.0, published January 28, 2026) by the VeritasChain Standards Organization is an early-stage, single-author project. No major AI company has adopted it. No standards body has endorsed it. An IETF Internet-Draft has been submitted but not adopted by a working group.

The concept—cryptographic proof of AI refusal decisions—addresses a genuine and well-documented gap. The GPA's joint statement, C2PA's absence of refusal provenance, and the Grok crisis all point to the same missing layer. By contrast, established standards such as C2PA have massive industry adoption (200+ member organizations) and substantial infrastructure already in production, but none of them record what an AI system refused to generate.

The code in this article demonstrates the technical approach, not a production-ready system. A real implementation would need:

  • External timestamping via RFC 3161 TSAs (prevents backdating)
  • SCITT integration for append-only transparency logs (prevents split-view attacks)
  • HSM key management for signing key protection
  • Performance optimization for high-throughput systems (millions of events/day)
  • Independent auditing of the implementation itself

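The first two bullets share one underlying idea: each batch's Merkle root must land somewhere the platform cannot later rewrite. A conceptual sketch of that anchoring step (the in-memory external_log stands in for a real RFC 3161 TSA or SCITT transparency log, and the receipt fields are invented for illustration):

```python
import hashlib
from datetime import datetime, timezone

external_log = []  # stand-in for an append-only third-party service

def anchor_root(merkle_root: str) -> dict:
    """Record a Merkle root with an external timestamp (placeholder).

    In production this would be an RFC 3161 timestamp request or a
    SCITT receipt, not a local list append -- the point is that the
    anchoring party is not the platform being audited.
    """
    receipt = {
        "root": merkle_root,
        "anchored_at": datetime.now(timezone.utc).isoformat(),
        "receipt_hash": hashlib.sha256(merkle_root.encode()).hexdigest(),
    }
    external_log.append(receipt)
    return receipt

receipt = anchor_root("sha256:" + "ab" * 32)
# Later, a regulator checks the platform's claimed root against the log:
# a backdated or rewritten chain produces a root with no matching receipt
assert external_log[0]["root"] == receipt["root"]
```

Without this step, a platform could regenerate an entire chain with fresh signatures; the external anchor is what makes backdating detectable.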
The GPA's statement strengthens the case for verifiable AI governance broadly—not for any specific framework. Whether the industry builds on CAP-SRP, extends C2PA, develops something new through NIST or ISO, or combines multiple approaches, the technical requirement is clear: "trust us" is no longer sufficient.

This article is part of a series on AI content provenance and verifiable governance. The code is educational and demonstrates cryptographic concepts—production deployment requires additional security measures, performance optimization, and independent audit.
