Arpad Toth for AWS Community Builders

Posted on • Originally published at arpadt.com

Updating data status with API Gateway WebSocket API

TL;DR: We can use Amazon API Gateway's WebSocket API to keep the front-end up to date with the latest data, providing a near-real-time alternative to polling a REST API endpoint for status notifications.

1. The scenario

Suppose we operate a click data processing application. Customers interact with links and buttons on our page, and the application records the number and type of clicks.

Our application also features a dashboard where authorized users monitor current click counts. The dashboard is browser-based, and the front-end exclusively displays data with no business logic involved.

The application persists click results in a database, and the client retrieves them from the backend. But here's a question: How should we display the current click count status?

2. Solution overview

First, let's review the options briefly.

2.1. Requirements

Some of the requirements are:

  • The dashboard should display the clicks in real or near real time.
  • Minimize human interaction. The solution should be as automated as possible.
  • Click results should be persisted in a database.
  • The solution should scale.
  • The architecture needs to be extendable and its elements should be transferable to other business scenarios.
  • The infrastructure must be hosted on AWS and be serverless.

2.2. Options

We can consider two solutions: polling and websockets.

After testing the polling solution, we might find it reasonable but expensive. The client must send a request to a REST API every few seconds to get the current state of the click results.

Frequent network calls increase the application's response time. Also, polling logic in the client requires work, like managing the poller’s status and clearing intervals. Still, polling can be useful in some cases.
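To make the bookkeeping concrete, here is a minimal sketch of the kind of poller the client would have to maintain. The `createPoller` name, its parameters and the 5-second default are assumptions made for illustration; they are not part of the original application.

```typescript
// Hypothetical poller: names and the 5-second default are illustrative only.
type Poller = {
  start: () => void;
  stop: () => void;
  isRunning: () => boolean;
};

function createPoller(
  fetchStatus: () => Promise<unknown>, // e.g. a wrapper around a REST call
  onData: (data: unknown) => void,
  intervalMs = 5000
): Poller {
  let timer: ReturnType<typeof setInterval> | undefined;

  return {
    start() {
      if (timer !== undefined) return; // avoid stacking several intervals
      timer = setInterval(() => {
        fetchStatus()
          .then(onData)
          .catch(() => {
            // a real client would surface or retry failed requests here
          });
      }, intervalMs);
    },
    stop() {
      if (timer !== undefined) clearInterval(timer);
      timer = undefined;
    },
    isRunning: () => timer !== undefined,
  };
}
```

Even in this small form, the client has to track the timer, guard against double starts and remember to call `stop()` when the dashboard unmounts.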

Instead, we can implement a WebSocket API, built on a protocol that enables two-way, real-time communication, rather than writing poller code that would invoke an endpoint every few seconds.

2.3. Diagram

The following diagram summarizes the architecture.

[Figure: Click data architecture]

This post focuses on keeping the client (the browser application) synchronised with the current click status. The Considerations section discusses variations to the diagrammed architecture.

3. Pre-requisites

This post is not a comprehensive tutorial; instead, it highlights key steps. It does not explain how to

  • create API Gateways (REST and WebSocket)
  • create Lambda functions
  • create a DynamoDB table
  • enable and work with DynamoDB streams.

I'll include links at the end that can help with these resources, if needed.

4. Main steps

Let's quickly review the main steps in the flow.

4.1. Ingestion

When users click any elements (links or buttons) on the page, the client sends click data to the /ingest REST endpoint created in API Gateway. The IngestClickData Lambda function receives, validates and possibly transforms the data.
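As an illustration of the validation and transformation step, the sketch below parses and checks a request body. The `ClickEvent` shape (`elementId`, `elementType`, `timestamp`) and the `parseClickEvent` helper are assumptions; the real payload of the application is not specified in this post.

```typescript
// Hypothetical click payload; the real shape is not specified in the post.
interface ClickEvent {
  elementId: string;
  elementType: 'link' | 'button';
  timestamp: string; // ISO 8601
}

function isElementType(value: unknown): value is ClickEvent['elementType'] {
  return value === 'link' || value === 'button';
}

// Validates the raw request body and returns a normalized click event.
function parseClickEvent(body: string | null): ClickEvent {
  if (!body) throw new Error('Missing request body');

  const raw: unknown = JSON.parse(body);
  if (typeof raw !== 'object' || raw === null) {
    throw new Error('Body must be a JSON object');
  }
  const { elementId, elementType, timestamp } = raw as Record<string, unknown>;

  if (typeof elementId !== 'string' || elementId.trim() === '') {
    throw new Error('elementId must be a non-empty string');
  }
  if (!isElementType(elementType)) {
    throw new Error('elementType must be "link" or "button"');
  }

  // Transformation: default the timestamp to "now" when the client omits it
  const ts = typeof timestamp === 'string' ? timestamp : new Date().toISOString();
  if (isNaN(Date.parse(ts))) throw new Error('timestamp must be a valid date');

  return { elementId: elementId.trim(), elementType, timestamp: ts };
}
```

In a Lambda handler, a thrown error would typically be mapped to a 400 response, while a valid event continues to the persistence step.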

4.2. Click data persistence

We persist click stream data in a DynamoDB table. The IngestClickData function persists click data to the table using the PutItem operation.
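The following sketch shows how the input for a PutItem call could be assembled. The key design (`pk`, `sk`) and the attribute names are assumptions chosen for illustration; only the `entityType: CLICK` attribute comes from this post, where it is used later for stream filtering.

```typescript
// Hypothetical click shape and key design, for illustration only.
interface ClickEvent {
  elementId: string;
  elementType: 'link' | 'button';
  timestamp: string;
}

type StringAttribute = { S: string };

interface PutItemInputSketch {
  TableName: string;
  Item: Record<string, StringAttribute>;
}

// Builds the request in DynamoDB's attribute-value format.
function toPutItemInput(tableName: string, click: ClickEvent): PutItemInputSketch {
  return {
    TableName: tableName,
    Item: {
      pk: { S: `CLICK#${click.elementId}` }, // assumed partition key
      sk: { S: click.timestamp }, // assumed sort key
      entityType: { S: 'CLICK' }, // lets a stream filter pick out click items
      elementType: { S: click.elementType },
    },
  };
}
```

The resulting object has the shape that `PutItemCommand` from `@aws-sdk/client-dynamodb` accepts, so the function could pass it to the DynamoDB client's `send` method.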

Alternatively, some business cases may justify direct integration between API Gateway and DynamoDB. If the input does not require validation or transformation, we can omit the Lambda function. Choosing this approach saves costs and improves performance by a few dozen (or even hundreds) of milliseconds.

4.3. Dashboard connection

On the dashboard, administrators monitor the status of click data.

As soon as the dashboard page is loaded, the client connects to the WebSocket API. The connectWebSocket function in the front-end code might look like this:

// Module-level reference so that other functions can reuse
// or close the connection
let ws;

function connectWebSocket() {
  // 1. Use the WebSocket object
  ws = new WebSocket(WEBSOCKET_URL);

  // 2. When the connection is open
  ws.onopen = () => {
    // If we want to display the connection status
    // in the UI:
    updateConnectionStatus(ConnectionStatus.CONNECTED);
  };

  // 3. When a click data message is received,
  // `handleMessage` is invoked
  ws.onmessage = handleMessage;

  // 4. When the connection is closed
  ws.onclose = (event) => {
    // If we want to display the connection status
    // in the UI:
    updateConnectionStatus(ConnectionStatus.DISCONNECTED);

    // We can implement reconnection logic
    // here if it wasn't an intentional close
  };

  // 5. Error occurred
  ws.onerror = (error) => {
    updateConnectionStatus(ConnectionStatus.DISCONNECTED);

    // The onerror event is always followed by onclose, so
    // reconnection will be handled in the onclose handler
  };
}

The code uses the browser's native WebSocket API (1). The form of WEBSOCKET_URL is similar to that of REST-type API URLs:

wss://<API_ID>.execute-api.eu-central-1.amazonaws.com/<STAGE>/

If we use infrastructure-as-code tools, like the CDK, we can inject the WebSocket URL into the client code dynamically.

Then, we listen to the available WebSocket events. The open event (2) is fired when a connection to the API is established. If we implement any reconnection logic (possibly with exponential backoff), we can clear any pending timeout here. We can also update the connection status here if we want to display it in the UI.

The message event (3) indicates a new click status from the backend (see below). The handleMessage function performs the logic to update a counter or display the current count in a chart.
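A minimal sketch of what handleMessage could look like follows. The message shape (`elementId`, `count`) and the in-memory `clickCounts` map are assumptions; the actual payload depends on what the backend sends.

```typescript
// Hypothetical message shape sent by the backend.
interface ClickStatusMessage {
  elementId: string;
  count: number;
}

function isClickStatusMessage(value: unknown): value is ClickStatusMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return typeof candidate.elementId === 'string' && typeof candidate.count === 'number';
}

// Latest known count per element; a chart or counter would read from here.
const clickCounts = new Map<string, number>();

// Accepts anything with a `data` property, so it fits `ws.onmessage`.
function handleMessage(event: { data: unknown }): void {
  if (typeof event.data !== 'string') return; // ignore non-text frames

  let parsed: unknown;
  try {
    parsed = JSON.parse(event.data);
  } catch {
    return; // ignore malformed JSON instead of breaking the dashboard
  }
  if (!isClickStatusMessage(parsed)) return;

  clickCounts.set(parsed.elementId, parsed.count);
  // A real dashboard would trigger a re-render here.
}
```

Validating the payload before touching the UI state keeps a single bad message from corrupting the displayed counts.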

For closed connections (4) and errors (5), we can implement reconnection logic when reasonable, for example, when the connection closure was unintentional.
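The reconnection decision and the exponential backoff delay can be sketched as two small helpers. The function names and the default values (1 second base, 30 seconds cap) are assumptions; the only external fact used is that close code 1000 means a normal closure in the WebSocket protocol.

```typescript
// Delay before the next reconnection attempt: base * 2^attempt, capped.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Close code 1000 signals a normal, intentional closure, so we only
// reconnect for other codes.
function shouldReconnect(closeCode: number): boolean {
  return closeCode !== 1000;
}

// Possible use inside the onclose handler (sketch, not executed here):
//
//   ws.onclose = (event) => {
//     if (shouldReconnect(event.code)) {
//       reconnectTimer = setTimeout(connectWebSocket, backoffDelay(attempt));
//       attempt += 1;
//     }
//   };
//
// In onopen, we would reset `attempt` to 0 and clear `reconnectTimer`.
```

With the defaults, the delays grow as 1 s, 2 s, 4 s, 8 s and so on until they reach the cap. Adding random jitter to the delay is a common refinement when many dashboards might reconnect at the same time.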

4.4. Persisting connection IDs

When we create a WebSocket API in API Gateway, three pre-defined routes are available: $connect, $disconnect and $default.

$connect triggers when the client (the dashboard page) opens a new WebSocket connection to the API. As with the REST-type APIs, we can assign a Lambda integration to the route, specifically the PersistConnectionIDs function. As its name suggests, this function stores the WebSocket connection IDs (issued by API Gateway) in the DynamoDB table. The stream processor function will need the connection IDs to broadcast click data statuses to the connected clients (see below).

The $disconnect route is wired to the DeleteConnectionIDs function, which, however unbelievable it may sound, removes the corresponding connection ID from the table. This Lambda function runs when an admin user leaves the dashboard page or closes the browser tab. With the DeleteConnectionIDs function removing the unused connection IDs, the stream processor will not broadcast click data results to unconnected dashboard clients.

$default serves as a fallback if no routes match. This route is not implemented here.
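The two route integrations can be sketched as follows. API Gateway passes the connection ID in `event.requestContext.connectionId`; the `ConnectionStore` interface and the `createConnectionHandlers` factory are hypothetical abstractions introduced here so that the example stays self-contained. In the setup described above, the store would wrap DynamoDB write and delete operations.

```typescript
// Minimal subset of the event that API Gateway passes to WebSocket route
// integrations; only the fields used here are declared.
interface WebSocketRouteEvent {
  requestContext: { connectionId: string; routeKey: string };
}

// Hypothetical storage abstraction (an assumption, not from the article).
interface ConnectionStore {
  save(connectionId: string): Promise<void>;
  remove(connectionId: string): Promise<void>;
}

interface RouteResponse {
  statusCode: number;
}

function createConnectionHandlers(store: ConnectionStore) {
  return {
    // Integration for the $connect route
    async onConnect(event: WebSocketRouteEvent): Promise<RouteResponse> {
      await store.save(event.requestContext.connectionId);
      return { statusCode: 200 };
    },
    // Integration for the $disconnect route
    async onDisconnect(event: WebSocketRouteEvent): Promise<RouteResponse> {
      await store.remove(event.requestContext.connectionId);
      return { statusCode: 200 };
    },
  };
}
```

Injecting the store keeps the handlers easy to test with an in-memory implementation, and the same factory works regardless of how the connection IDs are keyed in the table.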

4.5. Retrieving data from the table

When IngestClickData saves a new piece of data to the DynamoDB table, a DynamoDB Streams event is triggered. DynamoDB Streams capture data modification events, and here, whenever data in the table changes, related services can respond in near real time.

Although the diagram uses only one table, another approach could be to split the click data and connection IDs into separate tables.

The StreamProcessor function only cares about changes to click data statuses. With a single table storing both click data and connection IDs, we might want to implement an event filter here to optimize cost. The function doesn't need to run when PersistConnectionIDs saves the connection IDs to the table.

We can easily write a filter in the CDK infrastructure code that monitors a dedicated attribute of the item. Assuming that click data items have an entityType: CLICK attribute, we can write a filter similar to this:

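// CDK code!!
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';

// `streamProcessorLambda` is a Lambda Function construct and
// `clickstreamTable` is a DynamoDB Table construct, both defined earlier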
streamProcessorLambda.addEventSource(
  new lambdaEventSources.DynamoEventSource(clickstreamTable, {
    // ... other properties
    filters: [
      lambda.FilterCriteria.filter({
        dynamodb: {
          NewImage: {
            entityType: {
              S: lambda.FilterRule.isEqual('CLICK'),
            },
          },
        },
      }),
    ],
  })
);

Now, the processor function runs only when click data is persisted, not when a connection ID is stored.

The StreamProcessor function's handler can be similar to this:

// stream-processor.ts
import type { DynamoDBStreamEvent } from 'aws-lambda';

export async function handler(event: DynamoDBStreamEvent): Promise<void> {
  // Process each stream record
  for (const record of event.Records) {
    try {
      // 1. Extract click data from NEW_IMAGE
      const clickData = extractClickData(record);

      // 2. Broadcast to all active connections
      await broadcastClickEvent(clickData);
    } catch (error) {
      // handle errors here
    }
  }
}

First, the handler extracts the data from the stream event record (1) containing the current status. I don't show this code for brevity, but I have attached some links below that describe how to do it.
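For orientation only, an extraction helper could look roughly like the sketch below. The attribute names `elementId` and `clickCount` are assumptions, and the local types mirror just the record fields used here. Two facts are relied on: `NewImage` is present only when the stream view type includes new images, and DynamoDB's attribute-value format encodes numbers as strings under `N`.

```typescript
// Minimal local types mirroring the stream record fields used here.
type AttributeValue = { S?: string; N?: string };

interface StreamRecord {
  eventName?: 'INSERT' | 'MODIFY' | 'REMOVE';
  dynamodb?: { NewImage?: Record<string, AttributeValue> };
}

// Hypothetical shape of the status that is broadcast to dashboards.
interface ClickData {
  elementId: string;
  count: number;
}

function extractClickData(record: StreamRecord): ClickData {
  const image = record.dynamodb?.NewImage;
  if (!image) {
    // REMOVE events and streams without new images carry no NewImage
    throw new Error('Record has no NewImage');
  }

  const elementId = image['elementId']?.S; // assumed attribute name
  const count = Number(image['clickCount']?.N); // numbers arrive as strings

  if (typeof elementId !== 'string' || !isFinite(count)) {
    throw new Error('Record is missing elementId or clickCount');
  }
  return { elementId, count };
}
```

For items with many attributes, the `unmarshall` helper from `@aws-sdk/util-dynamodb` converts the whole image to a plain object in one call.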

Next, StreamProcessor's broadcastClickEvent function queries all connection IDs from the table. It then iterates over the array of connection IDs and invokes the broadcastToConnection utility function for each one (2).
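The fan-out described above can be sketched like this. Unlike the one-argument call in the handler, this version receives its dependencies (`listConnectionIds` and `send`) as parameters, which is an assumption made to keep the example self-contained and testable; the returned summary object is also hypothetical.

```typescript
type SendFn = (connectionId: string, message: unknown) => Promise<void>;

interface BroadcastResult {
  delivered: number;
  failed: string[]; // connection IDs whose send was rejected
}

async function broadcastClickEvent(
  message: unknown,
  listConnectionIds: () => Promise<string[]>, // e.g. a query on the table
  send: SendFn // e.g. a function that posts to one connection
): Promise<BroadcastResult> {
  const connectionIds = await listConnectionIds();

  // Send in parallel; map each outcome to null (success) or the failed ID
  // so that one broken connection does not abort the whole broadcast.
  const outcomes = await Promise.all(
    connectionIds.map((id) =>
      send(id, message).then(
        () => null,
        () => id
      )
    )
  );

  const failed = outcomes.filter((id): id is string => id !== null);
  return { delivered: connectionIds.length - failed.length, failed };
}
```

Collecting the failed IDs is useful because PostToConnection rejects with a GoneException (HTTP 410) when a connection no longer exists; such IDs are candidates for removal from the table.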

4.6. Broadcasting click data

An extract of the broadcastToConnection function may look like this:

import {
  ApiGatewayManagementApiClient,
  PostToConnectionCommand,
} from '@aws-sdk/client-apigatewaymanagementapi';
// more import statements

const apiGatewayClient = new ApiGatewayManagementApiClient({
  endpoint: WEBSOCKET_API_ENDPOINT,
});

// more code

async function broadcastToConnection(
  connectionId: string,
  message: WebSocketMessage
): Promise<void> {
  try {
    const messageData = JSON.stringify(message);

    await apiGatewayClient.send(
      new PostToConnectionCommand({
        ConnectionId: connectionId,
        Data: Buffer.from(messageData),
      })
    );
  } catch (error) {
    // handle errors here
  }
}

This function receives the message (i.e., the click data status) and a single connection ID. The parent function, broadcastClickEvent (see above), invokes this function for each connection ID.

We create an ApiGatewayManagementApiClient instance with the WebSocket endpoint, and use the PostToConnection action to send the stringified message to the client connected to the WebSocket API with the connectionId.
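One detail that is easy to miss: the management client expects an `https://` endpoint that includes the stage, not the `wss://` URL used by the browser. The hypothetical helper below derives one from the other, assuming the default `execute-api` domain is in use (custom domain names follow different rules).

```typescript
// Converts the browser-facing wss:// URL into the https:// endpoint
// expected by the management client. Assumes the default execute-api domain.
function toManagementEndpoint(webSocketUrl: string): string {
  return webSocketUrl
    .replace(/^wss:\/\//, 'https://') // same host and stage, different scheme
    .replace(/\/+$/, ''); // drop trailing slashes
}

// Example:
// toManagementEndpoint('wss://abc123.execute-api.eu-central-1.amazonaws.com/prod/')
// returns 'https://abc123.execute-api.eu-central-1.amazonaws.com/prod'
```

The result could then be supplied as the WEBSOCKET_API_ENDPOINT value, for example through an environment variable set by the infrastructure code.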

4.7. Receiving click results

Finally, the dashboard client receives the click data that StreamProcessor sends to the WebSocket API. When this happens, the message event available via the onmessage property is emitted. As discussed above, the handleMessage front-end function manages the data updates in the UI.

5. Considerations

The solution described here updates the click data status in near real-time. There's no expensive polling involved in the architecture. The browser application will automatically receive the new data through the WebSocket API.

5.1. No need for a REST API?

If there's no requirement for REST APIs, we can simplify the architecture by sending click data to the WebSocket API. We can create a custom route (or use the $default route) and integrate it with the IngestClickData function directly.

If, on the other hand, we want to integrate external systems through a webhook or support clients that depend on REST endpoints, the solution presented here remains a viable choice.

5.2. Asynchronous workflows

The architecture works particularly well for asynchronous workflows, like order processing or booking. Typically, we don't want users to wait for every step of the workflow to finish. It would result in a suboptimal user experience. Instead, we return an Accepted response after the user has placed the order.

In this case, we probably want to connect the REST-type API Gateway to an SQS queue. A Lambda function can poll the queue for messages and process them at a pace that doesn't overload the backend.

The front-end application can then connect to a WebSocket API and receive the updated order status from the server as described above.

5.3. Flexibility

The architecture is flexible and extendable. In case of steadily heavy volume, API Gateway can directly integrate with Kinesis Data Streams, as described in the documentation. As with SQS queues, Lambda functions can poll the stream and process the records at their own pace.

As always, the solution presented here is not production ready, and cannot be taken as is. You always need to test your solutions before deploying them to production.

6. Summary

WebSocket APIs can be a good choice when we don't want to frequently poll a REST API for status data. API Gateway supports WebSocket APIs.

A key element in WebSocket API architectures is to persist connection IDs in a database, so that the resource (e.g., a Lambda function) broadcasting data to the API is aware of the active connections. Connected clients then automatically receive the current status through the WebSocket API.

7. Further reading

Getting started with Lambda - How to create a Lambda function

Creating a REST API in Amazon API Gateway - The title says it all

Getting started with DynamoDB - DynamoDB basics

Change data capture for DynamoDB Streams - AWS documentation page for DynamoDB Streams

Triggering multiple workflows with DynamoDB Streams - A more subjective and definitely more biased article on DynamoDB Streams

Building event-driven workflows with DynamoDB Streams - Another example of my emotionless self-promotion
