In this article we are going to see how Amazon Simple Queue Service (SQS) helps you to build Event Driven Architecture, and how it makes your application be more scalable
The main parts of this article:
- What's Queue
- About Amazon SQS (Main components)
- Example
What is Queue
Message Queue service is the system that pushes a message into a queue by providing an asynchronous communications protocol. There are two parties involved in queue service. Those parties are called Sender and Receiver
SQS helps you to decouple your components, for example your component "A" sends a message to "B", and "B" doesn't care what's going on in the "A", later each component can be replaced by another service, this way you will have highly independent and flexible architecture
Queue is used when things don’t have to be processed immediately, and when you have shared resources and you don't want to excess the load on the resource
Some keywords we need to know:
- Producers: components that send messages to the queue
- Consumers: components that receive messages from the queue
- The queue: which holds messages
About Amazon SQS
Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications
SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order that they are sent
Standard Queue:
- Unlimited Throughput - Standard queues support a nearly unlimited number of transactions per second (TPS) per API action
- At-Least-Once Delivery - A message is delivered at least once, but occasionally more than one copy of a message is delivered
- Best-Effort Ordering - Occasionally, messages might be delivered in an order different from which they were sent
FIFO Queue:
- Limited Throughput - 300 transactions per second (TPS)
- Exactly-Once Processing - A message is delivered once and remains available until a consumer processes and deletes it. Duplicates aren't introduced into the queue
- First-In-First-Out Delivery - The order in which messages are sent and received is strictly preserved (i.e. First-In-First-Out)
📋 Note: for more information about sqs features you can visit this link
Example
In our example we will have an API that triggers a lambda function, once the lambda is executed it puts a message in the Queue which later triggers another lambda function
- Architecture:
First thing you need to do is to create your Queue from SQS, in my case I am using AWS-CDK to do so
import { Construct, NestedStack, StackProps } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';
export class SQSStack extends NestedStack {
constructor(app: Construct, id: string, props?: StackProps) {
super(app, id);
const MyQueue = new Queue(app, 'my-queue', {
queueName: 'my-queue'
});
}
}
To keep this article simple I'm just passing the queueName parameter, to dive deeper in all the options that you can configure, check this CDK Docs for SQS at the Construct Props section
let us add the required permissions
iamRoleStatements:
- Effect: Allow
Action:
- "sqs:SendMessage"
Resource:
- "arn:aws:sqs:${env:region}:${env:accountId}:my-queue"
- Effect: Allow
Action:
- "s3:PutObject"
Resource:
- "arn:aws:s3:${env:region}:${env:accountId}:${env:bucket}/*"
Notice I'm adding minimum permissions required for my lambda functions to do the job
.env
file holds the required parameters like my acccount id, the region I want my services to be deployed at, my bucket name etc...
my-queue
is the queue name that we created previously
- routes.yml
addMessage:
handler: src/modules/Queue/controller/lambda.addMessage
events:
- http:
method: post
path: queue/message
cors: true
consumeMessage:
handler: src/modules/Queue/controller/lambda.getMessage
events:
- sqs:
arn: arn:aws:sqs:${env:region}:${env:accountId}:my-queue
batchSize: 5
As we can see we have 2 APIs the first one just triggers a lambda function addMessage
, which sends a message to the queue, once we have a message the second function will be executed consumeMessage
Batch size is the number of records to send to the function in each batch. For a standard queue, this can be up to 10,000 records. For a FIFO queue, the maximum is 10
- Our lambda functions:
const { putMessage } = require('../service/queue.service');
const { putObject } = require('../service/s3.service');
function Response(statusCode, data) {
return {
statusCode,
body: JSON.stringify(data, null, 2),
};
}
module.exports.addMessage = async (event) => {
try {
console.log(event);
const body = JSON.parse(event.body);
const putMessageResult = await putMessage(body);
console.log('putMessageResult =>', putMessageResult);
const {
MessageId,
} = putMessageResult;
// save data in DynamoDB
return Response(200, {
message: 'Message sent to Queue',
messageId: MessageId,
});
} catch (error) {
console.log(error);
return Response(500, {
message: 'Something went wrong'
});
}
};
module.exports.getMessage = async (event) => {
try {
console.log(event);
await Promise.all(event.Records.map(async (item) => {
const body = JSON.parse(item.body);
const putObjectResult = await putObject(body);
console.log('putObjectResult =>', putObjectResult);
}));
} catch (error) {
console.log(error);
}
};
📋 Note: To see how you can add record in DynamoDB, you can check this blog, where we dive deep how EventBridge works, and in the article I add some data to DynamoDB
const AWS = require('aws-sdk');
const sqs = new AWS.SQS();
module.exports.putMessage = (body) => {
const QueueUrl = `https://sqs.${process.env.region}.amazonaws.com/${process.env.accountId}/my-queue`;
const params = {
MessageBody: JSON.stringify(body),
QueueUrl,
};
return sqs.sendMessage(params).promise();
}
We can see how we are sending the message to the queue, through using AWS-SDK, the sendMessage
function takes the MessageBody attribute which holds the data that we want to send, and with our queue url
To store our JSON data to S3, I am also using AWS-SDK, with putObject
function which takes certain parameters as written below
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const { v4: uuidv4 } = require('uuid');
module.exports.putObject = (body) => {
const Key = `${uuidv4()}.json`;
const ContentType = "application/json";
return s3.putObject({
Bucket: process.env.bucket,
Key,
Body: JSON.stringify(body),
ContentType
}).promise();
}
Now let's test our system, first I will send the body written below from postman, using our API Gateway endpoint that will be created once we deploy our system
{
"title": "this is test queue",
"data": "some description goes here"
}
After calling the API you can check inside CloudWatch the consumeMessage Lambda function has been triggered, which will create the json file in S3 bucket
- Inside our S3 Bucket:
Once you download the file, you will see the data that we saved from our lambda function which holds our body payload
Conclusion
As we can see SQS is very essential feature, and with AWS it's easier in connecting your services seamlessly with each other.
After using queues your architecture will become easier to be managed and ready to be scaled
This article is part of "Messaging with Serverless" series that I've been writing for a while, if you are interested to read other Serverless offerings from AWS, feel free to visit the links below
Top comments (0)