Reading time: ~22-25 minutes
Level: Intermediate to Advanced
Series: Part 3 of 4 - End-to-End Observability
What you'll learn: Build comprehensive observability for GenAI systems with CloudWatch GenAI Observability, X-Ray distributed tracing, and custom metrics
The Problem: When GenAI Goes Wrong at 3 AM
It's 3 AM. PagerDuty wakes you up:
You open your logs. 10,000 lines of JSON. Where do you start?
Everything returns 200. But users are complaining. What's actually failing?
- Is retrieval slow? Can't tell from these logs
- Is the LLM hallucinating? No quality metrics captured
- Why is cost 5x higher? Token counts missing
- Which model is being used? Not tracked
- What context was retrieved? Lost in the void
Traditional observability wasn't built for this. You need GenAI-specific observability that captures the full story: retrieval quality, token consumption, model behavior, and end-to-end traces showing exactly where things break.
This is what we're building today.
The GenAI Observability Challenge
GenAI systems are fundamentally different from traditional microservices:
Traditional Microservice Request
GenAI System Request
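Side by side, the two flows look roughly like this (latency figures are illustrative):

```
Traditional:  Client → API → Service → Database → Response
              (deterministic, ~100 ms, binary success/failure)

GenAI/RAG:    Query → Embedding → Vector Search → Rerank
                    → Prompt Assembly → LLM Generation → Response
              (probabilistic, 2-10 s, quality is a spectrum)
```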
The challenge: A request can succeed (200 OK) but still fail the user:
- Retrieved wrong documents → bad answer
- LLM hallucinated → user misinformed
- Cost spiked 5x → budget blown
- Latency hit 8s → user abandoned the request
Traditional observability captures success/failure. GenAI observability captures quality/cost/performance at every step.
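To see why per-request token tracking matters, here is a back-of-the-envelope cost calculation. The per-1K-token prices are the illustrative figures used later in this post, not authoritative pricing:

```python
# Illustrative per-1K-token prices (assumed; check current Bedrock pricing)
PRICE_PER_1K = {"input": 0.003, "output": 0.015}

def request_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost of a single LLM call given token counts."""
    return (input_tokens / 1000) * PRICE_PER_1K["input"] + \
           (output_tokens / 1000) * PRICE_PER_1K["output"]

# A RAG prompt stuffing too many documents can easily quadruple input tokens:
lean = request_cost(2_000, 500)     # tight context
bloated = request_cost(8_000, 500)  # over-stuffed context
print(f"lean=${lean:.4f} bloated=${bloated:.4f} ratio={bloated / lean:.1f}x")
```

Without token counts in your telemetry, that 2.3x per-request cost difference is invisible until the bill arrives.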
AWS CloudWatch GenAI Observability
AWS launched CloudWatch GenAI Observability in preview in Q4 2024 and made it generally available in October 2025. It's purpose-built for LLM applications.
What It Provides Out-of-the-Box
1. Model Invocation Dashboard
Automatic tracking of:
- Invocation metrics: Count, success rate, throttles
- Token metrics: Input tokens, output tokens, total tokens
- Cost attribution: Per-model, per-request costs
- Latency breakdown: Time-to-first-token, generation latency
- Error tracking: Model errors, throttling, timeouts
2. AgentCore Agent Dashboard
For Amazon Bedrock AgentCore agents:
- Session tracking: Duration, turn count, completion
- Tool usage: Which tools called, frequency, success rate
- Memory operations: Reads, writes, retrieval performance
- Gateway metrics: API latency, auth failures
- Reasoning traces: Step-by-step agent decision logs
3. OpenTelemetry Integration
- Distributed tracing: End-to-end request flows
- Custom spans: Instrument your components
- Automatic instrumentation: AWS SDK calls auto-traced
- X-Ray integration: Service maps and bottleneck detection
Architecture: Complete Observability Stack
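In outline, the stack we're assembling looks like this (a rough sketch of the components covered in this post):

```
User → API → RAG service (OTel SDK / ADOT auto-instrumentation)
                │
                ├─ boto3 → Bedrock (LLM + embeddings)
                ├─ vector search → OpenSearch
                └─ OTLP → ADOT Collector ─→ X-Ray (traces, service map)
                                         └→ CloudWatch (metrics, logs,
                                             dashboards, alarms)
```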
Setting Up OpenTelemetry with ADOT
AWS Distro for OpenTelemetry (ADOT) is AWS's distribution of OpenTelemetry, pre-configured for AWS services.
Installation
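A typical ADOT Python install looks like the following. Package names are current as of writing, but verify them against the ADOT documentation:

```shell
# Core OpenTelemetry SDK plus OTLP exporter
pip install opentelemetry-distro opentelemetry-exporter-otlp

# Auto-instrumentation for boto3/botocore (traces AWS SDK calls)
pip install opentelemetry-instrumentation-botocore

# Detect and install instrumentation for other libraries in the project
opentelemetry-bootstrap -a install
```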
Basic Configuration
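A minimal ADOT Collector configuration that receives OTLP and forwards traces to X-Ray and metrics to CloudWatch (via EMF) might look like this. Treat it as a sketch — the receiver/exporter names are the standard ADOT ones, but tune regions and pipelines for your environment:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

exporters:
  awsxray:
    region: us-east-1
  awsemf:
    region: us-east-1
    namespace: GenAI/Custom

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [awsxray]
    metrics:
      receivers: [otlp]
      exporters: [awsemf]
```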
Auto-Instrumentation Setup
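Auto-instrumentation requires no code changes: the standard OpenTelemetry launcher wraps your application. The endpoint and service name below are assumptions for a locally running ADOT collector:

```shell
export OTEL_SERVICE_NAME=rag-api
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export OTEL_TRACES_EXPORTER=otlp
export OTEL_METRICS_EXPORTER=otlp

# Launch the app with automatic instrumentation (boto3 calls get spans)
opentelemetry-instrument python app.py
```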
Instrumenting Your RAG Application
Now let's instrument a complete RAG pipeline:
# instrumented_rag_system.py
import boto3
import json
from typing import List, Dict
from opentelemetry import trace
from datetime import datetime


class InstrumentedRAGSystem:
    """
    Fully instrumented RAG system with distributed tracing

    Captures:
    - End-to-end request traces
    - Per-component latency
    - Token consumption and costs
    - Quality signals
    - Error details
    """

    def __init__(self):
        self.bedrock_runtime = boto3.client('bedrock-runtime')
        self.opensearch = boto3.client('opensearchserverless')
        self.cloudwatch = boto3.client('cloudwatch')

        # Get tracer
        self.tracer = trace.get_tracer(__name__)

        # Model pricing (per 1K tokens)
        self.pricing = {
            "anthropic.claude-sonnet-4-20250514": {
                "input": 0.003,
                "output": 0.015
            },
            "amazon.titan-embed-text-v2:0": {
                "input": 0.0001,
                "output": 0
            }
        }

    def query(self, user_query: str, user_id: str = None) -> Dict:
        """
        Process RAG query with full instrumentation

        Args:
            user_query: User's question
            user_id: Optional user identifier for tracking

        Returns:
            Dict with answer and metadata
        """
        # Start root span
        with self.tracer.start_as_current_span("rag_query") as root_span:
            # Add request attributes
            root_span.set_attribute("query", user_query)
            root_span.set_attribute("query_length", len(user_query))
            if user_id:
                root_span.set_attribute("user_id", user_id)
            root_span.set_attribute("timestamp", datetime.now().isoformat())

            try:
                # Step 1: Generate embeddings
                with self.tracer.start_as_current_span("generate_embeddings") as span:
                    embeddings, embed_cost = self._generate_embeddings(user_query)
                    span.set_attribute("embedding_dimension", len(embeddings))
                    span.set_attribute("embedding_cost_usd", embed_cost)
                    span.set_attribute("model", "amazon.titan-embed-text-v2:0")

                # Step 2: Vector search
                with self.tracer.start_as_current_span("vector_search") as span:
                    contexts = self._vector_search(embeddings, top_k=5)
                    span.set_attribute("documents_retrieved", len(contexts))

                    avg_score = 0.0
                    if contexts:
                        avg_score = sum(c['score'] for c in contexts) / len(contexts)
                        span.set_attribute("avg_similarity_score", round(avg_score, 3))
                        span.set_attribute("top_score", round(contexts[0]['score'], 3))

                    # Publish retrieval quality metric
                    self._publish_metric(
                        "RetrievalQuality",
                        avg_score,
                        namespace="GenAI/RAG/Retrieval"
                    )

                # Step 3: Rerank (optional but recommended)
                with self.tracer.start_as_current_span("rerank_documents") as span:
                    contexts = self._rerank_contexts(user_query, contexts, top_k=3)
                    span.set_attribute("documents_after_rerank", len(contexts))
                    if contexts:
                        span.set_attribute("top_rerank_score", round(contexts[0]['rerank_score'], 3))

                # Step 4: Build prompt and count tokens
                with self.tracer.start_as_current_span("prompt_construction") as span:
                    prompt = self._build_prompt(user_query, contexts)
                    input_tokens = self._estimate_tokens(prompt)
                    span.set_attribute("input_tokens", input_tokens)
                    span.set_attribute("context_documents", len(contexts))
                    span.set_attribute("prompt_length_chars", len(prompt))

                    # Check context window
                    max_context_window = 200000  # Claude Sonnet 4
                    if input_tokens > max_context_window:
                        span.set_attribute("error", "context_window_exceeded")
                        raise ValueError(f"Input tokens ({input_tokens}) exceed context window")

                # Step 5: Generate response
                with self.tracer.start_as_current_span("llm_generation") as span:
                    response = self._generate_response(prompt)

                    # Extract metrics
                    usage = response.get('usage', {})
                    input_tokens = usage.get('input_tokens', 0)
                    output_tokens = usage.get('output_tokens', 0)
                    model_id = "anthropic.claude-sonnet-4-20250514"

                    # Calculate cost
                    cost = self._calculate_cost(
                        model_id=model_id,
                        input_tokens=input_tokens,
                        output_tokens=output_tokens
                    )

                    # Add span attributes
                    span.set_attribute("model_id", model_id)
                    span.set_attribute("input_tokens", input_tokens)
                    span.set_attribute("output_tokens", output_tokens)
                    span.set_attribute("total_tokens", input_tokens + output_tokens)
                    span.set_attribute("generation_cost_usd", cost)
                    span.set_attribute("stop_reason", response.get('stop_reason', 'unknown'))

                    # Publish token metrics
                    self._publish_metric("InputTokens", input_tokens, namespace="GenAI/Tokens")
                    self._publish_metric("OutputTokens", output_tokens, namespace="GenAI/Tokens")
                    self._publish_metric("GenerationCost", cost, namespace="GenAI/Cost")

                # Step 6: Extract answer
                answer = response['content'][0]['text']

                # Add overall metrics to root span
                total_cost = embed_cost + cost
                root_span.set_attribute("total_cost_usd", round(total_cost, 4))
                root_span.set_attribute("total_tokens", input_tokens + output_tokens)
                root_span.set_attribute("answer_length", len(answer))
                root_span.set_attribute("status", "success")

                # Publish overall metrics
                self._publish_metric("RequestCost", total_cost, namespace="GenAI/Cost")
                self._publish_metric("RequestSuccess", 1, namespace="GenAI/Quality")

                return {
                    "answer": answer,
                    "metadata": {
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                        "total_tokens": input_tokens + output_tokens,
                        "total_cost": round(total_cost, 4),
                        "contexts_used": len(contexts),
                        "model": model_id
                    }
                }

            except Exception as e:
                # Capture error in span
                root_span.set_attribute("error", True)
                root_span.set_attribute("error_type", type(e).__name__)
                root_span.set_attribute("error_message", str(e))
                root_span.set_attribute("status", "error")

                # Publish error metric
                self._publish_metric("RequestErrors", 1, namespace="GenAI/Errors")

                # Re-raise
                raise

    def _generate_embeddings(self, text: str) -> tuple:
        """Generate embeddings with Bedrock Titan"""
        response = self.bedrock_runtime.invoke_model(
            modelId="amazon.titan-embed-text-v2:0",
            body=json.dumps({
                "inputText": text,
                "dimensions": 1024,
                "normalize": True
            })
        )
        result = json.loads(response['body'].read())
        embeddings = result['embedding']

        # Calculate cost (~1.3 tokens per word, rough estimate)
        token_count = len(text.split()) * 1.3
        cost = (token_count / 1000) * self.pricing["amazon.titan-embed-text-v2:0"]["input"]

        return embeddings, cost

    def _vector_search(self, embeddings: List[float], top_k: int = 5) -> List[Dict]:
        """
        Search OpenSearch vector index

        Note: This is automatically traced via boto3 instrumentation.
        In production, use the actual OpenSearch client; the response
        below is mocked for the example.
        """
        return [
            {"id": "doc_1", "score": 0.89, "text": "Electronics can be returned within 30 days..."},
            {"id": "doc_2", "score": 0.76, "text": "Damaged items require photo documentation..."},
            {"id": "doc_3", "score": 0.71, "text": "Restocking fees apply to opened electronics..."}
        ]

    def _rerank_contexts(
        self,
        query: str,
        contexts: List[Dict],
        top_k: int = 3
    ) -> List[Dict]:
        """
        Rerank contexts using a cross-encoder

        In production, use:
        - Bedrock reranking model
        - Cohere rerank
        - Custom cross-encoder
        """
        # For the example, just return the top contexts with a mock rerank score
        for ctx in contexts[:top_k]:
            ctx['rerank_score'] = ctx['score'] * 1.1  # Mock rerank
        return contexts[:top_k]

    def _build_prompt(self, query: str, contexts: List[Dict]) -> str:
        """Build prompt from query and contexts"""
        context_text = "\n\n".join([
            f"Document {i+1}:\n{ctx['text']}"
            for i, ctx in enumerate(contexts)
        ])

        prompt = f"""You are a helpful customer service assistant. Answer the user's question based on the provided context.

Context:
{context_text}

Question: {query}

Answer the question using only information from the context. If the context doesn't contain enough information, say so."""
        return prompt

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimation: 1 token ≈ 0.75 words for English"""
        return int(len(text.split()) * 1.3)

    def _generate_response(self, prompt: str) -> Dict:
        """Generate response with Bedrock"""
        response = self.bedrock_runtime.invoke_model(
            modelId="anthropic.claude-sonnet-4-20250514",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 2048,
                "temperature": 0.7,
                "messages": [
                    {"role": "user", "content": prompt}
                ]
            })
        )
        return json.loads(response['body'].read())

    def _calculate_cost(
        self,
        model_id: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Calculate request cost"""
        pricing = self.pricing.get(model_id, {"input": 0, "output": 0})
        return (
            (input_tokens / 1000) * pricing["input"] +
            (output_tokens / 1000) * pricing["output"]
        )

    def _publish_metric(
        self,
        metric_name: str,
        value: float,
        namespace: str = "GenAI/Custom"
    ):
        """Publish custom metric to CloudWatch"""
        try:
            self.cloudwatch.put_metric_data(
                Namespace=namespace,
                MetricData=[
                    {
                        'MetricName': metric_name,
                        'Value': value,
                        'Unit': 'None',
                        'Timestamp': datetime.now()
                    }
                ]
            )
        except Exception as e:
            # Don't fail the request if metric publishing fails
            print(f"Warning: Failed to publish metric {metric_name}: {e}")


# Usage
rag_system = InstrumentedRAGSystem()
response = rag_system.query(
    user_query="What's the return policy for damaged electronics?",
    user_id="user_12345"
)
print(f"Answer: {response['answer']}")
print(f"Cost: ${response['metadata']['total_cost']}")
print(f"Tokens: {response['metadata']['total_tokens']}")
AWS X-Ray Integration
X-Ray provides the service map and bottleneck detection that traces alone can't give you.
Enabling X-Ray Active Tracing
Lambda Function:
Terraform Configuration:
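The equivalent Terraform sets `tracing_config` on the function resource (resource and function names are illustrative):

```hcl
resource "aws_lambda_function" "rag_api" {
  function_name = "rag-api"
  # ... handler, runtime, role, etc.

  tracing_config {
    mode = "Active"
  }
}
```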
Custom X-Ray Segments
# custom_xray_segments.py
from aws_xray_sdk.core import xray_recorder


class XRayInstrumentedRAG:
    """RAG system with custom X-Ray segments"""

    def query(self, user_query: str):
        """Process query with custom subsegments"""
        # Retrieval subsegment
        with xray_recorder.capture('retrieval') as segment:
            contexts = self._retrieve_contexts(user_query)

            # Add annotations (indexed for filtering)
            segment.put_annotation('documents_found', len(contexts))
            segment.put_annotation(
                'avg_relevance',
                sum(c['score'] for c in contexts) / len(contexts)
            )

            # Add metadata (not indexed)
            segment.put_metadata('retrieval_method', 'vector_search')
            segment.put_metadata('top_documents', [c['id'] for c in contexts[:3]])

        # Generation subsegment
        with xray_recorder.capture('generation') as segment:
            response = self._generate(user_query, contexts)

            # Annotations
            segment.put_annotation('input_tokens', response['input_tokens'])
            segment.put_annotation('output_tokens', response['output_tokens'])
            segment.put_annotation('cost_usd', response['cost'])

            # Metadata
            segment.put_metadata('model_id', response['model_id'])
            segment.put_metadata('stop_reason', response['stop_reason'])

        return response
X-Ray Service Map Insights
X-Ray automatically generates service maps showing:
- Each service as a node (API Gateway, Lambda, Bedrock, OpenSearch) with request volume
- Average latency and error/fault rates per node and per connection
- Throttling hotspots and the slowest dependency in the request path
Building Comprehensive CloudWatch Dashboards
Create unified dashboards showing the full picture:
# comprehensive_dashboard.py
import boto3
import json
from typing import Dict, List


class GenAIDashboardBuilder:
    """Build comprehensive CloudWatch dashboards for GenAI systems"""

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def create_production_dashboard(self) -> str:
        """
        Create production-grade dashboard with:
        - Quality metrics
        - Performance metrics
        - Cost tracking
        - Error monitoring
        - User satisfaction
        """
        dashboard_body = {
            "widgets": self._build_all_widgets()
        }

        self.cloudwatch.put_dashboard(
            DashboardName='GenAI-Production-Observability',
            DashboardBody=json.dumps(dashboard_body)
        )

        dashboard_url = (
            "https://console.aws.amazon.com/cloudwatch/home"
            "?region=us-east-1#dashboards:name=GenAI-Production-Observability"
        )
        print(f"✓ Dashboard created: {dashboard_url}")
        return dashboard_url

    def _build_all_widgets(self) -> List[Dict]:
        """Build all dashboard widgets"""
        widgets = []

        # Row 1: Quality Metrics (0, 0)
        widgets.append(self._quality_metrics_widget(x=0, y=0))
        widgets.append(self._quality_distribution_widget(x=12, y=0))

        # Row 2: Performance Metrics (0, 6)
        widgets.append(self._latency_breakdown_widget(x=0, y=6))
        widgets.append(self._throughput_widget(x=12, y=6))

        # Row 3: Cost & Tokens (0, 12)
        widgets.append(self._cost_metrics_widget(x=0, y=12))
        widgets.append(self._token_usage_widget(x=8, y=12))
        widgets.append(self._cost_per_user_widget(x=16, y=12))

        # Row 4: Errors & Alerts (0, 18)
        widgets.append(self._error_rate_widget(x=0, y=18))
        widgets.append(self._error_breakdown_widget(x=8, y=18))
        widgets.append(self._recent_errors_log_widget(x=16, y=18))

        # Row 5: Model Performance (0, 24)
        widgets.append(self._model_comparison_widget(x=0, y=24))
        widgets.append(self._stop_reasons_widget(x=12, y=24))

        # Row 6: User Experience (0, 30)
        widgets.append(self._user_satisfaction_widget(x=0, y=30))
        widgets.append(self._session_metrics_widget(x=12, y=30))

        # Row 7: X-Ray Service Map (0, 36)
        widgets.append(self._xray_service_map_widget(x=0, y=36))

        return widgets

    def _quality_metrics_widget(self, x: int, y: int) -> Dict:
        """Real-time quality metrics"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Quality", "Faithfulness", {"stat": "Average", "label": "Faithfulness"}],
                    [".", "AnswerRelevancy", {"stat": "Average", "label": "Relevancy"}],
                    [".", "ContextPrecision", {"stat": "Average", "label": "Context Precision"}],
                    [".", "ContextRecall", {"stat": "Average", "label": "Context Recall"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "📊 RAG Quality Metrics",
                "period": 300,
                "yAxis": {"left": {"min": 0, "max": 1, "label": "Score"}},
                "annotations": {
                    "horizontal": [
                        {"value": 0.85, "label": "Target", "color": "#2ca02c"},
                        {"value": 0.75, "label": "Warning", "color": "#ff7f0e"},
                        {"value": 0.60, "label": "Critical", "color": "#d62728"}
                    ]
                }
            }
        }

    def _quality_distribution_widget(self, x: int, y: int) -> Dict:
        """Quality score distribution"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Quality", "Faithfulness", {"stat": "p50", "label": "P50"}],
                    ["...", {"stat": "p90", "label": "P90"}],
                    ["...", {"stat": "p99", "label": "P99"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "📈 Faithfulness Distribution (P50/P90/P99)",
                "period": 300
            }
        }

    def _latency_breakdown_widget(self, x: int, y: int) -> Dict:
        """Latency breakdown by component"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Performance", "EmbeddingLatency", {"stat": "Average", "label": "Embeddings"}],
                    [".", "VectorSearchLatency", {"stat": "Average", "label": "Vector Search"}],
                    [".", "RerankLatency", {"stat": "Average", "label": "Reranking"}],
                    [".", "GenerationLatency", {"stat": "Average", "label": "LLM Generation"}],
                    [".", "EndToEndLatency", {"stat": "Average", "label": "Total", "color": "#1f77b4"}]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "⚡ Latency Breakdown (Stacked)",
                "period": 300,
                "yAxis": {"left": {"label": "Milliseconds"}}
            }
        }

    def _throughput_widget(self, x: int, y: int) -> Dict:
        """Request throughput"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Throughput", "RequestCount", {"stat": "Sum", "label": "Total Requests"}],
                    [".", "SuccessfulRequests", {"stat": "Sum", "label": "Successful"}],
                    [".", "FailedRequests", {"stat": "Sum", "label": "Failed"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "🔄 Request Throughput",
                "period": 300
            }
        }

    def _cost_metrics_widget(self, x: int, y: int) -> Dict:
        """Cost tracking"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 8, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Cost", "TotalCost", {"stat": "Sum", "label": "Total Cost"}],
                    [".", "EmbeddingCost", {"stat": "Sum", "label": "Embeddings"}],
                    [".", "GenerationCost", {"stat": "Sum", "label": "Generation"}]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "💰 Cost Breakdown (USD)",
                "period": 300,
                "yAxis": {"left": {"label": "USD"}}
            }
        }

    def _token_usage_widget(self, x: int, y: int) -> Dict:
        """Token consumption"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 8, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Tokens", "InputTokens", {"stat": "Sum", "label": "Input Tokens"}],
                    [".", "OutputTokens", {"stat": "Sum", "label": "Output Tokens"}]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "🎫 Token Usage",
                "period": 300
            }
        }

    def _cost_per_user_widget(self, x: int, y: int) -> Dict:
        """Cost per user/query"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 8, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Cost", "CostPerQuery", {"stat": "Average", "label": "Avg per Query"}],
                    ["...", {"stat": "p95", "label": "P95 per Query"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "💵 Cost Per Query",
                "period": 300,
                "yAxis": {"left": {"label": "USD"}}
            }
        }

    def _error_rate_widget(self, x: int, y: int) -> Dict:
        """Error rate tracking"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 8, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Errors", "ErrorRate", {"stat": "Average", "label": "Error Rate %"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "❌ Error Rate",
                "period": 300,
                "yAxis": {"left": {"label": "Percentage", "min": 0, "max": 100}},
                "annotations": {
                    "horizontal": [
                        {"value": 1, "label": "Target < 1%", "color": "#2ca02c"},
                        {"value": 5, "label": "Critical > 5%", "color": "#d62728"}
                    ]
                }
            }
        }

    def _error_breakdown_widget(self, x: int, y: int) -> Dict:
        """Error breakdown by type"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 8, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Errors", "RetrievalErrors", {"stat": "Sum", "label": "Retrieval"}],
                    [".", "GenerationErrors", {"stat": "Sum", "label": "Generation"}],
                    [".", "ThrottlingErrors", {"stat": "Sum", "label": "Throttling"}],
                    [".", "ValidationErrors", {"stat": "Sum", "label": "Validation"}]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "🔍 Error Breakdown",
                "period": 300
            }
        }

    def _recent_errors_log_widget(self, x: int, y: int) -> Dict:
        """Recent errors from logs"""
        return {
            "type": "log",
            "x": x, "y": y, "width": 8, "height": 6,
            "properties": {
                "query": (
                    "SOURCE '/aws/lambda/rag-api' "
                    "| fields @timestamp, @message, error_type, request_id "
                    "| filter @message like /ERROR/ "
                    "| sort @timestamp desc "
                    "| limit 20"
                ),
                "region": "us-east-1",
                "title": "📋 Recent Errors",
                "view": "table"
            }
        }

    def _model_comparison_widget(self, x: int, y: int) -> Dict:
        """Compare model performance (ModelId as a metric dimension)"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Models", "AvgLatency", "ModelId", "claude-sonnet-4", {"stat": "Average"}],
                    ["...", "claude-opus-4"],
                    ["...", "claude-haiku-4"]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "🤖 Model Latency Comparison",
                "period": 300
            }
        }

    def _stop_reasons_widget(self, x: int, y: int) -> Dict:
        """LLM stop reasons distribution (Reason as a metric dimension)"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Behavior", "StopReason", "Reason", "end_turn", {"stat": "SampleCount"}],
                    ["...", "max_tokens"],
                    ["...", "stop_sequence"]
                ],
                "view": "timeSeries",
                "stacked": True,
                "region": "us-east-1",
                "title": "🛑 Stop Reasons",
                "period": 300
            }
        }

    def _user_satisfaction_widget(self, x: int, y: int) -> Dict:
        """User feedback scores"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/UserExperience", "FeedbackScore", {"stat": "Average", "label": "Avg Satisfaction"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "⭐ User Satisfaction (1-5)",
                "period": 300,
                "yAxis": {"left": {"min": 1, "max": 5}}
            }
        }

    def _session_metrics_widget(self, x: int, y: int) -> Dict:
        """Session-level metrics"""
        return {
            "type": "metric",
            "x": x, "y": y, "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["GenAI/Sessions", "AvgSessionDuration", {"stat": "Average", "label": "Avg Duration (s)"}],
                    [".", "AvgTurnsPerSession", {"stat": "Average", "label": "Avg Turns"}]
                ],
                "view": "timeSeries",
                "stacked": False,
                "region": "us-east-1",
                "title": "💬 Session Metrics",
                "period": 300
            }
        }

    def _xray_service_map_widget(self, x: int, y: int) -> Dict:
        """X-Ray service map"""
        return {
            "type": "trace",
            "x": x, "y": y, "width": 24, "height": 8,
            "properties": {
                "title": "🗺️ X-Ray Service Map - RAG System",
                "region": "us-east-1"
            }
        }


# Create dashboard
builder = GenAIDashboardBuilder()
dashboard_url = builder.create_production_dashboard()
Alarming Strategy for GenAI Systems
Set up intelligent alarms that catch real issues:
# genai_alarms.py
import boto3
from typing import Dict


class GenAIAlarmManager:
    """Comprehensive alarming for GenAI systems"""

    def __init__(self, sns_topic_arn: str):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns_topic_arn = sns_topic_arn

    def create_all_alarms(self):
        """Create complete alarm suite"""
        metric_alarms = [
            # Quality alarms
            self._quality_degradation_alarm(),
            self._faithfulness_critical_alarm(),
            # Performance alarms
            self._high_latency_alarm(),
            self._latency_spike_alarm(),
            # Cost alarms
            self._cost_spike_alarm(),
            self._daily_budget_alarm(),
            # Error alarms
            self._high_error_rate_alarm(),
            self._retrieval_failure_alarm()
        ]

        for alarm_config in metric_alarms:
            self.cloudwatch.put_metric_alarm(**alarm_config)
            print(f"✓ Created alarm: {alarm_config['AlarmName']}")

        # Composite alarms use a separate API and must be created after
        # the metric alarms they reference
        composite = self._system_degraded_composite_alarm()
        self.cloudwatch.put_composite_alarm(**composite)
        print(f"✓ Created alarm: {composite['AlarmName']}")

    def _quality_degradation_alarm(self) -> Dict:
        """Alert when quality metrics drop"""
        return {
            'AlarmName': 'RAG-Quality-Degradation',
            'ComparisonOperator': 'LessThanThreshold',
            'EvaluationPeriods': 2,
            'DatapointsToAlarm': 2,  # 2 out of 2
            'MetricName': 'Faithfulness',
            'Namespace': 'GenAI/Quality',
            'Period': 300,
            'Statistic': 'Average',
            'Threshold': 0.75,
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Faithfulness score dropped below 0.75 for 10 minutes',
            'TreatMissingData': 'notBreaching'
        }

    def _faithfulness_critical_alarm(self) -> Dict:
        """Critical alarm for severe quality drop"""
        return {
            'AlarmName': 'RAG-Faithfulness-Critical',
            'ComparisonOperator': 'LessThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'Faithfulness',
            'Namespace': 'GenAI/Quality',
            'Period': 300,
            'Statistic': 'Average',
            'Threshold': 0.60,
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'CRITICAL: Faithfulness below 0.60 - immediate action required',
            'TreatMissingData': 'breaching'
        }

    def _high_latency_alarm(self) -> Dict:
        """Alert on high P95 latency"""
        return {
            'AlarmName': 'RAG-High-Latency-P95',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 3,
            'DatapointsToAlarm': 2,
            'MetricName': 'EndToEndLatency',
            'Namespace': 'GenAI/Performance',
            'Period': 300,
            'ExtendedStatistic': 'p95',
            'Threshold': 5000,  # 5 seconds
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'P95 latency exceeded 5 seconds',
            'TreatMissingData': 'notBreaching'
        }

    def _latency_spike_alarm(self) -> Dict:
        """Detect sudden latency spikes using anomaly detection"""
        return {
            'AlarmName': 'RAG-Latency-Anomaly',
            'ComparisonOperator': 'GreaterThanUpperThreshold',
            'EvaluationPeriods': 2,
            'Metrics': [
                {
                    'Id': 'm1',
                    'ReturnData': True,
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'GenAI/Performance',
                            'MetricName': 'EndToEndLatency'
                        },
                        'Period': 300,
                        'Stat': 'Average'
                    }
                },
                {
                    'Id': 'ad1',
                    'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)',
                    'Label': 'Latency (expected)'
                }
            ],
            'ThresholdMetricId': 'ad1',
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Latency anomaly detected (2 standard deviations)',
            'TreatMissingData': 'notBreaching'
        }

    def _cost_spike_alarm(self) -> Dict:
        """Alert on unexpected cost spikes"""
        return {
            'AlarmName': 'RAG-Cost-Spike',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'TotalCost',
            'Namespace': 'GenAI/Cost',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 50.0,  # $50 per 5 minutes
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Cost spike detected: >$50 in 5 minutes',
            'TreatMissingData': 'notBreaching'
        }

    def _daily_budget_alarm(self) -> Dict:
        """Alert when approaching daily budget"""
        return {
            'AlarmName': 'RAG-Daily-Budget-Warning',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'TotalCost',
            'Namespace': 'GenAI/Cost',
            'Period': 86400,  # 24 hours
            'Statistic': 'Sum',
            'Threshold': 800.0,  # $800 per day (80% of $1000 budget)
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Daily cost approaching budget limit (80%)',
            'TreatMissingData': 'notBreaching'
        }

    def _high_error_rate_alarm(self) -> Dict:
        """Alert on elevated error rate"""
        return {
            'AlarmName': 'RAG-High-Error-Rate',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 2,
            'DatapointsToAlarm': 2,
            'MetricName': 'ErrorRate',
            'Namespace': 'GenAI/Errors',
            'Period': 300,
            'Statistic': 'Average',
            'Threshold': 5.0,  # 5% error rate
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Error rate exceeded 5%',
            'TreatMissingData': 'notBreaching'
        }

    def _retrieval_failure_alarm(self) -> Dict:
        """Alert on retrieval failures"""
        return {
            'AlarmName': 'RAG-Retrieval-Failures',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            'DatapointsToAlarm': 1,
            'MetricName': 'RetrievalErrors',
            'Namespace': 'GenAI/Errors',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 10,  # 10 retrieval failures in 5 min
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': 'Multiple retrieval failures detected',
            'TreatMissingData': 'notBreaching'
        }

    def _system_degraded_composite_alarm(self) -> Dict:
        """Composite alarm for multiple degradation signals (put_composite_alarm)"""
        return {
            'AlarmName': 'RAG-System-Degraded',
            'AlarmRule': (
                '(ALARM("RAG-Quality-Degradation") OR ALARM("RAG-High-Latency-P95")) '
                'AND ALARM("RAG-High-Error-Rate")'
            ),
            'ActionsEnabled': True,
            'AlarmActions': [self.sns_topic_arn],
            'AlarmDescription': (
                'System degraded: Multiple quality/performance/error issues detected'
            )
        }


# Usage
alarm_manager = GenAIAlarmManager(
    sns_topic_arn='arn:aws:sns:us-east-1:123456789:genai-alerts'
)
alarm_manager.create_all_alarms()
Integration with Existing Observability Tools
Grafana Integration
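Grafana reads CloudWatch directly through its built-in CloudWatch data source; a provisioning file might look like this (region and auth mode are assumptions — adjust for your deployment):

```yaml
apiVersion: 1
datasources:
  - name: CloudWatch
    type: cloudwatch
    jsonData:
      authType: default        # use the instance role / credential chain
      defaultRegion: us-east-1
```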
Datadog Integration
CloudWatch metrics can be forwarded to Datadog through the Datadog AWS integration, or with lower latency via CloudWatch Metric Streams delivered through Amazon Data Firehose. Custom GenAI namespaces (e.g. GenAI/Quality, GenAI/Cost) must be included in the integration's namespace list.
Key Takeaways
CloudWatch GenAI Observability is purpose-built - Provides out-of-the-box dashboards for Bedrock model invocations and AgentCore agents. No custom instrumentation needed for basic metrics.
OpenTelemetry + ADOT enables custom observability - Use ADOT to instrument your application with custom spans capturing retrieval quality, token usage, and costs. Automatically traces boto3 AWS SDK calls.
X-Ray provides the service map - Distributed tracing shows bottlenecks across your RAG pipeline. Service maps visualize dependencies and highlight slow components (typically vector search).
Comprehensive dashboards require custom metrics - Quality scores (faithfulness, relevancy), cost per query, and token breakdowns need custom CloudWatch metrics alongside out-of-the-box Bedrock metrics.
Intelligent alarming prevents incidents - Set thresholds for quality degradation, cost spikes, and latency. Use composite alarms for multi-signal degradation detection. Anomaly detection catches unusual patterns.
Integration extends visibility - Export to Grafana, Datadog, or existing observability stacks using CloudWatch exporters or direct API integration. Don't build in isolation.
Traces + Metrics + Logs = Complete picture - You need all three: traces for request flows, metrics for aggregates, logs for debugging specific failures. CloudWatch GenAI Observability provides this unified view.
What's Next in This Series
Part 4: Production Hardening & Advanced Patterns
We'll close the series with production-ready patterns:
- Guardrails in production: Content filtering, PII detection, toxicity screening
- Human-in-the-loop evaluation: Building feedback loops and annotation workflows
- Incident response playbooks: What to do when GenAI fails at 3 AM
- A/B testing strategies: Testing prompts, models, and RAG configurations
- Canary deployments: Safe rollout strategies with automated rollback
- Advanced cost optimization: Model routing, caching, and batch processing
- Security hardening: Protecting against prompt injection and jailbreaks
Additional Resources
AWS Documentation:
- CloudWatch GenAI Observability
- AWS X-Ray Developer Guide
- AWS Distro for OpenTelemetry
- OpenTelemetry Python SDK
Let's Connect!
Building observability for production GenAI systems? Let's share experiences!
Follow me for Part 4 (the series finale!) on Production Hardening & Advanced Patterns. We'll cover guardrails, incident response, A/B testing, and cost optimization—everything you need to run GenAI at scale.
Tags: #aws #genai #observability #cloudwatch #xray #opentelemetry #monitoring #genaops #bedrock #distributedtracing