DEV Community

Ayi NEDJIMI
Ayi NEDJIMI

Posted on

How to build an AI-powered content moderation pipeline for user comments

Comment sections and user-submitted content are an attack surface. Spam bots, coordinated harassment, phishing links disguised as helpful replies — if you ship a public-facing form or discussion feature, you will encounter all of these within days. Rule-based filters (regex, keyword lists) have ~60-70% precision at best and generate constant maintenance overhead. An LLM-based classifier can handle nuanced toxic content, context-dependent spam, and subtle manipulation that keyword filters miss entirely.

This tutorial builds a complete moderation pipeline in Python: receive a comment, classify it with an LLM, cache repeated inputs, process batches efficiently, and route borderline cases to a human review queue. The same architecture works for form submissions, support tickets, forum posts, and any other user-generated text. For organizations managing content at scale, this pairs well with the broader security controls described in practical security guides.

Architecture overview

User comment
     │
     ▼
Cache lookup (Redis/dict) ──hit──▶ cached decision
     │ miss
     ▼
Batch accumulator (up to 20 items or 500ms)
     │
     ▼
LLM classifier (structured JSON output)
     │
     ├── safe ──────────────▶ publish immediately
     ├── spam/toxic ────────▶ auto-reject + log
     └── borderline (< 0.75)▶ human review queue
Enter fullscreen mode Exit fullscreen mode

Setup

pip install openai redis pydantic python-dotenv
Enter fullscreen mode Exit fullscreen mode

For local development, run Redis:

docker run -d -p 6379:6379 redis:alpine
Enter fullscreen mode Exit fullscreen mode

Data models

# models.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time

class ContentCategory(str, Enum):
    SAFE = "safe"
    SPAM = "spam"
    TOXIC = "toxic"
    PHISHING = "phishing"
    BORDERLINE = "borderline"

@dataclass
class ModerationResult:
    category: ContentCategory
    confidence: float            # 0.0 – 1.0
    reasoning: str               # one sentence
    flags: list[str]             # e.g. ["contains_url", "promotional_language"]
    needs_human_review: bool
    processing_time_ms: float
    cache_hit: bool = False

@dataclass
class PendingComment:
    comment_id: str
    text: str
    user_id: str
    submitted_at: float = field(default_factory=time.time)
    context: Optional[str] = None  # e.g. article slug or thread title
Enter fullscreen mode Exit fullscreen mode

LLM classifier with structured output

# classifier.py
import json
import hashlib
import time
import logging
from typing import Optional

from openai import OpenAI
from models import ContentCategory, ModerationResult

logger = logging.getLogger(__name__)

llm_client = OpenAI(
    api_key="your_api_key",
    base_url="https://api.your-llm-provider.com/v1",
)

SYSTEM_PROMPT = """You are a content moderation classifier. Analyze submitted text and return a JSON object with exactly these fields:
- "category": one of "safe", "spam", "toxic", "phishing", "borderline"
- "confidence": float 0.0-1.0 (your certainty in the classification)
- "reasoning": one sentence explaining the decision
- "flags": array of strings identifying specific issues (empty array if safe)

Categories:
- safe: legitimate user content, on-topic discussion, genuine questions
- spam: promotional content, repeated phrases, unsolicited advertising, SEO link drops
- toxic: harassment, hate speech, threats, personal attacks, profanity targeting users
- phishing: credential harvesting, fake login prompts, deceptive links, scam patterns
- borderline: ambiguous content that requires human judgment

Return ONLY the JSON object. No prose."""

def classify_single(text: str, context: Optional[str] = None,
                    model: str = "gpt-4o-mini") -> dict:
    """Call LLM for a single text. Returns raw parsed dict."""
    user_content = f"Text to classify:\n{text}"
    if context:
        user_content = f"Context: {context}\n\n{user_content}"

    response = llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        temperature=0.0,   # deterministic for classification
        max_tokens=200,
        response_format={"type": "json_object"},  # forces JSON output
    )

    raw = response.choices[0].message.content
    return json.loads(raw)

