Async events with AppSync Events API

Simple setup for live events with AppSync Events

Scenario

Let's build a simple system where users can log in and request a report. Generating a report takes a while, and we don't want users to wait for it, so we will notify them once the report is ready.

What we are building

To achieve this goal I will use the AppSync Events API, a fairly new service that makes it easy to implement WebSockets.

Architecture

The AppSync Events API consists of two main pieces: channels for asynchronous communication, and an HTTP endpoint for publishing events to those channels.

The user logs into the system through the web page and connects to the channel generated-reports/<user-id>. It is important that users can see only their own reports.

To generate a new report, the user calls the generate-report endpoint on the API Gateway. From there, the Lambda function extracts the user ID from the authorizer context and triggers a StepFunction, which simulates the long-running task. Once the job is done, the StepFunction publishes a message about it to the generated-reports/<user-id> channel.
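The per-user channel naming described above can be sketched as a small helper. This is only an illustration, not code from the repo: `REPORTS_NAMESPACE` and `channelForUser` are hypothetical names, the leading slash follows the frontend snippet later in the post, and the check on the user ID is an assumption added to keep an ID from smuggling in extra path segments or a wildcard.

```typescript
// Hypothetical helper (not from the repo): builds the per-user channel path.
const REPORTS_NAMESPACE = "generated-reports";

function channelForUser(userId: string): string {
  // Assumption: IDs that are empty or contain "/" or "*" are rejected,
  // so an ID cannot add path segments or a wildcard to the channel.
  if (userId.length === 0 || /[\/*]/.test(userId)) {
    throw new Error(`invalid user id: ${userId}`);
  }
  return `/${REPORTS_NAMESPACE}/${userId}`;
}
```

Using one helper for both the subscriber and the publisher side would keep the two channel strings from drifting apart.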

Implementation

The code is available IN THIS REPO

Authentication

For auth I am using Cognito.

With AWS CDK, setting up Cognito is quite straightforward. The only tricky part is that we need to define a default authenticated role and attach it to the identity pool.

// lib/appsync_events-stack.ts

// ...
    const cognitoUserPool = new cdk.aws_cognito.UserPool(this, "EventsApp", {
      selfSignUpEnabled: true,
      signInAliases: { email: true },
      autoVerify: { email: true },
      passwordPolicy: {
        minLength: 8,
        requireLowercase: true,
        requireUppercase: true,
        requireDigits: true,
      },
    });

    const cognitoUserPoolClient = new cdk.aws_cognito.UserPoolClient(
      this,
      "EventsAppClient",
      {
        userPool: cognitoUserPool,
        generateSecret: false,
        authFlows: {
          adminUserPassword: true,
          userSrp: true,
        },
      }
    );

    const cognitoIdentity = new cdk.aws_cognito.CfnIdentityPool(
      this,
      "EventsAppIdentityProvider",
      {
        allowUnauthenticatedIdentities: false,
        cognitoIdentityProviders: [
          {
            clientId: cognitoUserPoolClient.userPoolClientId,
            providerName: cognitoUserPool.userPoolProviderName,
          },
        ],
      }
    );


    const authenticatedRole = new cdk.aws_iam.Role(
      this,
      "CognitoDefaultAuthenticatedRole",
      {
        assumedBy: new cdk.aws_iam.FederatedPrincipal(
          "cognito-identity.amazonaws.com",
          {
            StringEquals: {
              "cognito-identity.amazonaws.com:aud": cognitoIdentity.ref,
            },
            "ForAnyValue:StringLike": {
              "cognito-identity.amazonaws.com:amr": "authenticated",
            },
          },
          "sts:AssumeRoleWithWebIdentity"
        ),
      }
    );

    const defaultPolicy = new cdk.aws_cognito.CfnIdentityPoolRoleAttachment(
      this,
      "IdentityPoolRoleAttachment",
      {
        identityPoolId: cognitoIdentity.ref,
        roles: {
          authenticated: authenticatedRole.roleArn,
        },
      }
    );
//...

AppSync Events

At the time of writing, there are no higher-level constructs for AppSync Events in AWS CDK yet. This is not a blocker: CloudFormation supports the service, so I can use L1 constructs inside CDK. In other words, I can write CloudFormation inside my TypeScript CDK stack definition.

The initial declaration contains only some information about auth providers:

// lib/appsync_events-stack.ts

//...
    const eventsAPI = new cdk.aws_appsync.CfnApi(this, "MyEventsAPI", {
      name: "MyEventsAPI",
      eventConfig: {
        authProviders: [
          {
            authType: "API_KEY",
          },
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
            cognitoConfig: {
              userPoolId: cognitoUserPool.userPoolId,
              // Use the stack's own region instead of a hard-coded one.
              awsRegion: this.region,
            },
          },
        ],
        connectionAuthModes: [
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
          },
        ],
        defaultPublishAuthModes: [
          {
            authType: "API_KEY",
          },
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
          },
        ],
        defaultSubscribeAuthModes: [
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
          },
        ],
      },
    });
// ...

I picked two auth modes: the Cognito user pool is for users, and the API key is for my StepFunction.

That's almost it. I still need to define the channel namespace and the API key:

// lib/appsync_events-stack.ts

//...
    const generatedRequestNamespace = new cdk.aws_appsync.CfnChannelNamespace(
      this,
      "GeneratedRequestNamespace",
      {
        apiId: eventsAPI.attrApiId,
        name: "generated-reports",
      }
    );

    const eventsApiKey = new cdk.aws_appsync.CfnApiKey(this, "EventsApiKey", {
      apiId: eventsAPI.attrApiId,
    });
// ...

StepFunction

My state machine is fairly simple and contains two steps: waiting, and publishing the event.

For the latter I will use the Call HTTPS APIs action, which is the absolute killer feature of StepFunctions, as it allows integration with any API-based service inside or outside AWS.

To call the endpoint securely, I need to define a Connection (the same kind we would use for API Destinations in EventBridge):

// lib/appsync_events-stack.ts

// ...
    const eventsAPIKeySecret = new cdk.aws_secretsmanager.Secret(
      this,
      "EventsApiKeySecret"
    );

    const eventsConnection = new cdk.aws_events.Connection(
      this,
      "EventsAPIConnection",
      {
        authorization: cdk.aws_events.Authorization.apiKey(
          "x-api-key",
          cdk.SecretValue.secretsManager(eventsAPIKeySecret.secretName)
        ),
      }
    );
// ...

Here is where my code gets a bit clunky: I need to define a Secret to pass to the Connection, even though under the hood the Connection creates its own secret in Secrets Manager. I haven't figured out yet how to get rid of the spare custom Secret.

Here are the tasks defined for the state machine. For anything larger than a small demo I would use a separate .asl.json file to store the StepFunction definition. I almost always debug and update the StepFunction in the AWS console, and download the updated definition to keep it in the source control. This approach has its pros and cons, but this is a topic for another discussion.

In this example, the tasks are defined in the CDK stack:

// lib/appsync_events-stack.ts

//...
    const waitTask = new cdk.aws_stepfunctions.Wait(this, "ComplexTask", {
      time: cdk.aws_stepfunctions.WaitTime.duration(cdk.Duration.seconds(30)),
    });

    const publishToAppsync = new cdk.aws_stepfunctions_tasks.HttpInvoke(
      this,
      "PublishToAppSync",
      {
        apiEndpoint: cdk.aws_stepfunctions.TaskInput.fromText("/event"),
        method: cdk.aws_stepfunctions.TaskInput.fromText("POST"),
        connection: eventsConnection,
        body: cdk.aws_stepfunctions.TaskInput.fromJsonPathAt("$.input"),
        apiRoot: "https://" + eventsAPI.getAtt("Dns.Http").toString(),
      }
    );
// ...
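To make the task configuration more concrete, here is a sketch of the equivalent request built in plain TypeScript, outside Step Functions. It assumes the publish format used elsewhere in this post: a POST to the `/event` path of the API's HTTP domain, an `x-api-key` header, and a body with a `channel` and an `events` array whose items are JSON strings. `PublishRequest` and `buildPublishRequest` are hypothetical names, and the function only builds the request without sending it.

```typescript
// Hypothetical shape of a publish request to the Events API HTTP endpoint.
interface PublishRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

// Builds the request; nothing is sent here.
function buildPublishRequest(
  httpDomain: string, // assumption: the API's HTTP DNS name, without a scheme
  apiKey: string,
  channel: string,
  payloads: unknown[]
): PublishRequest {
  return {
    url: `https://${httpDomain}/event`,
    init: {
      method: "POST",
      headers: { "content-type": "application/json", "x-api-key": apiKey },
      // Each event is serialized to a JSON string, as the Lambda function does.
      body: JSON.stringify({
        channel,
        events: payloads.map((p) => JSON.stringify(p)),
      }),
    },
  };
}
```

Passing `url` and `init` to `fetch` would publish the event. In the state machine, the Connection adds the `x-api-key` header, so the task itself only supplies the path, the method, and the body.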

The StepFunction definition brings it all together

// lib/appsync_events-stack.ts

// ...
    const longTaskSfn = new cdk.aws_stepfunctions.StateMachine(
      this,
      "LongTaskStateMachine",
      {
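        // `definition` is deprecated in recent CDK versions in favor of `definitionBody`.
        definitionBody: cdk.aws_stepfunctions.DefinitionBody.fromChainable(
          waitTask.next(publishToAppsync)
        ),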
        stateMachineType: cdk.aws_stepfunctions.StateMachineType.EXPRESS,
        logs: {
          destination: new cdk.aws_logs.LogGroup(
            this,
            "LongTaskStateMachineLogGroup"
          ),
          level: cdk.aws_stepfunctions.LogLevel.ALL,
          includeExecutionData: true,
        },
      }
    );
// ...

API Gateway

An API Gateway HTTP API will expose a single endpoint and use a Cognito authorizer to authorize requests.

// lib/appsync_events-stack.ts

// ...
    const httpApi = new cdk.aws_apigatewayv2.HttpApi(this, "HttpApi", {
      corsPreflight: {
        allowOrigins: ["*"],
        allowMethods: [cdk.aws_apigatewayv2.CorsHttpMethod.ANY],
        allowHeaders: ["*"],
      },
    });

    const cognitoAuthorizer =
      new cdk.aws_apigatewayv2_authorizers.HttpUserPoolAuthorizer(
        "CognitoAuthorizer",
        cognitoUserPool,
        {
          userPoolClients: [cognitoUserPoolClient],
        }
      );
// ...

I need a Lambda function to handle the request. The function needs permission to start the StepFunction execution:

// lib/appsync_events-stack.ts

// ...
    const reportHandlerFunction = new rustLambda.RustFunction(
      this,
      "reportHandlerFunction",
      {
        entry: "http_handler",
      environment: {
        STEP_FUNCTION_ARN: longTaskSfn.stateMachineArn,
      },
      }
    );

    longTaskSfn.grantStartExecution(reportHandlerFunction);

    httpApi.addRoutes({
      path: "/generate-report",
      methods: [cdk.aws_apigatewayv2.HttpMethod.POST],
      integration:
        new cdk.aws_apigatewayv2_integrations.HttpLambdaIntegration(
          "reportHandlerIntegration",
          reportHandlerFunction
        ),
      authorizer: cognitoAuthorizer,
    });
// ...

Lambda Function - code

Once the infrastructure is defined, let's create a Lambda function to handle the requests.

// http_handler/src/main.rs
use aws_config::BehaviorVersion;
use lambda_http::{run, service_fn, tracing, Body, Error, Request, RequestExt, Response};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Message {
    message: String,
}

#[derive(Serialize, Deserialize)]
struct AppSyncEventsRequest {
    channel: String,
    events: Vec<String>,
}

#[derive(Serialize, Deserialize)]
struct StepFunctionInput {
    input: AppSyncEventsRequest,
}

async fn function_handler(
    sfn_client: &aws_sdk_sfn::Client,
    event: Request,
) -> Result<Response<Body>, Error> {

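    // The state machine ARN is injected by the CDK stack as an environment variable.
    let machine_arn = std::env::var("STEP_FUNCTION_ARN")?;

    // The JWT authorizer exposes the verified token claims in the request context.
    let context = event.request_context();

    let authorizer_data = context.authorizer().ok_or("missing authorizer context")?;

    let jwt_data = authorizer_data.jwt.clone().ok_or("missing JWT data")?;

    // The user name becomes the last segment of the user's channel.
    let user_name = jwt_data.claims.get("username").ok_or("missing username claim")?;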

    let message = Message {
        message: "report generated".to_string(),
    };

    let step_function_input = StepFunctionInput {
        input: AppSyncEventsRequest{
            channel: format!("generated-reports/{user_name}"),
            events: vec![serde_json::to_string(&message).unwrap()],
        }
    };

    let result = sfn_client
        .start_execution()
        .set_input(Some(serde_json::to_string(&step_function_input).unwrap()))
        .state_machine_arn(machine_arn)
        .send()
        .await
        .unwrap();

    let message = format!("{:?}", result);

    let resp = Response::builder()
        .status(200)
        // The body is a debug dump of the SDK response, so plain text fits better than HTML.
        .header("content-type", "text/plain")
        .body(message.into())
        .map_err(Box::new)?;
    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;

    let sfn_client = aws_sdk_sfn::Client::new(&config);

    run(service_fn(|ev| function_handler(&sfn_client, ev))).await
}
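For readers following the CDK side in TypeScript, the envelope that the handler passes to the state machine can be sketched as below. It mirrors the Rust structs above: the publish task reads `$.input` as its request body, so everything the Events API needs lives under the `input` key. `buildStateMachineInput` is a hypothetical name used only for this illustration.

```typescript
// Mirrors the Rust structs AppSyncEventsRequest and StepFunctionInput.
interface AppSyncEventsRequest {
  channel: string;
  events: string[]; // each event is a JSON string
}

interface StepFunctionInput {
  input: AppSyncEventsRequest;
}

// Hypothetical helper: builds the execution input as a JSON string.
function buildStateMachineInput(userName: string, payload: object): string {
  const envelope: StepFunctionInput = {
    input: {
      channel: `generated-reports/${userName}`,
      events: [JSON.stringify(payload)],
    },
  };
  return JSON.stringify(envelope);
}
```

Note the double serialization: the payload is stringified once to become an event, and the whole envelope is stringified again to become the execution input.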

UI

I've created a new Vue 3 application. I will use the Amplify library, which makes integration with Cognito and AppSync very easy. It can be used even if you are not planning to use the Amplify service to deploy your frontend.

To configure Cognito and AppSync Events, I add the following to main.ts:

// fe/src/main.ts
// ...
import { Amplify } from "aws-amplify";

Amplify.configure({
  API: {
    Events: {
      endpoint:
        "https://<your_endpoint>.appsync-api.us-east-1.amazonaws.com/event",
      region: "us-east-1",
      defaultAuthMode: "userPool",
    },
  },
  Auth: {
    Cognito: {
      userPoolId: "<user_pool>",
      userPoolClientId: "<client_id>",
      identityPoolId: "<identity_id>",
      loginWith: {
        email: true,
      },
      signUpVerificationMethod: "code",
      userAttributes: {
        email: {
          required: true,
        },
      },
      // The identity pool above does not allow unauthenticated identities.
      allowGuestAccess: false,
      passwordFormat: {
        minLength: 8,
        requireLowercase: true,
        requireUppercase: true,
        requireNumbers: true,
        requireSpecialCharacters: true,
      },
    },
  },
});
// ...

I wrap the whole application in the authenticator tag in the App.vue

// fe/src/App.vue
// ...
import { Authenticator } from '@aws-amplify/ui-vue'
import "@aws-amplify/ui-vue/styles.css";
</script>

<template>
  <authenticator>
    <div class="wrapper">
      <HelloWorld msg="You did it!" />
    </div>
  </authenticator>
</template>
// ...

To connect to the channel, it is enough to call events.connect

// fe/src/components/HelloWorld.vue
// ...
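import { onMounted, ref } from 'vue'
import { events } from 'aws-amplify/data'
import { useAuthenticator } from '@aws-amplify/ui-vue'

const { user } = useAuthenticator();

// Events received from the channel, rendered by the component
const reports = ref<unknown[]>([]);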

onMounted(async () => {

  const channel = await events.connect(`/generated-reports/${user.username}`);

  channel.subscribe({
    next: (data) => {
      console.log('received', data.event);
      reports.value.push(data.event)
    },
    error: (err) => console.error('error', err),
  });

})
// ...

In the real world, you would move the connection part to a separate composable or keep it in the store, but for now, it is enough to keep it directly in the component.
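One possible first step toward that extraction is to keep the event handling in a plain function that a composable or a store can reuse. The following is a minimal sketch, assuming each event carries the `{ message }` payload that the Lambda function serializes. `ReportEvent`, `isReportEvent`, and `createReportObserver` are hypothetical names, not part of Amplify.

```typescript
// Assumed payload shape, matching the Message struct in the Lambda function.
interface ReportEvent {
  message: string;
}

// Type guard: checks the payload at runtime before it reaches the UI state.
function isReportEvent(value: unknown): value is ReportEvent {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { message?: unknown }).message === "string"
  );
}

// Builds an observer object with the `next` / `error` callbacks used above.
function createReportObserver(
  onReport: (report: ReportEvent) => void,
  onError: (err: unknown) => void = console.error
) {
  return {
    next: (data: { event?: unknown }) => {
      if (isReportEvent(data.event)) {
        onReport(data.event);
      } else {
        onError(new Error("unexpected event payload"));
      }
    },
    error: onError,
  };
}
```

In the component this could be used as `channel.subscribe(createReportObserver((r) => reports.value.push(r)))`, which keeps the component free of payload checks and makes the handler testable without a live connection.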

Deploy

Once the CDK stack is deployed, we need to make sure that our Connection has the right API key. Grab the key from the AppSync Events console and update it in the Connections section of the EventBridge console.

Test

Start the frontend

cd fe
npm run dev

There is a default Amplify login form. Sign up for the service and log in.

After logging in, I can see my amazing UI, which consists of the user ID and a single button to generate a report.

In the browser dev tools I can see that a WebSocket connection has been opened.

After clicking the button I need to wait for about 30 seconds, and then I receive a message on the channel I subscribed to.

It works 🎉 🎉

Summary

The AppSync Events API lets you set up a WebSocket connection with minimal configuration overhead. Events are published through an HTTPS endpoint, which any authorized service, including a StepFunction, can use.

The Amplify library simplifies setting up the authorized connection from the UI to the Events API.

Top comments (4)

Robert Slootjes

Good info but we can already do this for years with IoT Core. I really miss the fuzz about appsync events still.

szymon-szym • Edited

Thanks for your comment Robert. I really like your comparison between IoT Core and AppSync Events: dev.to/slootjes/aws-appsync-events...
IoT Core is super powerful. AppSync Events API is simpler and can be easily integrated into the front end with Amplify.

Joanne Skiles

This is a fantastic introduction to AppSync Events API! This example really helps break down the flow of async events. Thanks for sharing!

szymon-szym

Thank you Joanne!
