
Chirag (Srce Cde) for AWS Community Builders

Posted on • Edited on • Originally published at srcecde.me

Handle SQS message failure in batch with partial batch response feature

AWS has announced that “AWS Lambda now supports partial batch response for SQS as an event source”. Before looking at how it works, let’s understand how SQS messages were handled previously, and then see how the partial batch response feature adds value.

1st scenario

Assumption: The SQS trigger is configured for the Lambda function. Exceptions raised while processing any message in a batch are not handled.

The Lambda function is triggered with a batch of 5 messages. If the function fails to process any one of them and throws an error, all 5 messages stay on the queue and are reprocessed after the visibility timeout expires. In other words, the batch is all or nothing: either every message is processed successfully and the whole batch is deleted from the SQS queue, or the invocation fails and the whole batch goes back to the queue for reprocessing.


import os
import json
import boto3

def lambda_handler(event, context):
    region_name = os.environ['AWS_REGION']
    if event:
        for record in event['Records']:
            body = record['body']
            # process message
    return {
        'statusCode': 200,
        'body': json.dumps('Message processed successfully!')
    }

Reprocessing already processed messages is undesirable, so to avoid it to some extent, let’s delete each message in the batch as soon as it is processed.

2nd scenario

Assumption: The SQS trigger is configured for the Lambda function. Exceptions raised while processing a message are still not handled, but each message is now deleted explicitly right after it is processed successfully.

In this case, let’s say the first 2 messages in the batch are processed successfully and deleted. The 3rd message fails and the Lambda function returns an error. Because the 1st and 2nd messages have already been deleted from the queue, only the 3rd, 4th and 5th messages are set for a retry. Hence, the already processed messages are not processed again.


import os
import json
import boto3

def lambda_handler(event, context):
    region_name = os.environ['AWS_REGION']
    if event:
        sqs = boto3.client('sqs', region_name=region_name)
        queue_name = event['Records'][0]['eventSourceARN'].split(':')[-1]
        queue_url = sqs.get_queue_url(
                    QueueName=queue_name,
                )

        for record in event['Records']:
            body = record['body']
            print(body)
            # process message
            response = sqs.delete_message(
                        QueueUrl=queue_url['QueueUrl'],
                        ReceiptHandle=record['receiptHandle']
                    )

    return {
        'statusCode': 200,
        'body': json.dumps('Message processed successfully!')
    }

Here the Lambda function failed on the 3rd message of the batch, and that failure interrupted the processing of the remaining messages. But let’s say we want every message in the batch to be attempted even if one of them fails, with the successfully processed messages deleted and only the failed message retried.

3rd scenario

Assumption: The SQS trigger is configured for the Lambda function. Exception handling for failed messages is added on top of the delete functionality.

Here, we maintain a flag that records whether any message has failed. Let’s say the 3rd message fails again. Because error handling is in place, the function catches the failure, continues with the rest of the messages in the batch, and then deletes the successfully processed ones. Finally, it raises an exception manually based on the flag, which causes only the failed message to be retried, since the rest have already been processed and deleted. In this scenario, the function still cannot tell Lambda directly which messages to retry; it can only delete the others and fail the invocation. If we want to control which messages Lambda retries, the partial batch response feature is the answer.


import os
import json
import boto3

def lambda_handler(event, context):
    region_name = os.environ['AWS_REGION']
    if event:
        sqs = boto3.resource('sqs', region_name=region_name)
        queue_name = event['Records'][0]['eventSourceARN'].split(':')[-1]
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        failed_flag = False
        messages_to_delete = []
        for record in event['Records']:
            try:
                body = record['body']
                # process message
                messages_to_delete.append({
                    'Id': record['messageId'],
                    'ReceiptHandle': record['receiptHandle']
                })
            except Exception:
                # any processing error marks the batch as partially failed
                failed_flag = True

        if messages_to_delete:
            delete_response = queue.delete_messages(
                    Entries=messages_to_delete)
        if failed_flag:
            raise RuntimeError('Failed to process messages')

    return {
        'statusCode': 200,
        'body': json.dumps('Messages processed successfully!')
    }
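One caveat with the batch delete above: the SQS DeleteMessageBatch API accepts at most 10 entries per call, while a Lambda batch can be configured to be larger than that. Below is a minimal sketch of splitting the entries into chunks. The helper names `chunked` and `delete_in_chunks` are hypothetical, and `queue` is assumed to be the same boto3 SQS Queue resource used in the 3rd scenario.

```python
from itertools import islice

MAX_BATCH_DELETE = 10  # SQS limit on entries per DeleteMessageBatch call


def chunked(entries, size=MAX_BATCH_DELETE):
    """Yield successive lists of at most `size` entries."""
    iterator = iter(entries)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def delete_in_chunks(queue, entries):
    """Delete entries through queue.delete_messages, 10 at a time.

    `queue` is assumed to be a boto3 SQS Queue resource. Returns the
    entries SQS reported as failed so the caller can decide what to do.
    """
    failed = []
    for chunk in chunked(entries):
        response = queue.delete_messages(Entries=chunk)
        failed.extend(response.get('Failed', []))
    return failed
```

In the 3rd scenario this would replace the single `queue.delete_messages(Entries=messages_to_delete)` call with `delete_in_chunks(queue, messages_to_delete)`.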

All 3 scenarios above show how a Lambda function could handle messages, depending on the requirements, before the partial batch response feature was introduced.
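To compare the three approaches side by side, here is a small pure-Python simulation (no AWS calls) of which messages go back to the queue when the 3rd message of a 5-message batch fails. The function and strategy names are made up for illustration, and the model only mirrors the behaviour described above.

```python
def retried_messages(batch, failing, strategy):
    """Return the messages that go back to the queue for a retry.

    batch    : ordered list of message ids
    failing  : set of ids whose processing raises an error
    strategy : 'no_delete' (1st scenario), 'delete_each' (2nd scenario)
               or 'flag_and_raise' (3rd scenario)
    """
    if not any(m in failing for m in batch):
        return []  # invocation succeeds, Lambda deletes the whole batch
    if strategy == 'no_delete':
        # the invocation fails, so every message becomes visible again
        return list(batch)
    if strategy == 'delete_each':
        # messages before the first failure were already deleted
        first_failure = next(i for i, m in enumerate(batch) if m in failing)
        return batch[first_failure:]
    if strategy == 'flag_and_raise':
        # everything that succeeded was deleted before the error is raised
        return [m for m in batch if m in failing]
    raise ValueError(f'unknown strategy: {strategy}')


batch = ['m1', 'm2', 'm3', 'm4', 'm5']
for strategy in ('no_delete', 'delete_each', 'flag_and_raise'):
    print(strategy, retried_messages(batch, {'m3'}, strategy))
```

This prints all five messages for `no_delete`, `['m3', 'm4', 'm5']` for `delete_each`, and `['m3']` for `flag_and_raise`.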

4th scenario

With the partial batch response feature, a Lambda function can identify the failed messages in a batch and report them back, so that only the failed (or explicitly requested) messages return to the queue for reprocessing. This makes SQS queue processing more efficient, removes the need to transfer and reprocess the same data repeatedly, which improves throughput and processing performance, and it comes at no additional cost beyond the standard price.

When using this feature, exception handling should be in place, and the Lambda function has to return the message IDs of the messages that require reprocessing in the specific format shown below.


To enable and handle partial batch failure, check the Report batch item failures option under Additional settings while adding the SQS trigger.

SQS trigger config screen
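The same option can also be set programmatically. The sketch below assumes boto3 is available and passes `FunctionResponseTypes=['ReportBatchItemFailures']` to Lambda's `create_event_source_mapping` call. The helper names are hypothetical, and the queue ARN and function name are placeholders you would supply yourself.

```python
def build_mapping_params(queue_arn, function_name, batch_size=5):
    """Build keyword arguments for Lambda's CreateEventSourceMapping call."""
    return {
        'EventSourceArn': queue_arn,
        'FunctionName': function_name,
        'BatchSize': batch_size,
        # equivalent of ticking "Report batch item failures" in the console
        'FunctionResponseTypes': ['ReportBatchItemFailures'],
    }


def create_sqs_trigger(queue_arn, function_name, batch_size=5):
    """Create the SQS trigger with partial batch response enabled."""
    import boto3  # imported here so the builder above can be tested offline

    client = boto3.client('lambda')
    return client.create_event_source_mapping(
        **build_mapping_params(queue_arn, function_name, batch_size)
    )
```

Keeping the parameter builder separate from the AWS call is only a convenience here: it lets the configuration be inspected without credentials.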

Once the SQS event source is configured, the function’s return value must follow the format below for the partial batch response to work.

{
    "batchItemFailures": [
        {
            "itemIdentifier": "id2"
        },
        {
            "itemIdentifier": "id4"
        }
    ]
}

A handler that returns a response in this format:

import os
import json
import boto3

def lambda_handler(event, context):
    if event:
        messages_to_reprocess = []
        batch_failure_response = {}
        for record in event["Records"]:
            try:
                body = record["body"]
                # process message
            except Exception as e:
                messages_to_reprocess.append({"itemIdentifier": record['messageId']})

        batch_failure_response["batchItemFailures"] = messages_to_reprocess
        return batch_failure_response
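To try this pattern locally before deploying, the handler can be called with a hand-built event. The sketch below is a self-contained variant of the handler above. `process` is a hypothetical stand-in for real message processing, and the fake event contains only the fields the handler reads, not the full SQS record.

```python
def process(body):
    """Hypothetical stand-in for real message processing."""
    if 'bad' in body:
        raise ValueError(f'cannot process: {body}')


def lambda_handler(event, context):
    failures = []
    for record in event.get('Records', []):
        try:
            process(record['body'])
        except Exception:
            # report only this message so that it alone is retried
            failures.append({'itemIdentifier': record['messageId']})
    return {'batchItemFailures': failures}


# Hand-built event with only the fields the handler reads
fake_event = {
    'Records': [
        {'messageId': 'id1', 'body': 'ok'},
        {'messageId': 'id2', 'body': 'bad payload'},
        {'messageId': 'id3', 'body': 'ok'},
        {'messageId': 'id4', 'body': 'bad payload'},
    ]
}

result = lambda_handler(fake_event, None)
print(result)
```

For this event the handler reports `id2` and `id4`, matching the response format shown earlier. An empty `batchItemFailures` list tells Lambda that the whole batch succeeded.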

For a detailed walkthrough, please refer to the video below.

Recommendation

For most scenarios, adopting the implementation from the 4th scenario is beneficial: it offers efficient, fast processing and reduces repetitive data transfer, which in turn increases throughput.

If you have any questions, comments, or feedback then please leave them below. Subscribe to my channel for more.
