Problem Statement
Scenario
A user wants to run a data-processing (e.g. ML) workload on an EC2 instance. The data to be processed in this workload is a CSV file stored in an S3 bucket.
Currently the user has to manually spin up an EC2 instance (with a user-data
script that installs the tools and starts the data processing), after uploading the data (a CSV file) to their S3 bucket.
Wouldn't it be great if this could be automated? So that creation of a file in the S3 bucket (from any source) would automatically spin up an appropriate EC2 instance to process the file? And creation of an output file in the same bucket would likewise automatically terminate all relevant (e.g. tagged) EC2 instances?
Requirements
Use Case 1
When a CSV is uploaded to the inputs
"directory" in a given S3 bucket, an EC2 "data-processing" instance (i.e. tagged with {"Purpose": "data-processing"}
) should be launched, but only if a "data-processing" tagged instance isn't already running. One instance at a time is sufficient for processing workloads.
Use Case 2
When a new file is created in the outputs
"directory" in the same S3 bucket, it means the workload has finished processing, and all "data-processing"-tagged instances should now be identified and terminated.
Solution Using S3 Triggers and Lambda
As explained at Configuring Amazon S3 Event Notifications
, S3 can send notifications upon:
- object creation
- object removal
- object restore
- replication events
- RRS failures
S3 can publish these events to 3 possible destinations:
- SNS
- SQS
- Lambda
We want S3 to push object creation
notifications directly to a Lambda function, which will have the necessary logic to process these events and determine further actions.
Here's how to do this using Python3 and boto3
in a simple Lambda function.
AWS Console: Setup the S3 bucket
Go to the AWS S3 console. Go to the bucket you want to use (or create a new one with default settings) for triggering your Lambda function.
Create three "folders" in this bucket ('folders/directories' in S3 are actually 'key-prefixes' or paths) like so:
What are these dirs for?
-
config
: to hold various settings for the Lambda function -
inputs
: a CSV file uploaded here will trigger the ML workload on an EC2 instance -
outputs
: any file here indicates completion of the ML workload and should cause any running data-processing (i.e. specially-tagged) EC2 instances to terminate
The config
folder
We need to supply the EC2 launch information to the Lambda function. By "launch information" we mean, the instance-type, the AMI-Id, the security groups, tags, ssh keypair, user-data, etc.
One way to do this is to hard-code all of this into the Lambda code itself, but this is never a good idea. It's always better to externalize the config, and one way to do this by storing it in the S3 bucket itself like so:
Save this as ec2-launch-config.json
in the config
folder:
{
"ami": "ami-0123b531fc646552f",
"region": "ap-south-1",
"instance_type": "t2.nano",
"ssh_key_name": "ssh-connect",
"security_group_ids": [
"sg-08b6b31110601e924"
],
"filter_tag_key": "Purpose",
"filter_tag_value": "data-processing",
"set_new_instance_tags": [
{
"Key": "Purpose",
"Value": "data-processing"
},
{
"Key": "Name",
"Value": "ML-runner"
}
]
}
The params are quite self-explanatory, and you can tweak them as you need to.
"filter_tag_key": "Purpose"
: "filter_tag_value": "data-processing"
--> this is the tag that will be used to identify (i.e. filter) already-running data-processing
EC2 instances.
You'll notice that user-data
isn't part of the above JSON config. It's read in from a separate file called user-data
, just so that it's easier to write and maintain:
#!/bin/bash
apt-get update -y
apt-get install -y apache2
systemctl start apache2
systemctl enable apache2.service
echo "Congrats, your setup is a success!" > /var/www/html/index.html
The above user-data
script will install the Apache2 webserver, and writes a congratulatory message that will be served on the instance's public IP address.
Lambda function logic
The Lambda function needs to:
- receive an incoming S3 object creation notification JSON object
- parse out the S3 bucket name and S3 object key name from the JSON
- pull in a JSON-based EC2 launch configuration previously stored in S3
- if the S3 object key (i.e. "directory") matches
^inputs/
, check if we need to launch a new EC2 instance and if so, launch one - if the S3 object key (i.e. "directory") matches
^outputs/
, terminate any running tagged instances
AWS Console: New Lambda function setup
Go to the AWS Lambda console and click the Create function
button.
Select Author from scratch
, enter a function name, and select Python 3.8
as the Runtime.
Select Create a new role with basic Lambda permissions
in the Choose or create an execution role
dropdown.
Click the Create function
button.
This will take you to the Configuration tab.
Setup an S3 trigger from the bucket in question, like so:
- Select your bucket
- Select the events you're interested in (
All object create events
in this case) - Leave
Prefix
andSuffix
empty, as we will take care of prefixes (inputs
andoutputs
bucket paths) in our function - Select
Enable trigger
As it says near the bottom of the screenshot:
Lambda will add the necessary permissions for Amazon S3 to invoke your Lambda function from this trigger.
So we don't need to go into S3 to configure notifications separately.
- Click the
Add
button to save this trigger configuration.
Back on the main Lambda designer tab, you'll notice that S3 is now linked up to the our newly-created Lambda function:
Click the name of the Lambda function to open up the Function code
editor underneath the Designer pane.
Lambda function code
Copy the following Lambda function code into the Function code
editor, then click the Save
button on the top-right.
import boto3
import json
import base64
from urllib.parse import unquote_plus
BUCKET_NAME = "YOUR_S3_BUCKET_NAME"
CONFIG_FILE_KEY = "config/ec2-launch-config.json"
USER_DATA_FILE_KEY = "config/user-data"
BUCKET_INPUT_DIR = "inputs"
BUCKET_OUTPUT_DIR = "outputs"
def launch_instance(EC2, config, user_data):
tag_specs = [{}]
tag_specs[0]['ResourceType'] = 'instance'
tag_specs[0]['Tags'] = config['set_new_instance_tags']
ec2_response = EC2.run_instances(
ImageId=config['ami'], # ami-0123b531fc646552f
InstanceType=config['instance_type'], # t2.nano
KeyName=config['ssh_key_name'], # ambar-default
MinCount=1,
MaxCount=1,
SecurityGroupIds=config['security_group_ids'], # sg-08b6b31110601e924
TagSpecifications=tag_specs,
# UserData=base64.b64encode(user_data).decode("ascii")
UserData=user_data
)
new_instance_resp = ec2_response['Instances'][0]
instance_id = new_instance_resp['InstanceId']
# print(f"[DEBUG] Full ec2 instance response data for '{instance_id}': {new_instance_resp}")
return (instance_id, new_instance_resp)
def lambda_handler(raw_event, context):
print(f"Received raw event: {raw_event}")
# event = raw_event['Records']
for record in raw_event['Records']:
bucket = record['s3']['bucket']['name']
print(f"Triggering S3 Bucket: {bucket}")
key = unquote_plus(record['s3']['object']['key'])
print(f"Triggering key in S3: {key}")
# get config from config file stored in S3
S3 = boto3.client('s3')
result = S3.get_object(Bucket=BUCKET_NAME, Key=CONFIG_FILE_KEY)
ec2_config = json.loads(result["Body"].read().decode())
print(f"Config from S3: {ec2_config}")
ec2_filters = [
{
'Name': f"tag:{ec2_config['filter_tag_key']}",
'Values':[ ec2_config['filter_tag_value'] ]
}
]
EC2 = boto3.client('ec2', region_name=ec2_config['region'])
# launch new EC2 instance if necessary
if bucket == BUCKET_NAME and key.startswith(f"{BUCKET_INPUT_DIR}/"):
print("[INFO] Describing EC2 instances with target tags...")
resp = EC2.describe_instances(Filters=ec2_filters)
# print(f"[DEBUG] describe_instances response: {resp}")
if resp["Reservations"] is not []: # at least one instance with target tags was found
for reservation in resp["Reservations"] :
for instance in reservation["Instances"]:
print(f"[INFO] Found '{instance['State']['Name']}' instance '{ instance['InstanceId'] }'"
f" having target tags: {instance['Tags']} ")
if instance['State']['Code'] == 16: # instance has target tags AND also is in running state
print(f"[INFO] instance '{ instance['InstanceId'] }' is already running: so not launching any more instances")
return {
"newInstanceLaunched": False,
"old-instanceId": instance['InstanceId'],
"new-instanceId": ""
}
print("[INFO] Could not find even a single running instance matching the desired tag, launching a new one")
# retrieve EC2 user-data for launch
result = S3.get_object(Bucket=BUCKET_NAME, Key=USER_DATA_FILE_KEY)
user_data = result["Body"].read()
print(f"UserData from S3: {user_data}")
result = launch_instance(EC2, ec2_config, user_data)
print(f"[INFO] LAUNCHED EC2 instance-id '{result[0]}'")
# print(f"[DEBUG] EC2 launch_resp:\n {result[1]}")
return {
"newInstanceLaunched": True,
"old-instanceId": "",
"new-instanceId": result[0]
}
# terminate all tagged EC2 instances
if bucket == BUCKET_NAME and key.startswith(f"{BUCKET_OUTPUT_DIR}/"):
print("[INFO] Describing EC2 instances with target tags...")
resp = EC2.describe_instances(Filters=ec2_filters)
# print(f"[DEBUG] describe_instances response: {resp}")
terminated_instance_ids = []
if resp["Reservations"] is not []: # at least one instance with target tags was found
for reservation in resp["Reservations"] :
for instance in reservation["Instances"]:
print(f"[INFO] Found '{instance['State']['Name']}' instance '{ instance['InstanceId'] }'"
f" having target tags: {instance['Tags']} ")
if instance['State']['Code'] == 16: # instance has target tags AND also is in running state
print(f"[INFO] instance '{ instance['InstanceId'] }' is running: terminating it")
terminated_instance_ids.append(instance['InstanceId'])
boto3.resource('ec2').Instance(instance['InstanceId']).terminate()
return {
"terminated-instance-ids:": terminated_instance_ids
}
Lambda Execution IAM Role
Our lambda function won't work just yet. It needs S3 access to read in the config and it needs permissions to describe, launch, and terminate EC2 instances.
Scroll down on the Lambda configuration page to the Execution role
and click the View role
link:
Click on the AWSLambdaBasic...
link to edit the lambda function's policy:
Click {} JSON
, Edit Policy
and then JSON
.
Now add the following JSON to the existing Statement
Section
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": "*"
},
{
"Action": [
"ec2:RunInstances",
"ec2:CreateTags",
"ec2:ReportInstanceStatus",
"ec2:DescribeInstanceStatus",
"ec2:DescribeInstances",
"ec2:TerminateInstances"
],
"Effect": "Allow",
"Resource": "*"
}
Click Review Policy
and Save changes
.
Go!
Our setup's finally complete. We're ready to test it out!
Test Case 1: new instance launch
Upload a file from the S3 console into the inputs
folder. If you used the exact same config as above, this would have triggered the Lambda function, which in turn would have launched a new t2.nano
instance with the Purpose: data-processing
tag on it.
If you put the instance's public IP address into a browser (after giving it a minute or so to boot and warm up), you should also see the test message served to you: which indicates that the user-data
did indeed execute successfully upon boot.
Test Case 2: another instance should not launch
As long as there is at least one Purpose: data-processing
tagged instance running, another one should not spawn. Let's upload another file to the inputs
folder. And indeed the actual behavior matches the expectation.
If we kill the already-running instance, and then upload another file to the inputs
folder, it will launch a new instance.
Test Case 3: instance termination condition
Upload a file into the outputs
folder. This will trigger the Lambda function into terminating any already-running instances that are tagged with Purpose: data-processing
.
Bonus: S3 event object to test your lambda function
S3-object-creation-notification (to test Lambda function)
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "ap-south-1",
"eventTime": "2019-09-03T19:37:27.192Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
},
"requestParameters": {
"sourceIPAddress": "205.255.255.255"
},
"responseElements": {
"x-amz-request-id": "D82B88E5F771F645",
"x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
"bucket": {
"name": "YOUR_S3_BUCKET_NAME",
"ownerIdentity": {
"principalId": "A3I5XTEXAMAI3E"
},
"arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
},
"object": {
"key": "b21b84d653bb07b05b1e6b33684dc11b",
"size": 1305107,
"eTag": "b21b84d653bb07b05b1e6b33684dc11b",
"sequencer": "0C0F6F405D6ED209E1"
}
}
}
]
}
That's all, folks!
Top comments (4)
Thanks for the good article. I'm curious what are the advantages to using an EC2 instance instead of running the data processing in a Lambda function i.e. completely serverless?
Some of the main factors that can help you decide between the two:
Lambda has a hard limit of 15 min execution time. If your job will ever need to run longer than that (which is often the case in "big data" scenarios), it automatically eliminates Lambda from your choices.
If your job needs more than 3 gigs of memory, or heavy compute power, e.g. cluster-compute, then again, Lambda won't cut it for you.
If your jobs need to run very frequently (say hundred of times per day), or need persistent up-time, Lambda might become very expensive, even despite the generous free tier.
Lambda is much better suited for sporadic, event-driven compute tasks that don't need state and typically finish executing in a few seconds or minutes. Especially ad-hoc jobs whose invocation is sporadic (e.g. one-off service requests). Why keep an EC2 instance running 24/7 when you only need it to handle 1 minute executions that occur unpredictably?
This is why micro-services are often suited to Lambda functions - smallish workloads that run quickly (with small cold-start times of typically < 5 seconds) and finish quickly.
Otherwise running your own fleet of auto-scaling containers (as opposed to Lambda's containers with their own limitations) with something like AWS Fargate or even Kubernetes (e.g. EKS or ECS) can also be a good option.
I followed all the code. Did not understand about user-data file. where do I create that?
I see that this question was asked over a year ago but I wanted to respond anyway lol. The user-data file is a shell script that bootstraps the ec-2 instance. It does have a max. length but you can do almost anything, if not anything, that you could/would do by SSH-ing into the instance to install applications, create directories, create and run files, run updates, pull and run docker containers, etc etc etc.
In this example, his user-data shell script is an object in the bucket that he retrieves and uses in the following lines:
# retrieve EC2 user-data for launch
result = S3.get_object(Bucket=BUCKET_NAME, Key=USER_DATA_FILE_KEY)
user_data = result["Body"].read()
print(f"UserData from S3: {user_data}")
USER_DATA_FILE_KEY is the name of his user-data shell script object in the s3 bucket and in that first line he retrieves that file from the bucket. He then parses out the actual shell-script from the S3 response and passes it in to the launch_instance function which, ultimately, launches an instance and runs the shell script automatically. You can see the log output of this user-data shell script (usually anyway) in var/log/cloud-init-output.log. Note that if the runtime of the shell script is kind of long, you may find that you are able to SSH into the instance before the script has actually completed. You can "refresh" (closing and re-opening) that log file to see when the shell script is complete.