On March 17, 2026, Florida charged a man with 100 counts including 46 for AI-generated CSAM. On February 3, the UK ICO opened a formal investigation into Grok's generation of non-consensual imagery. In both cases, the question no one could answer was the same: what did the AI system actually refuse, and can anyone prove it? This article walks you through building the answer — with full working Python code for CAP-SRP v1.1's three new event types.
TL;DR
A Florida prosecution, a UK regulatory investigation, and an EU compliance deadline all converged on the same structural gap: AI providers can claim their safety systems work, but no one can independently verify it. The previous version of CAP-SRP (v1.0) addressed generation-level refusal logging. Version 1.1, released March 5, 2026, extends the framework upstream — to account enforcement, law enforcement referral decisions, and tamper-evident policy versioning.
This article:
- Fact-checks the five events that motivated v1.1's design
- Explains the three new event types (ACCOUNT_ACTION, LAW_ENFORCEMENT_REFERRAL, POLICY_VERSION) with full schemas
- Provides complete, runnable Python code for the v1.1 implementation
- Implements the four Completeness Invariants (expanded from one in v1.0)
- Maps the implementation to regulatory deadlines hitting in the next 90 days
GitHub: veritaschain/cap-spec · veritaschain/cap-srp · License: CC BY 4.0 / Apache 2.0
Table of Contents
- The Five Events That Broke the v1.0 Model
- What v1.1 Adds: The Account Enforcement Layer
- Architecture: From Generation to Enforcement
- Implementation: Core Data Model
- Implementation: The v1.1 Event Types
- Implementation: The Four Completeness Invariants
- Implementation: The Full Chain
- Putting It All Together: The Tumbler Ridge Scenario
- Regulatory Deadline Map
- What Developers Should Build Now
- Transparency Notes
The Five Events That Broke the v1.0 Model
CAP-SRP v1.0, released January 28, 2026, solved a specific problem: cryptographic proof that an AI system refused to generate content. The Completeness Invariant — GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR — ensured that every generation request had exactly one recorded outcome.
But within weeks of v1.0's release, five events exposed gaps that generation-level logging alone couldn't cover.
1. Florida AI-CSAM: 100 Charges, Zero AI Logs
On March 17, 2026, Florida Attorney General James Uthmeier announced the arrest of Blake McKinniss, 37, of Sanford. The charges: 53 counts of CSAM possession, 46 counts of AI-generated CSAM possession, and 1 count of possessing a childlike sex doll. The investigation began on January 27 after FDLE received over 70 NCMEC CyberTipline reports. A search warrant was executed on March 9.
The entire case was built from the human side: NCMEC tips, physical evidence, device forensics. No evidence in the case record originated from the AI systems used to generate the material. No refusal logs. No account action records. No law enforcement referral audit trail.
Even if the AI provider had blocked 99% of McKinniss's attempts, there is no standardized mechanism for prosecutors to request, receive, or verify that information. v1.0 could log refusals — but it couldn't log what happened to the account after a pattern of harmful requests, or whether law enforcement was notified.
2. UK ICO vs. Grok: Can't Verify What You Can't Audit
The UK Information Commissioner's Office announced formal investigations into X Internet Unlimited Company and X.AI LLC on February 3, 2026. The investigation covers Grok's generation of non-consensual sexualized imagery. The ICO first contacted the companies on January 7, seeking urgent information.
The ICO has enforcement tools — it can compel disclosure, impose fines, issue notices. But when xAI provides safety logs, the ICO has no technical standard against which to verify their completeness. No hash chain proves the logs are tamper-evident. No external anchoring proves they weren't modified after the fact. The ICO can demand records; it cannot verify the records' integrity.
3. EU Code of Practice: Logging Made Optional
The European Commission published the second draft of its Code of Practice on marking and labelling AI-generated content in early March 2026. Comments close March 30. The draft adopts a two-layered approach — secured metadata plus watermarking — but logging was downgraded from near-mandatory in the first draft to optional in the second. The code covers the lifecycle of content that exists. Refusal provenance is outside its scope.
4. TAKE IT DOWN Act: 53 Days to Compliance
The TAKE IT DOWN Act (signed May 19, 2025) establishes a notice-and-removal framework with a 48-hour removal obligation. Platforms must demonstrate they removed content. The May 19, 2026 compliance deadline is 53 days away. No provision requires demonstrating that content was prevented from being generated.
5. The Tumbler Ridge Pattern
The event that directly motivated v1.1 was the Tumbler Ridge shooting case (February 2026), where an AI system was used not to generate harmful imagery but to assist in planning violence. The existing v1.0 risk categories didn't cover content that facilitated planning without depicting violence. More critically, the case exposed what we call the "account enforcement opacity problem": when a platform bans an account, there is no standard for recording why, what policy was applied, whether law enforcement was notified, and whether the notification threshold assessment was documented.
v1.0 logged refusals. v1.1 logs what happens after refusals.
What v1.1 Adds: The Account Enforcement Layer
CAP-SRP v1.1 introduces three new event types and three additional Completeness Invariants:
| Event Type | Purpose | Motivated By |
|---|---|---|
| ACCOUNT_ACTION | Cryptographic record of account bans, suspensions, rate-limiting, reinstatements | Tumbler Ridge: no audit trail for enforcement decisions |
| LAW_ENFORCEMENT_REFERRAL | Verifiable record of LE notification threshold assessment | Florida CSAM: no record of whether/when LE was notified |
| POLICY_VERSION | Tamper-evident versioning of safety policies | ICO investigation: can't verify which policy version applied |
And three new intermediate-state events formalized from v1.0 drafts:
| Event Type | Purpose | Completeness Role |
|---|---|---|
| GEN_WARN | Generation allowed with warning | Outcome (treated as GEN) |
| GEN_ESCALATE | Sent for human review | Pending → resolved by GEN or GEN_DENY |
| GEN_QUARANTINE | Generated but held before delivery | Pending → resolved by EXPORT or GEN_DENY |
Plus the VIOLENCE_PLANNING risk category — content that facilitates planning of violent acts without depicting violence directly.
Architecture: From Generation to Enforcement
Here's how v1.0 and v1.1 fit together:
CAP-SRP v1.1 Architecture
══════════════════════════════════════════════════════════════
POLICY LAYER (v1.1 — new)
┌────────────────────────────────────────────────────────┐
│ │
│ POLICY_VERSION ──── ExternalAnchorRef ──► RFC 3161 │
│ │ Timestamp │
│ │ (PolicyHash + EffectiveFrom) Service │
│ │ │
│ ▼ │
│ AppliedPolicyVersionRef on every GEN_DENY │
│ │
└────────────────────────────────────────────────────────┘
│
▼
GENERATION LAYER (v1.0 + v1.1 extensions)
┌────────────────────────────────────────────────────────┐
│ │
│ GEN_ATTEMPT ──► Safety Check ──┬──► GEN │
│ │ ├──► GEN_DENY │
│ │ ├──► GEN_WARN (v1.1) │
│ │ ├──► GEN_ERROR │
│ │ └──► GEN_ESCALATE(v1.1)│
│ │ │ │
│ │ resolved by │
│ │ GEN or GEN_DENY │
│ ▼ │
│ Completeness Invariant: │
│ ∑ ATTEMPT = ∑ GEN + ∑ DENY + ∑ ERROR │
│ │
└────────────────────────────────────────────────────────┘
│
(pattern of GEN_DENY events detected)
│
▼
ACCOUNT ENFORCEMENT LAYER (v1.1 — new)
┌────────────────────────────────────────────────────────┐
│ │
│ ACCOUNT_ACTION ─────► LawEnforcementAssessment │
│ (BAN/SUSPEND/ (ThresholdMet? AssessorType?) │
│ RATE_LIMIT/ │ │
│ REINSTATE/ ▼ │
│ FLAG_FOR_REVIEW) LAW_ENFORCEMENT_REFERRAL │
│ (REFERRED / NOT_REFERRED / │
│ PENDING_LEGAL_REVIEW) │
│ │
└────────────────────────────────────────────────────────┘
The key insight: every event references the event upstream of it. A GEN_DENY references the POLICY_VERSION that was in effect when the denial was made. An ACCOUNT_ACTION references the GEN_ATTEMPT events that triggered it. A LAW_ENFORCEMENT_REFERRAL references the ACCOUNT_ACTION that prompted the assessment.
This creates an unbroken chain from policy → generation decision → enforcement action → LE notification. If any link is missing, the audit trail is provably incomplete.
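To make "if any link is missing" concrete, here is a minimal sketch of a reference check over a list of events. It works on plain dicts rather than the dataclasses defined later, and the `REFERENCE_FIELDS` mapping and `find_dangling_refs` helper are illustrative assumptions, not part of the CAP-SRP specification.

```python
from typing import Dict, List

# Hypothetical mapping: event type -> fields that hold references to other
# events. Field names mirror the dataclasses in this article; the mapping
# itself is an illustration, not taken from the spec.
REFERENCE_FIELDS: Dict[str, List[str]] = {
    "GEN_DENY": ["attempt_id", "applied_policy_version_ref"],
    "ACCOUNT_ACTION": ["triggering_event_refs", "policy_version_ref"],
    "LAW_ENFORCEMENT_REFERRAL": ["triggering_account_action_ref"],
}


def find_dangling_refs(events: List[dict]) -> List[dict]:
    """Return every reference that points at an event ID not in the log."""
    known_ids = {e["event_id"] for e in events}
    dangling = []
    for e in events:
        for field_name in REFERENCE_FIELDS.get(e["event_type"], []):
            value = e.get(field_name)
            # Normalize single references and lists of references.
            refs = value if isinstance(value, list) else [value]
            for ref in refs:
                # Empty references are skipped here; a stricter check
                # could treat them as violations too.
                if ref and ref not in known_ids:
                    dangling.append({
                        "event_id": e["event_id"],
                        "field": field_name,
                        "missing": ref,
                    })
    return dangling


log = [
    {"event_id": "pol-1", "event_type": "POLICY_VERSION"},
    {"event_id": "att-1", "event_type": "GEN_ATTEMPT"},
    {"event_id": "deny-1", "event_type": "GEN_DENY",
     "attempt_id": "att-1", "applied_policy_version_ref": "pol-1"},
    {"event_id": "acct-1", "event_type": "ACCOUNT_ACTION",
     "triggering_event_refs": ["att-1"], "policy_version_ref": "pol-1"},
    # This referral points at an account action that is not in the log.
    {"event_id": "ler-1", "event_type": "LAW_ENFORCEMENT_REFERRAL",
     "triggering_account_action_ref": "acct-404"},
]

print(find_dangling_refs(log))
```

The referral `ler-1` is reported because `acct-404` never appears in the log. The first four events on their own produce an empty list.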
Implementation: Core Data Model
Let's build this. All code targets Python 3.10+ with minimal dependencies.
Prerequisites
pip install cryptography
Foundation: Hashing, Signing, and Event Base
"""
CAP-SRP v1.1 Reference Implementation
======================================
Core data model with Ed25519 signatures, SHA-256 hash chains,
and the four Completeness Invariants.
License: Apache 2.0
GitHub: https://github.com/veritaschain/cap-srp
"""
import hashlib
import json
import time
import uuid
import base64
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional, List, Dict, Tuple
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey, Ed25519PublicKey
)
# ─── Utilities ───────────────────────────────────────────
def sha256(data: str) -> str:
    """SHA-256 hash with algorithm prefix."""
    return f"sha256:{hashlib.sha256(data.encode()).hexdigest()}"


def sha256_bytes(data: bytes) -> str:
    """SHA-256 hash of raw bytes."""
    return f"sha256:{hashlib.sha256(data).hexdigest()}"


def canonicalize(obj: dict) -> str:
    """RFC 8785 JSON Canonicalization (simplified)."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))


def uuid7() -> str:
    """UUIDv7 (time-ordered) for event IDs."""
    timestamp_ms = int(time.time() * 1000) & ((1 << 48) - 1)
    rand_a = uuid.uuid4().int & ((1 << 12) - 1)  # 12 random bits
    rand_b = uuid.uuid4().int & ((1 << 62) - 1)  # 62 random bits
    # Layout: 48-bit timestamp | version 7 | rand_a | variant 0b10 | rand_b
    u = ((timestamp_ms << 80) | (0b0111 << 76) | (rand_a << 64)
         | (0b10 << 62) | rand_b)
    return str(uuid.UUID(int=u))
# ─── Enumerations ────────────────────────────────────────
class EventType(Enum):
# Generation events (v1.0)
GEN_ATTEMPT = "GEN_ATTEMPT"
GEN = "GEN"
GEN_DENY = "GEN_DENY"
GEN_ERROR = "GEN_ERROR"
# Generation events (v1.1 — formalized)
GEN_WARN = "GEN_WARN"
GEN_ESCALATE = "GEN_ESCALATE"
GEN_QUARANTINE = "GEN_QUARANTINE"
# Lifecycle events
EXPORT = "EXPORT"
# Account enforcement events (v1.1 — new)
ACCOUNT_ACTION = "ACCOUNT_ACTION"
LAW_ENFORCEMENT_REFERRAL = "LAW_ENFORCEMENT_REFERRAL"
POLICY_VERSION = "POLICY_VERSION"
class RiskCategory(Enum):
CSAM_RISK = "CSAM_RISK"
NCII_RISK = "NCII_RISK"
MINOR_SEXUALIZATION = "MINOR_SEXUALIZATION"
REAL_PERSON_DEEPFAKE = "REAL_PERSON_DEEPFAKE"
VIOLENCE_EXTREME = "VIOLENCE_EXTREME"
VIOLENCE_PLANNING = "VIOLENCE_PLANNING" # v1.1 — new
HATE_CONTENT = "HATE_CONTENT"
TERRORIST_CONTENT = "TERRORIST_CONTENT"
SELF_HARM_PROMOTION = "SELF_HARM_PROMOTION"
COPYRIGHT_VIOLATION = "COPYRIGHT_VIOLATION"
COPYRIGHT_STYLE_MIMICRY = "COPYRIGHT_STYLE_MIMICRY"
OTHER = "OTHER"
class ActionType(Enum):
SUSPEND = "SUSPEND"
BAN = "BAN"
RATE_LIMIT = "RATE_LIMIT"
REINSTATE = "REINSTATE"
FLAG_FOR_REVIEW = "FLAG_FOR_REVIEW"
class RiskBand(Enum):
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
class DecisionMechanism(Enum):
AUTOMATED = "AUTOMATED"
HUMAN_INITIATED = "HUMAN_INITIATED"
HUMAN_CONFIRMED_AUTOMATED = "HUMAN_CONFIRMED_AUTOMATED"
class ReferralStatus(Enum):
REFERRED = "REFERRED"
NOT_REFERRED = "NOT_REFERRED"
PENDING_LEGAL_REVIEW = "PENDING_LEGAL_REVIEW"
class PolicyType(Enum):
CONTENT_MODERATION = "CONTENT_MODERATION"
LE_NOTIFICATION = "LE_NOTIFICATION"
ACCOUNT_ACTION = "ACCOUNT_ACTION"
RETENTION = "RETENTION"
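The `canonicalize` helper is what makes event hashes reproducible: two parties who serialize the same event must get the same bytes. The sketch below repeats the two helpers so it runs on its own, and shows both the property and one place where the simplified version differs from full RFC 8785 (the sample field values are made up for illustration).

```python
import hashlib
import json


def canonicalize(obj: dict) -> str:
    """Same simplified canonical form as above: sorted keys, no whitespace."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))


def sha256(data: str) -> str:
    return f"sha256:{hashlib.sha256(data.encode()).hexdigest()}"


a = {"event_type": "GEN_DENY", "risk_score": 0.97, "chain_id": "demo"}
b = {"chain_id": "demo", "risk_score": 0.97, "event_type": "GEN_DENY"}

# Key order does not affect the canonical string or the hash.
same_canonical = canonicalize(a) == canonicalize(b)
same_hash = sha256(canonicalize(a)) == sha256(canonicalize(b))

# Without sort_keys, the two serializations differ.
plain_differs = json.dumps(a) != json.dumps(b)

print(same_canonical, same_hash, plain_differs)  # True True True

# Caveat: json.dumps escapes non-ASCII characters by default, whereas
# RFC 8785 emits them as literal UTF-8. A verifier using a strict RFC 8785
# library would compute a different hash for this event.
print(canonicalize({"reason": "café"}))  # {"reason":"caf\u00e9"}
```

If events may contain non-ASCII text or unusual floats, a dedicated RFC 8785 implementation is the safer choice than this shortcut.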
The Base Event Class
Every event in the chain — generation, enforcement, policy — inherits from this:
@dataclass
class CAPEvent:
    """Base event with Ed25519 signature and SHA-256 hash chain."""
    event_id: str
    event_type: EventType
    chain_id: str
    timestamp: str
    prev_hash: Optional[str]
    event_hash: Optional[str] = None
    signature: Optional[str] = None

    def _serializable_dict(self) -> dict:
        """Convert to dict for hashing, handling enums."""
        d = {}
        for k, v in asdict(self).items():
            if k in ("event_hash", "signature"):
                continue
            if isinstance(v, Enum):
                d[k] = v.value
            elif isinstance(v, list):
                d[k] = [
                    item.value if isinstance(item, Enum) else item
                    for item in v
                ]
            else:
                d[k] = v
        return d

    def compute_hash(self) -> str:
        """SHA-256 of canonicalized event (excluding hash and sig)."""
        return sha256(canonicalize(self._serializable_dict()))

    def sign(self, private_key: Ed25519PrivateKey):
        """Compute hash, then sign with Ed25519."""
        self.event_hash = self.compute_hash()
        # Strip the 7-character "sha256:" prefix before decoding the hex.
        hash_bytes = bytes.fromhex(self.event_hash[7:])
        sig = private_key.sign(hash_bytes)
        self.signature = f"ed25519:{base64.b64encode(sig).decode()}"

    def verify(self, public_key: Ed25519PublicKey) -> bool:
        """Verify Ed25519 signature against event hash."""
        if not self.event_hash or not self.signature:
            return False
        try:
            hash_bytes = bytes.fromhex(self.event_hash[7:])
            # Strip the 8-character "ed25519:" prefix before decoding.
            sig_bytes = base64.b64decode(self.signature[8:])
            public_key.verify(sig_bytes, hash_bytes)
            return True
        except Exception:
            return False
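Because `compute_hash` covers every field except `event_hash` and `signature`, any change to an event after it has been signed is detectable. The following sketch shows that property with plain dicts and the standard library only. It leaves out the Ed25519 step, and the `event_hash` helper and sample event are stand-ins for illustration.

```python
import hashlib
import json


def event_hash(event: dict) -> str:
    """Hash every field except the hash and signature, as compute_hash does."""
    body = {
        k: v for k, v in event.items()
        if k not in ("event_hash", "signature")
    }
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return f"sha256:{hashlib.sha256(canonical.encode()).hexdigest()}"


event = {
    "event_id": "evt-1",  # hypothetical ID for illustration
    "event_type": "GEN_DENY",
    "risk_score": 0.97,
    "prev_hash": None,
}
event["event_hash"] = event_hash(event)

# An untouched event re-hashes to the stored value.
untouched_ok = event_hash(event) == event["event_hash"]

# Lowering the risk score after the fact no longer matches the stored hash.
event["risk_score"] = 0.10
tampered_ok = event_hash(event) == event["event_hash"]

print(untouched_ok, tampered_ok)  # True False
```

The same property has a design consequence: a field that is filled in after signing, such as a resolution reference, will also change the hash. Recording the resolution as a new event that points back to the original is one way to keep already-signed events untouched.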
Implementation: The v1.1 Event Types
Generation Events (v1.0 core + v1.1 extensions)
@dataclass
class GenAttemptEvent(CAPEvent):
"""
Logged BEFORE safety evaluation.
Creates an unforgeable commitment that the request existed.
CRITICAL: Must be logged before any safety check runs.
Without pre-evaluation logging, a malicious provider could
silently drop harmful requests from the audit trail.
"""
prompt_hash: str = ""
input_type: str = "text"
model_version: str = ""
policy_id: str = ""
actor_hash: str = ""
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
prompt: str, actor_id: str, model_version: str,
policy_id: str, input_type: str = "text"):
return cls(
event_id=uuid7(),
event_type=EventType.GEN_ATTEMPT,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
prompt_hash=sha256(prompt),
input_type=input_type,
model_version=model_version,
policy_id=policy_id,
actor_hash=sha256(actor_id),
)
@dataclass
class GenDenyEvent(CAPEvent):
"""
Logged when safety evaluation DENIES generation.
v1.1 additions:
- applied_policy_version_ref: links to POLICY_VERSION event
- jurisdiction_context: ISO 3166-1 alpha-2 code
- takedown_relevance: flags for TAKE IT DOWN Act compliance
"""
attempt_id: str = ""
risk_category: str = ""
risk_score: float = 0.0
refusal_reason: str = ""
policy_version: str = ""
# v1.1 fields
applied_policy_version_ref: str = ""
jurisdiction_context: str = ""
takedown_relevance: Optional[dict] = None
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
attempt_id: str, risk_category: RiskCategory,
risk_score: float, reason: str,
policy_version: str,
policy_version_ref: str = "",
jurisdiction: str = "",
takedown_flags: Optional[dict] = None):
return cls(
event_id=uuid7(),
event_type=EventType.GEN_DENY,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
attempt_id=attempt_id,
risk_category=risk_category.value,
risk_score=risk_score,
refusal_reason=reason,
policy_version=policy_version,
applied_policy_version_ref=policy_version_ref,
jurisdiction_context=jurisdiction,
takedown_relevance=takedown_flags,
)
@dataclass
class GenEscalateEvent(CAPEvent):
"""
v1.1: Request sent for human review.
MUST be resolved by a subsequent GEN or GEN_DENY event.
Unresolved escalations older than 72 hours are a compliance
violation.
"""
attempt_id: str = ""
escalation_reason: str = ""
resolution_ref: Optional[str] = None # Populated when resolved
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
attempt_id: str, reason: str):
return cls(
event_id=uuid7(),
event_type=EventType.GEN_ESCALATE,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
attempt_id=attempt_id,
escalation_reason=reason,
)
@dataclass
class GenQuarantineEvent(CAPEvent):
"""
v1.1: Content generated but held before delivery.
MUST be resolved by EXPORT (released) or GEN_DENY (blocked).
"""
attempt_id: str = ""
content_hash: str = ""
expiry_policy: str = "REQUIRES_HUMAN_APPROVAL"
release_ref: Optional[str] = None # Populated when resolved
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
attempt_id: str, content: str,
expiry: str = "REQUIRES_HUMAN_APPROVAL"):
return cls(
event_id=uuid7(),
event_type=EventType.GEN_QUARANTINE,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
attempt_id=attempt_id,
content_hash=sha256(content),
expiry_policy=expiry,
)
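The 72-hour rule in the GenEscalateEvent docstring can be checked mechanically. Here is a minimal sketch that works on plain dicts and assumes that GEN and GEN_DENY events both carry the `attempt_id` of the request they resolve (the GEN event class is not shown in this section, so treat that as an assumption). The `stale_escalations` helper and the sample events are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Set

# Matches the timestamp shape used in this article: 2026-03-01T00:00:00.000Z
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
MAX_PENDING = timedelta(hours=72)


def parse_ts(ts: str) -> datetime:
    return datetime.strptime(ts, TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)


def stale_escalations(events: List[dict], now: datetime) -> List[str]:
    """IDs of GEN_ESCALATE events left unresolved for more than 72 hours."""
    resolved_attempts: Set[str] = {
        e["attempt_id"] for e in events
        if e["event_type"] in ("GEN", "GEN_DENY")
    }
    return [
        e["event_id"]
        for e in events
        if e["event_type"] == "GEN_ESCALATE"
        and e["attempt_id"] not in resolved_attempts
        and now - parse_ts(e["timestamp"]) > MAX_PENDING
    ]


events = [
    {"event_id": "esc-1", "event_type": "GEN_ESCALATE", "attempt_id": "att-1",
     "timestamp": "2026-03-01T00:00:00.000Z"},
    {"event_id": "esc-2", "event_type": "GEN_ESCALATE", "attempt_id": "att-2",
     "timestamp": "2026-03-01T00:00:00.000Z"},
    {"event_id": "deny-2", "event_type": "GEN_DENY", "attempt_id": "att-2",
     "timestamp": "2026-03-01T06:00:00.000Z"},
    {"event_id": "esc-3", "event_type": "GEN_ESCALATE", "attempt_id": "att-3",
     "timestamp": "2026-03-04T12:00:00.000Z"},
]
now = datetime(2026, 3, 5, tzinfo=timezone.utc)

print(stale_escalations(events, now))  # ['esc-1']
```

`esc-2` is resolved by `deny-2`, and `esc-3` is only 12 hours old, so only `esc-1` is flagged.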
Account Enforcement Events (v1.1 — new)
These event types address the gaps exposed by the Florida case and the ICO investigation:
@dataclass
class AccountActionEvent(CAPEvent):
"""
v1.1: Cryptographic record of account-level enforcement.
Motivated by the Tumbler Ridge incident: when a platform
bans an account, there must be a tamper-evident record of
WHY, WHAT POLICY applied, and WHETHER law enforcement
was assessed for notification.
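Privacy: account_hash is meant to be a keyed HMAC, never plaintext.
This reference code uses unkeyed SHA-256 for brevity; swap in an
HMAC before production use.
A keyed HMAC also supports crypto-shredding for GDPR Article 17.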
"""
account_hash: str = ""
action_type: str = ""
triggering_event_refs: List[str] = field(default_factory=list)
policy_version_ref: str = ""
risk_score_band: str = ""
decision_mechanism: str = ""
le_assessment: Optional[dict] = None # Law enforcement assessment
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
account_id: str, action: ActionType,
triggering_events: List[str],
policy_ref: str, risk_band: RiskBand,
mechanism: DecisionMechanism,
le_threshold_met: bool = False,
le_threshold_ref: str = "",
le_assessor: str = "AUTOMATED"):
return cls(
event_id=uuid7(),
event_type=EventType.ACCOUNT_ACTION,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
account_hash=sha256(account_id),
action_type=action.value,
triggering_event_refs=triggering_events,
policy_version_ref=policy_ref,
risk_score_band=risk_band.value,
decision_mechanism=mechanism.value,
le_assessment={
"ThresholdMet": le_threshold_met,
"ThresholdDefinitionRef": le_threshold_ref,
"AssessmentTimestamp": time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
"AssessorType": le_assessor,
},
)
@dataclass
class LawEnforcementReferralEvent(CAPEvent):
"""
v1.1: Verifiable record of law enforcement notification
threshold assessment.
This event answers: "Did the platform assess whether law
enforcement should be notified? What was the outcome?
Was legal counsel involved?"
DecisionRationaleRef stores ONLY a hash of the internal
decision document — never plaintext — preserving
attorney-client privilege.
"""
triggering_account_action_ref: str = ""
referral_status: str = ""
jurisdiction_code: str = ""
legal_framework: str = ""
threshold_doc_ref: str = ""
threshold_met: bool = False
decision_rationale_ref: str = ""
decision_timestamp: str = ""
legal_review_completed: bool = False
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
account_action_ref: str,
status: ReferralStatus,
jurisdiction: str,
framework: str,
threshold_ref: str,
threshold_met: bool,
rationale_doc: str,
legal_reviewed: bool):
return cls(
event_id=uuid7(),
event_type=EventType.LAW_ENFORCEMENT_REFERRAL,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
triggering_account_action_ref=account_action_ref,
referral_status=status.value,
jurisdiction_code=jurisdiction,
legal_framework=framework,
threshold_doc_ref=threshold_ref,
threshold_met=threshold_met,
decision_rationale_ref=sha256(rationale_doc),
decision_timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
legal_review_completed=legal_reviewed,
)
@dataclass
class PolicyVersionEvent(CAPEvent):
"""
v1.1: Tamper-evident policy version management.
CRITICAL CONSTRAINT: ExternalAnchorRef timestamp MUST
precede or equal EffectiveFrom. If the anchor postdates
the effective date, implementations MUST reject the event.
This prevents retroactive policy changes — a provider
can't claim "we had a stricter policy" if that policy
wasn't anchored before it took effect.
"""
policy_id: str = ""
policy_hash: str = ""
effective_from: str = ""
supersedes_ref: Optional[str] = None
policy_type: str = ""
jurisdiction_scope: List[str] = field(default_factory=list)
external_anchor_ref: str = ""
@classmethod
def create(cls, chain_id: str, prev_hash: Optional[str],
policy_id: str, policy_document: str,
effective_from: str, supersedes: Optional[str],
policy_type: PolicyType,
jurisdictions: List[str],
anchor_ref: str):
return cls(
event_id=uuid7(),
event_type=EventType.POLICY_VERSION,
chain_id=chain_id,
timestamp=time.strftime(
"%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()
),
prev_hash=prev_hash,
policy_id=policy_id,
policy_hash=sha256(policy_document),
effective_from=effective_from,
supersedes_ref=supersedes,
policy_type=policy_type.value,
jurisdiction_scope=jurisdictions,
external_anchor_ref=anchor_ref,
)
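One gap worth closing before production: the AccountActionEvent docstring calls for a keyed HMAC, while `create()` calls `sha256(account_id)`, which anyone can recompute for a guessed account ID. A minimal sketch of the keyed variant follows, using only the standard library. The `pseudonymize_account` name and the `hmac-sha256:` prefix are assumptions for illustration, not taken from the spec.

```python
import hashlib
import hmac
import secrets


def pseudonymize_account(account_id: str, key: bytes) -> str:
    """Keyed HMAC-SHA256 of an account ID, with an algorithm prefix."""
    digest = hmac.new(key, account_id.encode(), hashlib.sha256).hexdigest()
    return f"hmac-sha256:{digest}"


# In practice the key would live in a KMS or HSM; generated here for the demo.
key_a = secrets.token_bytes(32)
key_b = secrets.token_bytes(32)

h1 = pseudonymize_account("user-12345", key_a)
h2 = pseudonymize_account("user-12345", key_a)
h3 = pseudonymize_account("user-12345", key_b)

# Same key gives a stable pseudonym; a different key gives an unrelated one.
print(h1 == h2, h1 == h3)  # True False
```

This is also what makes crypto-shredding possible: once the key is destroyed, the stored pseudonyms can no longer be linked back to account IDs, while the chain's hashes and signatures stay valid.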
Implementation: The Four Completeness Invariants
v1.0 had one invariant. v1.1 has four. Here's why — and here's the code.
The Four Completeness Invariants
════════════════════════════════════════════════════════════
1. PRIMARY (v1.0):
∑ GEN_ATTEMPT = ∑ GEN + ∑ GEN_DENY + ∑ GEN_ERROR
→ Every attempt has exactly one outcome.
2. ESCALATION RESOLUTION (v1.1):
∑ GEN_ESCALATE = ∑ ESCALATION_RESOLVED
→ Every escalation has a resolution (GEN or GEN_DENY).
→ Unresolved escalations > 72 hours = compliance violation.
3. QUARANTINE RESOLUTION (v1.1):
∑ GEN_QUARANTINE = ∑ QUARANTINE_RELEASED + ∑ QUARANTINE_DENIED
→ Every quarantined item is either released or blocked.
4. POLICY ANCHORING (v1.1):
∀ POLICY_VERSION: ExternalAnchorTimestamp ≤ EffectiveFrom
→ No policy can be backdated past its external anchor.
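Invariant 4 comes down to comparing two timestamps. The sketch below shows that comparison on its own, assuming the anchor timestamp has already been extracted from a verified RFC 3161 receipt (receipt parsing and verification are out of scope here). The `anchor_is_valid` helper is hypothetical.

```python
from datetime import datetime, timezone

# Matches the timestamp shape used in this article: 2026-03-01T00:00:00.000Z
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def parse_ts(ts: str) -> datetime:
    return datetime.strptime(ts, TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)


def anchor_is_valid(anchor_timestamp: str, effective_from: str) -> bool:
    """True when the external anchor precedes or equals EffectiveFrom."""
    return parse_ts(anchor_timestamp) <= parse_ts(effective_from)


effective = "2026-03-01T00:00:00.000Z"

# Anchored two days before taking effect: accepted.
print(anchor_is_valid("2026-02-27T09:30:00.000Z", effective))  # True

# Anchored exactly at the effective time: accepted.
print(anchor_is_valid("2026-03-01T00:00:00.000Z", effective))  # True

# Anchored a day after the claimed effective date: rejected as backdated.
print(anchor_is_valid("2026-03-02T00:00:00.000Z", effective))  # False
```

Parsing to timezone-aware datetimes rather than comparing strings avoids surprises if one side ever carries a different precision.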
class CompletenessVerifier:
    """
    Verifies all four CAP-SRP v1.1 Completeness Invariants.
    The Completeness Invariants are the mathematical core of
    CAP-SRP. If any invariant fails, the audit trail is
    provably incomplete or inconsistent; the check is
    arithmetic, not a matter of opinion.
    """

    def __init__(self, events: List[CAPEvent]):
        self.events = events
        self._index_events()

    def _index_events(self):
        """Build lookup indexes for efficient verification."""
        self.by_type: Dict[EventType, List[CAPEvent]] = {}
        self.by_id: Dict[str, CAPEvent] = {}
        for e in self.events:
            self.by_type.setdefault(e.event_type, []).append(e)
            self.by_id[e.event_id] = e

    def _resolved_attempt_ids(self, *types: EventType) -> set:
        """attempt_id values carried by events of the given types."""
        return {
            e.attempt_id
            for t in types
            for e in self.by_type.get(t, [])
            if getattr(e, "attempt_id", "")
        }

    def verify_primary_invariant(self) -> dict:
        """
        Invariant 1: ∑ ATTEMPT = ∑ GEN + ∑ DENY + ∑ ERROR
        GEN_WARN counts as GEN (generation occurred, with warning).
        Compares totals only; it does not pair each attempt with
        its outcome.
        """
        attempts = len(self.by_type.get(EventType.GEN_ATTEMPT, []))
        gens = (
            len(self.by_type.get(EventType.GEN, []))
            + len(self.by_type.get(EventType.GEN_WARN, []))
        )
        denials = len(self.by_type.get(EventType.GEN_DENY, []))
        errors = len(self.by_type.get(EventType.GEN_ERROR, []))
        outcomes = gens + denials + errors
        return {
            "invariant": "PRIMARY",
            "valid": attempts == outcomes,
            "attempts": attempts,
            "outcomes": {
                "GEN": gens, "GEN_DENY": denials,
                "GEN_ERROR": errors, "total": outcomes
            },
            "equation": f"{attempts} == {gens} + {denials} + {errors}",
        }

    def verify_escalation_invariant(self) -> dict:
        """
        Invariant 2: ∑ ESCALATE = ∑ ESCALATION_RESOLVED
        A GEN_ESCALATE counts as resolved when resolution_ref is set
        or when a GEN / GEN_DENY event shares its attempt_id. Setting
        resolution_ref after signing changes the event hash, so the
        attempt_id match is the form that survives chain verification.
        The 72-hour age limit is not checked here.
        """
        escalations = self.by_type.get(EventType.GEN_ESCALATE, [])
        resolved_attempts = self._resolved_attempt_ids(
            EventType.GEN, EventType.GEN_DENY
        )
        unresolved = [
            e.event_id for e in escalations
            if getattr(e, "resolution_ref", None) is None
            and getattr(e, "attempt_id", "") not in resolved_attempts
        ]
        resolved = len(escalations) - len(unresolved)
        return {
            "invariant": "ESCALATION_RESOLUTION",
            "valid": len(escalations) == resolved,
            "total_escalations": len(escalations),
            "resolved": resolved,
            "unresolved_ids": unresolved,
        }

    def verify_quarantine_invariant(self) -> dict:
        """
        Invariant 3: ∑ QUARANTINE = ∑ RELEASED + ∑ DENIED
        A GEN_QUARANTINE counts as resolved when release_ref is set
        or when an EXPORT / GEN_DENY event shares its attempt_id
        (assuming EXPORT events carry attempt_id).
        """
        quarantines = self.by_type.get(EventType.GEN_QUARANTINE, [])
        resolved_attempts = self._resolved_attempt_ids(
            EventType.EXPORT, EventType.GEN_DENY
        )
        unresolved = [
            e.event_id for e in quarantines
            if getattr(e, "release_ref", None) is None
            and getattr(e, "attempt_id", "") not in resolved_attempts
        ]
        resolved = len(quarantines) - len(unresolved)
        return {
            "invariant": "QUARANTINE_RESOLUTION",
            "valid": len(quarantines) == resolved,
            "total_quarantines": len(quarantines),
            "resolved": resolved,
            "unresolved_ids": unresolved,
        }

    def verify_policy_anchoring_invariant(self) -> dict:
        """
        Invariant 4: ∀ POLICY_VERSION:
        ExternalAnchorTimestamp ≤ EffectiveFrom
        No policy can claim to have been in effect before
        it was externally timestamped. This prevents a
        provider from retroactively claiming "we had
        stricter policies all along."
        """
        policies = self.by_type.get(EventType.POLICY_VERSION, [])
        violations = []
        for p in policies:
            anchor = getattr(p, "external_anchor_ref", "")
            # In production: verify the RFC 3161 receipt and compare
            # its timestamp against effective_from.
            # Here: check only that a real anchor reference exists.
            # "pending" is the publish_policy() placeholder, not an anchor.
            if not anchor or anchor == "pending":
                violations.append({
                    "policy_id": getattr(p, "policy_id", ""),
                    "reason": "Missing external anchor",
                })
        return {
            "invariant": "POLICY_ANCHORING",
            "valid": len(violations) == 0,
            "total_policies": len(policies),
            "violations": violations,
        }

    def verify_all(self) -> dict:
        """Run all four invariants. Return composite result."""
        results = {
            "primary": self.verify_primary_invariant(),
            "escalation": self.verify_escalation_invariant(),
            "quarantine": self.verify_quarantine_invariant(),
            "policy_anchoring": self.verify_policy_anchoring_invariant(),
        }
        results["all_valid"] = all(
            r["valid"] for r in results.values()
        )
        return results
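One caveat about the primary invariant: `verify_primary_invariant` compares totals, and equal totals do not guarantee that each attempt has exactly one outcome. Two denials logged against one attempt can mask a second attempt with no outcome at all. Here is a minimal sketch of a per-attempt check on plain dicts. The `match_outcomes` helper is hypothetical, and it assumes every outcome event carries the `attempt_id` it resolves.

```python
from collections import Counter
from typing import Dict, List

TERMINAL = {"GEN", "GEN_WARN", "GEN_DENY", "GEN_ERROR"}


def match_outcomes(events: List[dict]) -> Dict[str, List[str]]:
    """Pair each GEN_ATTEMPT with its outcomes and report mismatches."""
    attempts = {
        e["event_id"] for e in events if e["event_type"] == "GEN_ATTEMPT"
    }
    outcome_counts = Counter(
        e["attempt_id"] for e in events if e["event_type"] in TERMINAL
    )
    return {
        "missing_outcome": sorted(
            a for a in attempts if outcome_counts[a] == 0
        ),
        "duplicate_outcome": sorted(
            a for a in attempts if outcome_counts[a] > 1
        ),
        "orphan_outcome": sorted(
            a for a in outcome_counts if a not in attempts
        ),
    }


events = [
    {"event_id": "att-1", "event_type": "GEN_ATTEMPT"},
    {"event_id": "att-2", "event_type": "GEN_ATTEMPT"},
    # Two outcomes for att-1, none for att-2: the totals still balance.
    {"event_id": "deny-1a", "event_type": "GEN_DENY", "attempt_id": "att-1"},
    {"event_id": "deny-1b", "event_type": "GEN_DENY", "attempt_id": "att-1"},
]

print(match_outcomes(events))
```

Here the count-based check passes (2 attempts, 2 outcomes), while the per-attempt check reports `att-2` as missing an outcome and `att-1` as having a duplicate.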
Implementation: The Full Chain
Now let's wire it all together into a chain that manages the complete v1.1 lifecycle:
class CAPChainV11:
"""
CAP-SRP v1.1 hash chain with full lifecycle management.
Manages generation events, account enforcement,
law enforcement referrals, and policy versioning —
all in a single tamper-evident chain.
"""
def __init__(self, private_key: Ed25519PrivateKey,
chain_id: Optional[str] = None):
self.private_key = private_key
self.public_key = private_key.public_key()
self.chain_id = chain_id or str(uuid.uuid4())
self.events: List[CAPEvent] = []
self.attempts: Dict[str, CAPEvent] = {}
self.outcomes: Dict[str, CAPEvent] = {}
self.policies: Dict[str, PolicyVersionEvent] = {}
@property
def last_hash(self) -> Optional[str]:
return self.events[-1].event_hash if self.events else None
def _append(self, event: CAPEvent) -> CAPEvent:
"""Sign, hash-chain, and store."""
event.sign(self.private_key)
self.events.append(event)
return event
# ─── Policy Layer ────────────────────────────────────
def publish_policy(self, policy_id: str,
policy_document: str,
effective_from: str,
policy_type: PolicyType,
jurisdictions: List[str],
supersedes: Optional[str] = None,
anchor_ref: str = "pending"
) -> PolicyVersionEvent:
"""
Publish a new policy version.
MUST be called BEFORE the policy takes effect.
MUST be externally anchored before EffectiveFrom.
"""
event = PolicyVersionEvent.create(
chain_id=self.chain_id,
prev_hash=self.last_hash,
policy_id=policy_id,
policy_document=policy_document,
effective_from=effective_from,
supersedes=supersedes,
policy_type=policy_type,
jurisdictions=jurisdictions,
anchor_ref=anchor_ref,
)
self._append(event)
self.policies[event.event_id] = event
return event
# ─── Generation Layer ────────────────────────────────
def log_attempt(self, prompt: str, actor_id: str,
model_version: str,
policy_id: str) -> GenAttemptEvent:
"""Log GEN_ATTEMPT — MUST be called BEFORE safety eval."""
event = GenAttemptEvent.create(
chain_id=self.chain_id,
prev_hash=self.last_hash,
prompt=prompt,
actor_id=actor_id,
model_version=model_version,
policy_id=policy_id,
)
self._append(event)
self.attempts[event.event_id] = event
return event
def log_deny(self, attempt_id: str,
risk_category: RiskCategory,
risk_score: float, reason: str,
policy_version: str,
policy_version_ref: str = "",
jurisdiction: str = "",
takedown_flags: Optional[dict] = None
) -> GenDenyEvent:
"""Log GEN_DENY with v1.1 policy linkage."""
if attempt_id not in self.attempts:
raise ValueError(f"Unknown attempt: {attempt_id}")
if attempt_id in self.outcomes:
raise ValueError(
f"Attempt {attempt_id} already has outcome"
)
event = GenDenyEvent.create(
chain_id=self.chain_id,
prev_hash=self.last_hash,
attempt_id=attempt_id,
risk_category=risk_category,
risk_score=risk_score,
reason=reason,
policy_version=policy_version,
policy_version_ref=policy_version_ref,
jurisdiction=jurisdiction,
takedown_flags=takedown_flags,
)
self._append(event)
self.outcomes[attempt_id] = event
return event
def log_escalate(self, attempt_id: str,
reason: str) -> GenEscalateEvent:
"""Log GEN_ESCALATE — pending human review."""
event = GenEscalateEvent.create(
chain_id=self.chain_id,
prev_hash=self.last_hash,
attempt_id=attempt_id,
reason=reason,
)
self._append(event)
return event
# ─── Account Enforcement Layer ───────────────────────
def log_account_action(
self, account_id: str, action: ActionType,
triggering_events: List[str],
policy_ref: str, risk_band: RiskBand,
mechanism: DecisionMechanism,
le_threshold_met: bool = False,
le_threshold_ref: str = "",
le_assessor: str = "AUTOMATED"
) -> AccountActionEvent:
"""
Log ACCOUNT_ACTION — account-level enforcement.
This is what the Florida case was missing. When a
pattern of CSAM-risk denials triggers an account ban,
this event records:
- Which generation attempts triggered it
- What policy was in effect
- Whether LE notification was assessed
"""
event = AccountActionEvent.create(
chain_id=self.chain_id,
prev_hash=self.last_hash,
account_id=account_id,
action=action,
triggering_events=triggering_events,
policy_ref=policy_ref,
risk_band=risk_band,
mechanism=mechanism,
le_threshold_met=le_threshold_met,
le_threshold_ref=le_threshold_ref,
le_assessor=le_assessor,
)
self._append(event)
return event
def log_le_referral(
self, account_action_ref: str,
status: ReferralStatus,
jurisdiction: str, framework: str,
threshold_ref: str, threshold_met: bool,
rationale_doc: str,
legal_reviewed: bool
) -> LawEnforcementReferralEvent:
"""
Log LAW_ENFORCEMENT_REFERRAL — LE notification decision.
This is what prosecutors need. When an account is
banned for CSAM-related activity, was law enforcement
notified? Was the threshold assessment documented?
Was legal counsel involved?
The rationale_doc is stored ONLY as a SHA-256 hash,
preserving attorney-client privilege.
"""
event = LawEnforcementReferralEvent.create(
chain_id=self.chain_id,
prev_hash=self.last_hash,
account_action_ref=account_action_ref,
status=status,
jurisdiction=jurisdiction,
framework=framework,
threshold_ref=threshold_ref,
threshold_met=threshold_met,
rationale_doc=rationale_doc,
legal_reviewed=legal_reviewed,
)
self._append(event)
return event
# ─── Verification ────────────────────────────────────
    def verify_chain_integrity(self) -> dict:
        """Verify hash chain is unbroken and signatures valid."""
        broken_links = []
        invalid_sigs = []
        for i, event in enumerate(self.events):
            # Check hash recomputation
            hash_ok = event.event_hash == event.compute_hash()
            # Check prev_hash linkage (the first event must have none)
            expected_prev = (
                self.events[i - 1].event_hash if i > 0 else None
            )
            # Record each index once, even if both checks fail
            if not hash_ok or event.prev_hash != expected_prev:
                broken_links.append(i)
            # Check signature
            if not event.verify(self.public_key):
                invalid_sigs.append(i)
        return {
            "valid": len(broken_links) == 0
            and len(invalid_sigs) == 0,
            "total_events": len(self.events),
            "broken_links": broken_links,
            "invalid_signatures": invalid_sigs,
        }

    def verify_completeness(self) -> dict:
        """Run all four Completeness Invariants."""
        verifier = CompletenessVerifier(self.events)
        return verifier.verify_all()
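It helps to see what linkage checking does and does not catch. The sketch below builds a tiny hash chain from plain dicts, with no signatures and only the standard library, then removes events from it. The `build_chain` and `broken_links` helpers are illustrative stand-ins for the chain class above.

```python
import hashlib
import json
from typing import List, Optional


def hash_event(event: dict) -> str:
    body = {k: v for k, v in event.items() if k != "event_hash"}
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return f"sha256:{hashlib.sha256(canonical.encode()).hexdigest()}"


def build_chain(payloads: List[str]) -> List[dict]:
    """Link each event to the hash of the one before it."""
    chain: List[dict] = []
    prev: Optional[str] = None
    for i, payload in enumerate(payloads):
        event = {"seq": i, "payload": payload, "prev_hash": prev}
        event["event_hash"] = hash_event(event)
        chain.append(event)
        prev = event["event_hash"]
    return chain


def broken_links(chain: List[dict]) -> List[int]:
    """Indexes whose hash or prev_hash linkage does not check out."""
    bad = []
    for i, event in enumerate(chain):
        expected_prev = chain[i - 1]["event_hash"] if i > 0 else None
        if (event["prev_hash"] != expected_prev
                or hash_event(event) != event["event_hash"]):
            bad.append(i)
    return bad


chain = build_chain(["GEN_ATTEMPT", "GEN_DENY", "ACCOUNT_ACTION"])

print(broken_links(chain))                  # []   intact chain
print(broken_links([chain[0], chain[2]]))   # [1]  middle event deleted
print(broken_links(chain[:2]))              # []   tail truncated, undetected
```

Deleting an event from the middle breaks the link that follows it, but dropping events from the end leaves a shorter chain that still verifies. Catching that case needs something outside the chain, such as periodically anchoring the latest hash with an external timestamp service.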
Putting It All Together: The Tumbler Ridge Scenario
Here's a complete scenario demonstrating the full v1.1 lifecycle — from policy publication through account enforcement and LE referral:
def demo_v11_lifecycle():
    """
    Full CAP-SRP v1.1 lifecycle demonstration.

    Scenario: An AI image generation service detects a pattern
    of CSAM-risk requests from a single account, bans the
    account, assesses whether law enforcement should be
    notified, and creates a complete audit trail.
    """
    # Generate signing key
    private_key = Ed25519PrivateKey.generate()
    chain = CAPChainV11(private_key)

    print("=" * 60)
    print("CAP-SRP v1.1 Lifecycle Demo")
    print("=" * 60)

    # ─── Step 1: Publish safety policy ───────────────────
    print("\n[1] Publishing safety policy...")
    policy = chain.publish_policy(
        policy_id="cap.safety.csam-prevention.v2026-03",
        policy_document=(
            "CSAM Prevention Policy v2026-03: "
            "All requests classified as CSAM_RISK with "
            "confidence > 0.7 MUST be denied. Accounts "
            "with 3+ CSAM_RISK denials within 24 hours "
            "MUST be suspended and assessed for LE referral."
        ),
        effective_from="2026-03-01T00:00:00.000Z",
        policy_type=PolicyType.CONTENT_MODERATION,
        jurisdictions=["US", "EU", "GLOBAL"],
        anchor_ref="sha256:rfc3161-receipt-abc123...",
    )
    print(f" Policy: {policy.policy_id}")
    print(f" Event: {policy.event_id}")

    # ─── Step 2: Log harmful requests and denials ────────
    print("\n[2] Logging generation attempts and denials...")
    actor_id = "user-12345"
    deny_event_ids = []
    attempt_event_ids = []
    prompts = [
        "Generate an image of [REDACTED - CSAM content]",
        "Create a photo of [REDACTED - CSAM content]",
        "Make a picture showing [REDACTED - CSAM content]",
    ]
    for i, prompt in enumerate(prompts, 1):
        # Log attempt BEFORE safety check
        attempt = chain.log_attempt(
            prompt=prompt,
            actor_id=actor_id,
            model_version="model-v3.2.1",
            policy_id=policy.policy_id,
        )
        attempt_event_ids.append(attempt.event_id)

        # Safety check triggers denial
        deny = chain.log_deny(
            attempt_id=attempt.event_id,
            risk_category=RiskCategory.CSAM_RISK,
            risk_score=0.97,
            reason="CSAM content detected in prompt",
            policy_version="v2026-03",
            policy_version_ref=policy.event_id,
            jurisdiction="US",
            takedown_flags={
                "TAKE_IT_DOWN_Act": False,
                "NCII_Category": False,
                "ChildSafetyCategory": True,
            },
        )
        deny_event_ids.append(deny.event_id)
        print(f" Attempt {i}: {attempt.event_id[:12]}... → DENY")

    # ─── Step 3: Account enforcement ─────────────────────
    print("\n[3] Account enforcement triggered (3 CSAM denials)...")
    account_action = chain.log_account_action(
        account_id=actor_id,
        action=ActionType.BAN,
        triggering_events=attempt_event_ids,
        policy_ref=policy.event_id,
        risk_band=RiskBand.CRITICAL,
        mechanism=DecisionMechanism.HUMAN_CONFIRMED_AUTOMATED,
        le_threshold_met=True,
        le_threshold_ref=policy.event_id,
        le_assessor="HUMAN_TRUST_AND_SAFETY",
    )
    print(" Action: BAN")
    print(f" Event: {account_action.event_id[:12]}...")
    print(" LE threshold met: True")

    # ─── Step 4: Law enforcement referral ────────────────
    print("\n[4] Law enforcement referral decision...")
    le_referral = chain.log_le_referral(
        account_action_ref=account_action.event_id,
        status=ReferralStatus.REFERRED,
        jurisdiction="US",
        framework="NCMEC_CyberTipline",
        threshold_ref=policy.event_id,
        threshold_met=True,
        rationale_doc=(
            "Account triggered CSAM prevention threshold: "
            "3 CSAM_RISK denials within 24 hours. "
            "Pattern consistent with NCMEC reporting criteria. "
            "Legal counsel confirmed referral obligation under "
            "18 USC 2258A."
        ),
        legal_reviewed=True,
    )
    print(" Status: REFERRED")
    print(" Framework: NCMEC CyberTipline")
    print(" Legal review: Complete")

    # ─── Step 5: Verify everything ───────────────────────
    print("\n[5] Verification...")
    chain_result = chain.verify_chain_integrity()
    print(f" Chain integrity: "
          f"{'VALID' if chain_result['valid'] else 'BROKEN'}")
    print(f" Total events: {chain_result['total_events']}")
    completeness = chain.verify_completeness()
    print(f" Completeness (all 4 invariants): "
          f"{'VALID' if completeness['all_valid'] else 'FAILED'}")
    primary = completeness["primary"]
    print(f" Primary: {primary['equation']} → "
          f"{'✓' if primary['valid'] else '✗'}")
    policy_inv = completeness["policy_anchoring"]
    print(f" Policy anchoring: "
          f"{policy_inv['total_policies']} policies, "
          f"{len(policy_inv['violations'])} violations → "
          f"{'✓' if policy_inv['valid'] else '✗'}")

    # ─── Audit Trail Summary ─────────────────────────────
    print("\n" + "=" * 60)
    print("AUDIT TRAIL SUMMARY")
    print("=" * 60)
    print(f" Policy published: {policy.event_id[:12]}...")
    print(f" Generation attempts: {len(attempt_event_ids)}")
    print(f" Denials (CSAM_RISK): {len(deny_event_ids)}")
    print(f" Account action (BAN): "
          f"{account_action.event_id[:12]}...")
    print(f" LE referral (REFERRED): "
          f"{le_referral.event_id[:12]}...")
    # Report the computed results rather than hard-coded strings
    print(f" Chain integrity: "
          f"{'VALID' if chain_result['valid'] else 'BROKEN'}")
    print(f" All invariants: "
          f"{'VALID' if completeness['all_valid'] else 'FAILED'}")
    print(f" Total chain events: {len(chain.events)}")
    print("=" * 60)
    print("\nThis is the audit trail that the Florida case")
    print("didn't have. Every event is signed, hash-chained,")
    print("and independently verifiable.")


if __name__ == "__main__":
    demo_v11_lifecycle()
Run it:
pip install cryptography
python cap_srp_v11.py
Expected output:
============================================================
CAP-SRP v1.1 Lifecycle Demo
============================================================
[1] Publishing safety policy...
Policy: cap.safety.csam-prevention.v2026-03
Event: 019...
[2] Logging generation attempts and denials...
Attempt 1: 019... → DENY
Attempt 2: 019... → DENY
Attempt 3: 019... → DENY
[3] Account enforcement triggered (3 CSAM denials)...
Action: BAN
Event: 019...
LE threshold met: True
[4] Law enforcement referral decision...
Status: REFERRED
Framework: NCMEC CyberTipline
Legal review: Complete
[5] Verification...
Chain integrity: VALID
Total events: 9
Completeness (all 4 invariants): VALID
Primary: 3 == 0 + 3 + 0 → ✓
Policy anchoring: 1 policies, 0 violations → ✓
============================================================
AUDIT TRAIL SUMMARY
============================================================
Policy published: 019...
Generation attempts: 3
Denials (CSAM_RISK): 3
Account action (BAN): 019...
LE referral (REFERRED): 019...
Chain integrity: VALID
All invariants: VALID
Total chain events: 9
============================================================
Regulatory Deadline Map
Here's what's hitting between now and August 2026, and how v1.1 maps to each:
REGULATORY TIMELINE — March to August 2026
═════════════════════════════════════════════════════════════

Mar 30 ─── EU Code of Practice comment deadline
   │       └─ Relevant: POLICY_VERSION for marking compliance
   │
May 19 ─── TAKE IT DOWN Act compliance deadline
   │       └─ Relevant: GEN_DENY.takedown_relevance flags
   │          + 48-hour SLA monitoring (Gold)
   │          + ContentHash for duplicate detection
   │
Jun 30 ─── Colorado SB24-205 effective date
   │       └─ Relevant: Evidence Pack with risk analysis
   │          + RiskCategory taxonomy
   │
Aug 2  ─── EU AI Act Article 50 enforcement
           └─ Relevant: Full Silver/Gold conformance
              + POLICY_VERSION for transparency
              + External anchoring for timestamp integrity
              + Completeness Invariant for audit readiness

| Deadline | Regulation | v1.1 Event Types | Conformance Level |
|---|---|---|---|
| Mar 30 | EU Code of Practice (comment) | POLICY_VERSION | Silver |
| May 19 | TAKE IT DOWN Act | GEN_DENY (takedown flags), ACCOUNT_ACTION | Silver |
| Jun 30 | Colorado SB24-205 | All generation events, Evidence Pack | Silver |
| Aug 2 | EU AI Act Art. 50 | All events, external anchoring | Silver/Gold |
What Developers Should Build Now
If you're building an AI content generation system and want to be ready for the regulatory deadlines between now and August, here's the priority order:
Priority 1: Pre-evaluation logging (ship this week)
# The single most important change: log BEFORE safety check
attempt = chain.log_attempt(prompt, actor_id, model, policy)

# THEN run safety evaluation
result = safety_check(prompt)

# THEN log outcome
if result.blocked:
    chain.log_deny(attempt.event_id, ...)
This is the foundation. Without pre-evaluation logging, everything else is unverifiable. If you ship nothing else, ship this.
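
Here's a minimal, self-contained sketch of that ordering, including the case the snippet above leaves out: the safety check itself failing. The in-memory `log` list, the toy `safety_check`, and the `handle_request` wrapper are hypothetical stand-ins, not the CAP-SRP chain API. The point is only that every GEN_ATTEMPT ends with exactly one of GEN, GEN_DENY, or GEN_ERROR.

```python
from collections import Counter

log = []  # hypothetical in-memory stand-in for the chain


def safety_check(prompt: str) -> bool:
    """Toy classifier (assumption): blocks prompts containing 'forbidden'."""
    if "crash" in prompt:
        raise RuntimeError("classifier unavailable")
    return "forbidden" in prompt


def handle_request(prompt: str) -> str:
    attempt_id = len(log)
    log.append(("GEN_ATTEMPT", attempt_id))   # 1. log BEFORE evaluation
    try:
        blocked = safety_check(prompt)         # 2. evaluate
    except Exception:
        log.append(("GEN_ERROR", attempt_id))  # 3. a failure is still an outcome
        return "error"
    if blocked:
        log.append(("GEN_DENY", attempt_id))
        return "denied"
    log.append(("GEN", attempt_id))
    return "generated"


results = [handle_request(p)
           for p in ["a cat", "forbidden thing", "crash now"]]

counts = Counter(kind for kind, _ in log)
invariant_holds = counts["GEN_ATTEMPT"] == (
    counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
)
print(results, invariant_holds)
```

Because the attempt is written before anything can fail, a crash in the classifier still leaves a record with a matching outcome, and the primary Completeness Invariant keeps holding.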
Priority 2: Policy versioning (ship this month)
Every time your safety policy changes — updated categories, new risk thresholds, model version upgrades — log a POLICY_VERSION event and get it externally timestamped. When a regulator asks "what policy was in effect when this decision was made?", you need a tamper-evident answer.
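
As a rough sketch of what answering that question can look like: commit to each policy text by hash, record when it takes effect, and look up the record that was active at decision time. The field names (`policy_hash`, `effective_from`), the helpers `policy_record` and `policy_in_effect`, and the sample policy texts are assumptions for illustration. External timestamping is not shown.

```python
import hashlib
from bisect import bisect_right
from datetime import datetime, timezone
from typing import Optional


def policy_record(policy_id: str, document: str, effective_from: str) -> dict:
    """Commit to the exact policy text by hash (field names are assumptions)."""
    digest = hashlib.sha256(document.encode("utf-8")).hexdigest()
    return {
        "policy_id": policy_id,
        "policy_hash": "sha256:" + digest,
        "effective_from": datetime.fromisoformat(effective_from),
    }


def policy_in_effect(records: list, when: datetime) -> Optional[dict]:
    """Return the latest record whose effective_from is not after `when`."""
    ordered = sorted(records, key=lambda r: r["effective_from"])
    idx = bisect_right([r["effective_from"] for r in ordered], when)
    return ordered[idx - 1] if idx else None


v02 = policy_record("policy.v2026-02", "Deny at confidence > 0.8.",
                    "2026-02-01T00:00:00+00:00")
v03 = policy_record("policy.v2026-03", "Deny at confidence > 0.7.",
                    "2026-03-01T00:00:00+00:00")

decision_time = datetime(2026, 2, 20, 9, 30, tzinfo=timezone.utc)
active = policy_in_effect([v03, v02], decision_time)
before_any = policy_in_effect([v03, v02],
                              datetime(2026, 1, 1, tzinfo=timezone.utc))
print(active["policy_id"], before_any)
```

A decision made on February 20 resolves to the February policy even though a newer one exists, and any later edit to the policy text would no longer match the recorded hash.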
Priority 3: Account enforcement (ship before May 19)
The TAKE IT DOWN Act compliance deadline is May 19. If your system generates intimate imagery, you need a provable record of account enforcement actions. ACCOUNT_ACTION events give you that.
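
The trigger for an ACCOUNT_ACTION can be sketched as a sliding-window check. This example borrows the rule from the demo policy text (3+ denials within 24 hours). The `triggering_denials` helper and the `(event_id, timestamp)` input shape are hypothetical, and a real system would read these from the chain rather than from a list.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=24)  # from the demo policy text
THRESHOLD = 3                 # "3+ CSAM_RISK denials"


def triggering_denials(denials, now):
    """Return IDs of denials inside the window if the threshold is met.

    denials: iterable of (event_id, timestamp) pairs (hypothetical shape).
    """
    recent = [eid for eid, ts in denials
              if timedelta(0) <= now - ts <= WINDOW]
    return recent if len(recent) >= THRESHOLD else []


now = datetime(2026, 3, 10, 12, 0, tzinfo=timezone.utc)
denials = [
    ("deny-1", now - timedelta(hours=30)),  # outside the window
    ("deny-2", now - timedelta(hours=5)),
    ("deny-3", now - timedelta(hours=2)),
    ("deny-4", now - timedelta(hours=1)),
]

refs = triggering_denials(denials, now)        # threshold met
below = triggering_denials(denials[:3], now)   # only two in the window
print(refs, below)
```

Returning the event IDs rather than a bare boolean means the enforcement record can point back at the exact events that justified it.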
Priority 4: LE referral provenance (ship before August 2)
The EU AI Act Article 50 enforcement date is August 2. Gold-level conformance requires documented law enforcement assessment on account actions. This is the most complex piece, but you have four months.
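
One piece of this is simple to prototype: keeping the rationale document out of the log while still committing to it. The sketch below stores only a SHA-256 digest and later checks a disclosed document against it. The function names and the `rationale_hash` field are assumptions, and a bare hash may not be enough if the document text is guessable, which this sketch does not address.

```python
import hashlib
import hmac


def commit_rationale(rationale_doc: str) -> str:
    """Return a SHA-256 commitment to the document; the text is not kept."""
    digest = hashlib.sha256(rationale_doc.encode("utf-8")).hexdigest()
    return "sha256:" + digest


def matches_commitment(rationale_doc: str, commitment: str) -> bool:
    """Check a later-disclosed document against the logged commitment."""
    return hmac.compare_digest(commit_rationale(rationale_doc), commitment)


doc = "Threshold met: 3 denials in 24 hours. Counsel confirmed referral."

# Hypothetical referral record: only the hash goes into the log.
record = {
    "status": "REFERRED",
    "legal_reviewed": True,
    "rationale_hash": commit_rationale(doc),
}

same = matches_commitment(doc, record["rationale_hash"])
altered = matches_commitment(doc + " (edited)", record["rationale_hash"])
print(same, altered)
```

If the document is ever disclosed, anyone holding the log can confirm it is the one that existed when the referral decision was recorded.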
C2PA Integration: Two Halves of One Architecture
For developers already implementing C2PA, here's how the two specifications fit together:
The Complete Provenance Architecture
═════════════════════════════════════════════════════════════

┌─────────────────────┐    ┌──────────────────────────┐
│        C2PA         │    │         CAP-SRP          │
│ "Content Passport"  │    │ "System Flight Recorder" │
│                     │    │                          │
│ Proves:             │    │ Proves:                  │
│ • This content was  │    │ • This request was       │
│   generated by X    │    │   blocked by policy Y    │
│ • At time T         │    │ • Account Z was banned   │
│ • Hasn't been       │    │ • LE was notified        │
│   tampered with     │    │ • Policy V was active    │
│                     │    │   at decision time       │
│ Cannot prove:       │    │ Cannot prove:            │
│ • What was blocked  │    │ • Content authenticity   │
│ • Why it was        │    │ • Tampering after        │
│   blocked           │    │   export                 │
└──────────┬──────────┘    └────────────┬─────────────┘
           │                            │
           └─────────────┬──────────────┘
                         │
                         ▼
               ┌───────────────────┐
               │   EXPORT Event    │
               │                   │
               │  Links CAP-SRP    │
               │  chain to C2PA    │
               │  manifest via     │
               │  ContentHash      │
               └───────────────────┘

The critical insight: C2PA's own specification acknowledges it provides a positive signal but not a negative signal. A missing Content Credential cannot distinguish between human-made content and AI-refused-to-generate content. CAP-SRP's GEN_DENY record is the only audit artifact available when content was not generated — because there is no content to attach C2PA credentials to.
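
The ContentHash link in the diagram can be sketched without touching any C2PA tooling: hash the exported bytes, store that hash on an EXPORT record, and look the record up later from the content alone. Everything here (`content_hash`, `record_export`, `find_export`, the record fields, the sample bytes) is hypothetical. How the same hash is carried inside a C2PA manifest is not shown.

```python
import hashlib
from typing import Optional

export_events = {}  # hypothetical index: content hash -> EXPORT record


def content_hash(data: bytes) -> str:
    """SHA-256 over the exported bytes."""
    return "sha256:" + hashlib.sha256(data).hexdigest()


def record_export(data: bytes, gen_event_id: str) -> dict:
    """Create an EXPORT record that ties the bytes to a generation event."""
    event = {
        "event_type": "EXPORT",
        "gen_event_ref": gen_event_id,
        "content_hash": content_hash(data),
    }
    export_events[event["content_hash"]] = event
    return event


def find_export(data: bytes) -> Optional[dict]:
    """Look up the EXPORT record for a piece of content, if one exists."""
    return export_events.get(content_hash(data))


exported = record_export(b"example image bytes", "gen-001")
found = find_export(b"example image bytes")
unknown = find_export(b"bytes this system never exported")
print(found["gen_event_ref"], unknown)
```

A lookup miss only says this chain has no EXPORT record for those bytes. It says nothing about refusals, which is why the GEN_DENY records matter on their own.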
Transparency Notes
What CAP-SRP is: An open technical specification (v1.1, released March 5, 2026) for cryptographic audit trails in AI content generation systems. Published under CC BY 4.0 by the VeritasChain Standards Organization (VSO). The reference implementation is Apache 2.0 licensed.
What CAP-SRP is not: An endorsed industry standard. CAP-SRP has not been adopted by C2PA, W3C, IEEE, or the IETF SCITT working group. An individual Internet-Draft (draft-kamimura-scitt-refusal-events-02) has been submitted to the IETF but carries no formal standing in the IETF standards process. The specification is maintained by a small team in Tokyo. No major AI company has publicly adopted it.
What the gap is: Real. The absence of independently verifiable audit trails for AI safety decisions is documented by the Content Authenticity Initiative, the Center for Democracy and Technology, the World Privacy Forum, and researchers at Princeton. Whether the industry's response takes the form of CAP-SRP, a C2PA extension, an IETF SCITT profile, or something else is an open question. The gap itself is not.
Fact-check methodology: All events cited in this article were verified against primary sources. The Florida arrest was confirmed via the Florida Attorney General's press release and FOX 35 Orlando (March 17, 2026). The ICO investigation was confirmed via ico.org.uk (February 3, 2026). The EU Code of Practice deadline was confirmed via the European Commission's digital strategy portal. The TAKE IT DOWN Act timeline was confirmed via Congress.gov and CRS analysis.
GitHub: veritaschain/cap-spec · veritaschain/cap-srp
IETF Draft: draft-kamimura-scitt-refusal-events-02
Specification: CAP-SRP v1.1
"Verify, Don't Trust" — AI needs a Flight Recorder