Karthik Subramanian for AWS Community Builders

Storing AWS SQS messages to DynamoDB with AWS Lambda

In my last post I covered creating a Lambda function & an API Gateway endpoint to accept a POST request. Now let's save that request to a table in DynamoDB and send it to an SQS queue for processing.

Creating the Orders Table

We create a new table by updating the template.yaml file. Before defining the new resource, let's define a few common properties that we can leverage for all resources. Add a Parameters section to the template under Description, with the following values -

Template parameters section

These parameters are used when setting up environment variables for various resources.
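For reference, here is the Parameters block, excerpted from the complete template shown further down -

Parameters:
  Environment:
    Type: String
    Description: AWS Environment where code is being executed (AWS_SAM_LOCAL or AWS)
    Default: 'AWS'
  DynamoDBUri:
    Type: String
    Description: AWS local DynamoDB instance URI (will only be used if AWSENVNAME is AWS_SAM_LOCAL)
    Default: 'http://docker.for.mac.host.internal:8000'
  ProjectName:
    Type: String
    Description: 'Name of the project'
    Default: 'serverless-arch-example'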

Under Resources, we add a resource for the DynamoDB table -

Template resources section
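Excerpted from the complete template below, the OrdersTable resource looks like this -

OrdersTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
    AttributeDefinitions:
      - AttributeName: request_id
        AttributeType: S
    KeySchema:
      - AttributeName: request_id
        KeyType: HASH
    ProvisionedThroughput:
      ReadCapacityUnits: 3
      WriteCapacityUnits: 3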

Rather than defining environment variables for each function, we are going to define them under Globals so that they are available to all the lambda functions we create.

Template globals section
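At this point in the walkthrough the Globals section (again, excerpted from the full template below) looks like this; the SQS variable gets added in the next step -

Globals:
  Function:
    Timeout: 120
    MemorySize: 2048
    Environment:
      Variables:
        ENVIRONMENT: !Ref Environment
        DYNAMODB_DEV_URI: !Ref DynamoDBUri
        ORDERS_TABLE_NAME: !Ref OrdersTable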

We also need to update the properties for the Create function so that it has the required permissions to insert records into the DynamoDB table. We add an AWS managed policy, AmazonDynamoDBFullAccess -

Template policies section
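In the CreateFunction properties of the complete template below, the Policies list starts out as -

Policies:
  - AmazonDynamoDBFullAccess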

Creating the Orders Queue

We create an SQS queue for the orders by adding another resource to the template.yaml file -

Template resource section
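From the complete template below, the queue resource is -

OrdersQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
    VisibilityTimeout: 120 # must be same as lambda timeout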

We need to tell the Lambda function about the queue, so add another environment variable under Globals: Function: Environment: Variables: -

Template environment variables
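In the complete template below this shows up as the last entry under Variables -

SQS_QUEUE: !Ref OrdersQueue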

We also need to give the Create Lambda function permission to send orders to the queue. Add another policy to the Policies list for the function -

Template policy section
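After this change, the Policies list in the complete template below reads -

Policies:
  - AmazonDynamoDBFullAccess
  - SQSSendMessagePolicy:
      QueueName: !GetAtt OrdersQueue.QueueName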

The template.yaml should look like this -

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  python3.9

  Sample SAM Template for serverless-arch-example

Parameters:
  Environment:
    Type: String
    Description: AWS Environment where code is being executed (AWS_SAM_LOCAL or AWS)
    Default: 'AWS'
  DynamoDBUri:
    Type: String
    Description: AWS local DynamoDB instance URI (will only be used if AWSENVNAME is AWS_SAM_LOCAL)
    Default: 'http://docker.for.mac.host.internal:8000'
  ProjectName:
    Type: String
    Description: 'Name of the project'
    Default: 'serverless-arch-example'

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 120
    MemorySize: 2048
    Environment:
      Variables:
        ENVIRONMENT: !Ref Environment
        DYNAMODB_DEV_URI: !Ref DynamoDBUri
        ORDERS_TABLE_NAME: !Ref OrdersTable
        SQS_QUEUE: !Ref OrdersQueue

Resources:
  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
      AttributeDefinitions:
        - AttributeName: request_id
          AttributeType: S
      KeySchema:
        - AttributeName: request_id
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 3
        WriteCapacityUnits: 3
  OrdersQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
      VisibilityTimeout: 120 # must be same as lambda timeout
  CreateFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      PackageType: Image
      ImageConfig:
        Command:
          - create.lambda_handler
      Architectures:
        - x86_64
      Events:
        CreateAPI:
          Type: Api # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
          Properties:
            Path: /example/create
            Method: post
      Policies:
        - AmazonDynamoDBFullAccess
        - SQSSendMessagePolicy:
            QueueName: !GetAtt OrdersQueue.QueueName
    Metadata:
      Dockerfile: Dockerfile
      DockerContext: ./src
      DockerTag: python3.9-v1

Outputs:
  # ServerlessRestApi is an implicit API created out of Events key under Serverless::Function
  # Find out more about other implicit resources you can reference within SAM
  # https://github.com/awslabs/serverless-application-model/blob/master/docs/internals/generated_resources.rst#api
  CreateAPI:
    Description: "API Gateway endpoint URL for Prod stage for Create function"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/example/create"
  CreateFunction:
    Description: "Create Lambda Function ARN"
    Value: !GetAtt CreateFunction.Arn
  CreateFunctionIamRole:
    Description: "Implicit IAM Role created for Create function"
    Value: !GetAtt CreateFunctionRole.Arn
  OrdersTable:
    Description: "DynamoDB Table for orders"
    Value: !GetAtt OrdersTable.Arn
  OrdersQueue:
    Description: "SQS Queue for orders"
    Value: !GetAtt OrdersQueue.Arn

DB Helper

Under the src directory, create a new directory called “db” and create the following 4 files under it -

DB folder structure
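The original post shows the folder structure as an image. Based on the files referenced in the rest of this post, the db directory most likely contains __init__.py (so db can be imported as a package from create.py), docker-compose.yml, init_db.py, and db_helper.py.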

To test database changes locally, we need to spin up a local instance of DynamoDB and manually create a table in it.

We can use Docker to spin up an instance of DynamoDB. Update docker-compose.yml with the following -

Docker compose file
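The compose file appears only as an image in the original post. A minimal sketch, assuming the standard amazon/dynamodb-local image exposed on port 8000 (the port init_db.py connects to) and the container name seen in the output below, looks like this -

version: '3.8'
services:
  dynamodb-local:
    image: amazon/dynamodb-local
    container_name: dynamodb-local
    ports:
      - "8000:8000"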

In the terminal, go to the db folder and spin up a DynamoDB instance -

cd src/db
docker-compose up -d

The output should look like -

Creating dynamodb-local ... done

To create the Orders table locally, update the init_db.py file with the following code -

import boto3


def create_orders_table(dynamodb):
    table = dynamodb.create_table(
        TableName='serverless-arch-example-orders',
        KeySchema=[
            {
                'AttributeName': 'request_id',
                'KeyType': 'HASH'  # Partition key
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'request_id',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 3,
            'WriteCapacityUnits': 3
        }
    )
    return table


if __name__ == '__main__':
    dynamodb = boto3.resource('dynamodb', endpoint_url="http://localhost:8000")
    table = create_orders_table(dynamodb)
    print("Table status:", table.table_status)



Add boto3 to the requirements.txt file and run pip install again -

requirements file
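The requirements file is shown as an image in the original post; at a minimum it now needs a line for boto3 (any other dependencies from the earlier posts stay as they are) -

boto3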

cd ..
pip install -r requirements.txt

Run init_db.py to create the table on the local DynamoDB instance -

python db/init_db.py

If the table is created, you should see the following output -

Table status: ACTIVE

NOTE: You can use NoSQL Workbench to view the tables in your local DynamoDB instance.

Update db_helper.py with the following code -

import boto3
from boto3.dynamodb.conditions import Key
import os
import time


class DBHelper:
    def __init__(self) -> None:
        environment = os.environ['ENVIRONMENT']
        orders_table_name = os.environ['ORDERS_TABLE_NAME']
        if environment == "AWS_SAM_LOCAL":
            dynamodb_dev_uri = os.environ['DYNAMODB_DEV_URI']
            self.dynamodb = boto3.resource('dynamodb', endpoint_url=dynamodb_dev_uri)
        else:
            self.dynamodb = boto3.resource('dynamodb')
        self.orders_table = self.dynamodb.Table(orders_table_name)

    def update_order_status(self, request, status, location=None):
        return self.orders_table.put_item(
            Item={
                'request_id': request['request_id'],
                'url': request['url'],
                'status': status,
                'file_location': location,
                'epoch_time': int(time.time()),
            }
        )

    def get_order_status(self, request_id):
        response = self.get_records_by_key(self.orders_table, 'request_id', request_id)
        if 'Items' in response:
            return response['Items']
        return None

    def get_records_by_key(self, table, key, value):
        try:
            response = table.query(
                KeyConditionExpression=Key(key).eq(value)
            )
            return response
        except Exception as error:
            print(error)
            raise error



Putting it all together

Update create.py to send the request to the SQS queue and update the order status in the table -

import json
from db import db_helper
import boto3
import os


def lambda_handler(event, context):
    if event['httpMethod'] != "POST":
        return generate_response(404, "Invalid request method")
    request = json.loads(event['body'])
    if not validate_payload(request):
        return generate_response(404, "Invalid payload")
    # use lambda request id for better tracking purposes
    request['request_id'] = context.aws_request_id
    print(f"Processing request with Request Id: {request['request_id']}")
    try:
        # save the order with status 'Created' before queueing it
        dbHelper = db_helper.DBHelper()
        dbHelper.update_order_status(request=request, status='Created')
        sqs = boto3.client('sqs')
        sqs_queue = os.environ['SQS_QUEUE']
        print("Sending request to the Queue")
        request_json = json.dumps(request)
        response = sqs.send_message(
            QueueUrl=sqs_queue,
            MessageBody=request_json
        )
        if 'MD5OfMessageBody' in response:
            dbHelper.update_order_status(request=request, status='Queued')
            return generate_response(200, request_json)
        else:
            print(f'Error sending request to queue: {response}')
            return generate_response(500, f"Error sending request to queue: {response}")
    except Exception as e:
        print(e)
        return generate_response(500, f"Error processing request: {e}")


def validate_payload(json_map):
    keys = json_map.keys()
    payload_valid = True
    # Check if required keys are in json_map
    keys_required = {'url'}
    for key in keys_required:
        if key not in keys:
            payload_valid = False
            break
    # only inspect the url value if the key is actually present
    if payload_valid and str(json_map['url']).strip() == '':
        return False
    return payload_valid


def generate_response(response_code, message):
    return {
        "statusCode": response_code,
        "body": message,
        "headers": {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': '*',
            "Access-Control-Allow-Methods": "POST"
        }
    }

For this to work in a container, we need to add another line to the Dockerfile -

Dockerfile
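The exact line is shown only as an image in the original post. Assuming the AWS Lambda Python base image is being used, the addition is a COPY instruction along these lines so that the db package ends up inside the image next to create.py (the destination path is an assumption) -

# copy the db helper package into the Lambda task root so "from db import db_helper" works
COPY db/ ${LAMBDA_TASK_ROOT}/db/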

Testing locally

Build the app locally -

sam build

Since we have specified environment variables in our template, we need to provide those env vars when running the code locally.

Create a new file under the tests folder called “env.json” with the following values -

env.json file
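The original post shows env.json as an image. A sketch based on the variables defined in the template, keyed by the function's logical ID (CreateFunction), might look like this -

{
  "CreateFunction": {
    "ENVIRONMENT": "AWS_SAM_LOCAL",
    "DYNAMODB_DEV_URI": "http://docker.for.mac.host.internal:8000",
    "ORDERS_TABLE_NAME": "serverless-arch-example-orders",
    "SQS_QUEUE": ""
  }
}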

NOTE: SQS_QUEUE is empty since we can't host an SQS queue locally. We will update that value with the deployed queue URL once the changes are deployed.

Run the app locally -

sam local start-api --env-vars ./tests/env.json

Test the create endpoint from Postman like before -

Postman output
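If you prefer the command line over Postman, a request along these lines works too (127.0.0.1:3000 is the default address for sam local start-api, and the URL in the body is just a placeholder) -

curl -X POST http://127.0.0.1:3000/example/create \
  -H "Content-Type: application/json" \
  -d '{"url": "https://www.example.com"}'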

Since we did not set up the queue locally, an error is expected. We should, however, be able to see a new record created in the DynamoDB table for the request -

DynamoDB table

Deploy the app

Run the following command to deploy the app to AWS -

sam deploy

The output should look like this -

SAM Deploy output

Test the changes by making the POST call to create a request from Postman like before -

Postman output

We can verify the request was saved to the table by going to the AWS Console > DynamoDB > Tables > Explore items > serverless-arch-example-orders

AWS DynamoDB Table

The request status is also set to “Queued”, indicating that the request was sent to SQS.

We can validate that as well by going to the AWS Console > Amazon SQS and confirming that the value under Messages available is now 1.

AWS SQS Queue

You can view the message that was sent by clicking on the queue name > Send and receive messages > Poll for messages

SQS Queue

Click on the message ID to view the message

SQS Message

Source Code

Here is the source code for the project created here.

Next: Part 4: Web Scraping with Selenium & AWS Lambda
