Reading time: ~15-20 minutes
Level: Intermediate to Advanced
Series: Part 1 of 4 - GenAIOps Foundations
What you'll learn: How GenAIOps differs from DevOps, AWS tooling for GenAI operations, and building your first evaluation pipeline
The Problem: Traditional DevOps Isn't Enough
Your GenAI prototype works beautifully in Jupyter notebooks. Users love the demo. Leadership wants it in production next week.
Then reality hits:
Traditional DevOps assumes deterministic systems where function(input) always returns the same output. GenAI systems fundamentally break this assumption. You can't just "deploy code and monitor uptime" anymore.
This is where GenAIOps comes in.
What is GenAIOps?
GenAIOps extends DevOps principles to handle the unique operational challenges of production GenAI systems. The key difference? You're now managing four critical assets instead of one:
Traditional DevOps: CODE
↓
GenAIOps: CODE + DATA + MODELS + PROMPTS
Each asset requires versioning, testing, monitoring, and governance. Miss any one, and production reliability suffers.
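To make "four versioned assets" concrete, here is one way to pin all of them together in a single release manifest. The file name, bucket path, and field layout below are illustrative assumptions, not an AWS convention:

```yaml
# release_manifest.yaml (illustrative)
release: "2025-10-15-rc2"
code:
  git_sha: "a1b2c3d"            # application code
data:
  eval_dataset: "s3://my-bucket/eval/v7.parquet"  # evaluation data snapshot
models:
  generator: "anthropic.claude-sonnet-4-20250514" # pinned model version
prompts:
  rag_answer: "rag_answer@v1.2.0"                 # pinned prompt version
```

Rolling back a release then means restoring all four pins together, not just redeploying code.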
Why AWS Now?
October 2025 marked a turning point with two key launches:
- CloudWatch GenAI Observability (GA): Purpose-built monitoring for LLM systems
- Amazon Bedrock AgentCore Evaluations (Preview): 13 built-in quality metrics
AWS is signaling the industry is ready to operationalize GenAI at scale.
Why GenAIOps is Different: The Four Fundamental Shifts
1. Non-Deterministic Outputs
Traditional software: the same input always produces the same output, so tests can assert exact results.
GenAI systems: the same prompt can produce a different, equally valid response on every call.
Impact: Traditional unit tests fail. You need evaluation frameworks measuring quality dimensions, not binary correctness.
2. Quality vs. Correctness
Traditional software testing asks a binary question: is the output correct?
GenAI system evaluation asks a graded one: how faithful, relevant, and safe is the output, across several dimensions at once?
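The contrast can be sketched in a few lines. The gate function and threshold values below are illustrative, not prescribed numbers:

```python
# Traditional test: binary pass/fail on an exact, deterministic output.
def add(a: int, b: int) -> int:
    return a + b

assert add(2, 3) == 5  # same input, same output, every time

# GenAI evaluation: score quality dimensions on a 0-1 scale instead,
# and gate on thresholds (values here are illustrative).
def passes_quality_gates(scores: dict, thresholds: dict) -> bool:
    """True only if every dimension meets its minimum threshold."""
    return all(scores.get(metric, 0.0) >= t for metric, t in thresholds.items())

scores = {"faithfulness": 0.91, "relevancy": 0.84, "safety": 0.99}
thresholds = {"faithfulness": 0.85, "relevancy": 0.80, "safety": 0.95}
print(passes_quality_gates(scores, thresholds))  # True
```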
3. Complex Multi-Component Systems
Modern GenAI applications are orchestrations, not single API calls:
User Query
↓
Input Guardrails (validate, sanitize)
↓
Query Rewriting (optimize for retrieval)
↓
Vector Search (retrieve relevant context)
↓
Context Ranking (re-rank by relevance)
↓
Prompt Construction (assemble final prompt)
↓
LLM Generation (multiple model routing)
↓
Output Guardrails (safety, quality checks)
↓
Response to User
Each component can fail independently. Traditional monitoring doesn't capture these multi-step workflows.
4. Token Economics
Every operation has a cost tied to token consumption. You need real-time cost tracking per user, per session, and per model to avoid budget surprises.
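A back-of-the-envelope sketch of the arithmetic (prices below are illustrative; actual Bedrock pricing varies by model and region):

```python
# Illustrative per-1K-token prices, not current Bedrock list prices.
PRICE_PER_1K = {"input": 0.003, "output": 0.015}

def query_cost(input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one request from its token counts."""
    return (input_tokens / 1000) * PRICE_PER_1K["input"] + \
           (output_tokens / 1000) * PRICE_PER_1K["output"]

# A 2,000-token prompt with a 500-token answer:
print(f"${query_cost(2000, 500):.4f}")  # $0.0135
```

Pennies per request, but multiplied by millions of requests — which is why per-user and per-session budgets matter.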
The GenAIOps Maturity Model
Where does your organization sit? Understanding your current level helps prioritize improvements.
Level 0: Ad-Hoc Experimentation
Characteristics:
- Direct API calls in notebooks
- No version control for prompts
- Manual testing through UI
- No cost tracking
- Production = "it works on my laptop"
Pain Points: nothing is reproducible, costs are invisible until the bill arrives, and every incident is debugged from scratch.
Level 1: Basic Pipeline
Characteristics:
- Prompts in version control (Git)
- Basic CI/CD for deployment
- Logging to CloudWatch
- Manual evaluation before releases
- Static prompts and models
Progress: changes are now traceable and deployments repeatable, but quality is still judged by hand and problems surface only after release.
Level 2: Monitored Production
Characteristics:
- Automated evaluation pipelines
- Real-time observability dashboards
- A/B testing capabilities
- Cost monitoring and alerting
- Prompt versioning with rollback
Capabilities: regressions are caught before users see them, experiments are measured rather than guessed at, and any change can be rolled back quickly.
Level 3: Intelligent Operations
Characteristics:
- Continuous evaluation from production traffic
- Automated quality regression detection
- Dynamic model routing based on performance
- Human-in-the-loop feedback integration
- Automated prompt optimization
Advanced Patterns: production traffic continuously feeds evaluation, and routing adapts to live quality and cost signals instead of static configuration.
Level 4: Autonomous GenAIOps
Characteristics:
- Self-healing systems with automated remediation
- Predictive quality management
- Automated cost optimization
- Context-aware evaluation
- Multi-dimensional optimization (quality, cost, latency)
Reality Check: Most organizations are between Level 0 and 1. This series will help you reach Level 2-3, which is the sweet spot for most production systems.
GenAIOps Core Pillars
Pillar 1: Data Governance for GenAI
Data governance in GenAI extends far beyond traditional data management. You're managing prompts as code, context windows as data, and model outputs as assets.
Prompt Governance
Why it matters: Prompts are the "code" of GenAI systems. A poorly worded prompt can cause production failures just like buggy code.
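One possible layout for a prompt versioned in Git alongside its evaluation record — the file path, field names, and scores are assumptions for illustration:

```yaml
# prompts/rag_answer/v1.2.0.yaml (illustrative layout)
id: rag_answer
version: 1.2.0
owner: platform-team
template: |
  Answer the question using only the context below.
  Context: {context}
  Question: {question}
changelog: "Tightened grounding instruction to reduce hallucinations"
evaluated:
  faithfulness: 0.89
  relevancy: 0.83
```

Treating prompts this way gives you code review, diffs, and rollback for the asset most likely to change week to week.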
Training Data Lineage
Track the complete journey of data through your GenAI pipeline, from raw sources through preprocessing to the model that consumed it.
Why it matters: Regulatory compliance (GDPR, HIPAA) requires proving where training data came from and how it was processed.
Context Window Management
In RAG systems, track what context was retrieved and what the model actually used.
Why track this? When a user reports incorrect information, you need to:
- Identify what context was retrieved
- Trace back to source documents
- Determine if retrieval failed or generation failed
- Calculate cost of the interaction
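A minimal sketch of such a trace record, with assumed field names (adapt them to your own logging schema):

```python
import json
from datetime import datetime, timezone

def build_rag_trace(request_id: str, query: str, chunks: list,
                    answer: str, cost_usd: float) -> dict:
    """Assemble one audit record linking retrieval, generation, and cost."""
    return {
        "request_id": request_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "query": query,
        # Keep enough about each chunk to trace back to source documents
        "retrieved_context": [
            {"source": c["source"], "score": c["score"]} for c in chunks
        ],
        "answer": answer,
        "cost_usd": cost_usd,
    }

trace = build_rag_trace(
    "req-001",
    "What is our refund window?",
    [{"source": "policies/refunds.md", "score": 0.92}],
    "Refunds are accepted within 30 days.",
    0.0042,
)
print(json.dumps(trace, indent=2))
```

With records like this, the four questions above become lookups rather than archaeology.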
Pillar 2: Automated Evaluation Pipelines
Manual evaluation doesn't scale. You need automated pipelines that evaluate every change before production.
Pre-Deployment Evaluation
Quality Gates in CI/CD:
```yaml
# .github/workflows/evaluate-genai.yml
name: GenAI Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'prompts/**'
      - 'rag_pipeline/**'
      - 'evaluation/**'

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install Dependencies
        run: |
          pip install boto3 ragas pandas numpy
          pip install -r requirements.txt

      - name: Run Synthetic Test Suite
        env:
          AWS_REGION: us-east-1
        run: |
          python scripts/evaluate_rag.py \
            --test-suite tests/synthetic_tests.json \
            --min-faithfulness 0.85 \
            --min-relevancy 0.80 \
            --min-context-precision 0.75 \
            --sample-size 1000

      - name: Run Regression Tests
        run: |
          python scripts/regression_tests.py \
            --baseline production_baseline.json \
            --current current_results.json \
            --max-degradation 0.05

      - name: Validate Cost Budget
        run: |
          python scripts/cost_validator.py \
            --max-cost-per-query 0.02 \
            --max-latency-p95 3000

      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: evaluation-results
          path: results/

      - name: Comment on PR
        if: always()
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('results/summary.json'));
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Evaluation Results
            Faithfulness: ${results.faithfulness}
            Relevancy: ${results.relevancy}
            Context Precision: ${results.context_precision}
            Avg Cost: $${results.avg_cost}
            P95 Latency: ${results.p95_latency}ms
            ${results.passed ? 'All checks passed!' : 'Quality gates failed'}`
            });
```
Evaluation Configuration:
```yaml
# config/evaluation_pipeline.yaml
evaluation_pipeline:
  stages:
    - name: "synthetic_test_suite"
      description: "Test on curated examples with known good answers"
      metrics:
        faithfulness:
          threshold: 0.85
          weight: 0.3
        answer_relevancy:
          threshold: 0.80
          weight: 0.3
        context_precision:
          threshold: 0.75
          weight: 0.2
        safety:
          threshold: 0.95
          weight: 0.2
      sample_size: 1000
      timeout_minutes: 15

    - name: "regression_tests"
      description: "Compare against production baseline"
      compare_against: "production_v1.2.0"
      max_degradation: 0.05  # 5% worse is unacceptable
      metrics: ["faithfulness", "relevancy", "cost", "latency"]

    - name: "cost_validation"
      description: "Ensure cost targets are met"
      max_cost_per_query: 0.02
      max_monthly_budget: 10000
      alert_threshold: 0.8  # Alert at 80% of budget

    - name: "latency_validation"
      description: "Ensure latency SLAs are met"
      max_p50_latency: 1500  # milliseconds
      max_p95_latency: 3000
      max_p99_latency: 5000
```
Production Sampling Strategy
You can't evaluate every production request (too expensive). Use intelligent sampling:
```python
# production_sampler.py
import random
from typing import Dict, Any
from dataclasses import dataclass

@dataclass
class SamplingDecision:
    should_evaluate: bool
    reason: str
    priority: int  # 1=highest, 5=lowest

class ProductionSampler:
    """Intelligent sampling for production evaluation"""

    def __init__(
        self,
        base_sample_rate: float = 0.02,  # 2% baseline
        cost_threshold: float = 0.05,    # Sample expensive queries
        latency_p95: float = 3000        # Sample slow queries
    ):
        self.base_sample_rate = base_sample_rate
        self.cost_threshold = cost_threshold
        self.latency_p95 = latency_p95

    def should_evaluate(self, request: Dict[str, Any]) -> SamplingDecision:
        """Decide whether to evaluate this production request"""
        # Always sample: New users (establish baseline)
        if request.get("is_new_user", False):
            return SamplingDecision(
                should_evaluate=True,
                reason="new_user",
                priority=2
            )

        # Always sample: Explicit negative feedback
        if request.get("has_negative_feedback", False):
            return SamplingDecision(
                should_evaluate=True,
                reason="negative_feedback",
                priority=1
            )

        # Always sample: High cost queries (investigate efficiency)
        if request.get("cost", 0) > self.cost_threshold:
            return SamplingDecision(
                should_evaluate=True,
                reason="high_cost",
                priority=2
            )

        # Always sample: Slow queries (identify bottlenecks)
        if request.get("latency_ms", 0) > self.latency_p95:
            return SamplingDecision(
                should_evaluate=True,
                reason="high_latency",
                priority=3
            )

        # Random baseline sampling
        if random.random() < self.base_sample_rate:
            return SamplingDecision(
                should_evaluate=True,
                reason="random_sample",
                priority=4
            )

        return SamplingDecision(
            should_evaluate=False,
            reason="not_selected",
            priority=5
        )

# Usage in production
sampler = ProductionSampler(
    base_sample_rate=0.02,  # 2% of all traffic
    cost_threshold=0.05,    # Queries over $0.05
    latency_p95=3000        # Queries over 3 seconds
)

# `request` is the inbound request record and `evaluation_queue` is an
# SQS-like queue client; both are provided by your application.
decision = sampler.should_evaluate(request)

if decision.should_evaluate:
    # Queue for evaluation (async)
    evaluation_queue.send_message({
        "request_id": request["id"],
        "query": request["query"],
        "response": request["response"],
        "context": request["retrieved_context"],
        "priority": decision.priority,
        "reason": decision.reason
    })
```
Why this matters:
- Full evaluation of all traffic = too expensive
- Random sampling only = miss critical issues
- Intelligent sampling = catch problems while controlling costs
Pillar 3: CI/CD for LLMs
Traditional CI/CD deploys code. GenAI CI/CD deploys code, prompts, and models.
Model Version Control
```python
# model_registry.py
from typing import Dict, Any
from datetime import datetime

class ModelRegistry:
    """Central registry for model versions and their performance"""

    def register_model(
        self,
        model_id: str,
        evaluation_scores: Dict[str, float],
        cost_metrics: Dict[str, float],
        metadata: Dict[str, Any]
    ) -> Dict:
        """Register a new model version with evaluation data"""
        return {
            "model_id": model_id,
            "bedrock_arn": f"arn:aws:bedrock:us-east-1::foundation-model/{model_id}",
            "registration_timestamp": datetime.now().isoformat(),

            # Quality metrics from evaluation
            "evaluation_scores": {
                "faithfulness": evaluation_scores["faithfulness"],
                "answer_relevancy": evaluation_scores["relevancy"],
                "context_precision": evaluation_scores["precision"],
                "safety_score": evaluation_scores["safety"]
            },

            # Cost and performance metrics
            "cost_metrics": {
                "cost_per_1k_input_tokens": cost_metrics["input"],
                "cost_per_1k_output_tokens": cost_metrics["output"],
                "avg_cost_per_query": cost_metrics["avg_query_cost"],
                "latency_p50": cost_metrics["latency_p50"],
                "latency_p95": cost_metrics["latency_p95"]
            },

            # Deployment metadata
            "deployment_status": "candidate",  # candidate, canary, production
            "traffic_allocation": 0.0,         # Start at 0%
            "approval_status": "pending",      # pending, approved, rejected
            "approved_by": None,

            # Rollback information
            "previous_version": metadata.get("previous_version"),
            "rollback_threshold": {
                "min_quality_score": 0.80,
                "max_error_rate": 0.05,
                "max_cost_increase": 0.20  # 20% cost increase triggers rollback
            }
        }

    def promote_to_production(
        self,
        model_id: str,
        canary_results: Dict[str, float]
    ) -> bool:
        """Promote model to production if canary succeeds"""
        # Check canary performance
        quality_ok = canary_results["avg_quality"] >= 0.85
        error_rate_ok = canary_results["error_rate"] < 0.02
        cost_ok = canary_results["cost_increase"] < 0.20

        if quality_ok and error_rate_ok and cost_ok:
            self._update_traffic_allocation(model_id, 1.0)
            return True
        else:
            self._rollback_to_previous(model_id)
            return False

    def _update_traffic_allocation(self, model_id: str, fraction: float):
        """Shift production traffic to this model (router-specific)"""
        ...

    def _rollback_to_previous(self, model_id: str):
        """Restore the previously registered stable version"""
        ...
```
Canary Deployments for Prompts
Test new prompts on a small percentage of traffic before full rollout:
```python
# prompt_router.py
from typing import Dict, Optional
import hashlib

class PromptRouter:
    """Route users to different prompt versions for A/B testing"""

    def __init__(self, rollout_config: Dict):
        self.config = rollout_config

    def route_request(
        self,
        user_id: str,
        experiment_id: Optional[str] = None
    ) -> str:
        """Deterministically route users to prompt versions"""
        # Get user's routing bucket (consistent hashing)
        user_bucket = self._get_user_bucket(user_id)

        # Check if user is in experiment
        if experiment_id:
            experiment = self.config["experiments"].get(experiment_id)
            if experiment and user_bucket < experiment["traffic_percentage"]:
                return experiment["variant_version"]

        # Default to production version
        return self.config["production_version"]

    def _get_user_bucket(self, user_id: str) -> int:
        """Hash user ID to bucket (0-99)"""
        hash_value = hashlib.md5(user_id.encode()).hexdigest()
        return int(hash_value, 16) % 100

# Configuration
rollout_config = {
    "production_version": "v1.2.0",
    "experiments": {
        "exp_001_concise_responses": {
            "variant_version": "v1.3.0",
            "traffic_percentage": 10,  # 10% of users
            "start_date": "2025-01-20",
            "metrics_to_track": ["response_length", "user_satisfaction"],
            "success_criteria": {
                "min_satisfaction": 4.2,    # out of 5
                "max_response_length": 500  # tokens
            }
        }
    }
}

# Usage
router = PromptRouter(rollout_config)

# User makes a request
prompt_version = router.route_request(
    user_id="user_12345",
    experiment_id="exp_001_concise_responses"
)

# Load the appropriate prompt (load_prompt_version and generate_response
# are application-specific helpers, not shown here)
prompt = load_prompt_version(prompt_version)
response = generate_response(prompt, user_query)
```
Automated Rollback Strategies
Define triggers that automatically rollback deployments:
```yaml
# config/rollback_policy.yaml
rollback_policy:
  # Define what triggers a rollback
  triggers:
    - name: "high_error_rate"
      metric: "error_rate"
      threshold: 0.05  # 5% error rate
      window: "5min"
      comparison: "greater_than"
      severity: "critical"

    - name: "quality_degradation"
      metric: "avg_quality_score"
      threshold: 0.75
      window: "15min"
      comparison: "less_than"
      severity: "high"

    - name: "cost_explosion"
      metric: "cost_per_request"
      threshold: 0.05  # $0.05 per request
      window: "10min"
      comparison: "greater_than"
      severity: "high"

    - name: "latency_spike"
      metric: "p95_latency"
      threshold: 5000  # 5 seconds
      window: "5min"
      comparison: "greater_than"
      severity: "medium"

  # What to do when triggered
  actions:
    - action: "rollback"
      target: "last_stable_version"
      execution: "immediate"

    - action: "notify"
      channels: ["slack://ops-team", "pagerduty://genai-oncall"]
      include_metrics: true

    - action: "create_incident"
      severity: "high"
      assign_to: "genai-team"
      include_logs: true

    - action: "stop_traffic"
      variant: "canary"
      percentage: 0  # Stop all traffic to new version

  # Rollback procedure
  rollback_procedure:
    steps:
      - verify_previous_version_exists
      - stop_new_version_traffic
      - route_100_percent_to_previous
      - validate_metrics_improved
      - send_postmortem_template
```
Implementation:
```python
# rollback_manager.py
import boto3
from datetime import datetime
from typing import Dict, List

class RollbackManager:
    """Automated rollback when quality degrades"""

    def __init__(self, policy_config: Dict):
        self.policy = policy_config
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')

    def check_rollback_triggers(self) -> List[str]:
        """Check if any rollback triggers have fired"""
        triggered = []
        for trigger in self.policy["triggers"]:
            # Query CloudWatch for the metric over the trigger window
            metric_value = self._get_metric_value(
                metric_name=trigger["metric"],
                window=trigger["window"]
            )

            # Check threshold
            should_trigger = self._compare_threshold(
                value=metric_value,
                threshold=trigger["threshold"],
                comparison=trigger["comparison"]
            )

            if should_trigger:
                triggered.append(trigger["name"])
        return triggered

    def execute_rollback(self, triggered_rules: List[str]):
        """Execute rollback actions"""
        self._stop_canary_traffic()                # 1. Stop traffic to new version
        self._route_to_stable_version()            # 2. Route to previous stable version
        self._send_notifications(triggered_rules)  # 3. Notify team
        self._create_incident(triggered_rules)     # 4. Create incident

    def _compare_threshold(self, value: float, threshold: float, comparison: str) -> bool:
        """Evaluate a single trigger condition"""
        if comparison == "greater_than":
            return value > threshold
        return value < threshold

    def _get_metric_value(self, metric_name: str, window: str) -> float:
        """Fetch the metric from CloudWatch (e.g. get_metric_statistics)"""
        ...

    def _stop_canary_traffic(self):
        """Immediately stop routing to the canary version.
        Implementation depends on your routing mechanism:
        Lambda@Edge, API Gateway, or a custom router."""
        ...

    def _route_to_stable_version(self):
        """Shift 100% of traffic back to the last stable version"""
        ...

    def _create_incident(self, triggered_rules: List[str]):
        """Open an incident in your tracking system"""
        ...

    def _send_notifications(self, triggered_rules: List[str]):
        """Alert team about rollback"""
        message = f"""
AUTOMATED ROLLBACK EXECUTED
Triggered by: {', '.join(triggered_rules)}
Timestamp: {datetime.now()}
Action: Reverted to previous stable version

Metrics at time of rollback:
- Error rate: {self._get_metric_value('error_rate', '5min')}
- Quality score: {self._get_metric_value('avg_quality_score', '15min')}
- Cost per request: ${self._get_metric_value('cost_per_request', '10min')}

Incident created: [Link to incident]
Runbook: [Link to postmortem runbook]
"""
        self.sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789:genai-alerts',
            Subject='GenAI System Rollback Executed',
            Message=message
        )
```
AWS Services for GenAIOps Foundations
Amazon Bedrock
Your foundation model layer:
- Access to leading models: Claude 4.5, Nova, Llama, Mistral, and more
- Model evaluation: Built-in comparison capabilities
- Guardrails: Content filtering and safety checks
- Custom models: Fine-tuning on your data
- Agents: Autonomous task execution with tool use
Amazon Bedrock AgentCore
The agentic platform that went GA in October 2025, providing a complete framework for building autonomous agents:
Key Components:
1. Runtime: Fully managed agent execution environment
   - Handles agent loops and reasoning
   - Manages tool invocation
   - Provides built-in error handling
2. Gateway: Secure tool integration
   - OAuth 2.0 authentication
   - API key management
   - Rate limiting and throttling
3. Memory: Multi-tier memory system
   - Short-term: Within-conversation context
   - Long-term: Persistent across sessions
   - Episodic: Historical interaction patterns
4. Identity: Multi-tenant authentication
   - User-level permissions
   - Role-based access control
   - Audit logging
5. Evaluations (Preview): 13 built-in quality evaluators
   - Faithfulness, relevance, coherence
   - Safety and bias detection
   - Custom metric definitions
6. Policy (Preview): Natural language policies
   - Define agent behavior constraints
   - Set data access rules
   - Compliance enforcement
AWS CloudWatch GenAI Observability
Now GA as of October 2025, purpose-built for LLM monitoring:
Capabilities:
- Model Invocation Tracking
- AgentCore Agent Monitoring
- End-to-End Prompt Tracing via OpenTelemetry
- Integration with Existing CloudWatch
- Logs, Metrics, Alarms all in one place
- No new tools to learn
- Unified dashboards
AWS X-Ray
Distributed tracing for complex GenAI workflows:
X-Ray Service Map shows:
- Which components are slowest
- Where errors occur most
- Cost attribution by component
- Dependency relationships
Building Your First GenAIOps Pipeline
Let's build a production-ready evaluation pipeline using the tools we've discussed.
Step 1: Set Up Evaluation Framework
```python
# evaluation_pipeline.py
import json
import boto3
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall
)
from datetime import datetime
from typing import List, Dict

class GenAIOpsEvaluator:
    """Production-ready evaluation pipeline"""

    def __init__(self, region: str = 'us-east-1'):
        self.bedrock = boto3.client('bedrock-runtime', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)

    def evaluate_rag_system(
        self,
        test_cases: List[Dict],
        model_id: str = 'anthropic.claude-sonnet-4-20250514'
    ) -> Dict:
        """
        Evaluate RAG system using RAGAS metrics

        Args:
            test_cases: List of test cases with query, contexts, answer, ground_truth
            model_id: Bedrock model ID to use as evaluator

        Returns:
            Dict of evaluation scores
        """
        # Prepare evaluation dataset
        eval_data = {
            "question": [tc["query"] for tc in test_cases],
            "contexts": [tc["retrieved_contexts"] for tc in test_cases],
            "answer": [tc["generated_answer"] for tc in test_cases],
            "ground_truth": [tc["expected_answer"] for tc in test_cases]
        }

        # Run evaluation
        print(f"Evaluating {len(test_cases)} test cases...")
        results = evaluate(
            eval_data,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_precision,
                context_recall
            ],
            llm=self._get_bedrock_llm(model_id)
        )

        # Convert to dict for easier handling
        scores = {
            "faithfulness": float(results["faithfulness"]),
            "answer_relevancy": float(results["answer_relevancy"]),
            "context_precision": float(results["context_precision"]),
            "context_recall": float(results["context_recall"]),
            "timestamp": datetime.now().isoformat(),
            "test_case_count": len(test_cases),
            "model_id": model_id
        }

        # Publish to CloudWatch
        self._publish_metrics(scores)

        # Store detailed results in S3
        self._store_results(scores, results)

        return scores

    def _get_bedrock_llm(self, model_id: str):
        """Wrap the Bedrock model as the RAGAS evaluator LLM
        (e.g. via a LangChain Bedrock chat wrapper)"""
        ...

    def _publish_metrics(self, scores: Dict):
        """Publish evaluation metrics to CloudWatch"""
        namespace = "GenAI/Evaluation"
        timestamp = datetime.now()

        metric_data = [
            {
                'MetricName': metric_name,
                'Value': value,
                'Unit': 'None',
                'Timestamp': timestamp,
                'Dimensions': [
                    {'Name': 'ModelId', 'Value': scores['model_id']}
                ]
            }
            for metric_name, value in scores.items()
            if isinstance(value, (int, float))
        ]

        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=metric_data
        )
        print(f"Published {len(metric_data)} metrics to CloudWatch")

    def _store_results(self, scores: Dict, detailed_results: Dict):
        """Store evaluation results in S3 for audit trail"""
        bucket = 'genai-evaluation-results'
        key = f"evaluations/{datetime.now().strftime('%Y/%m/%d')}/{scores['model_id']}.json"

        self.s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({
                "scores": scores,
                "detailed_results": detailed_results
            }, indent=2)
        )
        print(f"Stored results in s3://{bucket}/{key}")

    def check_quality_gates(
        self,
        scores: Dict,
        thresholds: Dict = None
    ) -> bool:
        """
        Check if evaluation passes quality gates

        Args:
            scores: Evaluation scores
            thresholds: Minimum acceptable scores

        Returns:
            True if all gates pass
        """
        if thresholds is None:
            thresholds = {
                "faithfulness": 0.85,
                "answer_relevancy": 0.80,
                "context_precision": 0.75,
                "context_recall": 0.75
            }

        passed = True
        for metric, threshold in thresholds.items():
            if scores.get(metric, 0) < threshold:
                print(f"❌ {metric}: {scores.get(metric, 0):.3f} < {threshold}")
                passed = False
            else:
                print(f"✅ {metric}: {scores.get(metric, 0):.3f} >= {threshold}")

        return passed
```
Step 2: Create CI/CD Integration
```yaml
# .github/workflows/evaluate-rag.yml
name: RAG Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'prompts/**'
      - 'rag/**'
      - 'tests/**'
  push:
    branches: [main]
  schedule:
    # Run nightly evaluation
    - cron: '0 2 * * *'

env:
  AWS_REGION: us-east-1

jobs:
  evaluate:
    runs-on: ubuntu-latest
    permissions:
      id-token: write  # For OIDC
      contents: read
      pull-requests: write
    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install Dependencies
        run: |
          pip install -r requirements.txt
          pip install boto3 ragas pandas numpy

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::123456789:role/github-actions-role
          aws-region: ${{ env.AWS_REGION }}

      - name: Run RAG Evaluation
        id: evaluate
        run: |
          python scripts/evaluate_rag.py \
            --test-suite tests/regression_tests.json \
            --output results.json

      - name: Check Quality Gates
        id: quality_gates
        # continue-on-error lets the reporting steps below run even when the
        # gates fail; the final step fails the job based on this step's outcome
        continue-on-error: true
        run: |
          python scripts/check_quality_gates.py \
            --results results.json \
            --min-faithfulness 0.85 \
            --min-relevancy 0.80 \
            --min-context-precision 0.75

      - name: Upload Results Artifact
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: evaluation-results
          path: results.json
          retention-days: 30

      - name: Comment on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('results.json'));
            const passed = results.all_gates_passed;
            const emoji = passed ? '✅' : '❌';
            const body = `## ${emoji} GenAI Evaluation Results

            **Quality Metrics:**
            - Faithfulness: ${results.faithfulness.toFixed(3)} ${results.faithfulness >= 0.85 ? '✅' : '❌'}
            - Answer Relevancy: ${results.answer_relevancy.toFixed(3)} ${results.answer_relevancy >= 0.80 ? '✅' : '❌'}
            - Context Precision: ${results.context_precision.toFixed(3)} ${results.context_precision >= 0.75 ? '✅' : '❌'}
            - Context Recall: ${results.context_recall.toFixed(3)} ${results.context_recall >= 0.75 ? '✅' : '❌'}

            **Cost & Performance:**
            - Avg Cost per Query: $${results.avg_cost.toFixed(4)}
            - P95 Latency: ${results.p95_latency}ms

            **Test Coverage:**
            - Test Cases: ${results.test_case_count}
            - Model: ${results.model_id}

            ${passed ? 'All quality gates passed! Ready to merge.' : 'Quality gates failed. Please review before merging.'}

            <details>
            <summary>View detailed results</summary>

            \`\`\`json
            ${JSON.stringify(results, null, 2)}
            \`\`\`
            </details>
            `;

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: body
            });

      - name: Fail if Quality Gates Failed
        if: steps.quality_gates.outcome == 'failure'
        run: |
          echo "Quality gates failed. Blocking merge."
          exit 1
```
Step 3: Set Up CloudWatch Dashboard
```python
# create_dashboard.py
import boto3
import json

cloudwatch = boto3.client('cloudwatch')

dashboard_body = {
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["GenAI/Evaluation", "faithfulness", {"stat": "Average"}],
                    [".", "answer_relevancy", {"stat": "Average"}],
                    [".", "context_precision", {"stat": "Average"}],
                    [".", "context_recall", {"stat": "Average"}]
                ],
                "period": 300,
                "stat": "Average",
                "region": "us-east-1",
                "title": "RAG Quality Metrics",
                "yAxis": {
                    "left": {"min": 0, "max": 1}
                }
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/Bedrock", "Invocations", {"stat": "Sum"}],
                    [".", "InputTokenCount", {"stat": "Sum"}],
                    [".", "OutputTokenCount", {"stat": "Sum"}]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "us-east-1",
                "title": "Bedrock Usage"
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["GenAI/Cost", "TotalCost", {"stat": "Sum"}],
                    [".", "AvgCostPerQuery", {"stat": "Average"}]
                ],
                "period": 300,
                "region": "us-east-1",
                "title": "Cost Metrics"
            }
        }
    ]
}

cloudwatch.put_dashboard(
    DashboardName='GenAI-Operations',
    DashboardBody=json.dumps(dashboard_body)
)

print("Dashboard created: GenAI-Operations")
```
Cost Governance in GenAIOps
Token consumption is your new primary cost driver. You need real-time tracking and budgets.
Cost Tracking Implementation
```python
# cost_tracker.py
import boto3
from typing import Dict, List
from datetime import datetime

class CostTracker:
    """Track and manage GenAI costs"""

    # Updated pricing as of October 2025
    MODEL_COSTS = {
        "anthropic.claude-sonnet-4-20250514": {
            "input": 0.003 / 1000,   # $0.003 per 1K input tokens
            "output": 0.015 / 1000   # $0.015 per 1K output tokens
        },
        "anthropic.claude-haiku-4-5-20251001": {
            "input": 0.0008 / 1000,  # $0.0008 per 1K input tokens
            "output": 0.004 / 1000   # $0.004 per 1K output tokens
        },
        "anthropic.claude-opus-4-20250514": {
            "input": 0.015 / 1000,   # $0.015 per 1K input tokens
            "output": 0.075 / 1000   # $0.075 per 1K output tokens
        },
        "amazon.titan-embed-text-v2:0": {
            "input": 0.0001 / 1000,  # $0.0001 per 1K tokens
            "output": 0              # No output cost for embeddings
        }
    }

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def calculate_request_cost(
        self,
        model_id: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Calculate cost for a single request"""
        if model_id not in self.MODEL_COSTS:
            raise ValueError(f"Unknown model: {model_id}")

        costs = self.MODEL_COSTS[model_id]
        total_cost = (
            (input_tokens * costs["input"]) +
            (output_tokens * costs["output"])
        )
        return total_cost

    def track_session_cost(
        self,
        session_id: str,
        requests: List[Dict]
    ) -> Dict:
        """Track cumulative cost for user session"""
        total_cost = 0
        total_input_tokens = 0
        total_output_tokens = 0

        for req in requests:
            cost = self.calculate_request_cost(
                model_id=req["model"],
                input_tokens=req["input_tokens"],
                output_tokens=req["output_tokens"]
            )
            total_cost += cost
            total_input_tokens += req["input_tokens"]
            total_output_tokens += req["output_tokens"]

        # Publish to CloudWatch
        self._publish_cost_metrics(
            session_id=session_id,
            total_cost=total_cost,
            request_count=len(requests)
        )

        return {
            "session_id": session_id,
            "total_cost": total_cost,
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "request_count": len(requests),
            "avg_cost_per_request": total_cost / len(requests) if requests else 0
        }

    def _publish_cost_metrics(
        self,
        session_id: str,
        total_cost: float,
        request_count: int
    ):
        """Publish cost metrics to CloudWatch"""
        self.cloudwatch.put_metric_data(
            Namespace="GenAI/Cost",
            MetricData=[
                {
                    'MetricName': 'SessionCost',
                    'Value': total_cost,
                    'Unit': 'None',
                    'Timestamp': datetime.now(),
                    'Dimensions': [
                        {'Name': 'SessionId', 'Value': session_id}
                    ]
                },
                {
                    'MetricName': 'AvgCostPerRequest',
                    'Value': total_cost / request_count if request_count > 0 else 0,
                    'Unit': 'None',
                    'Timestamp': datetime.now()
                }
            ]
        )

# Usage
tracker = CostTracker()

# Track a session
session_costs = tracker.track_session_cost(
    session_id="session_123",
    requests=[
        {
            "model": "anthropic.claude-sonnet-4-20250514",
            "input_tokens": 1500,
            "output_tokens": 500
        },
        {
            "model": "anthropic.claude-sonnet-4-20250514",
            "input_tokens": 2000,
            "output_tokens": 800
        }
    ]
)

print(f"Session cost: ${session_costs['total_cost']:.4f}")
print(f"Avg per request: ${session_costs['avg_cost_per_request']:.4f}")
```
Budget Alerts
```python
# budget_alerting.py
import boto3

cloudwatch = boto3.client('cloudwatch')

# Create alarm for daily cost threshold
cloudwatch.put_metric_alarm(
    AlarmName='GenAI-DailyCostThreshold',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=1,
    MetricName='SessionCost',
    Namespace='GenAI/Cost',
    Period=86400,     # 24 hours
    Statistic='Sum',
    Threshold=100.0,  # $100 per day
    ActionsEnabled=True,
    AlarmActions=[
        'arn:aws:sns:us-east-1:123456789:genai-cost-alerts'
    ],
    AlarmDescription='Alert when daily GenAI costs exceed $100'
)

# Create alarm for cost per request spike
cloudwatch.put_metric_alarm(
    AlarmName='GenAI-CostPerRequestSpike',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=2,
    MetricName='AvgCostPerRequest',
    Namespace='GenAI/Cost',
    Period=300,       # 5 minutes
    Statistic='Average',
    Threshold=0.05,   # $0.05 per request
    ActionsEnabled=True,
    AlarmActions=[
        'arn:aws:sns:us-east-1:123456789:genai-cost-alerts'
    ],
    AlarmDescription='Alert when avg cost per request exceeds $0.05'
)
```
Key Takeaways
GenAIOps is fundamentally different from DevOps - Non-deterministic systems require evaluation frameworks, not just testing. Quality is multi-dimensional: faithfulness, relevance, safety, and cost must all be tracked.
Evaluation must be automated and continuous - Manual review doesn't scale. Build CI/CD pipelines with quality gates that block deployments when scores degrade. Use intelligent sampling in production to catch issues early.
AWS provides production-ready tooling - CloudWatch GenAI Observability (GA), Bedrock AgentCore Evaluations (Preview), and X-Ray give you purpose-built infrastructure for GenAI operations. Don't build from scratch.
Start with foundations before optimization - Version control for prompts, basic CI/CD, cost tracking, and automated evaluation should come before advanced features like auto-scaling or multi-region deployments.
Cost governance is non-negotiable - Token consumption can spike unexpectedly. Real-time cost tracking, per-user budgets, and automated alerts prevent bill shock.
Progressive maturity is the path - Most organizations are at Level 0-1. This series will help you reach Level 2-3, where automated evaluation, monitoring, and rollback create reliable production systems.
What's Next in This Series
Part 2: RAG Evaluation & Quality Metrics
We'll dive deep into evaluating Retrieval-Augmented Generation systems:
- Using Amazon Bedrock Evaluations with the RAGAS framework
- Measuring retrieval effectiveness: context precision and recall
- Answer quality evaluation: relevance, faithfulness, and coherence
- Building comprehensive evaluation datasets with synthetic data
- Implementing automated quality gates for RAG systems
- Detecting and handling retrieval failures in production
Part 3: Production Monitoring & Observability
Building comprehensive observability for GenAI systems:
- CloudWatch GenAI Observability dashboards
- Distributed tracing with AWS X-Ray
- Agent monitoring with Bedrock AgentCore
- Cost attribution and anomaly detection
- Building runbooks for common production issues
Part 4: Production Hardening & Scale
Taking GenAI systems to enterprise production:
- Multi-region deployments for resilience
- Auto-scaling strategies for variable load
- Security hardening with VPC endpoints and encryption
- Compliance automation (GDPR, HIPAA, SOC 2)
- Disaster recovery and business continuity
Additional Resources
AWS Documentation:
- AWS CloudWatch GenAI Observability
- Amazon Bedrock AgentCore
- Amazon Bedrock User Guide
- AWS X-Ray Developer Guide
Let's Connect!
Building GenAIOps systems on AWS? Let's share experiences!
Follow me for Part 2 on RAG Evaluation & Quality Metrics coming next. We'll explore how to measure and improve the quality of your RAG systems using Amazon Bedrock Evaluations.
Tags: #aws #genai #mlops #cloudwatch #bedrock #devops #genaiops #rag #llm #observability