def classify_batch(texts: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
    """
    Classify multiple texts in a single API call.
    texts: [{"id": str, "text": str, "context": Optional[str]}]
    Returns results in the same order.
    """
    if not texts:
        return []

    # Build a numbered batch prompt
    items_block = "\n\n".join(
        f'Item {i+1} (id={item["id"]}):\n{item["text"][:800]}'
        + (f'\nContext: {item["context"]}' if item.get("context") else "")
        for i, item in enumerate(texts)
    )

    batch_prompt = f"""Classify each of the following {len(texts)} items.
Return a JSON array where each element corresponds to one item, in order.
Each element must have: id, category, confidence, reasoning, flags.

{items_block}"""

    response = llm_client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT.replace(
                "Return ONLY the JSON object.",
                "Return ONLY a JSON array of objects, one per item."
            )},
            {"role": "user", "content": batch_prompt},
        ],
        temperature=0.0,
        max_tokens=100 * len(texts),
        response_format={"type": "json_object"},
    )

    raw = response.choices[0].message.content
    parsed = json.loads(raw)

    # The model may return {"results": [...]} or a bare array
    if isinstance(parsed, dict):
        for key in ("results", "items", "classifications"):
            if key in parsed and isinstance(parsed[key], list):
                return parsed[key]
    if isinstance(parsed, list):
        return parsed

    logger.warning("Unexpected batch response structure: %s", raw[:200])
    return []
Enter fullscreen mode Exit fullscreen mode

Cost estimation

Before running at scale, understand what you're paying.

# cost.py

# Approximate token counts for moderation
SYSTEM_PROMPT_TOKENS = 180    # fixed per call
TOKENS_PER_COMMENT = 60      # average user comment
OUTPUT_TOKENS = 80           # JSON response

def estimate_cost(num_comments: int,
                  batch_size: int = 20,
                  input_price_per_1k: float = 0.00015,   # gpt-4o-mini pricing
                  output_price_per_1k: float = 0.0006) -> dict:
    """
    Estimate API cost for moderating num_comments.
    Batch calls amortize the system prompt cost.
    """
    num_batches = -(-num_comments // batch_size)  # ceil division

    # Per batch: 1 system prompt + all comments
    input_tokens_per_batch = SYSTEM_PROMPT_TOKENS + (TOKENS_PER_COMMENT * batch_size)
    output_tokens_per_batch = OUTPUT_TOKENS * batch_size

    total_input  = input_tokens_per_batch * num_batches
    total_output = output_tokens_per_batch * num_batches

    cost = (total_input / 1000 * input_price_per_1k +
            total_output / 1000 * output_price_per_1k)

    return {
        "comments": num_comments,
        "batches": num_batches,
        "total_input_tokens": total_input,
        "total_output_tokens": total_output,
        "estimated_cost_usd": round(cost, 4),
        "cost_per_comment_usd": round(cost / num_comments, 6),
    }

# Example
print(estimate_cost(10_000))
# → {'comments': 10000, 'batches': 500, ..., 'estimated_cost_usd': 0.516, ...}
Enter fullscreen mode Exit fullscreen mode

10,000 comments for ~$0.52 with batching. Without batching (single calls), the system prompt overhead alone triples the cost.

Caching layer

Comments are often duplicates — bots submit the same message, users paste the same promotional text. Cache identical inputs permanently; cache near-identical inputs with normalized hashing.

# cache.py
import hashlib
import json
import redis
from typing import Optional

class ModerationCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.ttl_safe = 3600 * 24 * 7    # cache safe results for 7 days
        self.ttl_unsafe = 3600 * 24 * 30  # cache rejections for 30 days

    def _make_key(self, text: str) -> str:
        """Normalize and hash for cache key."""
        normalized = " ".join(text.lower().split())  # collapse whitespace
        return "moderation:" + hashlib.sha256(normalized.encode()).hexdigest()

    def get(self, text: str) -> Optional[dict]:
        key = self._make_key(text)
        cached = self.r.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, text: str, result: dict):
        key = self._make_key(text)
        is_safe = result.get("category") == "safe"
        ttl = self.ttl_safe if is_safe else self.ttl_unsafe
        self.r.setex(key, ttl, json.dumps(result))

    def get_many(self, texts: list[str]) -> dict[str, Optional[dict]]:
        """Batch cache lookup."""
        keys = [self._make_key(t) for t in texts]
        values = self.r.mget(keys)
        return {
            text: json.loads(val) if val else None
            for text, val in zip(texts, values)
        }
