Shoaibali Mir

GenAIOps on AWS: Building Production-Ready GenAI Systems - Part 1

Reading time: ~15-20 minutes

Level: Intermediate to Advanced

Series: Part 1 of 4 - GenAIOps Foundations

What you'll learn: How GenAIOps differs from DevOps, AWS tooling for GenAI operations, and building your first evaluation pipeline


The Problem: Traditional DevOps Isn't Enough

Your GenAI prototype works beautifully in Jupyter notebooks. Users love the demo. Leadership wants it in production next week.

Then reality hits:

Traditional DevOps assumes deterministic systems where function(input) always returns the same output. GenAI systems fundamentally break this assumption. You can't just "deploy code and monitor uptime" anymore.

This is where GenAIOps comes in.


What is GenAIOps?

GenAIOps extends DevOps principles to handle the unique operational challenges of production GenAI systems. The key difference? You're now managing four critical assets instead of one:

Traditional DevOps:  CODE
                      ↓
GenAIOps:            CODE + DATA + MODELS + PROMPTS

Each asset requires versioning, testing, monitoring, and governance. Miss any one, and production reliability suffers.

Why AWS Now? October 2025 marked a turning point with two key launches:

  • CloudWatch GenAI Observability (GA): Purpose-built monitoring for LLM systems
  • Amazon Bedrock AgentCore Evaluations (Preview): 13 built-in quality metrics

AWS is signaling that the industry is ready to operationalize GenAI at scale.


Why GenAIOps is Different: The Four Fundamental Shifts

1. Non-Deterministic Outputs

Traditional software is deterministic: the same input produces the same output, every time. GenAI systems break that guarantee: the same prompt, sent twice to the same model, can produce two different responses.
Impact: Traditional unit tests fail. You need evaluation frameworks measuring quality dimensions, not binary correctness.

2. Quality vs. Correctness

Traditional software testing asserts exact correctness: an output either matches the expected value or it doesn't. GenAI system evaluation scores quality along multiple dimensions (faithfulness, relevance, safety) against thresholds, because there is rarely a single correct answer.
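The shift can be sketched in a few lines of Python; the scores and thresholds below are illustrative stand-ins for real evaluator output (e.g. RAGAS metrics):

```python
# Traditional software testing: exact, binary correctness
assert (2 + 2) == 4

# GenAI system evaluation: graded scores checked against minimum thresholds
def passes_quality_gates(scores: dict, thresholds: dict) -> bool:
    """Pass only if every quality dimension meets its minimum score."""
    return all(scores.get(metric, 0.0) >= minimum
               for metric, minimum in thresholds.items())

scores = {"faithfulness": 0.91, "relevancy": 0.84, "safety": 0.99}
thresholds = {"faithfulness": 0.85, "relevancy": 0.80, "safety": 0.95}
print(passes_quality_gates(scores, thresholds))  # True
```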

3. Complex Multi-Component Systems

Modern GenAI applications are orchestrations, not single API calls:

User Query
    ↓
Input Guardrails (validate, sanitize)
    ↓
Query Rewriting (optimize for retrieval)
    ↓
Vector Search (retrieve relevant context)
    ↓
Context Ranking (re-rank by relevance)
    ↓
Prompt Construction (assemble final prompt)
    ↓
LLM Generation (multiple model routing)
    ↓
Output Guardrails (safety, quality checks)
    ↓
Response to User

Each component can fail independently. Traditional monitoring doesn't capture these multi-step workflows.
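As a sketch, an orchestrator can record per-stage status and latency so that failures are attributable to a specific component (the two stage stubs here are illustrative):

```python
import time

def run_pipeline(query: str, components: list) -> dict:
    """Run each stage in order, recording latency and failures per stage."""
    trace = {"query": query, "stages": [], "failed_stage": None}
    payload = query
    for name, fn in components:
        start = time.perf_counter()
        try:
            payload = fn(payload)
            status = "ok"
        except Exception as exc:
            trace["failed_stage"] = name
            status = f"error: {exc}"
        trace["stages"].append({
            "name": name,
            "status": status,
            "latency_ms": round((time.perf_counter() - start) * 1000, 2),
        })
        if trace["failed_stage"]:
            break  # stop at the first failure; the trace tells you where
    trace["response"] = payload if trace["failed_stage"] is None else None
    return trace

# Illustrative stubs for two of the stages above
components = [
    ("input_guardrails", lambda q: q.strip()),
    ("vector_search", lambda q: f"context for: {q}"),
]
print(run_pipeline(" hello ", components)["response"])  # context for: hello
```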

4. Token Economics

Every operation has a cost tied to token consumption: input tokens, output tokens, and (in RAG systems) the retrieved context you pack into each prompt all add up.

You need real-time cost tracking per user, per session, and per model to avoid budget surprises.
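As a rough sketch, per-query cost is just tokens times rate; the prices below are illustrative placeholders, not actual Bedrock pricing:

```python
# Illustrative per-1K-token prices (USD); check current Bedrock pricing per model/region
PRICING = {
    "claude-sonnet": {"input": 0.003, "output": 0.015},
    "claude-haiku":  {"input": 0.00025, "output": 0.00125},
}

def query_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one request = tokens consumed * per-1K-token rate."""
    rates = PRICING[model]
    return ((input_tokens / 1000) * rates["input"]
            + (output_tokens / 1000) * rates["output"])

# A 2,000-token prompt with a 500-token answer on each model:
print(round(query_cost("claude-sonnet", 2000, 500), 4))  # 0.0135
print(round(query_cost("claude-haiku", 2000, 500), 6))   # 0.001125
```

The two-orders-of-magnitude spread between models is exactly why per-query attribution matters.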


The GenAIOps Maturity Model

Where does your organization sit? Understanding your current level helps prioritize improvements.

Level 0: Ad-Hoc Experimentation

Characteristics:

  • Direct API calls in notebooks
  • No version control for prompts
  • Manual testing through UI
  • No cost tracking
  • Production = "it works on my laptop"

Pain Points:

  • No reproducibility: nobody knows which prompt produced which output
  • No cost visibility until the bill arrives
  • Regressions discovered by users, not by tests

Level 1: Basic Pipeline

Characteristics:

  • Prompts in version control (Git)
  • Basic CI/CD for deployment
  • Logging to CloudWatch
  • Manual evaluation before releases
  • Static prompts and models

Progress:

  • Changes are traceable and repeatable
  • Deployments no longer depend on one person's laptop
  • Still blind in production: no automated evaluation or cost alerting

Level 2: Monitored Production

Characteristics:

  • Automated evaluation pipelines
  • Real-time observability dashboards
  • A/B testing capabilities
  • Cost monitoring and alerting
  • Prompt versioning with rollback

Capabilities:

  • Quality regressions are caught before users see them
  • A/B tests and rollback make experimentation safe
  • Cost anomalies surface as alerts, not invoices

Level 3: Intelligent Operations

Characteristics:

  • Continuous evaluation from production traffic
  • Automated quality regression detection
  • Dynamic model routing based on performance
  • Human-in-the-loop feedback integration
  • Automated prompt optimization

Advanced Patterns:

  • Production traffic continuously feeds the evaluation loop
  • Queries route to the cheapest model that meets the quality bar
  • Human feedback closes the loop on prompt improvements

Level 4: Autonomous GenAIOps

Characteristics:

  • Self-healing systems with automated remediation
  • Predictive quality management
  • Automated cost optimization
  • Context-aware evaluation
  • Multi-dimensional optimization (quality, cost, latency)

Reality Check: Most organizations are between Level 0 and 1. This series will help you reach Level 2-3, which is the sweet spot for most production systems.


GenAIOps Core Pillars

Pillar 1: Data Governance for GenAI

Data governance in GenAI extends far beyond traditional data management. You're managing prompts as code, context windows as data, and model outputs as assets.

Prompt Governance

Why it matters: Prompts are the "code" of GenAI systems. A poorly worded prompt can cause production failures just like buggy code.
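One way to treat prompts as governed assets is a versioned manifest stored alongside the template itself. A hypothetical example (all field names are illustrative):

```yaml
# prompts/customer_support/v1.2.0.yaml
prompt_id: customer_support_agent
version: "1.2.0"
owner: genai-team
changelog: "Tightened instructions to reduce hallucinated order numbers"
approved_by: jane.doe
evaluation:
  baseline_faithfulness: 0.88
  evaluated_on: "2025-10-15"
template: |
  You are a customer support assistant.
  Answer ONLY from the provided context. If the answer is not
  in the context, say you don't know.

  Context: {context}
  Question: {question}
```

With this in Git, every prompt change gets a diff, a reviewer, and an evaluation baseline, just like code.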

Training Data Lineage

Track the complete journey of data through your GenAI pipeline: where each document came from, how it was cleaned and chunked, which embedding model indexed it, and which index version serves it.
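A minimal sketch of what one such lineage record might capture (field names and the embedding model ID are illustrative):

```python
from datetime import datetime, timezone

def lineage_record(doc_id: str, source_uri: str, steps: list) -> dict:
    """One auditable record per document flowing into the RAG index."""
    return {
        "document_id": doc_id,
        "source_uri": source_uri,                 # where the data came from
        "ingested_at": datetime.now(timezone.utc).isoformat(),
        "processing_steps": steps,                # e.g. pii_redaction, chunking
        "embedding_model": "amazon.titan-embed-text-v2:0",  # illustrative model ID
        "index_version": "kb-2025-10-01",
    }

record = lineage_record(
    "doc-8841",
    "s3://corp-policies/leave-policy.pdf",
    ["pii_redaction", "chunk_512_tokens", "dedup"],
)
print(record["processing_steps"][0])  # pii_redaction
```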

Why it matters: Regulatory compliance (GDPR, HIPAA) requires proving where training data came from and how it was processed.

Context Window Management

In RAG systems, track what context was retrieved and used for every request: the source chunks, their relevance scores, and the tokens they contributed to the prompt.
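A sketch of a per-request context trace (field names illustrative); hashing each chunk's text lets you later verify exactly what was used without storing the content twice:

```python
import hashlib

def context_trace(request_id: str, query: str, chunks: list, model_id: str) -> dict:
    """Record exactly which retrieved chunks went into the prompt."""
    return {
        "request_id": request_id,
        "query": query,
        "retrieved_chunks": [
            {
                "source": c["source"],
                "relevance_score": c["score"],
                # hash the text so content can be verified without duplication
                "content_sha256": hashlib.sha256(c["text"].encode()).hexdigest(),
                "tokens": c["tokens"],
            }
            for c in chunks
        ],
        "context_tokens_total": sum(c["tokens"] for c in chunks),
        "model_id": model_id,
    }

trace = context_trace(
    "req-123",
    "What is the refund window?",
    [{"source": "s3://kb/refunds.md#2", "score": 0.92,
      "text": "Refunds are accepted within 30 days...", "tokens": 180}],
    "anthropic.claude-sonnet-4-20250514",
)
print(trace["context_tokens_total"])  # 180
```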

Why track this? When a user reports incorrect information, you need to:

  1. Identify what context was retrieved
  2. Trace back to source documents
  3. Determine if retrieval failed or generation failed
  4. Calculate cost of the interaction

Pillar 2: Automated Evaluation Pipelines

Manual evaluation doesn't scale. You need automated pipelines that evaluate every change before production.

Pre-Deployment Evaluation

Quality Gates in CI/CD:

# .github/workflows/evaluate-genai.yml
name: GenAI Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'prompts/**'
      - 'rag_pipeline/**'
      - 'evaluation/**'

jobs:
  evaluate:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install Dependencies
        run: |
          pip install boto3 ragas pandas numpy
          pip install -r requirements.txt

      - name: Run Synthetic Test Suite
        env:
          AWS_REGION: us-east-1
        run: |
          python scripts/evaluate_rag.py \
            --test-suite tests/synthetic_tests.json \
            --min-faithfulness 0.85 \
            --min-relevancy 0.80 \
            --min-context-precision 0.75 \
            --sample-size 1000

      - name: Run Regression Tests
        run: |
          python scripts/regression_tests.py \
            --baseline production_baseline.json \
            --current current_results.json \
            --max-degradation 0.05

      - name: Validate Cost Budget
        run: |
          python scripts/cost_validator.py \
            --max-cost-per-query 0.02 \
            --max-latency-p95 3000

      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: evaluation-results
          path: results/

      - name: Comment on PR
        if: always()
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('results/summary.json'));

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Evaluation Results

               Faithfulness: ${results.faithfulness}
               Relevancy: ${results.relevancy}
               Context Precision: ${results.context_precision}
               Avg Cost: $${results.avg_cost}
               P95 Latency: ${results.p95_latency}ms

              ${results.passed ? 'All checks passed!' : 'Quality gates failed'}`
            });

Evaluation Configuration:

# config/evaluation_pipeline.yaml
evaluation_pipeline:
  stages:
    - name: "synthetic_test_suite"
      description: "Test on curated examples with known good answers"
      metrics:
        faithfulness:
          threshold: 0.85
          weight: 0.3
        answer_relevancy:
          threshold: 0.80
          weight: 0.3
        context_precision:
          threshold: 0.75
          weight: 0.2
        safety:
          threshold: 0.95
          weight: 0.2
      sample_size: 1000
      timeout_minutes: 15

    - name: "regression_tests"
      description: "Compare against production baseline"
      compare_against: "production_v1.2.0"
      max_degradation: 0.05  # 5% worse is unacceptable
      metrics: ["faithfulness", "relevancy", "cost", "latency"]

    - name: "cost_validation"
      description: "Ensure cost targets are met"
      max_cost_per_query: 0.02
      max_monthly_budget: 10000
      alert_threshold: 0.8  # Alert at 80% of budget

    - name: "latency_validation"
      description: "Ensure latency SLAs are met"
      max_p50_latency: 1500  # milliseconds
      max_p95_latency: 3000
      max_p99_latency: 5000

Production Sampling Strategy

You can't evaluate every production request (too expensive). Use intelligent sampling:

# production_sampler.py
import random
from typing import Dict, Any
from dataclasses import dataclass

@dataclass
class SamplingDecision:
    should_evaluate: bool
    reason: str
    priority: int  # 1=highest, 5=lowest

class ProductionSampler:
    """Intelligent sampling for production evaluation"""

    def __init__(
        self,
        base_sample_rate: float = 0.02,  # 2% baseline
        cost_threshold: float = 0.05,     # Sample expensive queries
        latency_p95: float = 3000         # Sample slow queries
    ):
        self.base_sample_rate = base_sample_rate
        self.cost_threshold = cost_threshold
        self.latency_p95 = latency_p95

    def should_evaluate(self, request: Dict[str, Any]) -> SamplingDecision:
        """Decide whether to evaluate this production request"""

        # Always sample: New users (establish baseline)
        if request.get("is_new_user", False):
            return SamplingDecision(
                should_evaluate=True,
                reason="new_user",
                priority=2
            )

        # Always sample: Explicit negative feedback
        if request.get("has_negative_feedback", False):
            return SamplingDecision(
                should_evaluate=True,
                reason="negative_feedback",
                priority=1
            )

        # Always sample: High cost queries (investigate efficiency)
        if request.get("cost", 0) > self.cost_threshold:
            return SamplingDecision(
                should_evaluate=True,
                reason="high_cost",
                priority=2
            )

        # Always sample: Slow queries (identify bottlenecks)
        if request.get("latency_ms", 0) > self.latency_p95:
            return SamplingDecision(
                should_evaluate=True,
                reason="high_latency",
                priority=3
            )

        # Random baseline sampling
        if random.random() < self.base_sample_rate:
            return SamplingDecision(
                should_evaluate=True,
                reason="random_sample",
                priority=4
            )

        return SamplingDecision(
            should_evaluate=False,
            reason="not_selected",
            priority=5
        )

# Usage in production
sampler = ProductionSampler(
    base_sample_rate=0.02,      # 2% of all traffic
    cost_threshold=0.05,         # Queries over $0.05
    latency_p95=3000            # Queries over 3 seconds
)

decision = sampler.should_evaluate(request)

if decision.should_evaluate:
    # Queue for evaluation (async)
    evaluation_queue.send_message({
        "request_id": request["id"],
        "query": request["query"],
        "response": request["response"],
        "context": request["retrieved_context"],
        "priority": decision.priority,
        "reason": decision.reason
    })

Why this matters:

  • Full evaluation of all traffic = too expensive
  • Random sampling only = miss critical issues
  • Intelligent sampling = catch problems while controlling costs

Pillar 3: CI/CD for LLMs

Traditional CI/CD deploys code. GenAI CI/CD deploys code, prompts, and models.

Model Version Control

# model_registry.py
from typing import Dict, Any
from datetime import datetime

class ModelRegistry:
    """Central registry for model versions and their performance"""

    def register_model(
        self,
        model_id: str,
        evaluation_scores: Dict[str, float],
        cost_metrics: Dict[str, float],
        metadata: Dict[str, Any]
    ) -> Dict:
        """Register a new model version with evaluation data"""

        return {
            "model_id": model_id,
            "bedrock_arn": f"arn:aws:bedrock:us-east-1::foundation-model/{model_id}",
            "registration_timestamp": datetime.now().isoformat(),

            # Quality metrics from evaluation
            "evaluation_scores": {
                "faithfulness": evaluation_scores["faithfulness"],
                "answer_relevancy": evaluation_scores["relevancy"],
                "context_precision": evaluation_scores["precision"],
                "safety_score": evaluation_scores["safety"]
            },

            # Cost and performance metrics
            "cost_metrics": {
                "cost_per_1k_input_tokens": cost_metrics["input"],
                "cost_per_1k_output_tokens": cost_metrics["output"],
                "avg_cost_per_query": cost_metrics["avg_query_cost"],
                "latency_p50": cost_metrics["latency_p50"],
                "latency_p95": cost_metrics["latency_p95"]
            },

            # Deployment metadata
            "deployment_status": "candidate",  # candidate, canary, production
            "traffic_allocation": 0.0,          # Start at 0%
            "approval_status": "pending",       # pending, approved, rejected
            "approved_by": None,

            # Rollback information
            "previous_version": metadata.get("previous_version"),
            "rollback_threshold": {
                "min_quality_score": 0.80,
                "max_error_rate": 0.05,
                "max_cost_increase": 0.20  # 20% cost increase triggers rollback
            }
        }

    def promote_to_production(
        self,
        model_id: str,
        canary_results: Dict[str, float]
    ) -> bool:
        """Promote model to production if canary succeeds"""

        # Check canary performance
        quality_ok = canary_results["avg_quality"] >= 0.85
        error_rate_ok = canary_results["error_rate"] < 0.02
        cost_ok = canary_results["cost_increase"] < 0.20

        if quality_ok and error_rate_ok and cost_ok:
            self._update_traffic_allocation(model_id, 1.0)
            return True
        else:
            self._rollback_to_previous(model_id)
            return False

    def _update_traffic_allocation(self, model_id: str, fraction: float):
        """Shift traffic to this model (implementation depends on your router)"""
        pass

    def _rollback_to_previous(self, model_id: str):
        """Revert traffic to the previous registered version"""
        pass

Canary Deployments for Prompts

Test new prompts on a small percentage of traffic before full rollout:

# prompt_router.py
from typing import Dict, Optional
import hashlib

class PromptRouter:
    """Route users to different prompt versions for A/B testing"""

    def __init__(self, rollout_config: Dict):
        self.config = rollout_config

    def route_request(
        self,
        user_id: str,
        experiment_id: Optional[str] = None
    ) -> str:
        """Deterministically route users to prompt versions"""

        # Get user's routing bucket (consistent hashing)
        user_bucket = self._get_user_bucket(user_id)

        # Check if user is in experiment
        if experiment_id:
            experiment = self.config["experiments"].get(experiment_id)
            if experiment and user_bucket < experiment["traffic_percentage"]:
                return experiment["variant_version"]

        # Default to production version
        return self.config["production_version"]

    def _get_user_bucket(self, user_id: str) -> int:
        """Hash user ID to bucket (0-99)"""
        hash_value = hashlib.md5(user_id.encode()).hexdigest()
        return int(hash_value, 16) % 100

# Configuration
rollout_config = {
    "production_version": "v1.2.0",
    "experiments": {
        "exp_001_concise_responses": {
            "variant_version": "v1.3.0",
            "traffic_percentage": 10,  # 10% of users
            "start_date": "2025-01-20",
            "metrics_to_track": ["response_length", "user_satisfaction"],
            "success_criteria": {
                "min_satisfaction": 4.2,  # out of 5
                "max_response_length": 500  # tokens
            }
        }
    }
}

# Usage
router = PromptRouter(rollout_config)

# User makes a request
prompt_version = router.route_request(
    user_id="user_12345",
    experiment_id="exp_001_concise_responses"
)

# Load the appropriate prompt
prompt = load_prompt_version(prompt_version)
response = generate_response(prompt, user_query)

Automated Rollback Strategies

Define triggers that automatically rollback deployments:

# config/rollback_policy.yaml
rollback_policy:
  # Define what triggers a rollback
  triggers:
    - name: "high_error_rate"
      metric: "error_rate"
      threshold: 0.05  # 5% error rate
      window: "5min"
      comparison: "greater_than"
      severity: "critical"

    - name: "quality_degradation"
      metric: "avg_quality_score"
      threshold: 0.75
      window: "15min"
      comparison: "less_than"
      severity: "high"

    - name: "cost_explosion"
      metric: "cost_per_request"
      threshold: 0.05  # $0.05 per request
      window: "10min"
      comparison: "greater_than"
      severity: "high"

    - name: "latency_spike"
      metric: "p95_latency"
      threshold: 5000  # 5 seconds
      window: "5min"
      comparison: "greater_than"
      severity: "medium"

  # What to do when triggered
  actions:
    - action: "rollback"
      target: "last_stable_version"
      execution: "immediate"

    - action: "notify"
      channels: ["slack://ops-team", "pagerduty://genai-oncall"]
      include_metrics: true

    - action: "create_incident"
      severity: "high"
      assign_to: "genai-team"
      include_logs: true

    - action: "stop_traffic"
      variant: "canary"
      percentage: 0  # Stop all traffic to new version

  # Rollback procedure
  rollback_procedure:
    steps:
      - verify_previous_version_exists
      - stop_new_version_traffic
      - route_100_percent_to_previous
      - validate_metrics_improved
      - send_postmortem_template

Implementation:

# rollback_manager.py
import boto3
from datetime import datetime
from typing import Dict, List

class RollbackManager:
    """Automated rollback when quality degrades"""

    def __init__(self, policy_config: Dict):
        self.policy = policy_config
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')

    def check_rollback_triggers(self) -> List[str]:
        """Check if any rollback triggers are fired"""

        triggered = []

        for trigger in self.policy["triggers"]:
            # Query CloudWatch for metric
            metric_value = self._get_metric_value(
                metric_name=trigger["metric"],
                window=trigger["window"]
            )

            # Check threshold
            should_trigger = self._compare_threshold(
                value=metric_value,
                threshold=trigger["threshold"],
                comparison=trigger["comparison"]
            )

            if should_trigger:
                triggered.append(trigger["name"])

        return triggered

    def execute_rollback(self, triggered_rules: List[str]):
        """Execute rollback actions"""

        # 1. Stop traffic to new version
        self._stop_canary_traffic()

        # 2. Route to previous stable version
        self._route_to_stable_version()

        # 3. Notify team
        self._send_notifications(triggered_rules)

        # 4. Create incident
        self._create_incident(triggered_rules)

    def _stop_canary_traffic(self):
        """Immediately stop routing to canary version"""
        # Implementation depends on your routing mechanism
        # Could be Lambda@Edge, API Gateway, or custom router
        pass

    def _send_notifications(self, triggered_rules: List[str]):
        """Alert team about rollback"""

        message = f"""
        AUTOMATED ROLLBACK EXECUTED

        Triggered by: {', '.join(triggered_rules)}
        Timestamp: {datetime.now()}
        Action: Reverted to previous stable version

        Metrics at time of rollback:
        - Error rate: {self._get_metric_value('error_rate', '5min')}
        - Quality score: {self._get_metric_value('avg_quality_score', '15min')}
        - Cost per request: ${self._get_metric_value('cost_per_request', '10min')}

        Incident created: [Link to incident]
        Runbook: [Link to postmortem runbook]
        """

        self.sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789:genai-alerts',
            Subject='GenAI System Rollback Executed',
            Message=message
        )

AWS Services for GenAIOps Foundations

Amazon Bedrock

Your foundation model layer:

  • Access to leading models: Claude 4.5, Nova, Llama, Mistral, and more
  • Model evaluation: Built-in comparison capabilities
  • Guardrails: Content filtering and safety checks
  • Custom models: Fine-tuning on your data
  • Agents: Autonomous task execution with tool use

Example:
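A minimal sketch using the boto3 Converse API; the model ID, region, and inference settings are illustrative, and the live call requires AWS credentials plus Bedrock model access:

```python
def build_messages(user_text: str) -> list:
    """Shape a single-turn request in the Converse API message format."""
    return [{"role": "user", "content": [{"text": user_text}]}]

def ask_bedrock(prompt: str,
                model_id: str = "anthropic.claude-sonnet-4-20250514") -> str:
    import boto3  # imported here so the payload helper above has no dependencies
    client = boto3.client("bedrock-runtime", region_name="us-east-1")
    response = client.converse(
        modelId=model_id,
        messages=build_messages(prompt),
        inferenceConfig={"maxTokens": 512, "temperature": 0.2},
    )
    return response["output"]["message"]["content"][0]["text"]

# Live call (requires AWS credentials and Bedrock model access):
# print(ask_bedrock("Summarize our GenAIOps maturity model in one sentence."))
```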

Amazon Bedrock AgentCore

The agentic platform that went GA in October 2025, providing a complete framework for building autonomous agents:

Key Components:

  1. Runtime: Fully-managed agent execution environment

    • Handles agent loops and reasoning
    • Manages tool invocation
    • Provides built-in error handling
  2. Gateway: Secure tool integration

    • OAuth 2.0 authentication
    • API key management
    • Rate limiting and throttling
  3. Memory: Multi-tier memory system

    • Short-term: Within-conversation context
    • Long-term: Persistent across sessions
    • Episodic: Historical interaction patterns
  4. Identity: Multi-tenant authentication

    • User-level permissions
    • Role-based access control
    • Audit logging
  5. Evaluations (Preview): 13 built-in quality evaluators

    • Faithfulness, relevance, coherence
    • Safety and bias detection
    • Custom metric definitions
  6. Policy (Preview): Natural language policies

    • Define agent behavior constraints
    • Set data access rules
    • Compliance enforcement


AWS CloudWatch GenAI Observability

Now GA as of October 2025, purpose-built for LLM monitoring:

Capabilities:

  1. Model Invocation Tracking

  2. AgentCore Agent Monitoring

  3. End-to-End Prompt Tracing via OpenTelemetry

  4. Integration with Existing CloudWatch
    • Logs, Metrics, Alarms all in one place
    • No new tools to learn
    • Unified dashboards

AWS X-Ray

Distributed tracing for complex GenAI workflows:

X-Ray Service Map shows:

  • Which components are slowest
  • Where errors occur most
  • Cost attribution by component
  • Dependency relationships

Building Your First GenAIOps Pipeline

Let's build a production-ready evaluation pipeline using the tools we've discussed.

Step 1: Set Up Evaluation Framework

# evaluation_pipeline.py
import json
from datetime import datetime
from typing import List, Dict

import boto3
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall
)

class GenAIOpsEvaluator:
    """Production-ready evaluation pipeline"""

    def __init__(self, region: str = 'us-east-1'):
        self.bedrock = boto3.client('bedrock-runtime', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)

    def evaluate_rag_system(
        self,
        test_cases: List[Dict],
        model_id: str = 'anthropic.claude-sonnet-4-20250514'
    ) -> Dict:
        """
        Evaluate RAG system using RAGAS metrics

        Args:
            test_cases: List of test cases with query, contexts, answer, ground_truth
            model_id: Bedrock model ID to use as evaluator

        Returns:
            Dict of evaluation scores
        """

        # Prepare evaluation dataset
        eval_data = {
            "question": [tc["query"] for tc in test_cases],
            "contexts": [tc["retrieved_contexts"] for tc in test_cases],
            "answer": [tc["generated_answer"] for tc in test_cases],
            "ground_truth": [tc["expected_answer"] for tc in test_cases]
        }

        # Run evaluation
        print(f"Evaluating {len(test_cases)} test cases...")
        results = evaluate(
            eval_data,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_precision,
                context_recall
            ],
            llm=self._get_bedrock_llm(model_id)  # helper wrapping Bedrock as a RAGAS-compatible LLM (not shown)
        )

        # Convert to dict for easier handling
        scores = {
            "faithfulness": float(results["faithfulness"]),
            "answer_relevancy": float(results["answer_relevancy"]),
            "context_precision": float(results["context_precision"]),
            "context_recall": float(results["context_recall"]),
            "timestamp": datetime.now().isoformat(),
            "test_case_count": len(test_cases),
            "model_id": model_id
        }

        # Publish to CloudWatch
        self._publish_metrics(scores)

        # Store detailed results in S3
        self._store_results(scores, results)

        return scores

    def _publish_metrics(self, scores: Dict):
        """Publish evaluation metrics to CloudWatch"""

        namespace = "GenAI/Evaluation"
        timestamp = datetime.now()

        metric_data = [
            {
                'MetricName': metric_name,
                'Value': value,
                'Unit': 'None',
                'Timestamp': timestamp,
                'Dimensions': [
                    {'Name': 'ModelId', 'Value': scores['model_id']}
                ]
            }
            for metric_name, value in scores.items()
            if isinstance(value, (int, float))
        ]

        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=metric_data
        )

        print(f"Published {len(metric_data)} metrics to CloudWatch")

    def _store_results(self, scores: Dict, detailed_results: Dict):
        """Store evaluation results in S3 for audit trail"""

        bucket = 'genai-evaluation-results'
        key = f"evaluations/{datetime.now().strftime('%Y/%m/%d')}/{scores['model_id']}.json"

        self.s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps({
                "scores": scores,
                "detailed_results": detailed_results
            }, indent=2)
        )

        print(f"Stored results in s3://{bucket}/{key}")

    def check_quality_gates(
        self,
        scores: Dict,
        thresholds: Dict = None
    ) -> bool:
        """
        Check if evaluation passes quality gates

        Args:
            scores: Evaluation scores
            thresholds: Minimum acceptable scores

        Returns:
            True if all gates pass
        """

        if thresholds is None:
            thresholds = {
                "faithfulness": 0.85,
                "answer_relevancy": 0.80,
                "context_precision": 0.75,
                "context_recall": 0.75
            }

        passed = True
        for metric, threshold in thresholds.items():
            if scores.get(metric, 0) < threshold:
                print(f" {metric}: {scores[metric]:.3f} < {threshold}")
                passed = False
            else:
                print(f" {metric}: {scores[metric]:.3f} >= {threshold}")

        return passed

Step 2: Create CI/CD Integration

# .github/workflows/evaluate-rag.yml
name: RAG Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'prompts/**'
      - 'rag/**'
      - 'tests/**'
  push:
    branches: [main]
  schedule:
    # Run nightly evaluation
    - cron: '0 2 * * *'

env:
  AWS_REGION: us-east-1

jobs:
  evaluate:
    runs-on: ubuntu-latest

    permissions:
      id-token: write  # For OIDC
      contents: read
      pull-requests: write

    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install Dependencies
        run: |
          pip install -r requirements.txt
          pip install boto3 ragas pandas numpy

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::123456789:role/github-actions-role
          aws-region: ${{ env.AWS_REGION }}

      - name: Run RAG Evaluation
        id: evaluate
        run: |
          python scripts/evaluate_rag.py \
            --test-suite tests/regression_tests.json \
            --output results.json

          # Export results for next steps
          echo "results_path=results.json" >> $GITHUB_OUTPUT

      - name: Check Quality Gates
        id: quality_gates
        run: |
          # Capture pass/fail without aborting the step on failure,
          # so the artifact upload and PR comment steps still run
          if python scripts/check_quality_gates.py \
            --results results.json \
            --min-faithfulness 0.85 \
            --min-relevancy 0.80 \
            --min-context-precision 0.75; then
            echo "passed=0" >> $GITHUB_OUTPUT
          else
            echo "passed=1" >> $GITHUB_OUTPUT
          fi

      - name: Upload Results Artifact
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: evaluation-results
          path: results.json
          retention-days: 30

      - name: Comment on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('results.json'));

            const passed = results.all_gates_passed;
            const emoji = passed ? '✅' : '❌';

            const body = `## ${emoji} GenAI Evaluation Results

            **Quality Metrics:**
            - Faithfulness: ${results.faithfulness.toFixed(3)} ${results.faithfulness >= 0.85 ? '✅' : '❌'}
            - Answer Relevancy: ${results.answer_relevancy.toFixed(3)} ${results.answer_relevancy >= 0.80 ? '✅' : '❌'}
            - Context Precision: ${results.context_precision.toFixed(3)} ${results.context_precision >= 0.75 ? '✅' : '❌'}
            - Context Recall: ${results.context_recall.toFixed(3)} ${results.context_recall >= 0.75 ? '✅' : '❌'}

            **Cost & Performance:**
            - Avg Cost per Query: $${results.avg_cost.toFixed(4)}
            - P95 Latency: ${results.p95_latency}ms

            **Test Coverage:**
            - Test Cases: ${results.test_case_count}
            - Model: ${results.model_id}

            ${passed ? 'All quality gates passed! Ready to merge.' : 'Quality gates failed. Please review before merging.'}

            <details>
            <summary>View detailed results</summary>

            \`\`\`json
            ${JSON.stringify(results, null, 2)}
            \`\`\`
            </details>
            `;

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: body
            });

      - name: Fail if Quality Gates Failed
        if: steps.quality_gates.outputs.passed != '0'
        run: |
          echo "Quality gates failed. Blocking merge."
          exit 1

Step 3: Set Up CloudWatch Dashboard

# create_dashboard.py
import boto3
import json

cloudwatch = boto3.client('cloudwatch')

dashboard_body = {
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["GenAI/Evaluation", "faithfulness", {"stat": "Average"}],
                    [".", "answer_relevancy", {"stat": "Average"}],
                    [".", "context_precision", {"stat": "Average"}],
                    [".", "context_recall", {"stat": "Average"}]
                ],
                "period": 300,
                "stat": "Average",
                "region": "us-east-1",
                "title": "RAG Quality Metrics",
                "yAxis": {
                    "left": {"min": 0, "max": 1}
                }
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/Bedrock", "Invocations", {"stat": "Sum"}],
                    [".", "InputTokenCount", {"stat": "Sum"}],
                    [".", "OutputTokenCount", {"stat": "Sum"}]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "us-east-1",
                "title": "Bedrock Usage"
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["GenAI/Cost", "TotalCost", {"stat": "Sum"}],
                    [".", "AvgCostPerQuery", {"stat": "Average"}]
                ],
                "period": 300,
                "region": "us-east-1",
                "title": "Cost Metrics"
            }
        }
    ]
}

cloudwatch.put_dashboard(
    DashboardName='GenAI-Operations',
    DashboardBody=json.dumps(dashboard_body)
)

print("Dashboard created: GenAI-Operations")

Cost Governance in GenAIOps

Token consumption is your new primary cost driver. You need real-time tracking and budgets.

Cost Tracking Implementation

# cost_tracker.py
import boto3
from typing import Dict, List
from datetime import datetime

class CostTracker:
    """Track and manage GenAI costs"""

    # Updated pricing as of October 2025
    MODEL_COSTS = {
        "anthropic.claude-sonnet-4-20250514": {
            "input": 0.003 / 1000,   # $0.003 per 1K input tokens
            "output": 0.015 / 1000   # $0.015 per 1K output tokens
        },
        "anthropic.claude-haiku-4-5-20251001": {
            "input": 0.0008 / 1000,  # $0.0008 per 1K input tokens
            "output": 0.004 / 1000   # $0.004 per 1K output tokens
        },
        "anthropic.claude-opus-4-20250514": {
            "input": 0.015 / 1000,   # $0.015 per 1K input tokens
            "output": 0.075 / 1000   # $0.075 per 1K output tokens
        },
        "amazon.titan-embed-text-v2:0": {
            "input": 0.0001 / 1000,  # $0.0001 per 1K tokens
            "output": 0               # No output cost for embeddings
        }
    }

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def calculate_request_cost(
        self,
        model_id: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Calculate cost for a single request"""

        if model_id not in self.MODEL_COSTS:
            raise ValueError(f"Unknown model: {model_id}")

        costs = self.MODEL_COSTS[model_id]
        total_cost = (
            (input_tokens * costs["input"]) +
            (output_tokens * costs["output"])
        )

        return total_cost

    def track_session_cost(
        self,
        session_id: str,
        requests: List[Dict]
    ) -> Dict:
        """Track cumulative cost for user session"""

        total_cost = 0
        total_input_tokens = 0
        total_output_tokens = 0

        for req in requests:
            cost = self.calculate_request_cost(
                model_id=req["model"],
                input_tokens=req["input_tokens"],
                output_tokens=req["output_tokens"]
            )
            total_cost += cost
            total_input_tokens += req["input_tokens"]
            total_output_tokens += req["output_tokens"]

        # Publish to CloudWatch
        self._publish_cost_metrics(
            session_id=session_id,
            total_cost=total_cost,
            request_count=len(requests)
        )

        return {
            "session_id": session_id,
            "total_cost": total_cost,
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "request_count": len(requests),
            "avg_cost_per_request": total_cost / len(requests) if requests else 0
        }

    def _publish_cost_metrics(
        self,
        session_id: str,
        total_cost: float,
        request_count: int
    ):
        """Publish cost metrics to CloudWatch"""

        self.cloudwatch.put_metric_data(
            Namespace="GenAI/Cost",
            MetricData=[
                {
                    'MetricName': 'SessionCost',
                    'Value': total_cost,
                    'Unit': 'None',
                    'Timestamp': datetime.now(),
                    'Dimensions': [
                        {'Name': 'SessionId', 'Value': session_id}
                    ]
                },
                {
                    'MetricName': 'AvgCostPerRequest',
                    'Value': total_cost / request_count if request_count > 0 else 0,
                    'Unit': 'None',
                    'Timestamp': datetime.now()
                }
            ]
        )

# Usage
tracker = CostTracker()

# Track a session
session_costs = tracker.track_session_cost(
    session_id="session_123",
    requests=[
        {
            "model": "anthropic.claude-sonnet-4-20250514",
            "input_tokens": 1500,
            "output_tokens": 500
        },
        {
            "model": "anthropic.claude-sonnet-4-20250514",
            "input_tokens": 2000,
            "output_tokens": 800
        }
    ]
)

print(f"Session cost: ${session_costs['total_cost']:.4f}")
print(f"Avg per request: ${session_costs['avg_cost_per_request']:.4f}")

Budget Alerts

# budget_alerting.py
import boto3

cloudwatch = boto3.client('cloudwatch')

# Create alarm for daily cost threshold
cloudwatch.put_metric_alarm(
    AlarmName='GenAI-DailyCostThreshold',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=1,
    MetricName='SessionCost',
    Namespace='GenAI/Cost',
    Period=86400,  # 24 hours
    Statistic='Sum',
    Threshold=100.0,  # $100 per day
    ActionsEnabled=True,
    AlarmActions=[
        'arn:aws:sns:us-east-1:123456789:genai-cost-alerts'
    ],
    AlarmDescription='Alert when daily GenAI costs exceed $100'
)

# Create alarm for cost per request spike
cloudwatch.put_metric_alarm(
    AlarmName='GenAI-CostPerRequestSpike',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=2,
    MetricName='AvgCostPerRequest',
    Namespace='GenAI/Cost',
    Period=300,  # 5 minutes
    Statistic='Average',
    Threshold=0.05,  # $0.05 per request
    ActionsEnabled=True,
    AlarmActions=[
        'arn:aws:sns:us-east-1:123456789:genai-cost-alerts'
    ],
    AlarmDescription='Alert when avg cost per request exceeds $0.05'
)
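Alarms catch aggregate overruns after the fact; for hard per-user limits you also want a pre-request check that rejects calls once a user's daily spend is exhausted. Here's a minimal in-memory sketch — the class name, budget figure, and dictionary store are illustrative assumptions, and a production version would back the counters with DynamoDB or ElastiCache:

```python
# budget_enforcer.py - sketch of a per-user daily budget gate (names assumed)
from collections import defaultdict
from datetime import datetime, timezone

class BudgetEnforcer:
    """Track per-user daily spend and reject requests over budget."""

    def __init__(self, daily_budget: float = 1.00):
        self.daily_budget = daily_budget
        self._spend = defaultdict(float)  # (user_id, date) -> dollars

    def _key(self, user_id: str):
        # Reset naturally at UTC midnight by keying on the current date
        return (user_id, datetime.now(timezone.utc).date())

    def allow_request(self, user_id: str) -> bool:
        """Check before invoking the model; True while budget remains."""
        return self._spend[self._key(user_id)] < self.daily_budget

    def record_spend(self, user_id: str, cost: float) -> float:
        """Record actual request cost (e.g. from CostTracker); returns today's total."""
        self._spend[self._key(user_id)] += cost
        return self._spend[self._key(user_id)]

# Usage
enforcer = BudgetEnforcer(daily_budget=0.10)
print(enforcer.allow_request("user_42"))   # True - nothing spent yet
enforcer.record_spend("user_42", 0.12)
print(enforcer.allow_request("user_42"))   # False - daily budget exhausted
```

Pair this with the `CostTracker` above: compute the request cost after each invocation, feed it to `record_spend`, and short-circuit the next request when `allow_request` returns False.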

Key Takeaways

  1. GenAIOps is fundamentally different from DevOps - Non-deterministic systems require evaluation frameworks, not just testing. Quality is multi-dimensional: faithfulness, relevance, safety, and cost must all be tracked.

  2. Evaluation must be automated and continuous - Manual review doesn't scale. Build CI/CD pipelines with quality gates that block deployments when scores degrade. Use intelligent sampling in production to catch issues early.

  3. AWS provides production-ready tooling - CloudWatch GenAI Observability (GA), Bedrock AgentCore Evaluations (Preview), and X-Ray give you purpose-built infrastructure for GenAI operations. Don't build from scratch.

  4. Start with foundations before optimization - Version control for prompts, basic CI/CD, cost tracking, and automated evaluation should come before advanced features like auto-scaling or multi-region deployments.

  5. Cost governance is non-negotiable - Token consumption can spike unexpectedly. Real-time cost tracking, per-user budgets, and automated alerts prevent bill shock.

  6. Progressive maturity is the path - Most organizations are at Level 0-1. This series will help you reach Level 2-3, where automated evaluation, monitoring, and rollback create reliable production systems.
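Takeaway 2's "intelligent sampling" can be as simple as deterministic hash-based sampling: evaluating every production request at LLM prices would defeat the cost governance above, while a hash of the session ID keeps evaluation spend predictable and ensures a given session is consistently in or out of the sample. A minimal sketch (the function name and 5% default rate are assumptions, not an AWS API):

```python
# eval_sampler.py - sketch of deterministic production sampling for evaluation
import hashlib

def should_evaluate(session_id: str, sample_rate: float = 0.05) -> bool:
    """Deterministically select ~sample_rate of sessions for quality evaluation."""
    digest = hashlib.sha256(session_id.encode()).digest()
    # Map the first 4 bytes to [0, 1) and compare against the rate
    bucket = int.from_bytes(digest[:4], "big") / 2**32
    return bucket < sample_rate

# Usage: run the evaluation pipeline only on the sampled slice of live traffic
sampled = [s for s in ("sess_1", "sess_2", "sess_3") if should_evaluate(s, 0.5)]
```

Because the decision is derived from the session ID rather than `random()`, re-running an evaluation job over historical logs selects exactly the same sessions.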


What's Next in This Series

Part 2: RAG Evaluation & Quality Metrics

We'll dive deep into evaluating Retrieval-Augmented Generation systems:

  • Using Amazon Bedrock Evaluations with the RAGAS framework
  • Measuring retrieval effectiveness: context precision and recall
  • Answer quality evaluation: relevance, faithfulness, and coherence
  • Building comprehensive evaluation datasets with synthetic data
  • Implementing automated quality gates for RAG systems
  • Detecting and handling retrieval failures in production

Part 3: Production Monitoring & Observability

Building comprehensive observability for GenAI systems:

  • CloudWatch GenAI Observability dashboards
  • Distributed tracing with AWS X-Ray
  • Agent monitoring with Bedrock AgentCore
  • Cost attribution and anomaly detection
  • Building runbooks for common production issues

Part 4: Production Hardening & Scale

Taking GenAI systems to enterprise production:

  • Multi-region deployments for resilience
  • Auto-scaling strategies for variable load
  • Security hardening with VPC endpoints and encryption
  • Compliance automation (GDPR, HIPAA, SOC 2)
  • Disaster recovery and business continuity

Additional Resources

AWS Documentation:

Evaluation Frameworks:

Blog Posts & Announcements:

Community:


Let's Connect!

Building GenAIOps systems on AWS? Let's share experiences!

Follow me for Part 2 on RAG Evaluation & Quality Metrics coming next. We'll explore how to measure and improve the quality of your RAG systems using Amazon Bedrock Evaluations.

About the Author

Connect with me on:


Tags: #aws #genai #mlops #cloudwatch #bedrock #devops #genaiops #rag #llm #observability
