TL;DR
On January 7, 2026, the Grok incident demonstrated that an AI provider claiming "our safeguards work" isn't enough. We need cryptographic proof. CAP (Content / Creative AI Profile) v1.0 provides exactly that: a specification for creating tamper-evident audit trails that can prove an AI system refused to generate harmful content.
Key features:
- Safe Refusal Provenance (SRP): Cryptographic proof of non-generation
- Completeness Invariant: Mathematical guarantee that all requests have outcomes
- Privacy-preserving verification: Prove refusals without exposing harmful content
- Regulatory alignment: EU AI Act, DSA, Colorado AI Act ready
GitHub: https://github.com/veritaschain/cap-spec
DOI: 10.5281/zenodo.18213616
The Problem: Trust-Based AI Governance Has Failed
The Grok Incident
In early January 2026, researchers discovered that xAI's Grok image generation model could be manipulated to generate non-consensual intimate imagery (NCII). Within hours:
- Thousands of harmful images were generated
- Multiple regulatory jurisdictions launched investigations
- xAI claimed "our safeguards were working"
The problem? No one could verify this claim.
xAI, like every AI provider, could only say "trust us—we blocked the bad requests." But there was no cryptographic proof of what was blocked, no independent verification possible, no way to audit their claims.
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Current State of AI Content Moderation │
│ │
│ Provider: "We blocked millions of harmful requests!" │
│ │
│ Regulator: "Prove it." │
│ │
│ Provider: "...trust us?" │
│ │
│ Regulator: "That's not how compliance works." │
│ │
└─────────────────────────────────────────────────────────────────┘
The Negative Proof Problem
Here's the fundamental challenge: AI systems can prove what they generated, but cannot prove what they refused to generate.
Every AI platform logs successful generations. It's easy—you have the output, the timestamp, the user. But when a request is blocked, what do you have? A log entry that says "blocked"?
That log entry:
- Could be fabricated after the fact
- Could be selectively created for audits
- Cannot be independently verified
- Provides no proof the request actually existed
This asymmetry creates an accountability vacuum. Providers can claim any refusal rate they want. No one can verify it.
The Regulatory Gap
Regulators are waking up to this problem:
| Regulation | Effective | The Problem |
|---|---|---|
| EU AI Act Article 12 | Aug 2026 | Requires "automatic logging" but no standard format |
| DSA Article 37 | In force | VLOPs need independent audits, but audit what? |
| Colorado AI Act | Feb 2026 | Impact assessments required, no verification mechanism |
| TAKE IT DOWN Act | May 2026 | Platforms must remove NCII and prove that they do |
Every regulation demands transparency. None specifies how to achieve it. CAP fills this gap.
Introducing CAP: A Specification, Not a Product
Let me be clear about what CAP is and isn't:
CAP IS:
- An open specification (CC BY 4.0)
- A data format for AI audit trails
- A mechanism for proving system decisions
- Compatible with existing infrastructure (C2PA, SCITT)
CAP IS NOT:
- A content filter or moderation system
- A real-time blocking mechanism
- A proprietary product
- A replacement for C2PA (it complements it)
Think of it this way:
C2PA answers: "Is this content authentic?"
CAP answers: "What did the AI system decide to do?"
C2PA is a content passport. CAP is a system flight recorder.
The Core Innovation: Safe Refusal Provenance (SRP)
Making Non-Generation a First-Class Event
SRP's key insight is treating refusals as first-class, cryptographically provable events. Not just log entries—events with the same integrity guarantees as successful generations.
Traditional Logging:
User Request → [Safety Check] → Success: Log "generated"
→ Failure: Log "blocked" (maybe)
SRP Approach:
User Request → Log GEN_ATTEMPT (MUST)
→ [Safety Check]
→ Success: Log GEN (linked to attempt)
→ Failure: Log GEN_DENY (linked to attempt)
→ Error: Log GEN_ERROR (linked to attempt)
The critical difference: GEN_ATTEMPT is logged BEFORE the safety check runs. This creates an unforgeable record that a request existed, regardless of outcome.
The Completeness Invariant
This is the mathematical heart of SRP:
∑ GEN_ATTEMPT = ∑ GEN + ∑ GEN_DENY + ∑ GEN_ERROR
For any time window, the count of attempts MUST equal the count of all outcomes. If it doesn't, something is wrong:
| Violation | Meaning | Severity |
|---|---|---|
| Attempts > Outcomes | Missing outcome events (hiding results) | Critical |
| Outcomes > Attempts | Fabricated outcomes (fake refusals) | Critical |
| Duplicate Outcomes | Data integrity failure | Critical |
This invariant makes selective logging detectable. You can't claim you blocked a request without first having recorded that request, and you can't hide a successful generation without breaking the invariant.
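To make the check concrete, here's a minimal sketch (using bare illustrative event dicts, not the full event format defined below) that verifies the invariant for a window of events by simple counting:
from collections import Counter

def check_completeness_counts(events: list[dict]) -> bool:
    """Check sum(GEN_ATTEMPT) == sum(GEN) + sum(GEN_DENY) + sum(GEN_ERROR) for a window."""
    counts = Counter(e["EventType"] for e in events)
    outcomes = counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
    return counts["GEN_ATTEMPT"] == outcomes

# One attempt with one denial satisfies the invariant (1 == 0 + 1 + 0)
assert check_completeness_counts([{"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_DENY"}])
Counting alone only catches aggregate mismatches; the per-attempt linkage via AttemptID in the reference implementation below also catches duplicated and orphaned outcomes.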
Implementation Deep Dive
Let's get into the technical details.
Event Structure
Every CAP event follows this structure:
{
"EventID": "019467a1-0001-7000-0000-000000000001",
"ChainID": "019467a0-0000-7000-0000-000000000000",
"PrevHash": "sha256:a1b2c3d4e5f6789...",
"Timestamp": "2026-01-13T14:23:45.100Z",
"EventType": "GEN_ATTEMPT",
"HashAlgo": "SHA256",
"SignAlgo": "ED25519",
"PromptHash": "sha256:7f83b1657ff1fc53b92dc18148a1d65d...",
"InputType": "text+image",
"PolicyID": "cap.safety.v1.0",
"ModelVersion": "img-gen-v4.2.1",
"ActorHash": "sha256:e3b0c44298fc1c149afbf4c8996fb924...",
"EventHash": "sha256:computed_hash_of_this_event...",
"Signature": "ed25519:MEUCIQDhE3H4..."
}
Key design decisions:
- UUIDv7: Event IDs are time-ordered UUIDs, providing natural chronological ordering
- Hash Chain: Each event includes PrevHash, creating an append-only chain
- Content Hashing: PromptHash and ActorHash enable verification without exposing content
- Ed25519 Signatures: Every event is cryptographically signed
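Because each event is individually hashed and signed, an auditor holding the platform's published Ed25519 public key can check a single event in isolation. Here's a minimal verification sketch; it assumes the key was distributed out of band and mirrors the signing scheme used in the reference implementation below (the hash covers every field except EventHash and Signature):
import base64
import hashlib
import json
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey

def verify_event_signature(event: dict, public_key: Ed25519PublicKey) -> bool:
    """Recompute the event hash and check the Ed25519 signature over it."""
    body = {k: v for k, v in event.items() if k not in ("EventHash", "Signature")}
    canonical = json.dumps(body, sort_keys=True, separators=(',', ':'))  # simplified JCS
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    if event["EventHash"] != f"sha256:{digest}":
        return False  # event body doesn't match its recorded hash
    signature = base64.b64decode(event["Signature"].split(":", 1)[1])
    try:
        public_key.verify(signature, bytes.fromhex(digest))  # raises on failure
        return True
    except InvalidSignature:
        return False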
Python Implementation
Here's a complete implementation of the core logging system:
import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import Optional, List, Dict
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
import base64
class CAPEvent:
"""Base class for CAP events."""
def __init__(
self,
chain_id: str,
event_type: str,
prev_hash: Optional[str] = None
):
        # uuid.uuid7() requires Python 3.14+; on older versions the third-party "uuid6" package provides uuid7()
        self.event_id = str(uuid.uuid7())
self.chain_id = chain_id
self.prev_hash = prev_hash
self.timestamp = datetime.now(timezone.utc).isoformat()
self.event_type = event_type
self.hash_algo = "SHA256"
self.sign_algo = "ED25519"
self.event_hash: Optional[str] = None
self.signature: Optional[str] = None
def to_dict(self) -> Dict:
"""Convert to dictionary for serialization."""
return {
"EventID": self.event_id,
"ChainID": self.chain_id,
"PrevHash": self.prev_hash,
"Timestamp": self.timestamp,
"EventType": self.event_type,
"HashAlgo": self.hash_algo,
"SignAlgo": self.sign_algo,
"EventHash": self.event_hash,
"Signature": self.signature
}
def compute_hash(data: str) -> str:
"""Compute SHA-256 hash with prefix."""
hash_bytes = hashlib.sha256(data.encode('utf-8')).digest()
return f"sha256:{hash_bytes.hex()}"
def canonicalize(obj: Dict) -> str:
"""
RFC 8785 JSON Canonicalization Scheme (simplified).
In production, use a proper JCS library.
"""
return json.dumps(obj, sort_keys=True, separators=(',', ':'))
def compute_event_hash(event: Dict) -> str:
    """Compute hash of event, excluding EventHash and Signature.
    Excluding EventHash keeps finalization and later verification consistent:
    the hash is always computed over the same set of fields.
    """
    event_copy = {k: v for k, v in event.items() if k not in ("EventHash", "Signature")}
    canonical = canonicalize(event_copy)
    return compute_hash(canonical)
def sign_event(event: Dict, private_key: Ed25519PrivateKey) -> str:
"""Sign event with Ed25519."""
hash_hex = event["EventHash"][7:] # Remove "sha256:" prefix
hash_bytes = bytes.fromhex(hash_hex)
signature = private_key.sign(hash_bytes)
return f"ed25519:{base64.b64encode(signature).decode()}"
class GenAttemptEvent(CAPEvent):
"""GEN_ATTEMPT event - logged when request is received."""
def __init__(
self,
chain_id: str,
prev_hash: Optional[str],
prompt: str,
input_type: str,
policy_id: str,
model_version: str,
actor_id: str,
reference_image: Optional[bytes] = None
):
super().__init__(chain_id, "GEN_ATTEMPT", prev_hash)
self.prompt_hash = compute_hash(prompt)
self.reference_image_hash = (
compute_hash(reference_image.hex()) if reference_image else None
)
self.input_type = input_type
self.policy_id = policy_id
self.model_version = model_version
self.actor_hash = compute_hash(actor_id)
self.session_id = str(uuid.uuid4())
def to_dict(self) -> Dict:
base = super().to_dict()
base.update({
"PromptHash": self.prompt_hash,
"ReferenceImageHash": self.reference_image_hash,
"InputType": self.input_type,
"PolicyID": self.policy_id,
"ModelVersion": self.model_version,
"ActorHash": self.actor_hash,
"SessionID": self.session_id
})
return base
class GenDenyEvent(CAPEvent):
"""GEN_DENY event - logged when request is refused."""
def __init__(
self,
chain_id: str,
prev_hash: str,
attempt_id: str,
risk_category: str,
risk_score: float,
refusal_reason: str,
policy_id: str,
policy_version: str,
risk_sub_categories: Optional[List[str]] = None,
human_override: bool = False
):
super().__init__(chain_id, "GEN_DENY", prev_hash)
self.attempt_id = attempt_id
self.risk_category = risk_category
self.risk_sub_categories = risk_sub_categories or []
self.risk_score = risk_score
self.refusal_reason = refusal_reason
self.policy_id = policy_id
self.policy_version = policy_version
self.model_decision = "DENY"
self.human_override = human_override
def to_dict(self) -> Dict:
base = super().to_dict()
base.update({
"AttemptID": self.attempt_id,
"RiskCategory": self.risk_category,
"RiskSubCategories": self.risk_sub_categories,
"RiskScore": self.risk_score,
"RefusalReason": self.refusal_reason,
"PolicyID": self.policy_id,
"PolicyVersion": self.policy_version,
"ModelDecision": self.model_decision,
"HumanOverride": self.human_override
})
return base
class CAPChain:
"""CAP event chain with integrity guarantees."""
def __init__(self, private_key: Ed25519PrivateKey):
self.chain_id = str(uuid.uuid7())
self.events: List[Dict] = []
self.private_key = private_key
def _get_prev_hash(self) -> Optional[str]:
"""Get hash of last event, or None for genesis."""
if not self.events:
return None
return self.events[-1]["EventHash"]
def _finalize_event(self, event: CAPEvent) -> Dict:
"""Compute hash and signature for event."""
event_dict = event.to_dict()
# Compute hash
event_dict["EventHash"] = compute_event_hash(event_dict)
# Sign
event_dict["Signature"] = sign_event(event_dict, self.private_key)
return event_dict
def log_attempt(
self,
prompt: str,
input_type: str,
policy_id: str,
model_version: str,
actor_id: str,
reference_image: Optional[bytes] = None
) -> Dict:
"""Log a generation attempt. MUST be called before safety check."""
event = GenAttemptEvent(
chain_id=self.chain_id,
prev_hash=self._get_prev_hash(),
prompt=prompt,
input_type=input_type,
policy_id=policy_id,
model_version=model_version,
actor_id=actor_id,
reference_image=reference_image
)
event_dict = self._finalize_event(event)
self.events.append(event_dict)
return event_dict
def log_deny(
self,
attempt_id: str,
risk_category: str,
risk_score: float,
refusal_reason: str,
policy_id: str,
policy_version: str,
risk_sub_categories: Optional[List[str]] = None
) -> Dict:
"""Log a refusal."""
event = GenDenyEvent(
chain_id=self.chain_id,
prev_hash=self._get_prev_hash(),
attempt_id=attempt_id,
risk_category=risk_category,
risk_score=risk_score,
refusal_reason=refusal_reason,
policy_id=policy_id,
policy_version=policy_version,
risk_sub_categories=risk_sub_categories
)
event_dict = self._finalize_event(event)
self.events.append(event_dict)
return event_dict
def verify_chain(self) -> bool:
"""Verify chain integrity."""
for i, event in enumerate(self.events):
# Verify hash
computed_hash = compute_event_hash(event)
if event["EventHash"] != computed_hash:
print(f"Hash mismatch at event {i}")
return False
# Verify chain linkage
if i > 0:
if event["PrevHash"] != self.events[i-1]["EventHash"]:
print(f"Chain break at event {i}")
return False
else:
if event["PrevHash"] is not None:
print("Genesis event has non-null PrevHash")
return False
return True
def verify_completeness(self) -> bool:
"""Verify completeness invariant."""
attempts = {}
outcomes = {}
for event in self.events:
if event["EventType"] == "GEN_ATTEMPT":
attempts[event["EventID"]] = event
elif event["EventType"] in ["GEN", "GEN_DENY", "GEN_ERROR"]:
attempt_id = event.get("AttemptID")
if attempt_id in outcomes:
print(f"Duplicate outcome for attempt {attempt_id}")
return False
outcomes[attempt_id] = event
# Check completeness
if set(attempts.keys()) != set(outcomes.keys()):
missing = set(attempts.keys()) - set(outcomes.keys())
orphans = set(outcomes.keys()) - set(attempts.keys())
if missing:
print(f"Missing outcomes for: {missing}")
if orphans:
print(f"Orphan outcomes: {orphans}")
return False
return True
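The reference class above only shows the attempt and denial paths. The integration examples later in this article also call log_gen and log_error for the success and error outcomes; here's a minimal sketch of those methods, written as a subclass for brevity (in practice they'd live on CAPChain itself, and the field names here are illustrative rather than normative):
class GenEvent(CAPEvent):
    """GEN event - logged when generation succeeds."""
    def __init__(self, chain_id: str, prev_hash: str, attempt_id: str, output_hash: str):
        super().__init__(chain_id, "GEN", prev_hash)
        self.attempt_id = attempt_id
        self.output_hash = output_hash

    def to_dict(self) -> Dict:
        base = super().to_dict()
        base.update({"AttemptID": self.attempt_id, "OutputHash": self.output_hash})
        return base

class GenErrorEvent(CAPEvent):
    """GEN_ERROR event - logged when generation fails for technical reasons."""
    def __init__(self, chain_id: str, prev_hash: str, attempt_id: str,
                 error_type: str, error_message: str):
        super().__init__(chain_id, "GEN_ERROR", prev_hash)
        self.attempt_id = attempt_id
        self.error_type = error_type
        self.error_message = error_message

    def to_dict(self) -> Dict:
        base = super().to_dict()
        base.update({"AttemptID": self.attempt_id, "ErrorType": self.error_type,
                     "ErrorMessage": self.error_message})
        return base

class CAPChainWithOutcomes(CAPChain):
    """CAPChain plus success/error outcome logging."""
    def log_gen(self, attempt_id: str, output_hash: str) -> Dict:
        event = GenEvent(self.chain_id, self._get_prev_hash(), attempt_id, output_hash)
        event_dict = self._finalize_event(event)
        self.events.append(event_dict)
        return event_dict

    def log_error(self, attempt_id: str, error_type: str, error_message: str) -> Dict:
        event = GenErrorEvent(self.chain_id, self._get_prev_hash(),
                              attempt_id, error_type, error_message)
        event_dict = self._finalize_event(event)
        self.events.append(event_dict)
        return event_dict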
Usage Example
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
# Initialize chain with signing key
private_key = Ed25519PrivateKey.generate()
chain = CAPChain(private_key)
# Scenario: User requests NCII generation
harmful_prompt = "Generate nude image of celebrity X"
user_id = "user_12345"
# Step 1: Log attempt BEFORE safety check
attempt = chain.log_attempt(
prompt=harmful_prompt,
input_type="text",
policy_id="cap.safety.v1.0",
model_version="img-gen-v4.2.1",
actor_id=user_id
)
print(f"Attempt logged: {attempt['EventID']}")
# Step 2: Run safety check (your existing system)
# safety_result = your_safety_system.check(harmful_prompt)
# Step 3: Log denial
deny = chain.log_deny(
attempt_id=attempt["EventID"],
risk_category="NCII_RISK",
risk_score=0.94,
refusal_reason="Non-consensual intimate imagery request detected",
policy_id="cap.safety.v1.0",
policy_version="2026-01-01",
risk_sub_categories=["REAL_PERSON", "CLOTHING_REMOVAL_REQUEST"]
)
print(f"Denial logged: {deny['EventID']}")
# Verify chain integrity
assert chain.verify_chain(), "Chain integrity check failed!"
assert chain.verify_completeness(), "Completeness invariant violated!"
print("✓ All verifications passed")
TypeScript Implementation
For Node.js environments:
import { createHash, sign, verify, generateKeyPairSync } from 'crypto';
import { v7 as uuidv7 } from 'uuid';
interface CAPEvent {
EventID: string;
ChainID: string;
PrevHash: string | null;
Timestamp: string;
EventType: string;
HashAlgo: string;
SignAlgo: string;
EventHash?: string;
Signature?: string;
[key: string]: unknown;
}
interface GenAttemptEvent extends CAPEvent {
EventType: 'GEN_ATTEMPT';
PromptHash: string;
ReferenceImageHash?: string;
InputType: string;
PolicyID: string;
ModelVersion: string;
ActorHash: string;
SessionID: string;
}
interface GenDenyEvent extends CAPEvent {
EventType: 'GEN_DENY';
AttemptID: string;
RiskCategory: string;
RiskSubCategories: string[];
RiskScore: number;
RefusalReason: string;
PolicyID: string;
PolicyVersion: string;
ModelDecision: string;
HumanOverride: boolean;
}
type RiskCategory =
| 'CSAM_RISK'
| 'NCII_RISK'
| 'MINOR_SEXUALIZATION'
| 'REAL_PERSON_DEEPFAKE'
| 'VIOLENCE_EXTREME'
| 'HATE_CONTENT'
| 'TERRORIST_CONTENT'
| 'SELF_HARM_PROMOTION'
| 'COPYRIGHT_VIOLATION'
| 'OTHER';
function computeHash(data: string): string {
const hash = createHash('sha256').update(data, 'utf8').digest('hex');
return `sha256:${hash}`;
}
function canonicalize(obj: Record<string, unknown>): string {
// Simplified JCS - use proper library in production
return JSON.stringify(obj, Object.keys(obj).sort());
}
function computeEventHash(event: CAPEvent): string {
  // Exclude EventHash and Signature so finalization and verification hash the same fields
  const { EventHash, Signature, ...eventBody } = event;
  const canonical = canonicalize(eventBody);
  return computeHash(canonical);
}
class CAPChain {
private chainId: string;
private events: CAPEvent[] = [];
private privateKey: Buffer;
private publicKey: Buffer;
constructor() {
this.chainId = uuidv7();
const keypair = generateKeyPairSync('ed25519');
this.privateKey = keypair.privateKey.export({ type: 'pkcs8', format: 'der' });
this.publicKey = keypair.publicKey.export({ type: 'spki', format: 'der' });
}
private getPrevHash(): string | null {
if (this.events.length === 0) return null;
return this.events[this.events.length - 1].EventHash!;
}
private signEvent(eventHash: string): string {
const hashHex = eventHash.slice(7); // Remove "sha256:"
const hashBuffer = Buffer.from(hashHex, 'hex');
const signature = sign(null, hashBuffer, {
key: this.privateKey,
format: 'der',
type: 'pkcs8'
});
return `ed25519:${signature.toString('base64')}`;
}
private finalizeEvent<T extends CAPEvent>(event: T): T {
event.EventHash = computeEventHash(event);
event.Signature = this.signEvent(event.EventHash);
return event;
}
logAttempt(params: {
prompt: string;
inputType: string;
policyId: string;
modelVersion: string;
actorId: string;
referenceImage?: Buffer;
}): GenAttemptEvent {
const event: GenAttemptEvent = {
EventID: uuidv7(),
ChainID: this.chainId,
PrevHash: this.getPrevHash(),
Timestamp: new Date().toISOString(),
EventType: 'GEN_ATTEMPT',
HashAlgo: 'SHA256',
SignAlgo: 'ED25519',
PromptHash: computeHash(params.prompt),
ReferenceImageHash: params.referenceImage
? computeHash(params.referenceImage.toString('hex'))
: undefined,
InputType: params.inputType,
PolicyID: params.policyId,
ModelVersion: params.modelVersion,
ActorHash: computeHash(params.actorId),
SessionID: uuidv7()
};
const finalizedEvent = this.finalizeEvent(event);
this.events.push(finalizedEvent);
return finalizedEvent;
}
logDeny(params: {
attemptId: string;
riskCategory: RiskCategory;
riskScore: number;
refusalReason: string;
policyId: string;
policyVersion: string;
riskSubCategories?: string[];
}): GenDenyEvent {
const event: GenDenyEvent = {
EventID: uuidv7(),
ChainID: this.chainId,
PrevHash: this.getPrevHash(),
Timestamp: new Date().toISOString(),
EventType: 'GEN_DENY',
HashAlgo: 'SHA256',
SignAlgo: 'ED25519',
AttemptID: params.attemptId,
RiskCategory: params.riskCategory,
RiskSubCategories: params.riskSubCategories || [],
RiskScore: params.riskScore,
RefusalReason: params.refusalReason,
PolicyID: params.policyId,
PolicyVersion: params.policyVersion,
ModelDecision: 'DENY',
HumanOverride: false
};
const finalizedEvent = this.finalizeEvent(event);
this.events.push(finalizedEvent);
return finalizedEvent;
}
verifyChain(): { valid: boolean; error?: string } {
for (let i = 0; i < this.events.length; i++) {
const event = this.events[i];
// Verify hash
const computedHash = computeEventHash(event);
if (event.EventHash !== computedHash) {
return { valid: false, error: `Hash mismatch at event ${i}` };
}
// Verify chain linkage
if (i > 0 && event.PrevHash !== this.events[i - 1].EventHash) {
return { valid: false, error: `Chain break at event ${i}` };
}
}
return { valid: true };
}
verifyCompleteness(): { valid: boolean; error?: string } {
const attempts = new Map<string, CAPEvent>();
const outcomes = new Map<string, CAPEvent>();
for (const event of this.events) {
if (event.EventType === 'GEN_ATTEMPT') {
attempts.set(event.EventID, event);
} else if (['GEN', 'GEN_DENY', 'GEN_ERROR'].includes(event.EventType)) {
const attemptId = (event as GenDenyEvent).AttemptID;
if (outcomes.has(attemptId)) {
return { valid: false, error: `Duplicate outcome for ${attemptId}` };
}
outcomes.set(attemptId, event);
}
}
// Check all attempts have outcomes
for (const attemptId of attempts.keys()) {
if (!outcomes.has(attemptId)) {
return { valid: false, error: `Missing outcome for ${attemptId}` };
}
}
// Check for orphan outcomes
for (const attemptId of outcomes.keys()) {
if (!attempts.has(attemptId)) {
return { valid: false, error: `Orphan outcome for ${attemptId}` };
}
}
return { valid: true };
}
getEvents(): CAPEvent[] {
return [...this.events];
}
}
// Usage
const chain = new CAPChain();
const attempt = chain.logAttempt({
prompt: 'Generate nude image of celebrity X',
inputType: 'text',
policyId: 'cap.safety.v1.0',
modelVersion: 'img-gen-v4.2.1',
actorId: 'user_12345'
});
const deny = chain.logDeny({
attemptId: attempt.EventID,
riskCategory: 'NCII_RISK',
riskScore: 0.94,
refusalReason: 'Non-consensual intimate imagery request detected',
policyId: 'cap.safety.v1.0',
policyVersion: '2026-01-01',
riskSubCategories: ['REAL_PERSON', 'CLOTHING_REMOVAL_REQUEST']
});
console.log('Chain verification:', chain.verifyChain());
console.log('Completeness verification:', chain.verifyCompleteness());
Privacy-Preserving Verification
One of CAP's most powerful features is enabling verification without exposing harmful content.
The Problem
Regulators need to verify that harmful requests were blocked. But sharing the actual harmful prompts creates more problems:
- Evidence of harmful intent could be misused
- Prompts might contain personal information
- Sharing could enable adversarial learning
The Solution: Hash-Based Verification
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Regulator receives complaint with harmful prompt │
│ │ │
│ ▼ │
│ Computes: hash = SHA256(harmful_prompt) │
│ │ │
│ ▼ │
│ Queries platform: "Do you have a GEN_DENY with this hash?" │
│ │ │
│ ▼ │
│ Platform returns: │
│ - Yes/No │
│ - Merkle proof (if yes) │
│ - External anchor proof │
│ │ │
│ ▼ │
│ Regulator verifies independently: │
│ - Merkle proof → Event exists in pack │
│ - Anchor proof → Event existed at claimed time │
│ │ │
│ ▼ │
│ Platform never sees the original complaint │
│ Regulator never sees other events │
│ │
└─────────────────────────────────────────────────────────────────┘
Implementation:
import requests

def verify_refusal_without_exposure(
    harmful_prompt: str,
    platform_api: str
) -> dict:
"""
Verify a refusal occurred without exposing the prompt to the platform.
"""
# Compute hash locally
prompt_hash = compute_hash(harmful_prompt)
# Query platform with hash only
response = requests.post(
f"{platform_api}/cap/verify",
json={"PromptHash": prompt_hash}
)
if not response.json().get("found"):
return {"verified": False, "reason": "No matching event found"}
# Get merkle proof
proof = response.json()["merkle_proof"]
merkle_root = response.json()["merkle_root"]
event = response.json()["event"]
# Verify event hash matches our prompt hash
if event["PromptHash"] != prompt_hash:
return {"verified": False, "reason": "Hash mismatch"}
    # Verify merkle proof against the event's hash (see the External Anchoring section below)
    if not verify_merkle_proof(event["EventHash"], proof, merkle_root):
        return {"verified": False, "reason": "Invalid merkle proof"}
# Verify external anchor
anchor = response.json()["anchor"]
if not verify_external_anchor(merkle_root, anchor):
return {"verified": False, "reason": "Invalid external anchor"}
return {
"verified": True,
"event_type": event["EventType"],
"timestamp": event["Timestamp"],
"risk_category": event.get("RiskCategory"),
"anchor_timestamp": anchor["timestamp"]
}
External Anchoring: Proof of Time
Even with perfect logging, how do you prove logs weren't created retroactively? External anchoring.
How It Works
Your Events Merkle Tree External Service
──────────── ─────────── ────────────────
Event 1 ─────┐
│ ┌─────────────────┐
Event 2 ─────┼──────►│ Compute │
│ │ Merkle Root │──────► RFC 3161 TSA
Event 3 ─────┤ │ │ or SCITT
│ │ sha256:abc... │ or Blockchain
Event 4 ─────┘ └─────────────────┘
│
▼
Anchor Record
├─ MerkleRoot: sha256:abc...
├─ Timestamp: 2026-01-13T15:00:00Z
├─ Service: rfc3161.example.com
└─ Proof: [TSA signature]
Merkle Tree Construction
def build_merkle_tree(events: List[dict]) -> tuple[str, List]:
"""Build merkle tree from events, return root and tree."""
if not events:
raise ValueError("No events to anchor")
# Compute leaf hashes
leaves = [event["EventHash"] for event in events]
# Pad to power of 2
while len(leaves) & (len(leaves) - 1) != 0:
leaves.append(leaves[-1])
# Build tree bottom-up
tree = [leaves]
while len(tree[-1]) > 1:
level = []
for i in range(0, len(tree[-1]), 2):
combined = compute_hash(tree[-1][i] + tree[-1][i + 1])
level.append(combined)
tree.append(level)
root = tree[-1][0]
return root, tree
def generate_merkle_proof(tree: List, index: int) -> List[tuple[str, str]]:
"""Generate proof for event at index."""
proof = []
for level in tree[:-1]:
sibling_index = index ^ 1 # XOR to get sibling
direction = "left" if index % 2 == 1 else "right"
proof.append((level[sibling_index], direction))
index //= 2
return proof
def verify_merkle_proof(
event_hash: str,
proof: List[tuple[str, str]],
root: str
) -> bool:
"""Verify event is in merkle tree."""
current = event_hash
for sibling_hash, direction in proof:
if direction == "left":
current = compute_hash(sibling_hash + current)
else:
current = compute_hash(current + sibling_hash)
return current == root
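A quick end-to-end check of the three helpers above, using a handful of hypothetical event hashes and the compute_hash helper from earlier:
events = [{"EventHash": compute_hash(f"event-{i}")} for i in range(5)]
root, tree = build_merkle_tree(events)

# Prove membership of the third event and verify it against the root
proof = generate_merkle_proof(tree, 2)
assert verify_merkle_proof(events[2]["EventHash"], proof, root)

# A tampered hash must fail verification
assert not verify_merkle_proof(compute_hash("tampered"), proof, root)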
RFC 3161 Timestamping
import requests
from asn1crypto import tsp, core
def get_rfc3161_timestamp(merkle_root: str) -> dict:
"""Get RFC 3161 timestamp for merkle root."""
# Create timestamp request
hash_bytes = bytes.fromhex(merkle_root[7:]) # Remove "sha256:"
request = tsp.TimeStampReq({
'version': 1,
'message_imprint': {
'hash_algorithm': {'algorithm': 'sha256'},
'hashed_message': hash_bytes
},
'cert_req': True
})
# Send to TSA
response = requests.post(
'https://freetsa.org/tsr', # Example TSA
data=request.dump(),
headers={'Content-Type': 'application/timestamp-query'}
)
# Parse response
tsr = tsp.TimeStampResp.load(response.content)
return {
"anchor_type": "RFC3161",
"merkle_root": merkle_root,
"timestamp": tsr['time_stamp_token']['content']['tst_info']['gen_time'].native.isoformat(),
"tsa": "freetsa.org",
"proof": response.content.hex()
}
Conformance Levels
CAP defines three levels to accommodate different organizational needs:
Bronze: Getting Started
Bronze:
target: "SMEs, Early Adopters, Internal Use"
requirements:
- Event logging (INGEST, TRAIN, GEN, EXPORT)
- SHA-256 hash chain
- Ed25519 signatures
- 6-month retention
regulatory: "Voluntary transparency"
Silver: Production Ready
Silver:
target: "Enterprise, VLOPs, Regulated Industries"
requirements:
- All Bronze requirements
- SRP extension (GEN_ATTEMPT, GEN_DENY)
- Completeness Invariant enforcement
- Daily external anchoring (RFC 3161)
- Evidence Pack export
- 2-year retention
regulatory: "EU AI Act Article 12"
Gold: Maximum Assurance
Gold:
target: "High-Risk AI Systems, DSA VLOPs"
requirements:
- All Silver requirements
- Hourly external anchoring
- SCITT transparency service integration
- HSM key management
- Real-time audit API
- 5-year retention
- 24-hour incident response
regulatory: "DSA Article 37 audits"
Integration with Existing Systems
Azure OpenAI Service
from azure.identity import DefaultAzureCredential
from openai import AsyncAzureOpenAI
class CAPAzureMiddleware:
"""Middleware for CAP logging with Azure OpenAI."""
    def __init__(self, cap_chain: CAPChain, azure_client: AsyncAzureOpenAI):
self.cap = cap_chain
self.azure = azure_client
async def generate_image(
self,
prompt: str,
user_id: str,
**kwargs
) -> dict:
# Step 1: Log attempt BEFORE calling Azure
attempt = self.cap.log_attempt(
prompt=prompt,
input_type="text",
policy_id="azure.content-filter.v1",
model_version=kwargs.get("model", "dall-e-3"),
actor_id=user_id
)
try:
# Step 2: Call Azure OpenAI
response = await self.azure.images.generate(
prompt=prompt,
**kwargs
)
            # Step 3: Check content filter result
            # (the exact response shape varies by API version; blocked requests may instead raise an error)
            if response.content_filter_results.get("sexual", {}).get("filtered"):
self.cap.log_deny(
attempt_id=attempt["EventID"],
risk_category="NCII_RISK",
risk_score=0.9,
refusal_reason="Azure content filter: sexual content",
policy_id="azure.content-filter.v1",
policy_version="2026-01"
)
return {"status": "denied", "reason": "Content policy violation"}
# Step 4: Log successful generation
self.cap.log_gen(
attempt_id=attempt["EventID"],
output_hash=compute_hash(response.data[0].url)
)
return {"status": "success", "url": response.data[0].url}
except Exception as e:
# Step 5: Log error
self.cap.log_error(
attempt_id=attempt["EventID"],
error_type=type(e).__name__,
error_message=str(e)
)
raise
AWS Bedrock
import boto3
from cap_chain import CAPChain
class CAPBedrockIntegration:
"""CAP integration for AWS Bedrock with Guardrails."""
def __init__(self, cap_chain: CAPChain):
self.cap = cap_chain
self.bedrock = boto3.client('bedrock-runtime')
def invoke_with_cap(
self,
model_id: str,
prompt: str,
user_id: str,
guardrail_id: str
) -> dict:
# Log attempt
attempt = self.cap.log_attempt(
prompt=prompt,
input_type="text",
policy_id=f"bedrock.guardrail.{guardrail_id}",
model_version=model_id,
actor_id=user_id
)
try:
response = self.bedrock.invoke_model(
modelId=model_id,
body=json.dumps({"prompt": prompt}),
guardrailIdentifier=guardrail_id
)
# Check guardrail action
if response.get('guardrailAction') == 'BLOCKED':
assessment = response.get('guardrailAssessment', {})
self.cap.log_deny(
attempt_id=attempt["EventID"],
risk_category=self._map_guardrail_category(assessment),
risk_score=assessment.get('confidence', 0.9),
refusal_reason=assessment.get('reason', 'Guardrail blocked'),
policy_id=f"bedrock.guardrail.{guardrail_id}",
policy_version="v1"
)
return {"blocked": True, "assessment": assessment}
# Log success
output = json.loads(response['body'].read())
self.cap.log_gen(
attempt_id=attempt["EventID"],
output_hash=compute_hash(json.dumps(output))
)
return {"blocked": False, "output": output}
except Exception as e:
self.cap.log_error(
attempt_id=attempt["EventID"],
error_type=type(e).__name__,
error_message=str(e)
)
raise
def _map_guardrail_category(self, assessment: dict) -> str:
"""Map Bedrock guardrail categories to CAP categories."""
mapping = {
'HATE': 'HATE_CONTENT',
'INSULTS': 'HATE_CONTENT',
'SEXUAL': 'NCII_RISK',
'VIOLENCE': 'VIOLENCE_EXTREME',
'MISCONDUCT': 'OTHER'
}
category = assessment.get('category', 'OTHER')
return mapping.get(category, 'OTHER')
Evidence Pack: The Audit Package
When regulators come knocking, you need a self-contained, verifiable package:
evidence_pack/
├── manifest.json # Pack metadata
├── events/
│ ├── events_001.json # First 10,000 events
│ ├── events_002.json # Next 10,000 events
│ └── ...
├── anchors/
│ ├── anchor_001.json # RFC 3161 timestamps
│ └── ...
├── merkle/
│ ├── tree_001.json # Merkle tree structure
│ └── proofs/ # Selective disclosure proofs
└── signatures/
└── pack_signature.json # Pack-level signature
Manifest Example
{
"PackID": "019467b2-0000-7000-0000-000000000000",
"PackVersion": "1.0",
"GeneratedAt": "2026-01-13T15:00:00Z",
"GeneratedBy": "urn:cap:org:example-platform",
"ConformanceLevel": "Silver",
"EventCount": 150000,
"TimeRange": {
"Start": "2026-01-01T00:00:00Z",
"End": "2026-01-13T14:59:59Z"
},
"Checksums": {
"events/events_001.json": "sha256:a1b2c3...",
"events/events_002.json": "sha256:d4e5f6...",
"anchors/anchor_001.json": "sha256:g7h8i9..."
},
"CompletenessVerification": {
"TotalAttempts": 145000,
"TotalGEN": 140000,
"TotalGEN_DENY": 4500,
"TotalGEN_ERROR": 500,
"InvariantValid": true
},
"ExternalAnchors": [
{
"AnchorID": "019467b1-0000-7000-0000-000000000000",
"AnchorType": "RFC3161",
"Timestamp": "2026-01-13T00:00:00Z",
"ServiceEndpoint": "https://tsa.example.com"
}
]
}
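On the auditor's side, the first step is mechanical: recompute the checksums listed in manifest.json and confirm the pack files haven't been altered since generation. A sketch, assuming the directory layout shown above and that files are hashed exactly as they were written:
import hashlib
import json
import os

def verify_pack_checksums(pack_dir: str) -> bool:
    """Recompute file hashes and compare against the manifest's Checksums map."""
    with open(os.path.join(pack_dir, "manifest.json")) as f:
        manifest = json.load(f)
    for rel_path, expected in manifest["Checksums"].items():
        with open(os.path.join(pack_dir, rel_path), "rb") as f:
            digest = hashlib.sha256(f.read()).hexdigest()
        if f"sha256:{digest}" != expected:
            print(f"Checksum mismatch: {rel_path}")
            return False
    return True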
Pack Generation
def generate_evidence_pack(
chain: CAPChain,
output_dir: str,
time_range: tuple[datetime, datetime]
) -> str:
"""Generate Evidence Pack for audit submission."""
import os
import shutil
pack_id = str(uuid.uuid7())
pack_dir = os.path.join(output_dir, f"evidence_pack_{pack_id}")
# Create directories
os.makedirs(f"{pack_dir}/events")
os.makedirs(f"{pack_dir}/anchors")
os.makedirs(f"{pack_dir}/merkle/proofs")
os.makedirs(f"{pack_dir}/signatures")
# Filter events by time range
start, end = time_range
events = [
e for e in chain.events
if start <= datetime.fromisoformat(e["Timestamp"]) <= end
]
# Write events in batches
checksums = {}
batch_size = 10000
for i in range(0, len(events), batch_size):
batch = events[i:i + batch_size]
filename = f"events_{i // batch_size + 1:03d}.json"
filepath = f"{pack_dir}/events/{filename}"
with open(filepath, 'w') as f:
json.dump(batch, f)
checksums[f"events/{filename}"] = compute_hash(json.dumps(batch))
# Build merkle tree and get anchors
merkle_root, tree = build_merkle_tree(events)
anchor = get_rfc3161_timestamp(merkle_root)
# Write anchor
with open(f"{pack_dir}/anchors/anchor_001.json", 'w') as f:
json.dump(anchor, f)
checksums["anchors/anchor_001.json"] = compute_hash(json.dumps(anchor))
# Verify completeness
completeness = verify_completeness_stats(events)
# Generate manifest
manifest = {
"PackID": pack_id,
"PackVersion": "1.0",
"GeneratedAt": datetime.now(timezone.utc).isoformat(),
"GeneratedBy": f"urn:cap:chain:{chain.chain_id}",
"ConformanceLevel": "Silver",
"EventCount": len(events),
"TimeRange": {
"Start": start.isoformat(),
"End": end.isoformat()
},
"Checksums": checksums,
"CompletenessVerification": completeness,
"ExternalAnchors": [anchor]
}
with open(f"{pack_dir}/manifest.json", 'w') as f:
json.dump(manifest, f, indent=2)
return pack_dir
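The verify_completeness_stats helper referenced above isn't shown in the spec excerpt; a minimal sketch that produces the manifest's CompletenessVerification block could look like this:
def verify_completeness_stats(events: list[dict]) -> dict:
    """Aggregate event counts and evaluate the Completeness Invariant for a pack."""
    counts = {"GEN_ATTEMPT": 0, "GEN": 0, "GEN_DENY": 0, "GEN_ERROR": 0}
    for event in events:
        if event["EventType"] in counts:
            counts[event["EventType"]] += 1
    outcomes = counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
    return {
        "TotalAttempts": counts["GEN_ATTEMPT"],
        "TotalGEN": counts["GEN"],
        "TotalGEN_DENY": counts["GEN_DENY"],
        "TotalGEN_ERROR": counts["GEN_ERROR"],
        "InvariantValid": counts["GEN_ATTEMPT"] == outcomes,
    }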
Regulatory Readiness
EU AI Act Article 12 Compliance
CAP Silver meets EU AI Act requirements:
| Article 12 Requirement | CAP Implementation |
|---|---|
| Automatic logging | Event-driven architecture |
| Lifetime coverage | ChainID links all events |
| Risk identification | RiskCategory, RiskScore |
| Post-market monitoring | Evidence Pack export |
| Human oversight logging | HumanOverride field |
| Traceability | Hash chain, UUIDv7 |
Timeline: August 2, 2026 deadline for high-risk AI obligations.
Digital Services Act (VLOPs)
CAP Gold meets DSA Article 37 audit requirements:
| DSA Requirement | CAP Feature |
|---|---|
| Independent audits | Third-party verifiable Evidence Packs |
| Algorithm transparency | ModelVersion, PolicyID tracking |
| Content moderation records | GEN_DENY events with RiskCategory |
| Risk mitigation evidence | Completeness Invariant |
Colorado AI Act (SB24-205)
Effective February 1, 2026:
| Requirement | CAP Implementation |
|---|---|
| Impact assessments | Evidence Pack statistics |
| 3-year retention | Silver: 2 years, Gold: 5 years |
| Algorithmic discrimination docs | RiskCategory: HATE_CONTENT |
Getting Started
Minimum Viable Implementation
1. Generate signing keys
2. Initialize chain
3. Log GEN_ATTEMPT before every safety check
4. Log outcome (GEN/GEN_DENY/GEN_ERROR) after
5. Verify completeness periodically
# 1. Generate keys (do once, store securely)
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
private_key = Ed25519PrivateKey.generate()
# 2. Initialize chain
chain = CAPChain(private_key)
# 3-4. Integration point in your generation pipeline
def handle_generation_request(prompt: str, user_id: str):
# ALWAYS log attempt first
attempt = chain.log_attempt(
prompt=prompt,
input_type="text",
policy_id="your.policy.v1",
model_version="your-model-v1",
actor_id=user_id
)
# Run your safety check
safety_result = your_safety_check(prompt)
# Log outcome
if safety_result.blocked:
chain.log_deny(
attempt_id=attempt["EventID"],
risk_category=safety_result.category,
risk_score=safety_result.score,
refusal_reason=safety_result.reason,
policy_id="your.policy.v1",
policy_version="2026-01"
)
return {"error": "Request blocked by content policy"}
# Generate content
output = your_generation_model(prompt)
chain.log_gen(
attempt_id=attempt["EventID"],
output_hash=compute_hash(output)
)
return {"output": output}
# 5. Periodic verification (e.g., hourly cron)
def verify_integrity():
assert chain.verify_chain(), "Chain integrity violated!"
assert chain.verify_completeness(), "Completeness invariant violated!"
Conclusion
The January 2026 Grok incident was a wake-up call. Trust-based AI governance doesn't work. When an AI provider says "trust us, our safeguards work," regulators and the public need to be able to verify that claim cryptographically.
CAP v1.0 provides the infrastructure for this verification:
- Safe Refusal Provenance: Prove what your system refused to generate
- Completeness Invariant: Mathematical guarantee against selective logging
- Privacy-preserving verification: Audit without exposing harmful content
- Regulatory readiness: EU AI Act, DSA, Colorado AI Act aligned
The specification is open (CC BY 4.0), the reference implementation is available, and the regulatory deadlines are approaching.
The question isn't whether AI systems need cryptographic accountability. The question is whether your system will be ready when the auditors come.
Resources
- Specification: https://github.com/veritaschain/cap-spec
- Reference Implementation: https://github.com/veritaschain/cap-safe-refusal-provenance
- Academic Paper: DOI 10.5281/zenodo.18213616
- Parent Framework (VAP): https://github.com/veritaschain/vap-framework
- Contact: standards@veritaschain.org
This article represents the views of the VeritasChain Standards Organization. CAP is an open specification developed for the benefit of the AI industry and society.
Tags: #ai #security #cryptography #audit #opensource #euaiact #compliance #machinelearning #typescript #python