Enter fullscreen mode Exit fullscreen mode

Human review queue

# review_queue.py
import json
import time
import redis
from dataclasses import asdict
from models import PendingComment, ModerationResult

QUEUE_KEY = "moderation:human_review"

class HumanReviewQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.r = redis.from_url(redis_url, decode_responses=True)

    def push(self, comment: PendingComment, result: ModerationResult):
        entry = {
            "comment": asdict(comment),
            "moderation": {
                "category": result.category,
                "confidence": result.confidence,
                "reasoning": result.reasoning,
                "flags": result.flags,
            },
            "queued_at": time.time(),
        }
        self.r.lpush(QUEUE_KEY, json.dumps(entry))

    def pop(self, count: int = 10) -> list[dict]:
        """Fetch next items for human reviewers."""
        items = []
        for _ in range(count):
            raw = self.r.rpop(QUEUE_KEY)
            if raw is None:
                break
            items.append(json.loads(raw))
        return items

    def queue_length(self) -> int:
        return self.r.llen(QUEUE_KEY)
Enter fullscreen mode Exit fullscreen mode

The full pipeline

# pipeline.py
import time
import logging
from typing import Optional

from models import ContentCategory, ModerationResult, PendingComment
from classifier import classify_single, classify_batch
from cache import ModerationCache
from review_queue import HumanReviewQueue

logger = logging.getLogger(__name__)

CONFIDENCE_THRESHOLD = 0.75  # below this → human review

class ModerationPipeline:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.cache = ModerationCache(redis_url)
        self.review_queue = HumanReviewQueue(redis_url)

    def _parse_result(self, raw: dict, start_time: float,
                       cache_hit: bool = False) -> ModerationResult:
        confidence = float(raw.get("confidence", 0.5))
        category_str = raw.get("category", "borderline").lower()

        try:
            category = ContentCategory(category_str)
        except ValueError:
            category = ContentCategory.BORDERLINE

        needs_review = (
            category == ContentCategory.BORDERLINE
            or confidence < CONFIDENCE_THRESHOLD
        )

        return ModerationResult(
            category=category,
            confidence=confidence,
            reasoning=raw.get("reasoning", ""),
            flags=raw.get("flags", []),
            needs_human_review=needs_review,
            processing_time_ms=(time.time() - start_time) * 1000,
            cache_hit=cache_hit,
        )

    def moderate(self, comment: PendingComment) -> ModerationResult:
        """Moderate a single comment with caching."""
        start = time.time()

        # Cache lookup
        cached = self.cache.get(comment.text)
        if cached:
            result = self._parse_result(cached, start, cache_hit=True)
            if result.needs_human_review:
                self.review_queue.push(comment, result)
            return result

        # LLM classification
        try:
            raw = classify_single(comment.text, context=comment.context)
        except Exception as e:
            logger.error("LLM classification failed: %s", e)
            # Fail safe: send to human review
            fallback = ModerationResult(
                category=ContentCategory.BORDERLINE,
                confidence=0.0,
                reasoning=f"Classification failed: {e}",
                flags=["llm_error"],
                needs_human_review=True,
                processing_time_ms=(time.time() - start) * 1000,
            )
            self.review_queue.push(comment, fallback)
            return fallback

        self.cache.set(comment.text, raw)
        result = self._parse_result(raw, start)

        if result.needs_human_review:
            self.review_queue.push(comment, result)

        return result

    def moderate_batch(self, comments: list[PendingComment]) -> list[ModerationResult]:
        """Moderate a batch efficiently, using cache for hits."""
        start = time.time()

        # Bulk cache lookup
        cache_results = self.cache.get_many([c.text for c in comments])

        to_classify = []
        results: dict[str, ModerationResult] = {}

        for comment in comments:
            cached = cache_results.get(comment.text)
            if cached:
                r = self._parse_result(cached, start, cache_hit=True)
                results[comment.comment_id] = r
                if r.needs_human_review:
                    self.review_queue.push(comment, r)
            else:
                to_classify.append(comment)

        # Batch LLM call for cache misses
        if to_classify:
            batch_input = [
                {"id": c.comment_id, "text": c.text, "context": c.context}
                for c in to_classify
            ]
            try:
                raw_results = classify_batch(batch_input)
                raw_by_id = {r.get("id", ""): r for r in raw_results}
            except Exception as e:
                logger.error("Batch classification failed: %s", e)
                raw_by_id = {}

            for comment in to_classify:
                raw = raw_by_id.get(comment.comment_id, {
                    "category": "borderline",
                    "confidence": 0.0,
                    "reasoning": "batch_classification_failed",
                    "flags": ["llm_error"],
                })
                self.cache.set(comment.text, raw)
                r = self._parse_result(raw, start)
                results[comment.comment_id] = r
                if r.needs_human_review:
                    self.review_queue.push(comment, r)

        return [results[c.comment_id] for c in comments if c.comment_id in results]
