Shoaibali Mir

GenAIOps on AWS: End-to-End Observability Stack - Part 3

Reading time: ~22-25 minutes

Level: Intermediate to Advanced

Series: Part 3 of 4 - End-to-End Observability

What you'll learn: Build comprehensive observability for GenAI systems with CloudWatch GenAI Observability, X-Ray distributed tracing, and custom metrics


The Problem: When GenAI Goes Wrong at 3 AM

It's 3 AM, and PagerDuty just woke you up.

You open your logs. 10,000 lines of JSON. Where do you start?

Everything returns 200. But users are complaining. What's actually failing?

  • Is retrieval slow? Can't tell from these logs
  • Is the LLM hallucinating? No quality metrics captured
  • Why is cost 5x higher? Token counts missing
  • Which model is being used? Not tracked
  • What context was retrieved? Lost in the void

Traditional observability wasn't built for this. You need GenAI-specific observability that captures the full story: retrieval quality, token consumption, model behavior, and end-to-end traces showing exactly where things break.

This is what we're building today.


The GenAI Observability Challenge

GenAI systems are fundamentally different from traditional microservices:

Traditional microservice request: request → service → database → response. A handful of hops, each with a clear success or failure.

GenAI system request: request → embed query → vector search → rerank → build prompt → LLM generation → response. Every hop has its own latency, cost, and quality characteristics, and most failures are silent.

The challenge: A request can succeed (200 OK) but still fail the user:

  • Retrieved wrong documents → bad answer
  • LLM hallucinated → user misinformed
  • Cost spiked 5x → budget blown
  • Latency is 8s → user abandoned request

Traditional observability captures success/failure. GenAI observability captures quality/cost/performance at every step.
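The difference is easy to demonstrate with a toy sketch (the record fields and thresholds below are invented for illustration): a request that looks healthy to traditional monitoring still trips three GenAI-level checks.

```python
# Toy illustration (made-up field names and thresholds): an HTTP 200 that
# still fails the user once quality, cost, and latency signals are checked.

def passes_slo(record: dict) -> list:
    """Return the list of SLO violations for one request record."""
    violations = []
    if record["status_code"] != 200:
        violations.append("http_error")
    if record["avg_retrieval_score"] < 0.70:   # retrieved weak context
        violations.append("poor_retrieval")
    if record["cost_usd"] > 0.05:              # 5x the typical $0.01/query
        violations.append("cost_spike")
    if record["latency_ms"] > 3000:            # users abandon after ~3s
        violations.append("slow_response")
    return violations

request = {
    "status_code": 200,            # "success" to traditional monitoring
    "avg_retrieval_score": 0.55,   # wrong documents retrieved
    "cost_usd": 0.09,              # cost spiked
    "latency_ms": 8200,            # 8s latency
}

print(passes_slo(request))  # ['poor_retrieval', 'cost_spike', 'slow_response']
```

The rest of this post is about capturing those extra signals automatically instead of reconstructing them from logs at 3 AM.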


AWS CloudWatch GenAI Observability

AWS launched CloudWatch GenAI Observability in preview in Q4 2024, and it reached general availability in October 2025. It's purpose-built for LLM applications.

What It Provides Out-of-the-Box

1. Model Invocation Dashboard

Automatic tracking of:

  • Invocation metrics: Count, success rate, throttles
  • Token metrics: Input tokens, output tokens, total tokens
  • Cost attribution: Per-model, per-request costs
  • Latency breakdown: Time-to-first-token, generation latency
  • Error tracking: Model errors, throttling, timeouts

2. AgentCore Agent Dashboard

For Amazon Bedrock AgentCore agents:

  • Session tracking: Duration, turn count, completion
  • Tool usage: Which tools called, frequency, success rate
  • Memory operations: Reads, writes, retrieval performance
  • Gateway metrics: API latency, auth failures
  • Reasoning traces: Step-by-step agent decision logs

3. OpenTelemetry Integration

  • Distributed tracing: End-to-end request flows
  • Custom spans: Instrument your components
  • Automatic instrumentation: AWS SDK calls auto-traced
  • X-Ray integration: Service maps and bottleneck detection

Architecture: Complete Observability Stack

At a high level: the application emits OpenTelemetry spans and custom metrics → the ADOT Collector forwards traces to X-Ray and metrics/logs to CloudWatch → dashboards and alarms sit on top.

Setting Up OpenTelemetry with ADOT

AWS Distro for OpenTelemetry (ADOT) is AWS's distribution of OpenTelemetry, pre-configured for AWS services.

Installation
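For a Python application, installation is a pip install plus bootstrapping the auto-instrumentation packages. A sketch (package names assume the current ADOT Python distribution; check the ADOT docs for your runtime):

```shell
# Install the ADOT Python distribution (bundles the OpenTelemetry SDK
# with AWS-specific exporters and defaults)
pip install aws-opentelemetry-distro

# Detect installed libraries (boto3, requests, ...) and install their
# matching auto-instrumentation packages
opentelemetry-bootstrap -a install
```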

Basic Configuration
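A minimal ADOT Collector pipeline sketch, assuming the OTLP receiver and the X-Ray/EMF exporters (adjust region and namespace for your account):

```yaml
# collector-config.yaml — minimal ADOT Collector pipeline (sketch)
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

exporters:
  awsxray:            # traces -> AWS X-Ray
    region: us-east-1
  awsemf:             # metrics -> CloudWatch via Embedded Metric Format
    region: us-east-1
    namespace: GenAI/Custom

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [awsxray]
    metrics:
      receivers: [otlp]
      exporters: [awsemf]
```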

Auto-Instrumentation Setup
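With the distro installed, the application can run under the OpenTelemetry auto-instrumentation wrapper. The env var names are standard OTel/ADOT; the service name and endpoint values here are examples:

```shell
export OTEL_SERVICE_NAME=rag-api
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export OTEL_PYTHON_DISTRO=aws_distro
export OTEL_PYTHON_CONFIGURATOR=aws_configurator

# The wrapper instruments supported libraries (boto3, HTTP clients, ...)
# without code changes
opentelemetry-instrument python app.py
```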


Instrumenting Your RAG Application

Now let's instrument a complete RAG pipeline:

# instrumented_rag_system.py
import boto3
import json
from typing import List, Dict
from opentelemetry import trace
from datetime import datetime

class InstrumentedRAGSystem:
    """
    Fully instrumented RAG system with distributed tracing

    Captures:
    - End-to-end request traces
    - Per-component latency
    - Token consumption and costs
    - Quality signals
    - Error details
    """

    def __init__(self):
        self.bedrock_runtime = boto3.client('bedrock-runtime')
        # Note: 'opensearchserverless' in boto3 is the control-plane API; for
        # data-plane vector queries use the opensearch-py client (search is
        # mocked below in _vector_search)
        self.cloudwatch = boto3.client('cloudwatch')

        # Get tracer
        self.tracer = trace.get_tracer(__name__)

        # Model pricing (per 1K tokens)
        self.pricing = {
            "anthropic.claude-sonnet-4-20250514": {
                "input": 0.003,
                "output": 0.015
            },
            "amazon.titan-embed-text-v2:0": {
                "input": 0.0001,
                "output": 0
            }
        }

    def query(self, user_query: str, user_id: str = None) -> Dict:
        """
        Process RAG query with full instrumentation

        Args:
            user_query: User's question
            user_id: Optional user identifier for tracking

        Returns:
            Dict with answer and metadata
        """

        # Start root span
        with self.tracer.start_as_current_span("rag_query") as root_span:

            # Add request attributes
            root_span.set_attribute("query", user_query)
            root_span.set_attribute("query_length", len(user_query))
            if user_id:
                root_span.set_attribute("user_id", user_id)
            root_span.set_attribute("timestamp", datetime.now().isoformat())

            try:
                # Step 1: Generate embeddings
                with self.tracer.start_as_current_span("generate_embeddings") as span:
                    embeddings, embed_cost = self._generate_embeddings(user_query)

                    span.set_attribute("embedding_dimension", len(embeddings))
                    span.set_attribute("embedding_cost_usd", embed_cost)
                    span.set_attribute("model", "amazon.titan-embed-text-v2:0")

                # Step 2: Vector search
                with self.tracer.start_as_current_span("vector_search") as span:
                    contexts = self._vector_search(embeddings, top_k=5)

                    span.set_attribute("documents_retrieved", len(contexts))
                    if contexts:
                        avg_score = sum(c['score'] for c in contexts) / len(contexts)
                        span.set_attribute("avg_similarity_score", round(avg_score, 3))
                        span.set_attribute("top_score", round(contexts[0]['score'], 3))

                    # Publish retrieval quality metric
                    self._publish_metric(
                        "RetrievalQuality",
                        avg_score if contexts else 0,
                        namespace="GenAI/RAG/Retrieval"
                    )

                # Step 3: Rerank (optional but recommended)
                with self.tracer.start_as_current_span("rerank_documents") as span:
                    contexts = self._rerank_contexts(user_query, contexts, top_k=3)

                    span.set_attribute("documents_after_rerank", len(contexts))
                    if contexts:
                        span.set_attribute("top_rerank_score", round(contexts[0]['rerank_score'], 3))

                # Step 4: Build prompt and count tokens
                with self.tracer.start_as_current_span("prompt_construction") as span:
                    prompt = self._build_prompt(user_query, contexts)
                    input_tokens = self._estimate_tokens(prompt)

                    span.set_attribute("input_tokens", input_tokens)
                    span.set_attribute("context_documents", len(contexts))
                    span.set_attribute("prompt_length_chars", len(prompt))

                    # Check context window
                    max_context_window = 200000  # Claude Sonnet 4
                    if input_tokens > max_context_window:
                        span.set_attribute("error", "context_window_exceeded")
                        raise ValueError(f"Input tokens ({input_tokens}) exceed context window")

                # Step 5: Generate response
                with self.tracer.start_as_current_span("llm_generation") as span:
                    response = self._generate_response(prompt)

                    # Extract metrics
                    usage = response.get('usage', {})
                    input_tokens = usage.get('input_tokens', 0)
                    output_tokens = usage.get('output_tokens', 0)
                    model_id = "anthropic.claude-sonnet-4-20250514"

                    # Calculate cost
                    cost = self._calculate_cost(
                        model_id=model_id,
                        input_tokens=input_tokens,
                        output_tokens=output_tokens
                    )

                    # Add span attributes
                    span.set_attribute("model_id", model_id)
                    span.set_attribute("input_tokens", input_tokens)
                    span.set_attribute("output_tokens", output_tokens)
                    span.set_attribute("total_tokens", input_tokens + output_tokens)
                    span.set_attribute("generation_cost_usd", cost)
                    span.set_attribute("stop_reason", response.get('stop_reason', 'unknown'))

                    # Publish token metrics
                    self._publish_metric("InputTokens", input_tokens, namespace="GenAI/Tokens")
                    self._publish_metric("OutputTokens", output_tokens, namespace="GenAI/Tokens")
                    self._publish_metric("GenerationCost", cost, namespace="GenAI/Cost")

                # Step 6: Extract answer
                answer = response['content'][0]['text']

                # Add overall metrics to root span
                total_cost = embed_cost + cost
                root_span.set_attribute("total_cost_usd", round(total_cost, 4))
                root_span.set_attribute("total_tokens", input_tokens + output_tokens)
                root_span.set_attribute("answer_length", len(answer))
                root_span.set_attribute("status", "success")

                # Publish overall metrics
                self._publish_metric("RequestCost", total_cost, namespace="GenAI/Cost")
                self._publish_metric("RequestSuccess", 1, namespace="GenAI/Quality")

                return {
                    "answer": answer,
                    "metadata": {
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                        "total_cost": round(total_cost, 4),
                        "contexts_used": len(contexts),
                        "model": model_id
                    }
                }

            except Exception as e:
                # Capture error in span
                root_span.set_attribute("error", True)
                root_span.set_attribute("error_type", type(e).__name__)
                root_span.set_attribute("error_message", str(e))
                root_span.set_attribute("status", "error")

                # Publish error metric
                self._publish_metric("RequestErrors", 1, namespace="GenAI/Errors")

                # Re-raise
                raise

    def _generate_embeddings(self, text: str) -> tuple:
        """Generate embeddings with Bedrock Titan"""

        response = self.bedrock_runtime.invoke_model(
            modelId="amazon.titan-embed-text-v2:0",
            body=json.dumps({
                "inputText": text,
                "dimensions": 1024,
                "normalize": True
            })
        )

        result = json.loads(response['body'].read())
        embeddings = result['embedding']

        # Calculate cost
        token_count = len(text.split()) * 1.3  # Rough estimate
        cost = (token_count / 1000) * self.pricing["amazon.titan-embed-text-v2:0"]["input"]

        return embeddings, cost

    def _vector_search(self, embeddings: List[float], top_k: int = 5) -> List[Dict]:
        """
        Search OpenSearch vector index

        Note: This is automatically traced via boto3 instrumentation
        """

        # OpenSearch vector search
        # In production, use actual OpenSearch client

        # Mock response for example
        return [
            {
                "id": "doc_1",
                "score": 0.89,
                "text": "Electronics can be returned within 30 days..."
            },
            {
                "id": "doc_2",
                "score": 0.76,
                "text": "Damaged items require photo documentation..."
            },
            {
                "id": "doc_3",
                "score": 0.71,
                "text": "Restocking fees apply to opened electronics..."
            }
        ]

    def _rerank_contexts(
        self,
        query: str,
        contexts: List[Dict],
        top_k: int = 3
    ) -> List[Dict]:
        """
        Rerank contexts using cross-encoder

        In production, use:
        - Bedrock reranking model
        - Cohere rerank
        - Custom cross-encoder
        """

        # For example, just return top contexts
        # In production, apply reranking model
        for ctx in contexts[:top_k]:
            ctx['rerank_score'] = ctx['score'] * 1.1  # Mock rerank

        return contexts[:top_k]

    def _build_prompt(self, query: str, contexts: List[Dict]) -> str:
        """Build prompt from query and contexts"""

        context_text = "\n\n".join([
            f"Document {i+1}:\n{ctx['text']}"
            for i, ctx in enumerate(contexts)
        ])

        prompt = f"""You are a helpful customer service assistant. Answer the user's question based on the provided context.

Context:
{context_text}

Question: {query}

Answer the question using only information from the context. If the context doesn't contain enough information, say so."""

        return prompt

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimation"""
        # 1 token ≈ 0.75 words for English
        return int(len(text.split()) * 1.3)

    def _generate_response(self, prompt: str) -> Dict:
        """Generate response with Bedrock"""

        response = self.bedrock_runtime.invoke_model(
            modelId="anthropic.claude-sonnet-4-20250514",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 2048,
                "temperature": 0.7,
                "messages": [
                    {"role": "user", "content": prompt}
                ]
            })
        )

        return json.loads(response['body'].read())

    def _calculate_cost(
        self,
        model_id: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Calculate request cost"""

        pricing = self.pricing.get(model_id, {"input": 0, "output": 0})

        cost = (
            (input_tokens / 1000) * pricing["input"] +
            (output_tokens / 1000) * pricing["output"]
        )

        return cost

    def _publish_metric(
        self,
        metric_name: str,
        value: float,
        namespace: str = "GenAI/Custom"
    ):
        """Publish custom metric to CloudWatch"""

        try:
            self.cloudwatch.put_metric_data(
                Namespace=namespace,
                MetricData=[
                    {
                        'MetricName': metric_name,
                        'Value': value,
                        'Unit': 'None',
                        'Timestamp': datetime.now()
                    }
                ]
            )
        except Exception as e:
            # Don't fail request if metric publishing fails
            print(f"Warning: Failed to publish metric {metric_name}: {e}")

# Usage
rag_system = InstrumentedRAGSystem()

response = rag_system.query(
    user_query="What's the return policy for damaged electronics?",
    user_id="user_12345"
)

print(f"Answer: {response['answer']}")
print(f"Cost: ${response['metadata']['total_cost']}")
print(f"Tokens: {response['metadata']['input_tokens'] + response['metadata']['output_tokens']}")

AWS X-Ray Integration

X-Ray provides the service map and bottleneck detection that traces alone can't give you.

Enabling X-Ray Active Tracing

Lambda Function:
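Active tracing can be toggled on an existing function from the CLI (the function name is an example):

```shell
aws lambda update-function-configuration \
  --function-name rag-api \
  --tracing-config Mode=Active
```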

Terraform Configuration:
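In Terraform, the `tracing_config` block on `aws_lambda_function` does the same. A sketch (the rest of the function configuration is elided):

```hcl
resource "aws_lambda_function" "rag_api" {
  function_name = "rag-api"   # example name
  # ... runtime, handler, role, and package configuration ...

  tracing_config {
    mode = "Active"
  }
}
```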

Custom X-Ray Segments

# custom_xray_segments.py
from aws_xray_sdk.core import xray_recorder

class XRayInstrumentedRAG:
    """RAG system with custom X-Ray segments"""

    def query(self, user_query: str):
        """Process query with custom subsegments"""

        # Retrieval subsegment
        # Note: xray_recorder.capture() is a decorator; the context-manager
        # form is in_subsegment()
        with xray_recorder.in_subsegment('retrieval') as subsegment:
            contexts = self._retrieve_contexts(user_query)

            # Add annotations (indexed for filtering)
            subsegment.put_annotation('documents_found', len(contexts))
            if contexts:
                subsegment.put_annotation(
                    'avg_relevance',
                    sum(c['score'] for c in contexts) / len(contexts)
                )

            # Add metadata (not indexed)
            subsegment.put_metadata('retrieval_method', 'vector_search')
            subsegment.put_metadata('top_documents', [c['id'] for c in contexts[:3]])

        # Generation subsegment
        with xray_recorder.in_subsegment('generation') as subsegment:
            response = self._generate(user_query, contexts)

            # Annotations
            subsegment.put_annotation('input_tokens', response['input_tokens'])
            subsegment.put_annotation('output_tokens', response['output_tokens'])
            subsegment.put_annotation('cost_usd', response['cost'])

            # Metadata
            subsegment.put_metadata('model_id', response['model_id'])
            subsegment.put_metadata('stop_reason', response['stop_reason'])

        return response

X-Ray Service Map Insights

X-Ray automatically generates a service map showing every dependency in the request path (Lambda, Bedrock, OpenSearch), the latency and error rate on each edge, and where requests spend their time, so the slowest hop is visible at a glance.


Building Comprehensive CloudWatch Dashboards

Create unified dashboards showing the full picture:

# comprehensive_dashboard.py
import boto3
import json
from typing import Dict, List

class GenAIDashboardBuilder:
    """Build comprehensive CloudWatch dashboards for GenAI systems"""

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def create_production_dashboard(self) -> str:
        """
        Create production-grade dashboard with:
        - Quality metrics
        - Performance metrics
        - Cost tracking
        - Error monitoring
        - User satisfaction
        """

        dashboard_body = {
            "widgets": self._build_all_widgets()
        }

        response = self.cloudwatch.put_dashboard(
            DashboardName='GenAI-Production-Observability',
            DashboardBody=json.dumps(dashboard_body)
        )

        dashboard_url = (
            f"https://console.aws.amazon.com/cloudwatch/home"
            f"?region=us-east-1#dashboards:name=GenAI-Production-Observability"
        )

        print(f"✓ Dashboard created: {dashboard_url}")
        return dashboard_url

    def _build_all_widgets(self) -> List[Dict]:
        """Build all dashboard widgets"""

        widgets = []

        # Row 1: Quality Metrics (0, 0)
        widgets.append(self._quality_metrics_widget(x=0, y=0))
        widgets.append(self._quality_distribution_widget(x=12, y=0))

        # Row 2: Performance Metrics (0, 6)
        widgets.append(self._latency_breakdown_widget(x=0, y=6))
        widgets.append(self._throughput_widget(x=12, y=6))

        # Row 3: Cost & Tokens (0, 12)
        widgets.append(self._cost_metrics_widget(x=0, y=12))
        widgets.append(self._token_usage_widget(x=8, y=12))
        widgets.append(self._cost_per_user_widget(x=16, y=12))

        # Row 4: Errors & Alerts (0, 18)
        widgets.append(self._error_rate_widget(x=0, y=18))
        widgets.append(self._error_breakdown_widget(x=8, y=18))
        widgets.append(self._recent_errors_log_widget(x=16, y=18))

        # Row 5: Model Performance (0, 24)
        widgets.append(self._model_comparison_widget(x=0, y=24))
        widgets.append(self._stop_reasons_widget(x=12, y=24))

        # Row 6: User Experience (0, 30)
        widgets.append(self._user_satisfaction_widget(x=0, y=30))
        widgets.append(self._session_metrics_widget(x=12, y=30))

        # Row 7: X-Ray Service Map (0, 36)
        widgets.append(self._xray_service_map_widget(x=0, y=36))

        return widgets

    def _quality_metrics_widget(self, x: int, y: int) -> Dict:
        """Real-time quality metrics"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Quality", "Faithfulness", {
                        "stat": "Average",
                        "label": "Faithfulness"
                    }],
                    [".", "AnswerRelevancy", {
                        "stat": "Average",
                        "label": "Relevancy"
                    }],
                    [".", "ContextPrecision", {
                        "stat": "Average",
                        "label": "Context Precision"
                    }],
                    [".", "ContextRecall", {
                        "stat": "Average",
                        "label": "Context Recall"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "📊 RAG Quality Metrics",
                "period": 300,
                "yAxis": {
                    "left": {
                        "min": 0,
                        "max": 1,
                        "label": "Score"
                    }
                },
                "annotations": {
                    "horizontal": [
                        {
                            "value": 0.85,
                            "label": "Target",
                            "color": "#2ca02c"
                        },
                        {
                            "value": 0.75,
                            "label": "Warning",
                            "color": "#ff7f0e"
                        },
                        {
                            "value": 0.60,
                            "label": "Critical",
                            "color": "#d62728"
                        }
                    ]
                }
            }
        }

    def _quality_distribution_widget(self, x: int, y: int) -> Dict:
        """Quality score distribution"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Quality", "Faithfulness", {
                        "stat": "p50",
                        "label": "P50"
                    }],
                    ["...", {
                        "stat": "p90",
                        "label": "P90"
                    }],
                    ["...", {
                        "stat": "p99",
                        "label": "P99"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "📈 Faithfulness Distribution (P50/P90/P99)",
                "period": 300
            }
        }

    def _latency_breakdown_widget(self, x: int, y: int) -> Dict:
        """Latency breakdown by component"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Performance", "EmbeddingLatency", {
                        "stat": "Average",
                        "label": "Embeddings"
                    }],
                    [".", "VectorSearchLatency", {
                        "stat": "Average",
                        "label": "Vector Search"
                    }],
                    [".", "RerankLatency", {
                        "stat": "Average",
                        "label": "Reranking"
                    }],
                    [".", "GenerationLatency", {
                        "stat": "Average",
                        "label": "LLM Generation"
                    }],
                    [".", "EndToEndLatency", {
                        "stat": "Average",
                        "label": "Total",
                        "color": "#1f77b4"
                    }]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "⚡ Latency Breakdown (Stacked)",
                "period": 300,
                "yAxis": {
                    "left": {
                        "label": "Milliseconds"
                    }
                }
            }
        }

    def _throughput_widget(self, x: int, y: int) -> Dict:
        """Request throughput"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Throughput", "RequestCount", {
                        "stat": "Sum",
                        "label": "Total Requests"
                    }],
                    [".", "SuccessfulRequests", {
                        "stat": "Sum",
                        "label": "Successful"
                    }],
                    [".", "FailedRequests", {
                        "stat": "Sum",
                        "label": "Failed"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "🔄 Request Throughput",
                "period": 300
            }
        }

    def _cost_metrics_widget(self, x: int, y: int) -> Dict:
        """Cost tracking"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 8,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Cost", "TotalCost", {
                        "stat": "Sum",
                        "label": "Total Cost"
                    }],
                    [".", "EmbeddingCost", {
                        "stat": "Sum",
                        "label": "Embeddings"
                    }],
                    [".", "GenerationCost", {
                        "stat": "Sum",
                        "label": "Generation"
                    }]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "💰 Cost Breakdown (USD)",
                "period": 300,
                "yAxis": {
                    "left": {
                        "label": "USD"
                    }
                }
            }
        }

    def _token_usage_widget(self, x: int, y: int) -> Dict:
        """Token consumption"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 8,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Tokens", "InputTokens", {
                        "stat": "Sum",
                        "label": "Input Tokens"
                    }],
                    [".", "OutputTokens", {
                        "stat": "Sum",
                        "label": "Output Tokens"
                    }]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "🎫 Token Usage",
                "period": 300
            }
        }

    def _cost_per_user_widget(self, x: int, y: int) -> Dict:
        """Cost per user/query"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 8,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Cost", "CostPerQuery", {
                        "stat": "Average",
                        "label": "Avg per Query"
                    }],
                    ["...", {
                        "stat": "p95",
                        "label": "P95 per Query"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "💵 Cost Per Query",
                "period": 300,
                "yAxis": {
                    "left": {
                        "label": "USD"
                    }
                }
            }
        }

    def _error_rate_widget(self, x: int, y: int) -> Dict:
        """Error rate tracking"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 8,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Errors", "ErrorRate", {
                        "stat": "Average",
                        "label": "Error Rate %"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "❌ Error Rate",
                "period": 300,
                "yAxis": {
                    "left": {
                        "label": "Percentage",
                        "min": 0,
                        "max": 100
                    }
                },
                "annotations": {
                    "horizontal": [
                        {
                            "value": 1,
                            "label": "Target < 1%",
                            "color": "#2ca02c"
                        },
                        {
                            "value": 5,
                            "label": "Critical > 5%",
                            "color": "#d62728"
                        }
                    ]
                }
            }
        }

    def _error_breakdown_widget(self, x: int, y: int) -> Dict:
        """Error breakdown by type"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 8,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Errors", "RetrievalErrors", {
                        "stat": "Sum",
                        "label": "Retrieval"
                    }],
                    [".", "GenerationErrors", {
                        "stat": "Sum",
                        "label": "Generation"
                    }],
                    [".", "ThrottlingErrors", {
                        "stat": "Sum",
                        "label": "Throttling"
                    }],
                    [".", "ValidationErrors", {
                        "stat": "Sum",
                        "label": "Validation"
                    }]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "🔍 Error Breakdown",
                "period": 300
            }
        }

    def _recent_errors_log_widget(self, x: int, y: int) -> Dict:
        """Recent errors from logs"""
        return {
            "type": "log",
            "x": x,
            "y": y,
            "width": 8,
            "height": 6,
            "properties": {
                "query": """
                SOURCE '/aws/lambda/rag-api'
                | fields @timestamp, @message, error_type, request_id
                | filter @message like /ERROR/
                | sort @timestamp desc
                | limit 20
                """,
                "region": "us-east-1",
                "title": "📋 Recent Errors",
                "view": "table"
            }
        }

    def _model_comparison_widget(self, x: int, y: int) -> Dict:
        """Compare model performance"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Models", "AvgLatency", "ModelId", "claude-sonnet-4", {
                        "stat": "Average",
                        "label": "Sonnet 4"
                    }],
                    ["...", "claude-opus-4", {
                        "stat": "Average",
                        "label": "Opus 4"
                    }],
                    ["...", "claude-haiku-4", {
                        "stat": "Average",
                        "label": "Haiku 4"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "🤖 Model Latency Comparison",
                "period": 300
            }
        }

    def _stop_reasons_widget(self, x: int, y: int) -> Dict:
        """LLM stop reasons distribution"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Behavior", "StopReason", {
                        "stat": "SampleCount",
                        "dimensions": {"Reason": "end_turn"}
                    }],
                    ["...", {
                        "dimensions": {"Reason": "max_tokens"}
                    }],
                    ["...", {
                        "dimensions": {"Reason": "stop_sequence"}
                    }]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "🛑 Stop Reasons",
                "period": 300
            }
        }

    def _user_satisfaction_widget(self, x: int, y: int) -> Dict:
        """User feedback scores"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/UserExperience", "FeedbackScore", {
                        "stat": "Average",
                        "label": "Avg Satisfaction"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "⭐ User Satisfaction (1-5)",
                "period": 300,
                "yAxis": {
                    "left": {
                        "min": 1,
                        "max": 5
                    }
                }
            }
        }

    def _session_metrics_widget(self, x: int, y: int) -> Dict:
        """Session-level metrics"""
        return {
            "type": "metric",
            "x": x,
            "y": y,
            "width": 12,
            "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Sessions", "AvgSessionDuration", {
                        "stat": "Average",
                        "label": "Avg Duration (s)"
                    }],
                    [".", "AvgTurnsPerSession", {
                        "stat": "Average",
                        "label": "Avg Turns"
                    }]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "💬 Session Metrics",
                "period": 300
            }
        }

    def _xray_service_map_widget(self, x: int, y: int) -> Dict:
        """Pointer to the X-Ray service map. CloudWatch dashboards have no
        widget type that embeds the service map itself, so use a text widget
        directing viewers to the X-Ray console."""
        return {
            "type": "text",
            "x": x,
            "y": y,
            "width": 24,
            "height": 3,
            "properties": {
                "markdown": (
                    "## 🗺️ X-Ray Service Map - RAG System\n"
                    "Open **X-Ray traces → Service map** in the CloudWatch "
                    "console (us-east-1) to see the live dependency graph."
                )
            }
        }

# Create dashboard
builder = GenAIDashboardBuilder()
dashboard_url = builder.create_production_dashboard()
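Note that the custom namespaces the dashboard reads (`GenAI/Quality`, `GenAI/Cost`, and so on) are not published by any AWS service — your application code has to emit them with `put_metric_data`. A minimal sketch (metric names match the widgets above; `AnswerRelevancy` is an illustrative name, and the payload builder is kept separate so it can be tested without AWS credentials):

```python
# publish_quality_metrics.py
# Sketch: emit the custom quality metrics the dashboard and alarms read.
from typing import Dict, List

def build_quality_metrics(faithfulness: float, relevancy: float) -> List[Dict]:
    """MetricData entries for one request's quality scores (0.0-1.0)."""
    return [
        {"MetricName": "Faithfulness", "Value": faithfulness, "Unit": "None"},
        {"MetricName": "AnswerRelevancy", "Value": relevancy, "Unit": "None"},
    ]

def publish_quality_metrics(faithfulness: float, relevancy: float) -> None:
    import boto3  # deferred so the payload builder has no AWS dependency
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="GenAI/Quality",  # must match the dashboard/alarm namespace
        MetricData=build_quality_metrics(faithfulness, relevancy),
    )
```

Call `publish_quality_metrics(...)` after each evaluated response; CloudWatch aggregates the data points into the averages and percentiles the widgets display.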

Alarming Strategy for GenAI Systems

Set up intelligent alarms that catch real issues:

# genai_alarms.py
import boto3
from typing import Dict, List

class GenAIAlarmManager:
    """Comprehensive alarming for GenAI systems"""

    def __init__(self, sns_topic_arn: str):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns_topic_arn = sns_topic_arn

    def create_all_alarms(self):
        """Create complete alarm suite"""

        alarms = [
            # Quality alarms
            self._quality_degradation_alarm(),
            self._faithfulness_critical_alarm(),

            # Performance alarms
            self._high_latency_alarm(),
            self._latency_spike_alarm(),

            # Cost alarms
            self._cost_spike_alarm(),
            self._daily_budget_alarm(),

            # Error alarms
            self._high_error_rate_alarm(),
            self._retrieval_failure_alarm(),

            # Composite alarms
            self._system_degraded_composite_alarm()
        ]

        for alarm_config in alarms:
            # Composite alarms (AlarmRule) are created via a separate API
            if 'AlarmRule' in alarm_config:
                self.cloudwatch.put_composite_alarm(**alarm_config)
            else:
                self.cloudwatch.put_metric_alarm(**alarm_config)
            print(f"✓ Created alarm: {alarm_config['AlarmName']}")

    def _quality_degradation_alarm(self) -> Dict:
        """Alert when quality metrics drop"""
        return {
            'AlarmName': 'RAG-Quality-Degradation',
            'ComparisonOperator': 'LessThanThreshold',
            'EvaluationPeriods': 2,
            'DatapointsToAlarm': 2,  # 2 out of 2
            'MetricName': 'Faithfulness',
            'Namespace': 'GenAI/Quality',
            'Period': 300,
            'Statistic': 'Average',
            'Threshold': 0.75,
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Faithfulness score dropped below 0.75 for 10 minutes',
            'TreatMissingData': 'notBreaching'
        }

    def _faithfulness_critical_alarm(self) -> Dict:
        """Critical alarm for severe quality drop"""
        return {
            'AlarmName': 'RAG-Faithfulness-Critical',
            'ComparisonOperator': 'LessThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'Faithfulness',
            'Namespace': 'GenAI/Quality',
            'Period': 300,
            'Statistic': 'Average',
            'Threshold': 0.60,
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'CRITICAL: Faithfulness below 0.60 - immediate action required',
            'TreatMissingData': 'breaching'
        }

    def _high_latency_alarm(self) -> Dict:
        """Alert on high P95 latency"""
        return {
            'AlarmName': 'RAG-High-Latency-P95',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 3,
            'DatapointsToAlarm': 2,
            'MetricName': 'EndToEndLatency',
            'Namespace': 'GenAI/Performance',
            'Period': 300,
            'ExtendedStatistic': 'p95',
            'Threshold': 5000,  # 5 seconds
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'P95 latency exceeded 5 seconds',
            'TreatMissingData': 'notBreaching'
        }

    def _latency_spike_alarm(self) -> Dict:
        """Detect sudden latency spikes using anomaly detection"""
        return {
            'AlarmName': 'RAG-Latency-Anomaly',
            'ComparisonOperator': 'GreaterThanUpperThreshold',
            'EvaluationPeriods': 2,
            'Metrics': [
                {
                    'Id': 'm1',
                    'ReturnData': True,
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'GenAI/Performance',
                            'MetricName': 'EndToEndLatency'
                        },
                        'Period': 300,
                        'Stat': 'Average'
                    }
                },
                {
                    'Id': 'ad1',
                    'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)',
                    'Label': 'Latency (expected)'
                }
            ],
            'ThresholdMetricId': 'ad1',
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Latency anomaly detected (2 standard deviations)',
            'TreatMissingData': 'notBreaching'
        }

    def _cost_spike_alarm(self) -> Dict:
        """Alert on unexpected cost spikes"""
        return {
            'AlarmName': 'RAG-Cost-Spike',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'TotalCost',
            'Namespace': 'GenAI/Cost',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 50.0,  # $50 per 5 minutes
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Cost spike detected: >$50 in 5 minutes',
            'TreatMissingData': 'notBreaching'
        }

    def _daily_budget_alarm(self) -> Dict:
        """Alert when approaching daily budget"""
        return {
            'AlarmName': 'RAG-Daily-Budget-Warning',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'TotalCost',
            'Namespace': 'GenAI/Cost',
            'Period': 86400,  # 24 hours
            'Statistic': 'Sum',
            'Threshold': 800.0,  # $800 per day (80% of $1000 budget)
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Daily cost approaching budget limit (80%)',
            'TreatMissingData': 'notBreaching'
        }

    def _high_error_rate_alarm(self) -> Dict:
        """Alert on elevated error rate"""
        return {
            'AlarmName': 'RAG-High-Error-Rate',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 2,
            'DatapointsToAlarm': 2,
            'MetricName': 'ErrorRate',
            'Namespace': 'GenAI/Errors',
            'Period': 300,
            'Statistic': 'Average',
            'Threshold': 5.0,  # 5% error rate
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Error rate exceeded 5%',
            'TreatMissingData': 'notBreaching'
        }

    def _retrieval_failure_alarm(self) -> Dict:
        """Alert on retrieval failures"""
        return {
            'AlarmName': 'RAG-Retrieval-Failures',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'RetrievalErrors',
            'Namespace': 'GenAI/Errors',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 10,  # 10 retrieval failures in 5 min
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Multiple retrieval failures detected',
            'TreatMissingData': 'notBreaching'
        }

    def _system_degraded_composite_alarm(self) -> Dict:
        """Composite alarm for multiple degradation signals"""
        return {
            'AlarmName': 'RAG-System-Degraded',
            'AlarmRule': (
                '(ALARM("RAG-Quality-Degradation") OR ALARM("RAG-High-Latency-P95")) '
                'AND ALARM("RAG-High-Error-Rate")'
            ),
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': (
                'System degraded: Multiple quality/performance/error issues detected'
            )
        }

# Usage
alarm_manager = GenAIAlarmManager(
    sns_topic_arn='arn:aws:sns:us-east-1:123456789:genai-alerts'
)

alarm_manager.create_all_alarms()
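Since `put_metric_alarm` rejects a bad config only at call time, one alarm per call, it can help to sanity-check the alarm dicts locally before running `create_all_alarms()`. A hypothetical pre-flight validator — the rules below cover only the mistakes that bite most often:

```python
# validate_alarms.py
# Hypothetical pre-flight check for the alarm configs above: catch common
# config mistakes locally, before any CloudWatch API call is made.
from typing import Dict, List

def validate_alarm_config(config: Dict) -> List[str]:
    """Return a list of problems found in one alarm config (empty = OK)."""
    problems = []
    if not config.get("AlarmName"):
        problems.append("AlarmName is required")
    if "AlarmRule" in config:
        return problems  # composite alarm: no metric fields to check
    if "Statistic" in config and "ExtendedStatistic" in config:
        problems.append("Statistic and ExtendedStatistic are mutually exclusive")
    if "MetricName" not in config and "Metrics" not in config:
        problems.append("either MetricName or Metrics is required")
    period = config.get("Period", 0)
    if "Metrics" not in config and period % 60 != 0:
        # (CloudWatch also allows 10s/30s high-resolution periods)
        problems.append(f"Period must be a multiple of 60, got {period}")
    return problems
```

Running every config through this before the API loop turns a partial, mid-loop failure into a single readable report.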

Integration with Existing Observability Tools

Grafana Integration
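CloudWatch is a built-in Grafana data source, so the custom `GenAI/*` namespaces above become queryable as soon as the data source is registered. A sketch using Grafana's `/api/datasources` HTTP API — the Grafana URL, API key, and role ARN are placeholders:

```python
# grafana_cloudwatch.py
# Sketch: register CloudWatch as a Grafana data source via the HTTP API.
from typing import Dict

def cloudwatch_datasource_payload(region: str, assume_role_arn: str) -> Dict:
    """Payload for Grafana's datasource API ('cloudwatch' is the built-in type)."""
    return {
        "name": "CloudWatch-GenAI",
        "type": "cloudwatch",
        "access": "proxy",
        "jsonData": {
            "authType": "default",      # use the host/pod AWS credentials
            "defaultRegion": region,
            "assumeRoleArn": assume_role_arn,
        },
    }

def register_datasource(grafana_url: str, api_key: str, payload: Dict):
    import requests  # deferred import: payload builder is testable without it
    resp = requests.post(
        f"{grafana_url}/api/datasources",
        headers={"Authorization": f"Bearer {api_key}"},
        json=payload,
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()
```

Once registered, Grafana panels can query `GenAI/Quality` or `GenAI/Cost` exactly like any AWS namespace, letting you keep GenAI metrics beside your existing service dashboards.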

Datadog Integration
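For Datadog, one option is to forward the same custom metrics to Datadog's v2 metrics API — for example from a Lambda subscribed to a CloudWatch metric stream. A sketch; the metric name and tags are illustrative:

```python
# datadog_forward.py
# Sketch: forward a custom GenAI metric to Datadog's v2 series endpoint.
import time
from typing import Dict, List

def datadog_series_payload(metric: str, value: float, tags: List[str]) -> Dict:
    """Build the body for POST /api/v2/series."""
    return {
        "series": [{
            "metric": metric,
            "type": 3,  # 3 = gauge in the v2 API
            "points": [{"timestamp": int(time.time()), "value": value}],
            "tags": tags,
        }]
    }

def send_to_datadog(api_key: str, payload: Dict) -> None:
    import requests  # deferred import: payload builder is testable without it
    resp = requests.post(
        "https://api.datadoghq.com/api/v2/series",
        headers={"DD-API-KEY": api_key, "Content-Type": "application/json"},
        json=payload,
        timeout=10,
    )
    resp.raise_for_status()
```

Alternatively, Datadog's managed AWS integration can poll CloudWatch directly, including custom namespaces, if you prefer zero forwarding code.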


Key Takeaways

  1. CloudWatch GenAI Observability is purpose-built - Provides out-of-the-box dashboards for Bedrock model invocations and AgentCore agents. No custom instrumentation needed for basic metrics.

  2. OpenTelemetry + ADOT enables custom observability - Use ADOT to instrument your application with custom spans capturing retrieval quality, token usage, and costs. Automatically traces boto3 AWS SDK calls.

  3. X-Ray provides the service map - Distributed tracing shows bottlenecks across your RAG pipeline. Service maps visualize dependencies and highlight slow components (typically vector search).

  4. Comprehensive dashboards require custom metrics - Quality scores (faithfulness, relevancy), cost per query, and token breakdowns need custom CloudWatch metrics alongside out-of-the-box Bedrock metrics.

  5. Intelligent alarming prevents incidents - Set thresholds for quality degradation, cost spikes, and latency. Use composite alarms for multi-signal degradation detection. Anomaly detection catches unusual patterns.

  6. Integration extends visibility - Export to Grafana, Datadog, or existing observability stacks using CloudWatch exporters or direct API integration. Don't build in isolation.

  7. Traces + Metrics + Logs = Complete picture - You need all three: traces for request flows, metrics for aggregates, logs for debugging specific failures. CloudWatch GenAI Observability provides this unified view.


What's Next in This Series

Part 4: Production Hardening & Advanced Patterns

We'll close the series with production-ready patterns:

  • Guardrails in production: Content filtering, PII detection, toxicity screening
  • Human-in-the-loop evaluation: Building feedback loops and annotation workflows
  • Incident response playbooks: What to do when GenAI fails at 3 AM
  • A/B testing strategies: Testing prompts, models, and RAG configurations
  • Canary deployments: Safe rollout strategies with automated rollback
  • Advanced cost optimization: Model routing, caching, and batch processing
  • Security hardening: Protecting against prompt injection and jailbreaks



Let's Connect!

Building observability for production GenAI systems? Let's share experiences!

Follow me for Part 4 (the series finale!) on Production Hardening & Advanced Patterns. We'll cover guardrails, incident response, A/B testing, and cost optimization—everything you need to run GenAI at scale.



Tags: #aws #genai #observability #cloudwatch #xray #opentelemetry #monitoring #genaiops #bedrock #distributedtracing
