Photo by Dawn McDonald on Unsplash
Simple setup for live events with AppSync Events
Scenario
Let's build a simple system where users can log in and request a report. Generating a report takes a while, and we don't want users to wait for it, so we will notify them once the report is ready.
What we are building
To achieve this goal, I will use the AppSync Events API, a fairly new service that makes it easy to implement WebSockets.
Architecture
The AppSync Events API consists of two main pieces: channels for asynchronous communication, and an HTTP endpoint for publishing events to those channels.
The user logs into the system through the web page and connects to the channel generated-reports/<user-id>. It is important that users can see only their own reports.
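The "own reports only" rule can be written down as a small predicate over the channel path and the username. This is only a sketch: the function name `isOwnReportChannel` is hypothetical, the namespace name is taken from the CDK code later in the post, and where such a check would run (for example in a subscribe handler on the channel namespace) is an assumption rather than something this post sets up.

```typescript
// Hypothetical helper: returns true only when `channelPath` is the
// caller's own channel inside the "generated-reports" namespace.
function isOwnReportChannel(channelPath: string, username: string): boolean {
  // Reject empty names and names that could add extra segments or wildcards.
  if (username.length === 0 || /[\/*]/.test(username)) {
    return false;
  }
  return channelPath === `/generated-reports/${username}`;
}

console.log(isOwnReportChannel("/generated-reports/alice", "alice"));
console.log(isOwnReportChannel("/generated-reports/bob", "alice"));
```

The exact-match comparison is deliberate: prefix or wildcard matching would make it easier to subscribe to someone else's channel by accident.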
To generate a new report, the user calls the generate-report endpoint on the API Gateway. From there, the Lambda function extracts the user ID from the authorizer context and triggers the StepFunction, which simulates the long-running task. Once the job is done, the StepFunction publishes a message about it to the generated-reports/<user-id> channel.
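To make the flow more concrete, here is a minimal sketch of the message that ends up being published: a channel name plus a list of JSON-encoded events. The field names mirror the `AppSyncEventsRequest` struct used by the Lambda function later in this post; the helper name `buildPublishRequest` and the sample user ID are made up for illustration.

```typescript
// Body of a publish call: a channel plus an array of JSON-encoded strings.
interface PublishRequest {
  channel: string;
  events: string[];
}

// Hypothetical helper that builds the body for a given user.
function buildPublishRequest(userId: string, payloads: unknown[]): PublishRequest {
  return {
    channel: `generated-reports/${userId}`,
    // Each event is sent as a string, so every payload is stringified separately.
    events: payloads.map((payload) => JSON.stringify(payload)),
  };
}

const sampleRequest = buildPublishRequest("alice", [{ message: "report generated" }]);
console.log(JSON.stringify(sampleRequest));
```

Note that each entry of `events` is itself a JSON string, so the payload is encoded twice by the time the whole body is serialized.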
Implementation
The code is available IN THIS REPO
Authentication
For auth I am using Cognito.
With AWS CDK, setting up Cognito is quite straightforward. The only tricky part is that we need to define a default authenticated role and a default policy.
// lib/appsync_events-stack.ts
// ...
const cognitoUserPool = new cdk.aws_cognito.UserPool(this, "EventsApp", {
selfSignUpEnabled: true,
signInAliases: { email: true },
autoVerify: { email: true },
passwordPolicy: {
minLength: 8,
requireLowercase: true,
requireUppercase: true,
requireDigits: true,
},
});
const cognitoUserPoolClient = new cdk.aws_cognito.UserPoolClient(
this,
"EventsAppClient",
{
userPool: cognitoUserPool,
generateSecret: false,
authFlows: {
adminUserPassword: true,
userSrp: true,
},
}
);
const cognitoIdentity = new cdk.aws_cognito.CfnIdentityPool(
this,
"EventsAppIdentityProvider",
{
allowUnauthenticatedIdentities: false,
cognitoIdentityProviders: [
{
clientId: cognitoUserPoolClient.userPoolClientId,
providerName: cognitoUserPool.userPoolProviderName,
},
],
}
);
const authenticatedRole = new cdk.aws_iam.Role(
this,
"CognitoDefaultAuthenticatedRole",
{
assumedBy: new cdk.aws_iam.FederatedPrincipal(
"cognito-identity.amazonaws.com",
{
StringEquals: {
"cognito-identity.amazonaws.com:aud": cognitoIdentity.ref,
},
"ForAnyValue:StringLike": {
"cognito-identity.amazonaws.com:amr": "authenticated",
},
},
"sts:AssumeRoleWithWebIdentity"
),
}
);
const defaultPolicy = new cdk.aws_cognito.CfnIdentityPoolRoleAttachment(
this,
"IdentityPoolRoleAttachment",
{
identityPoolId: cognitoIdentity.ref,
roles: {
authenticated: authenticatedRole.roleArn,
},
}
);
//...
AppSync Events
At the time of writing, there are no dedicated AppSync Events constructs for AWS CDK yet. This is not a blocker: CloudFormation supports the service, so I can use L1 constructs inside CDK. In other words, I can write CloudFormation inside my TypeScript CDK stack definition.
The initial declaration contains only some information about auth providers:
// lib/appsync_events-stack.ts
//...
const eventsAPI = new cdk.aws_appsync.CfnApi(this, "MyEventsAPI", {
name: "MyEventsAPI",
eventConfig: {
authProviders: [
{
authType: "API_KEY",
},
{
authType: "AMAZON_COGNITO_USER_POOLS",
cognitoConfig: {
userPoolId: cognitoUserPool.userPoolId,
awsRegion: "us-east-1",
},
},
],
connectionAuthModes: [
{
authType: "AMAZON_COGNITO_USER_POOLS",
},
],
defaultPublishAuthModes: [
{
authType: "API_KEY",
},
{
authType: "AMAZON_COGNITO_USER_POOLS",
},
],
defaultSubscribeAuthModes: [
{
authType: "AMAZON_COGNITO_USER_POOLS",
},
],
},
});
// ...
I picked two auth modes. The Cognito user pool is for users, and the API key is for my StepFunction.
That is almost it. I still need to define the channel namespace and the API key:
// lib/appsync_events-stack.ts
//...
const generatedRequestNamespace = new cdk.aws_appsync.CfnChannelNamespace(
this,
"GeneratedRequestNamespace",
{
apiId: eventsAPI.attrApiId,
name: "generated-reports",
}
);
const eventsApiKey = new cdk.aws_appsync.CfnApiKey(this, "EventsApiKey", {
apiId: eventsAPI.attrApiId,
});
// ...
StepFunction
My state machine is fairly simple and contains two steps: waiting, and publishing the event.
To do the latter I will use the Call HTTPS APIs action, which is the absolute killer feature of StepFunctions, as it allows integration with any API-based service within or outside AWS.
To call the endpoint securely, I need to define a Connection (the same one we would use for API Destinations in EventBridge):
// lib/appsync_events-stack.ts
// ...
const eventsAPIKeySecret = new cdk.aws_secretsmanager.Secret(
this,
"EventsApiKeySecret"
);
const eventsConnection = new cdk.aws_events.Connection(
this,
"EventsAPIConnection",
{
authorization: cdk.aws_events.Authorization.apiKey(
"x-api-key",
cdk.SecretValue.secretsManager(eventsAPIKeySecret.secretName)
),
}
);
// ...
Here is where my code gets a bit clunky: I need to define a Secret to use for the Connection, even though under the hood the Connection creates its own secret in Secrets Manager. I haven't figured out yet how to get rid of the spare custom Secret.
Here are the tasks defined for the state machine. For anything larger than a small demo I would use a separate .asl.json file to store the StepFunction definition. I almost always debug and update the StepFunction in the AWS console, and download the updated definition to keep it in source control. This approach has its pros and cons, but this is a topic for another discussion.
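For reference, a definition kept in such a file might look roughly like the object below, written here as a TypeScript constant so it can be printed as JSON. This is a sketch under assumptions: the connection ARN and the endpoint host are placeholders, the state names are borrowed from the CDK code in this post, and the field names reflect my understanding of the Amazon States Language for wait and HTTP tasks, so compare it with a definition exported from the console before relying on it.

```typescript
// Placeholder values; the real ones come from the deployed stack.
const connectionArn = "arn:aws:events:us-east-1:123456789012:connection/example/abc";
const apiRoot = "https://<your_endpoint>.appsync-api.us-east-1.amazonaws.com";

const definition = {
  StartAt: "ComplexTask",
  States: {
    // Simulates the long-running job.
    ComplexTask: { Type: "Wait", Seconds: 30, Next: "PublishToAppSync" },
    // Publishes the event over HTTPS, authenticated through the Connection.
    PublishToAppSync: {
      Type: "Task",
      Resource: "arn:aws:states:::http:invoke",
      Parameters: {
        ApiEndpoint: `${apiRoot}/event`,
        Method: "POST",
        Authentication: { ConnectionArn: connectionArn },
        // The request body is taken from the "input" field of the execution input.
        "RequestBody.$": "$.input",
      },
      End: true,
    },
  },
};

console.log(JSON.stringify(definition, null, 2));
```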
In this example, the tasks are defined in CDK:
// lib/appsync_events-stack.ts
//...
const waitTask = new cdk.aws_stepfunctions.Wait(this, "ComplexTask", {
time: cdk.aws_stepfunctions.WaitTime.duration(cdk.Duration.seconds(30)),
});
const publishToAppsync = new cdk.aws_stepfunctions_tasks.HttpInvoke(
this,
"PublishToAppSync",
{
apiEndpoint: cdk.aws_stepfunctions.TaskInput.fromText("/event"),
method: cdk.aws_stepfunctions.TaskInput.fromText("POST"),
connection: eventsConnection,
body: cdk.aws_stepfunctions.TaskInput.fromJsonPathAt("$.input"),
apiRoot: "https://" + eventsAPI.getAtt("Dns.Http").toString(),
}
);
// ...
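It can help to picture the plain HTTP request this task stands for. The sketch below builds the URL and request options without sending anything. The `x-api-key` header name and the `/event` path come from the code above; the helper name `buildPublishCall`, the example host, and the example key are made up, and the `content-type` header is my assumption.

```typescript
interface PublishCall {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

// Hypothetical helper: describes the POST request for publishing events.
function buildPublishCall(httpDomain: string, apiKey: string, body: string): PublishCall {
  return {
    url: `https://${httpDomain}/event`,
    init: {
      method: "POST",
      headers: {
        "content-type": "application/json", // assumed, not set explicitly in the post
        "x-api-key": apiKey, // same header name as in the Connection
      },
      body,
    },
  };
}

const publishCall = buildPublishCall(
  "example.appsync-api.us-east-1.amazonaws.com", // made-up host
  "da2-example", // made-up key
  JSON.stringify({ channel: "generated-reports/alice", events: ['{"message":"report generated"}'] })
);
console.log(publishCall.url);
// To actually send it: await fetch(publishCall.url, publishCall.init)
```

Building the request separately from sending it makes it easy to try the same call from a script when debugging the state machine.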
The StepFunction definition brings it all together
// lib/appsync_events-stack.ts
// ...
const longTaskSfn = new cdk.aws_stepfunctions.StateMachine(
this,
"LongTaskStateMachine",
{
definition: waitTask.next(publishToAppsync),
stateMachineType: cdk.aws_stepfunctions.StateMachineType.EXPRESS,
logs: {
destination: new cdk.aws_logs.LogGroup(
this,
"LongTaskStateMachineLogGroup"
),
level: cdk.aws_stepfunctions.LogLevel.ALL,
includeExecutionData: true,
},
}
);
// ...
API Gateway
The HTTP API Gateway will handle a single endpoint and use a Cognito authorizer to authorize requests.
// lib/appsync_events-stack.ts
// ...
const httpApi = new cdk.aws_apigatewayv2.HttpApi(this, "HttpApi", {
corsPreflight: {
allowOrigins: ["*"],
allowMethods: [cdk.aws_apigatewayv2.CorsHttpMethod.ANY],
allowHeaders: ["*"],
},
});
const cognitoAuthorizer =
new cdk.aws_apigatewayv2_authorizers.HttpUserPoolAuthorizer(
"CognitoAuthorizer",
cognitoUserPool,
{
userPoolClients: [cognitoUserPoolClient],
}
);
// ...
I need a Lambda function to handle the request. The Lambda needs permission to start the StepFunction execution:
// lib/appsync_events-stack.ts
// ...
const reportHandlerFunction = new rustLambda.RustFunction(this, "reportHandlerFunction", {
entry: ("http_handler"),
environment: {
STEP_FUNCTION_ARN: longTaskSfn.stateMachineArn,
},
});
longTaskSfn.grantStartExecution(reportHandlerFunction);
httpApi.addRoutes({
path: "/generate-report",
methods: [cdk.aws_apigatewayv2.HttpMethod.POST],
integration:
new cdk.aws_apigatewayv2_integrations.HttpLambdaIntegration(
"reportHandlerIntegration",
reportHandlerFunction
),
authorizer: cognitoAuthorizer,
});
// ...
Lambda Function - code
Once the infrastructure is defined, let's create a Lambda function to handle the requests.
// http_handler/src/main.rs
use aws_config::BehaviorVersion;
use lambda_http::{run, service_fn, tracing, Body, Error, Request, RequestExt, Response};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Message {
message: String,
}
#[derive(Serialize, Deserialize)]
struct AppSyncEventsRequest {
channel: String,
events: Vec<String>,
}
#[derive(Serialize, Deserialize)]
struct StepFunctionInput {
input: AppSyncEventsRequest,
}
async fn function_handler(
sfn_client: &aws_sdk_sfn::Client,
event: Request,
) -> Result<Response<Body>, Error> {
let machine_arn = std::env::var("STEP_FUNCTION_ARN").unwrap();
let context = event.request_context();
let authorizer_data = context.authorizer().unwrap();
let jwt_data = authorizer_data.jwt.clone().unwrap();
let user_name = jwt_data.claims.get("username").unwrap();
let message = Message {
message: "report generated".to_string(),
};
let step_function_input = StepFunctionInput {
input: AppSyncEventsRequest{
channel: format!("generated-reports/{user_name}"),
events: vec![serde_json::to_string(&message).unwrap()],
}
};
let result = sfn_client
.start_execution()
.set_input(Some(serde_json::to_string(&step_function_input).unwrap()))
.state_machine_arn(machine_arn)
.send()
.await
.unwrap();
let message = format!("{:?}", result);
let resp = Response::builder()
.status(200)
.header("content-type", "text/html")
.body(message.into())
.map_err(Box::new)?;
Ok(resp)
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let sfn_client = aws_sdk_sfn::Client::new(&config);
run(service_fn(|ev| function_handler(&sfn_client, ev))).await
}
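The handler above calls unwrap at every step of the claim lookup, so a request without the expected claim makes the function panic. As an illustration of the same lookup with an explicit failure path, here is a sketch in TypeScript (used for the extra examples in this post, although the handler itself is written in Rust). The event type is a hand-written slice of what an HTTP API with a JWT authorizer passes to a Lambda, and the names `usernameFromEvent` and `UsernameResult` are hypothetical.

```typescript
// Minimal slice of an HTTP API event with a JWT authorizer.
interface JwtAuthorizedEvent {
  requestContext?: {
    authorizer?: {
      jwt?: { claims?: Record<string, unknown> };
    };
  };
}

type UsernameResult =
  | { ok: true; username: string }
  | { ok: false; statusCode: 401; reason: string };

// Looks up the "username" claim and reports a 401 instead of crashing.
function usernameFromEvent(incoming: JwtAuthorizedEvent): UsernameResult {
  const claim = incoming.requestContext?.authorizer?.jwt?.claims?.["username"];
  if (typeof claim !== "string" || claim.length === 0) {
    return { ok: false, statusCode: 401, reason: "missing username claim" };
  }
  return { ok: true, username: claim };
}

const sampleEvent: JwtAuthorizedEvent = {
  requestContext: { authorizer: { jwt: { claims: { username: "alice" } } } },
};
console.log(usernameFromEvent(sampleEvent));
```

In the Rust handler the equivalent would be to return an error response instead of unwrapping; the structure of the lookup stays the same.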
UI
I've created a new Vue 3 application. I will use the Amplify library, which makes integration with Cognito and AppSync very easy. It can be used even if you are not planning to use the Amplify service to deploy your frontend.
To configure Cognito and AppSync Events, I add the following to main.ts:
// fe/src/main.ts
// ...
import { Amplify } from "aws-amplify";
Amplify.configure({
API: {
Events: {
endpoint:
"https://<your_endpoint>.appsync-api.us-east-1.amazonaws.com/event",
region: "us-east-1",
defaultAuthMode: "userPool",
},
},
Auth: {
Cognito: {
userPoolId: "<user_pool>",
userPoolClientId: "<client_id>",
identityPoolId: "<identity_id>",
loginWith: {
email: true,
},
signUpVerificationMethod: "code",
userAttributes: {
email: {
required: true,
},
},
allowGuestAccess: true,
passwordFormat: {
minLength: 8,
requireLowercase: true,
requireUppercase: true,
requireNumbers: true,
requireSpecialCharacters: true,
},
},
},
});
// ...
I wrap the whole application in the authenticator tag in App.vue:
// fe/src/App.vue
// ...
import { Authenticator } from '@aws-amplify/ui-vue'
import "@aws-amplify/ui-vue/styles.css";
</script>
<template>
<authenticator>
<div class="wrapper">
<HelloWorld msg="You did it!" />
</div>
</authenticator>
</template>
// ...
To connect to the channel, it is enough to call events.connect
// fe/src/components/HelloWorld.vue
// ...
import { events } from 'aws-amplify/data'
const { user } = useAuthenticator();
onMounted(async () => {
const channel = await events.connect(`/generated-reports/${user.username}`);
channel.subscribe({
next: (data) => {
console.log('received', data.event);
reports.value.push(data.event)
},
error: (err) => console.error('error', err),
});
})
// ...
In the real world, you would move the connection part to a separate composable or keep it in the store, but for now, it is enough to keep it directly in the component.
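As a rough idea of what the extracted connection logic could look like, here is a framework-agnostic sketch. The connect function is passed in as a parameter so the helper has no dependency on Vue or Amplify. The `Channel` and `Subscription` shapes are assumptions modelled on how the component above uses `events.connect`, not copied from the Amplify typings, and the name `watchReports` is hypothetical.

```typescript
// Assumed minimal shapes, based on the usage shown in the component above.
interface Subscription {
  unsubscribe(): void;
}
interface Channel {
  subscribe(observer: {
    next: (data: { event: unknown }) => void;
    error: (err: unknown) => void;
  }): Subscription;
  close(): void;
}
type Connect = (path: string) => Promise<Channel>;

// Connects to the user's channel and forwards each event to `onReport`.
// Returns a cleanup function that unsubscribes and closes the channel.
async function watchReports(
  connect: Connect,
  username: string,
  onReport: (report: unknown) => void
): Promise<() => void> {
  const channel = await connect(`/generated-reports/${username}`);
  const subscription = channel.subscribe({
    next: (data) => onReport(data.event),
    error: (err) => console.error("error", err),
  });
  return () => {
    subscription.unsubscribe();
    channel.close();
  };
}
```

In a component, this could be called from onMounted with `events.connect` as the first argument, and the returned cleanup function could be invoked from onUnmounted so the connection does not outlive the view.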
Deploy
Once the CDK stack is deployed, we need to make sure that our Connection has the right API key. Grab the key from the AppSync Events console and update it in the Connections section of the EventBridge console.
Test
Start the frontend
cd fe
npm run dev
There is a default Amplify login form. Sign up for the service and log in.
After login, I can see my amazing UI, which consists of the user ID and a single button to generate a report.
I can check in the dev tools and see that a WebSocket connection has been opened.
After clicking the button, I wait about 30 seconds and then receive a message from the channel I subscribed to.
It works 🎉 🎉
Summary
The AppSync Events API allows setting up a WebSocket connection with minimal configuration overhead. It supports publishing events via an HTTPS endpoint, which any authorized service, including StepFunction, can use.
The Amplify library simplifies setting up the authorized connection from the UI to the Events API.
Top comments (4)
Good info but we can already do this for years with IoT Core. I really miss the fuzz about appsync events still.
Thanks for your comment Robert. I really like your comparison between IoT Core and AppSync Events: dev.to/slootjes/aws-appsync-events...
IoT Core is super powerful. AppSync Events API is simpler and can be easily integrated into the front end with Amplify.
This is a fantastic introduction to AppSync Events API! This example really helps break down the flow of async events. Thanks for sharing!
Thank you Joanne!