What is the Agenda of the project?
The agenda of the project is real-time monitoring of a website using AWS services.
We first launch an EC2 instance on AWS and install the Amazon Kinesis Agent on it. Next, data streams are created in Amazon Kinesis for real-time streaming, and an Identity and Access Management (IAM) role is attached to the EC2 instance so the agent can write to them. Server logs are then streamed to Kinesis Data Streams, and a Kinesis Data Analytics application is created to process them. Finally, an Amazon Aurora MySQL database is created, with Amazon SNS used for notifications and Secrets Manager used to store the database credentials.
Problem Statement
One of the major areas where any cloud customer has to take precautions is reacting immediately to a sudden change in usage cost. This is highly common in retail scenarios like Amazon, where the number of sales goes up during a festival offer period, or where bad actors misuse the site by overwhelming it with order requests.
So the solution below identifies unusual activity, defined as exceeding a maximum number of orders (set to 15 here) placed within every 15 seconds, using AWS real-time streaming and processing systems.
Real-time solution (not part of this implementation): based on the SNS notification, the user can check whether the orders are authentic and take action, such as increasing the number of backend servers for processing or blocking the offending IP addresses from placing orders.
Architecture
Step 1: Amazon EC2 acts as the website backend, generating server logs.
Step 2: Kinesis Data Streams reads the server logs in real time and pushes them to Kinesis Data Analytics, which performs the computation above (more than 15 orders per 15 seconds).
Step 3: A second in-application stream in Data Analytics tracks such floods over the past 1 minute and forwards records to the second data stream only while the trend persists. This step is added purely to reduce the number of SNS messages received during a spike.
Step 4: The second data stream receives these alarm records and triggers a Lambda function.
Step 5: Lambda publishes an SNS notification, which in turn delivers an SMS message, and saves a copy of all alarm records in Aurora MySQL for an aggregated view later (a minimal topic setup sketch follows).
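As a minimal sketch of the Step 5 plumbing, the SNS topic and an SMS subscription can be created with boto3. The topic name matches the ARN used in the Lambda code later; the phone number is a placeholder, not a value from this project:

import boto3

sns = boto3.client("sns", region_name="ap-south-1")

# Create the topic the Lambda publishes to
topic = sns.create_topic(Name="website-alarm")

# Subscribe a phone number for SMS delivery (placeholder number)
sns.subscribe(
    TopicArn=topic["TopicArn"],
    Protocol="sms",
    Endpoint="+911234567890",
)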
Dataset
Here is a sample of the data source "OnlineRetail.csv".
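Assuming this is the standard UCI Online Retail dataset (an assumption based on the file name), the CSV header row looks like:

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country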
Create EC2 Instance & Log Generator
Install AWS-Kinesis-Agent:
https://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html
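Once installed, the agent is configured via /etc/aws-kinesis/agent.json to tail the log directory used by the generator below. A minimal sketch, assuming the stream feeding the analytics application is named "order-stream" (the stream name is an assumption):

{
  "cloudwatch.emitMetrics": true,
  "flows": [
    {
      "filePattern": "/var/log/mywebsite/*.log",
      "kinesisStream": "order-stream",
      "partitionKeyOption": "RANDOM"
    }
  ]
}

After editing the file, restart the agent with sudo service aws-kinesis-agent restart so it picks up the new flow.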
Generate Log from EC2
import csv
import time
import sys

sourceData = "OnlineRetail.csv"   # input dataset
placeholder = "LastLine.txt"      # remembers where the last run stopped

def GetLineCount():
    # Number of data rows (total lines minus the header)
    with open(sourceData) as f:
        for i, l in enumerate(f):
            pass
    return i

def MakeLog(startLine, numLines):
    # Each run writes a fresh timestamped log file that the
    # Kinesis Agent picks up from /var/log/mywebsite
    destData = time.strftime("/var/log/mywebsite/%Y%m%d-%H%M%S.log")
    with open(sourceData, 'r') as csvfile:
        with open(destData, 'w') as dstfile:
            reader = csv.reader(csvfile)
            writer = csv.writer(dstfile)
            next(reader)  # skip the header row
            inputRow = 0
            linesWritten = 0
            for row in reader:
                inputRow += 1
                if inputRow > startLine:
                    writer.writerow(row)
                    linesWritten += 1
                    if linesWritten >= numLines:
                        break
    return linesWritten

numLines = 100
startLine = 0
if len(sys.argv) > 1:
    numLines = int(sys.argv[1])

# Resume from the line recorded by the previous run, if any
try:
    with open(placeholder, 'r') as f:
        for line in f:
            startLine = int(line)
except IOError:
    startLine = 0

print("Writing " + str(numLines) + " lines starting at line " + str(startLine) + "\n")
totalLinesWritten = 0
linesInFile = GetLineCount()
# Wrap around to the start of the file when the end is reached
while totalLinesWritten < numLines:
    linesWritten = MakeLog(startLine, numLines - totalLinesWritten)
    totalLinesWritten += linesWritten
    startLine += linesWritten
    if startLine >= linesInFile:
        startLine = 0
print("Wrote " + str(totalLinesWritten) + " lines.\n")

# Persist the position for the next run
with open(placeholder, 'w') as f:
    f.write(str(startLine))
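To keep fresh log files arriving for the agent to ship, the script can be run on a schedule. For example, assuming it is saved as /home/ec2-user/LogGenerator.py (a hypothetical path), a crontab entry that emits 100 lines every minute:

* * * * * python /home/ec2-user/LogGenerator.py 100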
Create Kinesis Analytics
CREATE OR REPLACE STREAM "ALARM_STREAM" (order_count INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "ALARM_STREAM"
SELECT STREAM order_count
FROM (
SELECT STREAM COUNT(*) OVER FIFTEEN_SECOND_SLIDING_WINDOW AS order_count
FROM "SOURCE_SQL_STREAM_001"
WINDOW TEN_SECOND_SLIDING_WINDOW AS (RANGE INTERVAL '15' SECOND PRECEDING)
)
WHERE order_count >= 30;
-- Count alarms over the past 1 minute so a sustained flood
-- produces far fewer records (and hence fewer SNS messages)
CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM (
    order_count INTEGER,
    trigger_count INTEGER
);

CREATE OR REPLACE PUMP trigger_count_pump AS
INSERT INTO TRIGGER_COUNT_STREAM
SELECT STREAM order_count, trigger_count
FROM (
    SELECT STREAM order_count, COUNT(*) OVER W1 AS trigger_count
    FROM "ALARM_STREAM"
    WINDOW W1 AS (RANGE INTERVAL '1' MINUTE PRECEDING)
)
WHERE trigger_count >= 1;
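Before wiring the full pipeline, the downstream path can be sanity-checked by pushing one synthetic alarm record onto the second data stream and confirming the Lambda fires. Here "alarm-stream" is an assumed stream name, and the payload field mirrors what the Lambda below reads:

import json
import boto3

kinesis = boto3.client("kinesis", region_name="ap-south-1")

# One fake alarm record; "order_count" matches the field the Lambda expects
kinesis.put_record(
    StreamName="alarm-stream",  # assumed name of the second stream
    Data=json.dumps({"order_count": 20, "trigger_count": 1}),
    PartitionKey="test",
)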
Lambda trigger event to post to SNS and Aurora MySQL
from __future__ import print_function
import base64
import json
from datetime import datetime

import boto3
import pymysql
from botocore.exceptions import ClientError

region_name = "ap-south-1"
secret_name = "aurora_secret"

# Fetch the Aurora credentials from Secrets Manager once, at cold start
session = boto3.session.Session()
client = session.client(service_name="secretsmanager", region_name=region_name)
try:
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
    print(e)
    raise

if "SecretString" in get_secret_value_response:
    secret_str = get_secret_value_response["SecretString"]
else:
    secret_str = base64.b64decode(get_secret_value_response["SecretBinary"])

secret = json.loads(secret_str)
hostname = str(secret["host"])
username = str(secret["username"])
pwd = str(secret["password"])

# lambda trigger event to post to SNS and Aurora MySQL
def lambda_handler(event, context):
    print(event)
    try:
        # Publish the alarm to the SNS topic
        client = boto3.client("sns")
        topic_arn = "arn:aws:sns:ap-south-1:338401225294:website-alarm"
        out = client.publish(
            TopicArn=topic_arn,
            Message="Investigate sudden surge in orders. Order Count: {} @ {}".format(
                event["order_count"], datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            ),
            Subject="Mywebsite Order Rate Alarm",
        )
        print(out)
        # Connect to Aurora MySQL and persist the alarm for an aggregated view
        conn = pymysql.connect(
            host=hostname, db="logs", user=username, password=pwd, connect_timeout=10
        )
        with conn.cursor() as cur:
            # Parameterized query avoids quoting issues and SQL injection
            cur.execute(
                "insert into logs.logs_history values (%s, %s)",
                (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), event["order_count"]),
            )
        conn.commit()
        print("Successfully delivered alarm message")
    except Exception as e:
        print("Either Aurora insert or SNS notification failed: {}".format(e))