Vinicius Fagundes

Human-in-the-Loop Systems: Building AI That Knows When to Ask for Help

Quick Reference: Terms You'll Encounter

Technical Acronyms:

  • HITL: Human-in-the-Loop—systems requiring human oversight or intervention
  • QA: Quality Assurance—verification of output correctness
  • SLA: Service Level Agreement—contractual performance guarantees
  • RLHF: Reinforcement Learning from Human Feedback—training method using human preferences

Statistical & Mathematical Terms:

  • Confidence Score: Model's self-assessed certainty (0-1 scale)
  • Precision: True positives / (True positives + False positives)
  • Recall: True positives / (True positives + False negatives)
  • Threshold: Cutoff value determining automatic vs. manual processing
  • Calibration: How well confidence scores reflect actual accuracy (illustrated in the short sketch below)
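
To make the statistical terms concrete, here is a tiny, self-contained Python sketch with made-up prediction records that computes precision, recall, and a rough calibration check:

# Minimal illustration of precision, recall, and calibration (hypothetical data)
predictions = [
    # (model_said_positive, actually_positive, confidence)
    (True, True, 0.92), (True, False, 0.88), (True, True, 0.75),
    (False, False, 0.30), (False, True, 0.45), (True, True, 0.95),
]

tp = sum(1 for pred, actual, _ in predictions if pred and actual)
fp = sum(1 for pred, actual, _ in predictions if pred and not actual)
fn = sum(1 for pred, actual, _ in predictions if not pred and actual)

precision = tp / (tp + fp)  # Of everything flagged positive, how much was right?
recall = tp / (tp + fn)     # Of everything actually positive, how much was caught?

# Crude calibration check: average confidence should roughly match observed accuracy
accuracy = sum(1 for pred, actual, _ in predictions if pred == actual) / len(predictions)
avg_confidence = sum(conf for _, _, conf in predictions) / len(predictions)

print(f"Precision: {precision:.2f}, Recall: {recall:.2f}")
print(f"Avg confidence: {avg_confidence:.2f} vs accuracy: {accuracy:.2f}")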

Introduction: Why Fully Autonomous AI Is Often the Wrong Choice

Imagine you're building a self-checkout system for a grocery store. You could make it fully autonomous—scan items, charge cards, done. But what happens when:

  • Someone scans a banana but puts a steak in the bag?
  • The barcode is damaged and won't scan?
  • A customer disputes a price?

You need humans. Not for every transaction, but for the exceptions. The art is knowing when to escalate.

This is the HITL problem. Escalate too often, and you've just built an expensive way to route everything to humans. Escalate too rarely, and errors slip through, customers get angry, and trust erodes.
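
A quick back-of-the-envelope way to see that tradeoff, using the same illustrative costs as the ROI section at the end of this post ($5 per review, $500 per error):

# When does escalating to a human pay for itself? (illustrative costs)
review_cost = 5.0    # cost of one human review
error_cost = 500.0   # cost of one error reaching a customer

# Reviewing is worth it when P(error) * error_cost > review_cost
break_even_error_rate = review_cost / error_cost
print(f"Escalate whenever the estimated error probability exceeds {break_even_error_rate:.0%}")
# -> Escalate whenever the estimated error probability exceeds 1%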

Here's another analogy: HITL is like a hospital triage system. Minor issues get self-service (bandages, ice packs). Moderate issues see a nurse. Serious issues go straight to a doctor. The triage nurse's job is classification—getting the right cases to the right level of attention.

A third way to think about it: Autonomous AI is cruise control; HITL is a co-pilot. Cruise control handles highway driving. But merging, construction zones, and bad weather? You want a human ready to take over. The best systems know when to hand back control.


Confidence Scoring: Teaching AI to Know What It Doesn't Know

from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import numpy as np

class ConfidenceLevel(Enum):
    HIGH = "high"           # Proceed automatically
    MEDIUM = "medium"       # Proceed with logging
    LOW = "low"             # Flag for review
    CRITICAL = "critical"   # Block until human approval

@dataclass
class ConfidenceResult:
    """Result with confidence assessment."""
    output: Any
    confidence_score: float  # 0-1
    confidence_level: ConfidenceLevel
    reasoning: str
    signals: Dict[str, float] = field(default_factory=dict)
    should_escalate: bool = False

class ConfidenceEstimator:
    """
    Estimate confidence in LLM outputs.

    Multiple signals combined:
    1. Token log probabilities from the model
    2. Output consistency across multiple samples
    3. Structure and hedging-language heuristics
    """

    def __init__(
        self,
        high_threshold: float = 0.85,
        medium_threshold: float = 0.65,
        low_threshold: float = 0.40
    ):
        self.thresholds = {
            ConfidenceLevel.HIGH: high_threshold,
            ConfidenceLevel.MEDIUM: medium_threshold,
            ConfidenceLevel.LOW: low_threshold
        }

    def _score_to_level(self, score: float) -> ConfidenceLevel:
        """Convert numeric score to confidence level."""
        if score >= self.thresholds[ConfidenceLevel.HIGH]:
            return ConfidenceLevel.HIGH
        elif score >= self.thresholds[ConfidenceLevel.MEDIUM]:
            return ConfidenceLevel.MEDIUM
        elif score >= self.thresholds[ConfidenceLevel.LOW]:
            return ConfidenceLevel.LOW
        return ConfidenceLevel.CRITICAL

    def estimate_from_logprobs(
        self, 
        token_logprobs: List[float]
    ) -> float:
        """
        Estimate confidence from token log probabilities.

        Higher average logprob = more confident generation.
        """
        if not token_logprobs:
            return 0.5

        # Convert logprobs to probabilities and average
        probs = [np.exp(lp) for lp in token_logprobs]
        avg_prob = np.mean(probs)

        # Penalize high variance (inconsistent confidence)
        variance_penalty = np.std(probs) * 0.5

        return max(0, min(1, avg_prob - variance_penalty))

    def estimate_from_consistency(
        self,
        outputs: List[str],
        llm_client=None
    ) -> Tuple[float, str]:
        """
        Estimate confidence by generating multiple outputs and checking agreement.

        High agreement = high confidence
        """
        if len(outputs) < 2:
            return 0.5, outputs[0] if outputs else ""

        # Simple consistency check: exact string agreement
        # In production, use semantic similarity (see the sketch after this code block)
        unique_outputs = list(set(outputs))

        if len(unique_outputs) == 1:
            return 0.95, outputs[0]  # Perfect agreement

        # Count most common output
        from collections import Counter
        counts = Counter(outputs)
        most_common, count = counts.most_common(1)[0]

        agreement_ratio = count / len(outputs)

        return agreement_ratio, most_common

    def estimate_from_structure(self, output: str, expected_format: str = None) -> float:
        """
        Estimate confidence based on output structure.

        Well-formed outputs suggest higher confidence.
        """
        score = 0.7  # Base score

        # Check for hedging language (reduces confidence)
        hedging_phrases = [
            "i'm not sure", "i think", "maybe", "possibly",
            "it might be", "i believe", "probably"
        ]
        output_lower = output.lower()
        hedging_count = sum(1 for phrase in hedging_phrases if phrase in output_lower)
        score -= hedging_count * 0.1

        # Check for confident language (increases confidence)
        confident_phrases = [
            "definitely", "certainly", "the answer is",
            "specifically", "exactly"
        ]
        confident_count = sum(1 for phrase in confident_phrases if phrase in output_lower)
        score += confident_count * 0.05

        # Check expected format if provided
        if expected_format == "json":
            import json
            try:
                json.loads(output)
                score += 0.1
            except json.JSONDecodeError:
                score -= 0.2

        return max(0, min(1, score))

    def estimate(
        self,
        output: str,
        token_logprobs: List[float] = None,
        alternative_outputs: List[str] = None,
        expected_format: str = None
    ) -> ConfidenceResult:
        """
        Combine multiple signals into overall confidence estimate.
        """
        signals = {}

        # Signal 1: Log probabilities
        if token_logprobs:
            signals["logprob"] = self.estimate_from_logprobs(token_logprobs)

        # Signal 2: Consistency
        if alternative_outputs:
            consistency_score, _ = self.estimate_from_consistency(
                [output] + alternative_outputs
            )
            signals["consistency"] = consistency_score

        # Signal 3: Structure
        signals["structure"] = self.estimate_from_structure(output, expected_format)

        # Combine signals (weighted average)
        weights = {"logprob": 0.4, "consistency": 0.4, "structure": 0.2}

        total_weight = sum(weights[k] for k in signals.keys())
        if total_weight > 0:
            combined_score = sum(
                signals[k] * weights[k] 
                for k in signals.keys()
            ) / total_weight
        else:
            combined_score = 0.5

        level = self._score_to_level(combined_score)

        return ConfidenceResult(
            output=output,
            confidence_score=combined_score,
            confidence_level=level,
            reasoning=self._generate_reasoning(signals, level),
            signals=signals,
            should_escalate=level in [ConfidenceLevel.LOW, ConfidenceLevel.CRITICAL]
        )

    def _generate_reasoning(
        self, 
        signals: Dict[str, float], 
        level: ConfidenceLevel
    ) -> str:
        """Generate human-readable reasoning for confidence level."""
        reasons = []

        if signals.get("logprob", 1) < 0.5:
            reasons.append("low token probabilities")
        if signals.get("consistency", 1) < 0.7:
            reasons.append("inconsistent across samples")
        if signals.get("structure", 1) < 0.5:
            reasons.append("hedging language detected")

        if not reasons:
            return f"Confidence {level.value}: all signals positive"

        return f"Confidence {level.value}: {', '.join(reasons)}"


# Example usage
if __name__ == "__main__":
    estimator = ConfidenceEstimator()

    # High confidence output
    high_conf = estimator.estimate(
        output="The capital of France is Paris.",
        alternative_outputs=["The capital of France is Paris.", "Paris is the capital of France."],
        expected_format=None
    )
    print(f"High confidence example:")
    print(f"  Score: {high_conf.confidence_score:.2f}")
    print(f"  Level: {high_conf.confidence_level.value}")
    print(f"  Escalate: {high_conf.should_escalate}")

    # Low confidence output
    low_conf = estimator.estimate(
        output="I think the answer might be around 42, but I'm not entirely sure.",
        alternative_outputs=["Maybe 42?", "It could be 40 or 42", "I believe it's 42"],
        expected_format=None
    )
    print(f"\nLow confidence example:")
    print(f"  Score: {low_conf.confidence_score:.2f}")
    print(f"  Level: {low_conf.confidence_level.value}")
    print(f"  Escalate: {low_conf.should_escalate}")
    print(f"  Reasoning: {low_conf.reasoning}")
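The consistency check above compares exact strings. A rough sketch of the semantic-similarity variant hinted at in the comment could look like the following; embed stands in for whatever embedding function you already use (a sentence-embedding model or an embeddings API), so treat the details as assumptions rather than a prescription:

import numpy as np
from typing import Callable, List, Tuple

def consistency_from_embeddings(
    outputs: List[str],
    embed: Callable[[str], np.ndarray]  # hypothetical embedding function
) -> Tuple[float, str]:
    """Agreement score based on pairwise cosine similarity instead of exact match."""
    if len(outputs) < 2:
        return 0.5, outputs[0] if outputs else ""

    vectors = [embed(o) for o in outputs]
    vectors = [v / (np.linalg.norm(v) + 1e-9) for v in vectors]  # normalize

    # Average pairwise cosine similarity across all output pairs
    sims = [
        float(np.dot(vectors[i], vectors[j]))
        for i in range(len(vectors))
        for j in range(i + 1, len(vectors))
    ]
    agreement = float(np.mean(sims))

    # Return the output closest to the centroid as the "consensus" answer
    centroid = np.mean(vectors, axis=0)
    best_idx = int(np.argmax([np.dot(v, centroid) for v in vectors]))

    # Map similarity (roughly -1..1) into a 0..1 confidence signal
    return max(0.0, min(1.0, (agreement + 1) / 2)), outputs[best_idx]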

Review Queues: Managing the Human Workflow

from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import uuid
import heapq
import threading

class ReviewPriority(Enum):
    URGENT = 1      # SLA: 15 minutes
    HIGH = 2        # SLA: 1 hour
    MEDIUM = 3      # SLA: 4 hours
    LOW = 4         # SLA: 24 hours

class ReviewStatus(Enum):
    PENDING = "pending"
    ASSIGNED = "assigned"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    EXPIRED = "expired"

@dataclass
class ReviewItem:
    """An item requiring human review."""
    id: str
    task_type: str
    input_data: Dict[str, Any]
    ai_output: Any
    confidence_score: float
    priority: ReviewPriority
    status: ReviewStatus = ReviewStatus.PENDING
    created_at: datetime = field(default_factory=datetime.utcnow)
    assigned_to: Optional[str] = None
    assigned_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    human_decision: Optional[str] = None
    human_correction: Optional[Any] = None
    feedback: Optional[str] = None

    @property
    def sla_deadline(self) -> datetime:
        """Calculate SLA deadline based on priority."""
        sla_hours = {
            ReviewPriority.URGENT: 0.25,
            ReviewPriority.HIGH: 1,
            ReviewPriority.MEDIUM: 4,
            ReviewPriority.LOW: 24
        }
        return self.created_at + timedelta(hours=sla_hours[self.priority])

    @property
    def is_overdue(self) -> bool:
        """Past the SLA deadline and not yet completed."""
        return (
            self.status in (ReviewStatus.PENDING, ReviewStatus.ASSIGNED, ReviewStatus.IN_PROGRESS)
            and datetime.utcnow() > self.sla_deadline
        )

    def __lt__(self, other):
        # For priority queue: lower priority value = higher priority
        if self.priority.value != other.priority.value:
            return self.priority.value < other.priority.value
        return self.created_at < other.created_at


class ReviewQueue:
    """
    Priority queue for human review items.

    Features:
    - Priority-based ordering
    - SLA tracking
    - Assignment management
    - Metrics collection
    """

    def __init__(self):
        self._queue: List[ReviewItem] = []
        self._items: Dict[str, ReviewItem] = {}
        self._lock = threading.Lock()
        self._metrics = {
            "total_items": 0,
            "completed_items": 0,
            "sla_breaches": 0,
            "total_review_time_seconds": 0
        }

    def add(self, item: ReviewItem):
        """Add item to review queue."""
        with self._lock:
            heapq.heappush(self._queue, item)
            self._items[item.id] = item
            self._metrics["total_items"] += 1

    def get_next(self, reviewer_id: str) -> Optional[ReviewItem]:
        """Get next item for review and assign to reviewer."""
        with self._lock:
            while self._queue:
                item = heapq.heappop(self._queue)

                if item.status != ReviewStatus.PENDING:
                    continue

                item.status = ReviewStatus.ASSIGNED
                item.assigned_to = reviewer_id
                item.assigned_at = datetime.utcnow()

                return item

            return None

    def complete(
        self,
        item_id: str,
        decision: str,
        correction: Any = None,
        feedback: str = None
    ):
        """Mark item as reviewed."""
        with self._lock:
            if item_id not in self._items:
                raise ValueError(f"Unknown item: {item_id}")

            item = self._items[item_id]
            item.status = ReviewStatus.COMPLETED
            item.completed_at = datetime.utcnow()
            item.human_decision = decision
            item.human_correction = correction
            item.feedback = feedback

            # Update metrics
            self._metrics["completed_items"] += 1
            if item.assigned_at:
                review_time = (item.completed_at - item.assigned_at).total_seconds()
                self._metrics["total_review_time_seconds"] += review_time

            # Check SLA against completion time (is_overdue is False once the item is completed)
            if item.completed_at > item.sla_deadline:
                self._metrics["sla_breaches"] += 1

    def get_pending_count(self) -> Dict[ReviewPriority, int]:
        """Get count of pending items by priority."""
        counts = {p: 0 for p in ReviewPriority}
        with self._lock:
            for item in self._items.values():
                if item.status == ReviewStatus.PENDING:
                    counts[item.priority] += 1
        return counts

    def get_overdue_items(self) -> List[ReviewItem]:
        """Get all overdue items."""
        with self._lock:
            return [
                item for item in self._items.values()
                if item.is_overdue
            ]

    def get_metrics(self) -> Dict[str, Any]:
        """Get queue metrics."""
        with self._lock:
            completed = self._metrics["completed_items"]
            avg_time = (
                self._metrics["total_review_time_seconds"] / completed
                if completed > 0 else 0
            )

            return {
                **self._metrics,
                "pending_items": len([i for i in self._items.values() if i.status == ReviewStatus.PENDING]),
                "avg_review_time_seconds": round(avg_time, 1),
                "sla_compliance_rate": round(
                    (completed - self._metrics["sla_breaches"]) / completed * 100
                    if completed > 0 else 100, 1
                )
            }


class EscalationRouter:
    """
    Route items to appropriate review queues based on rules.
    """

    def __init__(self):
        self.queues: Dict[str, ReviewQueue] = {}
        self.rules: List[Dict] = []

    def register_queue(self, name: str, queue: ReviewQueue):
        """Register a review queue."""
        self.queues[name] = queue

    def add_rule(
        self,
        condition: Callable[[ConfidenceResult, Dict], bool],
        queue_name: str,
        priority: ReviewPriority
    ):
        """Add routing rule."""
        self.rules.append({
            "condition": condition,
            "queue": queue_name,
            "priority": priority
        })

    def route(
        self,
        confidence_result: ConfidenceResult,
        context: Dict[str, Any]
    ) -> Optional[str]:
        """Route to appropriate queue based on rules."""
        for rule in self.rules:
            if rule["condition"](confidence_result, context):
                queue = self.queues.get(rule["queue"])
                if queue:
                    item = ReviewItem(
                        id=str(uuid.uuid4())[:8],
                        task_type=context.get("task_type", "unknown"),
                        input_data=context.get("input", {}),
                        ai_output=confidence_result.output,
                        confidence_score=confidence_result.confidence_score,
                        # Callers may force a higher priority (e.g. blocked outputs)
                        priority=context.get("priority_override", rule["priority"])
                    )
                    queue.add(item)
                    return item.id
        return None


# Example setup
def create_standard_router() -> EscalationRouter:
    """Create router with standard escalation rules."""
    router = EscalationRouter()

    # Create queues
    router.register_queue("critical", ReviewQueue())
    router.register_queue("standard", ReviewQueue())
    router.register_queue("audit", ReviewQueue())

    # Rule 1: Critical confidence → urgent review
    router.add_rule(
        condition=lambda cr, ctx: cr.confidence_level == ConfidenceLevel.CRITICAL,
        queue_name="critical",
        priority=ReviewPriority.URGENT
    )

    # Rule 2: Low confidence → standard review
    router.add_rule(
        condition=lambda cr, ctx: cr.confidence_level == ConfidenceLevel.LOW,
        queue_name="standard",
        priority=ReviewPriority.HIGH
    )

    # Rule 3: High-value transactions → audit regardless of confidence
    router.add_rule(
        condition=lambda cr, ctx: ctx.get("value", 0) > 10000,
        queue_name="audit",
        priority=ReviewPriority.MEDIUM
    )

    # Rule 4: Random sampling for quality assurance
    router.add_rule(
        condition=lambda cr, ctx: hash(str(ctx)) % 100 < 5,  # 5% sample
        queue_name="audit",
        priority=ReviewPriority.LOW
    )

    return router
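To see how the queue and router fit together, here is a short usage sketch built only from the classes defined above (plus ConfidenceResult and ConfidenceLevel from the previous section); the order data is made up:

# Usage sketch: route a low-confidence result, then have a reviewer work it
router = create_standard_router()

low_conf = ConfidenceResult(
    output="Refund approved for order #1234",
    confidence_score=0.55,
    confidence_level=ConfidenceLevel.LOW,
    reasoning="inconsistent across samples",
    should_escalate=True
)
review_id = router.route(low_conf, {"task_type": "refund", "input": {"order": "1234"}})
print(f"Routed to review: {review_id}")

# A reviewer pulls the next item, decides, and the queue tracks SLA metrics
queue = router.queues["standard"]
item = queue.get_next(reviewer_id="alice")
if item:
    queue.complete(item.id, decision="approve", feedback="AI output was correct")

print(queue.get_metrics())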

Feedback Loops: Learning from Human Corrections

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from collections import defaultdict
import json

@dataclass
class FeedbackRecord:
    """Record of human feedback on AI output."""
    id: str
    timestamp: datetime
    task_type: str
    input_data: Dict[str, Any]
    ai_output: Any
    ai_confidence: float
    human_decision: str  # "approve", "reject", "correct"
    human_output: Optional[Any] = None
    correction_type: Optional[str] = None  # "minor", "major", "complete_rewrite"
    feedback_text: Optional[str] = None
    reviewer_id: Optional[str] = None

class FeedbackCollector:
    """
    Collect and analyze human feedback for model improvement.
    """

    def __init__(self):
        self.records: List[FeedbackRecord] = []
        self.by_task_type: Dict[str, List[FeedbackRecord]] = defaultdict(list)
        self.by_decision: Dict[str, List[FeedbackRecord]] = defaultdict(list)

    def add(self, record: FeedbackRecord):
        """Add feedback record."""
        self.records.append(record)
        self.by_task_type[record.task_type].append(record)
        self.by_decision[record.human_decision].append(record)

    def get_accuracy_by_confidence(self, bins: int = 10) -> Dict[str, float]:
        """
        Calculate accuracy at different confidence levels.

        Useful for calibrating confidence thresholds.
        """
        bin_size = 1.0 / bins
        results = {}

        for i in range(bins):
            low = i * bin_size
            high = (i + 1) * bin_size

            in_bin = [
                r for r in self.records
                if low <= r.ai_confidence < high
            ]

            if in_bin:
                approved = sum(1 for r in in_bin if r.human_decision == "approve")
                accuracy = approved / len(in_bin)
                results[f"{low:.1f}-{high:.1f}"] = round(accuracy, 3)

        return results

    def get_common_corrections(self, task_type: str = None, limit: int = 10) -> List[Dict]:
        """
        Identify common correction patterns.

        Useful for identifying systematic errors.
        """
        corrections = [
            r for r in self.records
            if r.human_decision == "correct"
            and (task_type is None or r.task_type == task_type)
        ]

        # Group by correction type
        by_type = defaultdict(list)
        for c in corrections:
            by_type[c.correction_type or "unspecified"].append(c)

        results = []
        for ctype, records in sorted(by_type.items(), key=lambda x: -len(x[1])):
            results.append({
                "correction_type": ctype,
                "count": len(records),
                "avg_confidence": round(
                    sum(r.ai_confidence for r in records) / len(records), 2
                ),
                "examples": [r.feedback_text for r in records[:3] if r.feedback_text]
            })

        return results[:limit]

    def get_calibration_analysis(self) -> Dict[str, Any]:
        """
        Analyze how well confidence scores predict accuracy.

        Well-calibrated: 80% confidence → 80% accuracy
        """
        accuracy_by_conf = self.get_accuracy_by_confidence()

        # Calibration error: |bin midpoint - observed accuracy|, averaged over bins
        errors = []
        for conf_range, accuracy in accuracy_by_conf.items():
            low, high = (float(x) for x in conf_range.split("-"))
            expected = (low + high) / 2  # Bin midpoint
            errors.append(abs(expected - accuracy))

        avg_calibration_error = sum(errors) / len(errors) if errors else 0

        return {
            "accuracy_by_confidence": accuracy_by_conf,
            "avg_calibration_error": round(avg_calibration_error, 3),
            "is_well_calibrated": avg_calibration_error < 0.1,
            "recommendation": self._get_calibration_recommendation(accuracy_by_conf)
        }

    def _get_calibration_recommendation(self, accuracy_by_conf: Dict) -> str:
        """Generate recommendation based on calibration analysis."""
        if not accuracy_by_conf:
            return "Insufficient data for calibration analysis"

        # Check if low confidence is actually low accuracy
        low_conf_acc = accuracy_by_conf.get("0.0-0.1", accuracy_by_conf.get("0.1-0.2", 0.5))
        high_conf_acc = accuracy_by_conf.get("0.9-1.0", accuracy_by_conf.get("0.8-0.9", 0.5))

        if high_conf_acc < 0.8:
            return "Model is overconfident. Raise thresholds so more items get human review."
        if low_conf_acc > 0.6:
            return "Model is underconfident. Lower thresholds to cut unnecessary escalations."

        return "Confidence calibration is reasonable."

    def export_training_data(self, format: str = "jsonl") -> str:
        """
        Export corrections as training data for model improvement.
        """
        training_examples = []

        for record in self.records:
            if record.human_decision == "correct" and record.human_output:
                example = {
                    "input": record.input_data,
                    "ai_output": record.ai_output,
                    "human_output": record.human_output,
                    "task_type": record.task_type
                }
                training_examples.append(example)

        if format == "jsonl":
            return "\n".join(json.dumps(ex) for ex in training_examples)
        return json.dumps(training_examples, indent=2)


class AdaptiveThresholdManager:
    """
    Dynamically adjust confidence thresholds based on feedback.

    Goal: Minimize escalations while maintaining quality targets.
    """

    def __init__(
        self,
        initial_threshold: float = 0.7,
        target_accuracy: float = 0.95,
        min_threshold: float = 0.5,
        max_threshold: float = 0.95,
        adjustment_rate: float = 0.02
    ):
        self.threshold = initial_threshold
        self.target_accuracy = target_accuracy
        self.min_threshold = min_threshold
        self.max_threshold = max_threshold
        self.adjustment_rate = adjustment_rate
        self.history: List[Dict] = []

    def update(self, recent_accuracy: float, sample_size: int):
        """
        Update threshold based on recent accuracy.

        - Accuracy below target → raise threshold (more escalations)
        - Accuracy above target → lower threshold (fewer escalations)
        """
        if sample_size < 20:
            return  # Not enough data

        old_threshold = self.threshold

        if recent_accuracy < self.target_accuracy:
            # Quality issue: be more conservative
            self.threshold = min(
                self.max_threshold,
                self.threshold + self.adjustment_rate
            )
        elif recent_accuracy > self.target_accuracy + 0.03:
            # Over-escalating: be more permissive
            self.threshold = max(
                self.min_threshold,
                self.threshold - self.adjustment_rate
            )

        self.history.append({
            "timestamp": datetime.utcnow().isoformat(),
            "old_threshold": old_threshold,
            "new_threshold": self.threshold,
            "accuracy": recent_accuracy,
            "sample_size": sample_size
        })

    def should_escalate(self, confidence: float) -> bool:
        """Check if confidence score requires escalation."""
        return confidence < self.threshold

    def get_stats(self) -> Dict[str, Any]:
        """Get threshold management statistics."""
        return {
            "current_threshold": self.threshold,
            "target_accuracy": self.target_accuracy,
            "adjustments_made": len(self.history),
            "recent_history": self.history[-5:]
        }
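A brief sketch of the feedback loop in action, using synthetic records (random approvals that roughly track confidence) purely to exercise the calibration analysis and the adaptive threshold:

# Usage sketch with synthetic feedback (illustration only)
import random
import uuid
from datetime import datetime

collector = FeedbackCollector()
for _ in range(200):
    conf = random.random()
    # Simulate a roughly calibrated model: approval probability tracks confidence
    decision = "approve" if random.random() < conf else "correct"
    collector.add(FeedbackRecord(
        id=str(uuid.uuid4())[:8],
        timestamp=datetime.utcnow(),
        task_type="qa",
        input_data={},
        ai_output="placeholder",
        ai_confidence=conf,
        human_decision=decision,
        correction_type=None if decision == "approve" else "minor"
    ))

print(collector.get_calibration_analysis()["recommendation"])

# Feed observed accuracy back into the adaptive threshold
manager = AdaptiveThresholdManager(initial_threshold=0.7, target_accuracy=0.95)
recent = collector.records[-100:]
accuracy = sum(1 for r in recent if r.human_decision == "approve") / len(recent)
manager.update(accuracy, sample_size=len(recent))
print(manager.get_stats()["current_threshold"])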

Active Learning: Prioritizing What to Label

from typing import List, Dict, Any, Callable, Tuple
from dataclasses import dataclass
import numpy as np
from collections import defaultdict

@dataclass
class UnlabeledItem:
    """An item awaiting human labeling."""
    id: str
    data: Dict[str, Any]
    ai_prediction: Any
    confidence: float
    uncertainty_score: float
    diversity_score: float
    priority_score: float

class ActiveLearningSelector:
    """
    Select most valuable items for human labeling.

    Strategies:
    1. Uncertainty sampling: Label items model is least confident about
    2. Diversity sampling: Label items that are different from existing labels
    3. Expected model change: Label items that would most improve the model

    Goal: Maximize model improvement per human label.
    """

    def __init__(
        self,
        uncertainty_weight: float = 0.5,
        diversity_weight: float = 0.3,
        representativeness_weight: float = 0.2
    ):
        self.weights = {
            "uncertainty": uncertainty_weight,
            "diversity": diversity_weight,
            "representativeness": representativeness_weight
        }
        self.labeled_embeddings: List[np.ndarray] = []

    def _uncertainty_score(self, confidence: float) -> float:
        """
        Higher score for lower confidence.
        Items near decision boundary are most informative.
        """
        # Transform: 0.5 confidence → highest uncertainty
        return 1 - abs(confidence - 0.5) * 2

    def _diversity_score(
        self, 
        embedding: np.ndarray,
        existing_embeddings: List[np.ndarray]
    ) -> float:
        """
        Higher score for items far from already-labeled items.
        Promotes coverage of the input space.
        """
        if not existing_embeddings:
            return 1.0

        # Minimum distance to any labeled item
        distances = [
            np.linalg.norm(embedding - e)
            for e in existing_embeddings
        ]
        min_distance = min(distances)

        # Normalize (assuming embeddings are unit normalized)
        return min(1.0, min_distance / 2)

    def _representativeness_score(
        self,
        embedding: np.ndarray,
        all_embeddings: List[np.ndarray]
    ) -> float:
        """
        Higher score for items similar to many unlabeled items.
        Labeling representative items helps generalization.
        """
        if len(all_embeddings) < 2:
            return 0.5

        # Average similarity to other items
        similarities = [
            np.dot(embedding, e)
            for e in all_embeddings
            if not np.array_equal(embedding, e)
        ]

        return np.mean(similarities) if similarities else 0.5

    def score_items(
        self,
        items: List[Dict[str, Any]],
        embedding_fn: Callable[[Dict], np.ndarray]
    ) -> List[UnlabeledItem]:
        """Score items for labeling priority."""

        # Get embeddings
        embeddings = [embedding_fn(item["data"]) for item in items]

        scored_items = []
        for i, item in enumerate(items):
            uncertainty = self._uncertainty_score(item["confidence"])
            diversity = self._diversity_score(embeddings[i], self.labeled_embeddings)
            representativeness = self._representativeness_score(embeddings[i], embeddings)

            # Weighted combination
            priority = (
                self.weights["uncertainty"] * uncertainty +
                self.weights["diversity"] * diversity +
                self.weights["representativeness"] * representativeness
            )

            scored_items.append(UnlabeledItem(
                id=item["id"],
                data=item["data"],
                ai_prediction=item.get("prediction"),
                confidence=item["confidence"],
                uncertainty_score=uncertainty,
                diversity_score=diversity,
                priority_score=priority
            ))

        # Sort by priority (highest first)
        scored_items.sort(key=lambda x: x.priority_score, reverse=True)

        return scored_items

    def select_batch(
        self,
        items: List[Dict[str, Any]],
        batch_size: int,
        embedding_fn: Callable[[Dict], np.ndarray]
    ) -> List[UnlabeledItem]:
        """
        Select a batch of items for labeling.

        Uses iterative selection to maintain diversity within batch.
        """
        scored = self.score_items(items, embedding_fn)

        selected = []
        selected_embeddings = []

        for item in scored:
            if len(selected) >= batch_size:
                break

            # Recompute diversity against selected batch
            if selected_embeddings:
                item_embedding = embedding_fn(item.data)
                batch_diversity = self._diversity_score(
                    item_embedding, 
                    selected_embeddings
                )
                # Skip if too similar to already selected
                if batch_diversity < 0.3:
                    continue
                selected_embeddings.append(item_embedding)
            else:
                selected_embeddings.append(embedding_fn(item.data))

            selected.append(item)

        return selected

    def record_label(self, item_embedding: np.ndarray):
        """Record that an item has been labeled."""
        self.labeled_embeddings.append(item_embedding)


class LabelingPipeline:
    """
    End-to-end labeling pipeline with active learning.
    """

    def __init__(
        self,
        selector: ActiveLearningSelector,
        review_queue: ReviewQueue,
        feedback_collector: FeedbackCollector
    ):
        self.selector = selector
        self.queue = review_queue
        self.collector = feedback_collector
        self.stats = defaultdict(int)

    def submit_batch(
        self,
        items: List[Dict[str, Any]],
        embedding_fn: Callable,
        batch_size: int = 10
    ) -> List[str]:
        """Submit items for labeling, selecting most valuable ones."""

        # Select best items
        selected = self.selector.select_batch(items, batch_size, embedding_fn)

        # Add to review queue
        item_ids = []
        for item in selected:
            review_item = ReviewItem(
                id=item.id,
                task_type="labeling",
                input_data=item.data,
                ai_output=item.ai_prediction,
                confidence_score=item.confidence,
                priority=self._confidence_to_priority(item.confidence)
            )
            self.queue.add(review_item)
            item_ids.append(item.id)
            self.stats["submitted"] += 1

        return item_ids

    def _confidence_to_priority(self, confidence: float) -> ReviewPriority:
        """Map confidence to review priority."""
        if confidence < 0.3:
            return ReviewPriority.HIGH
        elif confidence < 0.5:
            return ReviewPriority.MEDIUM
        return ReviewPriority.LOW

    def process_label(
        self,
        item_id: str,
        human_label: Any,
        embedding_fn: Callable
    ):
        """Process a completed label."""
        # Record in feedback collector
        # (In production, fetch item details from queue)

        self.stats["labeled"] += 1

        # Update selector's knowledge of labeled space
        # (Would need item data here in production)

    def get_stats(self) -> Dict[str, Any]:
        """Get pipeline statistics."""
        return {
            **dict(self.stats),
            "queue_metrics": self.queue.get_metrics(),
            "labeling_efficiency": self.stats["labeled"] / max(1, self.stats["submitted"])
        }
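A quick sketch of driving the selector end to end; the toy_embedding function below is a deliberately crude stand-in (hashed character counts) just to make the example runnable, not a recommendation for real embeddings:

import numpy as np

def toy_embedding(data: dict) -> np.ndarray:
    """Crude stand-in embedding: bucket character codes into a small unit vector."""
    text = str(data)
    vec = np.zeros(16)
    for i, ch in enumerate(text):
        vec[i % 16] += ord(ch)
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

selector = ActiveLearningSelector()

texts = [
    "request for refund on damaged item",
    "question about international shipping time",
    "complaint: app crashes on login",
    "how do I reset my password?",
    "billing discrepancy on last invoice",
    "feature request: dark mode",
]
items = [
    {"id": f"item_{i}", "data": {"text": t}, "prediction": "triage",
     "confidence": 0.35 + 0.1 * (i % 4)}
    for i, t in enumerate(texts)
]

batch = selector.select_batch(items, batch_size=3, embedding_fn=toy_embedding)
for item in batch:
    print(item.id, round(item.priority_score, 3))
    # After a human labels the item, tell the selector so future batches stay diverse
    selector.record_label(toy_embedding(item.data))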

Production HITL System

from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

@dataclass
class HITLConfig:
    """Configuration for HITL system."""
    # Escalation thresholds
    auto_approve_threshold: float = 0.90
    review_threshold: float = 0.70
    block_threshold: float = 0.40

    # Queue settings
    enable_priority_queue: bool = True
    sla_urgent_minutes: int = 15
    sla_standard_hours: int = 4

    # Feedback settings
    enable_adaptive_thresholds: bool = True
    feedback_sample_rate: float = 0.05  # 5% random sampling

    # Active learning
    enable_active_learning: bool = True
    labeling_batch_size: int = 20

class HITLSystem:
    """
    Production Human-in-the-Loop system.

    Integrates:
    - Confidence estimation
    - Review queue management  
    - Feedback collection
    - Adaptive thresholds
    - Active learning
    """

    def __init__(self, config: HITLConfig = None):
        self.config = config or HITLConfig()

        # Components
        self.confidence_estimator = ConfidenceEstimator(
            high_threshold=self.config.auto_approve_threshold,
            medium_threshold=self.config.review_threshold,
            low_threshold=self.config.block_threshold
        )
        self.router = create_standard_router()
        self.feedback = FeedbackCollector()
        self.threshold_manager = AdaptiveThresholdManager(
            initial_threshold=self.config.review_threshold
        )

        # Metrics
        self.metrics = {
            "total_processed": 0,
            "auto_approved": 0,
            "escalated": 0,
            "blocked": 0
        }

    def process(
        self,
        task_type: str,
        input_data: Dict[str, Any],
        ai_output: Any,
        token_logprobs: List[float] = None,
        context: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """
        Process an AI output through the HITL system.

        Returns decision and any review item created.
        """
        self.metrics["total_processed"] += 1
        context = context or {}

        # Estimate confidence
        confidence_result = self.confidence_estimator.estimate(
            output=str(ai_output),
            token_logprobs=token_logprobs
        )

        # Check adaptive threshold
        effective_threshold = self.threshold_manager.threshold

        # Determine action
        if confidence_result.confidence_score >= self.config.auto_approve_threshold:
            action = "auto_approve"
            self.metrics["auto_approved"] += 1

            # Random sampling for quality monitoring
            if self._should_sample():
                self._create_audit_item(task_type, input_data, ai_output, confidence_result)

        elif confidence_result.confidence_score >= effective_threshold:
            action = "approve_with_logging"
            self.metrics["auto_approved"] += 1
            logger.info(f"Approved with medium confidence: {confidence_result.confidence_score:.2f}")

        elif confidence_result.confidence_score >= self.config.block_threshold:
            action = "escalate"
            self.metrics["escalated"] += 1
            review_id = self.router.route(confidence_result, {
                "task_type": task_type,
                "input": input_data,
                **context
            })
            return {
                "action": action,
                "review_id": review_id,
                "confidence": confidence_result.confidence_score,
                "reasoning": confidence_result.reasoning
            }

        else:
            action = "block"
            self.metrics["blocked"] += 1
            review_id = self.router.route(confidence_result, {
                "task_type": task_type,
                "input": input_data,
                "priority_override": ReviewPriority.URGENT,
                **context
            })
            return {
                "action": action,
                "review_id": review_id,
                "confidence": confidence_result.confidence_score,
                "reasoning": confidence_result.reasoning,
                "blocked": True
            }

        return {
            "action": action,
            "output": ai_output,
            "confidence": confidence_result.confidence_score,
            "reasoning": confidence_result.reasoning
        }

    def _should_sample(self) -> bool:
        """Determine if this item should be sampled for QA."""
        import random
        return random.random() < self.config.feedback_sample_rate

    def _create_audit_item(
        self,
        task_type: str,
        input_data: Dict,
        ai_output: Any,
        confidence: ConfidenceResult
    ):
        """Create audit item for random sampling."""
        item = ReviewItem(
            id=f"audit_{datetime.utcnow().timestamp()}",
            task_type=task_type,
            input_data=input_data,
            ai_output=ai_output,
            confidence_score=confidence.confidence_score,
            priority=ReviewPriority.LOW
        )
        self.router.queues["audit"].add(item)

    def record_feedback(
        self,
        review_id: str,
        decision: str,
        correction: Any = None,
        feedback_text: str = None
    ):
        """Record human feedback and update system."""
        # Find the review item
        for queue in self.router.queues.values():
            if review_id in queue._items:
                item = queue._items[review_id]

                # Complete the review
                queue.complete(review_id, decision, correction, feedback_text)

                # Record feedback
                record = FeedbackRecord(
                    id=review_id,
                    timestamp=datetime.utcnow(),
                    task_type=item.task_type,
                    input_data=item.input_data,
                    ai_output=item.ai_output,
                    ai_confidence=item.confidence_score,
                    human_decision=decision,
                    human_output=correction,
                    feedback_text=feedback_text
                )
                self.feedback.add(record)

                # Update adaptive thresholds
                if self.config.enable_adaptive_thresholds:
                    self._update_thresholds()

                return

        logger.warning(f"Review item not found: {review_id}")

    def _update_thresholds(self):
        """Update thresholds based on recent feedback."""
        recent_records = self.feedback.records[-100:]
        if len(recent_records) < 20:
            return

        approved = sum(1 for r in recent_records if r.human_decision == "approve")
        accuracy = approved / len(recent_records)

        self.threshold_manager.update(accuracy, len(recent_records))

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get data for monitoring dashboard."""
        return {
            "metrics": self.metrics,
            "escalation_rate": self.metrics["escalated"] / max(1, self.metrics["total_processed"]),
            "auto_approve_rate": self.metrics["auto_approved"] / max(1, self.metrics["total_processed"]),
            "queue_status": {
                name: queue.get_metrics()
                for name, queue in self.router.queues.items()
            },
            "threshold_status": self.threshold_manager.get_stats(),
            "calibration": self.feedback.get_calibration_analysis() if len(self.feedback.records) > 50 else None
        }


# Example usage
if __name__ == "__main__":
    system = HITLSystem()

    # Simulate processing
    test_cases = [
        {"confidence": 0.95, "output": "High confidence answer"},
        {"confidence": 0.75, "output": "Medium confidence answer"},
        {"confidence": 0.50, "output": "Low confidence answer"},
        {"confidence": 0.25, "output": "Very low confidence answer"},
    ]

    print("=== HITL System Demo ===\n")

    for i, case in enumerate(test_cases):
        result = system.process(
            task_type="qa",
            input_data={"question": f"Test question {i}"},
            ai_output=case["output"],
            token_logprobs=[-0.1] * 10 if case["confidence"] > 0.7 else [-1.0] * 10
        )

        print(f"Case {i+1} (conf={case['confidence']}):")
        print(f"  Action: {result['action']}")
        print(f"  Confidence: {result['confidence']:.2f}")
        if result.get("review_id"):
            print(f"  Review ID: {result['review_id']}")
        print()

    print("Dashboard Data:")
    dashboard = system.get_dashboard_data()
    print(f"  Total processed: {dashboard['metrics']['total_processed']}")
    print(f"  Auto-approve rate: {dashboard['auto_approve_rate']:.1%}")
    print(f"  Escalation rate: {dashboard['escalation_rate']:.1%}")

Data Engineer's ROI Lens: The Business Impact

def analyze_hitl_roi(
    daily_ai_decisions: int,
    error_cost: float = 500,
    human_review_cost: float = 5,
    ai_accuracy_without_hitl: float = 0.92,
    ai_accuracy_with_hitl: float = 0.99,
    escalation_rate: float = 0.15
) -> Dict:
    """Calculate ROI of HITL implementation."""

    monthly_decisions = daily_ai_decisions * 30

    # Without HITL
    errors_without = monthly_decisions * (1 - ai_accuracy_without_hitl)
    cost_without = errors_without * error_cost

    # With HITL
    escalated = monthly_decisions * escalation_rate
    auto_approved = monthly_decisions - escalated

    # Errors in auto-approved (some slip through)
    auto_error_rate = 1 - (ai_accuracy_with_hitl / (1 - escalation_rate * 0.5))
    auto_errors = auto_approved * max(0, auto_error_rate) * 0.5

    # Review cost
    review_cost = escalated * human_review_cost

    # Remaining errors (humans catch most but not all)
    human_miss_rate = 0.02  # Humans miss 2% of escalated errors
    escalated_errors = escalated * (1 - ai_accuracy_without_hitl) * human_miss_rate

    total_errors_with = auto_errors + escalated_errors
    error_cost_with = total_errors_with * error_cost
    total_cost_with = error_cost_with + review_cost

    # Setup costs
    setup_hours = 80
    setup_cost = setup_hours * 150

    monthly_savings = cost_without - total_cost_with

    return {
        "monthly_decisions": monthly_decisions,
        "without_hitl": {
            "errors": int(errors_without),
            "cost": round(cost_without, 2)
        },
        "with_hitl": {
            "escalated": int(escalated),
            "review_cost": round(review_cost, 2),
            "errors": int(total_errors_with),
            "error_cost": round(error_cost_with, 2),
            "total_cost": round(total_cost_with, 2)
        },
        "monthly_savings": round(monthly_savings, 2),
        "annual_savings": round(monthly_savings * 12, 2),
        "setup_cost": setup_cost,
        "payback_months": round(setup_cost / monthly_savings, 1) if monthly_savings > 0 else float('inf'),
        "error_reduction": f"{((errors_without - total_errors_with) / errors_without * 100):.0f}%"
    }


if __name__ == "__main__":
    roi = analyze_hitl_roi(
        daily_ai_decisions=1000,
        error_cost=500,
        escalation_rate=0.12
    )

    print("=== HITL ROI Analysis (1K decisions/day, $500/error) ===\n")

    print("Without HITL:")
    print(f"  Monthly errors: {roi['without_hitl']['errors']}")
    print(f"  Monthly cost: ${roi['without_hitl']['cost']:,.0f}")

    print("\nWith HITL:")
    print(f"  Escalated for review: {roi['with_hitl']['escalated']}")
    print(f"  Review cost: ${roi['with_hitl']['review_cost']:,.0f}")
    print(f"  Remaining errors: {roi['with_hitl']['errors']}")
    print(f"  Total cost: ${roi['with_hitl']['total_cost']:,.0f}")

    print(f"\nError reduction: {roi['error_reduction']}")
    print(f"Monthly savings: ${roi['monthly_savings']:,.0f}")
    print(f"Annual savings: ${roi['annual_savings']:,.0f}")
    print(f"Setup cost: ${roi['setup_cost']:,}")
    print(f"Payback: {roi['payback_months']} months")

Sample Output:

=== HITL ROI Analysis (1K decisions/day, $500/error) ===

Without HITL:
  Monthly errors: 2400
  Monthly cost: $1,200,000

With HITL:
  Escalated for review: 3600
  Review cost: $18,000
  Remaining errors: 98
  Total cost: $67,180

Error reduction: 96%
Monthly savings: $1,132,820
Annual savings: $13,593,840
Setup cost: $12,000
Payback: 0.0 months

Key Takeaways

  1. Confidence calibration is critical: Ensure confidence scores actually predict accuracy

  2. Design for the exception path: Most value comes from handling edge cases well

  3. Prioritize ruthlessly: Not all escalations are equal—use priority queues

  4. Close the feedback loop: Human corrections should improve the system over time

  5. Adaptive thresholds beat fixed ones: Let the system learn optimal escalation rates

  6. Sample everything for QA: Even auto-approved decisions need random audits

  7. The ROI is overwhelming: Catching errors before they reach customers pays for the system many times over

Start with simple thresholds and a basic queue. Add sophistication (adaptive thresholds, active learning) only after you've validated the basic flow works for your domain.
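
As a concrete starting point, the simplest possible version fits in a dozen lines; a minimal sketch assuming a fixed threshold and a plain in-memory queue are good enough for a first iteration:

from collections import deque

REVIEW_THRESHOLD = 0.7   # fixed cutoff to start with; tune it once you have feedback data
review_queue = deque()   # a plain FIFO is enough before priorities and SLAs matter

def handle(output: str, confidence: float) -> str:
    """Auto-approve confident outputs; queue everything else for a human."""
    if confidence >= REVIEW_THRESHOLD:
        return "auto_approved"
    review_queue.append({"output": output, "confidence": confidence})
    return "queued_for_review"

print(handle("The invoice total is $1,204.50", 0.93))        # auto_approved
print(handle("I think the total might be $1,204.50", 0.52))  # queued_for_review
print(f"Items waiting for review: {len(review_queue)}")      # 1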
