DEV Community

Mike Chambers for AWS

Posted on

Complete Tutorial: Streaming Agents on AWS

This is Part 2 of a two-part series. If you haven't read the architecture overview yet, start with Part 1: Streaming Agents with API Gateway to understand the concepts before diving into the implementation.

This guide walks you through the complete setup: CDK stacks, agent code, authentication flow, and deployment. By the end, you'll have a production-ready streaming agent protected by API Gateway.

Complete code: on GitHub


Prerequisites

  • AWS Account with appropriate permissions
  • AWS CDK installed (npm install -g aws-cdk)
  • Python 3.11+ with uv (pip install uv)
  • Basic understanding of CDK, API Gateway, and Cognito

Architecture Overview

We'll deploy three CDK stacks in order:

  1. Cognito Stack: User Pool for OAuth2/JWT authentication
  2. Runtime Stack: AgentCore Runtime with JWT authorizer
  3. API Gateway Stack: REST API with streaming enabled

The deployment order matters because each stack depends on outputs from the previous one.


Project Structure

api-gw-sr-runtime/
├── app.py                    # CDK app entry point
├── chatbot_spa_cdk/
│   ├── chatbot_spa_stack.py  # Cognito + API Gateway
│   └── agent_runtime_stack.py # AgentCore Runtime
├── agent/
│   └── agent.py              # Streaming agent code
├── spa/                      # Frontend application
└── pyproject.toml
Enter fullscreen mode Exit fullscreen mode

Step 1: CDK App Setup

The main CDK app orchestrates the three stacks with proper dependencies. The actual implementation includes environment configuration and resource naming:

# app.py (simplified - see repo for full version)
from aws_cdk import App, Environment
from chatbot_spa_cdk.chatbot_spa_stack import ChatbotSpaStack
from chatbot_spa_cdk.agent_runtime_stack import AgentRuntimeStack

app = App()

# Step 1: Deploy Cognito first
cognito_stack = ChatbotSpaStack(
    app, "ChatbotCognitoStack",
    resource_prefix="chatbot-spa",
    backend_url=None,  # Skip API Gateway for now
    callback_url="http://localhost:3000/callback.html",
    env=env
)

# Step 2: Deploy Runtime with Cognito references
runtime_stack = AgentRuntimeStack(
    app, "ChatbotAgentRuntimeStack",
    resource_prefix="chatbot-spa",
    user_pool=cognito_stack.user_pool,
    user_pool_client=cognito_stack.user_pool_client,
    env=env
)
runtime_stack.add_dependency(cognito_stack)

# Step 3: Deploy API Gateway pointing to Runtime
api_stack = ChatbotSpaStack(
    app, "ChatbotApiGatewayStack",
    resource_prefix="chatbot-spa",
    backend_url=runtime_stack.runtime_endpoint,
    existing_user_pool=cognito_stack.user_pool,
    existing_user_pool_client=cognito_stack.user_pool_client,
    env=env
)
api_stack.add_dependency(runtime_stack)

app.synth()
Enter fullscreen mode Exit fullscreen mode

Key points:

  • Cognito deploys first (no dependencies)
  • Runtime depends on Cognito (needs User Pool for JWT validation)
  • API Gateway depends on Runtime (needs endpoint URL)
  • The resource_prefix parameter makes resources easily identifiable in the console

Step 2: Cognito Stack

The Cognito configuration is part of the ChatbotSpaStack. When deployed without a backend_url, it creates just the User Pool:

# From chatbot_spa_cdk/chatbot_spa_stack.py (simplified)
from aws_cdk import Stack, Duration
from aws_cdk import aws_cognito as cognito

