In my last post I covered creating a Lambda function and an API Gateway endpoint to accept a POST request. Now let's save that request to a table in DynamoDB and send it to a queue in SQS for processing.
Creating the Orders Table
We create a new table by updating the template.yaml file. Before defining the new resource, let's define a few common properties that we can leverage for all resources. Add a Parameters section to the template under Description, with the following values -
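Parameters:
  Environment:
    Type: String
    Description: AWS Environment where code is being executed (AWS_SAM_LOCAL or AWS)
    Default: 'AWS'
  DynamoDBUri:
    Type: String
    Description: AWS local DynamoDB instance URI (will only be used if ENVIRONMENT is AWS_SAM_LOCAL)
    Default: 'http://docker.for.mac.host.internal:8000'
  ProjectName:
    Type: String
    Description: 'Name of the project'
    Default: 'serverless-arch-example'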
These parameters are used when setting up environment variables for various resources.
Under Resources, we add a resource for the DynamoDB table -
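  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
      AttributeDefinitions:
        - AttributeName: request_id
          AttributeType: S
      KeySchema:
        - AttributeName: request_id
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 3
        WriteCapacityUnits: 3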
Rather than defining environment variables for each function, we are going to define them under Globals so that they are available to all the Lambda functions we create -
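Globals:
  Function:
    Timeout: 120
    MemorySize: 2048
    Environment:
      Variables:
        ENVIRONMENT: !Ref Environment
        DYNAMODB_DEV_URI: !Ref DynamoDBUri
        ORDERS_TABLE_NAME: !Ref OrdersTable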
We also need to update the properties for the Create function so that it has the required permissions to insert records into the DynamoDB table. We add an AWS managed policy, AmazonDynamoDBFullAccess -
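Policies:
  - AmazonDynamoDBFullAccess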
Creating the Orders Queue
We create an SQS queue for the orders by adding another resource to the template.yaml file -
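  OrdersQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
      VisibilityTimeout: 120 # must be the same as the Lambda timeout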
We need to tell the Lambda function about the queue, so add another environment variable under Globals: Function: Environment: Variables -
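SQS_QUEUE: !Ref OrdersQueue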
We also need to give the Create Lambda function access to add orders to the queue. Add another policy to the Policies list for the function -
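- SQSSendMessagePolicy:
    QueueName: !GetAtt OrdersQueue.QueueName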
The template.yaml should look like this -
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  python3.9

  Sample SAM Template for serverless-arch-example

Parameters:
  Environment:
    Type: String
    Description: AWS Environment where code is being executed (AWS_SAM_LOCAL or AWS)
    Default: 'AWS'
  DynamoDBUri:
    Type: String
    Description: AWS local DynamoDB instance URI (will only be used if ENVIRONMENT is AWS_SAM_LOCAL)
    Default: 'http://docker.for.mac.host.internal:8000'
  ProjectName:
    Type: String
    Description: 'Name of the project'
    Default: 'serverless-arch-example'

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 120
    MemorySize: 2048
    Environment:
      Variables:
        ENVIRONMENT: !Ref Environment
        DYNAMODB_DEV_URI: !Ref DynamoDBUri
        ORDERS_TABLE_NAME: !Ref OrdersTable
        SQS_QUEUE: !Ref OrdersQueue

Resources:
  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
      AttributeDefinitions:
        - AttributeName: request_id
          AttributeType: S
      KeySchema:
        - AttributeName: request_id
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 3
        WriteCapacityUnits: 3
  OrdersQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Join ['-', [!Sub '${ProjectName}', 'orders']]
      VisibilityTimeout: 120 # must be the same as the Lambda timeout
  CreateFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      PackageType: Image
      ImageConfig:
        Command:
          - create.lambda_handler
      Architectures:
        - x86_64
      Events:
        CreateAPI:
          Type: Api # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
          Properties:
            Path: /example/create
            Method: post
      Policies:
        - AmazonDynamoDBFullAccess
        - SQSSendMessagePolicy:
            QueueName: !GetAtt OrdersQueue.QueueName
    Metadata:
      Dockerfile: Dockerfile
      DockerContext: ./src
      DockerTag: python3.9-v1

Outputs:
  # ServerlessRestApi is an implicit API created out of Events key under Serverless::Function
  # Find out more about other implicit resources you can reference within SAM
  # https://github.com/awslabs/serverless-application-model/blob/master/docs/internals/generated_resources.rst#api
  CreateAPI:
    Description: "API Gateway endpoint URL for Prod stage for Create function"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/example/create"
  CreateFunction:
    Description: "Create Lambda Function ARN"
    Value: !GetAtt CreateFunction.Arn
  CreateFunctionIamRole:
    Description: "Implicit IAM Role created for Create function"
    Value: !GetAtt CreateFunctionRole.Arn
  OrdersTable:
    Description: "DynamoDB Table for orders"
    Value: !GetAtt OrdersTable.Arn
  OrdersQueue:
    Description: "SQS Queue for orders"
    Value: !GetAtt OrdersQueue.Arn
DB Helper
Under the src directory, create a new directory called “db” and create the following 4 files under it -
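Going by the rest of this post, the four files are docker-compose.yml, init_db.py, db_helper.py, and (most likely) an empty __init__.py so that the Lambda code can import the db package.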
To test database changes locally, we need to spin up a local instance of DynamoDB and manually create a table in it.
We can use Docker to spin up an instance of DynamoDB. Update docker-compose.yml with the following -
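A minimal version that matches the rest of this post (DynamoDB Local listening on port 8000, a container named dynamodb-local) looks something like this -

version: '3.8'
services:
  dynamodb-local:
    image: amazon/dynamodb-local
    container_name: dynamodb-local
    ports:
      - "8000:8000"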
In the terminal, go to the db folder and spin up a DynamoDB instance -
cd src/db
docker-compose up -d
The output should look like -
Creating dynamodb-local ... done
To create the Orders table locally, update the init_db.py file with the following code -
import boto3


def create_orders_table(dynamodb):
    table = dynamodb.create_table(
        TableName='serverless-arch-example-orders',
        KeySchema=[
            {
                'AttributeName': 'request_id',
                'KeyType': 'HASH'  # Partition key
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'request_id',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 3,
            'WriteCapacityUnits': 3
        }
    )
    return table


if __name__ == '__main__':
    dynamodb = boto3.resource('dynamodb', endpoint_url="http://localhost:8000")
    table = create_orders_table(dynamodb)
    print("Table status:", table.table_status)
Add boto3 to the requirements.txt file and run pip install again -
cd ..
pip install -r requirements.txt
Run init_db.py to create the table on the local DynamoDB instance -
python db/init_db.py
If the table is created, you should see the following output -
Table status: ACTIVE
NOTE: You can use NoSQL Workbench to view the tables in your local DynamoDB instance.
Update db_helper.py with the following code -
import boto3
from boto3.dynamodb.conditions import Key
import os
import time


class DBHelper:

    def __init__(self) -> None:
        environment = os.environ['ENVIRONMENT']
        orders_table_name = os.environ['ORDERS_TABLE_NAME']
        if environment == "AWS_SAM_LOCAL":
            dynamodb_dev_uri = os.environ['DYNAMODB_DEV_URI']
            self.dynamodb = boto3.resource('dynamodb', endpoint_url=dynamodb_dev_uri)
        else:
            self.dynamodb = boto3.resource('dynamodb')
        self.orders_table = self.dynamodb.Table(orders_table_name)

    def update_order_status(self, request, status, location=None):
        return self.orders_table.put_item(
            Item={
                'request_id': request['request_id'],
                'url': request['url'],
                'status': status,
                'file_location': location,
                'epoch_time': int(time.time()),
            }
        )

    def get_order_status(self, request_id):
        response = self.get_records_by_key(self.orders_table, 'request_id', request_id)
        if 'Items' in response:
            return response['Items']
        return None

    def get_records_by_key(self, table, key, value):
        try:
            response = table.query(
                KeyConditionExpression=Key(key).eq(value)
            )
            return response
        except Exception as error:
            print(error)
            raise error
Putting it all together
Update create.py to send the request to the SQS queue and update the order status in the table -
import json
from db import db_helper
import boto3
import os


def lambda_handler(event, context):
    if event['httpMethod'] != "POST":
        return generate_response(404, "Invalid request method")

    request = json.loads(event['body'])
    if not validate_payload(request):
        return generate_response(404, "Invalid payload")

    # Use the Lambda request id for better tracking purposes
    request['request_id'] = context.aws_request_id
    print(f"Processing request with Request Id: {request['request_id']}")

    try:
        dbHelper = db_helper.DBHelper()
        dbHelper.update_order_status(request=request, status='Created')

        sqs = boto3.client('sqs')
        sqs_queue = os.environ['SQS_QUEUE']
        print("Sending request to the Queue")
        request_json = json.dumps(request)
        response = sqs.send_message(
            QueueUrl=sqs_queue,
            MessageBody=request_json
        )
        if 'MD5OfMessageBody' in response:
            dbHelper.update_order_status(request=request, status='Queued')
            return generate_response(200, request_json)
        else:
            print(f'Error sending request to queue: {response}')
            return generate_response(500, f"Error sending request to queue: {response}")
    except Exception as e:
        print(e)
        return generate_response(500, f"Error processing request: {e}")


def validate_payload(json_map):
    keys = json_map.keys()
    payload_valid = True
    # Check if required keys are in json_map
    keys_required = {'url'}
    for key in keys_required:
        if key not in keys:
            payload_valid = False
            break
    # An empty url is also invalid
    if payload_valid and str(json_map['url']).strip() == '':
        return False
    return payload_valid


def generate_response(response_code, message):
    return {
        "statusCode": response_code,
        "body": message,
        "headers": {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': '*',
            "Access-Control-Allow-Methods": "POST"
        }
    }
For this to work in a container, we need to add another line to the Dockerfile -
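The exact line depends on the Dockerfile from the earlier post, but since create.py now imports from the db package, the image needs a copy of that directory. Assuming the AWS Lambda Python base image, it would be something like -

COPY db/ ${LAMBDA_TASK_ROOT}/db/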
Testing locally
Build the app locally -
sam build
Since we have specified environment variables in our template, we need to provide those env vars when running the code locally.
Create a new file under the tests folder called “env.json” with the following values -
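Something along these lines, keyed by the function's logical ID so sam local can pick the variables up (the table name and DynamoDB URI come from the values used earlier in this post) -

{
  "CreateFunction": {
    "ENVIRONMENT": "AWS_SAM_LOCAL",
    "DYNAMODB_DEV_URI": "http://docker.for.mac.host.internal:8000",
    "ORDERS_TABLE_NAME": "serverless-arch-example-orders",
    "SQS_QUEUE": ""
  }
}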
NOTE: SQS_QUEUE is empty since we can't host an SQS queue locally. We will update that value with the deployed queue URL once the changes are deployed.
Run the app locally -
sam local start-api --env-vars ./tests/env.json
Test the create endpoint from Postman like before -
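If you prefer the command line, the equivalent request (with a placeholder URL in the payload) looks like this -

curl -X POST http://127.0.0.1:3000/example/create \
  -H "Content-Type: application/json" \
  -d '{"url": "https://example.com"}'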
Since we did not set up the queue locally, an error is expected. We should, however, be able to see a new record created in the DynamoDB table for the request -
Deploy the app
Run the following command to deploy the app to AWS -
sam deploy
The output should look like this -
Test the changes by making the POST call to create a request from Postman like before -
We can verify the request was saved to the table by going to the AWS Console > DynamoDB > Tables > Explore items > serverless-arch-example-orders
The request status is also set to “Queued”, indicating that the request was sent to SQS.
We can validate that as well by going to the AWS Console > Amazon SQS and confirming that the value under Messages available is now 1
You can view the message that was sent by clicking on the queue name > Send and receive messages > Poll for messages
Click on the message ID to view the message
Source Code
Here is the source code for the project created here.
Next: Part 4: Web Scraping with Selenium & AWS Lambda