Automating Machine Learning Workflows Pt. 2: Amazon SageMaker Processing, Amazon SageMaker and AWS Step Functions Data Science SDK

In the previous blog post, I demonstrated how to automate machine learning workflows with AWS Step Functions, from data preparation with PySpark on AWS Glue to model (endpoint) deployment with Amazon SageMaker. In this tutorial, I will repeat almost the same approach, this time with a small adjustment in the data preparation phase.

Not all data preparation in machine learning requires the distributed nature of PySpark. So what happens if you don’t need PySpark? Would you still run Glue jobs anyway? You could, but you would probably be wasting Glue compute resources. Another thing to consider is the use of external libraries with AWS Glue: you have to package them as zip files and upload them to an external location. This process is not particularly user friendly, especially when you want to focus only on building your machine learning pipeline.

Enter Amazon SageMaker Processing, a fully managed data processing and model evaluation solution. You can read more about Amazon SageMaker Processing in this blog post from AWS. As of the time of writing, Amazon SageMaker Processing is not yet part of the AWS Step Functions service integrations; however, AWS Step Functions offers the flexibility to orchestrate AWS services with Lambda functions.

Prepare the workflow

Since Amazon SageMaker Processing is not natively integrated with AWS Step Functions, we start by creating two AWS Lambda functions: one that creates the Amazon SageMaker Processing job and one that checks the job status.

from botocore.exceptions import ClientError
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

BASE_PROCESSING_IMAGE = ''  # default ECR image URI to fall back on
INPUT_DATA_DESTINATION = '/opt/ml/processing/input_data'
PROCESSED_DATA_PATH = '/opt/ml/processing/processed_data'
DEFAULT_VOLUME_SIZE = 100
DEFAULT_INSTANCE_TYPE = 'ml.m5.xlarge'
DEFAULT_INSTANCE_COUNT = 1


def lambda_handler(event, context):
    """
    Creates a SageMaker Processing Job
    :param event:
    :param context:
    :return:
    """
    configuration = event['Configuration']
    print(configuration)
    job_name = configuration['JobName']
    role = configuration['IAMRole']
    instance_type = configuration.get('InstanceType', DEFAULT_INSTANCE_TYPE)
    instance_count = configuration.get('InstanceCount', DEFAULT_INSTANCE_COUNT)
    volume_size_in_gb = configuration.get('LocalStorageSizeGB', DEFAULT_VOLUME_SIZE)
    ecr_container_uri = configuration.get('EcrContainerUri', BASE_PROCESSING_IMAGE)
    script_processor = ScriptProcessor(
        command=['python3'],
        image_uri=ecr_container_uri,
        role=role,
        instance_count=instance_count,
        instance_type=instance_type,
        volume_size_in_gb=volume_size_in_gb
    )
    run_job_config = {
        'job_name': job_name,
        's3_script_path': configuration['S3CodePath'],
        's3_input_data_path': configuration['S3InputDataPath'],
        's3_output_data_path': configuration['S3OutputDataPath']
    }
    try:
        run_script_processor_job(script_processor, run_job_config)
        return {
            'JobName': job_name
        }
    except ClientError as exception:
        print("Error occurred creating SageMaker Processing Job: {}".format(job_name))
        print(exception)
        raise


def run_script_processor_job(script_processor: ScriptProcessor, job_config: dict = None):
    """
    Args:
        script_processor (sagemaker.processing.ScriptProcessor):
        job_config (dict): The configuration to run the ScriptProcessor job
    :return:
    """
    script_processor.run(
        job_name=job_config['job_name'],
        code=job_config['s3_script_path'],
        wait=False,
        inputs=[
            ProcessingInput(
                source=job_config['s3_input_data_path'],
                destination=INPUT_DATA_DESTINATION,
                input_name='data'
            )
        ],
        outputs=[
            ProcessingOutput(
                source=PROCESSED_DATA_PATH,
                destination=job_config['s3_output_data_path'],
                output_name='processed'
            )
        ]
    )
    return None
import boto3

sm_client = boto3.client('sagemaker')


def lambda_handler(event, context):
    """
    Checks the status of a SageMaker Processing Job
    :param event:
    :param context:
    :return:
    """
    job_name = event['JobName']
    print(f'Job Name: {job_name}')
    response = sm_client.describe_processing_job(
        ProcessingJobName=job_name
    )
    job_status = response["ProcessingJobStatus"]
    print(f'Current Job status: {job_status}')
    return {
        'ProcessingJobStatus': job_status,
        'JobName': job_name,
        'FailureReason': response.get('FailureReason', None),
        'ExitMessage': response.get('ExitMessage', None),
    }

Once we have both functions in place, we are ready to define our workflow with the AWS Step Functions Data Science SDK. For more details about the AWS Step Functions Data Science SDK, you can read my previous blog post or visit the project page on GitHub.
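The workflow definitions that follow reference a few objects created once up front: the execution input schema (whose keys mirror the inputs we pass to execute() at the end of this post) and the terminal success and failure states used by the catch and choice rules. Here is a minimal sketch of that setup, assuming the stepfunctions package (the Data Science SDK) is installed; the state IDs are illustrative:

from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Catch, Chain, Choice, Fail, LambdaStep, Retry, Succeed, Wait
from stepfunctions.steps.choice_rule import ChoiceRule
from stepfunctions.workflow import Workflow

# Placeholders resolved from the workflow input payload at execution time
execution_input = ExecutionInput(schema={
    'JobName': str,
    'IAMRole': str,
    'EcrContainerUri': str,
    'S3CodePath': str,
    'S3InputDataPath': str,
    'S3OutputDataPath': str,
    'ModelName': str,
    'EndpointName': str
})

# Terminal states referenced by the catch and choice rules below (illustrative state IDs)
workflow_failure = Fail(state_id="WorkflowFailed")
workflow_success = Succeed(state_id="WorkflowSucceeded")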

Amazon SageMaker Processing jobs run processing scripts inside Docker containers hosted on Amazon ECR. Below, we create a Docker image with the packages required for our processing job.

set -eux

echo "Writing Dockerfile"
cat <<EOF > Dockerfile
FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3 scikit-learn==0.21.3
ENV PYTHONUNBUFFERED=TRUE
ENTRYPOINT ["python3"]
EOF
echo "Completed writing Dockerfile"

echo "Creating docker image build script"
# Quote the heredoc delimiter so the variables below are written literally
# instead of being expanded while this script is generated.
cat <<'EOF' > build_and_deploy.sh
ecr_repository="sagemaker-processing-container"
tag="latest"
REGION="eu-west-1"
# AWS_ACCOUNT_ID must be set in the environment before running this script
processing_repository_uri="${AWS_ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${ecr_repository}:${tag}"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${ecr_repository}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${ecr_repository}" > /dev/null
fi
# The Dockerfile was written to the current directory above
docker build -t $ecr_repository .
$(aws ecr get-login --region ${REGION} --registry-ids ${AWS_ACCOUNT_ID} --no-include-email)
docker tag "${ecr_repository}:${tag}" $processing_repository_uri
docker push $processing_repository_uri
EOF
echo "Done creating build script"

echo "Executing docker build"
bash ./build_and_deploy.sh

Note that you could also achieve the above setup using AWS CloudFormation.
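For instance, the ECR repository could be provisioned as a CloudFormation stack rather than created imperatively in the shell script. Here is a minimal sketch using boto3 (the template, stack and resource names are illustrative, and the image still has to be built and pushed as shown above):

import boto3

# Illustrative CloudFormation template managing only the ECR repository
ECR_TEMPLATE = """
AWSTemplateFormatVersion: '2010-09-09'
Resources:
  ProcessingContainerRepository:
    Type: AWS::ECR::Repository
    Properties:
      RepositoryName: sagemaker-processing-container
"""

cfn_client = boto3.client('cloudformation', region_name='eu-west-1')
cfn_client.create_stack(
    StackName='sagemaker-processing-ecr',  # illustrative stack name
    TemplateBody=ECR_TEMPLATE
)
# Wait until the repository has been created
cfn_client.get_waiter('stack_create_complete').wait(StackName='sagemaker-processing-ecr')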

Workflow Definition

In my previous post, I demonstrated workflow definition steps from data preparation to model deployment. Since I am only replacing the AWS Glue Step, I will not go into the details of the model training and endpoint deployment steps.

Data Preparation with Amazon SageMaker Processing

data_processing_configuration = dict(
    JobName=execution_input['JobName'],
    IAMRole=execution_input['IAMRole'],
    LocalStorageSizeGB=50,
    S3CodePath=execution_input['S3CodePath'],
    S3InputDataPath=execution_input['S3InputDataPath'],
    S3OutputDataPath=execution_input['S3OutputDataPath'],
    EcrContainerUri=execution_input['EcrContainerUri']
)
create_processing_job_step = LambdaStep(
    state_id="StartDataProcessingJob",
    parameters={
        "FunctionName": "arn:aws:lambda:eu-west-1:1234567890:function:CreateProcessingJob",
        "Payload": {
            "Configuration": data_processing_configuration
        }
    }
)
create_processing_job_step.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=15,
    max_attempts=2,
    backoff_rate=4.0
))
create_processing_job_step.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=workflow_failure
))

We created our create-processing-job Lambda function with sensible defaults, so we only need to provide the minimum set of arguments as the Amazon SageMaker Processing job configuration. Next, we poll for the job status with a Lambda function step every 60 seconds.

## We poll for the processing job at intervals
get_processing_job_status = LambdaStep(
    state_id="GetDataProcessingJob",
    parameters={
        "FunctionName": "arn:aws:lambda:eu-west-1:1234567890:function:GetProcessingJobStatus",  # replace with the name of the function you created
        "Payload": {
            "JobName": create_processing_job_step.output()['Payload']['JobName']
        }
    }
)
check_job_wait_state = Wait(
    state_id="Wait",
    seconds=60
)

We then need to create the processing script, which contains our data transformations and is executed by Amazon SageMaker Processing inside our Docker container.

import argparse
import os
import warnings
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.exceptions import DataConversionWarning

warnings.filterwarnings(action='ignore', category=DataConversionWarning)

columns = []
input_data_path = '/opt/ml/processing/input_data'      # default input data path in Lambda function
output_data_path = '/opt/ml/processing/processed_data'  # default output data path in Lambda function

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    args, _ = parser.parse_known_args()
    print('Received arguments {}'.format(args))
    print('Reading input data from {}'.format(input_data_path))
    all_input_files = os.listdir(input_data_path)
    print('All input files: ', all_input_files)
    df = pd.concat([pd.read_csv(os.path.join(input_data_path, str(input_file))) for input_file in all_input_files])
    df.drop_duplicates(inplace=True)
    df.dropna(inplace=True)
    print('Train data shape after preprocessing: {}'.format(df.shape))
    df_column_subset = df.iloc[:, 0:16]
    xTrain, xTest, yTrain, yTest = train_test_split(df_column_subset, df['CLASS_LABEL'], test_size=0.1, random_state=0)
    train_df = pd.concat([yTrain, xTrain], axis=1)
    validation_df = pd.concat([yTest[int(len(yTest)/2):], xTest[int(len(xTest)/2):]], axis=1)
    train_df.reset_index(drop=True, inplace=True)
    validation_df.reset_index(drop=True, inplace=True)
    train_data_output_path = os.path.join(output_data_path, 'train.csv')
    validation_data_output_path = os.path.join(output_data_path, 'validation.csv')
    print('Train Data Output Path: {}'.format(train_data_output_path))
    print('Validation Data Output Path: {}'.format(validation_data_output_path))
    train_df.to_csv(train_data_output_path, header=False, index=False)
    validation_df.to_csv(validation_data_output_path, header=False, index=False)
    print("Completed Data Processing Job Successfully...")

The script above is an example of data preparation logic; depending on your use case and requirements, you might have more complex data transformations. The last step is to upload the processing script to an S3 location.

$ aws s3 cp scripts/preprocessing.py s3://my-code-bucket/processing/scripts/preprocessing.py

Having completed the build-up to our data preparation steps, we proceed to defining our machine learning steps. For brevity, since I covered these steps in detail in my previous post, I will not delve into them here, but simply show how to chain them together (a rough sketch of the individual steps follows the chain below):


ml_steps_definition = Chain([
    training_step,
    model_step,
    endpoint_config_step,
    endpoint_step,
    workflow_success
])
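For completeness, here is a rough sketch of what those steps might look like with the SDK's SageMaker steps. It assumes an xgb_estimator (a sagemaker.estimator.Estimator) configured as in the previous post, and reuses the execution_input, data_bucket and processed_data_output_path variables used elsewhere in this post; the state IDs are illustrative:

from stepfunctions.steps import TrainingStep, ModelStep, EndpointConfigStep, EndpointStep

# Train on the CSV files written by the processing job
training_step = TrainingStep(
    "TrainModel",
    estimator=xgb_estimator,
    data={
        'train': f's3://{data_bucket}/{processed_data_output_path}/train.csv',
        'validation': f's3://{data_bucket}/{processed_data_output_path}/validation.csv'
    },
    job_name=execution_input['JobName']
)
# Register the trained model artifact as a SageMaker model
model_step = ModelStep(
    "SaveModel",
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']
)
# Create an endpoint configuration pointing at the model
endpoint_config_step = EndpointConfigStep(
    "CreateEndpointConfig",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.xlarge'
)
# Deploy the endpoint from the endpoint configuration
endpoint_step = EndpointStep(
    "DeployEndpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)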

To build the entire workflow graph, we make use of the AWS Step Functions Choice state, which adds branching logic to our workflow. The Choice state checks the status of the data processing job and proceeds to the next steps based on that status:

check_job_choice = Choice(
    state_id="IsProcessingJobComplete"
)
check_job_choice.add_choice(
    ChoiceRule.StringEquals(variable=get_processing_job_status.output()['Payload']['ProcessingJobStatus'], value='InProgress'),
    next_step=get_processing_job_status
)
check_job_choice.add_choice(
    ChoiceRule.StringEquals(variable=get_processing_job_status.output()['Payload']['ProcessingJobStatus'], value='Stopping'),
    next_step=get_processing_job_status
)
check_job_choice.add_choice(
    ChoiceRule.StringEquals(variable=get_processing_job_status.output()['Payload']['ProcessingJobStatus'], value='Failed'),
    next_step=workflow_failure
)
check_job_choice.add_choice(
    ChoiceRule.StringEquals(variable=get_processing_job_status.output()['Payload']['ProcessingJobStatus'], value='Stopped'),
    next_step=workflow_failure
)
check_job_choice.add_choice(
    ChoiceRule.StringEquals(variable=get_processing_job_status.output()['Payload']['ProcessingJobStatus'], value='Completed'),
    next_step=ml_steps_definition
)

The Choice state works as follows: if the processing job is still running (or stopping), it loops back to check the job status again; if the job failed or was stopped, the entire workflow is terminated; and if the job completed, it proceeds to the model training and endpoint deployment steps.

Our complete machine learning workflow is chained together as follows:

ml_workflow_definition = Chain(
    [
        create_processing_job_step,
        get_processing_job_status,
        check_job_wait_state,
        check_job_choice
    ]
)

ml_workflow = Workflow(
    name="MyCompleteMLWorkflow_v2",
    definition=ml_workflow_definition,
    role=workflow_execution_role
)


Create and execute the workflow:

try:
    workflow_arn = ml_workflow.create()
except BaseException as e:
    print("Workflow already exists")
    workflow_arn = ml_workflow.update(ml_workflow_definition)

# execute the workflow
execution = ml_workflow.execute(
    inputs={
        'IAMRole': sagemaker_execution_role,
        'EcrContainerUri': '1234567890.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-processing-container',
        'S3InputDataPath': f's3://{data_bucket}/raw-data/',
        'S3OutputDataPath': f's3://{data_bucket}/{processed_data_output_path}/',
        'S3CodePath': 's3://my-code-bucket/processing/scripts/preprocessing.py',
        'JobName': job_name,
        'ModelName': model_name,
        'EndpointName': endpoint_name
    }
)

Successful machine learning workflow with Amazon SageMaker Processing, Amazon SageMaker and AWS Step Functions Data Science SDK
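Since the execute call above returns an execution handle (captured in execution), you can also follow the run directly from a notebook instead of the Step Functions console; a small example:

# Renders the live workflow graph inside a Jupyter notebook
execution.render_progress()
# Blocks until the workflow reaches a terminal state, then returns the execution output
output = execution.get_output(wait=True)
print(output)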

Conclusion

In my previous post, I demonstrated how to create an end-to-end machine learning workflow on AWS using AWS Glue for data preparation. In this post, I swapped the data preparation component for Amazon SageMaker Processing. Which AWS service you choose for your data processing jobs depends on your use case. You can also use the Step Functions service integration with Amazon EMR to run your data preparation jobs, which is supported natively by the AWS Step Functions Data Science SDK.


Kindly share your thoughts and comments — looking forward to your feedback. You can reach me via email, follow me on Twitter or connect with me on LinkedIn. Can’t wait to hear from you!!
