
550 TikTok Accounts Sold Fake AI Protests for Cash. Here's the Audit Trail That Could Have Caught Them.

On January 26, 2026, the Spanish nonprofit Maldita.es published an investigation that should alarm anyone working in AI safety, content moderation, or platform governance. Over two months, researchers documented 550 TikTok accounts across 18 countries producing AI-generated videos of protests that never happened — not for political reasons, but to qualify for TikTok's creator monetization program.

The scheme was straightforward. Use a VPN to access Sora from Canada. Generate realistic protest footage. Post it to TikTok targeting countries where the monetization program operates (UK, US, France, Germany, etc.). Hit the 10,000-follower threshold. Start collecting revenue — or sell the account.

Some creators didn't even bother removing the Sora or Gemini watermarks from their AI-generated videos.

The investigation, covered in depth by TechPolicy.Press on February 22, revealed a structural problem that goes beyond content moderation. TikTok's own community guidelines prohibit inauthentic AI-generated content on matters of public interest. But as Maldita's Carlos Hernández-Echevarría put it:

"The policies are not the problem in this case... But then you need to make sure that you are able to actually spot that content."

The researchers couldn't verify which accounts were actually monetized, how much algorithmic amplification the fake videos received, or whether TikTok's stated enforcement policies were being applied. They were, as Hernández-Echevarría described it, "flying blind."

This article examines that blindness as a technical problem — and builds a working implementation that addresses it.


The Audit Gap: What We Can't Verify Today

The Maldita investigation exposed three interconnected verification failures:

1. Generation-side opacity. When an AI video generator like Sora processes a prompt for "mass protest in Madrid," no externally verifiable record exists of whether the prompt was evaluated against safety policies, what the evaluation concluded, or whether the output was flagged as synthetic civic discourse content. The watermarks that Maldita spotted are trivially removable and provide no chain of custody.

2. Platform-side opacity. TikTok claims to enforce policies against inauthentic AI-generated civic content, but external researchers cannot verify enforcement rates, algorithmic amplification metrics, or monetization status for flagged accounts. Under the EU Digital Services Act, this data should eventually become accessible to vetted researchers — but that mechanism is not yet fully operational.

3. No bridge between the two. Even if Sora logged its generation decisions and TikTok logged its distribution decisions, there is currently no standard way to cryptographically link generation provenance to distribution provenance. A video verified as AI-generated by Sora has no machine-readable connection to TikTok's content moderation decisions about that same video.

C2PA — the Coalition for Content Provenance and Authenticity — solves part of this. It proves "this content was generated by this tool at this time." But C2PA has no concept of what was refused. When the Maldita researchers ask "how many protest-generation prompts did Sora's safety filter actually block?", C2PA cannot answer, because there is no content to attach a manifest to.

This is where refusal provenance enters the picture.


What CAP-SRP Is (and Isn't)

CAP-SRP (Content/Creative AI Profile — Safe Refusal Provenance) is an open specification for cryptographic audit trails in AI content generation systems. Published in January 2026 by the VeritasChain Standards Organization, it addresses the "negative evidence problem": proving that an AI system refused to generate content, not just that it did generate content.

The core idea is a mathematical invariant:

GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR

Every generation attempt must produce exactly one cryptographically recorded outcome — content generated, request denied, or system error. The critical design choice is that GEN_ATTEMPT is logged before the safety evaluation runs, creating an unforgeable commitment that a request existed regardless of what follows.
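The commit-before-evaluate ordering can be sketched in a few lines. Everything below (`log`, `evaluate_safety`, `generate`) is a hypothetical stand-in for illustration, not any real generation API:

```python
# Minimal sketch of commit-before-evaluate. All helpers here are toy
# stand-ins, not any real generation service's API.
log = []

def append_to_log(event: dict) -> int:
    log.append(event)
    return len(log) - 1          # position doubles as the attempt ID

def evaluate_safety(prompt: str) -> bool:
    return "protest" in prompt   # toy stand-in for a safety classifier

def generate(prompt: str) -> str:
    return "video-bytes"         # toy stand-in for the video generator

def handle_request(prompt: str):
    # Commit FIRST: the attempt is on the record before any decision.
    attempt_id = append_to_log({"type": "GEN_ATTEMPT", "prompt": prompt})
    try:
        if evaluate_safety(prompt):
            append_to_log({"type": "GEN_DENY", "attempt": attempt_id})
            return None
        output = generate(prompt)
        append_to_log({"type": "GEN", "attempt": attempt_id})
        return output
    except Exception as exc:
        # Even a crash yields an outcome, so the invariant still balances.
        append_to_log({"type": "GEN_ERROR", "attempt": attempt_id,
                       "error": str(exc)})
        raise

handle_request("a cat in the rain")
handle_request("mass protest in Madrid")
```

Because the attempt is logged unconditionally on entry, every code path out of the function leaves behind exactly one matching outcome event.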

An honest disclosure: CAP-SRP is an early-stage specification. No major AI company has adopted it. The IETF Internet-Draft (draft-kamimura-scitt-refusal-events) is in its initial stages. What follows is a reference implementation that demonstrates the concept — not a production-ready system. The question is not whether this particular specification will prevail, but whether the AI industry will build some form of verifiable refusal provenance before regulators (starting with EU AI Act enforcement in August 2026) impose their own requirements.

With that context established, let's build it.


Architecture Overview

We'll implement a system that could sit inside an AI video generation service (like Sora, Gemini, or Runway) and produce a cryptographic audit trail for every generation request. The implementation has four layers:

┌─────────────────────────────────────────────────────┐
│  Layer 4: Evidence Pack (regulatory export)          │
│  ┌───────────────────────────────────────────────┐  │
│  │ Layer 3: Merkle Tree (batch verification)     │  │
│  │  ┌─────────────────────────────────────────┐  │  │
│  │  │ Layer 2: Hash Chain (tamper detection)   │  │  │
│  │  │  ┌───────────────────────────────────┐  │  │  │
│  │  │  │ Layer 1: Events (atomic records)  │  │  │  │
│  │  │  └───────────────────────────────────┘  │  │  │
│  │  └─────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

Layer 1 — Events. Each generation request produces exactly two events: a GEN_ATTEMPT (logged before safety evaluation) and one of GEN, GEN_DENY, or GEN_ERROR (logged after).

Layer 2 — Hash Chain. Every event includes the SHA-256 hash of the previous event, creating a tamper-evident sequence. Modifying any historical event breaks every subsequent hash.

Layer 3 — Merkle Tree. Periodic batches of events are hashed into a Merkle tree. The root hash can be anchored externally (RFC 3161 timestamp, blockchain, SCITT transparency service) to prove the log existed at a specific time.

Layer 4 — Evidence Pack. A self-contained, cryptographically signed export suitable for regulatory submission or third-party audit.


Full Implementation

The complete source is ~500 lines of Python. We'll walk through each class, then run the full test suite.

Dependencies

pip install cryptography

That's the only external dependency. The implementation uses Python's standard library for everything else — hashlib, json, uuid, datetime, unittest.
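One thing the modules below assume but never show is creating the operator's signing key. With the article's single dependency, that and a signature round trip look like this (a setup sketch, not part of the spec itself):

```python
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization

# Generate the operator's Ed25519 signing key. The private half stays
# with the log operator; the public half goes to auditors.
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()

# Serialize the public key as PEM for distribution to auditors.
public_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

# Round trip: sign a 32-byte digest, then verify it.
digest = b"\x00" * 32
sig = private_key.sign(digest)
public_key.verify(sig, digest)  # raises InvalidSignature on a forgery
```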

Module 1: Cryptographic Primitives

# cap_srp/crypto.py
"""
Cryptographic primitives for CAP-SRP.
Ed25519 signatures + SHA-256 hashing with RFC 8785 JSON canonicalization.
"""

import hashlib
import json
import base64
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.exceptions import InvalidSignature


def canonical_json(obj: dict) -> bytes:
    """
    Produce canonical JSON in the style of RFC 8785 (JSON
    Canonicalization Scheme).

    Keys are sorted lexicographically, no whitespace. Note that
    Python's json module only approximates JCS: it matches for the
    strings, booleans, and integers this spec uses, but full
    RFC 8785 number serialization requires a dedicated JCS library.
    The goal is that identical input always produces identical
    output bytes, which is essential for hash consistency across
    implementations.

    Policy context: Without canonicalization, two implementations 
    could serialize the same event differently, producing different 
    hashes, making cross-platform audit impossible.
    """
    return json.dumps(
        obj, sort_keys=True, separators=(",", ":"),
        ensure_ascii=False
    ).encode("utf-8")


def sha256_hash(data: bytes) -> str:
    """Compute SHA-256, return as 'sha256:{hex}' string."""
    return f"sha256:{hashlib.sha256(data).hexdigest()}"


def compute_event_hash(event: dict) -> str:
    """
    Hash an event (excluding its own Signature and EventHash fields).

    This is the value that gets signed and that links 
    into the next event's PrevHash field.
    """
    filtered = {k: v for k, v in event.items() 
                if k not in ("Signature", "EventHash")}
    return sha256_hash(canonical_json(filtered))


def sign_hash(hash_str: str, private_key: Ed25519PrivateKey) -> str:
    """Sign a 'sha256:{hex}' hash string, return 'ed25519:{base64}' signature."""
    hash_bytes = bytes.fromhex(hash_str.removeprefix("sha256:"))
    sig = private_key.sign(hash_bytes)
    return f"ed25519:{base64.b64encode(sig).decode()}"


def verify_signature(
    hash_str: str, sig_str: str, public_key: Ed25519PublicKey
) -> bool:
    """Verify an Ed25519 signature against a hash."""
    try:
        hash_bytes = bytes.fromhex(hash_str.removeprefix("sha256:"))
        sig_bytes = base64.b64decode(sig_str.removeprefix("ed25519:"))
        public_key.verify(sig_bytes, hash_bytes)
        return True
    except (InvalidSignature, ValueError):  # bad sig, malformed hex/base64
        return False


def hash_prompt(prompt: str, salt: bytes) -> str:
    """
    Privacy-preserving prompt hash.

    Salted SHA-256 means auditors can verify "this exact prompt 
    was evaluated" without the audit trail exposing prompt text.
    The salt is stored separately and disclosed only under 
    authorized audit / legal process.
    """
    data = salt + prompt.encode("utf-8")
    return sha256_hash(data)

Why this matters for policymakers: Canonicalization (RFC 8785) ensures that a regulator in Brussels and an auditor in Washington, running completely different software, will compute identical hashes from identical event data. Without this, cross-border regulatory cooperation on AI audit trails breaks down at the most basic level.
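A quick stdlib-only sanity check of that claim: two parties serializing the same event with different key order still agree on the hash once the JSON is canonicalized.

```python
import hashlib
import json

def canonical_json(obj: dict) -> bytes:
    # Same canonicalization as Module 1: sorted keys, no whitespace.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False).encode("utf-8")

# The same event, built by two "implementations" in different key order.
brussels = {"EventType": "GEN_DENY", "RiskScore": 0.97}
washington = {"RiskScore": 0.97, "EventType": "GEN_DENY"}

h1 = hashlib.sha256(canonical_json(brussels)).hexdigest()
h2 = hashlib.sha256(canonical_json(washington)).hexdigest()
assert h1 == h2  # identical hashes, so both audits agree
```

Without the canonicalization step, `json.dumps` on those two dicts would produce different byte strings and therefore different hashes.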

Module 2: Event Model

# cap_srp/events.py
"""
CAP-SRP Event Model.

Every AI generation request produces exactly two events:
  1. GEN_ATTEMPT  — logged BEFORE safety evaluation
  2. GEN / GEN_DENY / GEN_ERROR — logged AFTER

This ordering is the architectural foundation. By committing 
to the attempt's existence before the safety filter runs, 
the system cannot retroactively hide requests it processed.
"""

import uuid
import os
from datetime import datetime, timezone
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Optional

from .crypto import hash_prompt


class EventType(str, Enum):
    GEN_ATTEMPT = "GEN_ATTEMPT"
    GEN = "GEN"
    GEN_DENY = "GEN_DENY"
    GEN_ERROR = "GEN_ERROR"


class RiskCategory(str, Enum):
    """
    Risk categories from CAP-SRP §7.3.

    The Maldita investigation primarily involves CIVIC_DISINFO —
    AI-generated protests that never happened, used to manipulate 
    public discourse for monetization. Other categories shown here 
    cover the full spectrum of AI safety concerns.
    """
    CSAM_RISK = "CSAM_RISK"
    NCII_RISK = "NCII_RISK"
    MINOR_SEXUALIZATION = "MINOR_SEXUALIZATION"
    REAL_PERSON_DEEPFAKE = "REAL_PERSON_DEEPFAKE"
    VIOLENCE_EXTREME = "VIOLENCE_EXTREME"
    HATE_CONTENT = "HATE_CONTENT"
    TERRORIST_CONTENT = "TERRORIST_CONTENT"
    SELF_HARM_PROMOTION = "SELF_HARM_PROMOTION"
    COPYRIGHT_VIOLATION = "COPYRIGHT_VIOLATION"
    CIVIC_DISINFO = "OTHER"  # Mapped to OTHER in current spec
    OTHER = "OTHER"


class ModelDecision(str, Enum):
    DENY = "DENY"
    WARN = "WARN"
    ESCALATE = "ESCALATE"
    QUARANTINE = "QUARANTINE"


class InputType(str, Enum):
    TEXT = "text"
    IMAGE = "image"
    TEXT_IMAGE = "text+image"
    VIDEO = "video"


def _uuid7() -> str:
    """Generate a UUIDv7 (time-ordered) string.

    uuid.uuid7 landed in Python 3.14; fall back to a random UUIDv4
    on older interpreters, trading time-ordering for availability.
    """
    if hasattr(uuid, "uuid7"):
        return str(uuid.uuid7())
    return str(uuid.uuid4())


def _now_iso() -> str:
    """Current UTC timestamp in ISO 8601."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class GenAttemptEvent:
    """
    Records that a generation request was received.

    This event is created BEFORE the safety evaluation pipeline runs.
    Once this event exists in the hash chain, the system has committed 
    to producing exactly one outcome event (GEN, GEN_DENY, or GEN_ERROR).

    If the Completeness Invariant is violated — if an attempt exists 
    without a matching outcome — it proves the system hid a result.
    """
    prompt_hash: str
    input_type: InputType
    policy_id: str
    model_version: str
    session_id: str
    actor_hash: str
    reference_image_hash: Optional[str] = None
    jurisdiction: Optional[str] = None

    # Auto-generated fields
    event_id: str = field(default_factory=_uuid7)
    event_type: str = field(default=EventType.GEN_ATTEMPT.value, init=False)
    timestamp: str = field(default_factory=_now_iso)

    def to_dict(self) -> dict:
        d = {
            "EventID": self.event_id,
            "EventType": self.event_type,
            "Timestamp": self.timestamp,
            "PromptHash": self.prompt_hash,
            "InputType": self.input_type.value
                if isinstance(self.input_type, InputType)
                else self.input_type,
            "PolicyID": self.policy_id,
            "ModelVersion": self.model_version,
            "SessionID": self.session_id,
            "ActorHash": self.actor_hash,
            "HashAlgo": "SHA256",
            "SignAlgo": "ED25519",
        }
        if self.reference_image_hash:
            d["ReferenceImageHash"] = self.reference_image_hash
        if self.jurisdiction:
            d["Jurisdiction"] = self.jurisdiction
        return d


@dataclass
class GenDenyEvent:
    """
    Records that a generation request was REFUSED.

    This is the core SRP (Safe Refusal Provenance) event.

    For the TikTok/Maldita scenario: if Sora's safety system 
    determined that a prompt requesting "violent protest in Madrid 
    with police confrontation" should be blocked, this event would 
    record that decision with the risk category, confidence score, 
    and policy version — all cryptographically signed.
    """
    attempt_id: str
    risk_category: RiskCategory
    risk_score: float
    policy_id: str
    model_decision: ModelDecision
    refusal_reason: str = ""
    policy_version: str = "1.0"
    human_override: bool = False
    risk_sub_categories: list = field(default_factory=list)

    event_id: str = field(default_factory=_uuid7)
    event_type: str = field(default=EventType.GEN_DENY.value, init=False)
    timestamp: str = field(default_factory=_now_iso)

    def to_dict(self) -> dict:
        return {
            "EventID": self.event_id,
            "EventType": self.event_type,
            "Timestamp": self.timestamp,
            "AttemptID": self.attempt_id,
            "RiskCategory": self.risk_category.value
                if isinstance(self.risk_category, RiskCategory)
                else self.risk_category,
            "RiskScore": self.risk_score,
            "PolicyID": self.policy_id,
            "PolicyVersion": self.policy_version,
            "ModelDecision": self.model_decision.value
                if isinstance(self.model_decision, ModelDecision)
                else self.model_decision,
            "RefusalReason": self.refusal_reason,
            "HumanOverride": self.human_override,
            "RiskSubCategories": self.risk_sub_categories,
            "HashAlgo": "SHA256",
            "SignAlgo": "ED25519",
        }


@dataclass
class GenEvent:
    """Records that content was successfully generated."""
    attempt_id: str
    output_hash: str
    policy_id: str
    model_version: str
    output_type: str = "video"

    event_id: str = field(default_factory=_uuid7)
    event_type: str = field(default=EventType.GEN.value, init=False)
    timestamp: str = field(default_factory=_now_iso)

    def to_dict(self) -> dict:
        return {
            "EventID": self.event_id,
            "EventType": self.event_type,
            "Timestamp": self.timestamp,
            "AttemptID": self.attempt_id,
            "OutputHash": self.output_hash,
            "PolicyID": self.policy_id,
            "ModelVersion": self.model_version,
            "OutputType": self.output_type,
            "HashAlgo": "SHA256",
            "SignAlgo": "ED25519",
        }


@dataclass
class GenErrorEvent:
    """Records a system failure during generation."""
    attempt_id: str
    error_code: str
    error_message: str

    event_id: str = field(default_factory=_uuid7)
    event_type: str = field(default=EventType.GEN_ERROR.value, init=False)
    timestamp: str = field(default_factory=_now_iso)

    def to_dict(self) -> dict:
        return {
            "EventID": self.event_id,
            "EventType": self.event_type,
            "Timestamp": self.timestamp,
            "AttemptID": self.attempt_id,
            "ErrorCode": self.error_code,
            "ErrorMessage": self.error_message,
            "HashAlgo": "SHA256",
            "SignAlgo": "ED25519",
        }
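One subtlety worth calling out in the RiskCategory enum: Python's Enum treats members with equal values as aliases, so OTHER becomes an alias of CIVIC_DISINFO (which is declared first with the same value). A trimmed stand-alone copy of the enum demonstrates the behavior:

```python
from enum import Enum

class RiskCategory(str, Enum):
    # Trimmed copy of the spec's enum, just enough to show the aliasing.
    REAL_PERSON_DEEPFAKE = "REAL_PERSON_DEEPFAKE"
    CIVIC_DISINFO = "OTHER"  # mapped to OTHER in the current spec
    OTHER = "OTHER"          # duplicate value: becomes an alias

# Both names refer to the same member, and both serialize as "OTHER".
assert RiskCategory.OTHER is RiskCategory.CIVIC_DISINFO
assert RiskCategory("OTHER") is RiskCategory.CIVIC_DISINFO
```

This means `to_dict()` emits "OTHER" either way, which is exactly the mapping the spec comment describes, but anyone iterating `RiskCategory` should know the alias is skipped.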

Module 3: Hash Chain

# cap_srp/chain.py
"""
Tamper-evident hash chain.

Each event includes PrevHash — the SHA-256 hash of the preceding event.
Modifying any historical event changes its hash, which breaks PrevHash 
in the next event, which breaks the next, cascading to the chain tip.

An auditor verifies chain integrity in O(n) by recomputing every hash 
and checking every PrevHash link. If any link fails, the precise 
point of tampering is identified.
"""

from typing import List, Optional
from dataclasses import dataclass

from .crypto import compute_event_hash, sign_hash, verify_signature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey, Ed25519PublicKey
)


@dataclass
class ChainedEvent:
    """An event that has been inserted into the hash chain."""
    data: dict
    event_hash: str
    signature: str
    position: int


class AuditChain:
    """
    Append-only hash chain for CAP-SRP events.

    Invariant: once an event is appended, it cannot be modified 
    or removed without invalidating the entire subsequent chain.

    This is the same principle that makes blockchain immutable, 
    but without the consensus overhead — the chain is maintained 
    by a single operator and verified by external auditors.
    """

    def __init__(
        self, chain_id: str, private_key: Ed25519PrivateKey
    ):
        self.chain_id = chain_id
        self._private_key = private_key
        self.public_key = private_key.public_key()
        self._events: List[ChainedEvent] = []
        self._prev_hash: Optional[str] = None

    def append(self, event_data: dict) -> ChainedEvent:
        """
        Append an event to the chain.

        Steps:
        1. Inject chain metadata (ChainID, PrevHash)
        2. Compute SHA-256 hash of the event
        3. Sign the hash with Ed25519
        4. Store and return the chained event
        """
        # Inject chain linkage
        event_data["ChainID"] = self.chain_id
        event_data["PrevHash"] = self._prev_hash  # None for genesis

        # Compute hash and sign
        event_hash = compute_event_hash(event_data)
        event_data["EventHash"] = event_hash
        signature = sign_hash(event_hash, self._private_key)
        event_data["Signature"] = signature

        chained = ChainedEvent(
            data=event_data,
            event_hash=event_hash,
            signature=signature,
            position=len(self._events),
        )
        self._events.append(chained)
        self._prev_hash = event_hash
        return chained

    def verify_integrity(self) -> dict:
        """
        Verify the entire chain's cryptographic integrity.

        Returns a report dict with:
        - valid: bool
        - events_checked: int
        - first_failure: int or None
        - details: str
        """
        if not self._events:
            return {
                "valid": True, "events_checked": 0,
                "first_failure": None, "details": "Empty chain"
            }

        for i, ce in enumerate(self._events):
            # Re-derive hash
            recomputed = compute_event_hash(ce.data)
            if recomputed != ce.event_hash:
                return {
                    "valid": False, "events_checked": i + 1,
                    "first_failure": i,
                    "details": f"Hash mismatch at position {i}"
                }

            # Verify chain linkage
            if i == 0:
                if ce.data.get("PrevHash") is not None:
                    return {
                        "valid": False, "events_checked": 1,
                        "first_failure": 0,
                        "details": "Genesis event has non-null PrevHash"
                    }
            else:
                expected_prev = self._events[i - 1].event_hash
                if ce.data.get("PrevHash") != expected_prev:
                    return {
                        "valid": False, "events_checked": i + 1,
                        "first_failure": i,
                        "details": f"PrevHash mismatch at position {i}"
                    }

            # Verify signature
            if not verify_signature(
                ce.event_hash, ce.signature, self.public_key
            ):
                return {
                    "valid": False, "events_checked": i + 1,
                    "first_failure": i,
                    "details": f"Signature invalid at position {i}"
                }

        return {
            "valid": True,
            "events_checked": len(self._events),
            "first_failure": None,
            "details": "All events verified",
        }

    @property
    def events(self) -> List[ChainedEvent]:
        return list(self._events)

    @property
    def length(self) -> int:
        return len(self._events)

    @property
    def tip_hash(self) -> Optional[str]:
        return self._events[-1].event_hash if self._events else None
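The cascade that makes tampering detectable can be seen in a condensed, stand-alone form (stdlib only, invented event shapes, signatures omitted):

```python
import hashlib
import json

def h(event: dict) -> str:
    # Hash the canonicalized event, as in Module 1.
    return hashlib.sha256(
        json.dumps(event, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()

# Build a three-event chain; each event links to the previous hash.
chain, prev = [], None
for name in ("GEN_ATTEMPT", "GEN_DENY", "GEN_ATTEMPT"):
    event = {"EventType": name, "PrevHash": prev}
    prev = h(event)
    chain.append((event, prev))

def verify(chain) -> bool:
    prev = None
    for event, stored in chain:
        if event["PrevHash"] != prev or h(event) != stored:
            return False
        prev = stored
    return True

assert verify(chain)
# Retroactively turn a refusal into an approval...
chain[1][0]["EventType"] = "GEN"
assert not verify(chain)  # ...and the hash mismatch exposes it
```

Recomputing the tampered event's hash doesn't help the attacker either: the next event's PrevHash would then fail, so the forgery only moves the break down the chain.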

Module 4: Completeness Invariant Verifier

This is the mathematical heart of CAP-SRP. For the TikTok scenario, this is what would answer: "Of all prompts Sora received for protest-related videos, how many were generated vs. denied vs. errored — and do the numbers add up?"

# cap_srp/completeness.py
"""
Completeness Invariant Verification.

The invariant: GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR

For every time window, the count of attempts MUST equal the count 
of all outcomes. Violations reveal:
  - Attempts > Outcomes → system hid results (selective logging)
  - Outcomes > Attempts → system fabricated records
  - Duplicate outcomes → data integrity failure

Verification is O(n) time and O(n) space — fast enough for 
real-time compliance monitoring on high-volume systems.
"""

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Set
from datetime import datetime


@dataclass
class CompletenessReport:
    """Result of a completeness invariant check."""
    valid: bool
    total_attempts: int = 0
    total_gen: int = 0
    total_deny: int = 0
    total_error: int = 0
    unmatched_attempts: List[str] = field(default_factory=list)
    orphan_outcomes: List[str] = field(default_factory=list)
    duplicate_outcomes: List[str] = field(default_factory=list)
    violation_type: Optional[str] = None

    @property
    def total_outcomes(self) -> int:
        return self.total_gen + self.total_deny + self.total_error

    @property
    def refusal_rate(self) -> float:
        if self.total_attempts == 0:
            return 0.0
        return self.total_deny / self.total_attempts

    @property
    def summary(self) -> dict:
        return {
            "invariant_holds": self.valid,
            "equation": (
                f"{self.total_attempts} = "
                f"{self.total_gen} + {self.total_deny} + {self.total_error}"
            ),
            "refusal_rate": f"{self.refusal_rate:.1%}",
            "unmatched_attempts": len(self.unmatched_attempts),
            "orphan_outcomes": len(self.orphan_outcomes),
            "duplicate_outcomes": len(self.duplicate_outcomes),
            "violation_type": self.violation_type,
        }


def verify_completeness(
    events: List[dict],
    time_start: Optional[datetime] = None,
    time_end: Optional[datetime] = None,
) -> CompletenessReport:
    """
    Verify the Completeness Invariant across a set of events.

    This function is the auditor's primary tool. Given a set of 
    events (from an Evidence Pack or direct chain export), it 
    determines whether every attempt has exactly one outcome.
    """
    # Optional time filtering
    filtered = events
    if time_start or time_end:
        filtered = []
        for e in events:
            ts = datetime.fromisoformat(e["Timestamp"])
            if time_start and ts < time_start:
                continue
            if time_end and ts > time_end:
                continue
            filtered.append(e)

    # Partition events
    attempts: Dict[str, dict] = {}
    outcomes: List[dict] = []

    for e in filtered:
        etype = e.get("EventType")
        if etype == "GEN_ATTEMPT":
            attempts[e["EventID"]] = e
        elif etype in ("GEN", "GEN_DENY", "GEN_ERROR"):
            outcomes.append(e)

    # Match outcomes to attempts
    matched: Set[str] = set()
    orphans: List[str] = []
    duplicates: List[str] = []
    gen_count = deny_count = error_count = 0

    for outcome in outcomes:
        attempt_id = outcome.get("AttemptID")
        etype = outcome["EventType"]

        # Count by type
        if etype == "GEN":
            gen_count += 1
        elif etype == "GEN_DENY":
            deny_count += 1
        elif etype == "GEN_ERROR":
            error_count += 1

        # Check for matching attempt
        if attempt_id not in attempts:
            orphans.append(outcome["EventID"])
        elif attempt_id in matched:
            duplicates.append(outcome["EventID"])
        else:
            matched.add(attempt_id)

    unmatched = [
        aid for aid in attempts if aid not in matched
    ]

    # Determine validity and violation type
    valid = (
        len(unmatched) == 0
        and len(orphans) == 0
        and len(duplicates) == 0
    )

    violation = None
    if not valid:
        if unmatched:
            violation = "HIDDEN_RESULTS"
        elif orphans:
            violation = "FABRICATED_RECORDS"
        elif duplicates:
            violation = "DATA_INTEGRITY_FAILURE"

    return CompletenessReport(
        valid=valid,
        total_attempts=len(attempts),
        total_gen=gen_count,
        total_deny=deny_count,
        total_error=error_count,
        unmatched_attempts=unmatched,
        orphan_outcomes=orphans,
        duplicate_outcomes=duplicates,
        violation_type=violation,
    )
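To see the verifier's core matching logic on concrete data, here is a condensed stand-alone version run against a toy log where one outcome has quietly gone missing (event shapes invented for illustration):

```python
# Toy event log: three attempts, but only two outcomes.
events = [
    {"EventType": "GEN_ATTEMPT", "EventID": "a1"},
    {"EventType": "GEN",         "EventID": "o1", "AttemptID": "a1"},
    {"EventType": "GEN_ATTEMPT", "EventID": "a2"},
    {"EventType": "GEN_DENY",    "EventID": "o2", "AttemptID": "a2"},
    {"EventType": "GEN_ATTEMPT", "EventID": "a3"},  # no outcome: hidden result
]

attempts = {e["EventID"] for e in events if e["EventType"] == "GEN_ATTEMPT"}
outcomes = {e["AttemptID"] for e in events
            if e["EventType"] in ("GEN", "GEN_DENY", "GEN_ERROR")}

unmatched = attempts - outcomes
assert unmatched == {"a3"}  # 3 attempts != 2 outcomes: HIDDEN_RESULTS
```

This is exactly the question the Maldita researchers couldn't ask of Sora or TikTok: does every request the system received have an accounted-for outcome?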

Module 5: Evidence Pack Generator

# cap_srp/evidence_pack.py
"""
Evidence Pack: self-contained, cryptographically verifiable export 
for regulatory submission or third-party audit.

An Evidence Pack is what a regulator receives when they ask:
"Prove to us that your AI video generator actually blocked 
synthetic protest content during January 2026."

The pack contains the events, the hash chain, Merkle root, 
and a signed manifest — everything needed to verify independently 
without trusting the AI provider.
"""

import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import List, Optional, Dict

from .crypto import canonical_json, sha256_hash, sign_hash
from .completeness import verify_completeness, CompletenessReport
from .chain import AuditChain, ChainedEvent
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey


class MerkleTree:
    """
    Merkle tree over event hashes.

    The root hash can be anchored to an external timestamp authority 
    (RFC 3161 TSA, blockchain, SCITT service). This proves the 
    entire batch of events existed at a specific point in time, 
    without requiring the external authority to store every event.
    """

    def __init__(self, leaves: List[str]):
        if not leaves:
            self.root = sha256_hash(b"empty")
            self.leaves = []
            self._layers = [[self.root]]
            return

        self.leaves = leaves
        self._layers = [leaves[:]]

        current = leaves[:]
        while len(current) > 1:
            next_layer = []
            for i in range(0, len(current), 2):
                left = current[i]
                right = current[i + 1] if i + 1 < len(current) else left
                combined = (left + right).encode("utf-8")
                next_layer.append(sha256_hash(combined))
            self._layers.append(next_layer)
            current = next_layer

        self.root = current[0]

    def get_proof(self, index: int) -> List[dict]:
        """
        Generate a Merkle inclusion proof for a specific leaf.

        This enables selective disclosure: an auditor can verify 
        that one specific event is part of the anchored batch 
        without seeing any other events.
        """
        if index >= len(self.leaves):
            raise IndexError(f"Leaf index {index} out of range")

        proof = []
        idx = index
        for layer in self._layers[:-1]:
            if idx % 2 == 0:
                sibling_idx = idx + 1
                direction = "right"
            else:
                sibling_idx = idx - 1
                direction = "left"

            if sibling_idx < len(layer):
                proof.append({
                    "hash": layer[sibling_idx],
                    "direction": direction
                })
            else:
                # An odd node at the end of a layer was hashed with
                # itself during construction, so the proof must pair
                # it with itself as well.
                proof.append({
                    "hash": layer[idx],
                    "direction": "right"
                })
            idx //= 2

        return proof

    @staticmethod
    def verify_proof(
        leaf_hash: str, proof: List[dict], expected_root: str
    ) -> bool:
        """Verify a Merkle inclusion proof."""
        current = leaf_hash
        for step in proof:
            if step["direction"] == "right":
                combined = (current + step["hash"]).encode("utf-8")
            else:
                combined = (step["hash"] + current).encode("utf-8")
            current = sha256_hash(combined)
        return current == expected_root


class EvidencePack:
    """
    Generate a regulatory-ready Evidence Pack from an AuditChain.

    The pack includes:
    - All events in JSON Lines format
    - Chain integrity verification results
    - Completeness Invariant verification results  
    - Merkle tree with root hash
    - Signed manifest binding everything together

    A regulator or auditor can independently verify the entire pack 
    using only the pack contents and the provider's public key.
    """

    def __init__(
        self,
        chain: AuditChain,
        organization: str,
        conformance_level: str = "Silver",
    ):
        self.chain = chain
        self.organization = organization
        self.conformance_level = conformance_level

    def generate(
        self,
        private_key: Ed25519PrivateKey,
        time_start: Optional[datetime] = None,
        time_end: Optional[datetime] = None,
    ) -> dict:
        """Generate a complete Evidence Pack as a dict structure."""

        events_data = [ce.data for ce in self.chain.events]

        # Filter by time window if specified
        if time_start or time_end:
            filtered = []
            for e in events_data:
                ts = datetime.fromisoformat(e["Timestamp"])
                if time_start and ts < time_start:
                    continue
                if time_end and ts > time_end:
                    continue
                filtered.append(e)
            events_data = filtered

        # Build Merkle tree
        leaf_hashes = [e["EventHash"] for e in events_data if "EventHash" in e]
        merkle = MerkleTree(leaf_hashes)

        # Verify completeness
        completeness = verify_completeness(events_data)

        # Verify chain integrity
        chain_report = self.chain.verify_integrity()

        # Compute per-category refusal breakdown
        deny_events = [
            e for e in events_data if e.get("EventType") == "GEN_DENY"
        ]
        category_counts: Dict[str, int] = {}
        for de in deny_events:
            cat = de.get("RiskCategory", "OTHER")
            category_counts[cat] = category_counts.get(cat, 0) + 1

        # Timestamps
        timestamps = [
            e["Timestamp"] for e in events_data if "Timestamp" in e
        ]
        ts_range = {
            "Start": min(timestamps) if timestamps else None,
            "End": max(timestamps) if timestamps else None,
        }

        # Build manifest
        manifest = {
            "PackID": str(uuid.uuid4()),
            "PackVersion": "1.0",
            "GeneratedAt": datetime.now(timezone.utc).isoformat(),
            "GeneratedBy": f"urn:cap:org:{self.organization}",
            "ConformanceLevel": self.conformance_level,
            "ChainID": self.chain.chain_id,
            "EventCount": len(events_data),
            "TimeRange": ts_range,
            "MerkleRoot": merkle.root,
            "ChainIntegrity": chain_report,
            "CompletenessVerification": {
                "TotalAttempts": completeness.total_attempts,
                "TotalGEN": completeness.total_gen,
                "TotalGEN_DENY": completeness.total_deny,
                "TotalGEN_ERROR": completeness.total_error,
                "InvariantValid": completeness.valid,
                "RefusalRate": f"{completeness.refusal_rate:.4f}",
                "ViolationType": completeness.violation_type,
            },
            "RefusalBreakdown": category_counts,
        }

        # Sign the manifest. Note: a verifier must strip ManifestHash and
        # ManifestSignature before recomputing the hash over the manifest.
        manifest_hash = sha256_hash(canonical_json(manifest))
        manifest["ManifestHash"] = manifest_hash
        manifest["ManifestSignature"] = sign_hash(manifest_hash, private_key)

        return {
            "manifest": manifest,
            "events": events_data,
            "merkle_root": merkle.root,
            "merkle_leaf_count": len(leaf_hashes),
            "completeness_report": completeness.summary,
        }
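The claim in the EvidencePack docstring, that an auditor can verify the pack independently, is worth making concrete. The stdlib-only sketch below re-implements the pairwise-SHA256 scheme that `verify_proof` uses; the odd-level duplication rule is an assumption and must match whatever the provider's published `MerkleTree` construction actually does:

```python
import hashlib

# Stdlib-only re-implementation of the pairwise-SHA256 scheme from
# verify_proof above, so an auditor needs no code from the provider.
# Odd levels duplicate the last node (an assumption about the tree).
def sha256_hash(data: bytes) -> str:
    return "sha256:" + hashlib.sha256(data).hexdigest()

def merkle_root(leaves: list) -> str:
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        level = [sha256_hash((level[i] + level[i + 1]).encode("utf-8"))
                 for i in range(0, len(level), 2)]
    return level[0]

def verify_proof(leaf: str, proof: list, root: str) -> bool:
    current = leaf
    for step in proof:
        pair = (current + step["hash"]) if step["direction"] == "right" \
            else (step["hash"] + current)
        current = sha256_hash(pair.encode("utf-8"))
    return current == root

# Four event hashes, as they would appear in an Evidence Pack.
leaves = [sha256_hash(f"event-{i}".encode()) for i in range(4)]
root = merkle_root(leaves)

# Inclusion proof for leaf 0: its sibling, then the opposite subtree hash.
n23 = sha256_hash((leaves[2] + leaves[3]).encode("utf-8"))
proof = [{"hash": leaves[1], "direction": "right"},
         {"hash": n23, "direction": "right"}]
ok = verify_proof(leaves[0], proof, root)
```

Because only `hashlib` is involved, this check can run anywhere, including inside a regulator's own tooling.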

Module 6: TikTok Scenario Simulator

Now the payoff — a simulation that models the exact scenario Maldita.es uncovered, showing what a CAP-SRP-equipped video generator would produce.

# cap_srp/tiktok_scenario.py
"""
TikTok Fake Protest Scenario Simulator.

Models the Maldita.es investigation findings:
- Creators using AI video generators to produce fake protest footage
- Content targeting 18 countries for TikTok monetization
- Mix of blocked and permitted requests based on safety policies

This simulator shows what the audit trail WOULD look like 
if Sora or a similar tool had CAP-SRP implemented.
"""

import os
import json
import uuid
import hashlib
from datetime import datetime, timezone, timedelta
from typing import List, Tuple

from .events import (
    GenAttemptEvent, GenDenyEvent, GenEvent, GenErrorEvent,
    RiskCategory, ModelDecision, InputType,
)
from .chain import AuditChain
from .evidence_pack import EvidencePack
from .crypto import hash_prompt, sha256_hash
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey


# Prompts inspired by the Maldita investigation findings.
# These represent the types of requests researchers documented.
SCENARIO_PROMPTS = [
    # Fake protest content (core of the Maldita findings)
    {
        "prompt": "Mass protest in Madrid, thousands of people, "
                  "Spanish flags, police confrontation, cinematic",
        "should_deny": True,
        "category": RiskCategory.CIVIC_DISINFO,
        "reason": "Synthetic civic event: realistic protest footage "
                  "without factual basis, high misrepresentation risk",
    },
    {
        "prompt": "Violent demonstration in Rome, tear gas, "
                  "burning cars, realistic news footage style",
        "should_deny": True,
        "category": RiskCategory.CIVIC_DISINFO,
        "reason": "Synthetic civic violence: realistic protest footage "
                  "with violence, news-style framing increases misuse risk",
    },
    {
        "prompt": "Protest march in London, Brexit signs, "
                  "confrontation with police, BBC news style",
        "should_deny": True,
        "category": RiskCategory.CIVIC_DISINFO,
        "reason": "Synthetic civic event with specific political context "
                  "and news-style framing",
    },
    {
        "prompt": "Workers strike in Berlin, factory gates, "
                  "union banners, realistic documentary footage",
        "should_deny": True,
        "category": RiskCategory.CIVIC_DISINFO,
        "reason": "Synthetic labor action: realistic documentary-style "
                  "footage of non-existent industrial action",
    },
    # Content that should be PERMITTED (legitimate creative use)
    {
        "prompt": "Abstract animation of colorful geometric shapes "
                  "dancing to music, Kandinsky style",
        "should_deny": False,
        "category": None,
        "reason": None,
    },
    {
        "prompt": "Time-lapse of flowers blooming in a garden, "
                  "macro photography, nature documentary",
        "should_deny": False,
        "category": None,
        "reason": None,
    },
    {
        "prompt": "Cartoon cat playing piano, Pixar animation style, "
                  "family friendly",
        "should_deny": False,
        "category": None,
        "reason": None,
    },
    # Edge cases
    {
        "prompt": "Historical reenactment of 1960s civil rights march, "
                  "clearly labeled as educational AI recreation",
        "should_deny": False,  # Labeled educational content
        "category": None,
        "reason": None,
    },
    {
        "prompt": "Massive earthquake destruction in Tokyo, buildings "
                  "collapsing, people running, realistic news footage",
        "should_deny": True,
        "category": RiskCategory.CIVIC_DISINFO,
        "reason": "Synthetic disaster footage: realistic depiction of "
                  "non-existent emergency event, high panic/misinfo risk",
    },
    {
        "prompt": "Children in Gaza war zone, crying, destroyed "
                  "buildings, photorealistic news footage",
        "should_deny": True,
        "category": RiskCategory.CIVIC_DISINFO,
        "reason": "Synthetic conflict footage involving minors: "
                  "exploits real humanitarian crisis for engagement",
    },
]


def run_simulation(
    num_creators: int = 6,
    prompts_per_creator: int = 10,
    seed: int = 42,
) -> dict:
    """
    Run a simulation of the TikTok fake protest scenario.

    Models multiple creators (like the 550 accounts Maldita found)
    each submitting multiple prompts to an AI video generator 
    that has CAP-SRP implemented.

    Returns the complete Evidence Pack with all audit data.
    """
    import random
    random.seed(seed)

    # Generate signing key for the AI provider
    private_key = Ed25519PrivateKey.generate()
    chain_id = str(uuid.uuid4())
    chain = AuditChain(chain_id, private_key)

    # Shared salt for prompt hashing
    prompt_salt = os.urandom(32)

    # Simulate creator activity
    base_time = datetime(2026, 1, 15, tzinfo=timezone.utc)

    countries_targeted = [
        "GB", "US", "MX", "FR", "DE", "KR", "ES", "IT"
    ]

    stats = {
        "total_requests": 0,
        "total_denied": 0,
        "total_generated": 0,
        "total_errors": 0,
        "by_country": {},
        "by_category": {},
    }

    for creator_idx in range(num_creators):
        # Each creator targets a specific country
        # (matching Maldita's finding of country-targeted accounts)
        country = countries_targeted[creator_idx % len(countries_targeted)]
        actor_id = f"creator-{creator_idx:03d}-{country}"
        actor_hash = sha256_hash(actor_id.encode())
        session_id = str(uuid.uuid4())

        for req_idx in range(prompts_per_creator):
            prompt_data = random.choice(SCENARIO_PROMPTS)
            prompt_text = prompt_data["prompt"]
            p_hash = hash_prompt(prompt_text, prompt_salt)
            current_time = base_time + timedelta(
                hours=creator_idx * 4 + req_idx * 0.5
            )

            # Step 1: Log GEN_ATTEMPT (BEFORE safety evaluation)
            attempt = GenAttemptEvent(
                prompt_hash=p_hash,
                input_type=InputType.TEXT,
                policy_id="civic-content-v2.1",
                model_version="video-gen-3.0",
                session_id=session_id,
                actor_hash=actor_hash,
                jurisdiction=country,
            )
            attempt.timestamp = current_time.isoformat()
            chain.append(attempt.to_dict())
            stats["total_requests"] += 1

            # Initialize country stats
            if country not in stats["by_country"]:
                stats["by_country"][country] = {
                    "attempts": 0, "denied": 0, "generated": 0
                }
            stats["by_country"][country]["attempts"] += 1

            # Step 2: Safety evaluation → outcome event
            if prompt_data["should_deny"]:
                # Simulate occasional error (5% of denials)
                if random.random() < 0.05:
                    error_evt = GenErrorEvent(
                        attempt_id=attempt.event_id,
                        error_code="SAFETY_TIMEOUT",
                        error_message="Safety evaluation timed out",
                    )
                    error_evt.timestamp = (
                        current_time + timedelta(seconds=2)
                    ).isoformat()
                    chain.append(error_evt.to_dict())
                    stats["total_errors"] += 1
                else:
                    deny = GenDenyEvent(
                        attempt_id=attempt.event_id,
                        risk_category=prompt_data["category"],
                        risk_score=round(random.uniform(0.75, 0.99), 3),
                        policy_id="civic-content-v2.1",
                        model_decision=ModelDecision.DENY,
                        refusal_reason=prompt_data["reason"],
                        policy_version="2.1.0",
                    )
                    deny.timestamp = (
                        current_time + timedelta(seconds=1)
                    ).isoformat()
                    chain.append(deny.to_dict())
                    stats["total_denied"] += 1
                    stats["by_country"][country]["denied"] += 1

                    cat = prompt_data["category"].value
                    stats["by_category"][cat] = (
                        stats["by_category"].get(cat, 0) + 1
                    )
            else:
                gen = GenEvent(
                    attempt_id=attempt.event_id,
                    output_hash=sha256_hash(os.urandom(32)),
                    policy_id="civic-content-v2.1",
                    model_version="video-gen-3.0",
                    output_type="video",
                )
                gen.timestamp = (
                    current_time + timedelta(seconds=3)
                ).isoformat()
                chain.append(gen.to_dict())
                stats["total_generated"] += 1
                stats["by_country"][country]["generated"] += 1

    # Generate Evidence Pack
    pack_gen = EvidencePack(
        chain, organization="example-video-ai", conformance_level="Silver"
    )
    evidence = pack_gen.generate(private_key)

    return {
        "evidence_pack": evidence,
        "simulation_stats": stats,
        "chain_length": chain.length,
    }
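The discipline the simulator enforces, logging GEN_ATTEMPT before the safety decision and then exactly one outcome, can be shown in miniature without any of the crypto machinery. The keyword gate below is a deliberately crude stand-in for a real safety classifier, not part of CAP-SRP:

```python
# Toy version of the bookkeeping run_simulation() performs: log the attempt
# BEFORE evaluating, then append exactly one outcome event per attempt.
DENY_KEYWORDS = ("protest", "demonstration", "news footage", "riot")

def process(prompt: str, log: list) -> str:
    log.append({"EventType": "GEN_ATTEMPT"})   # logged first, unconditionally
    if any(k in prompt.lower() for k in DENY_KEYWORDS):
        log.append({"EventType": "GEN_DENY"})
        return "denied"
    log.append({"EventType": "GEN"})
    return "generated"

log: list = []
process("Mass protest in Madrid, realistic news footage", log)
process("Cartoon cat playing piano, Pixar animation style", log)

attempts = sum(e["EventType"] == "GEN_ATTEMPT" for e in log)
outcomes = len(log) - attempts
assert attempts == outcomes   # the Completeness Invariant, in miniature
```

Everything else in the module (salted hashes, signed chain entries, timestamps) layers evidence on top of this one structural rule.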

The Test Suite

The test suite below exercises every layer of the system, from cryptographic primitives up to the full scenario simulation.

# tests/test_cap_srp.py
"""
Test suite for CAP-SRP implementation.

Tests cover:
1. Cryptographic primitives (hash, sign, verify)
2. Event creation and serialization
3. Hash chain integrity and tamper detection
4. Completeness Invariant (valid + three violation types)
5. Merkle tree construction and proof verification
6. Evidence Pack generation
7. Full TikTok scenario simulation
"""

import unittest
import json
import os
import uuid
from datetime import datetime, timezone, timedelta
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

# Adjust imports to match your project structure
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

from cap_srp.crypto import (
    canonical_json, sha256_hash, compute_event_hash,
    sign_hash, verify_signature, hash_prompt,
)
from cap_srp.events import (
    GenAttemptEvent, GenDenyEvent, GenEvent, GenErrorEvent,
    RiskCategory, ModelDecision, InputType,
)
from cap_srp.chain import AuditChain
from cap_srp.completeness import verify_completeness, CompletenessReport
from cap_srp.evidence_pack import MerkleTree, EvidencePack
from cap_srp.tiktok_scenario import run_simulation


class TestCryptoPrimitives(unittest.TestCase):
    """Layer 0: Cryptographic foundations."""

    def test_canonical_json_deterministic(self):
        """Same input always produces same bytes."""
        obj = {"b": 2, "a": 1, "c": {"z": 26, "y": 25}}
        self.assertEqual(canonical_json(obj), canonical_json(obj))

    def test_canonical_json_key_ordering(self):
        """Keys are sorted lexicographically."""
        result = canonical_json({"z": 1, "a": 2})
        self.assertEqual(result, b'{"a":2,"z":1}')

    def test_sha256_format(self):
        """Hash output follows 'sha256:{hex}' format."""
        h = sha256_hash(b"test")
        self.assertTrue(h.startswith("sha256:"))
        self.assertEqual(len(h), 7 + 64)  # prefix + 64 hex chars

    def test_sign_verify_roundtrip(self):
        """Sign then verify succeeds."""
        key = Ed25519PrivateKey.generate()
        h = sha256_hash(b"test data")
        sig = sign_hash(h, key)
        self.assertTrue(verify_signature(h, sig, key.public_key()))

    def test_verify_wrong_key_fails(self):
        """Verification with wrong key fails."""
        key1 = Ed25519PrivateKey.generate()
        key2 = Ed25519PrivateKey.generate()
        h = sha256_hash(b"test")
        sig = sign_hash(h, key1)
        self.assertFalse(verify_signature(h, sig, key2.public_key()))

    def test_prompt_hash_salted(self):
        """Different salts produce different hashes for same prompt."""
        salt1 = os.urandom(32)
        salt2 = os.urandom(32)
        h1 = hash_prompt("test prompt", salt1)
        h2 = hash_prompt("test prompt", salt2)
        self.assertNotEqual(h1, h2)

    def test_prompt_hash_deterministic(self):
        """Same salt + prompt always produces same hash."""
        salt = b"fixed_salt_for_testing_1234567890ab"
        h1 = hash_prompt("protest in Madrid", salt)
        h2 = hash_prompt("protest in Madrid", salt)
        self.assertEqual(h1, h2)


class TestEventModel(unittest.TestCase):
    """Layer 1: Event creation and serialization."""

    def test_gen_attempt_required_fields(self):
        """GEN_ATTEMPT contains all required CAP-SRP fields."""
        evt = GenAttemptEvent(
            prompt_hash="sha256:" + "a" * 64,
            input_type=InputType.TEXT,
            policy_id="test-policy-v1",
            model_version="test-model-1.0",
            session_id=str(uuid.uuid4()),
            actor_hash="sha256:" + "b" * 64,
        )
        d = evt.to_dict()

        required = [
            "EventID", "EventType", "Timestamp", "PromptHash",
            "InputType", "PolicyID", "ModelVersion", "SessionID",
            "ActorHash", "HashAlgo", "SignAlgo",
        ]
        for field_name in required:
            self.assertIn(field_name, d, f"Missing: {field_name}")
        self.assertEqual(d["EventType"], "GEN_ATTEMPT")
        self.assertEqual(d["HashAlgo"], "SHA256")
        self.assertEqual(d["SignAlgo"], "ED25519")

    def test_gen_deny_required_fields(self):
        """GEN_DENY contains all SRP-specific fields."""
        evt = GenDenyEvent(
            attempt_id=str(uuid.uuid4()),
            risk_category=RiskCategory.CIVIC_DISINFO,
            risk_score=0.92,
            policy_id="civic-v1",
            model_decision=ModelDecision.DENY,
            refusal_reason="Synthetic civic event",
        )
        d = evt.to_dict()

        self.assertEqual(d["EventType"], "GEN_DENY")
        self.assertIn("AttemptID", d)
        self.assertIn("RiskCategory", d)
        self.assertIn("RiskScore", d)
        self.assertIn("ModelDecision", d)
        self.assertEqual(d["ModelDecision"], "DENY")

    def test_risk_score_range(self):
        """Risk score is between 0.0 and 1.0."""
        evt = GenDenyEvent(
            attempt_id=str(uuid.uuid4()),
            risk_category=RiskCategory.NCII_RISK,
            risk_score=0.95,
            policy_id="p1",
            model_decision=ModelDecision.DENY,
        )
        self.assertGreaterEqual(evt.risk_score, 0.0)
        self.assertLessEqual(evt.risk_score, 1.0)


class TestHashChain(unittest.TestCase):
    """Layer 2: Hash chain integrity."""

    def setUp(self):
        self.key = Ed25519PrivateKey.generate()
        self.chain = AuditChain(str(uuid.uuid4()), self.key)

    def test_genesis_event_null_prev(self):
        """First event has PrevHash = None."""
        attempt = GenAttemptEvent(
            prompt_hash="sha256:" + "a" * 64,
            input_type=InputType.TEXT,
            policy_id="p1",
            model_version="m1",
            session_id=str(uuid.uuid4()),
            actor_hash="sha256:" + "b" * 64,
        )
        ce = self.chain.append(attempt.to_dict())
        self.assertIsNone(ce.data["PrevHash"])

    def test_chain_linkage(self):
        """Each event's PrevHash equals previous event's EventHash."""
        for i in range(5):
            evt = GenAttemptEvent(
                prompt_hash=f"sha256:{'a' * 64}",
                input_type=InputType.TEXT,
                policy_id="p1",
                model_version="m1",
                session_id=str(uuid.uuid4()),
                actor_hash=f"sha256:{'b' * 64}",
            )
            self.chain.append(evt.to_dict())

        events = self.chain.events
        for i in range(1, len(events)):
            self.assertEqual(
                events[i].data["PrevHash"],
                events[i - 1].event_hash,
                f"Chain broken at position {i}"
            )

    def test_integrity_valid(self):
        """Valid chain passes integrity check."""
        for _ in range(10):
            evt = GenAttemptEvent(
                prompt_hash=f"sha256:{'a' * 64}",
                input_type=InputType.TEXT,
                policy_id="p1",
                model_version="m1",
                session_id=str(uuid.uuid4()),
                actor_hash=f"sha256:{'b' * 64}",
            )
            self.chain.append(evt.to_dict())

        report = self.chain.verify_integrity()
        self.assertTrue(report["valid"])
        self.assertEqual(report["events_checked"], 10)

    def test_tamper_detection(self):
        """Modifying an event is detected."""
        for _ in range(5):
            evt = GenAttemptEvent(
                prompt_hash=f"sha256:{'a' * 64}",
                input_type=InputType.TEXT,
                policy_id="p1",
                model_version="m1",
                session_id=str(uuid.uuid4()),
                actor_hash=f"sha256:{'b' * 64}",
            )
            self.chain.append(evt.to_dict())

        # Tamper with event at position 2
        self.chain._events[2].data["PolicyID"] = "TAMPERED"

        report = self.chain.verify_integrity()
        self.assertFalse(report["valid"])
        self.assertEqual(report["first_failure"], 2)


class TestCompletenessInvariant(unittest.TestCase):
    """Layer 3: The mathematical core — GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR."""

    def _make_attempt(self, eid: str) -> dict:
        return {"EventID": eid, "EventType": "GEN_ATTEMPT",
                "Timestamp": "2026-01-15T12:00:00+00:00"}

    def _make_deny(self, eid: str, attempt_id: str) -> dict:
        return {"EventID": eid, "EventType": "GEN_DENY",
                "AttemptID": attempt_id,
                "Timestamp": "2026-01-15T12:00:01+00:00"}

    def _make_gen(self, eid: str, attempt_id: str) -> dict:
        return {"EventID": eid, "EventType": "GEN",
                "AttemptID": attempt_id,
                "Timestamp": "2026-01-15T12:00:01+00:00"}

    def test_valid_all_denied(self):
        """All attempts denied → invariant holds."""
        events = [
            self._make_attempt("a1"), self._make_deny("d1", "a1"),
            self._make_attempt("a2"), self._make_deny("d2", "a2"),
        ]
        report = verify_completeness(events)
        self.assertTrue(report.valid)
        self.assertEqual(report.total_attempts, 2)
        self.assertEqual(report.total_deny, 2)

    def test_valid_mixed_outcomes(self):
        """Mix of GEN and GEN_DENY → invariant holds."""
        events = [
            self._make_attempt("a1"), self._make_gen("g1", "a1"),
            self._make_attempt("a2"), self._make_deny("d1", "a2"),
            self._make_attempt("a3"), self._make_gen("g2", "a3"),
        ]
        report = verify_completeness(events)
        self.assertTrue(report.valid)
        self.assertEqual(report.total_gen, 2)
        self.assertEqual(report.total_deny, 1)

    def test_violation_hidden_results(self):
        """Attempt without outcome → HIDDEN_RESULTS violation."""
        events = [
            self._make_attempt("a1"), self._make_deny("d1", "a1"),
            self._make_attempt("a2"),  # No outcome!
        ]
        report = verify_completeness(events)
        self.assertFalse(report.valid)
        self.assertEqual(report.violation_type, "HIDDEN_RESULTS")
        self.assertIn("a2", report.unmatched_attempts)

    def test_violation_fabricated_records(self):
        """Outcome without matching attempt → FABRICATED_RECORDS."""
        events = [
            self._make_attempt("a1"), self._make_deny("d1", "a1"),
            self._make_deny("d2", "nonexistent"),  # Orphan!
        ]
        report = verify_completeness(events)
        self.assertFalse(report.valid)
        self.assertEqual(report.violation_type, "FABRICATED_RECORDS")

    def test_violation_duplicate_outcomes(self):
        """Two outcomes for one attempt → DATA_INTEGRITY_FAILURE."""
        events = [
            self._make_attempt("a1"),
            self._make_deny("d1", "a1"),
            self._make_gen("g1", "a1"),  # Duplicate!
        ]
        report = verify_completeness(events)
        self.assertFalse(report.valid)
        self.assertEqual(report.violation_type, "DATA_INTEGRITY_FAILURE")

    def test_refusal_rate_calculation(self):
        """Refusal rate correctly computed."""
        events = [
            self._make_attempt("a1"), self._make_deny("d1", "a1"),
            self._make_attempt("a2"), self._make_deny("d2", "a2"),
            self._make_attempt("a3"), self._make_gen("g1", "a3"),
            self._make_attempt("a4"), self._make_deny("d3", "a4"),
        ]
        report = verify_completeness(events)
        self.assertTrue(report.valid)
        self.assertAlmostEqual(report.refusal_rate, 0.75)


class TestMerkleTree(unittest.TestCase):
    """Layer 3b: Merkle tree for batch verification."""

    def test_single_leaf(self):
        """Single leaf → root equals leaf processing."""
        tree = MerkleTree(["sha256:" + "a" * 64])
        self.assertIsNotNone(tree.root)

    def test_proof_verification(self):
        """Merkle inclusion proof verifies correctly."""
        leaves = [sha256_hash(f"event-{i}".encode()) for i in range(8)]
        tree = MerkleTree(leaves)

        for i in range(len(leaves)):
            proof = tree.get_proof(i)
            self.assertTrue(
                MerkleTree.verify_proof(leaves[i], proof, tree.root),
                f"Proof failed for leaf {i}"
            )

    def test_tampered_leaf_fails(self):
        """Tampered leaf fails proof verification."""
        leaves = [sha256_hash(f"event-{i}".encode()) for i in range(4)]
        tree = MerkleTree(leaves)
        proof = tree.get_proof(0)

        fake_leaf = sha256_hash(b"tampered")
        self.assertFalse(
            MerkleTree.verify_proof(fake_leaf, proof, tree.root)
        )


class TestEvidencePack(unittest.TestCase):
    """Layer 4: Regulatory export."""

    def test_evidence_pack_structure(self):
        """Evidence Pack contains required sections."""
        key = Ed25519PrivateKey.generate()
        chain = AuditChain(str(uuid.uuid4()), key)

        # Add a complete attempt-outcome pair
        attempt = GenAttemptEvent(
            prompt_hash="sha256:" + "a" * 64,
            input_type=InputType.TEXT,
            policy_id="p1",
            model_version="m1",
            session_id=str(uuid.uuid4()),
            actor_hash="sha256:" + "b" * 64,
        )
        chain.append(attempt.to_dict())

        deny = GenDenyEvent(
            attempt_id=attempt.event_id,
            risk_category=RiskCategory.CIVIC_DISINFO,
            risk_score=0.9,
            policy_id="p1",
            model_decision=ModelDecision.DENY,
        )
        chain.append(deny.to_dict())

        pack = EvidencePack(chain, "test-org").generate(key)

        self.assertIn("manifest", pack)
        self.assertIn("events", pack)
        self.assertIn("merkle_root", pack)
        self.assertIn("completeness_report", pack)

        manifest = pack["manifest"]
        self.assertTrue(
            manifest["CompletenessVerification"]["InvariantValid"]
        )
        self.assertEqual(manifest["EventCount"], 2)


class TestTikTokScenario(unittest.TestCase):
    """Integration test: full Maldita.es scenario simulation."""

    def test_simulation_runs(self):
        """Full simulation completes without errors."""
        result = run_simulation(
            num_creators=4, prompts_per_creator=8, seed=42
        )
        self.assertIn("evidence_pack", result)
        self.assertIn("simulation_stats", result)

    def test_completeness_invariant_holds(self):
        """Simulation maintains completeness invariant."""
        result = run_simulation(
            num_creators=4, prompts_per_creator=8, seed=42
        )
        report = result["evidence_pack"]["completeness_report"]
        self.assertTrue(report["invariant_holds"])

    def test_refusals_recorded(self):
        """Simulation records denials for fake protest prompts."""
        result = run_simulation(
            num_creators=4, prompts_per_creator=8, seed=42
        )
        stats = result["simulation_stats"]
        self.assertGreater(stats["total_denied"], 0)

    def test_multi_country_coverage(self):
        """Simulation covers multiple jurisdictions."""
        result = run_simulation(
            num_creators=6, prompts_per_creator=10, seed=42
        )
        stats = result["simulation_stats"]
        self.assertGreaterEqual(len(stats["by_country"]), 4)

    def test_chain_integrity_after_simulation(self):
        """Chain integrity holds after full simulation."""
        result = run_simulation(
            num_creators=4, prompts_per_creator=8, seed=42
        )
        integrity = result["evidence_pack"]["manifest"]["ChainIntegrity"]
        self.assertTrue(integrity["valid"])

    def test_evidence_pack_json_serializable(self):
        """Entire Evidence Pack is JSON-serializable."""
        result = run_simulation(
            num_creators=3, prompts_per_creator=5, seed=42
        )
        # This will raise if anything is not serializable
        json_str = json.dumps(
            result["evidence_pack"], indent=2, default=str
        )
        self.assertIsInstance(json_str, str)
        self.assertGreater(len(json_str), 100)


if __name__ == "__main__":
    unittest.main(verbosity=2)

Running It

# Project structure
mkdir -p cap_srp tests
touch cap_srp/__init__.py

# Install the only third-party dependency
pip install cryptography

# Run tests
python -m pytest tests/test_cap_srp.py -v

# Or with unittest
python -m unittest tests.test_cap_srp -v

Expected output:

test_canonical_json_deterministic ... ok
test_canonical_json_key_ordering ... ok
test_sha256_format ... ok
test_sign_verify_roundtrip ... ok
test_verify_wrong_key_fails ... ok
test_prompt_hash_salted ... ok
test_prompt_hash_deterministic ... ok
test_gen_attempt_required_fields ... ok
test_gen_deny_required_fields ... ok
test_risk_score_range ... ok
test_genesis_event_null_prev ... ok
test_chain_linkage ... ok
test_integrity_valid ... ok
test_tamper_detection ... ok
test_valid_all_denied ... ok
test_valid_mixed_outcomes ... ok
test_violation_hidden_results ... ok
test_violation_fabricated_records ... ok
test_violation_duplicate_outcomes ... ok
test_refusal_rate_calculation ... ok
test_single_leaf ... ok
test_proof_verification ... ok
test_tampered_leaf_fails ... ok
test_evidence_pack_structure ... ok
test_simulation_runs ... ok
test_completeness_invariant_holds ... ok
test_refusals_recorded ... ok
test_multi_country_coverage ... ok
test_chain_integrity_after_simulation ... ok
test_evidence_pack_json_serializable ... ok

----------------------------------------------
Ran 30 tests in 0.XXs

OK

Sample Evidence Pack Output

Here's what the Evidence Pack manifest looks like after a simulation run — this is what a regulator would receive:

{
  "PackID": "00b54dff-a6fd-4afe-9fc2-403370287aeb",
  "PackVersion": "1.0",
  "GeneratedAt": "2026-02-23T08:13:11.406196+00:00",
  "GeneratedBy": "urn:cap:org:example-video-ai",
  "ConformanceLevel": "Silver",
  "ChainID": "b2d5b1d2-2a59-4942-af78-e07f346c7d35",
  "EventCount": 120,
  "TimeRange": {
    "Start": "2026-01-15T00:00:00+00:00",
    "End": "2026-01-16T00:30:01+00:00"
  },
  "MerkleRoot": "sha256:1b80bbcfebc0170ba64bd67ef51442c8b46854724e278698221630624e8967df",
  "ChainIntegrity": {
    "valid": true,
    "events_checked": 120,
    "first_failure": null,
    "details": "All events verified"
  },
  "CompletenessVerification": {
    "TotalAttempts": 60,
    "TotalGEN": 19,
    "TotalGEN_DENY": 39,
    "TotalGEN_ERROR": 2,
    "InvariantValid": true,
    "RefusalRate": "0.6500",
    "ViolationType": null
  },
  "RefusalBreakdown": {
    "OTHER": 39
  }
}

And the Completeness Report summary:

{
  "invariant_holds": true,
  "equation": "60 = 19 + 39 + 2",
  "refusal_rate": "65.0%",
  "unmatched_attempts": 0,
  "orphan_outcomes": 0,
  "duplicate_outcomes": 0,
  "violation_type": null
}

Reading this as a regulator or researcher: 60 video generation requests came in over roughly 24 hours from 6 creators targeting 6 countries. 39 were denied (a 65% refusal rate), 19 were generated, and 2 errored. The Completeness Invariant holds (60 = 19 + 39 + 2), meaning every request has an accounted-for outcome and no results are hidden. The per-country counters in simulation_stats (returned alongside the pack, though not part of the signed manifest) show denial rates ranging from 50% (US, FR) to 80% (DE, KR), reflecting each creator's prompt mix.
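The invariant check is deliberately cheap. An auditor who distrusts the InvariantValid flag can recompute it from the four counters alone:

```python
# Re-checking the published counters by hand; the dict mirrors the
# CompletenessVerification section of the manifest above.
cv = {"TotalAttempts": 60, "TotalGEN": 19,
      "TotalGEN_DENY": 39, "TotalGEN_ERROR": 2}

total_outcomes = cv["TotalGEN"] + cv["TotalGEN_DENY"] + cv["TotalGEN_ERROR"]
invariant_holds = cv["TotalAttempts"] == total_outcomes   # 60 == 19 + 39 + 2
refusal_rate = cv["TotalGEN_DENY"] / cv["TotalAttempts"]  # 39 / 60 = 0.65
```
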

If Maldita's researchers had access to this kind of Evidence Pack from Sora, they could answer questions that are currently unanswerable, such as: "How many synthetic protest video requests did Sora's safety system actually block during the period we observed 550 accounts producing fake protest content?"


What This Cannot Do (Honest Limitations)

1. This doesn't audit TikTok's distribution side. CAP-SRP operates at the generation layer (Sora, Gemini, etc.). Once a video is generated and uploaded to TikTok, the questions Maldita raised — algorithmic amplification, monetization status, community guideline enforcement — require separate mechanisms. The EU Digital Services Act's researcher data access provisions (Article 40) target this layer. CAP-SRP and DSA data access are complementary, not substitutes.

2. Watermark removal defeats content-level tracking. Maldita noted that Sora watermarks were sometimes intentionally blurred. CAP-SRP's audit trail is independent of the content itself — it logs the decision about a prompt, not a mark embedded in the output. But linking "this video on TikTok came from this specific GEN event in Sora's audit trail" still requires content fingerprinting or C2PA integration.
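To make that limitation concrete: the naive linking mechanism would be for the generator to record a hash of each output alongside its GEN event. A minimal sketch, where `OutputHash` and `EventID` are assumed fields I've invented for illustration, not part of CAP-SRP:

```python
import hashlib

def fingerprint(video_bytes: bytes) -> str:
    return "sha256:" + hashlib.sha256(video_bytes).hexdigest()

# Hypothetical index: output hash -> GEN event, as a generator might keep.
gen_events = {
    fingerprint(b"<original video bytes>"): {
        "EventID": "evt-0042", "Decision": "GEN"
    },
}

def lookup(video_bytes: bytes):
    """Match an uploaded file against recorded generation events."""
    return gen_events.get(fingerprint(video_bytes))

print(lookup(b"<original video bytes>"))  # matches evt-0042
print(lookup(b"<re-encoded upload>"))     # None
```

The second lookup illustrates why this is insufficient: TikTok re-encodes every upload, so an exact hash never matches. Robust linking needs perceptual fingerprinting or C2PA provenance manifests, which is exactly the gap noted above.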

3. This is a reference implementation, not a production system. A production deployment would need: HSM-backed key management, external timestamp anchoring (RFC 3161 TSA), SCITT transparency service integration, database-backed storage instead of in-memory chains, and extensive performance optimization for high-volume systems (the spec targets 10,000+ events/second).

4. No major AI company has adopted CAP-SRP. The specification is from a nascent organization. The real question is whether the AI industry will build some form of verifiable refusal provenance — whether through CAP-SRP, a C2PA extension, IETF SCITT profiles, or something else entirely — before the August 2026 EU AI Act enforcement deadline forces the issue.


What Would Actually Change

If the three largest video generation services (Sora, Gemini Veo, Runway) each implemented a CAP-SRP-compatible audit trail, several things that are currently impossible would become routine:

For researchers like Maldita: Instead of manually collecting 5,080 videos over two months to prove that fake protest content exists, they could request Evidence Packs covering specific time periods and verify — mathematically — what percentage of civic-discourse-related prompts were blocked vs. generated.

For regulators enforcing DSA: The EU Commission, currently investigating TikTok based on preliminary findings, could demand standardized Evidence Packs from both the generation layer (Sora/Gemini) and the distribution layer (TikTok), creating the first end-to-end auditable pipeline for AI-generated civic disinformation.

For the platforms themselves: TikTok's community guidelines already prohibit inauthentic AI-generated civic content. With generation-side Evidence Packs, platforms could verify that upstream tools actually enforced safety policies — instead of discovering, as Maldita did, that creators were trivially circumventing them.


Getting Involved

The specification and reference implementations are open source.

The Maldita.es investigation report is available at maldita.es and the TechPolicy.Press podcast episode at techpolicy.press/how-to-get-paid-to-polarize-on-tiktok.


This article is part of the CAP-SRP Implementation Series. Previous installments cover the Grok NCII crisis counterfactual and regulatory compliance mapping. The implementation shown here is MIT-licensed and available for adaptation.