class ChatbotSpaStack(Stack):
    def __init__(self, scope, construct_id, resource_prefix, callback_url, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Create User Pool
        user_pool = cognito.UserPool(
            self,
            "UserPool",
            user_pool_name=f"{resource_prefix}-user-pool",
            self_sign_up_enabled=False,
            sign_in_aliases=cognito.SignInAliases(email=True),
            auto_verify=cognito.AutoVerifiedAttrs(email=True),
            password_policy=cognito.PasswordPolicy(
                min_length=8,
                require_uppercase=True,
                require_lowercase=True,
                require_digits=True,
                require_symbols=False,
            ),
        )

        # Enable Managed Login UI (Essentials tier)
        cfn_user_pool = user_pool.node.default_child
        cfn_user_pool.add_property_override("UserPoolTier", "ESSENTIALS")

        # Add domain for hosted UI
        user_pool_domain = user_pool.add_domain(
            "UserPoolDomain",
            cognito_domain=cognito.CognitoDomainOptions(
                domain_prefix=f"{resource_prefix}-{self.account}",
            ),
        )

        # Create OAuth2 client
        user_pool_client = user_pool.add_client(
            "UserPoolClient",
            user_pool_client_name=f"{resource_prefix}-client",
            generate_secret=False,  # Public client for web apps
            o_auth=cognito.OAuthSettings(
                flows=cognito.OAuthFlows(authorization_code_grant=True),
                scopes=[
                    cognito.OAuthScope.OPENID,
                    cognito.OAuthScope.EMAIL,
                    cognito.OAuthScope.PROFILE,
                ],
                callback_urls=[callback_url, "http://localhost:3000"],
                logout_urls=["http://localhost:3000"],
            ),
            refresh_token_validity=Duration.days(30),
            access_token_validity=Duration.minutes(60),
            id_token_validity=Duration.minutes(60),
        )

        # Export for other stacks
        self.user_pool = user_pool
        self.user_pool_client = user_pool_client
Enter fullscreen mode Exit fullscreen mode

Configuration details:

  • self_sign_up_enabled=False: Prevents public registration (you control who gets access)
  • sign_in_aliases: Users sign in with email addresses
  • generate_secret=False: Public client (web apps can't keep secrets)
  • authorization_code_grant: Standard OAuth2 flow for web applications
  • OPENID scope: Required for ID tokens
  • callback_urls: Where Cognito redirects after authentication

What you get:

  • User Pool that issues JWT ID tokens
  • Hosted UI for authentication (optional, you can build your own)
  • OAuth2 client configured for web applications

Step 3: AgentCore Runtime Stack

Deploy your agent to AgentCore Runtime with JWT authorization. The actual implementation uses CfnResource and includes bundling logic for dependencies:

# From chatbot_spa_cdk/agent_runtime_stack.py (simplified)
from aws_cdk import Stack, CfnResource
from aws_cdk.aws_s3_assets import Asset

class AgentRuntimeStack(Stack):
    def __init__(self, scope, construct_id, user_pool, user_pool_client, 
                 resource_prefix, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Package agent code with dependencies
        # (See repo for full bundling configuration)
        agent_asset = Asset(
            self,
            "AgentCodeAsset",
            path="./agent",
            # bundling configuration omitted for brevity
        )

        # Build Cognito OIDC discovery URL
        discovery_url = (
            f"https://cognito-idp.{self.region}.amazonaws.com/"
            f"{user_pool.user_pool_id}/.well-known/openid-configuration"
        )

        # Create runtime using CfnResource (Layer 1 construct)
        runtime_name = resource_prefix.replace("-", "_") + "_agent_runtime"

        runtime = CfnResource(
            self,
            "AgentCoreRuntime",
            type="AWS::BedrockAgentCore::Runtime",
            properties={
                "AgentRuntimeName": runtime_name,
                "Description": f"Runtime for {resource_prefix} with streaming",
                "RoleArn": runtime_role.role_arn,  # IAM role created separately
                "NetworkConfiguration": {
                    "NetworkMode": "PUBLIC",
                },
                "AuthorizerConfiguration": {
                    "CustomJWTAuthorizer": {
                        "DiscoveryUrl": discovery_url,
                        "AllowedAudience": [user_pool_client.user_pool_client_id],
                    }
                },
                "AgentRuntimeArtifact": {
                    "CodeConfiguration": {
                        "Code": {
                            "S3": {
                                "Bucket": agent_asset.s3_bucket_name,
                                "Prefix": agent_asset.s3_object_key,
                            }
                        },
                        "EntryPoint": ["agent.py"],
                        "Runtime": "PYTHON_3_12",
                    }
                },
            },
        )

        # Build the OAuth2 endpoint URL
        runtime_id = runtime.ref
        runtime_endpoint = (
            f"https://bedrock-agentcore.{self.region}.amazonaws.com/"
            f"runtimes/{runtime_id}/invocations"
            f"?qualifier=DEFAULT&accountId={self.account}"
        )

        self.runtime_endpoint = runtime_endpoint
Enter fullscreen mode Exit fullscreen mode

Critical details:

  • CustomJWTAuthorizer: Uses OIDC discovery to validate ID tokens from Cognito
  • DiscoveryUrl: Points to Cognito's OIDC configuration endpoint
  • AllowedAudience: The User Pool Client ID (ID tokens must have this in their aud claim)
  • /invocations endpoint: The OAuth2 endpoint that supports streaming
  • qualifier=DEFAULT: Uses the default runtime version
  • accountId: Required for cross-account access control
  • CfnResource: Used because CDK doesn't have L2 constructs for AgentCore yet

What happens:

  • Runtime validates every request's JWT ID token
  • Invalid or missing tokens are rejected with 401
  • Valid tokens allow the request to proceed to your agent

Step 4: API Gateway Stack

Create the REST API with streaming enabled:

from aws_cdk import Duration
from aws_cdk import aws_apigateway as apigw

class ApiGatewayStack(Stack):
    def __init__(self, scope, construct_id, runtime_endpoint, user_pool, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        # Create REST API
        api = apigw.RestApi(
            self,
            "Api",
            rest_api_name="agent-api",
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=["http://localhost:3000"],
                allow_methods=["POST", "OPTIONS"],
                allow_headers=["Content-Type", "Authorization"],
            ),
        )

        # Cognito authorizer
        authorizer = apigw.CognitoUserPoolsAuthorizer(
            self,
            "CognitoAuthorizer",
            cognito_user_pools=[user_pool],
        )

        # HTTP Proxy Integration
        integration = apigw.HttpIntegration(
            runtime_endpoint,  # The OAuth2 endpoint from Runtime Stack
            http_method="POST",
            proxy=True,
            options=apigw.IntegrationOptions(
                connection_type=apigw.ConnectionType.INTERNET,
                timeout=Duration.seconds(900),  # 15 minutes with streaming
                request_parameters={
                    "integration.request.header.Authorization": 
                        "method.request.header.Authorization",
                },
            ),
        )

        # Add method
        chat_resource = api.root.add_resource("chat")
        post_method = chat_resource.add_method(
            "POST",
            integration,
            authorizer=authorizer,
            authorization_type=apigw.AuthorizationType.COGNITO,
        )

        # CRITICAL: Enable streaming with escape hatch
        cfn_method = post_method.node.default_child
        cfn_method.add_property_override("Integration.ResponseTransferMode", "STREAM")

        self.api_url = api.url
Enter fullscreen mode Exit fullscreen mode

Why the escape hatch?

CDK's HttpIntegration doesn't expose ResponseTransferMode directly yet. The escape hatch lets you set it on the underlying CloudFormation resource.

What this does:

  • API Gateway validates the JWT ID token (first layer of defense)
  • Forwards the Authorization header to Runtime (second layer of defense)
  • Streams the response instead of buffering it
  • Allows up to 15 minutes for the request to complete

CORS configuration:

  • Allows requests from your frontend origin
  • Includes Authorization header in allowed headers
  • Handles preflight OPTIONS requests

Step 5: Agent Implementation

This example uses two SDKs to simplify development:

Both SDKs are optional. You can build agents with any framework that returns async generators, but these make it much easier to get up and running quickly for this demo.

Your agent code needs to return an async generator for streaming:

# agent/agent.py
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands import Agent
from strands_tools import calculator

app = BedrockAgentCoreApp()

# Lazy load agent for performance
_agent = None

def get_agent():
    global _agent
    if _agent is None:
        _agent = Agent(
            system_prompt="You are a helpful assistant that can perform calculations.",
            tools=[calculator]
        )
    return _agent

@app.entrypoint
async def invoke(payload, context):
    """Entry point that returns an async generator for streaming"""
    agent = get_agent()
    prompt = payload.get("prompt", "Hello!")

    # Return an async generator
    async def generate_stream():
        agent_stream = agent.stream_async(prompt)
        async for event in agent_stream:
            if "data" in event:
                yield event["data"]
            # You can also handle tool use events here if needed

    return generate_stream()
Enter fullscreen mode Exit fullscreen mode

How it works:

  • BedrockAgentCoreApp detects when you return an async generator
  • It handles the streaming protocol automatically
  • Each yield sends a chunk to the client immediately
  • The stream flows: Agent → Runtime → API Gateway → Client

Why lazy load the agent?

The runtime reuses the same container across invocations, which means the agent instance stays in memory. This is crucial for maintaining conversation context and history between requests. By lazy loading, you initialize the agent once and it persists across all subsequent invocations, allowing multi-turn conversations to work naturally.

Agent requirements:

# agent/pyproject.toml
[project]
name = "streaming-agent"
version = "0.1.0"
dependencies = [
    "bedrock-agentcore-runtime",
    "strands",
    "strands-tools",
]
Enter fullscreen mode Exit fullscreen mode

Step 6: Frontend Implementation

Handle streaming on the client side:

// Get ID token from Cognito (after OAuth2 flow)
const idToken = sessionStorage.getItem('id_token');

async function sendMessage(prompt) {
    const response = await fetch(
        'https://your-api.execute-api.us-west-2.amazonaws.com/chat',
        {
            method: 'POST',
            headers: {
                'Authorization': `Bearer ${idToken}`,
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({ prompt }),
        }
    );

    // Read the stream
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Decode chunk (may contain partial UTF-8 sequences)
        buffer += decoder.decode(value, { stream: true });

        // Display immediately
        appendToMessage(buffer);
        buffer = '';
    }

    // Flush any remaining buffer
    if (buffer) {
        appendToMessage(buffer);
    }
}
Enter fullscreen mode Exit fullscreen mode

Important: Use { stream: true } in TextDecoder.decode(). This handles partial UTF-8 sequences that can occur at chunk boundaries.

OAuth2 flow (simplified):

// Redirect to Cognito for authentication
function login() {
    const cognitoDomain = 'https://agent-123456789.auth.us-west-2.amazoncognito.com';
    const clientId = 'your-client-id';
    const redirectUri = 'http://localhost:3000/callback.html';

    window.location.href = 
        `${cognitoDomain}/oauth2/authorize?` +
        `client_id=${clientId}&` +
        `response_type=code&` +
        `scope=openid+email&` +
        `redirect_uri=${encodeURIComponent(redirectUri)}`;
}

// Handle callback (in callback.html)
async function handleCallback() {
    const params = new URLSearchParams(window.location.search);
    const code = params.get('code');

    // Exchange code for tokens
    const response = await fetch(
        `${cognitoDomain}/oauth2/token`,
        {
            method: 'POST',
            headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
            body: new URLSearchParams({
                grant_type: 'authorization_code',
                client_id: clientId,
                code: code,
                redirect_uri: redirectUri,
            }),
        }
    );

    const tokens = await response.json();
    sessionStorage.setItem('id_token', tokens.id_token);
    sessionStorage.setItem('access_token', tokens.access_token);

    // Redirect back to app
    window.location.href = '/';
}
Enter fullscreen mode Exit fullscreen mode

Deployment

Deploy the stacks in order:

cd api-gw-sr-runtime

# Bootstrap CDK (first time only)
export AWS_PROFILE=your-profile
uv run cdk bootstrap

# Deploy all stacks
uv run cdk deploy --all

# Update frontend config with API URL
./update-spa-config.sh

# Create a test user
./create-test-user.sh testuser@example.com TestPassword123!
Enter fullscreen mode Exit fullscreen mode

What happens:

  1. Cognito stack deploys (User Pool + Client)
  2. Runtime stack deploys (references Cognito)
  3. API Gateway stack deploys (references Runtime endpoint)
  4. Scripts configure frontend and create test user

Testing

Test with curl

# Get your ID token (from browser sessionStorage or Cognito)
ID_TOKEN="eyJraWQiOi..."

# Test with -N flag for no buffering
curl -N -X POST \
  https://your-api.execute-api.us-west-2.amazonaws.com/chat \
  -H "Authorization: Bearer $ID_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"prompt":"What is 25 * 4? Show your work."}'
Enter fullscreen mode Exit fullscreen mode

You should see the response appear incrementally, not all at once.

Test with frontend

cd spa
python -m http.server 3000
Enter fullscreen mode Exit fullscreen mode

Open http://localhost:3000, log in, and send a message. You should see the response stream in real-time.


Troubleshooting

Streaming not working (response appears all at once)

Check:

  • Is ResponseTransferMode: STREAM set on the API Gateway method?
  • Are you using the /invocations endpoint?
  • Is your agent returning an async generator?

401 Unauthorized

Check:

  • Is the ID token valid? (Check expiration)
  • Is the token in the Authorization header?
  • Does the JWT authorizer configuration match your Cognito User Pool?
  • Are you using the ID token (not access token)?

502 Bad Gateway

Check:

  • Is the Runtime endpoint URL correct?
  • Does the Runtime have the JWT authorizer configured?
  • Is the agent code deployed correctly?

Connection drops after 30 seconds

You're using an edge-optimized endpoint. Switch to regional:

api = apigw.RestApi(
    self,
    "Api",
    endpoint_types=[apigw.EndpointType.REGIONAL],  # Add this
    ...
)
Enter fullscreen mode Exit fullscreen mode

Agent not streaming

Check:

  • Is your agent returning an async generator?
  • Are you yielding chunks, not returning a complete response?
  • Is the agent actually generating data? (Add logging)

Performance Optimization

Lazy Load Your Agent

_agent = None

def get_agent():
    global _agent
    if _agent is None:
        _agent = Agent(...)  # Only initialize once
    return _agent
Enter fullscreen mode Exit fullscreen mode

The runtime reuses the same container across invocations, lazy loading keeping the agent instance in memory. This allows the agent to maintain conversation history and context between requests, enabling natural multi-turn conversations without needing external storage.

Full Code Repository

Complete code: on GitHub

Includes:

  • All CDK stacks
  • Agent implementation
  • Frontend with OAuth2
  • Deployment scripts
  • Test utilities

References

Top comments (0)