I had to run a job as soon as my sqs finished processing the messages, and i wanted to automate it. So in this article, I'm going to share the architectural design of the solution I made.
How do we know that the SQS is empty
These SQS attributes have to be null for several minutes
- ApproximateNumberOfMessagesDelayed.
- ApproximateNumberOfMessagesNotVisible.
- ApproximateNumberOfMessages.
which can be observed on CloudWatch Dashboard
Create a function that will check these metrics
async checkIfSQSEmpty(queueurl) {
var params = {
QueueUrl: queueurl,
AttributeNames: [
"ApproximateNumberOfMessagesDelayed",
"ApproximateNumberOfMessagesNotVisible",
"ApproximateNumberOfMessages",
],
};
var isempty = sqs
.getQueueAttributes(params)
.promise()
.then((data) => {
var count =
Number(data.Attributes.ApproximateNumberOfMessagesDelayed) +
Number(data.Attributes.ApproximateNumberOfMessagesNotVisible) +
Number(data.Attributes.ApproximateNumberOfMessages);
if (count == 0) return true;
else return false;
});
return await isempty;
}
}
Create Functions to Enable/Disable Scheduler
async enableEventBridgeScheduler(schedulename, sqsurl, invocationtime) {
var params = await this.getSchedule(schedulename);
params.State = "ENABLED";
params.StartDate = invocationtime;
// 3600 is 1 hour in Epoch seconds
params.EndDate = invocationtime + duration * 3600;
params.Target.Input =
'{"SQSUrl":"' + sqsurl + '","InvocationTime":' + invocationtime + "}";
console.log(params);
var result = await scheduler
.updateSchedule(params)
.promise()
.then((data) => {
return data;
});
return await result;
}
async disableEventBridgeScheduler(schedulename) {
var params = await this.getSchedule(schedulename);
params.State = "DISABLED";
params.Target.Input = "{}";
params.StartDate = null;
params.EndDate = null;
var result = await scheduler
.updateSchedule(params)
.promise()
.then((data) => {
return data;
});
return await result;
}
async getSchedule(schedulename) {
var params = { Name: schedulename };
var result = await scheduler
.getSchedule(params)
.promise()
.then((data) => {
var temp = {};
temp.FlexibleTimeWindow = data.FlexibleTimeWindow;
temp.Name = data.Name;
temp.ScheduleExpression = data.ScheduleExpression;
temp.ScheduleExpressionTimezone = data.ScheduleExpressionTimezone;
temp.Target = data.Target;
return temp;
});
return result;
}
Trigger for First Invocation
After you fill your SQS call the lambda function whether by api or invoked from another function, and differentiate between that and the invocation from EventBridge Scheduler.
"use strict";
const SQSServices = require("./sqs");
const sqs = new SQSServices();
const SchedulerServices = require("./scheduler");
const scheduler = new SchedulerServices();
const LambdaServices = require("./lambda");
const lambda = new LambdaServices();
const schedulename = process.env.SchedulerName;
module.exports.main = async (event) => {
var response = { SQSEmpty: "", SQSName: "", SchedulerState: "" };
var invocationtime = null;
var data = event;
if (event.body) { /*from ApiGateWay => First Invocation*/
data = JSON.parse(event.body);
var sqsurl = data.SQSUrl;
/*divide by 1,000 to make epoch in s (api in ms) */
invocationtime = ((event.requestContext.timeEpoch) + 1800) / 1000
await scheduler.enableEventBridgeScheduler(schedulename,sqsurl,invocationtime);
response.SchedulerState = 'ENABLED';
var url = sqsurl.split("/");
response.SQSName = url[url.length - 1];
response.SQSEmpty = await sqs.checkIfSQSEmpty(sqsurl);
return response;
}
if (event.InvocationTime) invocationtime = event.InvocationTime; /*from EventBridge*/
var sqsurl = data.SQSUrl;
console.log(sqsurl)
var url = sqsurl.split("/");
response.SQSName = url[url.length - 1];
response.SQSEmpty = await sqs.checkIfSQSEmpty(sqsurl);
if (response.SQSEmpty == true) {
// Actions if SQS batch is done
// Disable Scheduler
await scheduler.disableEventBridgeScheduler(schedulename);
response.SchedulerState = 'DISABLED';
return response;
} else if (response.SQSEmpty == false) {
await scheduler.enableEventBridgeScheduler(schedulename,sqsurl,invocationtime);
response.SchedulerState = 'ENABLED';
return response;
}
return response;
};
Top comments (0)