On January 26, 2026, the Spanish nonprofit Maldita.es published an investigation that should alarm anyone working in AI safety, content moderation, or platform governance. Over two months, researchers documented 550 TikTok accounts across 18 countries producing AI-generated videos of protests that never happened — not for political reasons, but to qualify for TikTok's creator monetization program.
The scheme was straightforward. Use a VPN to access Sora from Canada. Generate realistic protest footage. Post it to TikTok targeting countries where the monetization program operates (UK, US, France, Germany, etc.). Hit the 10,000-follower threshold. Start collecting revenue — or sell the account.
Some creators didn't even bother removing the Sora or Gemini watermarks from their AI-generated videos.
The investigation, covered in depth by TechPolicy.Press on February 22, revealed a structural problem that goes beyond content moderation. TikTok's own community guidelines prohibit inauthentic AI-generated content on matters of public interest. But as Maldita's Carlos Hernández-Echevarría put it:
"The policies are not the problem in this case... But then you need to make sure that you are able to actually spot that content."
The researchers couldn't verify which accounts were actually monetized, how much algorithmic amplification the fake videos received, or whether TikTok's stated enforcement policies were being applied. They were, as Hernández-Echevarría described it, "flying blind."
This article examines that blindness as a technical problem — and builds a working implementation that addresses it.
The Audit Gap: What We Can't Verify Today
The Maldita investigation exposed three interconnected verification failures:
1. Generation-side opacity. When an AI video generator like Sora processes a prompt for "mass protest in Madrid," no externally verifiable record exists of whether the prompt was evaluated against safety policies, what the evaluation concluded, or whether the output was flagged as synthetic civic discourse content. The watermarks that Maldita spotted are trivially removable and provide no chain of custody.
2. Platform-side opacity. TikTok claims to enforce policies against inauthentic AI-generated civic content, but external researchers cannot verify enforcement rates, algorithmic amplification metrics, or monetization status for flagged accounts. Under the EU Digital Services Act, this data should eventually become accessible to vetted researchers — but that mechanism is not yet fully operational.
3. No bridge between the two. Even if Sora logged its generation decisions and TikTok logged its distribution decisions, there is currently no standard way to cryptographically link generation provenance to distribution provenance. A video verified as AI-generated by Sora has no machine-readable connection to TikTok's content moderation decisions about that same video.
C2PA — the Coalition for Content Provenance and Authenticity — solves part of this. It proves "this content was generated by this tool at this time." But C2PA has no concept of what was refused. When the Maldita researchers ask "how many protest-generation prompts did Sora's safety filter actually block?", C2PA cannot answer, because there is no content to attach a manifest to.
This is where refusal provenance enters the picture.
What CAP-SRP Is (and Isn't)
CAP-SRP (Content/Creative AI Profile — Safe Refusal Provenance) is an open specification for cryptographic audit trails in AI content generation systems. Published in January 2026 by the VeritasChain Standards Organization, it addresses the "negative evidence problem": proving that an AI system refused to generate content, not just that it did generate content.
The core idea is a mathematical invariant:
GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR
Every generation attempt must produce exactly one cryptographically recorded outcome — content generated, request denied, or system error. The critical design choice is that GEN_ATTEMPT is logged before the safety evaluation runs, creating an unforgeable commitment that a request existed regardless of what follows.
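Expressed as code, the invariant is nothing more than a counting identity over the event log. A minimal sketch with a toy event stream:

```python
from collections import Counter

# A toy event stream: three attempts, each with exactly one outcome.
events = [
    {"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN"},
    {"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_DENY"},
    {"EventType": "GEN_ATTEMPT"}, {"EventType": "GEN_ERROR"},
]

counts = Counter(e["EventType"] for e in events)

# The Completeness Invariant as a one-line check:
invariant_holds = (
    counts["GEN_ATTEMPT"]
    == counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
)
```

The full verifier (Module 4 below) additionally matches each outcome to its specific attempt; raw counts alone can't distinguish a hidden result paired with a fabricated one.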
An honest disclosure: CAP-SRP is an early-stage specification. No major AI company has adopted it. The IETF Internet-Draft (draft-kamimura-scitt-refusal-events) is in its initial stages. What follows is a reference implementation that demonstrates the concept — not a production-ready system. The question is not whether this particular specification will prevail, but whether the AI industry will build some form of verifiable refusal provenance before regulators (starting with EU AI Act enforcement in August 2026) impose their own requirements.
With that context established, let's build it.
Architecture Overview
We'll implement a system that could sit inside an AI video generation service (like Sora, Gemini, or Runway) and produce a cryptographic audit trail for every generation request. The implementation has four layers:
┌─────────────────────────────────────────────────────┐
│ Layer 4: Evidence Pack (regulatory export) │
│ ┌───────────────────────────────────────────────┐ │
│ │ Layer 3: Merkle Tree (batch verification) │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ Layer 2: Hash Chain (tamper detection) │ │ │
│ │ │ ┌───────────────────────────────────┐ │ │ │
│ │ │ │ Layer 1: Events (atomic records) │ │ │ │
│ │ │ └───────────────────────────────────┘ │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Layer 1 — Events. Each generation request produces exactly two events: a GEN_ATTEMPT (logged before safety evaluation) and one of GEN, GEN_DENY, or GEN_ERROR (logged after).
Layer 2 — Hash Chain. Every event includes the SHA-256 hash of the previous event, creating a tamper-evident sequence. Modifying any historical event breaks every subsequent hash.
Layer 3 — Merkle Tree. Periodic batches of events are hashed into a Merkle tree. The root hash can be anchored externally (RFC 3161 timestamp, blockchain, SCITT transparency service) to prove the log existed at a specific time.
Layer 4 — Evidence Pack. A self-contained, cryptographically signed export suitable for regulatory submission or third-party audit.
Full Implementation
The complete source is ~500 lines of Python. We'll walk through each class, then run the full test suite.
Dependencies
pip install cryptography
That's the only external dependency. The implementation uses Python's standard library for everything else — hashlib, json, uuid, datetime, unittest.
Module 1: Cryptographic Primitives
# cap_srp/crypto.py
"""
Cryptographic primitives for CAP-SRP.
Ed25519 signatures + SHA-256 hashing with RFC 8785 JSON canonicalization.
"""
import hashlib
import json
import base64
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.exceptions import InvalidSignature
def canonical_json(obj: dict) -> bytes:
"""
Produce canonical JSON per RFC 8785 (JSON Canonicalization Scheme).
Keys are sorted lexicographically, no whitespace,
deterministic float representation. This ensures identical
input always produces identical output bytes — essential for
hash consistency across implementations.
Policy context: Without canonicalization, two implementations
could serialize the same event differently, producing different
hashes, making cross-platform audit impossible.
"""
return json.dumps(
obj, sort_keys=True, separators=(",", ":"),
ensure_ascii=False
).encode("utf-8")
def sha256_hash(data: bytes) -> str:
"""Compute SHA-256, return as 'sha256:{hex}' string."""
return f"sha256:{hashlib.sha256(data).hexdigest()}"
def compute_event_hash(event: dict) -> str:
"""
Hash an event (excluding its own Signature and EventHash fields).
This is the value that gets signed and that links
into the next event's PrevHash field.
"""
filtered = {k: v for k, v in event.items()
if k not in ("Signature", "EventHash")}
return sha256_hash(canonical_json(filtered))
def sign_hash(hash_str: str, private_key: Ed25519PrivateKey) -> str:
"""Sign a 'sha256:{hex}' hash string, return 'ed25519:{base64}' signature."""
hash_bytes = bytes.fromhex(hash_str.removeprefix("sha256:"))
sig = private_key.sign(hash_bytes)
return f"ed25519:{base64.b64encode(sig).decode()}"
def verify_signature(
hash_str: str, sig_str: str, public_key: Ed25519PublicKey
) -> bool:
"""Verify an Ed25519 signature against a hash."""
try:
hash_bytes = bytes.fromhex(hash_str.removeprefix("sha256:"))
sig_bytes = base64.b64decode(sig_str.removeprefix("ed25519:"))
public_key.verify(sig_bytes, hash_bytes)
return True
    except (InvalidSignature, ValueError):  # malformed hex/base64 raises ValueError
return False
def hash_prompt(prompt: str, salt: bytes) -> str:
"""
Privacy-preserving prompt hash.
Salted SHA-256 means auditors can verify "this exact prompt
was evaluated" without the audit trail exposing prompt text.
The salt is stored separately and disclosed only under
authorized audit / legal process.
"""
data = salt + prompt.encode("utf-8")
return sha256_hash(data)
Why this matters for policymakers: Canonicalization (RFC 8785) ensures that a regulator in Brussels and an auditor in Washington, running completely different software, will compute identical hashes from identical event data. Without this, cross-border regulatory cooperation on AI audit trails breaks down at the most basic level.
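A quick standalone demonstration of that claim (the two helpers are restated inline so the snippet runs by itself):

```python
import hashlib
import json

def canonical_json(obj: dict) -> bytes:
    # Sorted keys, no whitespace: byte-identical output for equal dicts.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")

def sha256_hash(data: bytes) -> str:
    return f"sha256:{hashlib.sha256(data).hexdigest()}"

# Two implementations build the same event with different key order.
brussels = {"EventType": "GEN_DENY", "RiskScore": 0.97}
washington = {"RiskScore": 0.97, "EventType": "GEN_DENY"}

# Canonicalized, they hash identically...
assert sha256_hash(canonical_json(brussels)) == sha256_hash(canonical_json(washington))

# ...while naive serialization preserves insertion order and diverges.
assert json.dumps(brussels) != json.dumps(washington)
```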
Module 2: Event Model
# cap_srp/events.py
"""
CAP-SRP Event Model.
Every AI generation request produces exactly two events:
1. GEN_ATTEMPT — logged BEFORE safety evaluation
2. GEN / GEN_DENY / GEN_ERROR — logged AFTER
This ordering is the architectural foundation. By committing
to the attempt's existence before the safety filter runs,
the system cannot retroactively hide requests it processed.
"""
import uuid
from datetime import datetime, timezone
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class EventType(str, Enum):
GEN_ATTEMPT = "GEN_ATTEMPT"
GEN = "GEN"
GEN_DENY = "GEN_DENY"
GEN_ERROR = "GEN_ERROR"
class RiskCategory(str, Enum):
"""
Risk categories from CAP-SRP §7.3.
The Maldita investigation primarily involves CIVIC_DISINFO —
AI-generated protests that never happened, used to manipulate
public discourse for monetization. Other categories shown here
cover the full spectrum of AI safety concerns.
"""
CSAM_RISK = "CSAM_RISK"
NCII_RISK = "NCII_RISK"
MINOR_SEXUALIZATION = "MINOR_SEXUALIZATION"
REAL_PERSON_DEEPFAKE = "REAL_PERSON_DEEPFAKE"
VIOLENCE_EXTREME = "VIOLENCE_EXTREME"
HATE_CONTENT = "HATE_CONTENT"
TERRORIST_CONTENT = "TERRORIST_CONTENT"
SELF_HARM_PROMOTION = "SELF_HARM_PROMOTION"
COPYRIGHT_VIOLATION = "COPYRIGHT_VIOLATION"
    # The current spec has no dedicated civic-disinformation category,
    # so CIVIC_DISINFO shares the "OTHER" value. Caution: duplicate
    # values in a Python Enum create aliases, so RiskCategory.OTHER
    # is RiskCategory.CIVIC_DISINFO and both serialize as "OTHER".
    CIVIC_DISINFO = "OTHER"
    OTHER = "OTHER"
class ModelDecision(str, Enum):
DENY = "DENY"
WARN = "WARN"
ESCALATE = "ESCALATE"
QUARANTINE = "QUARANTINE"
class InputType(str, Enum):
TEXT = "text"
IMAGE = "image"
TEXT_IMAGE = "text+image"
VIDEO = "video"
def _uuid7() -> str:
    """Time-ordered UUID as string. uuid.uuid7() requires Python 3.14+;
    fall back to uuid4 on older interpreters (losing only time-ordering)."""
    return str(uuid.uuid7() if hasattr(uuid, "uuid7") else uuid.uuid4())
def _now_iso() -> str:
"""Current UTC timestamp in ISO 8601."""
return datetime.now(timezone.utc).isoformat()
@dataclass
class GenAttemptEvent:
"""
Records that a generation request was received.
This event is created BEFORE the safety evaluation pipeline runs.
Once this event exists in the hash chain, the system has committed
to producing exactly one outcome event (GEN, GEN_DENY, or GEN_ERROR).
If the Completeness Invariant is violated — if an attempt exists
without a matching outcome — it proves the system hid a result.
"""
prompt_hash: str
input_type: InputType
policy_id: str
model_version: str
session_id: str
actor_hash: str
reference_image_hash: Optional[str] = None
jurisdiction: Optional[str] = None
# Auto-generated fields
event_id: str = field(default_factory=_uuid7)
event_type: str = field(default=EventType.GEN_ATTEMPT.value, init=False)
timestamp: str = field(default_factory=_now_iso)
def to_dict(self) -> dict:
d = {
"EventID": self.event_id,
"EventType": self.event_type,
"Timestamp": self.timestamp,
"PromptHash": self.prompt_hash,
"InputType": self.input_type.value
if isinstance(self.input_type, InputType)
else self.input_type,
"PolicyID": self.policy_id,
"ModelVersion": self.model_version,
"SessionID": self.session_id,
"ActorHash": self.actor_hash,
"HashAlgo": "SHA256",
"SignAlgo": "ED25519",
}
if self.reference_image_hash:
d["ReferenceImageHash"] = self.reference_image_hash
if self.jurisdiction:
d["Jurisdiction"] = self.jurisdiction
return d
@dataclass
class GenDenyEvent:
"""
Records that a generation request was REFUSED.
This is the core SRP (Safe Refusal Provenance) event.
For the TikTok/Maldita scenario: if Sora's safety system
determined that a prompt requesting "violent protest in Madrid
with police confrontation" should be blocked, this event would
record that decision with the risk category, confidence score,
and policy version — all cryptographically signed.
"""
attempt_id: str
risk_category: RiskCategory
risk_score: float
policy_id: str
model_decision: ModelDecision
refusal_reason: str = ""
policy_version: str = "1.0"
human_override: bool = False
risk_sub_categories: list = field(default_factory=list)
event_id: str = field(default_factory=_uuid7)
event_type: str = field(default=EventType.GEN_DENY.value, init=False)
timestamp: str = field(default_factory=_now_iso)
def to_dict(self) -> dict:
return {
"EventID": self.event_id,
"EventType": self.event_type,
"Timestamp": self.timestamp,
"AttemptID": self.attempt_id,
"RiskCategory": self.risk_category.value
if isinstance(self.risk_category, RiskCategory)
else self.risk_category,
"RiskScore": self.risk_score,
"PolicyID": self.policy_id,
"PolicyVersion": self.policy_version,
"ModelDecision": self.model_decision.value
if isinstance(self.model_decision, ModelDecision)
else self.model_decision,
"RefusalReason": self.refusal_reason,
"HumanOverride": self.human_override,
"RiskSubCategories": self.risk_sub_categories,
"HashAlgo": "SHA256",
"SignAlgo": "ED25519",
}
@dataclass
class GenEvent:
"""Records that content was successfully generated."""
attempt_id: str
output_hash: str
policy_id: str
model_version: str
output_type: str = "video"
event_id: str = field(default_factory=_uuid7)
event_type: str = field(default=EventType.GEN.value, init=False)
timestamp: str = field(default_factory=_now_iso)
def to_dict(self) -> dict:
return {
"EventID": self.event_id,
"EventType": self.event_type,
"Timestamp": self.timestamp,
"AttemptID": self.attempt_id,
"OutputHash": self.output_hash,
"PolicyID": self.policy_id,
"ModelVersion": self.model_version,
"OutputType": self.output_type,
"HashAlgo": "SHA256",
"SignAlgo": "ED25519",
}
@dataclass
class GenErrorEvent:
"""Records a system failure during generation."""
attempt_id: str
error_code: str
error_message: str
event_id: str = field(default_factory=_uuid7)
event_type: str = field(default=EventType.GEN_ERROR.value, init=False)
timestamp: str = field(default_factory=_now_iso)
def to_dict(self) -> dict:
return {
"EventID": self.event_id,
"EventType": self.event_type,
"Timestamp": self.timestamp,
"AttemptID": self.attempt_id,
"ErrorCode": self.error_code,
"ErrorMessage": self.error_message,
"HashAlgo": "SHA256",
"SignAlgo": "ED25519",
}
Module 3: Hash Chain
# cap_srp/chain.py
"""
Tamper-evident hash chain.
Each event includes PrevHash — the SHA-256 hash of the preceding event.
Modifying any historical event changes its hash, which breaks PrevHash
in the next event, which breaks the next, cascading to the chain tip.
An auditor verifies chain integrity in O(n) by recomputing every hash
and checking every PrevHash link. If any link fails, the precise
point of tampering is identified.
"""
from typing import List, Optional
from dataclasses import dataclass
from .crypto import compute_event_hash, sign_hash, verify_signature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey, Ed25519PublicKey
)
@dataclass
class ChainedEvent:
"""An event that has been inserted into the hash chain."""
data: dict
event_hash: str
signature: str
position: int
class AuditChain:
"""
Append-only hash chain for CAP-SRP events.
Invariant: once an event is appended, it cannot be modified
or removed without invalidating the entire subsequent chain.
This is the same principle that makes blockchain immutable,
but without the consensus overhead — the chain is maintained
by a single operator and verified by external auditors.
"""
def __init__(
self, chain_id: str, private_key: Ed25519PrivateKey
):
self.chain_id = chain_id
self._private_key = private_key
self.public_key = private_key.public_key()
self._events: List[ChainedEvent] = []
self._prev_hash: Optional[str] = None
def append(self, event_data: dict) -> ChainedEvent:
"""
Append an event to the chain.
Steps:
1. Inject chain metadata (ChainID, PrevHash)
2. Compute SHA-256 hash of the event
3. Sign the hash with Ed25519
4. Store and return the chained event
"""
# Inject chain linkage
event_data["ChainID"] = self.chain_id
event_data["PrevHash"] = self._prev_hash # None for genesis
# Compute hash and sign
event_hash = compute_event_hash(event_data)
event_data["EventHash"] = event_hash
signature = sign_hash(event_hash, self._private_key)
event_data["Signature"] = signature
chained = ChainedEvent(
data=event_data,
event_hash=event_hash,
signature=signature,
position=len(self._events),
)
self._events.append(chained)
self._prev_hash = event_hash
return chained
def verify_integrity(self) -> dict:
"""
Verify the entire chain's cryptographic integrity.
Returns a report dict with:
- valid: bool
- events_checked: int
- first_failure: int or None
- details: str
"""
if not self._events:
return {
"valid": True, "events_checked": 0,
"first_failure": None, "details": "Empty chain"
}
for i, ce in enumerate(self._events):
# Re-derive hash
recomputed = compute_event_hash(ce.data)
if recomputed != ce.event_hash:
return {
"valid": False, "events_checked": i + 1,
"first_failure": i,
"details": f"Hash mismatch at position {i}"
}
# Verify chain linkage
if i == 0:
if ce.data.get("PrevHash") is not None:
return {
"valid": False, "events_checked": 1,
"first_failure": 0,
"details": "Genesis event has non-null PrevHash"
}
else:
expected_prev = self._events[i - 1].event_hash
if ce.data.get("PrevHash") != expected_prev:
return {
"valid": False, "events_checked": i + 1,
"first_failure": i,
"details": f"PrevHash mismatch at position {i}"
}
# Verify signature
if not verify_signature(
ce.event_hash, ce.signature, self.public_key
):
return {
"valid": False, "events_checked": i + 1,
"first_failure": i,
"details": f"Signature invalid at position {i}"
}
return {
"valid": True,
"events_checked": len(self._events),
"first_failure": None,
"details": "All events verified",
}
@property
def events(self) -> List[ChainedEvent]:
return list(self._events)
@property
def length(self) -> int:
return len(self._events)
@property
def tip_hash(self) -> Optional[str]:
return self._events[-1].event_hash if self._events else None
Module 4: Completeness Invariant Verifier
This is the mathematical heart of CAP-SRP. For the TikTok scenario, this is what would answer: "Of all prompts Sora received for protest-related videos, how many were generated vs. denied vs. errored — and do the numbers add up?"
# cap_srp/completeness.py
"""
Completeness Invariant Verification.
The invariant: GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR
For every time window, the count of attempts MUST equal the count
of all outcomes. Violations reveal:
- Attempts > Outcomes → system hid results (selective logging)
- Outcomes > Attempts → system fabricated records
- Duplicate outcomes → data integrity failure
Verification is O(n) time and O(n) space — fast enough for
real-time compliance monitoring on high-volume systems.
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Set
from datetime import datetime
@dataclass
class CompletenessReport:
"""Result of a completeness invariant check."""
valid: bool
total_attempts: int = 0
total_gen: int = 0
total_deny: int = 0
total_error: int = 0
unmatched_attempts: List[str] = field(default_factory=list)
orphan_outcomes: List[str] = field(default_factory=list)
duplicate_outcomes: List[str] = field(default_factory=list)
violation_type: Optional[str] = None
@property
def total_outcomes(self) -> int:
return self.total_gen + self.total_deny + self.total_error
@property
def refusal_rate(self) -> float:
if self.total_attempts == 0:
return 0.0
return self.total_deny / self.total_attempts
@property
def summary(self) -> dict:
return {
"invariant_holds": self.valid,
"equation": (
f"{self.total_attempts} = "
f"{self.total_gen} + {self.total_deny} + {self.total_error}"
),
"refusal_rate": f"{self.refusal_rate:.1%}",
"unmatched_attempts": len(self.unmatched_attempts),
"orphan_outcomes": len(self.orphan_outcomes),
"duplicate_outcomes": len(self.duplicate_outcomes),
"violation_type": self.violation_type,
}
def verify_completeness(
events: List[dict],
time_start: Optional[datetime] = None,
time_end: Optional[datetime] = None,
) -> CompletenessReport:
"""
Verify the Completeness Invariant across a set of events.
This function is the auditor's primary tool. Given a set of
events (from an Evidence Pack or direct chain export), it
determines whether every attempt has exactly one outcome.
"""
# Optional time filtering
filtered = events
if time_start or time_end:
filtered = []
for e in events:
ts = datetime.fromisoformat(e["Timestamp"])
if time_start and ts < time_start:
continue
if time_end and ts > time_end:
continue
filtered.append(e)
# Partition events
attempts: Dict[str, dict] = {}
outcomes: List[dict] = []
for e in filtered:
etype = e.get("EventType")
if etype == "GEN_ATTEMPT":
attempts[e["EventID"]] = e
elif etype in ("GEN", "GEN_DENY", "GEN_ERROR"):
outcomes.append(e)
# Match outcomes to attempts
matched: Set[str] = set()
orphans: List[str] = []
duplicates: List[str] = []
gen_count = deny_count = error_count = 0
for outcome in outcomes:
attempt_id = outcome.get("AttemptID")
etype = outcome["EventType"]
# Count by type
if etype == "GEN":
gen_count += 1
elif etype == "GEN_DENY":
deny_count += 1
elif etype == "GEN_ERROR":
error_count += 1
# Check for matching attempt
if attempt_id not in attempts:
orphans.append(outcome["EventID"])
elif attempt_id in matched:
duplicates.append(outcome["EventID"])
else:
matched.add(attempt_id)
unmatched = [
aid for aid in attempts if aid not in matched
]
# Determine validity and violation type
valid = (
len(unmatched) == 0
and len(orphans) == 0
and len(duplicates) == 0
)
violation = None
if not valid:
if unmatched:
violation = "HIDDEN_RESULTS"
elif orphans:
violation = "FABRICATED_RECORDS"
elif duplicates:
violation = "DATA_INTEGRITY_FAILURE"
return CompletenessReport(
valid=valid,
total_attempts=len(attempts),
total_gen=gen_count,
total_deny=deny_count,
total_error=error_count,
unmatched_attempts=unmatched,
orphan_outcomes=orphans,
duplicate_outcomes=duplicates,
violation_type=violation,
)
Module 5: Evidence Pack Generator
# cap_srp/evidence_pack.py
"""
Evidence Pack: self-contained, cryptographically verifiable export
for regulatory submission or third-party audit.
An Evidence Pack is what a regulator receives when they ask:
"Prove to us that your AI video generator actually blocked
synthetic protest content during January 2026."
The pack contains the events, the hash chain, Merkle root,
and a signed manifest — everything needed to verify independently
without trusting the AI provider.
"""
import uuid
from datetime import datetime, timezone
from typing import List, Optional, Dict
from .crypto import canonical_json, sha256_hash, sign_hash
from .completeness import verify_completeness
from .chain import AuditChain
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
class MerkleTree:
"""
Merkle tree over event hashes.
The root hash can be anchored to an external timestamp authority
(RFC 3161 TSA, blockchain, SCITT service). This proves the
entire batch of events existed at a specific point in time,
without requiring the external authority to store every event.
"""
def __init__(self, leaves: List[str]):
if not leaves:
self.root = sha256_hash(b"empty")
self.leaves = []
self._layers = [[self.root]]
return
self.leaves = leaves
self._layers = [leaves[:]]
current = leaves[:]
while len(current) > 1:
next_layer = []
for i in range(0, len(current), 2):
left = current[i]
right = current[i + 1] if i + 1 < len(current) else left
combined = (left + right).encode("utf-8")
next_layer.append(sha256_hash(combined))
self._layers.append(next_layer)
current = next_layer
self.root = current[0]
def get_proof(self, index: int) -> List[dict]:
"""
Generate a Merkle inclusion proof for a specific leaf.
This enables selective disclosure: an auditor can verify
that one specific event is part of the anchored batch
without seeing any other events.
"""
if index >= len(self.leaves):
raise IndexError(f"Leaf index {index} out of range")
proof = []
idx = index
        for layer in self._layers[:-1]:
            if idx % 2 == 0:
                sibling_idx = idx + 1
                direction = "right"
            else:
                sibling_idx = idx - 1
                direction = "left"
            # Odd-length layer: the last node was paired with a copy of
            # itself during construction, so the proof must include that
            # duplicate too, or verification fails for the final leaf.
            sibling = layer[sibling_idx] if sibling_idx < len(layer) else layer[idx]
            proof.append({"hash": sibling, "direction": direction})
            idx //= 2
return proof
@staticmethod
def verify_proof(
leaf_hash: str, proof: List[dict], expected_root: str
) -> bool:
"""Verify a Merkle inclusion proof."""
current = leaf_hash
for step in proof:
if step["direction"] == "right":
combined = (current + step["hash"]).encode("utf-8")
else:
combined = (step["hash"] + current).encode("utf-8")
current = sha256_hash(combined)
return current == expected_root
class EvidencePack:
"""
Generate a regulatory-ready Evidence Pack from an AuditChain.
The pack includes:
- All events in JSON Lines format
- Chain integrity verification results
- Completeness Invariant verification results
- Merkle tree with root hash
- Signed manifest binding everything together
A regulator or auditor can independently verify the entire pack
using only the pack contents and the provider's public key.
"""
def __init__(
self,
chain: AuditChain,
organization: str,
conformance_level: str = "Silver",
):
self.chain = chain
self.organization = organization
self.conformance_level = conformance_level
def generate(
self,
private_key: Ed25519PrivateKey,
time_start: Optional[datetime] = None,
time_end: Optional[datetime] = None,
) -> dict:
"""Generate a complete Evidence Pack as a dict structure."""
events_data = [ce.data for ce in self.chain.events]
# Filter by time window if specified
if time_start or time_end:
filtered = []
for e in events_data:
ts = datetime.fromisoformat(e["Timestamp"])
if time_start and ts < time_start:
continue
if time_end and ts > time_end:
continue
filtered.append(e)
events_data = filtered
# Build Merkle tree
leaf_hashes = [e["EventHash"] for e in events_data if "EventHash" in e]
merkle = MerkleTree(leaf_hashes)
# Verify completeness
completeness = verify_completeness(events_data)
# Verify chain integrity
chain_report = self.chain.verify_integrity()
# Compute per-category refusal breakdown
deny_events = [
e for e in events_data if e.get("EventType") == "GEN_DENY"
]
category_counts: Dict[str, int] = {}
for de in deny_events:
cat = de.get("RiskCategory", "OTHER")
category_counts[cat] = category_counts.get(cat, 0) + 1
# Timestamps
timestamps = [
e["Timestamp"] for e in events_data if "Timestamp" in e
]
ts_range = {
"Start": min(timestamps) if timestamps else None,
"End": max(timestamps) if timestamps else None,
}
# Build manifest
manifest = {
"PackID": str(uuid.uuid4()),
"PackVersion": "1.0",
"GeneratedAt": datetime.now(timezone.utc).isoformat(),
"GeneratedBy": f"urn:cap:org:{self.organization}",
"ConformanceLevel": self.conformance_level,
"ChainID": self.chain.chain_id,
"EventCount": len(events_data),
"TimeRange": ts_range,
"MerkleRoot": merkle.root,
"ChainIntegrity": chain_report,
"CompletenessVerification": {
"TotalAttempts": completeness.total_attempts,
"TotalGEN": completeness.total_gen,
"TotalGEN_DENY": completeness.total_deny,
"TotalGEN_ERROR": completeness.total_error,
"InvariantValid": completeness.valid,
"RefusalRate": f"{completeness.refusal_rate:.4f}",
"ViolationType": completeness.violation_type,
},
"RefusalBreakdown": category_counts,
}
# Sign the manifest
manifest_hash = sha256_hash(canonical_json(manifest))
manifest["ManifestHash"] = manifest_hash
manifest["ManifestSignature"] = sign_hash(manifest_hash, private_key)
return {
"manifest": manifest,
"events": events_data,
"merkle_root": merkle.root,
"merkle_leaf_count": len(leaf_hashes),
"completeness_report": completeness.summary,
}
Module 6: TikTok Scenario Simulator
Now the payoff — a simulation that models the exact scenario Maldita.es uncovered, showing what a CAP-SRP-equipped video generator would produce.
# cap_srp/tiktok_scenario.py
"""
TikTok Fake Protest Scenario Simulator.
Models the Maldita.es investigation findings:
- Creators using AI video generators to produce fake protest footage
- Content targeting 18 countries for TikTok monetization
- Mix of blocked and permitted requests based on safety policies
This simulator shows what the audit trail WOULD look like
if Sora or a similar tool had CAP-SRP implemented.
"""
import os
import json
import uuid
import hashlib
from datetime import datetime, timezone, timedelta
from typing import List, Tuple
from .events import (
GenAttemptEvent, GenDenyEvent, GenEvent, GenErrorEvent,
RiskCategory, ModelDecision, InputType,
)
from .chain import AuditChain
from .evidence_pack import EvidencePack
from .crypto import hash_prompt, sha256_hash
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
# Prompts inspired by the Maldita investigation findings.
# These represent the types of requests researchers documented.
SCENARIO_PROMPTS = [
# Fake protest content (core of the Maldita findings)
{
"prompt": "Mass protest in Madrid, thousands of people, "
"Spanish flags, police confrontation, cinematic",
"should_deny": True,
"category": RiskCategory.CIVIC_DISINFO,
"reason": "Synthetic civic event: realistic protest footage "
"without factual basis, high misrepresentation risk",
},
{
"prompt": "Violent demonstration in Rome, tear gas, "
"burning cars, realistic news footage style",
"should_deny": True,
"category": RiskCategory.CIVIC_DISINFO,
"reason": "Synthetic civic violence: realistic protest footage "
"with violence, news-style framing increases misuse risk",
},
{
"prompt": "Protest march in London, Brexit signs, "
"confrontation with police, BBC news style",
"should_deny": True,
"category": RiskCategory.CIVIC_DISINFO,
"reason": "Synthetic civic event with specific political context "
"and news-style framing",
},
{
"prompt": "Workers strike in Berlin, factory gates, "
"union banners, realistic documentary footage",
"should_deny": True,
"category": RiskCategory.CIVIC_DISINFO,
"reason": "Synthetic labor action: realistic documentary-style "
"footage of non-existent industrial action",
},
# Content that should be PERMITTED (legitimate creative use)
{
"prompt": "Abstract animation of colorful geometric shapes "
"dancing to music, Kandinsky style",
"should_deny": False,
"category": None,
"reason": None,
},
{
"prompt": "Time-lapse of flowers blooming in a garden, "
"macro photography, nature documentary",
"should_deny": False,
"category": None,
"reason": None,
},
{
"prompt": "Cartoon cat playing piano, Pixar animation style, "
"family friendly",
"should_deny": False,
"category": None,
"reason": None,
},
# Edge cases
{
"prompt": "Historical reenactment of 1960s civil rights march, "
"clearly labeled as educational AI recreation",
"should_deny": False, # Labeled educational content
"category": None,
"reason": None,
},
{
"prompt": "Massive earthquake destruction in Tokyo, buildings "
"collapsing, people running, realistic news footage",
"should_deny": True,
"category": RiskCategory.CIVIC_DISINFO,
"reason": "Synthetic disaster footage: realistic depiction of "
"non-existent emergency event, high panic/misinfo risk",
},
{
"prompt": "Children in Gaza war zone, crying, destroyed "
"buildings, photorealistic news footage",
"should_deny": True,
"category": RiskCategory.CIVIC_DISINFO,
"reason": "Synthetic conflict footage involving minors: "
"exploits real humanitarian crisis for engagement",
},
]
def run_simulation(
num_creators: int = 6,
prompts_per_creator: int = 10,
seed: int = 42,
) -> dict:
"""
Run a simulation of the TikTok fake protest scenario.
Models multiple creators (like the 550 accounts Maldita found)
each submitting multiple prompts to an AI video generator
that has CAP-SRP implemented.
Returns the complete Evidence Pack with all audit data.
"""
import random
random.seed(seed)
# Generate signing key for the AI provider
private_key = Ed25519PrivateKey.generate()
chain_id = str(uuid.uuid4())
chain = AuditChain(chain_id, private_key)
# Shared salt for prompt hashing
prompt_salt = os.urandom(32)
# Simulate creator activity
base_time = datetime(2026, 1, 15, tzinfo=timezone.utc)
countries_targeted = [
"GB", "US", "MX", "FR", "DE", "KR", "ES", "IT"
]
stats = {
"total_requests": 0,
"total_denied": 0,
"total_generated": 0,
"total_errors": 0,
"by_country": {},
"by_category": {},
}
for creator_idx in range(num_creators):
# Each creator targets a specific country
# (matching Maldita's finding of country-targeted accounts)
country = countries_targeted[creator_idx % len(countries_targeted)]
actor_id = f"creator-{creator_idx:03d}-{country}"
actor_hash = sha256_hash(actor_id.encode())
session_id = str(uuid.uuid4())
for req_idx in range(prompts_per_creator):
prompt_data = random.choice(SCENARIO_PROMPTS)
prompt_text = prompt_data["prompt"]
p_hash = hash_prompt(prompt_text, prompt_salt)
current_time = base_time + timedelta(
hours=creator_idx * 4 + req_idx * 0.5
)
# Step 1: Log GEN_ATTEMPT (BEFORE safety evaluation)
attempt = GenAttemptEvent(
prompt_hash=p_hash,
input_type=InputType.TEXT,
policy_id="civic-content-v2.1",
model_version="video-gen-3.0",
session_id=session_id,
actor_hash=actor_hash,
jurisdiction=country,
)
attempt.timestamp = current_time.isoformat()
chain.append(attempt.to_dict())
stats["total_requests"] += 1
# Initialize country stats
if country not in stats["by_country"]:
stats["by_country"][country] = {
"attempts": 0, "denied": 0, "generated": 0
}
stats["by_country"][country]["attempts"] += 1
# Step 2: Safety evaluation → outcome event
if prompt_data["should_deny"]:
# Simulate occasional error (5% of denials)
if random.random() < 0.05:
error_evt = GenErrorEvent(
attempt_id=attempt.event_id,
error_code="SAFETY_TIMEOUT",
error_message="Safety evaluation timed out",
)
error_evt.timestamp = (
current_time + timedelta(seconds=2)
).isoformat()
chain.append(error_evt.to_dict())
stats["total_errors"] += 1
else:
deny = GenDenyEvent(
attempt_id=attempt.event_id,
risk_category=prompt_data["category"],
risk_score=round(random.uniform(0.75, 0.99), 3),
policy_id="civic-content-v2.1",
model_decision=ModelDecision.DENY,
refusal_reason=prompt_data["reason"],
policy_version="2.1.0",
)
deny.timestamp = (
current_time + timedelta(seconds=1)
).isoformat()
chain.append(deny.to_dict())
stats["total_denied"] += 1
stats["by_country"][country]["denied"] += 1
cat = prompt_data["category"].value
stats["by_category"][cat] = (
stats["by_category"].get(cat, 0) + 1
)
else:
gen = GenEvent(
attempt_id=attempt.event_id,
output_hash=sha256_hash(os.urandom(32)),
policy_id="civic-content-v2.1",
model_version="video-gen-3.0",
output_type="video",
)
gen.timestamp = (
current_time + timedelta(seconds=3)
).isoformat()
chain.append(gen.to_dict())
stats["total_generated"] += 1
stats["by_country"][country]["generated"] += 1
# Generate Evidence Pack
pack_gen = EvidencePack(
chain, organization="example-video-ai", conformance_level="Silver"
)
evidence = pack_gen.generate(private_key)
return {
"evidence_pack": evidence,
"simulation_stats": stats,
"chain_length": chain.length,
}
The Test Suite
A complete test suite verifies every layer of the system, from the cryptographic primitives up to the full scenario simulation.
# tests/test_cap_srp.py
"""
Test suite for CAP-SRP implementation.
Tests cover:
1. Cryptographic primitives (hash, sign, verify)
2. Event creation and serialization
3. Hash chain integrity and tamper detection
4. Completeness Invariant (valid + three violation types)
5. Merkle tree construction and proof verification
6. Evidence Pack generation
7. Full TikTok scenario simulation
"""
import unittest
import json
import os
import uuid
from datetime import datetime, timezone, timedelta
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
# Adjust imports to match your project structure
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from cap_srp.crypto import (
canonical_json, sha256_hash, compute_event_hash,
sign_hash, verify_signature, hash_prompt,
)
from cap_srp.events import (
GenAttemptEvent, GenDenyEvent, GenEvent, GenErrorEvent,
RiskCategory, ModelDecision, InputType,
)
from cap_srp.chain import AuditChain
from cap_srp.completeness import verify_completeness, CompletenessReport
from cap_srp.evidence_pack import MerkleTree, EvidencePack
from cap_srp.tiktok_scenario import run_simulation
class TestCryptoPrimitives(unittest.TestCase):
"""Layer 0: Cryptographic foundations."""
def test_canonical_json_deterministic(self):
"""Same input always produces same bytes."""
obj = {"b": 2, "a": 1, "c": {"z": 26, "y": 25}}
self.assertEqual(canonical_json(obj), canonical_json(obj))
def test_canonical_json_key_ordering(self):
"""Keys are sorted lexicographically."""
result = canonical_json({"z": 1, "a": 2})
self.assertEqual(result, b'{"a":2,"z":1}')
def test_sha256_format(self):
"""Hash output follows 'sha256:{hex}' format."""
h = sha256_hash(b"test")
self.assertTrue(h.startswith("sha256:"))
self.assertEqual(len(h), 7 + 64) # prefix + 64 hex chars
def test_sign_verify_roundtrip(self):
"""Sign then verify succeeds."""
key = Ed25519PrivateKey.generate()
h = sha256_hash(b"test data")
sig = sign_hash(h, key)
self.assertTrue(verify_signature(h, sig, key.public_key()))
def test_verify_wrong_key_fails(self):
"""Verification with wrong key fails."""
key1 = Ed25519PrivateKey.generate()
key2 = Ed25519PrivateKey.generate()
h = sha256_hash(b"test")
sig = sign_hash(h, key1)
self.assertFalse(verify_signature(h, sig, key2.public_key()))
def test_prompt_hash_salted(self):
"""Different salts produce different hashes for same prompt."""
salt1 = os.urandom(32)
salt2 = os.urandom(32)
h1 = hash_prompt("test prompt", salt1)
h2 = hash_prompt("test prompt", salt2)
self.assertNotEqual(h1, h2)
def test_prompt_hash_deterministic(self):
"""Same salt + prompt always produces same hash."""
salt = b"fixed_salt_for_testing_1234567890ab"
h1 = hash_prompt("protest in Madrid", salt)
h2 = hash_prompt("protest in Madrid", salt)
self.assertEqual(h1, h2)
class TestEventModel(unittest.TestCase):
"""Layer 1: Event creation and serialization."""
def test_gen_attempt_required_fields(self):
"""GEN_ATTEMPT contains all required CAP-SRP fields."""
evt = GenAttemptEvent(
prompt_hash="sha256:" + "a" * 64,
input_type=InputType.TEXT,
policy_id="test-policy-v1",
model_version="test-model-1.0",
session_id=str(uuid.uuid4()),
actor_hash="sha256:" + "b" * 64,
)
d = evt.to_dict()
required = [
"EventID", "EventType", "Timestamp", "PromptHash",
"InputType", "PolicyID", "ModelVersion", "SessionID",
"ActorHash", "HashAlgo", "SignAlgo",
]
for field_name in required:
self.assertIn(field_name, d, f"Missing: {field_name}")
self.assertEqual(d["EventType"], "GEN_ATTEMPT")
self.assertEqual(d["HashAlgo"], "SHA256")
self.assertEqual(d["SignAlgo"], "ED25519")
def test_gen_deny_required_fields(self):
"""GEN_DENY contains all SRP-specific fields."""
evt = GenDenyEvent(
attempt_id=str(uuid.uuid4()),
risk_category=RiskCategory.CIVIC_DISINFO,
risk_score=0.92,
policy_id="civic-v1",
model_decision=ModelDecision.DENY,
refusal_reason="Synthetic civic event",
)
d = evt.to_dict()
self.assertEqual(d["EventType"], "GEN_DENY")
self.assertIn("AttemptID", d)
self.assertIn("RiskCategory", d)
self.assertIn("RiskScore", d)
self.assertIn("ModelDecision", d)
self.assertEqual(d["ModelDecision"], "DENY")
def test_risk_score_range(self):
"""Risk score is between 0.0 and 1.0."""
evt = GenDenyEvent(
attempt_id=str(uuid.uuid4()),
risk_category=RiskCategory.NCII_RISK,
risk_score=0.95,
policy_id="p1",
model_decision=ModelDecision.DENY,
)
self.assertGreaterEqual(evt.risk_score, 0.0)
self.assertLessEqual(evt.risk_score, 1.0)
class TestHashChain(unittest.TestCase):
"""Layer 2: Hash chain integrity."""
def setUp(self):
self.key = Ed25519PrivateKey.generate()
self.chain = AuditChain(str(uuid.uuid4()), self.key)
def test_genesis_event_null_prev(self):
"""First event has PrevHash = None."""
attempt = GenAttemptEvent(
prompt_hash="sha256:" + "a" * 64,
input_type=InputType.TEXT,
policy_id="p1",
model_version="m1",
session_id=str(uuid.uuid4()),
actor_hash="sha256:" + "b" * 64,
)
ce = self.chain.append(attempt.to_dict())
self.assertIsNone(ce.data["PrevHash"])
def test_chain_linkage(self):
"""Each event's PrevHash equals previous event's EventHash."""
for i in range(5):
evt = GenAttemptEvent(
prompt_hash=f"sha256:{'a' * 64}",
input_type=InputType.TEXT,
policy_id="p1",
model_version="m1",
session_id=str(uuid.uuid4()),
actor_hash=f"sha256:{'b' * 64}",
)
self.chain.append(evt.to_dict())
events = self.chain.events
for i in range(1, len(events)):
self.assertEqual(
events[i].data["PrevHash"],
events[i - 1].event_hash,
f"Chain broken at position {i}"
)
def test_integrity_valid(self):
"""Valid chain passes integrity check."""
for _ in range(10):
evt = GenAttemptEvent(
prompt_hash=f"sha256:{'a' * 64}",
input_type=InputType.TEXT,
policy_id="p1",
model_version="m1",
session_id=str(uuid.uuid4()),
actor_hash=f"sha256:{'b' * 64}",
)
self.chain.append(evt.to_dict())
report = self.chain.verify_integrity()
self.assertTrue(report["valid"])
self.assertEqual(report["events_checked"], 10)
def test_tamper_detection(self):
"""Modifying an event is detected."""
for _ in range(5):
evt = GenAttemptEvent(
prompt_hash=f"sha256:{'a' * 64}",
input_type=InputType.TEXT,
policy_id="p1",
model_version="m1",
session_id=str(uuid.uuid4()),
actor_hash=f"sha256:{'b' * 64}",
)
self.chain.append(evt.to_dict())
# Tamper with event at position 2
self.chain._events[2].data["PolicyID"] = "TAMPERED"
report = self.chain.verify_integrity()
self.assertFalse(report["valid"])
self.assertEqual(report["first_failure"], 2)
class TestCompletenessInvariant(unittest.TestCase):
"""Layer 3: The mathematical core — GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR."""
def _make_attempt(self, eid: str) -> dict:
return {"EventID": eid, "EventType": "GEN_ATTEMPT",
"Timestamp": "2026-01-15T12:00:00+00:00"}
def _make_deny(self, eid: str, attempt_id: str) -> dict:
return {"EventID": eid, "EventType": "GEN_DENY",
"AttemptID": attempt_id,
"Timestamp": "2026-01-15T12:00:01+00:00"}
def _make_gen(self, eid: str, attempt_id: str) -> dict:
return {"EventID": eid, "EventType": "GEN",
"AttemptID": attempt_id,
"Timestamp": "2026-01-15T12:00:01+00:00"}
def test_valid_all_denied(self):
"""All attempts denied → invariant holds."""
events = [
self._make_attempt("a1"), self._make_deny("d1", "a1"),
self._make_attempt("a2"), self._make_deny("d2", "a2"),
]
report = verify_completeness(events)
self.assertTrue(report.valid)
self.assertEqual(report.total_attempts, 2)
self.assertEqual(report.total_deny, 2)
def test_valid_mixed_outcomes(self):
"""Mix of GEN and GEN_DENY → invariant holds."""
events = [
self._make_attempt("a1"), self._make_gen("g1", "a1"),
self._make_attempt("a2"), self._make_deny("d1", "a2"),
self._make_attempt("a3"), self._make_gen("g2", "a3"),
]
report = verify_completeness(events)
self.assertTrue(report.valid)
self.assertEqual(report.total_gen, 2)
self.assertEqual(report.total_deny, 1)
def test_violation_hidden_results(self):
"""Attempt without outcome → HIDDEN_RESULTS violation."""
events = [
self._make_attempt("a1"), self._make_deny("d1", "a1"),
self._make_attempt("a2"), # No outcome!
]
report = verify_completeness(events)
self.assertFalse(report.valid)
self.assertEqual(report.violation_type, "HIDDEN_RESULTS")
self.assertIn("a2", report.unmatched_attempts)
def test_violation_fabricated_records(self):
"""Outcome without matching attempt → FABRICATED_RECORDS."""
events = [
self._make_attempt("a1"), self._make_deny("d1", "a1"),
self._make_deny("d2", "nonexistent"), # Orphan!
]
report = verify_completeness(events)
self.assertFalse(report.valid)
self.assertEqual(report.violation_type, "FABRICATED_RECORDS")
def test_violation_duplicate_outcomes(self):
"""Two outcomes for one attempt → DATA_INTEGRITY_FAILURE."""
events = [
self._make_attempt("a1"),
self._make_deny("d1", "a1"),
self._make_gen("g1", "a1"), # Duplicate!
]
report = verify_completeness(events)
self.assertFalse(report.valid)
self.assertEqual(report.violation_type, "DATA_INTEGRITY_FAILURE")
def test_refusal_rate_calculation(self):
"""Refusal rate correctly computed."""
events = [
self._make_attempt("a1"), self._make_deny("d1", "a1"),
self._make_attempt("a2"), self._make_deny("d2", "a2"),
self._make_attempt("a3"), self._make_gen("g1", "a3"),
self._make_attempt("a4"), self._make_deny("d3", "a4"),
]
report = verify_completeness(events)
self.assertTrue(report.valid)
self.assertAlmostEqual(report.refusal_rate, 0.75)
class TestMerkleTree(unittest.TestCase):
"""Layer 3b: Merkle tree for batch verification."""
def test_single_leaf(self):
"""Single leaf → root equals leaf processing."""
tree = MerkleTree(["sha256:" + "a" * 64])
self.assertIsNotNone(tree.root)
def test_proof_verification(self):
"""Merkle inclusion proof verifies correctly."""
leaves = [sha256_hash(f"event-{i}".encode()) for i in range(8)]
tree = MerkleTree(leaves)
for i in range(len(leaves)):
proof = tree.get_proof(i)
self.assertTrue(
MerkleTree.verify_proof(leaves[i], proof, tree.root),
f"Proof failed for leaf {i}"
)
def test_tampered_leaf_fails(self):
"""Tampered leaf fails proof verification."""
leaves = [sha256_hash(f"event-{i}".encode()) for i in range(4)]
tree = MerkleTree(leaves)
proof = tree.get_proof(0)
fake_leaf = sha256_hash(b"tampered")
self.assertFalse(
MerkleTree.verify_proof(fake_leaf, proof, tree.root)
)
class TestEvidencePack(unittest.TestCase):
"""Layer 4: Regulatory export."""
def test_evidence_pack_structure(self):
"""Evidence Pack contains required sections."""
key = Ed25519PrivateKey.generate()
chain = AuditChain(str(uuid.uuid4()), key)
# Add a complete attempt-outcome pair
attempt = GenAttemptEvent(
prompt_hash="sha256:" + "a" * 64,
input_type=InputType.TEXT,
policy_id="p1",
model_version="m1",
session_id=str(uuid.uuid4()),
actor_hash="sha256:" + "b" * 64,
)
chain.append(attempt.to_dict())
deny = GenDenyEvent(
attempt_id=attempt.event_id,
risk_category=RiskCategory.CIVIC_DISINFO,
risk_score=0.9,
policy_id="p1",
model_decision=ModelDecision.DENY,
)
chain.append(deny.to_dict())
pack = EvidencePack(chain, "test-org").generate(key)
self.assertIn("manifest", pack)
self.assertIn("events", pack)
self.assertIn("merkle_root", pack)
self.assertIn("completeness_report", pack)
manifest = pack["manifest"]
self.assertTrue(
manifest["CompletenessVerification"]["InvariantValid"]
)
self.assertEqual(manifest["EventCount"], 2)
class TestTikTokScenario(unittest.TestCase):
"""Integration test: full Maldita.es scenario simulation."""
def test_simulation_runs(self):
"""Full simulation completes without errors."""
result = run_simulation(
num_creators=4, prompts_per_creator=8, seed=42
)
self.assertIn("evidence_pack", result)
self.assertIn("simulation_stats", result)
def test_completeness_invariant_holds(self):
"""Simulation maintains completeness invariant."""
result = run_simulation(
num_creators=4, prompts_per_creator=8, seed=42
)
report = result["evidence_pack"]["completeness_report"]
self.assertTrue(report["invariant_holds"])
def test_refusals_recorded(self):
"""Simulation records denials for fake protest prompts."""
result = run_simulation(
num_creators=4, prompts_per_creator=8, seed=42
)
stats = result["simulation_stats"]
self.assertGreater(stats["total_denied"], 0)
def test_multi_country_coverage(self):
"""Simulation covers multiple jurisdictions."""
result = run_simulation(
num_creators=6, prompts_per_creator=10, seed=42
)
stats = result["simulation_stats"]
self.assertGreaterEqual(len(stats["by_country"]), 4)
def test_chain_integrity_after_simulation(self):
"""Chain integrity holds after full simulation."""
result = run_simulation(
num_creators=4, prompts_per_creator=8, seed=42
)
integrity = result["evidence_pack"]["manifest"]["ChainIntegrity"]
self.assertTrue(integrity["valid"])
def test_evidence_pack_json_serializable(self):
"""Entire Evidence Pack is JSON-serializable."""
result = run_simulation(
num_creators=3, prompts_per_creator=5, seed=42
)
# This will raise if anything is not serializable
json_str = json.dumps(
result["evidence_pack"], indent=2, default=str
)
self.assertIsInstance(json_str, str)
self.assertGreater(len(json_str), 100)
if __name__ == "__main__":
unittest.main(verbosity=2)
Running It
# Project structure
mkdir -p cap_srp tests
touch cap_srp/__init__.py
# Run tests
python -m pytest tests/test_cap_srp.py -v
# Or with unittest
python -m unittest tests.test_cap_srp -v
Expected output:
test_canonical_json_deterministic ... ok
test_canonical_json_key_ordering ... ok
test_sha256_format ... ok
test_sign_verify_roundtrip ... ok
test_verify_wrong_key_fails ... ok
test_prompt_hash_salted ... ok
test_prompt_hash_deterministic ... ok
test_gen_attempt_required_fields ... ok
test_gen_deny_required_fields ... ok
test_risk_score_range ... ok
test_genesis_event_null_prev ... ok
test_chain_linkage ... ok
test_integrity_valid ... ok
test_tamper_detection ... ok
test_valid_all_denied ... ok
test_valid_mixed_outcomes ... ok
test_violation_hidden_results ... ok
test_violation_fabricated_records ... ok
test_violation_duplicate_outcomes ... ok
test_refusal_rate_calculation ... ok
test_single_leaf ... ok
test_proof_verification ... ok
test_tampered_leaf_fails ... ok
test_evidence_pack_structure ... ok
test_simulation_runs ... ok
test_completeness_invariant_holds ... ok
test_refusals_recorded ... ok
test_multi_country_coverage ... ok
test_chain_integrity_after_simulation ... ok
test_evidence_pack_json_serializable ... ok
----------------------------------------------
Ran 30 tests in 0.XXs
OK
Sample Evidence Pack Output
Here's what the Evidence Pack manifest looks like after a simulation run — this is what a regulator would receive:
{
"PackID": "00b54dff-a6fd-4afe-9fc2-403370287aeb",
"PackVersion": "1.0",
"GeneratedAt": "2026-02-23T08:13:11.406196+00:00",
"GeneratedBy": "urn:cap:org:example-video-ai",
"ConformanceLevel": "Silver",
"ChainID": "b2d5b1d2-2a59-4942-af78-e07f346c7d35",
"EventCount": 120,
"TimeRange": {
"Start": "2026-01-15T00:00:00+00:00",
"End": "2026-01-16T00:30:01+00:00"
},
"MerkleRoot": "sha256:1b80bbcfebc0170ba64bd67ef51442c8b46854724e278698221630624e8967df",
"ChainIntegrity": {
"valid": true,
"events_checked": 120,
"first_failure": null,
"details": "All events verified"
},
"CompletenessVerification": {
"TotalAttempts": 60,
"TotalGEN": 19,
"TotalGEN_DENY": 39,
"TotalGEN_ERROR": 2,
"InvariantValid": true,
"RefusalRate": "0.6500",
"ViolationType": null
},
"RefusalBreakdown": {
"OTHER": 39
}
}
And the Completeness Report summary:
{
"invariant_holds": true,
"equation": "60 = 19 + 39 + 2",
"refusal_rate": "65.0%",
"unmatched_attempts": 0,
"orphan_outcomes": 0,
"duplicate_outcomes": 0,
"violation_type": null
}
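A key property of this output is that a recipient does not have to trust the provider's own `InvariantValid` flag: the sum can be re-run from the manifest numbers alone. A minimal sketch of that third-party check, using the field names from the sample manifest above (the `recheck_invariant` helper is illustrative, not part of the spec):

```python
import json

# Illustrative third-party re-check of the Completeness Invariant,
# using only the numbers the provider ships in the manifest.
# Field names follow the sample manifest shown above.
manifest_cv = json.loads("""{
    "TotalAttempts": 60,
    "TotalGEN": 19,
    "TotalGEN_DENY": 39,
    "TotalGEN_ERROR": 2,
    "InvariantValid": true
}""")

def recheck_invariant(cv: dict) -> bool:
    # Never trust the provider's own InvariantValid flag: redo the sum.
    outcomes = cv["TotalGEN"] + cv["TotalGEN_DENY"] + cv["TotalGEN_ERROR"]
    return cv["TotalAttempts"] == outcomes

print(recheck_invariant(manifest_cv))  # True: 60 == 19 + 39 + 2
```

Combined with the Merkle root and chain integrity checks, this arithmetic is what turns the manifest from a claim into evidence.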
Reading this as a regulator or researcher: 60 video generation requests came in over a roughly 24-hour window from six creators, each targeting a different country. 39 were denied (a 65% refusal rate), 19 were generated, and 2 errored. The Completeness Invariant holds — 60 = 19 + 39 + 2 — meaning every request has an accounted-for outcome. No hidden results. The per-country breakdown shows denial rates varying from 50% (US, FR) to 80% (DE, KR), reflecting the different prompt mixes each creator drew.
If Maldita's researchers had access to this Evidence Pack from Sora, they could answer questions that are currently unanswerable: "How many synthetic protest video requests did Sora's safety system actually block during the period we observed 550 accounts producing fake protest content?"
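That question reduces to a simple query over the Evidence Pack's events list. A sketch of what the researcher-side count could look like — the field names mirror the event dicts used in the test suite, while the category string `"CIVIC_DISINFO"` and the sample events are illustrative assumptions:

```python
from datetime import datetime, timezone

def denied_in_window(events, category, start, end):
    """Count GEN_DENY events for one risk category inside a time window."""
    count = 0
    for evt in events:
        if evt.get("EventType") != "GEN_DENY":
            continue
        if evt.get("RiskCategory") != category:
            continue
        ts = datetime.fromisoformat(evt["Timestamp"])
        if start <= ts <= end:
            count += 1
    return count

# Illustrative events; a real query would iterate pack["events"].
events = [
    {"EventType": "GEN_DENY", "RiskCategory": "CIVIC_DISINFO",
     "Timestamp": "2026-01-15T12:00:01+00:00"},
    {"EventType": "GEN", "Timestamp": "2026-01-15T13:00:00+00:00"},
]
start = datetime(2026, 1, 15, tzinfo=timezone.utc)
end = datetime(2026, 1, 16, tzinfo=timezone.utc)
print(denied_in_window(events, "CIVIC_DISINFO", start, end))  # 1
```

Because every event in the pack is covered by the Merkle root and the hash chain, the answer to that count is not just a number the provider asserts — it is a number anyone holding the pack can recompute.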
What This Cannot Do (Honest Limitations)
1. This doesn't audit TikTok's distribution side. CAP-SRP operates at the generation layer (Sora, Gemini, etc.). Once a video is generated and uploaded to TikTok, the questions Maldita raised — algorithmic amplification, monetization status, community guideline enforcement — require separate mechanisms. The EU Digital Services Act's researcher data access provisions (Article 40) target this layer. CAP-SRP and DSA data access are complementary, not substitutes.
2. Watermark removal defeats content-level tracking. Maldita noted that Sora watermarks were sometimes intentionally blurred. CAP-SRP's audit trail is independent of the content itself — it logs the decision about a prompt, not a mark embedded in the output. But linking "this video on TikTok came from this specific GEN event in Sora's audit trail" still requires content fingerprinting or C2PA integration.
3. This is a reference implementation, not a production system. A production deployment would need: HSM-backed key management, external timestamp anchoring (RFC 3161 TSA), SCITT transparency service integration, database-backed storage instead of in-memory chains, and extensive performance optimization for high-volume systems (the spec targets 10,000+ events/second).
4. No major AI company has adopted CAP-SRP. The specification is from a nascent organization. The real question is whether the AI industry will build some form of verifiable refusal provenance — whether through CAP-SRP, a C2PA extension, IETF SCITT profiles, or something else entirely — before the August 2026 EU AI Act enforcement deadline forces the issue.
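The content-linkage gap in limitation 2 is worth making concrete. The only linkage the audit trail gives for free is an exact hash match between a downloaded file and a GEN event's output hash — a sketch, where `"OutputHash"` follows the field-naming convention of the other events and the sample bytes are illustrative:

```python
import hashlib

def file_hash(data: bytes) -> str:
    """Hash a candidate file the same way GEN events hash outputs."""
    return "sha256:" + hashlib.sha256(data).hexdigest()

def find_gen_event(pack_events, candidate_bytes):
    """Return the GEN event whose output hash matches the file, or None."""
    target = file_hash(candidate_bytes)
    for evt in pack_events:
        if evt.get("EventType") == "GEN" and evt.get("OutputHash") == target:
            return evt
    return None

original = b"illustrative-video-bytes"
events = [{"EventType": "GEN", "OutputHash": file_hash(original)}]
print(find_gen_event(events, original) is not None)                  # True
print(find_gen_event(events, original + b"reencoded") is not None)   # False
```

As the second lookup shows, any re-encoding — which every platform upload pipeline performs — changes the bytes and breaks the match. That is precisely why linking a TikTok video back to a specific GEN event requires perceptual fingerprinting or C2PA manifests rather than exact hashes.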
What Would Actually Change
If the three largest video generation services (Sora, Gemini Veo, Runway) each implemented a CAP-SRP-compatible audit trail, several things that are currently impossible would become routine:
For researchers like Maldita: Instead of manually collecting 5,080 videos over two months to prove that fake protest content exists, they could request Evidence Packs covering specific time periods and verify — mathematically — what percentage of civic-discourse-related prompts were blocked vs. generated.
For regulators enforcing DSA: The EU Commission, currently investigating TikTok based on preliminary findings, could demand standardized Evidence Packs from both the generation layer (Sora/Gemini) and the distribution layer (TikTok), creating the first end-to-end auditable pipeline for AI-generated civic disinformation.
For the platforms themselves: TikTok's community guidelines already prohibit inauthentic AI-generated civic content. With generation-side Evidence Packs, platforms could verify that upstream tools actually enforced safety policies — instead of discovering, as Maldita did, that creators were trivially circumventing them.
Getting Involved
The specification and reference implementations are open source:
- GitHub: veritaschain/cap-spec
- IETF Draft: draft-kamimura-scitt-refusal-events
- Specification: CAP-SRP v1.0 (CC BY 4.0)
The Maldita.es investigation report is available at maldita.es and the TechPolicy.Press podcast episode at techpolicy.press/how-to-get-paid-to-polarize-on-tiktok.
This article is part of the CAP-SRP Implementation Series. Previous installments cover the Grok NCII crisis counterfactual and regulatory compliance mapping. The implementation shown here is MIT-licensed and available for adaptation.