Yesterday, 61 data protection authorities from around the world published a joint statement demanding that AI companies prove their safeguards actually work. Not claim they work. Prove it.
The statement, coordinated through the Global Privacy Assembly, calls for "robust safeguards," "meaningful transparency," "effective mechanisms for individuals to request removal of harmful content," and "enhanced protections for children." Signatories include Canada's OPC, the EU's EDPB (Chair Anu Talus), Hong Kong's PCPD, Singapore's PDPC, the UK's ICO, and 56 other authorities spanning Albania to Uruguay.
Notably absent: the United States and Japan's PPC.
The statement deliberately avoids prescribing how to prove compliance. No mention of cryptographic audit trails, C2PA, watermarking, or any specific technical standard. It operates at a principles-and-policy level: organizations are told what they must achieve, not how to demonstrate it.
This is the gap. And it's the same gap that let the Grok crisis happen.
The Problem: "Trust Us" Doesn't Scale
When 35 U.S. state attorneys general sent their letter about Grok generating 4.4 million images in 9 days, at least 41% of them sexualized, the core enforcement problem was this: there was no verifiable way to confirm that safety measures actually worked beyond xAI's own assurances.
The EU was demanding "operational evidence, not screenshots and declarations." The UK ICO launched a formal investigation. The Irish DPC opened proceedings. But every single regulator had to take the platform at its word about what the AI refused to generate.
This is what we call the negative evidence problem:
- C2PA can prove: "This content was generated by this system at this time."
- SynthID can prove: "This content has a Google watermark."
- Nothing currently deployed can prove: "This system refused to generate this content at this time."
The GPA's statement demands "meaningful transparency" about safeguards. Let's build a system that actually delivers it.
Architecture: The Completeness Invariant
The core concept is deceptively simple. Every generation request must produce exactly one cryptographically recorded outcome:
GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR
The critical architectural insight: GEN_ATTEMPT is logged before the safety evaluation runs, creating an unforgeable commitment that a request existed regardless of what follows. If the equation doesn't balance, the audit trail is provably compromised.
This addresses five specific threats regulators face:
| Threat | Attack | Mitigation |
|---|---|---|
| Selective Logging | Log only favorable outcomes | Completeness Invariant |
| Log Modification | Alter historical records | Hash chain integrity |
| Backdating | Create records with false timestamps | External anchoring (RFC 3161) |
| Split-View | Show different logs to different parties | Merkle proofs |
| Fabrication | Create false refusal records | Attempt-outcome pairing |
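The first row of the table is worth making concrete: because the invariant is plain arithmetic over event counts, anyone holding the log can check it. A minimal sketch with synthetic event dicts (the real records, defined below, are hashed and signed):

```python
from collections import Counter

# Synthetic audit events; in the real system these are signed records.
events = [
    {"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN"},
    {"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_DENY"},
    {"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_ERROR"},
]
counts = Counter(e["EventType"] for e in events)

# GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR
balanced = counts["GEN_ATTEMPT"] == (
    counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
)
print(balanced)  # True: the counts balance
```

Count balancing is necessary but not sufficient: it cannot distinguish one attempt with two outcomes from two attempts with one each. The per-ID attempt-outcome pairing check in Step 5 closes that gap.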
Let's implement this from scratch.
Step 1: Project Setup
# Create project directory
mkdir cap-srp-demo && cd cap-srp-demo
# Install dependencies
pip install "cryptography>=42.0.0"  # quote the spec so the shell doesn't treat >= as a redirect
# That's it. No external services needed for the core.
Step 2: Event Data Model
Every AI interaction becomes a cryptographically signed event. Here's the complete data model:
# models.py
"""CAP-SRP Event Data Model.
Implements the event types defined in the CAP-SRP specification v1.0.
Reference: https://github.com/veritaschain/cap-spec
"""
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
import uuid
class EventType(str, Enum):
"""All event types in the CAP-SRP lifecycle."""
GEN_ATTEMPT = "GEN_ATTEMPT" # Logged BEFORE safety evaluation
GEN = "GEN" # Content was generated
GEN_DENY = "GEN_DENY" # Request was refused
GEN_ERROR = "GEN_ERROR" # System error during generation
class RiskCategory(str, Enum):
"""Risk categories for GEN_DENY events.
Aligned with the GPA joint statement's key concern areas.
"""
NCII_RISK = "NCII_RISK" # Non-consensual intimate imagery
CSAM_RISK = "CSAM_RISK" # Child sexual abuse material
HATE_CONTENT = "HATE_CONTENT" # Hate speech / discrimination
VIOLENCE = "VIOLENCE" # Graphic violence
PRIVACY_VIOLATION = "PRIVACY_VIOLATION" # Real person depiction without consent
DECEPTION = "DECEPTION" # Deepfakes / impersonation
OTHER = "OTHER"
class DenyReason(str, Enum):
"""Why a generation request was denied."""
POLICY_VIOLATION = "POLICY_VIOLATION"
CONTENT_FILTER = "CONTENT_FILTER"
RATE_LIMIT = "RATE_LIMIT"
USER_RESTRICTION = "USER_RESTRICTION"
LEGAL_COMPLIANCE = "LEGAL_COMPLIANCE"
def generate_event_id() -> str:
    """Generate a UUIDv7-style event ID.
    UUIDv7 (RFC 9562) embeds a Unix millisecond timestamp in the most
    significant bits, providing natural chronological ordering.
    uuid.uuid7() only landed in Python 3.14, so on older versions we
    construct an equivalent value by hand.
    """
    if hasattr(uuid, "uuid7"):  # Python 3.14+
        return str(uuid.uuid7())
    ts_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    rand = uuid.uuid4().int & ((1 << 74) - 1)  # 74 fresh random bits
    value = (ts_ms & ((1 << 48) - 1)) << 80    # 48-bit ms timestamp
    value |= 0x7 << 76                         # version 7
    value |= (rand >> 62) << 64                # 12-bit rand_a
    value |= 0b10 << 62                        # RFC 4122 variant
    value |= rand & ((1 << 62) - 1)            # 62-bit rand_b
    return str(uuid.UUID(int=value))
def now_iso() -> str:
"""Return current UTC time in ISO 8601 format."""
return datetime.now(timezone.utc).isoformat()
@dataclass
class CAPEvent:
"""Base CAP-SRP event.
Every event in the audit chain shares these fields.
The EventHash and Signature are computed after creation.
"""
EventID: str = field(default_factory=generate_event_id)
EventType: str = ""
ChainID: str = ""
PrevHash: str = ""
Timestamp: str = field(default_factory=now_iso)
# These are set during signing
EventHash: str = ""
Signature: str = ""
    def to_dict(self) -> dict:
        """Convert to dictionary, excluding unset (empty-string) fields.
        Zero values such as a 0.0 ConfidenceScore are deliberately kept,
        so they are covered by the hash and signature.
        """
        return {k: v for k, v in asdict(self).items() if v != ""}
@dataclass
class GenAttemptEvent(CAPEvent):
"""Logged BEFORE safety evaluation.
This is the critical event—it creates an unforgeable commitment
that a generation request exists. The PromptHash preserves
privacy while enabling verification.
"""
EventType: str = "GEN_ATTEMPT"
PromptHash: str = "" # SHA-256 of the original prompt
ModelID: str = "" # Which model received the request
SessionID: str = "" # Anonymized session identifier
Endpoint: str = "" # e.g., "/v1/images/generations"
RequestMetadata: dict = field(default_factory=dict)
@dataclass
class GenDenyEvent(CAPEvent):
"""Logged when a generation request is REFUSED.
This is what the GPA statement demands proof of:
verifiable evidence that harmful content was actually blocked.
"""
EventType: str = "GEN_DENY"
AttemptID: str = "" # Links back to GEN_ATTEMPT
RiskCategory: str = "" # What kind of harm was detected
DenyReason: str = "" # Why it was denied
PolicyVersion: str = "" # Which policy version caught it
ConfidenceScore: float = 0.0 # Safety classifier confidence
ModelID: str = ""
@dataclass
class GenEvent(CAPEvent):
"""Logged when content IS generated (passed safety checks)."""
EventType: str = "GEN"
AttemptID: str = ""
OutputHash: str = "" # SHA-256 of generated content
ModelID: str = ""
C2PAManifestHash: str = "" # Link to C2PA content credential
@dataclass
class GenErrorEvent(CAPEvent):
"""Logged when a system error prevents generation."""
EventType: str = "GEN_ERROR"
AttemptID: str = ""
ErrorCode: str = ""
ErrorMessage: str = ""
ModelID: str = ""
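As a quick sanity check on the ordering property that generate_event_id's docstring claims, here is a condensed UUIDv7-style constructor (uuid7_like is illustrative, not part of the module above) showing that plain string comparison tracks the embedded timestamp:

```python
import uuid

def uuid7_like(ts_ms: int) -> str:
    """Build a UUIDv7-shaped value: 48-bit ms timestamp, then random bits."""
    rand = uuid.uuid4().int & ((1 << 74) - 1)
    value = (ts_ms & ((1 << 48) - 1)) << 80   # timestamp in the top 48 bits
    value |= 0x7 << 76                        # version 7
    value |= (rand >> 62) << 64               # 12-bit rand_a
    value |= 0b10 << 62                       # RFC 4122 variant
    value |= rand & ((1 << 62) - 1)           # 62-bit rand_b
    return str(uuid.UUID(int=value))

earlier = uuid7_like(1_700_000_000_000)
later = uuid7_like(1_700_000_000_001)
print(earlier < later)  # True: lexicographic order follows time order
```

Because the timestamp occupies the most significant bits and UUID strings are fixed-width hex, sorting event IDs sorts events chronologically, which is exactly what an append-only chain wants.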
Step 3: Cryptographic Integrity Layer
The hash chain and signature system that makes the audit trail tamper-evident:
# crypto.py
"""Cryptographic integrity layer for CAP-SRP.
Implements:
- SHA-256 hash chains (tamper evidence)
- Ed25519 digital signatures (non-repudiation)
- Merkle tree construction (efficient verification)
All per RFC 8032 (Ed25519) and RFC 8785 (JSON Canonicalization).
"""
import hashlib
import json
import base64
from typing import List, Tuple
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)
def json_canonicalize(obj: dict) -> str:
"""Canonicalize JSON per RFC 8785 (simplified).
RFC 8785 defines deterministic serialization so that
the same logical object always produces the same bytes.
This is essential—without it, hash verification fails
if fields are reordered.
"""
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def compute_event_hash(event: dict) -> str:
"""Compute SHA-256 hash of canonicalized event.
The Signature field is excluded before hashing to avoid
a circular dependency (hash → sign → include signature → rehash).
Args:
event: Event dictionary
Returns:
Hash string in format "sha256:{hex_digest}"
"""
# Remove mutable fields before hashing
event_copy = {k: v for k, v in event.items()
if k not in ("Signature", "EventHash")}
canonical = json_canonicalize(event_copy)
hash_bytes = hashlib.sha256(canonical.encode("utf-8")).digest()
return f"sha256:{hash_bytes.hex()}"
def compute_prompt_hash(prompt: str) -> str:
"""Hash a prompt for privacy-preserving storage.
The actual prompt text is NEVER stored. Only the hash.
This satisfies GDPR requirements while enabling verification:
a regulator with the original prompt can verify it matches.
"""
return f"sha256:{hashlib.sha256(prompt.encode('utf-8')).hexdigest()}"
def generate_keypair() -> Tuple[Ed25519PrivateKey, Ed25519PublicKey]:
"""Generate an Ed25519 signing keypair.
In production (Gold level), the private key would live
in an HSM (Hardware Security Module). For Bronze/Silver,
file-based key storage with appropriate permissions.
"""
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
return private_key, public_key
def sign_event(event: dict, private_key: Ed25519PrivateKey) -> dict:
"""Compute hash and sign an event.
Flow:
1. Compute SHA-256 of canonicalized event (excluding Signature)
2. Store hash in EventHash field
3. Sign the hash bytes with Ed25519
4. Store signature in Signature field
Args:
event: Event dictionary (EventHash and Signature will be set)
private_key: Ed25519 signing key
Returns:
Event dictionary with EventHash and Signature populated
"""
# Step 1: Compute hash
event_hash = compute_event_hash(event)
event["EventHash"] = event_hash
# Step 2: Sign the hash bytes
hash_bytes = bytes.fromhex(event_hash[7:]) # Remove "sha256:" prefix
signature = private_key.sign(hash_bytes)
event["Signature"] = f"ed25519:{base64.b64encode(signature).decode()}"
return event
def verify_signature(event: dict, public_key: Ed25519PublicKey) -> bool:
"""Verify Ed25519 signature on an event.
Args:
event: Signed event dictionary
public_key: Ed25519 verification key
Returns:
True if signature is valid
"""
sig_str = event.get("Signature", "")
if not sig_str.startswith("ed25519:"):
return False
try:
signature = base64.b64decode(sig_str[8:])
hash_bytes = bytes.fromhex(event["EventHash"][7:])
public_key.verify(signature, hash_bytes)
return True
except Exception:
return False
class MerkleTree:
"""Merkle tree for batch verification and inclusion proofs.
Enables efficient verification of individual events within
large batches. A regulator can verify a specific refusal
exists in a batch of millions without processing every event.
"""
def __init__(self, leaves: List[str]):
"""Build tree from event hashes.
Args:
leaves: List of event hash strings (sha256:...)
"""
self.leaves = [self._to_bytes(h) for h in leaves]
self.tree = self._build()
@staticmethod
def _to_bytes(hash_str: str) -> bytes:
"""Convert 'sha256:hex' to raw bytes."""
return bytes.fromhex(hash_str[7:]) if hash_str.startswith("sha256:") else bytes.fromhex(hash_str)
@staticmethod
def _hash_pair(left: bytes, right: bytes) -> bytes:
"""Hash two nodes together. Always hash in sorted order
to make the tree order-independent."""
if left > right:
left, right = right, left
return hashlib.sha256(left + right).digest()
def _build(self) -> List[List[bytes]]:
"""Build the Merkle tree bottom-up."""
if not self.leaves:
return [[hashlib.sha256(b"empty").digest()]]
tree = [self.leaves[:]]
current = self.leaves[:]
while len(current) > 1:
next_level = []
for i in range(0, len(current), 2):
if i + 1 < len(current):
next_level.append(self._hash_pair(current[i], current[i + 1]))
else:
# Odd node: promote to next level
next_level.append(current[i])
tree.append(next_level)
current = next_level
return tree
@property
def root(self) -> str:
"""Get the Merkle root hash."""
return f"sha256:{self.tree[-1][0].hex()}"
def get_proof(self, index: int) -> List[dict]:
"""Generate inclusion proof for a leaf.
The proof is a list of sibling hashes that, combined with
the target leaf, reproduce the root. This lets a verifier
confirm an event exists in the batch without seeing all events.
"""
if index >= len(self.leaves):
raise IndexError(f"Leaf index {index} out of range")
proof = []
idx = index
for level in self.tree[:-1]:
if idx % 2 == 0:
sibling_idx = idx + 1
position = "right"
else:
sibling_idx = idx - 1
position = "left"
if sibling_idx < len(level):
proof.append({
"hash": f"sha256:{level[sibling_idx].hex()}",
"position": position,
})
idx //= 2
return proof
def verify_proof(self, leaf_hash: str, proof: List[dict]) -> bool:
"""Verify a Merkle inclusion proof against the root."""
current = self._to_bytes(leaf_hash)
for step in proof:
sibling = self._to_bytes(step["hash"])
if step["position"] == "left":
current = self._hash_pair(sibling, current)
else:
current = self._hash_pair(current, sibling)
return current == self.tree[-1][0]
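Before the tree gets wired into the chain, the proof mechanics are easier to see at toy scale. This standalone sketch mirrors the sorted-pair hashing above; the four leaves and the pair helper are made up for illustration:

```python
import hashlib

def pair(a: bytes, b: bytes) -> bytes:
    """Hash two nodes in sorted order, matching MerkleTree._hash_pair."""
    if a > b:
        a, b = b, a
    return hashlib.sha256(a + b).digest()

leaves = [hashlib.sha256(f"event-{i}".encode()).digest() for i in range(4)]
root = pair(pair(leaves[0], leaves[1]), pair(leaves[2], leaves[3]))

# Inclusion proof for leaf 2: its sibling (leaf 3), then the left subtree.
proof = [leaves[3], pair(leaves[0], leaves[1])]
current = leaves[2]
for sibling in proof:
    current = pair(current, sibling)  # sorted hashing: position is implicit

print(current == root)  # True: leaf 2 is provably in the batch
```

The proof is two hashes regardless of where the leaf sits, and in general O(log n) hashes for n leaves, which is why a regulator can spot-check individual refusals inside a batch of millions.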
Step 4: The Audit Chain
This is where everything comes together—the append-only chain that enforces the Completeness Invariant:
# chain.py
"""CAP-SRP Audit Chain.
The chain is the core of the system: an append-only, hash-linked
sequence of events where every generation attempt must have exactly
one recorded outcome.
Think of it as a flight recorder for AI content decisions.
"""
from typing import List, Optional

from models import (
    CAPEvent, GenAttemptEvent, GenDenyEvent, GenEvent, GenErrorEvent,
    RiskCategory, DenyReason, generate_event_id,
)
from crypto import (
    compute_prompt_hash, sign_event, generate_keypair, MerkleTree,
    Ed25519PrivateKey,
)
class CompletenessViolation(Exception):
"""Raised when the Completeness Invariant is broken."""
pass
class ChainIntegrityError(Exception):
"""Raised when the hash chain is tampered with."""
pass
class CAPChain:
"""Append-only audit chain with Completeness Invariant enforcement.
Usage:
chain = CAPChain("my-platform")
# Every request follows this pattern:
attempt_id = chain.log_attempt(prompt, model_id, endpoint)
# Then exactly ONE of:
chain.log_deny(attempt_id, risk_category, reason)
chain.log_generate(attempt_id, output_hash)
chain.log_error(attempt_id, error_code, error_message)
"""
def __init__(self, chain_id: str, private_key: Optional[Ed25519PrivateKey] = None):
"""Initialize a new audit chain.
Args:
chain_id: Unique identifier for this chain (e.g., platform name)
private_key: Ed25519 signing key (generated if not provided)
"""
self.chain_id = chain_id
self.events: List[dict] = []
if private_key:
self.private_key = private_key
self.public_key = private_key.public_key()
else:
self.private_key, self.public_key = generate_keypair()
# Track attempt→outcome mapping for invariant
self._pending_attempts: dict = {} # attempt_id → event
self._completed_attempts: set = set()
@property
def prev_hash(self) -> str:
"""Hash of the last event in the chain (or genesis marker)."""
if self.events:
return self.events[-1]["EventHash"]
return "sha256:" + "0" * 64 # Genesis
def _append(self, event: CAPEvent) -> dict:
"""Append a signed event to the chain.
Internal method that handles:
1. Setting chain linkage (PrevHash)
2. Computing the event hash
3. Signing with Ed25519
4. Appending to the chain
"""
event.ChainID = self.chain_id
event.PrevHash = self.prev_hash
event_dict = event.to_dict()
signed = sign_event(event_dict, self.private_key)
self.events.append(signed)
return signed
def log_attempt(
self,
prompt: str,
model_id: str,
endpoint: str = "/v1/images/generations",
session_id: str = "",
) -> str:
"""Log a generation attempt BEFORE safety evaluation.
This is the most critical operation. By recording the attempt
before the safety check runs, we create an unforgeable commitment
that cannot be retroactively erased.
Args:
prompt: The user's prompt (will be hashed, never stored raw)
model_id: Which model received the request
endpoint: API endpoint (important: /v1/images/edits is high-risk)
session_id: Anonymized session ID
Returns:
The attempt's EventID (needed for outcome logging)
"""
event = GenAttemptEvent(
PromptHash=compute_prompt_hash(prompt),
ModelID=model_id,
Endpoint=endpoint,
SessionID=session_id or generate_event_id(),
)
signed = self._append(event)
self._pending_attempts[signed["EventID"]] = signed
return signed["EventID"]
def log_deny(
self,
attempt_id: str,
risk_category: RiskCategory,
deny_reason: DenyReason = DenyReason.CONTENT_FILTER,
policy_version: str = "1.0.0",
confidence: float = 0.95,
model_id: str = "",
) -> dict:
"""Log a content refusal.
This is what the GPA statement demands proof of.
The event cryptographically links back to the attempt,
creating verifiable evidence that:
1. A request was received (GEN_ATTEMPT exists)
2. It was evaluated against a specific policy version
3. It was denied for a specific reason
4. The denial is timestamped and signed
Args:
attempt_id: The GEN_ATTEMPT EventID
risk_category: What kind of harm was detected
deny_reason: Why it was denied
policy_version: Which safety policy version
confidence: Safety classifier confidence score
"""
if attempt_id not in self._pending_attempts:
if attempt_id in self._completed_attempts:
raise CompletenessViolation(
f"Attempt {attempt_id} already has an outcome. "
"The Completeness Invariant requires exactly ONE outcome per attempt."
)
raise CompletenessViolation(
f"No pending attempt found for {attempt_id}. "
"You must log GEN_ATTEMPT before logging an outcome."
)
event = GenDenyEvent(
AttemptID=attempt_id,
RiskCategory=risk_category.value,
DenyReason=deny_reason.value,
PolicyVersion=policy_version,
ConfidenceScore=confidence,
ModelID=model_id,
)
signed = self._append(event)
# Update tracking
del self._pending_attempts[attempt_id]
self._completed_attempts.add(attempt_id)
return signed
def log_generate(
self,
attempt_id: str,
output_hash: str,
model_id: str = "",
c2pa_manifest_hash: str = "",
) -> dict:
"""Log successful content generation."""
if attempt_id not in self._pending_attempts:
if attempt_id in self._completed_attempts:
raise CompletenessViolation(
f"Attempt {attempt_id} already has an outcome."
)
raise CompletenessViolation(
f"No pending attempt found for {attempt_id}."
)
event = GenEvent(
AttemptID=attempt_id,
OutputHash=output_hash,
ModelID=model_id,
C2PAManifestHash=c2pa_manifest_hash,
)
signed = self._append(event)
del self._pending_attempts[attempt_id]
self._completed_attempts.add(attempt_id)
return signed
def log_error(
self,
attempt_id: str,
error_code: str,
error_message: str,
model_id: str = "",
) -> dict:
"""Log a system error during generation."""
if attempt_id not in self._pending_attempts:
if attempt_id in self._completed_attempts:
raise CompletenessViolation(
f"Attempt {attempt_id} already has an outcome."
)
raise CompletenessViolation(
f"No pending attempt found for {attempt_id}."
)
event = GenErrorEvent(
AttemptID=attempt_id,
ErrorCode=error_code,
ErrorMessage=error_message,
ModelID=model_id,
)
signed = self._append(event)
del self._pending_attempts[attempt_id]
self._completed_attempts.add(attempt_id)
return signed
def get_pending_attempts(self) -> List[str]:
"""Return attempt IDs that don't yet have outcomes.
In a healthy system, this should be near-zero or contain
only very recent attempts still being processed. A large
pending count is a red flag.
"""
return list(self._pending_attempts.keys())
def build_merkle_tree(self) -> MerkleTree:
"""Build a Merkle tree over all events for batch verification."""
hashes = [e["EventHash"] for e in self.events]
return MerkleTree(hashes)
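Stripped of hashing and signatures, the guard shared by log_deny, log_generate, and log_error reduces to two sets. MiniLedger below is an illustrative stand-in for that bookkeeping, not part of the spec:

```python
class MiniLedger:
    """Crypto-free sketch of CAPChain's attempt/outcome bookkeeping."""

    def __init__(self):
        self.pending: set = set()    # attempts awaiting an outcome
        self.completed: set = set()  # attempts with exactly one outcome

    def attempt(self, attempt_id: str) -> None:
        self.pending.add(attempt_id)

    def outcome(self, attempt_id: str) -> None:
        if attempt_id in self.completed:
            raise RuntimeError("duplicate outcome for attempt")
        if attempt_id not in self.pending:
            raise RuntimeError("orphan outcome: no such attempt")
        self.pending.remove(attempt_id)
        self.completed.add(attempt_id)

ledger = MiniLedger()
ledger.attempt("a1")
ledger.outcome("a1")       # fine: first and only outcome
try:
    ledger.outcome("a1")   # second outcome for the same attempt
    caught = False
except RuntimeError:
    caught = True
print(caught)  # True: the duplicate was rejected
```

Everything else in CAPChain, the hash linkage, the Ed25519 signature, the Merkle batching, exists to make this simple state machine tamper-evident to an outside verifier.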
Step 5: The Verification Engine
This is what regulators actually run. An independent verifier that checks the entire chain:
# verifier.py
"""CAP-SRP Verification Engine.
This module is designed to be run by THIRD PARTIES—regulators,
auditors, researchers—who receive an Evidence Pack and need to
independently verify its integrity without trusting the AI provider.
Verification checks:
1. Hash chain integrity (no tampering)
2. Signature validity (non-repudiation)
3. Completeness Invariant (no missing events)
4. Temporal ordering (no backdating within chain)
5. Merkle proof verification (via MerkleTree, run on demand)
"""
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple
from collections import Counter
from crypto import compute_event_hash, verify_signature, MerkleTree, Ed25519PublicKey
@dataclass
class VerificationResult:
"""Comprehensive verification report."""
valid: bool
chain_integrity: bool = True
signatures_valid: bool = True
completeness_valid: bool = True
temporal_order_valid: bool = True
total_events: int = 0
total_attempts: int = 0
total_gen: int = 0
total_deny: int = 0
total_error: int = 0
unmatched_attempts: List[str] = None
orphan_outcomes: List[str] = None
broken_links: List[int] = None
invalid_signatures: List[int] = None
temporal_violations: List[int] = None
# Statistics for the regulatory report
refusal_rate: float = 0.0
refusal_by_category: dict = None
def __post_init__(self):
self.unmatched_attempts = self.unmatched_attempts or []
self.orphan_outcomes = self.orphan_outcomes or []
self.broken_links = self.broken_links or []
self.invalid_signatures = self.invalid_signatures or []
self.temporal_violations = self.temporal_violations or []
self.refusal_by_category = self.refusal_by_category or {}
def verify_chain_integrity(events: List[dict]) -> Tuple[bool, List[int]]:
"""Verify hash chain linkage.
Each event's PrevHash must match the previous event's EventHash.
If any link is broken, the chain has been tampered with.
"""
broken = []
for i, event in enumerate(events):
# Verify hash computation
computed = compute_event_hash(event)
if event.get("EventHash") != computed:
broken.append(i)
continue
# Verify chain linkage
if i > 0:
expected_prev = events[i - 1]["EventHash"]
if event.get("PrevHash") != expected_prev:
broken.append(i)
return len(broken) == 0, broken
def verify_all_signatures(events: List[dict], public_key: Ed25519PublicKey) -> Tuple[bool, List[int]]:
"""Verify Ed25519 signatures on all events."""
invalid = []
for i, event in enumerate(events):
if not verify_signature(event, public_key):
invalid.append(i)
return len(invalid) == 0, invalid
def verify_temporal_order(events: List[dict]) -> Tuple[bool, List[int]]:
"""Verify events are in chronological order.
While external anchoring (RFC 3161) provides absolute time
guarantees, basic temporal ordering within the chain catches
obvious backdating within a single chain.
"""
violations = []
for i in range(1, len(events)):
try:
t_prev = datetime.fromisoformat(events[i - 1]["Timestamp"])
t_curr = datetime.fromisoformat(events[i]["Timestamp"])
if t_curr < t_prev:
violations.append(i)
except (KeyError, ValueError):
violations.append(i)
return len(violations) == 0, violations
def verify_completeness_invariant(events: List[dict]) -> Tuple[bool, List[str], List[str]]:
"""Verify the Completeness Invariant.
GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR
Every attempt must have exactly one outcome.
No outcome should exist without a corresponding attempt.
Returns:
(is_valid, unmatched_attempt_ids, orphan_outcome_ids)
"""
attempts = {}
outcomes = {}
for event in events:
etype = event.get("EventType", "")
if etype == "GEN_ATTEMPT":
attempts[event["EventID"]] = event
elif etype in ("GEN", "GEN_DENY", "GEN_ERROR"):
attempt_id = event.get("AttemptID", "")
if attempt_id in outcomes:
# CRITICAL: Duplicate outcome for same attempt
# This is a completeness violation
return False, [], [event["EventID"]]
outcomes[attempt_id] = event
# Find unmatched attempts (attempts without outcomes)
unmatched = [aid for aid in attempts if aid not in outcomes]
# Find orphan outcomes (outcomes without attempts)
orphans = [
outcomes[aid]["EventID"]
for aid in outcomes
if aid not in attempts
]
is_valid = len(unmatched) == 0 and len(orphans) == 0
return is_valid, unmatched, orphans
def compute_refusal_statistics(events: List[dict]) -> dict:
"""Compute refusal statistics for regulatory reporting."""
deny_events = [e for e in events if e.get("EventType") == "GEN_DENY"]
attempt_events = [e for e in events if e.get("EventType") == "GEN_ATTEMPT"]
if not attempt_events:
return {"refusal_rate": 0.0, "by_category": {}}
categories = Counter(e.get("RiskCategory", "OTHER") for e in deny_events)
return {
"refusal_rate": len(deny_events) / len(attempt_events),
"by_category": dict(categories),
"total_denials": len(deny_events),
"total_attempts": len(attempt_events),
}
def full_verification(
events: List[dict],
public_key: Ed25519PublicKey,
) -> VerificationResult:
"""Run complete verification suite.
This is the function a regulator would call with an
Evidence Pack to independently verify an AI provider's
content moderation claims.
"""
result = VerificationResult(valid=True, total_events=len(events))
# 1. Chain integrity
result.chain_integrity, result.broken_links = verify_chain_integrity(events)
# 2. Signature verification
result.signatures_valid, result.invalid_signatures = verify_all_signatures(events, public_key)
# 3. Temporal ordering
result.temporal_order_valid, result.temporal_violations = verify_temporal_order(events)
# 4. Completeness Invariant
completeness_ok, unmatched, orphans = verify_completeness_invariant(events)
result.completeness_valid = completeness_ok
result.unmatched_attempts = unmatched
result.orphan_outcomes = orphans
# 5. Event counts
type_counts = Counter(e.get("EventType") for e in events)
result.total_attempts = type_counts.get("GEN_ATTEMPT", 0)
result.total_gen = type_counts.get("GEN", 0)
result.total_deny = type_counts.get("GEN_DENY", 0)
result.total_error = type_counts.get("GEN_ERROR", 0)
# 6. Refusal statistics
stats = compute_refusal_statistics(events)
result.refusal_rate = stats["refusal_rate"]
result.refusal_by_category = stats.get("by_category", {})
# Overall validity
result.valid = all([
result.chain_integrity,
result.signatures_valid,
result.completeness_valid,
result.temporal_order_valid,
])
return result
def print_verification_report(result: VerificationResult, chain_id: str = ""):
"""Print a human-readable verification report.
Modeled after the CAP-SRP spec's reference output format.
"""
print("=" * 60)
print("CAP-SRP Evidence Verification Report")
print("=" * 60)
if chain_id:
print(f"Chain ID: {chain_id}")
print(f"Total Events: {result.total_events}")
print()
# Chain integrity
status = "✓ VALID" if result.chain_integrity else "✗ INVALID"
print(f"CHAIN INTEGRITY: {status}")
if result.broken_links:
print(f" Broken links at: {result.broken_links}")
# Signatures
status = "✓ VALID" if result.signatures_valid else "✗ INVALID"
print(f"SIGNATURES: {status}")
if result.invalid_signatures:
print(f" Invalid at: {result.invalid_signatures}")
# Temporal order
status = "✓ VALID" if result.temporal_order_valid else "✗ INVALID"
print(f"TEMPORAL ORDER: {status}")
# Completeness Invariant
status = "✓ VALID" if result.completeness_valid else "✗ INVALID"
print(f"COMPLETENESS: {status}")
print(f" GEN_ATTEMPT: {result.total_attempts}")
print(f" GEN (generated): {result.total_gen}")
print(f" GEN_DENY (refused): {result.total_deny}")
print(f" GEN_ERROR (errors): {result.total_error}")
invariant_sum = result.total_gen + result.total_deny + result.total_error
eq = "=" if invariant_sum == result.total_attempts else "≠"
print(f" {result.total_attempts} {eq} {result.total_gen} + {result.total_deny} + {result.total_error}")
if result.unmatched_attempts:
print(f" ⚠ Unmatched attempts: {len(result.unmatched_attempts)}")
if result.orphan_outcomes:
print(f" ⚠ Orphan outcomes: {len(result.orphan_outcomes)}")
# Refusal statistics
print()
print(f"REFUSAL RATE: {result.refusal_rate:.1%}")
if result.refusal_by_category:
print(" By category:")
for cat, count in sorted(result.refusal_by_category.items(), key=lambda x: -x[1]):
pct = count / result.total_deny * 100 if result.total_deny > 0 else 0
print(f" {cat:<25s} {count:>5d} ({pct:.1f}%)")
# Overall
print()
overall = "✓ VALID" if result.valid else "✗ INVALID"
print(f"OVERALL STATUS: {overall}")
print("=" * 60)
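To see the class of tampering verify_chain_integrity is built to catch, here is a standalone miniature: three linked records, then a retroactive edit. The link helper is a simplified stand-in for the real hash computation (no signatures, no canonical event model):

```python
import hashlib
import json

def link(record: dict, prev_hash: str) -> str:
    """Hash a record together with its predecessor's hash."""
    payload = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode()).hexdigest()

records = [{"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_DENY"},
           {"EventType": "GEN_ATTEMPT"}]
hashes, prev = [], "0" * 64
for rec in records:
    prev = link(rec, prev)
    hashes.append(prev)

# An attacker rewrites record 1 after the fact (say, GEN_DENY -> GEN).
records[1] = {"EventType": "GEN"}

# Recomputing the chain exposes the edit: every hash from the tampered
# record onward disagrees with the stored values.
prev, broken = "0" * 64, []
for i, rec in enumerate(records):
    prev = link(rec, prev)
    if prev != hashes[i]:
        broken.append(i)
print(broken)  # [1, 2]: the edit and everything after it
```

This is why modifying history is all-or-nothing: an attacker cannot change one record without recomputing, and re-signing, every subsequent hash, and external anchoring (RFC 3161) pins the old values beyond their reach.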
Step 6: Putting It All Together—GPA Compliance Demo
Now let's simulate what should have been possible during the Grok crisis:
# demo_gpa_compliance.py
"""
Demonstration: Proving Compliance with the GPA Joint Statement
The GPA's February 23, 2026 joint statement demands:
1. Robust safeguards against non-consensual intimate imagery
2. Meaningful transparency about AI capabilities and safeguards
3. Effective mechanisms for content removal
4. Enhanced protections for children
This demo shows how CAP-SRP provides CRYPTOGRAPHIC EVIDENCE
for demands #1 and #2—proving that safeguards actually work,
not just claiming they do.
Usage:
python demo_gpa_compliance.py
"""
import json
import hashlib
from datetime import datetime, timezone
from models import RiskCategory, DenyReason
from chain import CAPChain, CompletenessViolation
from verifier import full_verification, print_verification_report
from crypto import MerkleTree
def main():
print("=" * 60)
print("CAP-SRP: GPA Joint Statement Compliance Demo")
print("=" * 60)
print()
print("Simulating an AI image generation platform with")
print("cryptographic proof of content moderation decisions.")
print()
# ─── Initialize the audit chain ───────────────────────────
chain = CAPChain(chain_id="demo-platform-2026")
print(f"Chain ID: {chain.chain_id}")
print(f"Key Type: Ed25519")
print()
# ─── Scenario 1: NCII request → DENIED ────────────────────
# This is the GPA's primary concern: non-consensual intimate
# imagery of real people. The Grok crisis showed what happens
# when these requests are NOT properly blocked.
print("─" * 60)
print("Scenario 1: NCII request (GPA Primary Concern)")
print("─" * 60)
prompt_1 = "Generate a realistic nude image of [celebrity name]"
```python
    # Step 1: Log the attempt BEFORE safety evaluation
    attempt_1 = chain.log_attempt(
        prompt=prompt_1,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f" GEN_ATTEMPT logged: {attempt_1[:36]}...")
    print(f" PromptHash: sha256:{hashlib.sha256(prompt_1.encode()).hexdigest()[:16]}...")

    # Step 2: Safety filter catches it → DENY
    deny_1 = chain.log_deny(
        attempt_id=attempt_1,
        risk_category=RiskCategory.NCII_RISK,
        deny_reason=DenyReason.CONTENT_FILTER,
        policy_version="2.1.0",
        confidence=0.98,
        model_id="imagen-v3",
    )
    print(f" GEN_DENY logged: {deny_1['EventID'][:36]}...")
    print(f" Risk: NCII_RISK | Confidence: 98%")
    print(f" → Verifiable proof this request was blocked.")
    print()

    # ─── Scenario 2: CSAM attempt → DENIED ─────────────────────
    # The GPA specifically calls for "enhanced safeguards" for children.
    print("─" * 60)
    print("Scenario 2: CSAM-risk request (GPA Child Protection)")
    print("─" * 60)
    prompt_2 = "image of underage person in compromising situation"
    attempt_2 = chain.log_attempt(
        prompt=prompt_2,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f" GEN_ATTEMPT logged: {attempt_2[:36]}...")
    deny_2 = chain.log_deny(
        attempt_id=attempt_2,
        risk_category=RiskCategory.CSAM_RISK,
        deny_reason=DenyReason.LEGAL_COMPLIANCE,
        policy_version="2.1.0",
        confidence=0.99,
        model_id="imagen-v3",
    )
    print(f" GEN_DENY logged: {deny_2['EventID'][:36]}...")
    print(f" Risk: CSAM_RISK | Confidence: 99%")
    print(f" → Criminal offence in many jurisdictions per GPA statement.")
    print()

    # ─── Scenario 3: Privacy violation via image editing endpoint ──
    # The /v1/images/edits endpoint is where "digital undressing"
    # attacks commonly occur. This is a critical attack surface.
    print("─" * 60)
    print("Scenario 3: Image editing endpoint (high-risk surface)")
    print("─" * 60)
    prompt_3 = "Remove clothing from this person's photo"
    attempt_3 = chain.log_attempt(
        prompt=prompt_3,
        model_id="imagen-v3",
        endpoint="/v1/images/edits",  # HIGH RISK endpoint
    )
    print(f" GEN_ATTEMPT logged: {attempt_3[:36]}...")
    print(f" Endpoint: /v1/images/edits (⚠ high-risk)")
    deny_3 = chain.log_deny(
        attempt_id=attempt_3,
        risk_category=RiskCategory.NCII_RISK,
        deny_reason=DenyReason.CONTENT_FILTER,
        policy_version="2.1.0",
        confidence=0.97,
        model_id="imagen-v3",
    )
    print(f" GEN_DENY logged: {deny_3['EventID'][:36]}...")
    print(f" → Digital undressing attempt blocked and recorded.")
    print()

    # ─── Scenario 4: Legitimate request → GENERATED ────────────
    # Not everything is denied. Safe content is generated and
    # tracked for completeness.
    print("─" * 60)
    print("Scenario 4: Legitimate request (safe content)")
    print("─" * 60)
    prompt_4 = "A beautiful sunset over Mount Fuji with cherry blossoms"
    attempt_4 = chain.log_attempt(
        prompt=prompt_4,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f" GEN_ATTEMPT logged: {attempt_4[:36]}...")
    gen_4 = chain.log_generate(
        attempt_id=attempt_4,
        output_hash="sha256:" + hashlib.sha256(b"<generated image bytes>").hexdigest(),
        model_id="imagen-v3",
    )
    print(f" GEN logged: {gen_4['EventID'][:36]}...")
    print(f" → Content generated, output hash recorded.")
    print()

    # ─── Scenario 5: System error during generation ────────────
    print("─" * 60)
    print("Scenario 5: System error (GPU timeout)")
    print("─" * 60)
    prompt_5 = "Detailed panoramic cityscape of Tokyo at night"
    attempt_5 = chain.log_attempt(
        prompt=prompt_5,
        model_id="imagen-v3",
        endpoint="/v1/images/generations",
    )
    print(f" GEN_ATTEMPT logged: {attempt_5[:36]}...")
    error_5 = chain.log_error(
        attempt_id=attempt_5,
        error_code="GPU_TIMEOUT",
        error_message="Generation timed out after 30s",
        model_id="imagen-v3",
    )
    print(f" GEN_ERROR logged: {error_5['EventID'][:36]}...")
    print(f" → Error recorded. Attempt still accounted for.")
    print()

    # ─── Scenario 6: Demonstrate Completeness Invariant enforcement ──
    print("─" * 60)
    print("Scenario 6: Completeness Invariant enforcement")
    print("─" * 60)

    # Try to log a second outcome for an already-completed attempt
    print(" Attempting to log duplicate outcome...")
    try:
        chain.log_deny(
            attempt_id=attempt_1,  # Already denied above!
            risk_category=RiskCategory.NCII_RISK,
            deny_reason=DenyReason.CONTENT_FILTER,
        )
        print(" ✗ ERROR: Should have raised CompletenessViolation!")
    except CompletenessViolation as e:
        print(f" ✓ Correctly rejected: {e}")
    print()

    # Try to log outcome for nonexistent attempt
    print(" Attempting to log outcome for fake attempt...")
    try:
        chain.log_deny(
            attempt_id="nonexistent-attempt-id",
            risk_category=RiskCategory.OTHER,
            deny_reason=DenyReason.POLICY_VIOLATION,
        )
        print(" ✗ ERROR: Should have raised CompletenessViolation!")
    except CompletenessViolation as e:
        print(f" ✓ Correctly rejected: {e}")
    print()

    # ─── Build Merkle tree for batch verification ──────────────
    print("─" * 60)
    print("Merkle Tree Construction")
    print("─" * 60)
    tree = chain.build_merkle_tree()
    print(f" Leaves: {len(chain.events)}")
    print(f" Merkle Root: {tree.root[:40]}...")

    # Generate and verify inclusion proof for the first denial
    proof = tree.get_proof(1)  # Index of first GEN_DENY
    is_valid = tree.verify_proof(chain.events[1]["EventHash"], proof)
    print(f" Proof for denial event: {'✓ VALID' if is_valid else '✗ INVALID'}")
    print(f" Proof size: {len(proof)} nodes (vs {len(chain.events)} total events)")
    print()

    # ─── Run full third-party verification ─────────────────────
    print("─" * 60)
    print("Third-Party Verification (what a regulator would run)")
    print("─" * 60)
    print()
    result = full_verification(chain.events, chain.public_key)
    print_verification_report(result, chain.chain_id)

    # ─── Export sample event for inspection ────────────────────
    print()
    print("─" * 60)
    print("Sample Event (GEN_DENY for NCII)")
    print("─" * 60)
    # Show the first denial event with long fields truncated for display
    deny_event = chain.events[1]
    display = {k: v for k, v in deny_event.items()}
    display["Signature"] = display["Signature"][:30] + "..."
    display["EventHash"] = display["EventHash"][:30] + "..."
    display["PrevHash"] = display["PrevHash"][:30] + "..."
    print(json.dumps(display, indent=2))

    # ─── What this proves to the GPA ──────────────────────────
    print()
    print("═" * 60)
    print("What This Proves to Regulators")
    print("═" * 60)
    print()
    print("1. ROBUST SAFEGUARDS: 3 harmful requests blocked with")
    print("   cryptographic proof (not just internal logs).")
    print()
    print("2. MEANINGFUL TRANSPARENCY: Every decision is signed,")
    print("   timestamped, and independently verifiable.")
    print()
    print("3. COMPLETENESS: The invariant mathematically proves")
    print("   no requests were silently dropped or retroactively")
    print("   erased from the audit trail.")
    print()
    print("4. NON-REPUDIATION: Ed25519 signatures prevent the")
    print("   platform from denying it made these decisions.")
    print()
    print(f"Pending attempts (should be 0): {len(chain.get_pending_attempts())}")
    print()


if __name__ == "__main__":
    main()
```
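The Merkle proof step above is what keeps selective disclosure cheap: a regulator can confirm one denial is in the batch without receiving every event. As a rough stdlib-only illustration of why the proof stays logarithmic in size (this is a simplified stand-in for whatever `chain.build_merkle_tree()` actually does, not the CAP-SRP tree construction):

```python
import hashlib

def h(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()

def build_levels(leaves):
    """All levels of a Merkle tree; duplicate the last node on odd-length levels."""
    levels = [leaves]
    while len(levels[-1]) > 1:
        cur = levels[-1]
        if len(cur) % 2:
            cur = cur + [cur[-1]]
        levels.append([h((a + b).encode()) for a, b in zip(cur[::2], cur[1::2])])
    return levels

def get_proof(levels, index):
    """Sibling hashes (with side flags) from leaf up to the root."""
    proof = []
    for level in levels[:-1]:
        if len(level) % 2:
            level = level + [level[-1]]
        sib = index ^ 1
        proof.append((level[sib], "left" if sib < index else "right"))
        index //= 2
    return proof

def verify_proof(leaf_hash, proof, root):
    acc = leaf_hash
    for sibling, side in proof:
        acc = h(((sibling + acc) if side == "left" else (acc + sibling)).encode())
    return acc == root

# 10 leaves (one per event in the demo chain) → only 4 sibling hashes per proof
leaves = [h(f"event-{i}".encode()) for i in range(10)]
levels = build_levels(leaves)
root = levels[-1][0]
proof = get_proof(levels, 1)  # prove inclusion of the first GEN_DENY
print(verify_proof(leaves[1], proof, root))  # True
print(len(proof))  # 4
```

A million-event day would need only ~20 sibling hashes per proof, which is why batch anchoring scales.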
## Running the Demo

```bash
python demo_gpa_compliance.py
```

Expected output:
```
============================================================
CAP-SRP Evidence Verification Report
============================================================
Chain ID:     demo-platform-2026
Total Events: 10

CHAIN INTEGRITY: ✓ VALID
SIGNATURES:      ✓ VALID
TEMPORAL ORDER:  ✓ VALID
COMPLETENESS:    ✓ VALID
  GEN_ATTEMPT:        5
  GEN (generated):    1
  GEN_DENY (refused): 3
  GEN_ERROR (errors): 1
  5 = 1 + 3 + 1

REFUSAL RATE: 60.0%
  By category:
    NCII_RISK  2 (66.7%)
    CSAM_RISK  1 (33.3%)

OVERALL STATUS: ✓ VALID
============================================================
```
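Conceptually, the two structural checks behind that report are simple. A minimal sketch of hash-chain linkage and the Completeness Invariant, assuming events are dicts with `EventID`, `EventType`, `EventHash`, `PrevHash`, and `AttemptID` fields (the field names follow the demo output above, but this is an illustrative schema, not the normative CAP-SRP one; the hash values are placeholders):

```python
from collections import Counter

OUTCOME_TYPES = {"GEN", "GEN_DENY", "GEN_ERROR"}

def check_chain(events):
    """Each event must reference the hash of its predecessor."""
    return all(cur["PrevHash"] == prev["EventHash"]
               for prev, cur in zip(events, events[1:]))

def check_completeness(events):
    """Every GEN_ATTEMPT must have exactly one outcome event — no more, no less."""
    attempts = {e["EventID"] for e in events if e["EventType"] == "GEN_ATTEMPT"}
    outcomes = [e["AttemptID"] for e in events if e["EventType"] in OUTCOME_TYPES]
    return Counter(outcomes) == Counter({a: 1 for a in attempts})

# Toy chain: two attempts, one denied, one generated (hashes are placeholders)
events = [
    {"EventID": "a1", "EventType": "GEN_ATTEMPT", "PrevHash": "genesis", "EventHash": "h1"},
    {"EventID": "d1", "EventType": "GEN_DENY", "AttemptID": "a1", "PrevHash": "h1", "EventHash": "h2"},
    {"EventID": "a2", "EventType": "GEN_ATTEMPT", "PrevHash": "h2", "EventHash": "h3"},
    {"EventID": "g1", "EventType": "GEN", "AttemptID": "a2", "PrevHash": "h3", "EventHash": "h4"},
]
print(check_chain(events), check_completeness(events))  # True True

# Dropping the last outcome leaves a2 unaccounted for — completeness fails
print(check_completeness(events[:-1]))  # False
```

Deleting a denial, inserting one retroactively, or silently dropping an attempt breaks at least one of these two checks, which is the whole point of the `5 = 1 + 3 + 1` line in the report.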
## The Grok Counterfactual
What could regulators have verified if Grok had deployed this system?
```python
# grok_counterfactual.py
"""
Grok Crisis Counterfactual Analysis

Timeline:
  Dec 25, 2025:  Grok deployed with image generation
  Dec 26-Jan 5:  4.4M images generated, 41%+ sexualized
  Jan 6, 2026:   xAI claims "fix deployed"
  Jan 10+:       Regulators demand evidence

Without CAP-SRP: "Trust us, we fixed it."
With CAP-SRP: Cryptographic proof at every step.
"""
from datetime import datetime, timezone, timedelta
from collections import defaultdict

from models import RiskCategory, DenyReason
from chain import CAPChain
from verifier import full_verification, print_verification_report


def simulate_day(chain: CAPChain, date: datetime,
                 total_requests: int, ncii_deny_rate: float):
    """Simulate one day of platform operations.

    Args:
        chain: The audit chain
        date: Simulation date
        total_requests: Total generation requests
        ncii_deny_rate: What percentage of NCII-flagged requests are denied
    """
    import random
    random.seed(int(date.timestamp()))  # Reproducible

    stats = {"gen": 0, "deny": 0, "error": 0}
    for _ in range(total_requests):
        is_ncii_attempt = random.random() < 0.15  # 15% are NCII-related
        attempt_id = chain.log_attempt(
            prompt=f"simulated-prompt-{random.randint(0, 999999)}",
            model_id="grok-image-v1",
            endpoint="/v1/images/generations",
        )
        if is_ncii_attempt:
            if random.random() < ncii_deny_rate:
                chain.log_deny(
                    attempt_id=attempt_id,
                    risk_category=RiskCategory.NCII_RISK,
                    deny_reason=DenyReason.CONTENT_FILTER,
                    policy_version="1.0.0" if ncii_deny_rate < 0.5 else "2.0.0",
                    confidence=random.uniform(0.85, 0.99),
                )
                stats["deny"] += 1
            else:
                chain.log_generate(
                    attempt_id=attempt_id,
                    output_hash=f"sha256:{'0' * 64}",
                )
                stats["gen"] += 1
        else:
            # Non-NCII: mostly generates fine
            if random.random() < 0.02:  # 2% general errors
                chain.log_error(
                    attempt_id=attempt_id,
                    error_code="GENERAL_ERROR",
                    error_message="Simulated error",
                )
                stats["error"] += 1
            else:
                chain.log_generate(
                    attempt_id=attempt_id,
                    output_hash=f"sha256:{'0' * 64}",
                )
                stats["gen"] += 1
    return stats


def main():
    chain = CAPChain(chain_id="grok-counterfactual")
    print("Grok Crisis Counterfactual: What Regulators Could Have Seen")
    print("=" * 65)
    print()
    print(f"{'Date':<14} {'Requests':>9} {'Generated':>10} {'Denied':>8} {'Deny %':>8}")
    print("-" * 65)

    base = datetime(2025, 12, 25, tzinfo=timezone.utc)
    daily_stats = []
    for day in range(20):
        date = base + timedelta(days=day)
        # Simulate the actual timeline:
        #   Dec 25-Jan 5: Low NCII denial rate (broken safeguards)
        #   Jan 6+: High denial rate (fix deployed)
        if day < 12:  # Dec 25 - Jan 5
            ncii_deny_rate = 0.10  # Only 10% of NCII caught!
        else:  # Jan 6+
            ncii_deny_rate = 0.95  # 95% caught after fix

        # ~220K requests/day (4.4M over ~20 days)
        # Using 100 for demo speed
        stats = simulate_day(chain, date, 100, ncii_deny_rate)
        daily_stats.append((date, stats))

        total = stats["gen"] + stats["deny"] + stats["error"]
        deny_pct = stats["deny"] / total * 100 if total > 0 else 0
        marker = " ← FIX DEPLOYED" if day == 12 else ""
        anomaly = " ⚠ LOW" if deny_pct < 5.0 and day < 12 else ""
        print(f" {date.strftime('%Y-%m-%d')}"
              f" {total:>7d}"
              f" {stats['gen']:>9d}"
              f" {stats['deny']:>7d}"
              f" {deny_pct:>6.1f}%"
              f"{marker}{anomaly}")
    print()

    # Run full verification
    result = full_verification(chain.events, chain.public_key)
    print_verification_report(result, chain.chain_id)
    print()
    print("KEY INSIGHT:")
    print("─" * 65)
    print("With CAP-SRP, a regulator would have detected the anomalous")
    print("drop in NCII denial rate on December 26—within hours of")
    print("deployment—instead of waiting for journalists and public")
    print("outrage to surface the problem days later.")
    print()
    print("The Completeness Invariant proves the data is complete.")
    print("The hash chain proves it wasn't altered after the fact.")
    print("The signatures prove who recorded each decision.")


if __name__ == "__main__":
    main()
```
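The "detected within hours" claim reduces to ordinary threshold monitoring over per-day counts the chain already exposes. A minimal sketch, assuming a regulator holds verified `(date, denies, total)` tuples; the trailing-baseline rule and its parameters here are a hypothetical monitoring policy, not part of CAP-SRP:

```python
from statistics import mean, stdev

def flag_anomalies(daily, min_history=3, sigmas=3.0):
    """Flag days whose deny rate falls far below the trailing baseline."""
    flagged, rates = [], []
    for date, denies, total in daily:
        rate = denies / total
        if len(rates) >= min_history:
            mu, sd = mean(rates), stdev(rates)
            # Floor the deviation so a zero-variance history still yields a band
            if rate < mu - sigmas * max(sd, 0.005):
                flagged.append(date)
        rates.append(rate)
    return flagged

# Hypothetical history: a stable ~14% deny rate, then a sudden collapse
history = [(f"2025-12-{d:02d}", 14, 100) for d in range(20, 25)]
history.append(("2025-12-25", 2, 100))  # new model ships, rate craters
print(flag_anomalies(history))  # ['2025-12-25']
```

The interesting part is not the statistics but the trust model: because the counts come from a completeness-checked, signed chain, the platform cannot make the anomaly disappear by under-reporting.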
## How This Maps to the GPA's Four Demands

| GPA Demand | CAP-SRP Implementation | Verification |
|---|---|---|
| Robust safeguards | Every refusal is a signed `GEN_DENY` event with risk category, confidence score, and policy version | Regulator verifies signature + checks refusal rate trends |
| Meaningful transparency | Complete audit chain with Completeness Invariant; Merkle tree enables selective disclosure | Third party runs `full_verification()` on Evidence Pack |
| Effective removal mechanisms | Events are immutable, but crypto-shredding allows GDPR-compliant deletion of personal data while preserving audit integrity | `CRYPTO_SHRED` events recorded in chain |
| Enhanced child protection | `CSAM_RISK` category with mandatory `LEGAL_COMPLIANCE` deny reason; highest confidence threshold required | Filter by `RiskCategory.CSAM_RISK`, verify 100% denial rate |
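The crypto-shredding row deserves a sketch, because "immutable yet erasable" sounds contradictory until you see the pattern: personal data enters the chain only as ciphertext, the hash chain commits to the ciphertext, and erasure means destroying the per-subject key held outside the chain. The sketch below uses a SHA-256 counter keystream purely as a stand-in cipher (ILLUSTRATION ONLY — a real system would use an AEAD such as AES-GCM); the event schema and keystore are hypothetical:

```python
import hashlib, os

def keystream_xor(key: bytes, data: bytes) -> bytes:
    """ILLUSTRATION ONLY: SHA-256 counter keystream. Use a real AEAD in production."""
    out = bytearray()
    for i in range(0, len(data), 32):
        block = hashlib.sha256(key + i.to_bytes(8, "big")).digest()
        out.extend(b ^ k for b, k in zip(data[i:i + 32], block))
    return bytes(out)

keystore = {}  # per-data-subject keys, stored OUTSIDE the chain

def log_event(subject_id: str, prompt: str) -> dict:
    key = keystore.setdefault(subject_id, os.urandom(32))
    ct = keystream_xor(key, prompt.encode())
    event = {"Subject": subject_id, "PromptCiphertext": ct.hex()}
    # The hash commits to the ciphertext, never the plaintext
    event["EventHash"] = hashlib.sha256(repr(sorted(event.items())).encode()).hexdigest()
    return event

event = log_event("user-42", "some personal prompt")
recorded_hash = event["EventHash"]

# GDPR erasure request arrives: shred the key; the event itself never changes
del keystore["user-42"]

# Audit integrity survives — the recorded hash still matches the ciphertext...
check = dict(event)
check.pop("EventHash")
check["EventHash"] = hashlib.sha256(repr(sorted(check.items())).encode()).hexdigest()
print(check["EventHash"] == recorded_hash)  # True
# ...but the plaintext prompt is now irrecoverable without the deleted key.
```

This is why a `CRYPTO_SHRED` event can itself be appended to the chain: it records *that* erasure happened without touching any earlier event.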
## What This Is—And What It Isn't
Let me be direct about the current state. The CAP-SRP specification (v1.0, published January 28, 2026) by the VeritasChain Standards Organization is an early-stage, single-author project. No major AI company has adopted it. No standards body has endorsed it. An IETF Internet-Draft has been submitted but not adopted by a working group.
The concept—cryptographic proof of AI refusal decisions—addresses a genuine and well-documented gap. The GPA's joint statement, C2PA's lack of refusal provenance, and the Grok crisis all point to the same missing layer. But CAP-SRP competes for attention with established standards that have massive industry adoption (C2PA counts 200+ member organizations) and substantial infrastructure already in production.
The code in this article demonstrates the technical approach, not a production-ready system. A real implementation would need:
- External timestamping via RFC 3161 TSAs (prevents backdating)
- SCITT integration for append-only transparency logs (prevents split-view attacks)
- HSM key management for signing key protection
- Performance optimization for high-throughput systems (millions of events/day)
- Independent auditing of the implementation itself
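One requirement implicit throughout that list: the signer and every verifier must hash byte-identical serializations of each event, which is why the spec points at RFC 8785 (JCS). Python's `json` module can approximate canonical form for simple payloads. This sketch assumes string and integer values only — full RFC 8785 additionally pins down float serialization and other edge cases:

```python
import hashlib
import json

def canonical_bytes(obj) -> bytes:
    """Approximate RFC 8785 canonical JSON: sorted keys, no insignificant
    whitespace, UTF-8 output. NOT a full JCS implementation (numbers, etc.)."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")

# The same logical event serialized from dicts with different key order
a = {"EventType": "GEN_DENY", "Confidence": 98, "RiskCategory": "NCII_RISK"}
b = {"RiskCategory": "NCII_RISK", "EventType": "GEN_DENY", "Confidence": 98}

# Identical canonical bytes → identical hash → signatures verify on both sides
print(canonical_bytes(a) == canonical_bytes(b))  # True
print(hashlib.sha256(canonical_bytes(a)).hexdigest()[:16])
```

Without a canonical form, a signature made over one serialization fails to verify against a semantically identical re-serialization, and the whole evidence chain collapses into format disputes.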
The GPA's statement strengthens the case for verifiable AI governance broadly—not for any specific framework. Whether the industry builds on CAP-SRP, extends C2PA, develops something new through NIST or ISO, or combines multiple approaches, the technical requirement is clear: "trust us" is no longer sufficient.
## Resources
- GPA Joint Statement: priv.gc.ca
- EDPB Announcement: edpb.europa.eu
- CAP-SRP Specification: github.com/veritaschain/cap-spec
- C2PA Standard: c2pa.org
- IETF SCITT Working Group: datatracker.ietf.org/wg/scitt
- Ed25519 (RFC 8032): datatracker.ietf.org/doc/rfc8032
- JSON Canonicalization (RFC 8785): datatracker.ietf.org/doc/rfc8785
This article is part of a series on AI content provenance and verifiable governance. The code is educational and demonstrates cryptographic concepts—production deployment requires additional security measures, performance optimization, and independent audit.