DEV Community

ahmet gedik

Building a Video Metadata Moderation Pipeline With Python and the Claude API

Last month our ingest worker pulled a "trending" clip whose title carried a phone number and a slur written in Cyrillic, while its description hid a link to a fake crypto giveaway. It went live on a regional category page before a human ever looked at it. We don't host a single frame of video at ViralVidVault — we track viral trends across European regions and surface metadata: titles, descriptions, channel names, tags. That incident taught me a lesson that I think most aggregators learn the hard way: for a discovery site, the metadata is the attack surface, and a moderation gap there is a brand-safety problem and a GDPR problem at the same time.

This post walks through the pipeline we built to close that gap. It's a Python worker that runs YouTube-sourced metadata through the Claude API, fronted by a cheap Go pre-filter and backed by a SQLite WAL queue that our PHP 8.4 application already owned. It processes tens of thousands of records a day on a single LiteSpeed box, and the API bill is smaller than our coffee budget.

Why a keyword blocklist was never going to work

The first instinct of every backend engineer is a regex blocklist, and we tried it. It fails for video metadata in three predictable ways.

  • Adversarial spelling. Spammers write f r e e c r y p t o, swap Latin o for Cyrillic о, and pad slurs with zero-width joiners. A static list never keeps up.
  • Context collapse. The word "shot" is fine in a basketball compilation and alarming in a title that names a person. A blocklist has no notion of context; it just matches bytes.
  • Multilingual reality. We surface trends from a dozen European regions. A list tuned for English is blind to German, Polish, or Turkish abuse, and maintaining one list per language is a second full-time job.
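To make the adversarial-spelling point concrete, here is a minimal Python sketch of the kind of normalization a matcher would need before any list lookup. The `HOMOGLYPHS` map and the `normalize` helper are illustrative assumptions, not part of our pipeline, and the map covers only a handful of Cyrillic look-alikes.

```python
import re
import unicodedata

# Hypothetical, deliberately tiny map of Cyrillic letters to Latin look-alikes.
HOMOGLYPHS = str.maketrans({
    'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x', 'у': 'y',
})

# Three or more single characters separated by single spaces, e.g. "f r e e".
SPACED_RUN = re.compile(r'\b(?:\w ){2,}\w\b')

def normalize(text: str) -> str:
    """Return a matching-only copy of text with common evasions undone."""
    text = unicodedata.normalize('NFKC', text)
    # Drop format characters (category Cf): zero-width joiners and spaces.
    text = ''.join(ch for ch in text if unicodedata.category(ch) != 'Cf')
    text = text.lower().translate(HOMOGLYPHS)
    # Collapse letter-by-letter spacing back into one token.
    return SPACED_RUN.sub(lambda m: m.group(0).replace(' ', ''), text)

print(normalize('f r e e c r y p t o'))
```

Folding Cyrillic into Latin corrupts legitimate Cyrillic text, so a copy like this is only good for matching and should never replace the stored title. Even with it, the list still knows nothing about context or language, which is the real limit.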

What a blocklist is good at is being cheap and deterministic. So we kept it — but demoted it to a pre-filter whose only job is to catch the obvious garbage and decide which records are worth an LLM call. The nuanced judgement we handed to Claude.

The shape of the pipeline

The whole thing is four moving parts, and each part is replaceable in isolation:

  1. Ingest writes new video metadata into a moderation_queue table (PHP, the same code path that already fetches trending videos).
  2. Pre-filter (Go) makes a free local decision: hard-block obvious spam, or mark the record as "needs a real judgement."
  3. Classifier (Python) sends the survivors to Claude with a strict tool schema and gets back a structured verdict.
  4. Write-back stores the verdict (allow or block) on the row, and only rows with an allow verdict are ever eligible to render on a page.

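The queue is the contract between stages. Because it lives in SQLite with WAL enabled, the PHP web tier and the Python worker can read and write concurrently without stepping on each other: readers never block the writer, and a busy timeout absorbs the brief moments when two writers collide. A crash anywhere just leaves rows in pending or processing. Pending rows are picked up by the next run; rows stranded in processing have to be reset to pending first, because the worker only ever claims pending rows. No Redis, no Kafka, no extra box to babysit.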

A durable queue in SQLite WAL

Here's the enqueue side, written the way the rest of our PHP 8.4 codebase looks — typed, final, constructor property promotion, and PRAGMAs set the moment the connection opens.

<?php
declare(strict_types=1);

final class ModerationQueue
{
    public function __construct(private readonly PDO $db) {}

    public static function open(string $path): self
    {
        $db = new PDO('sqlite:' . $path);
        $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $db->exec('PRAGMA journal_mode = WAL');
        $db->exec('PRAGMA busy_timeout = 5000');
        $db->exec(<<<SQL
            CREATE TABLE IF NOT EXISTS moderation_queue (
                video_id    TEXT PRIMARY KEY,
                title       TEXT NOT NULL,
                description TEXT NOT NULL,
                channel     TEXT NOT NULL,
                region      TEXT NOT NULL,
                status      TEXT NOT NULL DEFAULT 'pending',
                verdict     TEXT,
                reason      TEXT,
                enqueued_at INTEGER NOT NULL,
                decided_at  INTEGER
            )
        SQL);
        $db->exec('CREATE INDEX IF NOT EXISTS idx_queue_status
                   ON moderation_queue(status, enqueued_at)');
        return new self($db);
    }

    public function enqueue(array $video): void
    {
        $stmt = $this->db->prepare(<<<SQL
            INSERT INTO moderation_queue
                (video_id, title, description, channel, region, enqueued_at)
            VALUES (:id, :title, :descr, :channel, :region, :ts)
            ON CONFLICT(video_id) DO NOTHING
        SQL);
        $stmt->execute([
            ':id'      => $video['id'],
            ':title'   => mb_substr($video['title'], 0, 300),
            ':descr'   => mb_substr($video['description'] ?? '', 0, 2000),
            ':channel' => $video['channel'],
            ':region'  => $video['region'],
            ':ts'      => time(),
        ]);
    }
}

Two decisions matter here. First, busy_timeout = 5000 means a connection that hits a lock keeps retrying for up to five seconds before failing with SQLITE_BUSY, instead of failing immediately. That is essential when a web request and the worker touch the table at the same time. Second, we truncate the description to 2000 characters on the way in. Descriptions can be enormous, and the abuse signal is almost always near the top; truncating bounds both our storage and, more importantly, our token cost downstream.

A Go pre-filter to keep the LLM bill honest

Roughly 70% of incoming metadata is trivially clean and another 5% is trivially toxic. Spending an API call on the toxic slice is waste, so we run a tiny Go package as a sidecar that the worker calls over a local socket; it returns a verdict only when it's confident, and otherwise defers to Claude.

package prefilter

import (
    "regexp"
    "strings"
)

var (
    phoneRe = regexp.MustCompile(`(?:\+?\d[\s\-]?){9,}`)
    urlRe   = regexp.MustCompile(`https?://[^\s]+`)
    // Small, auditable hard-block list. Nuance is left to the LLM.
    hardBlock = []string{"t.me/", "wa.me/", "free-crypto", "giveaway-bot"}
)

type Verdict struct {
    Decided bool   // false => escalate to Claude
    Allowed bool
    Reason  string
}

// Screen makes a free local decision. Decided==false means the caller
// should send the record to the Claude classifier for a real judgement.
func Screen(title, description string) Verdict {
    blob := strings.ToLower(title + " " + description)

    for _, needle := range hardBlock {
        if strings.Contains(blob, needle) {
            return Verdict{Decided: true, Allowed: false, Reason: "hard_block:" + needle}
        }
    }

    links := len(urlRe.FindAllString(description, -1))
    if phoneRe.MatchString(blob) && links > 0 {
        return Verdict{Decided: true, Allowed: false, Reason: "phone_plus_link_spam"}
    }

    // Looks clean, but a clean-looking title can still mislead or dox.
    // Defer to Claude rather than auto-allowing.
    return Verdict{Decided: false}
}

Notice what the pre-filter does not do: it never auto-approves. A phone number plus an external link is a confident block, but the absence of those signals is not a confident allow — doxxing, hate speech, and misleading titles all pass a regex cleanly. "Clean-looking" means "escalate," not "publish." That asymmetry is the whole point. We use the cheap layer only to reject and to triage, never to trust.

Asking Claude for a structured verdict

For the records that survive the pre-filter, we want a machine-readable answer, not prose. The reliable way to get that from the Claude API is a tool definition with a tightly specified input_schema plus tool_choice forcing that tool. The model then returns its answer as tool arguments shaped by your schema, which you can deserialize without parsing free text. Forcing the tool guarantees a tool call, not perfect schema conformance, so it is still worth validating the arguments before acting on them.

For a high-volume classifier the model choice is a cost lever, not a vanity metric. We run the bulk of traffic on claude-haiku-4-5-20251001 because it's fast and inexpensive, and only escalate genuinely ambiguous records to claude-sonnet-4-6. Haiku 4.5 handles flat "is this title spam" judgements at a quality that comfortably beats our old heuristics.

import os
from anthropic import Anthropic

client = Anthropic(api_key=os.environ['ANTHROPIC_API_KEY'])

MODEL = 'claude-haiku-4-5-20251001'

MODERATION_TOOL = {
    'name': 'record_verdict',
    'description': 'Record the moderation verdict for one video metadata record.',
    'input_schema': {
        'type': 'object',
        'properties': {
            'allowed': {
                'type': 'boolean',
                'description': 'True only if the metadata is safe to publish.',
            },
            'categories': {
                'type': 'array',
                'items': {
                    'type': 'string',
                    'enum': [
                        'clean', 'spam', 'scam_link', 'adult',
                        'hate', 'violence', 'pii', 'misleading',
                    ],
                },
            },
            'severity': {'type': 'string', 'enum': ['none', 'low', 'medium', 'high']},
            'reason': {'type': 'string', 'description': 'One sentence, max 160 chars.'},
        },
        'required': ['allowed', 'categories', 'severity', 'reason'],
    },
}

SYSTEM = (
    'You moderate short-video METADATA (title, description, channel) for a '
    'European, GDPR-compliant discovery site. You never see the video itself; '
    'judge only the text. Flag personal data (phone numbers, emails, home '
    'addresses), scam or affiliate-link spam, adult or hateful language, and '
    'titles that materially misrepresent likely content. Be conservative about '
    'expression: edgy or clickbait wording is allowed; doxxing, fraud, and '
    'illegal content are not. Call record_verdict exactly once.'
)

def classify(video: dict) -> dict:
    fields = [
        'Title: ' + video['title'],
        'Channel: ' + video['channel'],
        'Region: ' + video['region'],
        'Description:\n' + video['description'][:2000],
    ]
    resp = client.messages.create(
        model=MODEL,
        max_tokens=400,
        system=[{
            'type': 'text',
            'text': SYSTEM,
            'cache_control': {'type': 'ephemeral'},
        }],
        tools=[MODERATION_TOOL],
        tool_choice={'type': 'tool', 'name': 'record_verdict'},
        messages=[{'role': 'user', 'content': '\n'.join(fields)}],
    )
    for block in resp.content:
        if block.type == 'tool_use' and block.name == 'record_verdict':
            return block.input
    raise RuntimeError('no verdict for ' + video['id'])

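The cache_control marker on the system block is doing quiet, important work. The system prompt is identical for every single call, so we let the API cache it; after the first request, reads of the cached prefix are billed at a steep discount compared with fresh input tokens. One caveat: the API only caches prefixes above a model-specific minimum length, and a prompt as short as the one above very likely falls below it, so the marker starts paying off once the static prefix grows, for example with a detailed policy or few-shot examples. When your system prompt is large relative to a 200-character title, prompt caching is the difference between a viable classifier and an expensive one.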
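Whether a given call actually hit the cache shows up in the response's usage block, which is worth checking because a prefix that is too short to cache is simply processed normally, with no error. A minimal sketch, assuming a hypothetical `cache_report` helper and a stand-in object in place of a real `resp.usage`:

```python
from types import SimpleNamespace

def cache_report(usage) -> dict:
    """Summarise how much of the input was served from the prompt cache."""
    read = getattr(usage, 'cache_read_input_tokens', 0) or 0
    written = getattr(usage, 'cache_creation_input_tokens', 0) or 0
    fresh = getattr(usage, 'input_tokens', 0) or 0
    total = read + written + fresh
    return {
        'cached': read > 0,
        'cache_read_share': read / total if total else 0.0,
    }

# Stand-in for resp.usage so the sketch runs without a network call.
example = SimpleNamespace(
    input_tokens=60,
    cache_creation_input_tokens=0,
    cache_read_input_tokens=0,
)
print(cache_report(example))  # {'cached': False, 'cache_read_share': 0.0}
```

Logging this for a sample of calls is a cheap way to notice that the cache never engaged, or that a prompt edit quietly invalidated it.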

A note on the system prompt itself: it tells the model what it cannot see. Stating "you never see the video; judge only the text" stops Claude from hedging with "I can't assess the video," which is exactly the kind of non-answer that breaks an automated pipeline.
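Because a forced tool call is not the same as a guaranteed-valid payload, a small check between `classify` and the write-back is cheap insurance. The sketch below is an assumption about how that could look; `validate_verdict` is a hypothetical helper that mirrors the enums from the tool schema above.

```python
CATEGORIES = {'clean', 'spam', 'scam_link', 'adult',
              'hate', 'violence', 'pii', 'misleading'}
SEVERITIES = {'none', 'low', 'medium', 'high'}

def validate_verdict(verdict):
    """Return a checked copy of the tool arguments, or raise ValueError."""
    if not isinstance(verdict, dict):
        raise ValueError('verdict must be an object')
    if not isinstance(verdict.get('allowed'), bool):
        raise ValueError('allowed must be a boolean')
    cats = verdict.get('categories')
    if not isinstance(cats, list):
        raise ValueError('categories must be a list')
    if not all(isinstance(c, str) and c in CATEGORIES for c in cats):
        raise ValueError('unknown category in ' + repr(cats))
    severity = verdict.get('severity')
    if not isinstance(severity, str) or severity not in SEVERITIES:
        raise ValueError('unknown severity')
    reason = verdict.get('reason')
    if not isinstance(reason, str):
        raise ValueError('reason must be a string')
    # Enforce the 160-character limit the schema only describes in prose.
    return {'allowed': verdict['allowed'], 'categories': cats,
            'severity': severity, 'reason': reason[:160]}
```

Raising rather than guessing keeps the failure closed: an invalid payload becomes an exception for that one record instead of an accidental allow.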

The worker loop: batching, concurrency, and write-back

The worker claims a batch atomically, fans the rows out across a thread pool (the Anthropic SDK call is I/O-bound, so threads are fine), and writes each verdict back as it lands. Claiming with BEGIN IMMEDIATE and flipping rows to processing means two worker instances can run side by side without double-processing.

import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

DB_PATH = '/var/data/moderation.db'

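def fetch_batch(conn, limit=50):
    # BEGIN IMMEDIATE takes the write lock up front, so the SELECT and the
    # UPDATE below form one atomic claim that no other worker can interleave.
    conn.execute('BEGIN IMMEDIATE')
    try:
        rows = conn.execute(
            '''SELECT video_id, title, description, channel, region
               FROM moderation_queue
               WHERE status = 'pending'
               ORDER BY enqueued_at
               LIMIT ?''',
            (limit,),
        ).fetchall()
        ids = [r[0] for r in rows]
        if ids:
            placeholders = ','.join('?' * len(ids))
            conn.execute(
                'UPDATE moderation_queue SET status = \'processing\' '
                'WHERE video_id IN (' + placeholders + ')',
                ids,
            )
        conn.commit()
    except Exception:
        # Release the write lock instead of leaving the transaction open.
        conn.rollback()
        raise
    return rows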

def write_verdict(conn, video_id, verdict):
    conn.execute(
        '''UPDATE moderation_queue
           SET status = 'done', verdict = ?, reason = ?, decided_at = ?
           WHERE video_id = ?''',
        (
            'allow' if verdict['allowed'] else 'block',
            verdict['severity'] + ':' + verdict['reason'],
            int(time.time()),
            video_id,
        ),
    )
    conn.commit()

def run_once():
    conn = sqlite3.connect(DB_PATH, timeout=10)
    conn.execute('PRAGMA journal_mode = WAL')
    rows = fetch_batch(conn)
    if not rows:
        return 0

    work = {
        r[0]: {'id': r[0], 'title': r[1], 'description': r[2],
               'channel': r[3], 'region': r[4]}
        for r in rows
    }
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = {pool.submit(classify, v): vid for vid, v in work.items()}
        for fut in as_completed(futures):
            vid = futures[fut]
            try:
                write_verdict(conn, vid, fut.result())
            except Exception as exc:  # keep the batch moving
                conn.execute(
                    'UPDATE moderation_queue SET status = \'error\', reason = ? '
                    'WHERE video_id = ?',
                    (str(exc)[:200], vid),
                )
                conn.commit()
    return len(rows)

if __name__ == '__main__':
    total = run_once()
    print('processed', total, 'records')

The error branch matters more than it looks. A single failure (a timeout, a rate limit, a record that somehow has a null field) must never poison the whole batch. We catch per-future, mark that one row error, and keep draining the rest. An error row is invisible to the site (only allow renders) and gets re-examined by a slower retry job, so the failure mode is "this clip is temporarily hidden," never "the worker is wedged."
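The same reasoning applies to rows a crashed worker left in processing: the claim query only selects pending rows, so something has to hand them back. Here is a minimal sketch of that step. It assumes a hypothetical `claimed_at` column that is not in the schema shown earlier and that the claim UPDATE would also have to set; `requeue_stale` and the 15-minute default are illustrative.

```python
import sqlite3
import time

def requeue_stale(conn: sqlite3.Connection, max_age_s: int = 900) -> int:
    """Reset rows stuck in 'processing' longer than max_age_s to 'pending'.

    Assumes a claimed_at INTEGER column written when a row is claimed.
    Returns the number of rows handed back to the queue.
    """
    cutoff = int(time.time()) - max_age_s
    cur = conn.execute(
        "UPDATE moderation_queue "
        "SET status = 'pending', claimed_at = NULL "
        "WHERE status = 'processing' AND claimed_at < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount
```

Calling this at the top of each run keeps recovery inside the queue itself. The age threshold matters when two workers run side by side: it should comfortably exceed the slowest batch, or a healthy worker's rows get claimed twice.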

Cost control: prompt caching and the Batch API

Three levers keep this affordable at our volume:

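  • Pre-filter first. Every record the Go layer hard-blocks never reaches the API. With the asymmetric filter shown above that is the trivially toxic slice only, since clean-looking records are still escalated. The cheapest token is the one you don't send.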
  • Prompt caching on the static system block, as shown above, cuts the repeated-input cost dramatically because the instruction prefix is read from cache instead of re-billed every call.
  • Batch processing. Moderation is not user-facing in real time — a video can sit in pending for a few minutes. For the steady-state backlog we route non-urgent records through the Message Batches API, which trades latency for a substantial per-token discount. Newly trending clips skip the batch and go synchronous so they're live quickly.

The synchronous-versus-batch split falls naturally out of the queue: enqueued_at plus a region's velocity tells you whether a record is "hot" (classify now) or "backlog" (batch overnight).
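A rough sketch of that split, under stated assumptions: `is_hot`, its thresholds, and the notion of `region_velocity` as "new trending records per minute" are all hypothetical, and `build_batch_requests` only assembles the request entries without sending anything.

```python
import time

def is_hot(enqueued_at, region_velocity, now=None,
           max_age_s=600, min_velocity=5.0):
    """Hypothetical rule: a recent record in a fast-moving region is 'hot'."""
    now = int(time.time()) if now is None else now
    return (now - enqueued_at) <= max_age_s and region_velocity >= min_velocity

def render(video):
    """Same field layout the synchronous classifier sends."""
    return '\n'.join([
        'Title: ' + video['title'],
        'Channel: ' + video['channel'],
        'Region: ' + video['region'],
        'Description:\n' + video['description'][:2000],
    ])

def build_batch_requests(videos, model, system, tool):
    """Build one Message Batches request entry per backlog record."""
    return [
        {
            # custom_id lets us match each result back to its queue row.
            'custom_id': v['id'],
            'params': {
                'model': model,
                'max_tokens': 400,
                'system': system,
                'tools': [tool],
                'tool_choice': {'type': 'tool', 'name': tool['name']},
                'messages': [{'role': 'user', 'content': render(v)}],
            },
        }
        for v in videos
    ]
```

In the Anthropic Python SDK a list like this would be passed to `client.messages.batches.create(requests=...)`, with results collected later and keyed by `custom_id`. Treat the exact submission and polling flow as something to confirm against the current API reference rather than this sketch.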

GDPR is a first-class moderation category

For a European site this is not an afterthought — it's why pii is an explicit enum value in the schema. User-generated titles and descriptions routinely leak personal data: a creator's mobile number, a third party's home address in a "prank" clip, an email harvested for a giveaway. Surfacing that on our pages would make us the data controller republishing it.

Three practices keep the pipeline itself compliant:

  • The model never sees more than the public metadata we already received, and we never log raw description text into long-term analytics — only the verdict, severity, and category survive.
  • A pii-flagged record is blocked, not stored for review. We keep the verdict and the video_id, drop the offending text, and move on. Minimisation by default.
  • Verdicts are explainable. The one-sentence reason field gives us an audit trail for every automated block, which is exactly the kind of record a data-protection inquiry asks for.
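The write_verdict function shown earlier leaves title and description on the row, so dropping the offending text needs its own write path. A minimal sketch of one, assuming a hypothetical `write_pii_block` helper; storing a fixed reason instead of the model's sentence is also an assumption, made because a free-text reason could quote the very number it flags.

```python
import sqlite3
import time

def write_pii_block(conn: sqlite3.Connection, video_id: str, verdict: dict) -> None:
    """Block a pii-flagged row and blank the text that carried the data."""
    conn.execute(
        """UPDATE moderation_queue
           SET status = 'done', verdict = 'block', reason = ?,
               title = '', description = '', decided_at = ?
           WHERE video_id = ?""",
        # Keep only category and severity; never the model's free-text reason.
        ('pii:' + verdict['severity'], int(time.time()), video_id),
    )
    conn.commit()
```

The worker would call this instead of write_verdict whenever 'pii' appears in the verdict's categories. The trade-off is a less descriptive audit entry for those rows, which seems acceptable when the alternative is retaining the personal data itself.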

Claude is a US-routed service, so the only thing we send across that boundary is metadata a platform already published publicly — never our own users' analytics, which stay on the origin. That boundary discipline is the same one that lets us run GDPR-clean trend analytics in the first place.

Edge enforcement with Cloudflare Workers

The last mile is making sure a blocked record genuinely cannot render, even if a cached page slips through. A small Cloudflare Worker sits in front of the origin and strips any video card whose video_id appears in a KV set of blocked IDs that the write-back step pushes on each block. It's belt-and-suspenders: the application already filters on verdict = 'allow', but the edge layer guarantees that a stale LiteSpeed page cache can't briefly expose something the pipeline just rejected. Propagation takes seconds, and the Worker adds no measurable latency for the 99% of requests whose IDs aren't in the set.

What I'd do differently

If I were starting over, I'd add a thin human-review lane from day one for medium-severity misleading verdicts — that's the band where the model and a human most often disagree, and a feedback table of human overrides would let us tune the system prompt with real examples instead of guesses. I'd also version the system prompt in the database so every stored verdict records which prompt produced it; the first time you change the instructions you'll wish past decisions were reproducible.
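Prompt versioning can be as small as a content hash stored next to each verdict. A minimal sketch, assuming a hypothetical `prompt_version` helper and a hypothetical extra column to hold its output:

```python
import hashlib

def prompt_version(system_prompt: str, model: str) -> str:
    """Short, stable identifier for one (model, system prompt) pair."""
    payload = (model + '\n' + system_prompt).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()[:12]

# Any edit to the instructions or a model swap yields a different identifier.
v1 = prompt_version('Flag personal data.', 'claude-haiku-4-5-20251001')
v2 = prompt_version('Flag personal data and scams.', 'claude-haiku-4-5-20251001')
print(v1 != v2)  # True
```

Including the model name in the hash is a design choice: a verdict is only reproducible against the same instructions and the same model, so both belong in the version.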

Conclusion

The pieces are deliberately boring: a SQLite WAL queue our PHP app already understood, a regex pre-filter in Go that only ever rejects, a forced-tool Claude call that returns a typed verdict, and a worker that fails one row at a time instead of one batch at a time. None of it is exotic, and that's the point — metadata moderation doesn't need a bespoke ML platform, it needs a durable queue, a cheap first pass, and an LLM constrained to answer in a schema you can act on. The result has kept the scam links and the leaked phone numbers off our category pages for months, on infrastructure we were already paying for.
