DEV Community

Fatima Medlij
Fatima Medlij

Posted on

Automating Post-SQS Tasks with EventBridge

I had to run a job as soon as my sqs finished processing the messages, and i wanted to automate it. So in this article, I'm going to share the architectural design of the solution I made.

Image description

Image description

How do we know that the SQS is empty
These SQS attributes have to be null for several minutes

  • ApproximateNumberOfMessagesDelayed.
  • ApproximateNumberOfMessagesNotVisible.
  • ApproximateNumberOfMessages.

which can be observed on CloudWatch Dashboard

Image description

Create a function that will check these metrics

async checkIfSQSEmpty(queueurl) {
    var params = {
      QueueUrl: queueurl,
      AttributeNames: [
        "ApproximateNumberOfMessagesDelayed",
        "ApproximateNumberOfMessagesNotVisible",
        "ApproximateNumberOfMessages",
      ],
    };
    var isempty = sqs
      .getQueueAttributes(params)
      .promise()
      .then((data) => {
        var count =
          Number(data.Attributes.ApproximateNumberOfMessagesDelayed) +
          Number(data.Attributes.ApproximateNumberOfMessagesNotVisible) +
          Number(data.Attributes.ApproximateNumberOfMessages);
        if (count == 0) return true;
        else return false;
      });
    return await isempty;
  }
}
Enter fullscreen mode Exit fullscreen mode

Create Functions to Enable/Disable Scheduler

async enableEventBridgeScheduler(schedulename, sqsurl, invocationtime) {
    var params = await this.getSchedule(schedulename);
    params.State = "ENABLED";
    params.StartDate = invocationtime;
    // 3600 is 1 hour in Epoch seconds
    params.EndDate = invocationtime + duration * 3600; 
    params.Target.Input =
      '{"SQSUrl":"' + sqsurl + '","InvocationTime":' + invocationtime + "}";
    console.log(params);
    var result = await scheduler
      .updateSchedule(params)
      .promise()
      .then((data) => {
        return data;
      });
    return await result;
  }

  async disableEventBridgeScheduler(schedulename) {
    var params = await this.getSchedule(schedulename);
    params.State = "DISABLED";
    params.Target.Input = "{}";
    params.StartDate = null;
    params.EndDate = null;
    var result = await scheduler
      .updateSchedule(params)
      .promise()
      .then((data) => {
        return data;
      });
    return await result;
  }

  async getSchedule(schedulename) {
    var params = { Name: schedulename };
    var result = await scheduler
      .getSchedule(params)
      .promise()
      .then((data) => {
        var temp = {};
        temp.FlexibleTimeWindow = data.FlexibleTimeWindow;
        temp.Name = data.Name;
        temp.ScheduleExpression = data.ScheduleExpression;
        temp.ScheduleExpressionTimezone = data.ScheduleExpressionTimezone;
        temp.Target = data.Target;
        return temp;
      });
    return result;
  }
Enter fullscreen mode Exit fullscreen mode

Trigger for First Invocation
After you fill your SQS call the lambda function whether by api or invoked from another function, and differentiate between that and the invocation from EventBridge Scheduler.

"use strict";
const SQSServices = require("./sqs");
const sqs = new SQSServices();
const SchedulerServices = require("./scheduler");
const scheduler = new SchedulerServices();
const LambdaServices = require("./lambda");
const lambda = new LambdaServices();

const schedulename = process.env.SchedulerName;
module.exports.main = async (event) => {
  var response = { SQSEmpty: "", SQSName: "", SchedulerState: "" };
  var invocationtime = null;
  var data = event;

  if (event.body) { /*from ApiGateWay => First Invocation*/
    data = JSON.parse(event.body);
    var sqsurl = data.SQSUrl;
    /*divide by 1,000 to make epoch in s (api in ms) */
    invocationtime = ((event.requestContext.timeEpoch) + 1800) / 1000 
    await scheduler.enableEventBridgeScheduler(schedulename,sqsurl,invocationtime);
    response.SchedulerState = 'ENABLED';
    var url = sqsurl.split("/");
    response.SQSName = url[url.length - 1];
    response.SQSEmpty = await sqs.checkIfSQSEmpty(sqsurl);
    return response;
  } 

  if (event.InvocationTime) invocationtime = event.InvocationTime;  /*from EventBridge*/
  var sqsurl = data.SQSUrl;
  console.log(sqsurl)
  var url = sqsurl.split("/");
  response.SQSName = url[url.length - 1];
  response.SQSEmpty = await sqs.checkIfSQSEmpty(sqsurl);

  if (response.SQSEmpty == true) {
    // Actions if SQS batch is done

    // Disable Scheduler
    await scheduler.disableEventBridgeScheduler(schedulename);
    response.SchedulerState = 'DISABLED';
    return response;

  } else if (response.SQSEmpty == false) {
    await scheduler.enableEventBridgeScheduler(schedulename,sqsurl,invocationtime);
    response.SchedulerState = 'ENABLED';
    return response;
  }

  return response;
};
Enter fullscreen mode Exit fullscreen mode

Top comments (0)