Eric BIANCHI for AWS Switzerland

Monitoring and instrumenting a multi-stage SQS pipeline using CloudWatch

Many of our AWS customers in Switzerland are doing data ingestion right now, especially in the Financial Services Industry (FSI), where they are massively web scraping all kinds of data across different types of APIs and websites. We are talking hundreds of thousands of API calls per day. Building data ingestion pipelines at this scale requires setting up different layers and stages that need to be synchronized, monitored and governed.

There are different ways to build data ingestion pipelines in AWS. Reference architectures and solutions are available all over the place. In this article, we will discuss a data ingestion pipeline based on Amazon Simple Queue Service (SQS) and AWS Lambda functions.

Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications, while AWS Lambda is a serverless compute service that lets you run code without provisioning or managing servers. Using these two services together removes the heavy lifting of setting up complex data ingestion and processing workloads.

Let’s take a real-world use case of a multi-stage data ingestion pipeline, as shown on the architecture diagram below:
Architecture Diagram

The first queue (SQS 1) is filled very quickly with a list of tasks (it can be URLs with parameters, it can include specific instructions or some DSL, it can be anything really). The first Lambda functions are then triggered and start querying, processing, doing some data transformation and writing their output to a new SQS queue (SQS 2). At this stage you can add other layered queues (SQS 3, SQS 4 … SQS N). The number of queues really depends on your needs and use cases: for example, you may want to enforce some separation of concerns, or the Lambda execution time limit may constrain how much work a single function can do.

Challenge

You can repeat this pattern over and over, but at some point you want to make sure that all the data pipeline processing is over so you can trigger the next action. You will face the following question: "How do I know that I'm done?", or putting it differently:

"How do I know that all messages in the queues have been processed and that no lambda is still running”

There are different options available, which include using Step Functions, managing a counter in a DynamoDB table (or any other kind of storage), writing the final results directly to S3, etc. For the purpose of this article, we assume the end result is written to SQS, and I will show a stateless way of triggering your services when your data is ready, relying only on CloudWatch.

Setting up the stage

A producer (Lambda 1) fills up a first SQS queue (SQS 1) with a list of instructions from different sources. This Lambda function is triggered manually or by a cron expression. The first queue is filled really quickly. For the purpose of the article, we will fill the SQS queue with 100 messages, each containing a random number:

import boto3
from random import random

def handler_name(event, context):
    sqs = boto3.client('sqs')
    # Resolve the queue URL from its name
    queue = sqs.get_queue_url(QueueName='SQS1')

    # Fill the first queue with 100 messages, each containing a random number
    for _ in range(100):
        sqs.send_message(QueueUrl=queue['QueueUrl'], MessageBody=str(random()))

    return ''


At the second stage, Lambda functions (Lambda 2) poll the first queue, do some data processing and transformation, and fill up a new queue (SQS 2) with the results (we just copy the message as-is here):

import boto3

def handler_name(event, context):
    sqs = boto3.client('sqs')
    # Resolve the queue URLs from their names
    queue_source = sqs.get_queue_url(QueueName='SQS1')
    queue_target = sqs.get_queue_url(QueueName='SQS2')

    # Poll the source queue until it is empty
    while True:
        response = sqs.receive_message(QueueUrl=queue_source['QueueUrl'])

        # receive_message returns no 'Messages' key when the queue is empty
        if 'Messages' not in response:
            break

        message = response['Messages'][0]
        receipt_handle = message['ReceiptHandle']
        # Delete the message from the source queue and forward its body as-is
        sqs.delete_message(QueueUrl=queue_source['QueueUrl'], ReceiptHandle=receipt_handle)
        sqs.send_message(QueueUrl=queue_target['QueueUrl'], MessageBody=message['Body'])

Solution Overview

First, let's reframe the problem as:

“In a given time window (the time it takes for the whole pipeline to execute), if I have sent the same number of messages to the first SQS queue and the last SQS queue, then it means that I have processed all my messages and that all Lambdas are done running.”

So let’s go to CloudWatch > Metrics > SQS > Queue Metrics and pick the right metrics:

  • a first metric for my first queue which is NumberOfMessagesSent in firstQueue (id = m1)
  • a second metric for my second queue which is NumberOfMessagesSent in secondQueue (id = m2)

Picking the right SQS Metrics

Then we create a math expression e1, which compares both numbers and returns 1 (true) if they are equal and greater than 0 in the time window: IF(m1 == m2 AND m1 > 0, 1, 0)
Math Expression
You can see on the screenshot the queues being filled with 100 messages and being emptied within a 5-minute time window.

On the following screenshot, you can see that both queues have received the same number of messages in the specified time interval and that e1, our math expression comparing the number of messages in both queues, is True.
CloudWatch Status
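
If you prefer to evaluate the same expression programmatically rather than in the console screenshots above, here is a minimal sketch using the CloudWatch GetMetricData API. The queue names (SQS1, SQS2), the 5-minute window and the Sum statistic over a 300-second period are assumptions to adapt to your own setup:

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

# m1/m2: messages sent to the first and last queues; e1: the comparison expression
queries = [
    {'Id': 'm1', 'ReturnData': False,
     'MetricStat': {'Metric': {'Namespace': 'AWS/SQS', 'MetricName': 'NumberOfMessagesSent',
                               'Dimensions': [{'Name': 'QueueName', 'Value': 'SQS1'}]},
                    'Period': 300, 'Stat': 'Sum'}},
    {'Id': 'm2', 'ReturnData': False,
     'MetricStat': {'Metric': {'Namespace': 'AWS/SQS', 'MetricName': 'NumberOfMessagesSent',
                               'Dimensions': [{'Name': 'QueueName', 'Value': 'SQS2'}]},
                    'Period': 300, 'Stat': 'Sum'}},
    {'Id': 'e1', 'Expression': 'IF(m1 == m2 AND m1 > 0, 1, 0)', 'ReturnData': True},
]

response = cloudwatch.get_metric_data(
    MetricDataQueries=queries,
    StartTime=datetime.utcnow() - timedelta(minutes=5),
    EndTime=datetime.utcnow(),
)

# e1 evaluates to 1 when both queues received the same, non-zero number of messages
print(response['MetricDataResults'][0]['Values'])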

You can build more complex expressions taking into account your different stages and dead letter queues if needed, but the methodology remains the same.

Next, we set an alarm on e1, which triggers an SNS notification when the condition is met (in the given time window); then we can trigger our final workload based on the SNS notification:
SNS Configuration
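
For reference, the same alarm can also be created from code. This is a minimal sketch, assuming the metric ids used above and an existing SNS topic (the ARN below is a placeholder):

import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='pipeline-complete',
    # The alarm is evaluated on e1; m1 and m2 only feed the expression
    Metrics=[
        {'Id': 'm1', 'ReturnData': False,
         'MetricStat': {'Metric': {'Namespace': 'AWS/SQS', 'MetricName': 'NumberOfMessagesSent',
                                   'Dimensions': [{'Name': 'QueueName', 'Value': 'SQS1'}]},
                        'Period': 300, 'Stat': 'Sum'}},
        {'Id': 'm2', 'ReturnData': False,
         'MetricStat': {'Metric': {'Namespace': 'AWS/SQS', 'MetricName': 'NumberOfMessagesSent',
                                   'Dimensions': [{'Name': 'QueueName', 'Value': 'SQS2'}]},
                        'Period': 300, 'Stat': 'Sum'}},
        {'Id': 'e1', 'Expression': 'IF(m1 == m2 AND m1 > 0, 1, 0)', 'ReturnData': True},
    ],
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    # Placeholder: replace with the ARN of your own SNS topic
    AlarmActions=['arn:aws:sns:eu-central-1:123456789012:pipeline-complete'],
)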

Now you can trigger a Lambda function that will launch the last piece of your multi-stage pipeline.
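
That final Lambda function can simply be subscribed to the SNS topic. Here is a minimal sketch of such a handler, where start_final_stage is a placeholder for whatever your last stage does:

import json

def handler_name(event, context):
    # The CloudWatch alarm notification arrives as an SNS record
    for record in event['Records']:
        alarm = json.loads(record['Sns']['Message'])
        # Only react when the alarm transitions into the ALARM state
        if alarm.get('NewStateValue') == 'ALARM':
            start_final_stage()

def start_final_stage():
    # Placeholder: launch the last piece of your multi-stage pipeline
    pass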

Note that this approach works with many other services where data flows in and out, such as Amazon Kinesis, Amazon API Gateway, etc.

Do you want to learn more? Passing your AWS Certified SysOps Administrator - Associate certification is a great way to learn how to leverage one of the most powerful AWS services out there: Amazon CloudWatch.

Have fun with CloudWatch!
-Eric
