Flynn's Blog

Website Monitoring using AWS Lambda and Aurora

What is the agenda of the project?

The project implements real-time monitoring of a website using AWS services.

We first launch an EC2 instance on AWS and install the Amazon Kinesis Agent on it. A Kinesis data stream is then created for real-time streaming of the server logs, and an Identity and Access Management (IAM) role is attached to the EC2 instance so the agent can write to the stream. The agent streams the logs into Kinesis Data Streams, on top of which a Kinesis Data Analytics application is created. Finally, we set up Amazon Aurora MySQL, the Amazon SNS service, and AWS Secrets Manager.

Problem Statement

One of the major areas where any cloud customer has to take precautions is reacting immediately to a sudden change in usage cost. This is highly common in retail scenarios like Amazon, where the number of sales goes up during a festival offer period, or where bad actors misuse the site by overwhelming it with order requests.

So the solution below identifies unusual activity, defined here as more than 15 orders placed in any 15-second window, using AWS real-time streaming and processing services.

Real-time solution (not part of this implementation): based on the SNS notification, the user can check whether the orders are authentic and take action, such as increasing the number of backend servers for processing or blocking the offending IP addresses from placing orders.

Architecture

Step 1: Amazon EC2 acts as the website backend, generating server logs.
Step 2: Kinesis Data Streams reads the server logs in real time and pushes them to Kinesis Data Analytics, which performs the computation above (more than 15 orders per 15 seconds).
Step 3: A second in-application stream in Data Analytics tracks such floods over the past 1 minute and forwards records to the second data stream only if the trend persists for the full minute. This step exists purely to reduce the number of SNS messages received during a spike.
Step 4: The second data stream receives these alarm records and triggers Lambda.
Step 5: Lambda publishes an SNS notification, which in turn delivers an SMS message, and saves a copy of all the alarm records in Aurora MySQL for an aggregated view in the future.
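The two data streams from Steps 2 and 4 can be created with the AWS CLI or with boto3. Here is a minimal sketch; the stream names and the `ap-south-1` region are assumptions for illustration, not taken from the post:

```python
def create_streams(client, names, shards=1):
    """Create the Kinesis data streams used in Steps 2 and 4.

    `client` is a boto3 Kinesis client, e.g.
        boto3.client("kinesis", region_name="ap-south-1")
    (region and stream names are hypothetical placeholders).
    """
    for name in names:
        # One shard is plenty for this low-volume demo workload.
        client.create_stream(StreamName=name, ShardCount=shards)
    return list(names)

# Example (requires AWS credentials):
# import boto3
# kinesis = boto3.client("kinesis", region_name="ap-south-1")
# create_streams(kinesis, ["website-log-stream", "website-alarm-stream"])
```

Passing the client in makes the helper easy to exercise against a stub before pointing it at a real account.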

Dataset

Here is a sample of data source "OnlineRetail.csv"
(screenshot of the first few rows of the CSV)

Create EC2 Instance & Log Generator

Install the Amazon Kinesis Agent:
https://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html
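Once installed, the agent is configured through `/etc/aws-kinesis/agent.json`. A minimal sketch of a flow matching the log directory used by the generator below; the stream name `website-log-stream` is a hypothetical placeholder:

```json
{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "https://kinesis.ap-south-1.amazonaws.com",
  "flows": [
    {
      "filePattern": "/var/log/mywebsite/*.log",
      "kinesisStream": "website-log-stream"
    }
  ]
}
```

After editing the file, restart the agent (`sudo service aws-kinesis-agent restart`) so it picks up the new flow.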

Generate Logs from EC2

import csv
import time
import sys

sourceData = "OnlineRetail.csv"
placeholder = "LastLine.txt"

def GetLineCount():
    # Count data rows (enumerate is zero-based, so the header row is excluded).
    with open(sourceData) as f:
        i = 0
        for i, l in enumerate(f):
            pass
    return i

def MakeLog(startLine, numLines):
    # Write up to numLines rows, starting after startLine, into a timestamped log.
    destData = time.strftime("/var/log/mywebsite/%Y%m%d-%H%M%S.log")
    with open(sourceData, 'r') as csvfile:
        with open(destData, 'w') as dstfile:
            reader = csv.reader(csvfile)
            writer = csv.writer(dstfile)
            next(reader)  # skip the header row
            inputRow = 0
            linesWritten = 0
            for row in reader:
                inputRow += 1
                if (inputRow > startLine):
                    writer.writerow(row)
                    linesWritten += 1
                    if (linesWritten >= numLines):
                        break
            return linesWritten


numLines = 100
startLine = 0            
if (len(sys.argv) > 1):
    numLines = int(sys.argv[1])

try:
    with open(placeholder, 'r') as f:
        for line in f:
             startLine = int(line)
except IOError:
    startLine = 0

print("Writing " + str(numLines) + " lines starting at line " + str(startLine) + "\n")

totalLinesWritten = 0
linesInFile = GetLineCount()

while (totalLinesWritten < numLines):
    linesWritten = MakeLog(startLine, numLines - totalLinesWritten)
    totalLinesWritten += linesWritten
    startLine += linesWritten
    if (startLine >= linesInFile):
        startLine = 0

print("Wrote " + str(totalLinesWritten) + " lines.\n")

with open(placeholder, 'w') as f:
    f.write(str(startLine))

Create Kinesis Analytics

CREATE OR REPLACE STREAM "ALARM_STREAM" (order_count INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
    INSERT INTO "ALARM_STREAM"
        SELECT STREAM order_count
        FROM (
            SELECT STREAM COUNT(*) OVER FIFTEEN_SECOND_SLIDING_WINDOW AS order_count
            FROM "SOURCE_SQL_STREAM_001"
            WINDOW FIFTEEN_SECOND_SLIDING_WINDOW AS (RANGE INTERVAL '15' SECOND PRECEDING)
        )
        WHERE order_count >= 15;

CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM(
    order_count INTEGER,
    trigger_count INTEGER
);

CREATE OR REPLACE PUMP trigger_count_pump AS
    INSERT INTO TRIGGER_COUNT_STREAM
        SELECT STREAM order_count, trigger_count
        FROM (
            SELECT STREAM order_count, COUNT(*) OVER W1 AS trigger_count
            FROM "ALARM_STREAM"
            WINDOW W1 AS (RANGE INTERVAL '1' MINUTE PRECEDING)
        )
        WHERE trigger_count >= 1;
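The sliding windows above can be hard to visualize. This pure-Python sketch (not part of the pipeline) mimics the first pump: given sorted order timestamps in seconds, it counts orders in a trailing 15-second window and emits an alarm whenever the count reaches the threshold from the problem statement:

```python
from collections import deque

def alarm_stream(order_times, window=15, threshold=15):
    # Mirror of the ALARM_STREAM pump: a sliding COUNT(*) over the
    # trailing `window` seconds, filtered by `threshold`.
    recent = deque()
    alarms = []
    for t in order_times:
        recent.append(t)
        # Drop orders that have fallen out of the trailing window.
        while recent and recent[0] < t - window:
            recent.popleft()
        if len(recent) >= threshold:
            alarms.append((t, len(recent)))
    return alarms
```

For example, 14 orders inside one window stay quiet, while a 20-order burst starts alarming at the 15th order.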

Lambda trigger event to post to SNS and Aurora MySQL

from __future__ import print_function
import base64
import json
from datetime import datetime

import boto3
import pymysql
from botocore.exceptions import ClientError

region_name = "ap-south-1"
secret_name = "aurora_secret"
session = boto3.session.Session()
client = session.client(service_name="secretsmanager", region_name=region_name)
try:
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
    # Without the secret we cannot reach Aurora, so fail fast.
    raise e
if "SecretString" in get_secret_value_response:
    secret_str = get_secret_value_response["SecretString"]
else:
    secret_str = base64.b64decode(get_secret_value_response["SecretBinary"])
secret = json.loads(secret_str)
hostname = str(secret["host"])
username = str(secret["username"])
pwd = str(secret["password"])

# lambda trigger event to post to SNS and Aurora MySQL
def lambda_handler(event, context):
    print(event)
    try:
        # Create SNS client using the topic arn
        client = boto3.client("sns")
        topic_arn = "arn:aws:sns:ap-south-1:338401225294:website-alarm"

        out = client.publish(
            TopicArn=topic_arn,
            Message="Investigate sudden surge in orders. Order Count: {} @ {}".format(
                event["order_count"], datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            ),
            Subject="Mywebsite Order Rate Alarm",
        )

        print(out)

        # Connect to Aurora Mysql to write alarm data
        conn = pymysql.connect(
            host=hostname, db="logs", user=username, password=pwd, connect_timeout=10
        )
        with conn.cursor() as cur:
            # Parameterized query avoids quoting issues and SQL injection
            cur.execute(
                "insert into logs.logs_history values(%s, %s)",
                (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), event["order_count"]),
            )
        conn.commit()
        conn.close()

        print("Successfully delivered alarm message")
    except Exception as e:
        print("Either Aurora insert or SNS notification failed: {}".format(e))

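The handler can be exercised locally before wiring up the trigger, e.g. with a test event like `{"order_count": 17}`. Pulling the message construction into a helper (a refactor suggestion, not in the original post) makes that part testable without any AWS access:

```python
from datetime import datetime

def format_alarm_message(order_count, now=None):
    # Build the SNS message body used by the handler above.
    ts = (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
    return "Investigate sudden surge in orders. Order Count: {} @ {}".format(
        order_count, ts
    )
```

The handler would then call `format_alarm_message(event["order_count"])` in place of the inline `.format(...)`.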
