
AWS Lambda SQS events with Serverless Framework

Marcin Piczkowski on July 01, 2018

This post was initially published at Cloudly.Tech. On 28 JUN 2018 Amazon announced adding support for SQS events in Lambda. This greatly simpl...
 

Hey, I can't find much about reservedConcurrency. When you say limit to 1 thread, do Lambdas share threads with more concurrency? Also, if you limit the concurrency, are the extra executions queued up for a bit or do they just fail? For example, if I call 100 Lambda executions with a reservedConcurrency of 1, will it run 1 at a time until all 100 are done?
Cheers
Greg

 

Greg, please have a look at this doc section "Throttling Behavior"

In short: when you exceed the reservedConcurrency limit, say by calling Lambda 100 times while the limit is set to 1, you will experience throttling. Whether Lambda retries your call or not depends on the source of the event. E.g. if you call Lambda from API Gateway, you will get the error from your API straight away without any retry (you need to handle retries yourself in the API caller if you wish to).
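A caller-side retry could look roughly like the sketch below. It is only an illustration: `callWithRetry` is a hypothetical helper, and the assumption that a throttled call surfaces as `err.statusCode` equal to 429 or 5xx depends on your HTTP client and API configuration.

```javascript
// Hypothetical helper: retries an async call with exponential backoff.
// `callApi` is any function that returns a promise.
async function callWithRetry(callApi, { maxAttempts = 5, baseDelayMs = 100 } = {}) {
  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await callApi();
    } catch (err) {
      lastError = err;
      // Assumption: the client exposes the HTTP status on err.statusCode.
      const retryable = err.statusCode === 429 || err.statusCode >= 500;
      if (!retryable || attempt === maxAttempts - 1) break;
      // Wait baseDelayMs, then 2x, 4x, ... before the next attempt.
      const delayMs = baseDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
```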

 

After we experience throttling, what happens to the messages that cannot be processed? Do they go back to the queue? Does SQS consider them failed?

If your Lambda function is invoked asynchronously, e.g. from another function, AWS Lambda automatically retries the throttled event for up to six hours, with delays between retries.

In the case of Lambda invoked from SQS (poll-based), AWS Lambda will continue to retry until the MessageRetentionPeriod set for the queue expires or, if you have a redrive policy set on the queue, until MaxReceiveCount is reached, after which the message goes to the DLQ.

docs.aws.amazon.com/lambda/latest/...
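To make the two settings concrete, here is a minimal sketch of the queue attributes involved. `buildQueueAttributes` is a hypothetical helper and the ARN in the usage note is a placeholder. SQS expects attribute values as strings, and the redrive policy as a JSON string with `deadLetterTargetArn` and `maxReceiveCount`.

```javascript
// Hypothetical helper: builds the Attributes map that could be passed to
// sqs.setQueueAttributes({ QueueUrl, Attributes }) or sqs.createQueue(...).
function buildQueueAttributes(deadLetterQueueArn, maxReceiveCount, retentionSeconds) {
  return {
    // How long SQS keeps an undeleted message, in seconds.
    MessageRetentionPeriod: String(retentionSeconds),
    // After maxReceiveCount receives, the message moves to the dead-letter queue.
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: deadLetterQueueArn,
      maxReceiveCount: maxReceiveCount,
    }),
  };
}
```

For example, `buildQueueAttributes('arn:aws:sqs:us-east-1:123456789012:my-dlq', 5, 345600)` describes a queue that keeps messages for 4 days and moves a message to the DLQ after 5 receives.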

Yep, got you. The problem is that you cannot have normal sequential processing limited to a certain number, say 1, and fill up the queue. In a similar case, I ended up setting the MaxReceiveCount to 1000 in order to achieve that. It worked alright but brought other issues, e.g. a message must not fail, because it will just keep retrying for a long time.

I'm not sure I get your problem. It would be nice if you could illustrate it with an example project. By sequential I suppose you mean processing messages in order, but for this you would need FIFO queues or a different app design, e.g. say you store the current processing state in a db. When message2 arrives before message1, you check in the db that message1 was not yet processed, put message2 in another queue, and have another Lambda which forwards the message back to the original queue (then the receive count is 0 again). When message1 arrives next, you process it and update the db state, so the next time you receive message2 you will know that you can process it. You could also use the DLQ for that, and the other Lambda could pass messages from the DLQ back to the original queue. Just an idea, I don't know your particular case, but maybe it works for you.
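The idea can be sketched in memory as below. Everything here is an assumption for illustration: messages carry a hypothetical `seq` number, `state` stands in for the db, and `parked` stands in for the second queue.

```javascript
// Hypothetical sketch: process a message only if it is the next one in sequence.
// `state.lastProcessed` plays the role of the db record,
// `parked` plays the role of the other queue (or the DLQ).
function handleInOrder(message, state, parked, processMessage) {
  if (message.seq <= state.lastProcessed) {
    return 'duplicate'; // already handled, nothing to do
  }
  if (message.seq === state.lastProcessed + 1) {
    processMessage(message);
    state.lastProcessed = message.seq; // update the "db" state
    return 'processed';
  }
  parked.push(message); // arrived too early, forward it back later
  return 'parked';
}
```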

Ah no, I wasn't talking about order but rather about regulating the flow. I want the messages to wait in the queue until the worker can process them, but with SQS/Lambda the messages won't wait: they experience throttling, fail, and are either removed or go to the DLQ.

But I liked your technique for preserving order. I haven't tried it, maybe I will in the future.

Hmm.. I see. So maybe you could try the same technique here. When a throttled message goes to the DLQ, you have a Lambda function running at some scheduled interval, which you set by experimenting with the throttle rate. This Lambda forwards the message from the DLQ back to the original queue, from which it can be processed again and hopefully not throttled again.
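A rough sketch of such a forwarding function, assuming an AWS SDK v2 style client (`new AWS.SQS()`) is passed in as `sqs`. The function name and the queue URL parameters are hypothetical.

```javascript
// Hypothetical sketch: moves up to 10 messages from the DLQ back to the
// original queue. `sqs` is assumed to be an AWS SDK v2 SQS client.
async function redriveMessages(sqs, dlqUrl, targetUrl) {
  const result = await sqs
    .receiveMessage({ QueueUrl: dlqUrl, MaxNumberOfMessages: 10 })
    .promise();
  const messages = result.Messages || [];
  for (const message of messages) {
    // Send first, delete second, so a failure never loses the message.
    await sqs
      .sendMessage({ QueueUrl: targetUrl, MessageBody: message.Body })
      .promise();
    await sqs
      .deleteMessage({ QueueUrl: dlqUrl, ReceiptHandle: message.ReceiptHandle })
      .promise();
  }
  return messages.length;
}
```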

I just created this repo and remembered our discussion. This is how I managed to create a sequential processing pipeline with AWS SQS and Lambda in a Serverless project.

Let me know what you think! Thank you.

github.com/kbariotis/sqs-lambda-co...

Nice :) I like it.

I tried to find out what happens to a CloudWatch event which is throttled, whether it is retried or dropped, but I could not find it in the docs. I suppose what to do with a high number of accumulated CloudWatch events which are continuously retried is more a problem for AWS than for you, since setting the concurrency limit on Lambda to 1 ensures that no more than 1 instance of this function will execute at any given time.

Nice wrap up. Thanks.

 

Hi Marcin,

I get an error when I try to make a POST request to the API Gateway URL:

This is the response:
502 Bad Gateway
{
"message": "Internal server error"
}

What happened?

I did not find anything in CloudWatch Logs about an error in the REST resource.

Regards,

 

I ran a test on the API Gateway POST method and the log shows:
Execution failed due to configuration error: Malformed Lambda proxy response
Method completed with status: 502

But my response in the sender is well formed...

var response = {
  "statusCode": responseCode,
  "headers": {
    "Content-Type": "application/json"
  },
  "body": JSON.stringify(responseBody),
  "isBase64Encoded": false
};

I read this...
aws.amazon.com/es/premiumsupport/k...

But nothing! wuu
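For reference, a minimal sketch of a response shape that API Gateway's Lambda proxy integration accepts. `buildProxyResponse` is a hypothetical helper. Two common causes of "Malformed Lambda proxy response" are a `body` that is not a string and a handler that never returns the response (or never passes it to the callback).

```javascript
// Hypothetical helper: builds a Lambda proxy integration response.
// statusCode is coerced to a number and body is always a string.
function buildProxyResponse(statusCode, body) {
  return {
    statusCode: Number(statusCode),
    headers: { 'Content-Type': 'application/json' },
    body: typeof body === 'string'
      ? body
      : JSON.stringify(body === undefined ? null : body),
    isBase64Encoded: false,
  };
}
```

In a handler this would be used as `callback(null, buildProxyResponse(200, { message: 'sent' }))`, or returned from an async handler.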

 

Can you put your whole handler code somewhere, e.g. in a gist, so I can check what the function looks like?

I just re-created sender.js and it works well. I think I wrote something wrong. The important thing is that I could check how the data exchange between Lambda and SQS works. Thank you!

Only a tiny addition:
if (err) {
  console.log('error:', "failed to send message" + err);
  responseCode = 500;
  responseBody = err;
  ...

I did not understand the concurrency part yet. If my Lambda has more than 2000 records to send to SQS, I will send them one by one to the queue. I think the Lambda will take a long time. Can I send messages in batches? Each record is a message to the queue.

Maybe you could enlighten us with an additional post about this?

Thank you so much Marcin!

You are allowed to send at most 10 messages in a single batch: docs.aws.amazon.com/AWSSimpleQueue...

I would delegate this task to multiple Lambdas if you need to send batches concurrently to speed things up.
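As a sketch of the batching, the records can be split into chunks of 10 and each chunk turned into one `sendMessageBatch` request. `toBatchRequests` is a hypothetical helper, and serializing each record with `JSON.stringify` is an assumption about the message format.

```javascript
// Hypothetical helper: splits records into sendMessageBatch request params,
// at most `batchSize` entries each (SQS allows up to 10 per batch).
function toBatchRequests(queueUrl, records, batchSize = 10) {
  const requests = [];
  for (let i = 0; i < records.length; i += batchSize) {
    const entries = records.slice(i, i + batchSize).map((record, offset) => ({
      Id: String(i + offset), // must be unique within a single batch
      MessageBody: JSON.stringify(record),
    }));
    requests.push({ QueueUrl: queueUrl, Entries: entries });
  }
  return requests;
}
```

With 2000 records this yields 200 requests, each of which could be passed to `sqs.sendMessageBatch(params)`.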

 

Do I have to delete the message after receiving and processing?

Like:

var params = {
  QueueUrl: 'STRING_VALUE', /* required */
  ReceiptHandle: 'STRING_VALUE' /* required */
};

sqs.deleteMessage(params, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else console.log(data); // successful response
});

 

Hi Pedro, I was confused at the beginning too, because this was the way to handle messages before SQS events were supported in Lambda, when you had to poll the queue yourself.

It is not needed anymore, as explained in the AWS documentation (docs.aws.amazon.com/lambda/latest/...)

Lambda takes care of:
 - Automatically retrieving messages and directing them to the target Lambda function.
 - Deleting them once your Lambda function successfully completes.

What is important is the "successfully completes" part, because if the Lambda fails, the message will remain in the queue and will be processed again, so you need to handle exceptions carefully.

You can e.g. wrap the Lambda body in a try-catch block so that it does not fail because of a bad message, or configure dead-letter queues where failed messages will be redirected automatically after the Lambda fails. Then you can have e.g. a separate Lambda for the dead-letter queue to handle such messages.
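A minimal sketch of the try-catch approach, assuming each message body is JSON (that format is an assumption). The `Records`, `body` and `messageId` fields are part of the SQS event that Lambda passes to the function.

```javascript
// Sketch of an SQS-triggered handler that does not fail on a bad message.
// Assumption: message bodies are JSON; real processing replaces console.log.
async function handler(event) {
  const skipped = [];
  for (const record of event.Records) {
    try {
      const payload = JSON.parse(record.body);
      console.log('processing', payload);
    } catch (err) {
      // Swallow the error so the whole batch is not retried because of it.
      console.log('skipping malformed message', record.messageId, err.message);
      skipped.push(record.messageId);
    }
  }
  return { processed: event.Records.length - skipped.length, skipped: skipped };
}
```

Note that a swallowed message is deleted together with the rest of the batch, so anything worth keeping should be logged or forwarded somewhere before the handler returns.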

I'm planning to write more about Lambda error handling soon.

 

One question more ...

Does this workflow work for FIFO queues as well? Or is there some difference?

Thanks!

 

No, at the time of writing it only works for standard queues; SQS triggers do not work for FIFO queues. You are left with polling the queue yourself.

 

Does it mean that I can publish (sender.js) messages from a Lambda, but I cannot read (receive.js) the queue from another Lambda? In summary, what would the new flow be?

In receive.js you would need to use the AWS SQS API to fetch messages from your FIFO queue. You would have to run receive.js periodically, e.g. schedule it as a cron job (see here), because it won't be triggered by new messages in the queue.
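A rough sketch of what one polling run in receive.js could do, assuming an AWS SDK v2 style client is passed in as `sqs`. `pollOnce` and `processMessage` are hypothetical names. When polling yourself, you have to delete each message after processing it.

```javascript
// Hypothetical sketch: one polling pass over a queue.
// `sqs` is assumed to be an AWS SDK v2 SQS client (new AWS.SQS()).
async function pollOnce(sqs, queueUrl, processMessage) {
  const result = await sqs
    .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 10 })
    .promise();
  let handled = 0;
  for (const message of result.Messages || []) {
    try {
      await processMessage(message.Body);
    } catch (err) {
      console.log('processing failed, leaving message in queue:', err.message);
      break; // stop here so later messages are not processed out of order
    }
    // Manual polling means manual deletion after successful processing.
    await sqs
      .deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle })
      .promise();
    handled++;
  }
  return handled;
}
```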

 
 

Cool, looks like Serverless Framework does not yet support triggering SQS from API Gateway, so CloudFormation is the only way, but it is a work in progress: forum.serverless.com/t/api-gateway...