Enter fullscreen mode Exit fullscreen mode

Usage example

# main.py
from pipeline import ModerationPipeline
from models import PendingComment

pipeline = ModerationPipeline()

# Single comment
comment = PendingComment(
    comment_id="cmt_001",
    text="Great article! Check out my FREE SEO tool at bit.ly/xyz — doubles traffic guaranteed!",
    user_id="user_42",
    context="Article: Introduction to NIS 2"
)

result = pipeline.moderate(comment)
print(f"Category: {result.category.value}")
print(f"Confidence: {result.confidence:.0%}")
print(f"Flags: {result.flags}")
print(f"Needs review: {result.needs_human_review}")
print(f"Time: {result.processing_time_ms:.0f}ms (cached: {result.cache_hit})")

# Batch processing
batch = [
    PendingComment("cmt_002", "This helped me understand the topic, thanks!", "user_1"),
    PendingComment("cmt_003", "CLICK HERE NOW!!! WIN €500 AMAZON VOUCHER", "user_2"),
    PendingComment("cmt_004", "I disagree with point 3, here's why...", "user_3"),
]

results = pipeline.moderate_batch(batch)
for comment, result in zip(batch, results):
    print(f"{comment.comment_id}: {result.category.value} ({result.confidence:.0%})")
Enter fullscreen mode Exit fullscreen mode

Deployment considerations

Action mapping: Define what happens for each category in your application layer, not the pipeline. The pipeline only classifies. Auto-reject spam and phishing. Auto-approve high-confidence safe comments. Hold borderline and low-confidence results.

Review queue throughput: Track your queue_length() over time. If it grows faster than your reviewers clear it, lower the CONFIDENCE_THRESHOLD to 0.80 or add a second LLM pass for borderlines using a stronger model.

Monitoring: Log every classification with its confidence, category, and processing time. Alert when LLM error rate exceeds 2% or when cache hit rate drops below 20% (the latter suggests unusually high content diversity, which may indicate a bot campaign).

False positive rate: Sample 1% of auto-approved comments for human spot-check. If you're seeing more than 1 in 500 false negatives, tighten the confidence threshold or add domain-specific examples to the system prompt.

This pipeline handles tens of thousands of comments per day for under $5 in API costs, with the cache absorbing 30–60% of volume in typical deployments.

Top comments (0)