This is Part 2 of a two-part series. If you haven't read the architecture overview yet, start with Part 1: Streaming Agents with API Gateway to understand the concepts before diving into the implementation.
This guide walks you through the complete setup: CDK stacks, agent code, authentication flow, and deployment. By the end, you'll have a production-ready streaming agent protected by API Gateway.
Complete code: on GitHub
Prerequisites
- AWS Account with appropriate permissions
- AWS CDK installed (npm install -g aws-cdk)
- Python 3.11+ with uv (pip install uv)
- Basic understanding of CDK, API Gateway, and Cognito
Architecture Overview
We'll deploy three CDK stacks in order:
- Cognito Stack: User Pool for OAuth2/JWT authentication
- Runtime Stack: AgentCore Runtime with JWT authorizer
- API Gateway Stack: REST API with streaming enabled
The deployment order matters because each stack depends on outputs from the previous one.
Project Structure
api-gw-sr-runtime/
├── app.py # CDK app entry point
├── chatbot_spa_cdk/
│ ├── chatbot_spa_stack.py # Cognito + API Gateway
│ └── agent_runtime_stack.py # AgentCore Runtime
├── agent/
│ └── agent.py # Streaming agent code
├── spa/ # Frontend application
└── pyproject.toml
Step 1: CDK App Setup
The main CDK app orchestrates the three stacks with proper dependencies. The actual implementation includes environment configuration and resource naming:
# app.py (simplified - see repo for full version)
import os

from aws_cdk import App, Environment
from chatbot_spa_cdk.chatbot_spa_stack import ChatbotSpaStack
from chatbot_spa_cdk.agent_runtime_stack import AgentRuntimeStack

app = App()

# Target account and region; the CDK CLI sets these from the active profile
env = Environment(
    account=os.environ.get("CDK_DEFAULT_ACCOUNT"),
    region=os.environ.get("CDK_DEFAULT_REGION"),
)

# Step 1: Deploy Cognito first
cognito_stack = ChatbotSpaStack(
    app, "ChatbotCognitoStack",
    resource_prefix="chatbot-spa",
    backend_url=None,  # Skip API Gateway for now
    callback_url="http://localhost:3000/callback.html",
    env=env,
)

# Step 2: Deploy Runtime with Cognito references
runtime_stack = AgentRuntimeStack(
    app, "ChatbotAgentRuntimeStack",
    resource_prefix="chatbot-spa",
    user_pool=cognito_stack.user_pool,
    user_pool_client=cognito_stack.user_pool_client,
    env=env,
)
runtime_stack.add_dependency(cognito_stack)

# Step 3: Deploy API Gateway pointing to Runtime
api_stack = ChatbotSpaStack(
    app, "ChatbotApiGatewayStack",
    resource_prefix="chatbot-spa",
    backend_url=runtime_stack.runtime_endpoint,
    existing_user_pool=cognito_stack.user_pool,
    existing_user_pool_client=cognito_stack.user_pool_client,
    env=env,
)
api_stack.add_dependency(runtime_stack)

app.synth()
Key points:
- Cognito deploys first (no dependencies)
- Runtime depends on Cognito (needs User Pool for JWT validation)
- API Gateway depends on Runtime (needs endpoint URL)
- The resource_prefix parameter makes resources easily identifiable in the console
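To make the ordering concrete, here is a minimal sketch that derives the deployment order from the dependencies listed above. The stack labels are illustrative only (they are not the CDK construct IDs), and CDK performs this resolution for you once add_dependency is called.

```python
from graphlib import TopologicalSorter

# Each stack maps to the set of stacks it depends on.
# The labels are illustrative, not the construct IDs used in app.py.
dependencies = {
    "cognito": set(),
    "runtime": {"cognito"},
    "api_gateway": {"runtime"},
}

# static_order() yields every node after all of its dependencies.
deploy_order = list(TopologicalSorter(dependencies).static_order())
print(deploy_order)
```

Because the graph is a simple chain, there is only one valid order, which is why deploying the stacks out of sequence fails.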
Step 2: Cognito Stack
The Cognito configuration is part of the ChatbotSpaStack. When deployed without a backend_url, it creates just the User Pool:
# From chatbot_spa_cdk/chatbot_spa_stack.py (simplified)
from aws_cdk import Stack, Duration
from aws_cdk import aws_cognito as cognito


class ChatbotSpaStack(Stack):
    # Keyword arguments mirror the calls in app.py. The API Gateway branch
    # (used when backend_url is set) is covered in Step 4.
    def __init__(self, scope, construct_id, resource_prefix, callback_url=None,
                 backend_url=None, existing_user_pool=None,
                 existing_user_pool_client=None, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Create User Pool
        user_pool = cognito.UserPool(
            self,
            "UserPool",
            user_pool_name=f"{resource_prefix}-user-pool",
            self_sign_up_enabled=False,
            sign_in_aliases=cognito.SignInAliases(email=True),
            auto_verify=cognito.AutoVerifiedAttrs(email=True),
            password_policy=cognito.PasswordPolicy(
                min_length=8,
                require_uppercase=True,
                require_lowercase=True,
                require_digits=True,
                require_symbols=False,
            ),
        )

        # Enable Managed Login UI (Essentials tier)
        cfn_user_pool = user_pool.node.default_child
        cfn_user_pool.add_property_override("UserPoolTier", "ESSENTIALS")

        # Add domain for hosted UI
        user_pool_domain = user_pool.add_domain(
            "UserPoolDomain",
            cognito_domain=cognito.CognitoDomainOptions(
                domain_prefix=f"{resource_prefix}-{self.account}",
            ),
        )

        # Create OAuth2 client
        user_pool_client = user_pool.add_client(
            "UserPoolClient",
            user_pool_client_name=f"{resource_prefix}-client",
            generate_secret=False,  # Public client for web apps
            o_auth=cognito.OAuthSettings(
                flows=cognito.OAuthFlows(authorization_code_grant=True),
                scopes=[
                    cognito.OAuthScope.OPENID,
                    cognito.OAuthScope.EMAIL,
                    cognito.OAuthScope.PROFILE,
                ],
                callback_urls=[callback_url, "http://localhost:3000"],
                logout_urls=["http://localhost:3000"],
            ),
            refresh_token_validity=Duration.days(30),
            access_token_validity=Duration.minutes(60),
            id_token_validity=Duration.minutes(60),
        )

        # Export for other stacks
        self.user_pool = user_pool
        self.user_pool_client = user_pool_client
Configuration details:
- self_sign_up_enabled=False: Prevents public registration (you control who gets access)
- sign_in_aliases: Users sign in with email addresses
- generate_secret=False: Public client (web apps can't keep secrets)
- authorization_code_grant: Standard OAuth2 flow for web applications
- OPENID scope: Required for ID tokens
- callback_urls: Where Cognito redirects after authentication
What you get:
- User Pool that issues JWT ID tokens
- Hosted UI for authentication (optional, you can build your own)
- OAuth2 client configured for web applications
Step 3: AgentCore Runtime Stack
Deploy your agent to AgentCore Runtime with JWT authorization. The actual implementation uses CfnResource and includes bundling logic for dependencies:
# From chatbot_spa_cdk/agent_runtime_stack.py (simplified)
from aws_cdk import Stack, CfnResource
from aws_cdk.aws_s3_assets import Asset


class AgentRuntimeStack(Stack):
    def __init__(self, scope, construct_id, user_pool, user_pool_client,
                 resource_prefix, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Package agent code with dependencies
        # (See repo for full bundling configuration)
        agent_asset = Asset(
            self,
            "AgentCodeAsset",
            path="./agent",
            # bundling configuration omitted for brevity
        )

        # Build Cognito OIDC discovery URL
        discovery_url = (
            f"https://cognito-idp.{self.region}.amazonaws.com/"
            f"{user_pool.user_pool_id}/.well-known/openid-configuration"
        )

        # Create runtime using CfnResource (Layer 1 construct)
        runtime_name = resource_prefix.replace("-", "_") + "_agent_runtime"
        runtime = CfnResource(
            self,
            "AgentCoreRuntime",
            type="AWS::BedrockAgentCore::Runtime",
            properties={
                "AgentRuntimeName": runtime_name,
                "Description": f"Runtime for {resource_prefix} with streaming",
                "RoleArn": runtime_role.role_arn,  # IAM role created separately
                "NetworkConfiguration": {
                    "NetworkMode": "PUBLIC",
                },
                "AuthorizerConfiguration": {
                    "CustomJWTAuthorizer": {
                        "DiscoveryUrl": discovery_url,
                        "AllowedAudience": [user_pool_client.user_pool_client_id],
                    }
                },
                "AgentRuntimeArtifact": {
                    "CodeConfiguration": {
                        "Code": {
                            "S3": {
                                "Bucket": agent_asset.s3_bucket_name,
                                "Prefix": agent_asset.s3_object_key,
                            }
                        },
                        "EntryPoint": ["agent.py"],
                        "Runtime": "PYTHON_3_12",
                    }
                },
            },
        )

        # Build the OAuth2 endpoint URL
        runtime_id = runtime.ref
        runtime_endpoint = (
            f"https://bedrock-agentcore.{self.region}.amazonaws.com/"
            f"runtimes/{runtime_id}/invocations"
            f"?qualifier=DEFAULT&accountId={self.account}"
        )
        self.runtime_endpoint = runtime_endpoint
Critical details:
- CustomJWTAuthorizer: Uses OIDC discovery to validate ID tokens from Cognito
- DiscoveryUrl: Points to Cognito's OIDC configuration endpoint
- AllowedAudience: The User Pool Client ID (ID tokens must have this in their aud claim)
- /invocations endpoint: The OAuth2 endpoint that supports streaming
- qualifier=DEFAULT: Uses the default runtime version
- accountId: Required for cross-account access control
- CfnResource: Used because CDK doesn't have L2 constructs for AgentCore yet
What happens:
- Runtime validates every request's JWT ID token
- Invalid or missing tokens are rejected with 401
- Valid tokens allow the request to proceed to your agent
Step 4: API Gateway Stack
Create the REST API with streaming enabled:
from aws_cdk import Duration, Stack
from aws_cdk import aws_apigateway as apigw


# Shown as a standalone class for readability; app.py instantiates
# ChatbotSpaStack with backend_url set for this step.
class ApiGatewayStack(Stack):
    def __init__(self, scope, construct_id, runtime_endpoint, user_pool, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Create REST API
        api = apigw.RestApi(
            self,
            "Api",
            rest_api_name="agent-api",
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=["http://localhost:3000"],
                allow_methods=["POST", "OPTIONS"],
                allow_headers=["Content-Type", "Authorization"],
            ),
        )

        # Cognito authorizer
        authorizer = apigw.CognitoUserPoolsAuthorizer(
            self,
            "CognitoAuthorizer",
            cognito_user_pools=[user_pool],
        )

        # HTTP Proxy Integration
        integration = apigw.HttpIntegration(
            runtime_endpoint,  # The OAuth2 endpoint from Runtime Stack
            http_method="POST",
            proxy=True,
            options=apigw.IntegrationOptions(
                connection_type=apigw.ConnectionType.INTERNET,
                timeout=Duration.seconds(900),  # 15 minutes with streaming
                request_parameters={
                    "integration.request.header.Authorization":
                        "method.request.header.Authorization",
                },
            ),
        )

        # Add method
        chat_resource = api.root.add_resource("chat")
        post_method = chat_resource.add_method(
            "POST",
            integration,
            authorizer=authorizer,
            authorization_type=apigw.AuthorizationType.COGNITO,
        )

        # CRITICAL: Enable streaming with escape hatch
        cfn_method = post_method.node.default_child
        cfn_method.add_property_override("Integration.ResponseTransferMode", "STREAM")

        self.api_url = api.url
Why the escape hatch?
CDK's HttpIntegration doesn't expose ResponseTransferMode directly yet. The escape hatch lets you set it on the underlying CloudFormation resource.
What this does:
- API Gateway validates the JWT ID token (first layer of defense)
- Forwards the Authorization header to Runtime (second layer of defense)
- Streams the response instead of buffering it
- Allows up to 15 minutes for the request to complete
CORS configuration:
- Allows requests from your frontend origin
- Includes Authorization header in allowed headers
- Handles preflight OPTIONS requests
Step 5: Agent Implementation
This example uses two SDKs to simplify development:
- Strands Agents SDK: A Python framework for building agentic workflows with streaming support built-in
- Amazon Bedrock AgentCore SDK: Handles the AgentCore Runtime integration and streaming protocol
Both SDKs are optional. You can build agents with any framework that returns async generators, but these make it much easier to get up and running quickly for this demo.
Your agent code needs to return an async generator for streaming:
# agent/agent.py
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands import Agent
from strands_tools import calculator

app = BedrockAgentCoreApp()

# Lazy load agent for performance
_agent = None


def get_agent():
    global _agent
    if _agent is None:
        _agent = Agent(
            system_prompt="You are a helpful assistant that can perform calculations.",
            tools=[calculator]
        )
    return _agent


@app.entrypoint
async def invoke(payload, context):
    """Entry point that returns an async generator for streaming"""
    agent = get_agent()
    prompt = payload.get("prompt", "Hello!")

    # Return an async generator
    async def generate_stream():
        agent_stream = agent.stream_async(prompt)
        async for event in agent_stream:
            if "data" in event:
                yield event["data"]
            # You can also handle tool use events here if needed

    return generate_stream()
How it works:
- BedrockAgentCoreApp detects when you return an async generator
- It handles the streaming protocol automatically
- Each yield sends a chunk to the client immediately
- The stream flows: Agent → Runtime → API Gateway → Client
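If async generators are new to you, the filtering pattern can be tried in isolation. The sketch below swaps the real agent for a hypothetical fake_agent_stream stand-in (its event shapes are assumptions made for illustration, not the Strands event format) so it runs without AWS credentials.

```python
import asyncio


async def fake_agent_stream(prompt: str):
    """Hypothetical stand-in for agent.stream_async(): yields mixed events."""
    yield {"event": "start"}
    for word in prompt.split():
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"data": word + " "}
    yield {"event": "stop"}


async def generate_stream(prompt: str):
    """Forward only the text chunks, skipping every other event."""
    async for event in fake_agent_stream(prompt):
        if "data" in event:
            yield event["data"]


async def collect(prompt: str) -> list[str]:
    # Gather the chunks in arrival order so they can be inspected.
    return [chunk async for chunk in generate_stream(prompt)]


chunks = asyncio.run(collect("streaming works"))
print(chunks)
```

Each yielded string corresponds to one chunk the runtime would forward; events without a "data" key are dropped before they reach the client.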
Why lazy load the agent?
The runtime reuses the same container across invocations, which means the agent instance stays in memory. This is crucial for maintaining conversation context and history between requests. By lazy loading, you initialize the agent once and it persists across all subsequent invocations, allowing multi-turn conversations to work naturally.
Agent requirements:
# agent/pyproject.toml
[project]
name = "streaming-agent"
version = "0.1.0"
dependencies = [
    "bedrock-agentcore",     # provides bedrock_agentcore.runtime
    "strands-agents",        # provides the strands package
    "strands-agents-tools",  # provides strands_tools
]
Step 6: Frontend Implementation
Handle streaming on the client side:
// Get ID token from Cognito (after OAuth2 flow)
const idToken = sessionStorage.getItem('id_token');

async function sendMessage(prompt) {
  const response = await fetch(
    // 'prod' is the default stage name CDK gives a RestApi; adjust if yours differs
    'https://your-api.execute-api.us-west-2.amazonaws.com/prod/chat',
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${idToken}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ prompt }),
    }
  );

  // Surface auth and gateway errors instead of trying to stream them
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }

  // Read the stream
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Decode chunk; { stream: true } holds back partial UTF-8 sequences
    const text = decoder.decode(value, { stream: true });
    // Display immediately
    if (text) appendToMessage(text);
  }

  // Flush any bytes the decoder is still holding
  const tail = decoder.decode();
  if (tail) appendToMessage(tail);
}
Important: Use { stream: true } in TextDecoder.decode(). This handles partial UTF-8 sequences that can occur at chunk boundaries.
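The same boundary problem exists in any client, not just the browser. As a rough Python analogue (an illustration only, not part of the repo), the standard library's incremental decoder holds back incomplete byte sequences the way TextDecoder does with { stream: true }.

```python
import codecs


def decode_stream(chunks):
    """Yield text from byte chunks, buffering partial UTF-8 sequences."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    for chunk in chunks:
        text = decoder.decode(chunk)  # incomplete trailing bytes are held back
        if text:
            yield text
    # final=True flushes the decoder; it raises if the stream ended mid-character
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail


euro = "€".encode("utf-8")  # three bytes: e2 82 ac
# Split the euro sign across two chunks, as a network boundary might.
pieces = list(decode_stream([b"5 " + euro[:1], euro[1:] + b"!"]))
print(pieces)
```

Decoding each chunk independently would fail on the first chunk here, because it ends with only the first byte of a three-byte character.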
OAuth2 flow (simplified):
// Shared settings (placeholders: use the values from your Cognito stack).
// Declared outside the functions so both login() and handleCallback() can read them.
const cognitoDomain = 'https://agent-123456789.auth.us-west-2.amazoncognito.com';
const clientId = 'your-client-id';
const redirectUri = 'http://localhost:3000/callback.html';

// Redirect to Cognito for authentication
function login() {
  window.location.href =
    `${cognitoDomain}/oauth2/authorize?` +
    `client_id=${clientId}&` +
    `response_type=code&` +
    `scope=openid+email&` +
    `redirect_uri=${encodeURIComponent(redirectUri)}`;
}

// Handle callback (in callback.html)
async function handleCallback() {
  const params = new URLSearchParams(window.location.search);
  const code = params.get('code');

  // Exchange code for tokens
  const response = await fetch(
    `${cognitoDomain}/oauth2/token`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'authorization_code',
        client_id: clientId,
        code: code,
        redirect_uri: redirectUri,
      }),
    }
  );

  const tokens = await response.json();
  sessionStorage.setItem('id_token', tokens.id_token);
  sessionStorage.setItem('access_token', tokens.access_token);

  // Redirect back to app
  window.location.href = '/';
}
Deployment
Deploy the stacks in order:
cd api-gw-sr-runtime
# Bootstrap CDK (first time only)
export AWS_PROFILE=your-profile
uv run cdk bootstrap
# Deploy all stacks
uv run cdk deploy --all
# Update frontend config with API URL
./update-spa-config.sh
# Create a test user
./create-test-user.sh testuser@example.com TestPassword123!
What happens:
- Cognito stack deploys (User Pool + Client)
- Runtime stack deploys (references Cognito)
- API Gateway stack deploys (references Runtime endpoint)
- Scripts configure frontend and create test user
Testing
Test with curl
# Get your ID token (from browser sessionStorage or Cognito)
ID_TOKEN="eyJraWQiOi..."
# Test with -N flag for no buffering
# The path includes the stage name ("prod" is the CDK default for a RestApi)
curl -N -X POST \
  https://your-api.execute-api.us-west-2.amazonaws.com/prod/chat \
  -H "Authorization: Bearer $ID_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"prompt":"What is 25 * 4? Show your work."}'
You should see the response appear incrementally, not all at once.
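If you would rather script this check, the sketch below does roughly what the curl command does using only the Python standard library. The helper names build_request and iter_chunks are made up for this example, and the URL and token you pass in are your own values.

```python
import json
import urllib.request


def build_request(api_url: str, id_token: str, prompt: str) -> urllib.request.Request:
    """Build the POST request with the same headers and body as the curl call."""
    return urllib.request.Request(
        api_url,
        data=json.dumps({"prompt": prompt}).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {id_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def iter_chunks(response, size: int = 1024):
    """Yield bytes as they arrive; read1() returns without waiting for `size` bytes."""
    while True:
        chunk = response.read1(size)
        if not chunk:
            break
        yield chunk


# Usage (performs a real network call, so it is left commented out):
# import sys
# req = build_request(api_url, id_token, "What is 25 * 4?")
# with urllib.request.urlopen(req) as resp:
#     for chunk in iter_chunks(resp):
#         sys.stdout.buffer.write(chunk)
#         sys.stdout.buffer.flush()
```

Using read1() instead of read() matters here: read(size) blocks until it has collected size bytes or the stream ends, which would hide the incremental delivery you are trying to observe.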
Test with frontend
cd spa
python -m http.server 3000
Open http://localhost:3000, log in, and send a message. You should see the response stream in real-time.
Troubleshooting
Streaming not working (response appears all at once)
Check:
- Is ResponseTransferMode: STREAM set on the API Gateway method?
- Are you using the /invocations endpoint?
- Is your agent returning an async generator?
401 Unauthorized
Check:
- Is the ID token valid? (Check expiration)
- Is the token in the Authorization header?
- Does the JWT authorizer configuration match your Cognito User Pool?
- Are you using the ID token (not access token)?
502 Bad Gateway
Check:
- Is the Runtime endpoint URL correct?
- Does the Runtime have the JWT authorizer configured?
- Is the agent code deployed correctly?
Connection drops after 30 seconds
You're using an edge-optimized endpoint. Switch to regional:
api = apigw.RestApi(
    self,
    "Api",
    endpoint_types=[apigw.EndpointType.REGIONAL],  # Add this
    ...
)
Agent not streaming
Check:
- Is your agent returning an async generator?
- Are you yielding chunks, not returning a complete response?
- Is the agent actually generating data? (Add logging)
Performance Optimization
Lazy Load Your Agent
_agent = None


def get_agent():
    global _agent
    if _agent is None:
        _agent = Agent(...)  # Only initialize once
    return _agent

The runtime reuses the same container across invocations, so a lazily loaded agent instance stays in memory. This lets the agent keep conversation history and context between requests, enabling natural multi-turn conversations without external storage.
Full Code Repository
Complete code: on GitHub
Includes:
- All CDK stacks
- Agent implementation
- Frontend with OAuth2
- Deployment scripts
- Test utilities