The Accountability Gap: Why AI Systems Need Cryptographic Proof of Refusals

TL;DR

On January 7, 2026, the Grok incident demonstrated that an AI provider's claim that "our safeguards work" isn't enough. We need cryptographic proof. CAP (Content / Creative AI Profile) v1.0 provides exactly that: a specification for creating tamper-evident audit trails that can prove an AI system refused to generate harmful content.

Key features:

  • Safe Refusal Provenance (SRP): Cryptographic proof of non-generation
  • Completeness Invariant: Mathematical guarantee that all requests have outcomes
  • Privacy-preserving verification: Prove refusals without exposing harmful content
  • Regulatory alignment: EU AI Act, DSA, Colorado AI Act ready

GitHub: https://github.com/veritaschain/cap-spec
DOI: 10.5281/zenodo.18213616


The Problem: Trust-Based AI Governance Has Failed

The Grok Incident

In early January 2026, researchers discovered that xAI's Grok image generation model could be manipulated to generate non-consensual intimate imagery (NCII). Within hours:

  • Thousands of harmful images were generated
  • Multiple regulatory jurisdictions launched investigations
  • xAI claimed "our safeguards were working"

The problem? No one could verify this claim.

xAI, like every AI provider, could only say "trust us—we blocked the bad requests." But there was no cryptographic proof of what was blocked, no independent verification possible, no way to audit their claims.

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│   Current State of AI Content Moderation                        │
│                                                                 │
│   Provider: "We blocked millions of harmful requests!"          │
│                                                                 │
│   Regulator: "Prove it."                                        │
│                                                                 │
│   Provider: "...trust us?"                                      │
│                                                                 │
│   Regulator: "That's not how compliance works."                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

The Negative Proof Problem

Here's the fundamental challenge: AI systems can prove what they generated, but cannot prove what they refused to generate.

Every AI platform logs successful generations. It's easy—you have the output, the timestamp, the user. But when a request is blocked, what do you have? A log entry that says "blocked"?

That log entry:

  • Could be fabricated after the fact
  • Could be selectively created for audits
  • Cannot be independently verified
  • Provides no proof the request actually existed

This asymmetry creates an accountability vacuum. Providers can claim any refusal rate they want. No one can verify it.

The Regulatory Gap

Regulators are waking up to this problem:

Regulation           | Effective | The Problem
EU AI Act Article 12 | Aug 2026  | Requires "automatic logging" but specifies no standard format
DSA Article 37       | In force  | VLOPs need independent audits, but audit what?
Colorado AI Act      | Feb 2026  | Impact assessments required, but no verification mechanism
TAKE IT DOWN Act     | May 2026  | Platforms must remove NCII and prove that they do

Every regulation demands transparency. None specifies how to achieve it. CAP fills this gap.


Introducing CAP: A Specification, Not a Product

Let me be clear about what CAP is and isn't:

CAP IS:

  • An open specification (CC BY 4.0)
  • A data format for AI audit trails
  • A mechanism for proving system decisions
  • Compatible with existing infrastructure (C2PA, SCITT)

CAP IS NOT:

  • A content filter or moderation system
  • A real-time blocking mechanism
  • A proprietary product
  • A replacement for C2PA (it complements it)

Think of it this way:

C2PA answers: "Is this content authentic?"
CAP answers: "What did the AI system decide to do?"

C2PA is a content passport. CAP is a system flight recorder.


The Core Innovation: Safe Refusal Provenance (SRP)

Making Non-Generation a First-Class Event

SRP's key insight is treating refusals as first-class, cryptographically provable events. Not just log entries—events with the same integrity guarantees as successful generations.

Traditional Logging:
  User Request → [Safety Check] → Success: Log "generated"
                               → Failure: Log "blocked" (maybe)

SRP Approach:
  User Request → Log GEN_ATTEMPT (MUST)
              → [Safety Check] 
              → Success: Log GEN (linked to attempt)
              → Failure: Log GEN_DENY (linked to attempt)
              → Error: Log GEN_ERROR (linked to attempt)

The critical difference: GEN_ATTEMPT is logged BEFORE the safety check runs. This creates an unforgeable record that a request existed, regardless of outcome.

The Completeness Invariant

This is the mathematical heart of SRP:

∑ GEN_ATTEMPT = ∑ GEN + ∑ GEN_DENY + ∑ GEN_ERROR

For any time window, the count of attempts MUST equal the count of all outcomes. If it doesn't, something is wrong:

Violation           | Meaning                                 | Severity
Attempts > Outcomes | Missing outcome events (hiding results) | Critical
Outcomes > Attempts | Fabricated outcomes (fake refusals)     | Critical
Duplicate outcomes  | Data integrity failure                  | Critical

This invariant makes selective logging mathematically impossible. You can't claim you blocked a request without first having recorded that request. You can't hide a successful generation without breaking the invariant.
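As a concrete illustration, here is a minimal sketch of checking the invariant over a batch of events, using the EventType values defined below (the full per-attempt check appears in the implementation section):

from collections import Counter
from typing import Dict, List

def check_completeness_invariant(events: List[Dict]) -> bool:
    """Sum of GEN_ATTEMPT must equal sum of GEN + GEN_DENY + GEN_ERROR."""
    counts = Counter(e["EventType"] for e in events)
    outcomes = counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
    return counts["GEN_ATTEMPT"] == outcomes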


Implementation Deep Dive

Let's get into the technical details.

Event Structure

Every CAP event follows this structure:

{
  "EventID": "019467a1-0001-7000-0000-000000000001",
  "ChainID": "019467a0-0000-7000-0000-000000000000",
  "PrevHash": "sha256:a1b2c3d4e5f6789...",
  "Timestamp": "2026-01-13T14:23:45.100Z",
  "EventType": "GEN_ATTEMPT",
  "HashAlgo": "SHA256",
  "SignAlgo": "ED25519",

  "PromptHash": "sha256:7f83b1657ff1fc53b92dc18148a1d65d...",
  "InputType": "text+image",
  "PolicyID": "cap.safety.v1.0",
  "ModelVersion": "img-gen-v4.2.1",
  "ActorHash": "sha256:e3b0c44298fc1c149afbf4c8996fb924...",

  "EventHash": "sha256:computed_hash_of_this_event...",
  "Signature": "ed25519:MEUCIQDhE3H4..."
}

Key design decisions:

  1. UUIDv7: Event IDs are time-ordered UUIDs, providing natural chronological ordering
  2. Hash Chain: Each event includes PrevHash, creating an append-only chain
  3. Content Hashing: PromptHash and ActorHash enable verification without exposing content
  4. Ed25519 Signatures: Every event is cryptographically signed

Python Implementation

Here's a complete implementation of the core logging system:

import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import Optional, List, Dict
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
import base64

class CAPEvent:
    """Base class for CAP events."""

    def __init__(
        self,
        chain_id: str,
        event_type: str,
        prev_hash: Optional[str] = None
    ):
        self.event_id = str(uuid.uuid7())  # uuid7() requires Python 3.14+ (or the uuid6 backport package)
        self.chain_id = chain_id
        self.prev_hash = prev_hash
        self.timestamp = datetime.now(timezone.utc).isoformat()
        self.event_type = event_type
        self.hash_algo = "SHA256"
        self.sign_algo = "ED25519"
        self.event_hash: Optional[str] = None
        self.signature: Optional[str] = None

    def to_dict(self) -> Dict:
        """Convert to dictionary for serialization."""
        return {
            "EventID": self.event_id,
            "ChainID": self.chain_id,
            "PrevHash": self.prev_hash,
            "Timestamp": self.timestamp,
            "EventType": self.event_type,
            "HashAlgo": self.hash_algo,
            "SignAlgo": self.sign_algo,
            "EventHash": self.event_hash,
            "Signature": self.signature
        }


def compute_hash(data: str) -> str:
    """Compute SHA-256 hash with prefix."""
    hash_bytes = hashlib.sha256(data.encode('utf-8')).digest()
    return f"sha256:{hash_bytes.hex()}"


def canonicalize(obj: Dict) -> str:
    """
    RFC 8785 JSON Canonicalization Scheme (simplified).
    In production, use a proper JCS library.
    """
    return json.dumps(obj, sort_keys=True, separators=(',', ':'))


def compute_event_hash(event: Dict) -> str:
    """Compute hash of event, excluding EventHash and Signature (both are filled in afterwards)."""
    event_copy = {k: v for k, v in event.items() if k not in ("EventHash", "Signature")}
    canonical = canonicalize(event_copy)
    return compute_hash(canonical)


def sign_event(event: Dict, private_key: Ed25519PrivateKey) -> str:
    """Sign event with Ed25519."""
    hash_hex = event["EventHash"][7:]  # Remove "sha256:" prefix
    hash_bytes = bytes.fromhex(hash_hex)
    signature = private_key.sign(hash_bytes)
    return f"ed25519:{base64.b64encode(signature).decode()}"


class GenAttemptEvent(CAPEvent):
    """GEN_ATTEMPT event - logged when request is received."""

    def __init__(
        self,
        chain_id: str,
        prev_hash: Optional[str],
        prompt: str,
        input_type: str,
        policy_id: str,
        model_version: str,
        actor_id: str,
        reference_image: Optional[bytes] = None
    ):
        super().__init__(chain_id, "GEN_ATTEMPT", prev_hash)
        self.prompt_hash = compute_hash(prompt)
        self.reference_image_hash = (
            compute_hash(reference_image.hex()) if reference_image else None
        )
        self.input_type = input_type
        self.policy_id = policy_id
        self.model_version = model_version
        self.actor_hash = compute_hash(actor_id)
        self.session_id = str(uuid.uuid4())

    def to_dict(self) -> Dict:
        base = super().to_dict()
        base.update({
            "PromptHash": self.prompt_hash,
            "ReferenceImageHash": self.reference_image_hash,
            "InputType": self.input_type,
            "PolicyID": self.policy_id,
            "ModelVersion": self.model_version,
            "ActorHash": self.actor_hash,
            "SessionID": self.session_id
        })
        return base


class GenDenyEvent(CAPEvent):
    """GEN_DENY event - logged when request is refused."""

    def __init__(
        self,
        chain_id: str,
        prev_hash: str,
        attempt_id: str,
        risk_category: str,
        risk_score: float,
        refusal_reason: str,
        policy_id: str,
        policy_version: str,
        risk_sub_categories: Optional[List[str]] = None,
        human_override: bool = False
    ):
        super().__init__(chain_id, "GEN_DENY", prev_hash)
        self.attempt_id = attempt_id
        self.risk_category = risk_category
        self.risk_sub_categories = risk_sub_categories or []
        self.risk_score = risk_score
        self.refusal_reason = refusal_reason
        self.policy_id = policy_id
        self.policy_version = policy_version
        self.model_decision = "DENY"
        self.human_override = human_override

    def to_dict(self) -> Dict:
        base = super().to_dict()
        base.update({
            "AttemptID": self.attempt_id,
            "RiskCategory": self.risk_category,
            "RiskSubCategories": self.risk_sub_categories,
            "RiskScore": self.risk_score,
            "RefusalReason": self.refusal_reason,
            "PolicyID": self.policy_id,
            "PolicyVersion": self.policy_version,
            "ModelDecision": self.model_decision,
            "HumanOverride": self.human_override
        })
        return base


class CAPChain:
    """CAP event chain with integrity guarantees."""

    def __init__(self, private_key: Ed25519PrivateKey):
        self.chain_id = str(uuid.uuid7())
        self.events: List[Dict] = []
        self.private_key = private_key

    def _get_prev_hash(self) -> Optional[str]:
        """Get hash of last event, or None for genesis."""
        if not self.events:
            return None
        return self.events[-1]["EventHash"]

    def _finalize_event(self, event: CAPEvent) -> Dict:
        """Compute hash and signature for event."""
        event_dict = event.to_dict()

        # Compute hash
        event_dict["EventHash"] = compute_event_hash(event_dict)

        # Sign
        event_dict["Signature"] = sign_event(event_dict, self.private_key)

        return event_dict

    def log_attempt(
        self,
        prompt: str,
        input_type: str,
        policy_id: str,
        model_version: str,
        actor_id: str,
        reference_image: Optional[bytes] = None
    ) -> Dict:
        """Log a generation attempt. MUST be called before safety check."""
        event = GenAttemptEvent(
            chain_id=self.chain_id,
            prev_hash=self._get_prev_hash(),
            prompt=prompt,
            input_type=input_type,
            policy_id=policy_id,
            model_version=model_version,
            actor_id=actor_id,
            reference_image=reference_image
        )
        event_dict = self._finalize_event(event)
        self.events.append(event_dict)
        return event_dict

    def log_deny(
        self,
        attempt_id: str,
        risk_category: str,
        risk_score: float,
        refusal_reason: str,
        policy_id: str,
        policy_version: str,
        risk_sub_categories: Optional[List[str]] = None
    ) -> Dict:
        """Log a refusal."""
        event = GenDenyEvent(
            chain_id=self.chain_id,
            prev_hash=self._get_prev_hash(),
            attempt_id=attempt_id,
            risk_category=risk_category,
            risk_score=risk_score,
            refusal_reason=refusal_reason,
            policy_id=policy_id,
            policy_version=policy_version,
            risk_sub_categories=risk_sub_categories
        )
        event_dict = self._finalize_event(event)
        self.events.append(event_dict)
        return event_dict

    def verify_chain(self) -> bool:
        """Verify chain integrity."""
        for i, event in enumerate(self.events):
            # Verify hash
            computed_hash = compute_event_hash(event)
            if event["EventHash"] != computed_hash:
                print(f"Hash mismatch at event {i}")
                return False

            # Verify chain linkage
            if i > 0:
                if event["PrevHash"] != self.events[i-1]["EventHash"]:
                    print(f"Chain break at event {i}")
                    return False
            else:
                if event["PrevHash"] is not None:
                    print("Genesis event has non-null PrevHash")
                    return False

        return True

    def verify_completeness(self) -> bool:
        """Verify completeness invariant."""
        attempts = {}
        outcomes = {}

        for event in self.events:
            if event["EventType"] == "GEN_ATTEMPT":
                attempts[event["EventID"]] = event
            elif event["EventType"] in ["GEN", "GEN_DENY", "GEN_ERROR"]:
                attempt_id = event.get("AttemptID")
                if attempt_id in outcomes:
                    print(f"Duplicate outcome for attempt {attempt_id}")
                    return False
                outcomes[attempt_id] = event

        # Check completeness
        if set(attempts.keys()) != set(outcomes.keys()):
            missing = set(attempts.keys()) - set(outcomes.keys())
            orphans = set(outcomes.keys()) - set(attempts.keys())
            if missing:
                print(f"Missing outcomes for: {missing}")
            if orphans:
                print(f"Orphan outcomes: {orphans}")
            return False

        return True
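The quick-start and the Azure/Bedrock integrations later in this post also call chain.log_gen(...) and chain.log_error(...), which are not shown in the class above. A minimal sketch of what they could look like, mirroring log_deny; the field names OutputHash, ErrorType, and ErrorMessage are illustrative assumptions, not normative CAP fields:

class CAPChainWithOutcomes(CAPChain):
    """Sketch: GEN and GEN_ERROR logging, following the same pattern as log_deny."""

    def log_gen(self, attempt_id: str, output_hash: str) -> Dict:
        """Log a successful generation, linked to its GEN_ATTEMPT."""
        event = CAPEvent(self.chain_id, "GEN", self._get_prev_hash())
        event_dict = event.to_dict()
        event_dict.update({"AttemptID": attempt_id, "OutputHash": output_hash})
        event_dict["EventHash"] = compute_event_hash(event_dict)
        event_dict["Signature"] = sign_event(event_dict, self.private_key)
        self.events.append(event_dict)
        return event_dict

    def log_error(self, attempt_id: str, error_type: str, error_message: str) -> Dict:
        """Log a failed generation, linked to its GEN_ATTEMPT."""
        event = CAPEvent(self.chain_id, "GEN_ERROR", self._get_prev_hash())
        event_dict = event.to_dict()
        event_dict.update({
            "AttemptID": attempt_id,
            "ErrorType": error_type,
            "ErrorMessage": error_message
        })
        event_dict["EventHash"] = compute_event_hash(event_dict)
        event_dict["Signature"] = sign_event(event_dict, self.private_key)
        self.events.append(event_dict)
        return event_dict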

Usage Example

from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

# Initialize chain with signing key
private_key = Ed25519PrivateKey.generate()
chain = CAPChain(private_key)

# Scenario: User requests NCII generation
harmful_prompt = "Generate nude image of celebrity X"
user_id = "user_12345"

# Step 1: Log attempt BEFORE safety check
attempt = chain.log_attempt(
    prompt=harmful_prompt,
    input_type="text",
    policy_id="cap.safety.v1.0",
    model_version="img-gen-v4.2.1",
    actor_id=user_id
)
print(f"Attempt logged: {attempt['EventID']}")

# Step 2: Run safety check (your existing system)
# safety_result = your_safety_system.check(harmful_prompt)

# Step 3: Log denial
deny = chain.log_deny(
    attempt_id=attempt["EventID"],
    risk_category="NCII_RISK",
    risk_score=0.94,
    refusal_reason="Non-consensual intimate imagery request detected",
    policy_id="cap.safety.v1.0",
    policy_version="2026-01-01",
    risk_sub_categories=["REAL_PERSON", "CLOTHING_REMOVAL_REQUEST"]
)
print(f"Denial logged: {deny['EventID']}")

# Verify chain integrity
assert chain.verify_chain(), "Chain integrity check failed!"
assert chain.verify_completeness(), "Completeness invariant violated!"

print("✓ All verifications passed")
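Note that verify_chain checks hashes and linkage but not signatures. Here is a minimal sketch of verifying each event's Ed25519 signature against the chain's public key; verify_event_signature is an illustrative helper, and an auditor would receive the public key out of band:

import base64

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey

def verify_event_signature(event: dict, public_key: Ed25519PublicKey) -> bool:
    """Check the Ed25519 signature over the event's EventHash."""
    hash_bytes = bytes.fromhex(event["EventHash"][7:])   # strip "sha256:"
    sig_bytes = base64.b64decode(event["Signature"][8:])  # strip "ed25519:"
    try:
        public_key.verify(sig_bytes, hash_bytes)
        return True
    except InvalidSignature:
        return False

# Verify every event in the chain from the usage example above
public_key = private_key.public_key()
assert all(verify_event_signature(e, public_key) for e in chain.events)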

TypeScript Implementation

For Node.js environments:

import { createHash, sign, verify, generateKeyPairSync } from 'crypto';
import { v7 as uuidv7 } from 'uuid';

interface CAPEvent {
  EventID: string;
  ChainID: string;
  PrevHash: string | null;
  Timestamp: string;
  EventType: string;
  HashAlgo: string;
  SignAlgo: string;
  EventHash?: string;
  Signature?: string;
  [key: string]: unknown;
}

interface GenAttemptEvent extends CAPEvent {
  EventType: 'GEN_ATTEMPT';
  PromptHash: string;
  ReferenceImageHash?: string;
  InputType: string;
  PolicyID: string;
  ModelVersion: string;
  ActorHash: string;
  SessionID: string;
}

interface GenDenyEvent extends CAPEvent {
  EventType: 'GEN_DENY';
  AttemptID: string;
  RiskCategory: string;
  RiskSubCategories: string[];
  RiskScore: number;
  RefusalReason: string;
  PolicyID: string;
  PolicyVersion: string;
  ModelDecision: string;
  HumanOverride: boolean;
}

type RiskCategory = 
  | 'CSAM_RISK'
  | 'NCII_RISK'
  | 'MINOR_SEXUALIZATION'
  | 'REAL_PERSON_DEEPFAKE'
  | 'VIOLENCE_EXTREME'
  | 'HATE_CONTENT'
  | 'TERRORIST_CONTENT'
  | 'SELF_HARM_PROMOTION'
  | 'COPYRIGHT_VIOLATION'
  | 'OTHER';

function computeHash(data: string): string {
  const hash = createHash('sha256').update(data, 'utf8').digest('hex');
  return `sha256:${hash}`;
}

function canonicalize(obj: Record<string, unknown>): string {
  // Simplified JCS - use proper library in production
  return JSON.stringify(obj, Object.keys(obj).sort());
}

function computeEventHash(event: CAPEvent): string {
  // Exclude EventHash and Signature, which are filled in after hashing
  const { EventHash, Signature, ...rest } = event;
  const canonical = canonicalize(rest);
  return computeHash(canonical);
}

class CAPChain {
  private chainId: string;
  private events: CAPEvent[] = [];
  private privateKey: Buffer;
  private publicKey: Buffer;

  constructor() {
    this.chainId = uuidv7();
    const keypair = generateKeyPairSync('ed25519');
    this.privateKey = keypair.privateKey.export({ type: 'pkcs8', format: 'der' });
    this.publicKey = keypair.publicKey.export({ type: 'spki', format: 'der' });
  }

  private getPrevHash(): string | null {
    if (this.events.length === 0) return null;
    return this.events[this.events.length - 1].EventHash!;
  }

  private signEvent(eventHash: string): string {
    const hashHex = eventHash.slice(7); // Remove "sha256:"
    const hashBuffer = Buffer.from(hashHex, 'hex');
    const signature = sign(null, hashBuffer, {
      key: this.privateKey,
      format: 'der',
      type: 'pkcs8'
    });
    return `ed25519:${signature.toString('base64')}`;
  }

  private finalizeEvent<T extends CAPEvent>(event: T): T {
    event.EventHash = computeEventHash(event);
    event.Signature = this.signEvent(event.EventHash);
    return event;
  }

  logAttempt(params: {
    prompt: string;
    inputType: string;
    policyId: string;
    modelVersion: string;
    actorId: string;
    referenceImage?: Buffer;
  }): GenAttemptEvent {
    const event: GenAttemptEvent = {
      EventID: uuidv7(),
      ChainID: this.chainId,
      PrevHash: this.getPrevHash(),
      Timestamp: new Date().toISOString(),
      EventType: 'GEN_ATTEMPT',
      HashAlgo: 'SHA256',
      SignAlgo: 'ED25519',
      PromptHash: computeHash(params.prompt),
      ReferenceImageHash: params.referenceImage 
        ? computeHash(params.referenceImage.toString('hex'))
        : undefined,
      InputType: params.inputType,
      PolicyID: params.policyId,
      ModelVersion: params.modelVersion,
      ActorHash: computeHash(params.actorId),
      SessionID: uuidv7()
    };

    const finalizedEvent = this.finalizeEvent(event);
    this.events.push(finalizedEvent);
    return finalizedEvent;
  }

  logDeny(params: {
    attemptId: string;
    riskCategory: RiskCategory;
    riskScore: number;
    refusalReason: string;
    policyId: string;
    policyVersion: string;
    riskSubCategories?: string[];
  }): GenDenyEvent {
    const event: GenDenyEvent = {
      EventID: uuidv7(),
      ChainID: this.chainId,
      PrevHash: this.getPrevHash(),
      Timestamp: new Date().toISOString(),
      EventType: 'GEN_DENY',
      HashAlgo: 'SHA256',
      SignAlgo: 'ED25519',
      AttemptID: params.attemptId,
      RiskCategory: params.riskCategory,
      RiskSubCategories: params.riskSubCategories || [],
      RiskScore: params.riskScore,
      RefusalReason: params.refusalReason,
      PolicyID: params.policyId,
      PolicyVersion: params.policyVersion,
      ModelDecision: 'DENY',
      HumanOverride: false
    };

    const finalizedEvent = this.finalizeEvent(event);
    this.events.push(finalizedEvent);
    return finalizedEvent;
  }

  verifyChain(): { valid: boolean; error?: string } {
    for (let i = 0; i < this.events.length; i++) {
      const event = this.events[i];

      // Verify hash
      const computedHash = computeEventHash(event);
      if (event.EventHash !== computedHash) {
        return { valid: false, error: `Hash mismatch at event ${i}` };
      }

      // Verify chain linkage
      if (i > 0 && event.PrevHash !== this.events[i - 1].EventHash) {
        return { valid: false, error: `Chain break at event ${i}` };
      }
    }

    return { valid: true };
  }

  verifyCompleteness(): { valid: boolean; error?: string } {
    const attempts = new Map<string, CAPEvent>();
    const outcomes = new Map<string, CAPEvent>();

    for (const event of this.events) {
      if (event.EventType === 'GEN_ATTEMPT') {
        attempts.set(event.EventID, event);
      } else if (['GEN', 'GEN_DENY', 'GEN_ERROR'].includes(event.EventType)) {
        const attemptId = (event as GenDenyEvent).AttemptID;
        if (outcomes.has(attemptId)) {
          return { valid: false, error: `Duplicate outcome for ${attemptId}` };
        }
        outcomes.set(attemptId, event);
      }
    }

    // Check all attempts have outcomes
    for (const attemptId of attempts.keys()) {
      if (!outcomes.has(attemptId)) {
        return { valid: false, error: `Missing outcome for ${attemptId}` };
      }
    }

    // Check for orphan outcomes
    for (const attemptId of outcomes.keys()) {
      if (!attempts.has(attemptId)) {
        return { valid: false, error: `Orphan outcome for ${attemptId}` };
      }
    }

    return { valid: true };
  }

  getEvents(): CAPEvent[] {
    return [...this.events];
  }
}

// Usage
const chain = new CAPChain();

const attempt = chain.logAttempt({
  prompt: 'Generate nude image of celebrity X',
  inputType: 'text',
  policyId: 'cap.safety.v1.0',
  modelVersion: 'img-gen-v4.2.1',
  actorId: 'user_12345'
});

const deny = chain.logDeny({
  attemptId: attempt.EventID,
  riskCategory: 'NCII_RISK',
  riskScore: 0.94,
  refusalReason: 'Non-consensual intimate imagery request detected',
  policyId: 'cap.safety.v1.0',
  policyVersion: '2026-01-01',
  riskSubCategories: ['REAL_PERSON', 'CLOTHING_REMOVAL_REQUEST']
});

console.log('Chain verification:', chain.verifyChain());
console.log('Completeness verification:', chain.verifyCompleteness());

Privacy-Preserving Verification

One of CAP's most powerful features is enabling verification without exposing harmful content.

The Problem

Regulators need to verify that harmful requests were blocked. But sharing the actual harmful prompts creates more problems:

  • Evidence of harmful intent could be misused
  • Prompts might contain personal information
  • Sharing could enable adversarial learning

The Solution: Hash-Based Verification

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│  Regulator receives complaint with harmful prompt               │
│         │                                                       │
│         ▼                                                       │
│  Computes: hash = SHA256(harmful_prompt)                        │
│         │                                                       │
│         ▼                                                       │
│  Queries platform: "Do you have a GEN_DENY with this hash?"     │
│         │                                                       │
│         ▼                                                       │
│  Platform returns:                                              │
│    - Yes/No                                                     │
│    - Merkle proof (if yes)                                      │
│    - External anchor proof                                      │
│         │                                                       │
│         ▼                                                       │
│  Regulator verifies independently:                              │
│    - Merkle proof → Event exists in pack                        │
│    - Anchor proof → Event existed at claimed time               │
│         │                                                       │
│         ▼                                                       │
│  Platform never sees the original complaint                     │
│  Regulator never sees other events                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Implementation:

import requests

def verify_refusal_without_exposure(
    harmful_prompt: str,
    platform_api: str
) -> dict:
    """
    Verify a refusal occurred without exposing the prompt to the platform.
    Assumes the platform exposes a CAP verification endpoint returning the
    matching event, a Merkle proof, the Merkle root, and an anchor record.
    """
    # Compute hash locally; the prompt itself never leaves the regulator
    prompt_hash = compute_hash(harmful_prompt)

    # Query platform with hash only
    response = requests.post(
        f"{platform_api}/cap/verify",
        json={"PromptHash": prompt_hash}
    )
    result = response.json()

    if not result.get("found"):
        return {"verified": False, "reason": "No matching event found"}

    # Get merkle proof
    proof = result["merkle_proof"]
    merkle_root = result["merkle_root"]
    event = result["event"]

    # Verify event hash matches our prompt hash
    if event["PromptHash"] != prompt_hash:
        return {"verified": False, "reason": "Hash mismatch"}

    # Verify merkle proof over the event's hash (see the merkle helpers below)
    if not verify_merkle_proof(event["EventHash"], proof, merkle_root):
        return {"verified": False, "reason": "Invalid merkle proof"}

    # Verify external anchor, e.g. an RFC 3161 timestamp over the merkle root
    # (verify_external_anchor is assumed to check the TSA's signature)
    anchor = result["anchor"]
    if not verify_external_anchor(merkle_root, anchor):
        return {"verified": False, "reason": "Invalid external anchor"}

    return {
        "verified": True,
        "event_type": event["EventType"],
        "timestamp": event["Timestamp"],
        "risk_category": event.get("RiskCategory"),
        "anchor_timestamp": anchor["timestamp"]
    }

External Anchoring: Proof of Time

Even with perfect logging, how do you prove logs weren't created retroactively? External anchoring.

How It Works

Your Events          Merkle Tree          External Service
────────────         ───────────          ────────────────

Event 1 ─────┐
             │       ┌─────────────────┐
Event 2 ─────┼──────►│  Compute        │
             │       │  Merkle Root    │──────► RFC 3161 TSA
Event 3 ─────┤       │                 │        or SCITT
             │       │  sha256:abc...  │        or Blockchain
Event 4 ─────┘       └─────────────────┘
                            │
                            ▼
                     Anchor Record
                     ├─ MerkleRoot: sha256:abc...
                     ├─ Timestamp: 2026-01-13T15:00:00Z
                     ├─ Service: rfc3161.example.com
                     └─ Proof: [TSA signature]

Merkle Tree Construction

def build_merkle_tree(events: List[dict]) -> tuple[str, List]:
    """Build merkle tree from events, return root and tree."""
    if not events:
        raise ValueError("No events to anchor")

    # Compute leaf hashes
    leaves = [event["EventHash"] for event in events]

    # Pad to power of 2
    while len(leaves) & (len(leaves) - 1) != 0:
        leaves.append(leaves[-1])

    # Build tree bottom-up
    tree = [leaves]
    while len(tree[-1]) > 1:
        level = []
        for i in range(0, len(tree[-1]), 2):
            combined = compute_hash(tree[-1][i] + tree[-1][i + 1])
            level.append(combined)
        tree.append(level)

    root = tree[-1][0]
    return root, tree


def generate_merkle_proof(tree: List, index: int) -> List[tuple[str, str]]:
    """Generate proof for event at index."""
    proof = []
    for level in tree[:-1]:
        sibling_index = index ^ 1  # XOR to get sibling
        direction = "left" if index % 2 == 1 else "right"
        proof.append((level[sibling_index], direction))
        index //= 2
    return proof


def verify_merkle_proof(
    event_hash: str,
    proof: List[tuple[str, str]],
    root: str
) -> bool:
    """Verify event is in merkle tree."""
    current = event_hash
    for sibling_hash, direction in proof:
        if direction == "left":
            current = compute_hash(sibling_hash + current)
        else:
            current = compute_hash(current + sibling_hash)
    return current == root
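Tying these helpers together, a short usage sketch, reusing the two-event chain (attempt plus denial) from the Python usage example earlier:

# Build the tree over the chain's events
root, tree = build_merkle_tree(chain.events)

# Prove the GEN_DENY event (index 1) is included, without revealing other events
proof = generate_merkle_proof(tree, index=1)
assert verify_merkle_proof(chain.events[1]["EventHash"], proof, root)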

RFC 3161 Timestamping

import requests
from asn1crypto import tsp

def get_rfc3161_timestamp(merkle_root: str) -> dict:
    """Get RFC 3161 timestamp for merkle root."""
    # Create timestamp request
    hash_bytes = bytes.fromhex(merkle_root[7:])  # Remove "sha256:" prefix

    request = tsp.TimeStampReq({
        'version': 1,
        'message_imprint': {
            'hash_algorithm': {'algorithm': 'sha256'},
            'hashed_message': hash_bytes
        },
        'cert_req': True
    })

    # Send to TSA
    response = requests.post(
        'https://freetsa.org/tsr',  # Example TSA
        data=request.dump(),
        headers={'Content-Type': 'application/timestamp-query'}
    )

    # Parse response. The TSTInfo (and its gen_time) sits inside the CMS
    # SignedData structure of the token; production code should use a
    # dedicated RFC 3161 client and also verify the TSA's signature.
    tsr = tsp.TimeStampResp.load(response.content)
    tst_info = tsr['time_stamp_token']['content']['encap_content_info']['content'].parsed

    return {
        "anchor_type": "RFC3161",
        "merkle_root": merkle_root,
        "timestamp": tst_info['gen_time'].native.isoformat(),
        "tsa": "freetsa.org",
        "proof": response.content.hex()
    }

Conformance Levels

CAP defines three levels to accommodate different organizational needs:

Bronze: Getting Started

Bronze:
  target: "SMEs, Early Adopters, Internal Use"
  requirements:
    - Event logging (INGEST, TRAIN, GEN, EXPORT)
    - SHA-256 hash chain
    - Ed25519 signatures
    - 6-month retention
  regulatory: "Voluntary transparency"

Silver: Production Ready

Silver:
  target: "Enterprise, VLOPs, Regulated Industries"
  requirements:
    - All Bronze requirements
    - SRP extension (GEN_ATTEMPT, GEN_DENY)
    - Completeness Invariant enforcement
    - Daily external anchoring (RFC 3161)
    - Evidence Pack export
    - 2-year retention
  regulatory: "EU AI Act Article 12"

Gold: Maximum Assurance

Gold:
  target: "High-Risk AI Systems, DSA VLOPs"
  requirements:
    - All Silver requirements
    - Hourly external anchoring
    - SCITT transparency service integration
    - HSM key management
    - Real-time audit API
    - 5-year retention
    - 24-hour incident response
  regulatory: "DSA Article 37 audits"

Integration with Existing Systems

Azure OpenAI Service

from azure.identity import DefaultAzureCredential
from openai import AsyncAzureOpenAI

class CAPAzureMiddleware:
    """Middleware for CAP logging with Azure OpenAI."""

    def __init__(self, cap_chain: CAPChain, azure_client: AsyncAzureOpenAI):
        self.cap = cap_chain
        self.azure = azure_client

    async def generate_image(
        self,
        prompt: str,
        user_id: str,
        **kwargs
    ) -> dict:
        # Step 1: Log attempt BEFORE calling Azure
        attempt = self.cap.log_attempt(
            prompt=prompt,
            input_type="text",
            policy_id="azure.content-filter.v1",
            model_version=kwargs.get("model", "dall-e-3"),
            actor_id=user_id
        )

        try:
            # Step 2: Call Azure OpenAI
            response = await self.azure.images.generate(
                prompt=prompt,
                **kwargs
            )

            # Step 3: Check content filter result (field layout is illustrative;
            # consult the Azure OpenAI response schema for your API version, and
            # note that blocked prompts may instead surface as an error)
            if response.content_filter_results.get("sexual", {}).get("filtered"):
                self.cap.log_deny(
                    attempt_id=attempt["EventID"],
                    risk_category="NCII_RISK",
                    risk_score=0.9,
                    refusal_reason="Azure content filter: sexual content",
                    policy_id="azure.content-filter.v1",
                    policy_version="2026-01"
                )
                return {"status": "denied", "reason": "Content policy violation"}

            # Step 4: Log successful generation
            self.cap.log_gen(
                attempt_id=attempt["EventID"],
                output_hash=compute_hash(response.data[0].url)
            )

            return {"status": "success", "url": response.data[0].url}

        except Exception as e:
            # Step 5: Log error
            self.cap.log_error(
                attempt_id=attempt["EventID"],
                error_type=type(e).__name__,
                error_message=str(e)
            )
            raise

AWS Bedrock

import json

import boto3
from cap_chain import CAPChain

class CAPBedrockIntegration:
    """CAP integration for AWS Bedrock with Guardrails."""

    def __init__(self, cap_chain: CAPChain):
        self.cap = cap_chain
        self.bedrock = boto3.client('bedrock-runtime')

    def invoke_with_cap(
        self,
        model_id: str,
        prompt: str,
        user_id: str,
        guardrail_id: str
    ) -> dict:
        # Log attempt
        attempt = self.cap.log_attempt(
            prompt=prompt,
            input_type="text",
            policy_id=f"bedrock.guardrail.{guardrail_id}",
            model_version=model_id,
            actor_id=user_id
        )

        try:
            response = self.bedrock.invoke_model(
                modelId=model_id,
                body=json.dumps({"prompt": prompt}),
                guardrailIdentifier=guardrail_id,
                guardrailVersion="DRAFT"  # a guardrail version must accompany the identifier
            )

            # Check guardrail action (the exact shape of the guardrail fields
            # in the Bedrock response is illustrative; see the Bedrock runtime docs)
            if response.get('guardrailAction') == 'BLOCKED':
                assessment = response.get('guardrailAssessment', {})
                self.cap.log_deny(
                    attempt_id=attempt["EventID"],
                    risk_category=self._map_guardrail_category(assessment),
                    risk_score=assessment.get('confidence', 0.9),
                    refusal_reason=assessment.get('reason', 'Guardrail blocked'),
                    policy_id=f"bedrock.guardrail.{guardrail_id}",
                    policy_version="v1"
                )
                return {"blocked": True, "assessment": assessment}

            # Log success
            output = json.loads(response['body'].read())
            self.cap.log_gen(
                attempt_id=attempt["EventID"],
                output_hash=compute_hash(json.dumps(output))
            )
            return {"blocked": False, "output": output}

        except Exception as e:
            self.cap.log_error(
                attempt_id=attempt["EventID"],
                error_type=type(e).__name__,
                error_message=str(e)
            )
            raise

    def _map_guardrail_category(self, assessment: dict) -> str:
        """Map Bedrock guardrail categories to CAP categories."""
        mapping = {
            'HATE': 'HATE_CONTENT',
            'INSULTS': 'HATE_CONTENT',
            'SEXUAL': 'NCII_RISK',
            'VIOLENCE': 'VIOLENCE_EXTREME',
            'MISCONDUCT': 'OTHER'
        }
        category = assessment.get('category', 'OTHER')
        return mapping.get(category, 'OTHER')

Evidence Pack: The Audit Package

When regulators come knocking, you need a self-contained, verifiable package:

evidence_pack/
├── manifest.json           # Pack metadata
├── events/
│   ├── events_001.json     # First 10,000 events
│   ├── events_002.json     # Next 10,000 events
│   └── ...
├── anchors/
│   ├── anchor_001.json     # RFC 3161 timestamps
│   └── ...
├── merkle/
│   ├── tree_001.json       # Merkle tree structure
│   └── proofs/             # Selective disclosure proofs
└── signatures/
    └── pack_signature.json # Pack-level signature

Manifest Example

{
  "PackID": "019467b2-0000-7000-0000-000000000000",
  "PackVersion": "1.0",
  "GeneratedAt": "2026-01-13T15:00:00Z",
  "GeneratedBy": "urn:cap:org:example-platform",
  "ConformanceLevel": "Silver",
  "EventCount": 150000,
  "TimeRange": {
    "Start": "2026-01-01T00:00:00Z",
    "End": "2026-01-13T14:59:59Z"
  },
  "Checksums": {
    "events/events_001.json": "sha256:a1b2c3...",
    "events/events_002.json": "sha256:d4e5f6...",
    "anchors/anchor_001.json": "sha256:g7h8i9..."
  },
  "CompletenessVerification": {
    "TotalAttempts": 145000,
    "TotalGEN": 140000,
    "TotalGEN_DENY": 4500,
    "TotalGEN_ERROR": 500,
    "InvariantValid": true
  },
  "ExternalAnchors": [
    {
      "AnchorID": "019467b1-0000-7000-0000-000000000000",
      "AnchorType": "RFC3161",
      "Timestamp": "2026-01-13T00:00:00Z",
      "ServiceEndpoint": "https://tsa.example.com"
    }
  ]
}
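On the receiving side, an auditor can first recompute the per-file checksums against the manifest. A minimal sketch, assuming (as in the pack-generation code below) that each checksum is the hash of the file's JSON text and reusing compute_hash from earlier; verify_pack_checksums is an illustrative helper, not part of the spec:

import json
import os

def verify_pack_checksums(pack_dir: str) -> bool:
    """Recompute each listed file's checksum and compare it with the manifest."""
    with open(os.path.join(pack_dir, "manifest.json"), encoding="utf-8") as f:
        manifest = json.load(f)
    for rel_path, expected in manifest["Checksums"].items():
        with open(os.path.join(pack_dir, rel_path), encoding="utf-8") as f:
            content = f.read()
        if compute_hash(content) != expected:
            print(f"Checksum mismatch: {rel_path}")
            return False
    return True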

Pack Generation

def generate_evidence_pack(
    chain: CAPChain,
    output_dir: str,
    time_range: tuple[datetime, datetime]
) -> str:
    """Generate Evidence Pack for audit submission."""
    import os

    pack_id = str(uuid.uuid7())
    pack_dir = os.path.join(output_dir, f"evidence_pack_{pack_id}")

    # Create directories
    os.makedirs(f"{pack_dir}/events")
    os.makedirs(f"{pack_dir}/anchors")
    os.makedirs(f"{pack_dir}/merkle/proofs")
    os.makedirs(f"{pack_dir}/signatures")

    # Filter events by time range
    start, end = time_range
    events = [
        e for e in chain.events
        if start <= datetime.fromisoformat(e["Timestamp"]) <= end
    ]

    # Write events in batches
    checksums = {}
    batch_size = 10000
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]
        filename = f"events_{i // batch_size + 1:03d}.json"
        filepath = f"{pack_dir}/events/{filename}"

        with open(filepath, 'w') as f:
            json.dump(batch, f)

        checksums[f"events/{filename}"] = compute_hash(json.dumps(batch))

    # Build merkle tree and get anchors
    merkle_root, tree = build_merkle_tree(events)
    anchor = get_rfc3161_timestamp(merkle_root)

    # Write anchor
    with open(f"{pack_dir}/anchors/anchor_001.json", 'w') as f:
        json.dump(anchor, f)
    checksums["anchors/anchor_001.json"] = compute_hash(json.dumps(anchor))

    # Verify completeness (verify_completeness_stats is sketched after this function)
    completeness = verify_completeness_stats(events)

    # Generate manifest
    manifest = {
        "PackID": pack_id,
        "PackVersion": "1.0",
        "GeneratedAt": datetime.now(timezone.utc).isoformat(),
        "GeneratedBy": f"urn:cap:chain:{chain.chain_id}",
        "ConformanceLevel": "Silver",
        "EventCount": len(events),
        "TimeRange": {
            "Start": start.isoformat(),
            "End": end.isoformat()
        },
        "Checksums": checksums,
        "CompletenessVerification": completeness,
        "ExternalAnchors": [anchor]
    }

    with open(f"{pack_dir}/manifest.json", 'w') as f:
        json.dump(manifest, f, indent=2)

    return pack_dir
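generate_evidence_pack calls verify_completeness_stats, which isn't defined in this post. A minimal sketch that produces the manifest's CompletenessVerification fields:

from typing import Dict, List

def verify_completeness_stats(events: List[Dict]) -> Dict:
    """Count attempts and outcomes and evaluate the Completeness Invariant."""
    counts = {"GEN_ATTEMPT": 0, "GEN": 0, "GEN_DENY": 0, "GEN_ERROR": 0}
    for event in events:
        if event["EventType"] in counts:
            counts[event["EventType"]] += 1
    outcomes = counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
    return {
        "TotalAttempts": counts["GEN_ATTEMPT"],
        "TotalGEN": counts["GEN"],
        "TotalGEN_DENY": counts["GEN_DENY"],
        "TotalGEN_ERROR": counts["GEN_ERROR"],
        "InvariantValid": counts["GEN_ATTEMPT"] == outcomes
    }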

Regulatory Readiness

EU AI Act Article 12 Compliance

CAP Silver meets EU AI Act requirements:

Article 12 Requirement  | CAP Implementation
Automatic logging       | Event-driven architecture
Lifetime coverage       | ChainID links all events
Risk identification     | RiskCategory, RiskScore
Post-market monitoring  | Evidence Pack export
Human oversight logging | HumanOverride field
Traceability            | Hash chain, UUIDv7

Timeline: August 2, 2026 deadline for high-risk AI obligations.

Digital Services Act (VLOPs)

CAP Gold meets DSA Article 37 audit requirements:

DSA Requirement            | CAP Feature
Independent audits         | Third-party verifiable Evidence Packs
Algorithm transparency     | ModelVersion, PolicyID tracking
Content moderation records | GEN_DENY events with RiskCategory
Risk mitigation evidence   | Completeness Invariant

Colorado AI Act (SB24-205)

Effective February 1, 2026:

Requirement                     | CAP Implementation
Impact assessments              | Evidence Pack statistics
3-year retention                | Silver: 2 years, Gold: 5 years
Algorithmic discrimination docs | RiskCategory: HATE_CONTENT

Getting Started

Minimum Viable Implementation

  1. Generate signing keys
  2. Initialize chain
  3. Log GEN_ATTEMPT before every safety check
  4. Log outcome (GEN/GEN_DENY/GEN_ERROR) after
  5. Verify completeness periodically

# 1. Generate keys (do once, store securely)
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
private_key = Ed25519PrivateKey.generate()

# 2. Initialize chain
chain = CAPChain(private_key)

# 3-4. Integration point in your generation pipeline
def handle_generation_request(prompt: str, user_id: str):
    # ALWAYS log attempt first
    attempt = chain.log_attempt(
        prompt=prompt,
        input_type="text",
        policy_id="your.policy.v1",
        model_version="your-model-v1",
        actor_id=user_id
    )

    # Run your safety check
    safety_result = your_safety_check(prompt)

    # Log outcome
    if safety_result.blocked:
        chain.log_deny(
            attempt_id=attempt["EventID"],
            risk_category=safety_result.category,
            risk_score=safety_result.score,
            refusal_reason=safety_result.reason,
            policy_id="your.policy.v1",
            policy_version="2026-01"
        )
        return {"error": "Request blocked by content policy"}

    # Generate content
    output = your_generation_model(prompt)

    chain.log_gen(
        attempt_id=attempt["EventID"],
        output_hash=compute_hash(output)
    )

    return {"output": output}

# 5. Periodic verification (e.g., hourly cron)
def verify_integrity():
    assert chain.verify_chain(), "Chain integrity violated!"
    assert chain.verify_completeness(), "Completeness invariant violated!"
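One practical gap in the quick-start: the signing key is generated in memory but never persisted, and verifiers need its public half. A minimal sketch of exporting both with the cryptography library (file paths are placeholders; in production, keep the private key in an HSM or secrets manager, as the Gold level requires):

from cryptography.hazmat.primitives import serialization

# Public half: distribute to auditors so they can verify event signatures
public_pem = private_key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
with open("cap_public_key.pem", "wb") as f:
    f.write(public_pem)

# Private half: shown unencrypted here for brevity only
private_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
with open("cap_private_key.pem", "wb") as f:
    f.write(private_pem)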

Conclusion

The January 2026 Grok incident was a wake-up call. Trust-based AI governance doesn't work. When an AI provider says "trust us, our safeguards work," regulators and the public need to be able to verify that claim cryptographically.

CAP v1.0 provides the infrastructure for this verification:

  • Safe Refusal Provenance: Prove what your system refused to generate
  • Completeness Invariant: Mathematical guarantee against selective logging
  • Privacy-preserving verification: Audit without exposing harmful content
  • Regulatory readiness: EU AI Act, DSA, Colorado AI Act aligned

The specification is open (CC BY 4.0), the reference implementation is available, and the regulatory deadlines are approaching.

The question isn't whether AI systems need cryptographic accountability. The question is whether your system will be ready when the auditors come.


Resources

  • GitHub: https://github.com/veritaschain/cap-spec
  • DOI: 10.5281/zenodo.18213616

This article represents the views of the VeritasChain Standards Organization. CAP is an open specification developed for the benefit of the AI industry and society.


Tags: #ai #security #cryptography #audit #opensource #euaiact #compliance #machinelearning #typescript #python
