Quick Reference: Terms You'll Encounter
Technical Acronyms:
- HITL: Human-in-the-Loop—systems requiring human oversight or intervention
- QA: Quality Assurance—verification of output correctness
- SLA: Service Level Agreement—contractual performance guarantees
- RLHF: Reinforcement Learning from Human Feedback—training method using human preferences
Statistical & Mathematical Terms:
- Confidence Score: Model's self-assessed certainty (0-1 scale)
- Precision: True positives / (True positives + False positives)
- Recall: True positives / (True positives + False negatives)
- Threshold: Cutoff value determining automatic vs. manual processing
- Calibration: How well confidence scores reflect actual accuracy
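A quick worked example to make the formulas concrete (numbers are illustrative): if a model flags 20 items as positive and 15 of them really are positive, while it misses another 10 true positives, then precision = 15 / (15 + 5) = 0.75 and recall = 15 / (15 + 10) = 0.60. Calibration is the same idea applied to confidence scores: if the outputs the model scores around 0.8 turn out to be correct roughly 80% of the time, the scores are well calibrated.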
Introduction: Why Fully Autonomous AI Is Often the Wrong Choice
Imagine you're building a self-checkout system for a grocery store. You could make it fully autonomous—scan items, charge cards, done. But what happens when:
- Someone scans a banana but puts a steak in the bag?
- The barcode is damaged and won't scan?
- A customer disputes a price?
You need humans. Not for every transaction, but for the exceptions. The art is knowing when to escalate.
This is the HITL problem. Escalate too often, and you've just built an expensive way to route everything to humans. Escalate too rarely, and errors slip through, customers get angry, and trust erodes.
Here's another analogy: HITL is like a hospital triage system. Minor issues get self-service (bandages, ice packs). Moderate issues see a nurse. Serious issues go straight to a doctor. The triage nurse's job is classification—getting the right cases to the right level of attention.
A third way to think about it: Autonomous AI is cruise control; HITL is a co-pilot. Cruise control handles highway driving. But merging, construction zones, and bad weather? You want a human ready to take over. The best systems know when to hand back control.
Confidence Scoring: Teaching AI to Know What It Doesn't Know
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
import re
class ConfidenceLevel(Enum):
HIGH = "high" # Proceed automatically
MEDIUM = "medium" # Proceed with logging
LOW = "low" # Flag for review
CRITICAL = "critical" # Block until human approval
@dataclass
class ConfidenceResult:
"""Result with confidence assessment."""
output: Any
confidence_score: float # 0-1
confidence_level: ConfidenceLevel
reasoning: str
signals: Dict[str, float] = field(default_factory=dict)
should_escalate: bool = False
class ConfidenceEstimator:
"""
Estimate confidence in LLM outputs.
Multiple signals combined:
1. Model's self-reported confidence
2. Output consistency (multiple samples)
3. Semantic similarity to known patterns
4. Length and structure heuristics
"""
def __init__(
self,
high_threshold: float = 0.85,
medium_threshold: float = 0.65,
low_threshold: float = 0.40
):
self.thresholds = {
ConfidenceLevel.HIGH: high_threshold,
ConfidenceLevel.MEDIUM: medium_threshold,
ConfidenceLevel.LOW: low_threshold
}
def _score_to_level(self, score: float) -> ConfidenceLevel:
"""Convert numeric score to confidence level."""
if score >= self.thresholds[ConfidenceLevel.HIGH]:
return ConfidenceLevel.HIGH
elif score >= self.thresholds[ConfidenceLevel.MEDIUM]:
return ConfidenceLevel.MEDIUM
elif score >= self.thresholds[ConfidenceLevel.LOW]:
return ConfidenceLevel.LOW
return ConfidenceLevel.CRITICAL
def estimate_from_logprobs(
self,
token_logprobs: List[float]
) -> float:
"""
Estimate confidence from token log probabilities.
Higher average logprob = more confident generation.
"""
if not token_logprobs:
return 0.5
# Convert logprobs to probabilities and average
probs = [np.exp(lp) for lp in token_logprobs]
avg_prob = np.mean(probs)
# Penalize high variance (inconsistent confidence)
variance_penalty = np.std(probs) * 0.5
return max(0, min(1, avg_prob - variance_penalty))
def estimate_from_consistency(
self,
outputs: List[str],
llm_client=None
) -> Tuple[float, str]:
"""
Estimate confidence by generating multiple outputs and checking agreement.
High agreement = high confidence
"""
if len(outputs) < 2:
return 0.5, outputs[0] if outputs else ""
# Simple consistency: check if outputs are similar
# In production, use semantic similarity
unique_outputs = list(set(outputs))
if len(unique_outputs) == 1:
return 0.95, outputs[0] # Perfect agreement
# Count most common output
from collections import Counter
counts = Counter(outputs)
most_common, count = counts.most_common(1)[0]
agreement_ratio = count / len(outputs)
return agreement_ratio, most_common
def estimate_from_structure(self, output: str, expected_format: str = None) -> float:
"""
Estimate confidence based on output structure.
Well-formed outputs suggest higher confidence.
"""
score = 0.7 # Base score
# Check for hedging language (reduces confidence)
hedging_phrases = [
"i'm not sure", "i think", "maybe", "possibly",
"it might be", "i believe", "probably"
]
output_lower = output.lower()
hedging_count = sum(1 for phrase in hedging_phrases if phrase in output_lower)
score -= hedging_count * 0.1
# Check for confident language (increases confidence)
confident_phrases = [
"definitely", "certainly", "the answer is",
"specifically", "exactly"
]
confident_count = sum(1 for phrase in confident_phrases if phrase in output_lower)
score += confident_count * 0.05
# Check expected format if provided
if expected_format == "json":
try:
import json
json.loads(output)
score += 0.1
            except ValueError:  # output is not valid JSON
score -= 0.2
return max(0, min(1, score))
def estimate(
self,
output: str,
token_logprobs: List[float] = None,
alternative_outputs: List[str] = None,
expected_format: str = None
) -> ConfidenceResult:
"""
Combine multiple signals into overall confidence estimate.
"""
signals = {}
# Signal 1: Log probabilities
if token_logprobs:
signals["logprob"] = self.estimate_from_logprobs(token_logprobs)
# Signal 2: Consistency
if alternative_outputs:
consistency_score, _ = self.estimate_from_consistency(
[output] + alternative_outputs
)
signals["consistency"] = consistency_score
# Signal 3: Structure
signals["structure"] = self.estimate_from_structure(output, expected_format)
# Combine signals (weighted average)
weights = {"logprob": 0.4, "consistency": 0.4, "structure": 0.2}
total_weight = sum(weights[k] for k in signals.keys())
if total_weight > 0:
combined_score = sum(
signals[k] * weights[k]
for k in signals.keys()
) / total_weight
else:
combined_score = 0.5
level = self._score_to_level(combined_score)
return ConfidenceResult(
output=output,
confidence_score=combined_score,
confidence_level=level,
reasoning=self._generate_reasoning(signals, level),
signals=signals,
should_escalate=level in [ConfidenceLevel.LOW, ConfidenceLevel.CRITICAL]
)
def _generate_reasoning(
self,
signals: Dict[str, float],
level: ConfidenceLevel
) -> str:
"""Generate human-readable reasoning for confidence level."""
reasons = []
if signals.get("logprob", 1) < 0.5:
reasons.append("low token probabilities")
if signals.get("consistency", 1) < 0.7:
reasons.append("inconsistent across samples")
if signals.get("structure", 1) < 0.5:
reasons.append("hedging language detected")
if not reasons:
return f"Confidence {level.value}: all signals positive"
return f"Confidence {level.value}: {', '.join(reasons)}"
# Example usage
if __name__ == "__main__":
estimator = ConfidenceEstimator()
# High confidence output
high_conf = estimator.estimate(
output="The capital of France is Paris.",
alternative_outputs=["The capital of France is Paris.", "Paris is the capital of France."],
expected_format=None
)
print(f"High confidence example:")
print(f" Score: {high_conf.confidence_score:.2f}")
print(f" Level: {high_conf.confidence_level.value}")
print(f" Escalate: {high_conf.should_escalate}")
# Low confidence output
low_conf = estimator.estimate(
output="I think the answer might be around 42, but I'm not entirely sure.",
alternative_outputs=["Maybe 42?", "It could be 40 or 42", "I believe it's 42"],
expected_format=None
)
print(f"\nLow confidence example:")
print(f" Score: {low_conf.confidence_score:.2f}")
print(f" Level: {low_conf.confidence_level.value}")
print(f" Escalate: {low_conf.should_escalate}")
print(f" Reasoning: {low_conf.reasoning}")
Review Queues: Managing the Human Workflow
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import uuid
import heapq
import threading
import time
class ReviewPriority(Enum):
URGENT = 1 # SLA: 15 minutes
HIGH = 2 # SLA: 1 hour
MEDIUM = 3 # SLA: 4 hours
LOW = 4 # SLA: 24 hours
class ReviewStatus(Enum):
PENDING = "pending"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
EXPIRED = "expired"
@dataclass
class ReviewItem:
"""An item requiring human review."""
id: str
task_type: str
input_data: Dict[str, Any]
ai_output: Any
confidence_score: float
priority: ReviewPriority
status: ReviewStatus = ReviewStatus.PENDING
created_at: datetime = field(default_factory=datetime.utcnow)
assigned_to: Optional[str] = None
assigned_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
human_decision: Optional[str] = None
human_correction: Optional[Any] = None
feedback: Optional[str] = None
@property
def sla_deadline(self) -> datetime:
"""Calculate SLA deadline based on priority."""
sla_hours = {
ReviewPriority.URGENT: 0.25,
ReviewPriority.HIGH: 1,
ReviewPriority.MEDIUM: 4,
ReviewPriority.LOW: 24
}
return self.created_at + timedelta(hours=sla_hours[self.priority])
@property
def is_overdue(self) -> bool:
return datetime.utcnow() > self.sla_deadline and self.status == ReviewStatus.PENDING
def __lt__(self, other):
# For priority queue: lower priority value = higher priority
if self.priority.value != other.priority.value:
return self.priority.value < other.priority.value
return self.created_at < other.created_at
class ReviewQueue:
"""
Priority queue for human review items.
Features:
- Priority-based ordering
- SLA tracking
- Assignment management
- Metrics collection
"""
def __init__(self):
self._queue: List[ReviewItem] = []
self._items: Dict[str, ReviewItem] = {}
self._lock = threading.Lock()
self._metrics = {
"total_items": 0,
"completed_items": 0,
"sla_breaches": 0,
"total_review_time_seconds": 0
}
def add(self, item: ReviewItem):
"""Add item to review queue."""
with self._lock:
heapq.heappush(self._queue, item)
self._items[item.id] = item
self._metrics["total_items"] += 1
def get_next(self, reviewer_id: str) -> Optional[ReviewItem]:
"""Get next item for review and assign to reviewer."""
with self._lock:
while self._queue:
item = heapq.heappop(self._queue)
if item.status != ReviewStatus.PENDING:
continue
item.status = ReviewStatus.ASSIGNED
item.assigned_to = reviewer_id
item.assigned_at = datetime.utcnow()
return item
return None
def complete(
self,
item_id: str,
decision: str,
correction: Any = None,
feedback: str = None
):
"""Mark item as reviewed."""
with self._lock:
if item_id not in self._items:
raise ValueError(f"Unknown item: {item_id}")
item = self._items[item_id]
item.status = ReviewStatus.COMPLETED
item.completed_at = datetime.utcnow()
item.human_decision = decision
item.human_correction = correction
item.feedback = feedback
# Update metrics
self._metrics["completed_items"] += 1
if item.assigned_at:
review_time = (item.completed_at - item.assigned_at).total_seconds()
self._metrics["total_review_time_seconds"] += review_time
            # is_overdue only applies to PENDING items, so check the deadline directly
            if item.completed_at > item.sla_deadline:
self._metrics["sla_breaches"] += 1
def get_pending_count(self) -> Dict[ReviewPriority, int]:
"""Get count of pending items by priority."""
counts = {p: 0 for p in ReviewPriority}
with self._lock:
for item in self._items.values():
if item.status == ReviewStatus.PENDING:
counts[item.priority] += 1
return counts
def get_overdue_items(self) -> List[ReviewItem]:
"""Get all overdue items."""
with self._lock:
return [
item for item in self._items.values()
if item.is_overdue
]
def get_metrics(self) -> Dict[str, Any]:
"""Get queue metrics."""
with self._lock:
completed = self._metrics["completed_items"]
avg_time = (
self._metrics["total_review_time_seconds"] / completed
if completed > 0 else 0
)
return {
**self._metrics,
"pending_items": len([i for i in self._items.values() if i.status == ReviewStatus.PENDING]),
"avg_review_time_seconds": round(avg_time, 1),
"sla_compliance_rate": round(
(completed - self._metrics["sla_breaches"]) / completed * 100
if completed > 0 else 100, 1
)
}
class EscalationRouter:
"""
Route items to appropriate review queues based on rules.
"""
def __init__(self):
self.queues: Dict[str, ReviewQueue] = {}
self.rules: List[Dict] = []
def register_queue(self, name: str, queue: ReviewQueue):
"""Register a review queue."""
self.queues[name] = queue
def add_rule(
self,
condition: Callable[[ConfidenceResult, Dict], bool],
queue_name: str,
priority: ReviewPriority
):
"""Add routing rule."""
self.rules.append({
"condition": condition,
"queue": queue_name,
"priority": priority
})
def route(
self,
confidence_result: ConfidenceResult,
context: Dict[str, Any]
) -> Optional[str]:
"""Route to appropriate queue based on rules."""
for rule in self.rules:
if rule["condition"](confidence_result, context):
queue = self.queues.get(rule["queue"])
if queue:
item = ReviewItem(
id=str(uuid.uuid4())[:8],
task_type=context.get("task_type", "unknown"),
input_data=context.get("input", {}),
ai_output=confidence_result.output,
confidence_score=confidence_result.confidence_score,
                        # Honor an explicit priority override from the caller, if present
                        priority=context.get("priority_override", rule["priority"])
)
queue.add(item)
return item.id
return None
# Example setup
def create_standard_router() -> EscalationRouter:
"""Create router with standard escalation rules."""
router = EscalationRouter()
# Create queues
router.register_queue("critical", ReviewQueue())
router.register_queue("standard", ReviewQueue())
router.register_queue("audit", ReviewQueue())
# Rule 1: Critical confidence → urgent review
router.add_rule(
condition=lambda cr, ctx: cr.confidence_level == ConfidenceLevel.CRITICAL,
queue_name="critical",
priority=ReviewPriority.URGENT
)
# Rule 2: Low confidence → standard review
router.add_rule(
condition=lambda cr, ctx: cr.confidence_level == ConfidenceLevel.LOW,
queue_name="standard",
priority=ReviewPriority.HIGH
)
# Rule 3: High-value transactions → audit regardless of confidence
router.add_rule(
condition=lambda cr, ctx: ctx.get("value", 0) > 10000,
queue_name="audit",
priority=ReviewPriority.MEDIUM
)
# Rule 4: Random sampling for quality assurance
router.add_rule(
condition=lambda cr, ctx: hash(str(ctx)) % 100 < 5, # 5% sample
queue_name="audit",
priority=ReviewPriority.LOW
)
return router
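The queue and router don't have a demo above, so here is a small usage sketch (the order ID, dollar value, and reviewer name are made up for illustration). It routes a low-confidence result through the standard rules, has a reviewer pull the item, and prints the queue metrics:

if __name__ == "__main__":
    router = create_standard_router()
    # A low-confidence result that should match Rule 2 (standard queue, HIGH priority)
    low_conf = ConfidenceResult(
        output="Refund approved for order #1234",
        confidence_score=0.55,
        confidence_level=ConfidenceLevel.LOW,
        reasoning="hedging language detected",
        should_escalate=True
    )
    review_id = router.route(low_conf, {"task_type": "refund", "input": {"order": "1234"}, "value": 120})
    print(f"Routed to review: {review_id}")
    # A reviewer pulls the next item and approves the AI's output
    queue = router.queues["standard"]
    item = queue.get_next(reviewer_id="alice")
    if item:
        queue.complete(item.id, decision="approve", feedback="AI output was correct")
    print(queue.get_metrics())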
Feedback Loops: Learning from Human Corrections
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from collections import defaultdict
import json
@dataclass
class FeedbackRecord:
"""Record of human feedback on AI output."""
id: str
timestamp: datetime
task_type: str
input_data: Dict[str, Any]
ai_output: Any
ai_confidence: float
human_decision: str # "approve", "reject", "correct"
human_output: Optional[Any] = None
correction_type: Optional[str] = None # "minor", "major", "complete_rewrite"
feedback_text: Optional[str] = None
reviewer_id: Optional[str] = None
class FeedbackCollector:
"""
Collect and analyze human feedback for model improvement.
"""
def __init__(self):
self.records: List[FeedbackRecord] = []
self.by_task_type: Dict[str, List[FeedbackRecord]] = defaultdict(list)
self.by_decision: Dict[str, List[FeedbackRecord]] = defaultdict(list)
def add(self, record: FeedbackRecord):
"""Add feedback record."""
self.records.append(record)
self.by_task_type[record.task_type].append(record)
self.by_decision[record.human_decision].append(record)
def get_accuracy_by_confidence(self, bins: int = 10) -> Dict[str, float]:
"""
Calculate accuracy at different confidence levels.
Useful for calibrating confidence thresholds.
"""
bin_size = 1.0 / bins
results = {}
for i in range(bins):
low = i * bin_size
high = (i + 1) * bin_size
in_bin = [
r for r in self.records
if low <= r.ai_confidence < high
]
if in_bin:
approved = sum(1 for r in in_bin if r.human_decision == "approve")
accuracy = approved / len(in_bin)
results[f"{low:.1f}-{high:.1f}"] = round(accuracy, 3)
return results
def get_common_corrections(self, task_type: str = None, limit: int = 10) -> List[Dict]:
"""
Identify common correction patterns.
Useful for identifying systematic errors.
"""
corrections = [
r for r in self.records
if r.human_decision == "correct"
and (task_type is None or r.task_type == task_type)
]
# Group by correction type
by_type = defaultdict(list)
for c in corrections:
by_type[c.correction_type or "unspecified"].append(c)
results = []
for ctype, records in sorted(by_type.items(), key=lambda x: -len(x[1])):
results.append({
"correction_type": ctype,
"count": len(records),
"avg_confidence": round(
sum(r.ai_confidence for r in records) / len(records), 2
),
"examples": [r.feedback_text for r in records[:3] if r.feedback_text]
})
return results[:limit]
def get_calibration_analysis(self) -> Dict[str, Any]:
"""
Analyze how well confidence scores predict accuracy.
Well-calibrated: 80% confidence → 80% accuracy
"""
accuracy_by_conf = self.get_accuracy_by_confidence()
# Calculate calibration error
errors = []
for conf_range, accuracy in accuracy_by_conf.items():
expected = float(conf_range.split("-")[0]) + 0.05 # Midpoint
errors.append(abs(expected - accuracy))
avg_calibration_error = sum(errors) / len(errors) if errors else 0
return {
"accuracy_by_confidence": accuracy_by_conf,
"avg_calibration_error": round(avg_calibration_error, 3),
"is_well_calibrated": avg_calibration_error < 0.1,
"recommendation": self._get_calibration_recommendation(accuracy_by_conf)
}
def _get_calibration_recommendation(self, accuracy_by_conf: Dict) -> str:
"""Generate recommendation based on calibration analysis."""
if not accuracy_by_conf:
return "Insufficient data for calibration analysis"
# Check if low confidence is actually low accuracy
low_conf_acc = accuracy_by_conf.get("0.0-0.1", accuracy_by_conf.get("0.1-0.2", 0.5))
high_conf_acc = accuracy_by_conf.get("0.9-1.0", accuracy_by_conf.get("0.8-0.9", 0.5))
if high_conf_acc < 0.8:
return "Model is overconfident. Lower confidence thresholds."
if low_conf_acc > 0.6:
return "Model is underconfident. Raise confidence thresholds."
return "Confidence calibration is reasonable."
def export_training_data(self, format: str = "jsonl") -> str:
"""
Export corrections as training data for model improvement.
"""
training_examples = []
for record in self.records:
if record.human_decision == "correct" and record.human_output:
example = {
"input": record.input_data,
"ai_output": record.ai_output,
"human_output": record.human_output,
"task_type": record.task_type
}
training_examples.append(example)
if format == "jsonl":
return "\n".join(json.dumps(ex) for ex in training_examples)
return json.dumps(training_examples, indent=2)
class AdaptiveThresholdManager:
"""
Dynamically adjust confidence thresholds based on feedback.
Goal: Minimize escalations while maintaining quality targets.
"""
def __init__(
self,
initial_threshold: float = 0.7,
target_accuracy: float = 0.95,
min_threshold: float = 0.5,
max_threshold: float = 0.95,
adjustment_rate: float = 0.02
):
self.threshold = initial_threshold
self.target_accuracy = target_accuracy
self.min_threshold = min_threshold
self.max_threshold = max_threshold
self.adjustment_rate = adjustment_rate
self.history: List[Dict] = []
def update(self, recent_accuracy: float, sample_size: int):
"""
Update threshold based on recent accuracy.
- Accuracy below target → raise threshold (more escalations)
- Accuracy above target → lower threshold (fewer escalations)
"""
if sample_size < 20:
return # Not enough data
old_threshold = self.threshold
if recent_accuracy < self.target_accuracy:
# Quality issue: be more conservative
self.threshold = min(
self.max_threshold,
self.threshold + self.adjustment_rate
)
elif recent_accuracy > self.target_accuracy + 0.03:
# Over-escalating: be more permissive
self.threshold = max(
self.min_threshold,
self.threshold - self.adjustment_rate
)
self.history.append({
"timestamp": datetime.utcnow().isoformat(),
"old_threshold": old_threshold,
"new_threshold": self.threshold,
"accuracy": recent_accuracy,
"sample_size": sample_size
})
def should_escalate(self, confidence: float) -> bool:
"""Check if confidence score requires escalation."""
return confidence < self.threshold
def get_stats(self) -> Dict[str, Any]:
"""Get threshold management statistics."""
return {
"current_threshold": self.threshold,
"target_accuracy": self.target_accuracy,
"adjustments_made": len(self.history),
"recent_history": self.history[-5:]
}
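Neither FeedbackCollector nor AdaptiveThresholdManager is exercised above, so here is a brief sketch with synthetic feedback records (randomly generated, purely for illustration) showing how the calibration analysis and the threshold update fit together:

if __name__ == "__main__":
    import random
    import uuid

    collector = FeedbackCollector()
    manager = AdaptiveThresholdManager(initial_threshold=0.7, target_accuracy=0.95)
    # Synthetic feedback: higher-confidence outputs get approved more often
    for _ in range(200):
        conf = random.random()
        decision = "approve" if random.random() < conf else "correct"
        collector.add(FeedbackRecord(
            id=str(uuid.uuid4())[:8],
            timestamp=datetime.utcnow(),
            task_type="qa",
            input_data={},
            ai_output="draft answer",
            ai_confidence=conf,
            human_decision=decision,
            human_output="corrected answer" if decision == "correct" else None
        ))
    analysis = collector.get_calibration_analysis()
    print("Avg calibration error:", analysis["avg_calibration_error"])
    print("Recommendation:", analysis["recommendation"])
    # Feed recent accuracy back into the adaptive threshold
    recent = collector.records[-100:]
    accuracy = sum(1 for r in recent if r.human_decision == "approve") / len(recent)
    manager.update(accuracy, len(recent))
    print("Current threshold:", manager.get_stats()["current_threshold"])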
Active Learning: Prioritizing What to Label
from typing import List, Dict, Any, Callable, Tuple
from dataclasses import dataclass
import numpy as np
from collections import defaultdict
@dataclass
class UnlabeledItem:
"""An item awaiting human labeling."""
id: str
data: Dict[str, Any]
ai_prediction: Any
confidence: float
uncertainty_score: float
diversity_score: float
priority_score: float
class ActiveLearningSelector:
"""
Select most valuable items for human labeling.
Strategies:
1. Uncertainty sampling: Label items model is least confident about
2. Diversity sampling: Label items that are different from existing labels
3. Expected model change: Label items that would most improve the model
Goal: Maximize model improvement per human label.
"""
def __init__(
self,
uncertainty_weight: float = 0.5,
diversity_weight: float = 0.3,
representativeness_weight: float = 0.2
):
self.weights = {
"uncertainty": uncertainty_weight,
"diversity": diversity_weight,
"representativeness": representativeness_weight
}
self.labeled_embeddings: List[np.ndarray] = []
def _uncertainty_score(self, confidence: float) -> float:
"""
Higher score for lower confidence.
Items near decision boundary are most informative.
"""
# Transform: 0.5 confidence → highest uncertainty
return 1 - abs(confidence - 0.5) * 2
def _diversity_score(
self,
embedding: np.ndarray,
existing_embeddings: List[np.ndarray]
) -> float:
"""
Higher score for items far from already-labeled items.
Promotes coverage of the input space.
"""
if not existing_embeddings:
return 1.0
# Minimum distance to any labeled item
distances = [
np.linalg.norm(embedding - e)
for e in existing_embeddings
]
min_distance = min(distances)
# Normalize (assuming embeddings are unit normalized)
return min(1.0, min_distance / 2)
def _representativeness_score(
self,
embedding: np.ndarray,
all_embeddings: List[np.ndarray]
) -> float:
"""
Higher score for items similar to many unlabeled items.
Labeling representative items helps generalization.
"""
if len(all_embeddings) < 2:
return 0.5
# Average similarity to other items
similarities = [
np.dot(embedding, e)
for e in all_embeddings
if not np.array_equal(embedding, e)
]
return np.mean(similarities) if similarities else 0.5
def score_items(
self,
items: List[Dict[str, Any]],
embedding_fn: Callable[[Dict], np.ndarray]
) -> List[UnlabeledItem]:
"""Score items for labeling priority."""
# Get embeddings
embeddings = [embedding_fn(item["data"]) for item in items]
scored_items = []
for i, item in enumerate(items):
uncertainty = self._uncertainty_score(item["confidence"])
diversity = self._diversity_score(embeddings[i], self.labeled_embeddings)
representativeness = self._representativeness_score(embeddings[i], embeddings)
# Weighted combination
priority = (
self.weights["uncertainty"] * uncertainty +
self.weights["diversity"] * diversity +
self.weights["representativeness"] * representativeness
)
scored_items.append(UnlabeledItem(
id=item["id"],
data=item["data"],
ai_prediction=item.get("prediction"),
confidence=item["confidence"],
uncertainty_score=uncertainty,
diversity_score=diversity,
priority_score=priority
))
# Sort by priority (highest first)
scored_items.sort(key=lambda x: x.priority_score, reverse=True)
return scored_items
def select_batch(
self,
items: List[Dict[str, Any]],
batch_size: int,
embedding_fn: Callable[[Dict], np.ndarray]
) -> List[UnlabeledItem]:
"""
Select a batch of items for labeling.
Uses iterative selection to maintain diversity within batch.
"""
scored = self.score_items(items, embedding_fn)
selected = []
selected_embeddings = []
for item in scored:
if len(selected) >= batch_size:
break
# Recompute diversity against selected batch
if selected_embeddings:
item_embedding = embedding_fn(item.data)
batch_diversity = self._diversity_score(
item_embedding,
selected_embeddings
)
# Skip if too similar to already selected
if batch_diversity < 0.3:
continue
selected_embeddings.append(item_embedding)
else:
selected_embeddings.append(embedding_fn(item.data))
selected.append(item)
return selected
def record_label(self, item_embedding: np.ndarray):
"""Record that an item has been labeled."""
self.labeled_embeddings.append(item_embedding)
class LabelingPipeline:
"""
End-to-end labeling pipeline with active learning.
"""
def __init__(
self,
selector: ActiveLearningSelector,
review_queue: ReviewQueue,
feedback_collector: FeedbackCollector
):
self.selector = selector
self.queue = review_queue
self.collector = feedback_collector
self.stats = defaultdict(int)
def submit_batch(
self,
items: List[Dict[str, Any]],
embedding_fn: Callable,
batch_size: int = 10
) -> List[str]:
"""Submit items for labeling, selecting most valuable ones."""
# Select best items
selected = self.selector.select_batch(items, batch_size, embedding_fn)
# Add to review queue
item_ids = []
for item in selected:
review_item = ReviewItem(
id=item.id,
task_type="labeling",
input_data=item.data,
ai_output=item.ai_prediction,
confidence_score=item.confidence,
priority=self._confidence_to_priority(item.confidence)
)
self.queue.add(review_item)
item_ids.append(item.id)
self.stats["submitted"] += 1
return item_ids
def _confidence_to_priority(self, confidence: float) -> ReviewPriority:
"""Map confidence to review priority."""
if confidence < 0.3:
return ReviewPriority.HIGH
elif confidence < 0.5:
return ReviewPriority.MEDIUM
return ReviewPriority.LOW
def process_label(
self,
item_id: str,
human_label: Any,
embedding_fn: Callable
):
"""Process a completed label."""
# Record in feedback collector
# (In production, fetch item details from queue)
self.stats["labeled"] += 1
# Update selector's knowledge of labeled space
# (Would need item data here in production)
def get_stats(self) -> Dict[str, Any]:
"""Get pipeline statistics."""
return {
**dict(self.stats),
"queue_metrics": self.queue.get_metrics(),
"labeling_efficiency": self.stats["labeled"] / max(1, self.stats["submitted"])
}
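The active learning block has no runnable example either, so here is one using a toy, hash-based embedding function as a stand-in for a real embedding model (the function, its 32-dimensional output, and the sample items are assumptions for illustration only):

if __name__ == "__main__":
    import hashlib

    def toy_embedding(data: Dict[str, Any]) -> np.ndarray:
        """Deterministic pseudo-embedding; swap in a real sentence-embedding model."""
        digest = hashlib.sha256(str(sorted(data.items())).encode()).digest()
        vec = np.frombuffer(digest, dtype=np.uint8).astype(float)
        return vec / np.linalg.norm(vec)  # unit-normalize, as the scorer assumes

    items = [
        {"id": f"item_{i}", "data": {"text": f"sample {i}"}, "confidence": c, "prediction": None}
        for i, c in enumerate([0.95, 0.55, 0.50, 0.45, 0.90, 0.30])
    ]
    selector = ActiveLearningSelector()
    batch = selector.select_batch(items, batch_size=3, embedding_fn=toy_embedding)
    for chosen in batch:
        print(f"{chosen.id}: priority={chosen.priority_score:.2f}, uncertainty={chosen.uncertainty_score:.2f}")
        selector.record_label(toy_embedding(chosen.data))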
Production HITL System
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class HITLConfig:
"""Configuration for HITL system."""
# Escalation thresholds
auto_approve_threshold: float = 0.90
review_threshold: float = 0.70
block_threshold: float = 0.40
# Queue settings
enable_priority_queue: bool = True
sla_urgent_minutes: int = 15
sla_standard_hours: int = 4
# Feedback settings
enable_adaptive_thresholds: bool = True
feedback_sample_rate: float = 0.05 # 5% random sampling
# Active learning
enable_active_learning: bool = True
labeling_batch_size: int = 20
class HITLSystem:
"""
Production Human-in-the-Loop system.
Integrates:
- Confidence estimation
- Review queue management
- Feedback collection
- Adaptive thresholds
- Active learning
"""
def __init__(self, config: HITLConfig = None):
self.config = config or HITLConfig()
# Components
self.confidence_estimator = ConfidenceEstimator(
high_threshold=self.config.auto_approve_threshold,
medium_threshold=self.config.review_threshold,
low_threshold=self.config.block_threshold
)
self.router = create_standard_router()
self.feedback = FeedbackCollector()
self.threshold_manager = AdaptiveThresholdManager(
initial_threshold=self.config.review_threshold
)
# Metrics
self.metrics = {
"total_processed": 0,
"auto_approved": 0,
"escalated": 0,
"blocked": 0
}
def process(
self,
task_type: str,
input_data: Dict[str, Any],
ai_output: Any,
token_logprobs: List[float] = None,
context: Dict[str, Any] = None
) -> Dict[str, Any]:
"""
Process an AI output through the HITL system.
Returns decision and any review item created.
"""
self.metrics["total_processed"] += 1
context = context or {}
# Estimate confidence
confidence_result = self.confidence_estimator.estimate(
output=str(ai_output),
token_logprobs=token_logprobs
)
# Check adaptive threshold
effective_threshold = self.threshold_manager.threshold
# Determine action
if confidence_result.confidence_score >= self.config.auto_approve_threshold:
action = "auto_approve"
self.metrics["auto_approved"] += 1
# Random sampling for quality monitoring
if self._should_sample():
self._create_audit_item(task_type, input_data, ai_output, confidence_result)
elif confidence_result.confidence_score >= effective_threshold:
action = "approve_with_logging"
self.metrics["auto_approved"] += 1
logger.info(f"Approved with medium confidence: {confidence_result.confidence_score:.2f}")
elif confidence_result.confidence_score >= self.config.block_threshold:
action = "escalate"
self.metrics["escalated"] += 1
review_id = self.router.route(confidence_result, {
"task_type": task_type,
"input": input_data,
**context
})
return {
"action": action,
"review_id": review_id,
"confidence": confidence_result.confidence_score,
"reasoning": confidence_result.reasoning
}
else:
action = "block"
self.metrics["blocked"] += 1
review_id = self.router.route(confidence_result, {
"task_type": task_type,
"input": input_data,
"priority_override": ReviewPriority.URGENT,
**context
})
return {
"action": action,
"review_id": review_id,
"confidence": confidence_result.confidence_score,
"reasoning": confidence_result.reasoning,
"blocked": True
}
return {
"action": action,
"output": ai_output,
"confidence": confidence_result.confidence_score,
"reasoning": confidence_result.reasoning
}
def _should_sample(self) -> bool:
"""Determine if this item should be sampled for QA."""
import random
return random.random() < self.config.feedback_sample_rate
def _create_audit_item(
self,
task_type: str,
input_data: Dict,
ai_output: Any,
confidence: ConfidenceResult
):
"""Create audit item for random sampling."""
item = ReviewItem(
id=f"audit_{datetime.utcnow().timestamp()}",
task_type=task_type,
input_data=input_data,
ai_output=ai_output,
confidence_score=confidence.confidence_score,
priority=ReviewPriority.LOW
)
self.router.queues["audit"].add(item)
def record_feedback(
self,
review_id: str,
decision: str,
correction: Any = None,
feedback_text: str = None
):
"""Record human feedback and update system."""
# Find the review item
for queue in self.router.queues.values():
if review_id in queue._items:
item = queue._items[review_id]
# Complete the review
queue.complete(review_id, decision, correction, feedback_text)
# Record feedback
record = FeedbackRecord(
id=review_id,
timestamp=datetime.utcnow(),
task_type=item.task_type,
input_data=item.input_data,
ai_output=item.ai_output,
ai_confidence=item.confidence_score,
human_decision=decision,
human_output=correction,
feedback_text=feedback_text
)
self.feedback.add(record)
# Update adaptive thresholds
if self.config.enable_adaptive_thresholds:
self._update_thresholds()
return
logger.warning(f"Review item not found: {review_id}")
def _update_thresholds(self):
"""Update thresholds based on recent feedback."""
recent_records = self.feedback.records[-100:]
if len(recent_records) < 20:
return
approved = sum(1 for r in recent_records if r.human_decision == "approve")
accuracy = approved / len(recent_records)
self.threshold_manager.update(accuracy, len(recent_records))
def get_dashboard_data(self) -> Dict[str, Any]:
"""Get data for monitoring dashboard."""
return {
"metrics": self.metrics,
"escalation_rate": self.metrics["escalated"] / max(1, self.metrics["total_processed"]),
"auto_approve_rate": self.metrics["auto_approved"] / max(1, self.metrics["total_processed"]),
"queue_status": {
name: queue.get_metrics()
for name, queue in self.router.queues.items()
},
"threshold_status": self.threshold_manager.get_stats(),
"calibration": self.feedback.get_calibration_analysis() if len(self.feedback.records) > 50 else None
}
# Example usage
if __name__ == "__main__":
system = HITLSystem()
# Simulate processing
test_cases = [
{"confidence": 0.95, "output": "High confidence answer"},
{"confidence": 0.75, "output": "Medium confidence answer"},
{"confidence": 0.50, "output": "Low confidence answer"},
{"confidence": 0.25, "output": "Very low confidence answer"},
]
print("=== HITL System Demo ===\n")
for i, case in enumerate(test_cases):
result = system.process(
task_type="qa",
input_data={"question": f"Test question {i}"},
ai_output=case["output"],
token_logprobs=[-0.1] * 10 if case["confidence"] > 0.7 else [-1.0] * 10
)
print(f"Case {i+1} (conf={case['confidence']}):")
print(f" Action: {result['action']}")
print(f" Confidence: {result['confidence']:.2f}")
if result.get("review_id"):
print(f" Review ID: {result['review_id']}")
print()
print("Dashboard Data:")
dashboard = system.get_dashboard_data()
print(f" Total processed: {dashboard['metrics']['total_processed']}")
print(f" Auto-approve rate: {dashboard['auto_approve_rate']:.1%}")
print(f" Escalation rate: {dashboard['escalation_rate']:.1%}")
Data Engineer's ROI Lens: The Business Impact
def analyze_hitl_roi(
daily_ai_decisions: int,
error_cost: float = 500,
human_review_cost: float = 5,
ai_accuracy_without_hitl: float = 0.92,
ai_accuracy_with_hitl: float = 0.99,
escalation_rate: float = 0.15
) -> Dict:
"""Calculate ROI of HITL implementation."""
monthly_decisions = daily_ai_decisions * 30
# Without HITL
errors_without = monthly_decisions * (1 - ai_accuracy_without_hitl)
cost_without = errors_without * error_cost
# With HITL
escalated = monthly_decisions * escalation_rate
auto_approved = monthly_decisions - escalated
    # Errors in auto-approved decisions (some still slip through)
    auto_errors = auto_approved * (1 - ai_accuracy_with_hitl)
# Review cost
review_cost = escalated * human_review_cost
# Remaining errors (humans catch most but not all)
human_miss_rate = 0.02 # Humans miss 2% of escalated errors
escalated_errors = escalated * (1 - ai_accuracy_without_hitl) * human_miss_rate
total_errors_with = auto_errors + escalated_errors
error_cost_with = total_errors_with * error_cost
total_cost_with = error_cost_with + review_cost
# Setup costs
setup_hours = 80
setup_cost = setup_hours * 150
monthly_savings = cost_without - total_cost_with
return {
"monthly_decisions": monthly_decisions,
"without_hitl": {
"errors": int(errors_without),
"cost": round(cost_without, 2)
},
"with_hitl": {
"escalated": int(escalated),
"review_cost": round(review_cost, 2),
"errors": int(total_errors_with),
"error_cost": round(error_cost_with, 2),
"total_cost": round(total_cost_with, 2)
},
"monthly_savings": round(monthly_savings, 2),
"annual_savings": round(monthly_savings * 12, 2),
"setup_cost": setup_cost,
"payback_months": round(setup_cost / monthly_savings, 1) if monthly_savings > 0 else float('inf'),
"error_reduction": f"{((errors_without - total_errors_with) / errors_without * 100):.0f}%"
}
if __name__ == "__main__":
roi = analyze_hitl_roi(
daily_ai_decisions=1000,
error_cost=500,
escalation_rate=0.12
)
print("=== HITL ROI Analysis (1K decisions/day, $500/error) ===\n")
print("Without HITL:")
print(f" Monthly errors: {roi['without_hitl']['errors']}")
print(f" Monthly cost: ${roi['without_hitl']['cost']:,.0f}")
print("\nWith HITL:")
print(f" Escalated for review: {roi['with_hitl']['escalated']}")
print(f" Review cost: ${roi['with_hitl']['review_cost']:,.0f}")
print(f" Remaining errors: {roi['with_hitl']['errors']}")
print(f" Total cost: ${roi['with_hitl']['total_cost']:,.0f}")
print(f"\nError reduction: {roi['error_reduction']}")
print(f"Monthly savings: ${roi['monthly_savings']:,.0f}")
print(f"Annual savings: ${roi['annual_savings']:,.0f}")
print(f"Setup cost: ${roi['setup_cost']:,}")
print(f"Payback: {roi['payback_months']} months")
Sample Output:
=== HITL ROI Analysis (1K decisions/day, $500/error) ===
Without HITL:
Monthly errors: 2400
Monthly cost: $1,200,000
With HITL:
Escalated for review: 3600
Review cost: $18,000
Remaining errors: 269
Total cost: $152,880
Error reduction: 89%
Monthly savings: $1,047,120
Annual savings: $12,565,440
Setup cost: $12,000
Payback: 0.0 months
Key Takeaways
- Confidence calibration is critical: Ensure confidence scores actually predict accuracy
- Design for the exception path: Most value comes from handling edge cases well
- Prioritize ruthlessly: Not all escalations are equal—use priority queues
- Close the feedback loop: Human corrections should improve the system over time
- Adaptive thresholds beat fixed ones: Let the system learn optimal escalation rates
- Sample everything for QA: Even auto-approved decisions need random audits
- The ROI is overwhelming: Catching errors before they reach customers pays for the system many times over
Start with simple thresholds and a basic queue. Add sophistication (adaptive thresholds, active learning) only after you've validated the basic flow works for your domain.
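As a concrete starting point, this is roughly what "simple thresholds and a basic queue" can look like before any of the adaptive machinery above. The threshold value and the in-memory list are placeholders; calibrate the former on held-out data and replace the latter with a real queue or database:

from typing import Any, Dict, List

REVIEW_THRESHOLD = 0.7  # placeholder: tune against labeled examples from your domain

pending_reviews: List[Dict[str, Any]] = []  # swap for a database table or message queue in production

def handle(output: str, confidence: float) -> Dict[str, Any]:
    """Minimal HITL routing: auto-approve above the threshold, queue everything else."""
    if confidence >= REVIEW_THRESHOLD:
        return {"action": "auto_approve", "output": output}
    pending_reviews.append({"output": output, "confidence": confidence})
    return {"action": "escalate", "queue_position": len(pending_reviews)}

print(handle("The capital of France is Paris.", 0.93))
print(handle("I think the answer might be 42.", 0.41))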