DEV Community

Tris Duong

Tutorial: Use AWS EventBridge and AppSync for Real-Time Notifications to the Client Side

Introduction:

Real-time notifications are becoming increasingly important for many
applications, as users expect to be notified of important events as soon
as they happen. AWS EventBridge and AWS AppSync can be powerful tools to
accomplish this goal with minimal cost. In this summary, I will walk
through the setup of a demo application that uses EventBridge and
AppSync to push real-time updates to a client-side web application,
followed by a cost analysis.

AWS EventBridge:

AWS EventBridge is a powerful event bus service offered by Amazon Web
Services (AWS) that enables users to build real-time applications that
respond to data changes. With EventBridge, developers can set up rules
to route events from different sources, including AWS services,
third-party software-as-a-service (SaaS) applications, and custom
applications, to one or more targets. Targets can include AWS services,
such as AWS Lambda or Amazon SNS, as well as custom HTTP endpoints.

One of the main benefits of using AWS EventBridge is its ability to
simplify event-driven architectures. Instead of creating custom code to
handle each event source, developers can use EventBridge to route events
to the right targets automatically. This not only reduces the amount of
code required but also makes it easier to scale and manage applications.
Additionally, EventBridge provides built-in security features, such as
encryption and access control, to ensure that event data is secure and
compliant.
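
To make the routing idea concrete, here is a minimal Python sketch of how a rule's event pattern could be checked against an event. It is a simplified illustration, not EventBridge's actual matching engine: the hypothetical `matches` helper only handles exact-value lists and nested objects, and it ignores advanced operators such as prefix or numeric matching. The sample events are made up.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Return True if every field in the pattern is satisfied by the event.

    Simplified rules (an assumption covering only basic patterns):
    - a list in the pattern means "the event value must equal one of these"
    - a dict in the pattern is matched recursively against a nested object
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


pattern = {"detail-type": ["Cloud Notification"]}
notification = {
    "detail-type": "Cloud Notification",
    "source": "experiments.created",
    "detail": {"user-id": "123"},
}
other = {"detail-type": "Something Else", "source": "experiments.created", "detail": {}}

print(matches(pattern, notification))  # True
print(matches(pattern, other))         # False
```

Only events that satisfy the pattern are forwarded to the rule's targets; everything else on the bus is ignored by that rule.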

AWS AppSync:

AWS AppSync is a managed service provided by AWS that simplifies the
process of building scalable GraphQL APIs. With AppSync, developers can
define and map a data schema to one or more data sources, such as AWS
DynamoDB or AWS Lambda functions. AppSync then automatically generates
GraphQL APIs that can be accessed by client-side applications.

One of the key features of AppSync is its support for real-time
updates through subscriptions. Client-side applications open a
WebSocket connection to AppSync and register a subscription; AppSync
then pushes data to them whenever a mutation that the subscription is
linked to is invoked. Subscriptions are linked to mutations in the
schema with the @aws_subscribe directive, and subscription arguments
can be used to filter which results each client receives.
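
The way a subscription argument narrows delivery can be sketched in plain Python. This is only a toy model of the behaviour, not AppSync code: the `Subscriber` class and the `publish` function are hypothetical names, and the rule they illustrate is that a client subscribed with `userId: "123"` receives a mutation result only when the result's `userId` field equals "123".

```python
from dataclasses import dataclass, field


@dataclass
class Subscriber:
    """A connected client and the argument values it subscribed with."""
    name: str
    arguments: dict
    inbox: list = field(default_factory=list)


def publish(result: dict, subscribers: list) -> None:
    """Deliver a mutation result to every subscriber whose arguments all match it."""
    for sub in subscribers:
        if all(result.get(key) == value for key, value in sub.arguments.items()):
            sub.inbox.append(result)


alice = Subscriber("alice", {"userId": "123"})
bob = Subscriber("bob", {"userId": "456"})

# One mutation result, published for user 123 only.
publish({"userId": "123", "message": "created successful"}, [alice, bob])

print(len(alice.inbox))  # 1
print(len(bob.inbox))    # 0
```

This is why the subscription in this tutorial takes a `userId` argument: each client only sees notifications addressed to its own user.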

Additionally, AppSync provides features to simplify data
synchronization, offline data access, and conflict resolution. These
features make it easier to build scalable and reliable applications that
work seamlessly across different devices and network conditions.

Setting Up the Demo Application:

Steps:

  • Create an EventBridge Rule:

    • Go to the EventBridge console
    • Click on "Create Rule"
    • Enter a name for the rule and a description (optional)

-   Under "Event Pattern", choose "Custom Pattern"

-   In the "Event Pattern" text area, enter the following pattern:

{"detail-type": ["Cloud Notification"]}

-   This pattern matches every event whose detail-type is "Cloud
    Notification". We can add more patterns, one for each type of
    notification, if we want.

-   Click on "Create a new API destination"

-   Choose "Create a new connection"

-   Enter a name for the connection and a description (optional)

-   Under "Endpoint URL", enter the AppSync GraphQL endpoint URL (the
    one ending in /graphql) and use POST as the HTTP method

-   Choose the authentication method: API Key, Basic
    (Username/Password), or OAuth Client Credentials. We can use the
    OAuth client of the cloud side.

-   Click on "Additional settings" and choose "Configure
    transformer". This step transforms the EventBridge event into
    the request body that the GraphQL endpoint expects. We can also
    add any extra data we need at this step, so it is very flexible.

-   Fill in "Target input transformer" (the input path):

{
  "message": "$.detail.message",
  "resourceId": "$.detail.resource-id",
  "resourceType": "$.detail.resource-type",
  "updatedAt": "$.time",
  "userId": "$.detail.user-id"
}

-   Fill in "Template":

{
  "query": "mutation PublishNotification($userId: ID!, $resourceId: ID!, $resourceType: String!, $message: String!, $updatedAt: AWSDateTime!) { publishNotification(userId: $userId, resourceId: $resourceId, resourceType: $resourceType, message: $message, updatedAt: $updatedAt) { userId resourceId resourceType message updatedAt } }",
  "operationName": "PublishNotification",
  "variables": {
    "userId": "<userId>",
    "resourceId": "<resourceId>",
    "resourceType": "<resourceType>",
    "message": "<message>",
    "updatedAt": "<updatedAt>"
  }
}

-   Click on "Create"

> Now, all events with the detail-type "Cloud Notification" will be
> sent to the destination that you selected.

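To see what the transformer produces, the input path and template can be simulated locally. The sketch below is an approximation written for this tutorial, not the EventBridge implementation: `resolve_path` and `apply_transformer` are hypothetical helpers that only support simple `$.a.b` paths and `<name>` placeholders, the template is shortened to the "variables" part, and the sample event values are made up.

```python
import json

INPUT_PATHS = {
    "message": "$.detail.message",
    "resourceId": "$.detail.resource-id",
    "resourceType": "$.detail.resource-type",
    "updatedAt": "$.time",
    "userId": "$.detail.user-id",
}

# Shortened template: only the "variables" part, e.g. "userId": "<userId>".
TEMPLATE = json.dumps({"variables": {name: f"<{name}>" for name in INPUT_PATHS}})


def resolve_path(event: dict, path: str):
    """Follow a simple path such as $.detail.user-id through nested dicts."""
    value = event
    for part in path.split(".")[1:]:  # skip the leading "$"
        value = value[part]
    return value


def apply_transformer(event: dict, paths: dict, template: str) -> str:
    """Replace each <name> placeholder with the value found at its input path.

    Values are inserted as-is, so this sketch assumes they contain no quotes.
    """
    body = template
    for name, path in paths.items():
        body = body.replace(f"<{name}>", str(resolve_path(event, path)))
    return body


event = {
    "detail-type": "Cloud Notification",
    "time": "2023-04-01T12:00:00Z",
    "detail": {
        "user-id": "123",
        "resource-id": "345",
        "resource-type": "experiments",
        "message": "created successful",
    },
}

body = json.loads(apply_transformer(event, INPUT_PATHS, TEMPLATE))
print(body["variables"]["userId"])  # 123
```

The resulting `variables` object is what the `PublishNotification` mutation receives when the API destination posts to AppSync.
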
  • Create an AppSync API:

    • Go to the AppSync console
    • Click on "Create API"
    • Choose "Build from scratch"
    • Enter a name for the API
    • Click on "Create"
    • Click on "Edit Schema" and use this example schema:
type Mutation {
    publishNotification(
        userId: ID!,
        resourceId: ID!,
        resourceType: String!,
        message: String!,
        updatedAt: AWSDateTime!
    ): Notification
}

type Notification {
    userId: ID!
    resourceId: ID!
    resourceType: String!
    message: String!
    updatedAt: AWSDateTime!
}

type Query {
    getNotification: Notification
}

type Subscription {
    onNotification(userId: ID!): Notification
        @aws_subscribe(mutations: ["publishNotification"])
}

-   Under "Data sources", click on "Create data source"

-   Choose "None" as the data source type. (We can save notifications
    to a database if needed; I chose "None" so that nothing is stored.)

-   Enter a name for the data source and a description (optional)

-   Under "Resolver" of the mutation "publishNotification", click on
    "Attach"

-   Choose "Create a new resolver"

-   Enter a name for the resolver and a description (optional)

-   Under "Data source", choose the data source that you created

-   Under "Request mapping template", enter the following
    template:

{
  "version": "2018-05-29",
  "payload": $util.toJson($context.args)
}

-   This template passes the mutation arguments through unchanged as
    the payload, because a "None" data source has no backend to call.

-   Under "Response mapping template", enter the following
    template:

$util.toJson($context.result)

-   This template returns the payload as the mutation result, which
    AppSync then delivers to the subscribed client-side applications.

-   Click on "Create"

We now have an AppSync API with a resolver that can process events
from EventBridge.
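
Before relying on EventBridge to call the API, it can help to invoke the mutation directly. The following sketch builds the same kind of request body that the EventBridge template produces and posts it with an API key. It assumes API key authorization is enabled on the API; `API_URL` and `API_KEY` are placeholders, and `build_request` and `SAMPLE_VARIABLES` are names made up for this example.

```python
import json
import urllib.request

# Placeholders (assumptions): replace with the values from the AppSync "Settings" page.
API_URL = "https://example.appsync-api.us-east-1.amazonaws.com/graphql"
API_KEY = "<YOUR_API_KEY>"

MUTATION = """
mutation PublishNotification($userId: ID!, $resourceId: ID!, $resourceType: String!,
                             $message: String!, $updatedAt: AWSDateTime!) {
  publishNotification(userId: $userId, resourceId: $resourceId,
                      resourceType: $resourceType, message: $message,
                      updatedAt: $updatedAt) {
    userId resourceId resourceType message updatedAt
  }
}
"""

# Made-up sample values that mirror the event used later in this tutorial.
SAMPLE_VARIABLES = {
    "userId": "123",
    "resourceId": "345",
    "resourceType": "experiments",
    "message": "created successful",
    "updatedAt": "2023-04-01T12:00:00Z",
}


def build_request(variables: dict) -> urllib.request.Request:
    """Build a POST request carrying the GraphQL mutation and its variables."""
    body = json.dumps({"query": MUTATION, "variables": variables}).encode("utf-8")
    headers = {"Content-Type": "application/json", "x-api-key": API_KEY}
    return urllib.request.Request(API_URL, data=body, headers=headers, method="POST")


if __name__ == "__main__" and not API_KEY.startswith("<"):
    # Only sent once a real API key has been filled in.
    with urllib.request.urlopen(build_request(SAMPLE_VARIABLES)) as response:
        print(response.read().decode("utf-8"))
```

If the resolver is set up as described, the response should echo the fields under `data.publishNotification`, and any open subscription for that `userId` should receive the same payload.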

  • Create a client-side application:

    • Create a new project using your preferred client-side framework (React, Angular, etc.)
    • Install the AWS Amplify library
    • Configure the Amplify library with your AppSync endpoint URL and authentication method
    • In your application code, create a subscription to the AppSync API using the Amplify library
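import { API, graphqlOperation } from 'aws-amplify';
// The subscription field in the schema above is onNotification.
import { onNotification } from './graphql/subscriptions';

// Subscribe to notifications published for user '123'.
const subscription = API.graphql(graphqlOperation(onNotification, { userId: '123' }))
  .subscribe({
    // Amplify wraps each event as { provider, value }.
    next: ({ value }) => {
      console.log(value.data.onNotification);
    },
    error: (error) => {
      console.error(error);
    }
  });

// Call subscription.unsubscribe() when notifications are no longer needed.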

This code subscribes to the AppSync API and receives a real-time
notification whenever one is published for the user with ID "123".

  • Create a client-side application with Python:

    • Client code example to receive notifications:

# subscription_client.py

from base64 import b64encode
from datetime import datetime
from uuid import uuid4

import websocket
import threading
import ssl

import json

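# Constants copied from the AppSync API 'Settings' page (replace the placeholders)
API_URL = '<YOUR_API_URL>'  # GraphQL endpoint, ends with /graphql
API_KEY = '<YOUR_API_KEY>'

# ID of the user whose notifications this client should receive
USER_ID = '<YOUR_USER_ID>'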

# GraphQL subscription Registration object
GQL_SUBSCRIPTION = json.dumps({
    "query": """
    subscription($userId: ID!) {
      onNotification(userId: $userId) {
        userId
        resourceId
        resourceType
        message
        updatedAt
      }
    }
    """,
    "variables": {
        "userId": USER_ID
    }
})


# Discovered values from the AppSync endpoint (API_URL)
WSS_URL = API_URL.replace('https', 'wss').replace('appsync-api', 'appsync-realtime-api')
HOST = API_URL.replace('https://', '').replace('/graphql', '')

# Subscription ID (client generated)
SUB_ID = str(uuid4())

# Set up Timeout Globals
timeout_timer = None
timeout_interval = 10


# Calculate UTC time in ISO format (AWS Friendly): YYYY-MM-DDTHH:mm:ssZ
def header_time():
    return datetime.utcnow().isoformat(sep='T', timespec='seconds') + 'Z'


# Encode Using Base 64
def header_encode(header_obj):
    return b64encode(json.dumps(header_obj).encode('utf-8')).decode('utf-8')


# reset the keep alive timeout daemon thread
def reset_timer(ws):
    global timeout_timer
    global timeout_interval

    if (timeout_timer):
        timeout_timer.cancel()
    timeout_timer = threading.Timer(timeout_interval, lambda: ws.close())
    timeout_timer.daemon = True
    timeout_timer.start()


# Create API key authentication header
api_header = {
    'host': HOST,
    'x-api-key': API_KEY
}


# Socket Event Callbacks, used in WebSocketApp Constructor
def on_message(ws, message):
    global timeout_timer
    global timeout_interval

    print('### message ###')
    print('<< ' + message)

    message_object = json.loads(message)
    message_type = message_object['type']

    if (message_type == 'ka'):
        reset_timer(ws)

    elif (message_type == 'connection_ack'):
        # connectionTimeoutMs is in milliseconds; threading.Timer expects seconds
        timeout_interval = message_object['payload']['connectionTimeoutMs'] / 1000

        register = {
            'id': SUB_ID,
            'payload': {
                'data': GQL_SUBSCRIPTION,
                'extensions': {
                    'authorization': {
                        'host': HOST,
                        'x-api-key': API_KEY
                    }
                }
            },
            'type': 'start'
        }
        start_sub = json.dumps(register)
        print('>> ' + start_sub)
        ws.send(start_sub)

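    elif (message_type == 'data'):
        # This demo stops the subscription after the first notification arrives;
        # remove the 'stop' message below to keep listening.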
        deregister = {
            'type': 'stop',
            'id': SUB_ID
        }
        end_sub = json.dumps(deregister)
        print('>> ' + end_sub)
        ws.send(end_sub)

    elif (message_type == 'error'):
        # The payload is a JSON object, so serialize it before concatenating
        print('Error from AppSync: ' + json.dumps(message_object['payload']))


def on_error(ws, error):
    print('### error ###')
    print(error)


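# Recent websocket-client versions also pass the close status code and message
def on_close(ws, close_status_code=None, close_msg=None):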
    print('### closed ###')


def on_open(ws):
    print('### opened ###')
    init = {
        'type': 'connection_init'
    }
    init_conn = json.dumps(init)
    print('>> ' + init_conn)
    ws.send(init_conn)


if __name__ == '__main__':
    # Uncomment to see socket bytestreams
    # websocket.enableTrace(True)

    # Set up the connection URL, which includes the Authentication Header
    #   and a payload of '{}'.  All info is base 64 encoded
    connection_url = WSS_URL + '?header=' + header_encode(api_header) + '&payload=e30='

    # Create the websocket connection to AppSync's real-time endpoint
    #  also defines callback functions for websocket events
    #  NOTE: The connection requires a subprotocol 'graphql-ws'
    print('Connecting to: ' + connection_url)

    ws = websocket.WebSocketApp(connection_url,
                                subprotocols=['graphql-ws'],
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close, )

    # Rely on the default TLS settings so that the server certificate is verified
    ws.run_forever()

-   Server code example to send notifications:

import boto3
import json

event_bridge = boto3.client('events')

# Define the event details
event = {
    "detail-type": "Cloud Notification",
    "source": "experiments.created",
    "detail": {
        "user-id": "123",
        "resource-id": "345",
        "resource-type": "experiments",
        "message": "created successful"
    }
}

# Put the event to EventBridge
response = event_bridge.put_events(
    Entries=[
        {
            'Source': event['source'],
            'DetailType': event['detail-type'],
            'Detail': json.dumps(event['detail']),
            # Must be the event bus on which the rule was created
            'EventBusName': 'notification'
        }
    ]
)

# Check the response
print(response)
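
`put_events` can return successfully even when individual entries were rejected, so a printed response is easy to misread. A small helper like the one below (a sketch; `failed_entries` is a name invented here) inspects the `FailedEntryCount` and per-entry `ErrorCode` fields of the response. The sample response is hand-written for illustration.

```python
def failed_entries(response: dict) -> list:
    """Return (index, error code, error message) for each rejected entry."""
    if response.get("FailedEntryCount", 0) == 0:
        return []
    return [
        (index, entry["ErrorCode"], entry.get("ErrorMessage", ""))
        for index, entry in enumerate(response.get("Entries", []))
        if "ErrorCode" in entry
    ]


# Hand-written example of a response in which the second entry failed.
sample_response = {
    "FailedEntryCount": 1,
    "Entries": [
        {"EventId": "11111111-2222-3333-4444-555555555555"},
        {"ErrorCode": "InternalFailure", "ErrorMessage": "Unexpected error"},
    ],
}

for index, code, message in failed_entries(sample_response):
    print(f"entry {index} failed: {code} ({message})")
```

Because the indexes line up with the `Entries` list that was sent, the failed events can be retried selectively.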

[Image: demo success screenshot]

Cost analysis:

One potential challenge when building a real-time notification system is
managing costs. Fortunately, using AWS EventBridge and AWS AppSync can
be a cost-effective option. Both services offer a pay-as-you-go pricing
model, which means you only pay for the resources you use. Additionally,
both services require minimal setup and maintenance and offer automatic
scaling to optimize resource usage. By leveraging these services, you
can reduce operational costs and benefit from integration with other AWS
services.

The cost of using AWS EventBridge and AWS AppSync depends on the usage
patterns of your application. EventBridge charges per custom event
published to an event bus, and API destination invocations are billed
separately. AppSync charges for query and mutation operations, for
real-time updates delivered to subscribed clients, and for the minutes
that clients stay connected, in addition to standard AWS data transfer
charges. Rates differ by region and change over time, so check the
current pricing pages of both services.

As a rough example, an application that processes 1 million events per
month and makes 1 million API requests per month, with 100 GB of data
transfer, would cost on the order of $20.00 per month.

Best Practices:

When building a real-time notification system using AWS EventBridge and
AppSync, several best practices can help ensure scalability,
reliability, and security. Some key best practices include:

  • Use fine-grained events: By using fine-grained events, you can
    reduce the amount of data sent to EventBridge and improve the
    efficiency of the notification system.

  • Use dead-letter queues: Dead-letter queues can be used to capture
    failed events and investigate any issues that occur. This can help
    prevent data loss and ensure the notification system works properly.

  • Use caching: Caching can help reduce the number of requests sent to
    data sources, improving the efficiency of the notification system.

  • Monitor and troubleshoot: It's important to monitor the
    notification system and troubleshoot any issues that arise. AWS
    provides tools like CloudWatch and X-Ray to help with monitoring and
    troubleshooting.

  • Use encryption and access control: Encryption and access control
    should be used to ensure that event data is secure and compliant
    with regulatory requirements. AWS provides features like AWS KMS and
    IAM to help with encryption and access control.
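
As one example of the dead-letter queue advice, an EventBridge target can carry a `DeadLetterConfig` and a `RetryPolicy`. The sketch below only builds the target definition that would be passed to boto3's `put_targets`; every ARN, the target ID, the rule name, and the `build_target` helper are hypothetical placeholders, and the call itself is left commented out.

```python
def build_target(api_destination_arn: str, role_arn: str, dlq_arn: str) -> dict:
    """Build a rule target that retries up to 5 times within one hour,
    then sends the undelivered event to an SQS dead-letter queue."""
    return {
        "Id": "appsync-notification-target",
        "Arn": api_destination_arn,
        "RoleArn": role_arn,
        "RetryPolicy": {
            "MaximumRetryAttempts": 5,
            "MaximumEventAgeInSeconds": 3600,
        },
        "DeadLetterConfig": {"Arn": dlq_arn},
    }


target = build_target(
    api_destination_arn="arn:aws:events:us-east-1:123456789012:api-destination/appsync/example",
    role_arn="arn:aws:iam::123456789012:role/example-eventbridge-invoke-role",
    dlq_arn="arn:aws:sqs:us-east-1:123456789012:notification-dlq",
)

# With AWS credentials configured, the target could be attached like this:
# import boto3
# boto3.client("events").put_targets(
#     Rule="cloud-notification", EventBusName="notification", Targets=[target]
# )
print(target["DeadLetterConfig"]["Arn"])
```

Events that still fail after the retry policy is exhausted end up in the queue, where they can be inspected or replayed instead of being lost.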

Conclusion:

In conclusion, using AWS EventBridge and AWS AppSync can be a
cost-effective solution for building a real-time notification system. By
leveraging these services, we can reduce operational costs, take
advantage of cost-effective scaling, and benefit from integration with
other AWS services. With the steps and best practices covered in this
summary, we can build a scalable and reliable real-time notification
system that meets the needs of our users.

References:

https://aws.amazon.com/blogs/mobile/appsync-eventbridge/

https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html
