Temiloluwa Adeoti

A Practical Introduction to Amazon SageMaker Python SDK

Introduction

On the 12th of October, 2022, I presented a Knowledge Share to my colleagues at Machine Learning Reply GmbH titled "Developing Solutions with Sagemaker". Knowledge Sharing is a weekly tradition at Machine Learning Reply GmbH that helps us as consultants develop a broad range of skill sets. There was little time to delve into the Sagemaker Python SDK on the day. With this follow-up blog post, I would like to explore the Estimator API, Model API, Preprocessor API, and Predictor API using the AWS Sagemaker Python SDK.

AWS Sagemaker Python SDK

The Amazon SageMaker Python SDK is the recommended library for developing solutions in Sagemaker. The other ways of interacting with Sagemaker are the AWS CLI, Boto3, and the AWS web console.
In theory, the SDK should offer the best developer experience, but I discovered there is a learning curve before you can hit the ground running with it.

This post walks through a simple regression task that showcases the important APIs in the SDK.
I also highlight "gotchas" encountered while developing this solution. The entire codebase is found here.

Regression Task: Fuel Consumption Prediction

I selected a regression task I tackled as a budding data scientist (notebook link): predicting the fuel consumption of vehicles in MPG (problem definition). I broke down the problem into three stages:

  1. A preprocessing stage for feature engineering
  2. A model training and evaluation stage
  3. A model inferencing stage

Each of these stages produces reusable model artifacts that are stored in S3.

Sagemaker Preprocessing and Training

Two things are king in Sagemaker: S3 and Docker containers. S3 is the primary location for storing training data and the destination for exporting training artifacts like models. The SDK provides Preprocessors and Estimators as the fundamental interfaces for data preprocessing and model training. These two APIs are simply wrappers for Sagemaker Docker containers. This is what happens under the hood when a preprocessing job is created with a Preprocessor or training job with an Estimator:

  1. Data is transferred from S3 into the Sagemaker Docker container
  2. The Job (training or preprocessing) is executed in the container that runs on the compute instance you have specified for the job
  3. Output artifacts (models, preprocessed features) are exported to S3 when the job is concluded

Figure: Sagemaker Preprocessing Container. Data is transferred from S3 into the preprocessing container and back out to S3.

Sagemaker Containers

It is crucial to get familiar with the environment variables and pre-configured path locations in Sagemaker containers. More information is found on the Sagemaker Containers' Github page. For example, Preprocessors receive data from S3 into /opt/ml/processing/input, while Estimators store training data in /opt/ml/input/data/train. Some environment variables include SM_MODEL_DIR for exporting models, SM_NUM_CPUS, and SM_HP_{hyperparameter_name}.
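
As a minimal sketch of how a script might read these settings, the helper below collects a few of the variables named above and falls back to local defaults when they are absent. The function name `read_sagemaker_env` and the fallback values are assumptions made for illustration, and the upper-cased `SM_HP_N_ESTIMATORS` name assumes the container upper-cases hyperparameter names.

```python
import os


def read_sagemaker_env(env=None):
    """Collect a few Sagemaker container settings (hypothetical helper)."""
    env = os.environ if env is None else env
    settings = {
        # directory where a training script should write its model artifact
        "model_dir": env.get("SM_MODEL_DIR", "/opt/ml/model"),
        # number of CPUs on the instance, falling back to the local machine
        "num_cpus": int(env.get("SM_NUM_CPUS", os.cpu_count() or 1)),
    }
    # hyperparameters arrive as SM_HP_<NAME> variables with string values
    prefix = "SM_HP_"
    settings["hyperparameters"] = {
        key[len(prefix):].lower(): value
        for key, value in env.items()
        if key.startswith(prefix)
    }
    return settings


# simulated container environment, so the sketch also runs outside Sagemaker
example = read_sagemaker_env({
    "SM_MODEL_DIR": "/opt/ml/model",
    "SM_NUM_CPUS": "4",
    "SM_HP_N_ESTIMATORS": "10",
})
print(example)
```

Passing a plain dictionary instead of `os.environ` makes it easy to try a script locally before submitting a job.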

Project Folder Structure

The diagram below shows the project's folder structure. The main script is the Python notebook auto_mpg_prediction.ipynb, whose cells are executed in Sagemaker Studio. Training and preprocessing scripts are located in the scripts folder.

├── Blog.md
├── LICENSE
├── README.md
├── auto_mpg_prediction.ipynb
└── scripts
    ├── model
    │   ├── inference.py
    │   └── train.py
    └── preprocessor
        ├── custom_preprocessor.py
        ├── inference.py
        └── train.py

Preliminary Steps

Let's start by initializing a Sagemaker session, followed by the boilerplate steps of getting the region, execution role, and default bucket. I create prefixes for the key S3 locations used for data storage and for the export of preprocessed features and models.

import os
import json
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import boto3
import sagemaker
from sagemaker import get_execution_role
from io import StringIO

# initialize sagemaker session 
sess = sagemaker.Session()
region = sess.boto_session.region_name
bucket = sess.default_bucket() 
role = get_execution_role()

# boto3 client
sm_client = boto3.client('sagemaker')

prefix = "auto_mpg"

# raw data path
raw_train_prefix = f"{prefix}/data/bronze/train"
raw_val_prefix = f"{prefix}/data/bronze/val"
raw_test_prefix = f"{prefix}/data/bronze/test"

# preprocessed features path
pp_train_prefix = f"{prefix}/data/gold/train"
pp_val_prefix = f"{prefix}/data/gold/val"
pp_test_prefix = f"{prefix}/data/gold/test"

# preprocessor and ml models
pp_model_prefix = f"{prefix}/models/preprocessor"
ml_model_prefix = f"{prefix}/models/ml"


def get_s3_path(prefix, bucket=bucket):
    """ get full path in s3 """
    return f"s3://{bucket}/{prefix}"

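
To make the resulting layout concrete, here is a small sketch that prints the full S3 URIs for a few of the prefixes defined above. The bucket name `example-bucket` is hypothetical (the post uses the session's default bucket), and `s3_uri` simply mirrors what `get_s3_path` does.

```python
def s3_uri(bucket, prefix):
    """Build a full S3 URI from a bucket name and a key prefix."""
    return f"s3://{bucket}/{prefix}"


bucket_name = "example-bucket"  # hypothetical; the post uses sess.default_bucket()
project = "auto_mpg"

# bronze holds raw data, gold holds preprocessed features
layout = {
    "raw_train": f"{project}/data/bronze/train",
    "features_train": f"{project}/data/gold/train",
    "preprocessor_model": f"{project}/models/preprocessor",
}

for name, prefix in layout.items():
    print(f"{name:20s} {s3_uri(bucket_name, prefix)}")
```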

Raw Data Transfer to S3

Next, we have to transfer our raw data to S3. In a production setting, an ETL job sets an S3 bucket as the final data destination. I have implemented a function that downloads the raw data, splits it into the train, validation, and test sets then uploads them all to their respective s3 paths in the default bucket based on pre-defined prefixes.


def upload_raw_data_to_s3(sess,
                          raw_train_prefix=raw_train_prefix,
                          raw_val_prefix=raw_val_prefix,
                          raw_test_prefix=raw_test_prefix, 
                          split=0.8):
    """
    Read MPG dataset, perform train test split, then upload to s3
    """
    # filenames
    train_fn = "train.csv"
    val_fn = "val.csv"
    test_fn = "test.csv"

    # download data
    data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data"
    res = requests.get(data_url)
    file = StringIO(res.text)

    # read data
    data = pd.read_csv(file, header = None, delimiter = r'\s+', low_memory = False, na_values = "?")
    data_frame = data.drop(columns = 8)
    data_frame = data_frame.fillna(data_frame.mean())
    data_frame = data_frame.rename(index = int, columns = {0: "mpg", 1:"cylinders", 2: "displacement",3: "horsepower", 4: "weight", 5:"acceleration",6:"model year",7:"origin"})

    # train - test - split
    train_df = data_frame.sample(frac=split)
    test_df = data_frame.drop(train_df.index)

    # take the last 10 rows of test_df as the test data and the rest as the validation data
    val_df = test_df[:-10]
    test_df = test_df[-10:]

    assert set(list(train_df.index)).intersection(list(test_df.index)) == set([]), "overlap between train and test"

    # save data locally and upload data to s3
    train_df.to_csv(train_fn, index=False, sep=',', encoding='utf-8')
    train_path = sess.upload_data(path=train_fn, bucket=bucket, key_prefix=raw_train_prefix)

    val_df.to_csv(val_fn, index=False, sep=',', encoding='utf-8')
    val_path = sess.upload_data(path=val_fn, bucket=bucket, key_prefix=raw_val_prefix)

    test_df.to_csv(test_fn, index=False, sep=',', encoding='utf-8')
    test_path = sess.upload_data(path=test_fn, bucket=bucket, key_prefix=raw_test_prefix)

    # delete local versions of the data
    os.remove(train_fn)
    os.remove(val_fn)
    os.remove(test_fn)

    print("Path to raw train data:", train_path)
    print("Path to raw val data:", val_path)
    print("Path to raw test data:", test_path)

    return train_path, val_path, test_path

train_path, val_path, test_path = upload_raw_data_to_s3(sess)

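
The split logic can be checked without pandas or S3. The sketch below reproduces the same proportions on row indices only, assuming the 398 rows of the Auto MPG dataset. The helper `split_indices` and the fixed seed are illustrative additions; the function above samples without a seed.

```python
import random


def split_indices(n_rows, train_frac=0.8, n_test=10, seed=0):
    """Split row indices into train, validation, and test (hypothetical helper)."""
    rng = random.Random(seed)
    indices = list(range(n_rows))
    rng.shuffle(indices)

    n_train = round(n_rows * train_frac)
    train = indices[:n_train]

    # rows not sampled for training keep their original order
    held_out = sorted(indices[n_train:])
    val = held_out[:-n_test]   # everything except the last n_test rows
    test = held_out[-n_test:]  # the last n_test rows
    return train, val, test


train_idx, val_idx, test_idx = split_indices(398)
print(len(train_idx), len(val_idx), len(test_idx))
```

Fixing the seed is a design choice that makes the split reproducible across notebook runs.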

Stage 1: Feature Engineering

The preprocessing steps are implemented using the Sklearn python library. These are the goals of this stage:

  1. Preprocess the raw train and validation .csv data into features and export them to s3 in .npy format
  2. Save the preprocessing model using joblib and export it to s3. This saved model will be deployed as the first step of our inference pipeline. During inferencing, its task will be to generate features (.npy) for raw test data.

The Sagemaker Python SDK offers Sklearn Preprocessors and PySpark Preprocessors. These are preprocessors that come with Sklearn and PySpark pre-installed. Unfortunately, I discovered it is not possible to use custom scripts or dependencies with either. Therefore, I had to use the Framework Preprocessor (the FrameworkProcessor class).

To instantiate the Framework Preprocessor with the Sklearn library, I supplied the SKLearn estimator class to the estimator_cls parameter. The .run method of the preprocessor comes with a code parameter for specifying the entry point script and a source_dir parameter for indicating the directory that contains all custom scripts.

Pay close attention to how data is transferred into and exported out of the preprocessing container using ProcessingInput and ProcessingOutput APIs. You will see how container (/opt/ml/*) and S3 paths for data transfer are specified. Note that unlike Estimators that are executed using a .fit method, Preprocessors use a .run method.


from datetime import datetime
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

current_time = datetime.now().strftime("%d-%b-%Y-%H:%M:%S").replace(":", "-")
TRAIN_FN = 'train.csv'
VAL_FN = 'val.csv'
TRAIN_FEATS_FN = 'train_feats.npy'
VAL_FEATS_FN = 'val_feats.npy'


sklearn_processor = FrameworkProcessor(
    base_job_name=f"auto-mpg-feature-eng-{current_time}",
    framework_version="1.0-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    estimator_cls=SKLearn
)

sklearn_processor.run(
    code="train.py",
    source_dir="scripts/preprocessor/",
    inputs=[
        ProcessingInput(source=get_s3_path(raw_train_prefix), destination="/opt/ml/processing/input/train"),
        ProcessingInput(source=get_s3_path(raw_val_prefix), destination="/opt/ml/processing/input/test")
    ],
    outputs=[
        ProcessingOutput(output_name="train_features", source="/opt/ml/processing/train", destination=get_s3_path(pp_train_prefix)),
        ProcessingOutput(output_name="val_features", source="/opt/ml/processing/test", destination=get_s3_path(pp_val_prefix)),
        ProcessingOutput(output_name="preprocessor_model", source="/opt/ml/processing/output", destination=get_s3_path(pp_model_prefix)),
    ],
    # flag names must match the argparse options defined in scripts/preprocessor/train.py
    arguments=["--train-filename", TRAIN_FN,
               "--val-filename", VAL_FN,
               "--train-feats-filename", TRAIN_FEATS_FN,
               "--val-feats-filename", VAL_FEATS_FN],
)

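
The arguments list reaches the entry point script as ordinary command-line flags, so every flag must be one the script's parser defines. The sketch below uses a cut-down, hypothetical version of the parser to show how a correct list is parsed and how an unknown flag can be spotted locally before a job is submitted.

```python
import argparse


def build_parser():
    """A cut-down stand-in for the parser in the preprocessing script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-filename", type=str, default="train.csv")
    parser.add_argument("--val-filename", type=str, default="val.csv")
    return parser


parser = build_parser()

# flags that match the parser are parsed into attributes
args = parser.parse_args(["--train-filename", "train.csv", "--val-filename", "val.csv"])
print(args.train_filename, args.val_filename)

# parse_known_args returns the flags the parser does not recognise
# instead of exiting, which is handy for a quick local check
_, unknown = parser.parse_known_args(["--test-filename", "val.csv"])
print(unknown)
```

With plain `parse_args`, the unknown flag would make the script exit with an error, which would fail the processing job.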

Custom Preprocessor

I wanted to implement some custom preprocessing logic so I created a custom preprocessor class and configured it to follow the popular Sklearn .fit and .transform interfaces by extending BaseEstimator and TransformerMixin. The preprocessor engineers the Model Year Feature into Age and makes features Origin and Cylinders categorical. It is vital that this custom transformer be stored in a separate file and imported by the main preprocessing script. The reason for this will be explained during the inferencing step.

%%writefile scripts/preprocessor/custom_preprocessor.py

from sklearn.base import BaseEstimator, TransformerMixin

original_features = ['cylinders',
                     'displacement',
                     'horsepower',
                     'weight',
                     'acceleration',
                     'model year',
                     'origin']


class CustomFeaturePreprocessor(BaseEstimator, TransformerMixin):
    """
    This is a custom transformer class that does the following

        1. converts model year to age
        2. converts data type of categorical columns
    """

    feat = original_features
    new_datatypes = {'cylinders': 'category', 'origin': 'category'}

    def fit(self, X, y=None):
        """ Fit function"""
        return self

    def transform(self, X, y=None):
        """ Transform Dataset """
        assert set(list(X.columns)) - set(list(self.feat))\
                    ==  set([]), "input does not have the right features"

        # work on a copy so the caller's dataframe is not modified
        X = X.copy()

        # convert model year to age
        X["model year"] = 82 - X["model year"]

        # change data types of cylinders and origin 
        X = X.astype(self.new_datatypes)

        return X

    def fit_transform(self, X, y=None):
        """ Fit transform function """
        x = self.fit(X)
        x = self.transform(X)
        return x

Preprocessing Job

The preprocessing script at scripts/preprocessor/train.py is executed in the preprocessing container to perform the feature engineering. I create a Sklearn Pipeline model using my CustomFeaturePreprocessor class as its first step, followed by a OneHotEncoder for transforming categorical columns and finally StandardScaler for numerical columns. A Sklearn pipeline is an easy way to chain multiple Sklearn transformers together.

As a good ML Engineer, you should avoid data leakage during feature engineering. Since the same transformer is applied to the train and validation sets, I excluded the first column of each pandas dataframe, which is the target. I also fitted the model on the train set only.
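
The fit-on-train-only rule can be illustrated with a tiny scaler written in plain Python. This is a sketch with made-up numbers, not the Sklearn pipeline used in the post: the statistics are computed from the train values only and then reused, unchanged, on the validation values.

```python
from statistics import mean, pstdev


class SimpleScaler:
    """Minimal stand-in for a standard scaler (illustrative only)."""

    def fit(self, values):
        # statistics come from the data passed to fit and nothing else
        self.mean_ = mean(values)
        self.std_ = pstdev(values) or 1.0
        return self

    def transform(self, values):
        return [(v - self.mean_) / self.std_ for v in values]


train_weights = [2000.0, 3000.0, 4000.0]  # made-up values
val_weights = [3500.0]                    # made-up value

scaler = SimpleScaler().fit(train_weights)     # fit on train only
train_scaled = scaler.transform(train_weights)
val_scaled = scaler.transform(val_weights)     # reuse the train statistics

print(scaler.mean_, val_scaled)
```

Fitting on the combined train and validation values would shift the mean and standard deviation, letting information from the validation set leak into training.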

After the model is saved using joblib, it is imperative that it be compressed into a tar file. Sagemaker models are archived as tarfiles; otherwise, an error will be thrown when importing the model during inferencing.

%%writefile scripts/preprocessor/train.py

import os
import joblib
import argparse
import numpy as np
import pandas as pd
import tarfile
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import make_column_selector, ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from custom_preprocessor import original_features, CustomFeaturePreprocessor

def save_numpy(np_array, path):
    """ save np array """
    with open(path, 'wb') as f:
        np.save(f, np_array)

if __name__ == '__main__':
    CONTAINER_TRAIN_INPUT_PATH = "/opt/ml/processing/input/train"
    CONTAINER_VAL_INPUT_PATH = "/opt/ml/processing/input/test"
    CONTAINER_TRAIN_OUTPUT_PATH = "/opt/ml/processing/train"
    CONTAINER_VAL_OUTPUT_PATH = "/opt/ml/processing/test"
    CONTAINER_OUTPUT_PATH = "/opt/ml/processing/output"

    parser = argparse.ArgumentParser()
    parser.add_argument('--train-filename', type=str, default='train.csv')
    parser.add_argument('--val-filename', type=str, default='val.csv')
    parser.add_argument('--model-filename', type=str, default='model.tar.gz')
    parser.add_argument('--train-feats-filename', type=str, default='train_feats.npy')
    parser.add_argument('--val-feats-filename', type=str, default='val_feats.npy')

    args = parser.parse_args()

    # one hot categorical features
    # apply standard scaler to numerical features
    ct = ColumnTransformer([("categorical-feats", OneHotEncoder(), make_column_selector(dtype_include="category")),
                            ("numerical-feats", StandardScaler(), make_column_selector(dtype_exclude="category"))])

    # apply custom preprocessing
    pl = Pipeline([("custom-preprocessing", CustomFeaturePreprocessor()), ("column-preprocessing", ct)])

    train_data_path = os.path.join(CONTAINER_TRAIN_INPUT_PATH, args.train_filename)
    val_data_path = os.path.join(CONTAINER_VAL_INPUT_PATH, args.val_filename)

    # preprocess features, first column is target and the rest are features
    train_df = pd.read_csv(train_data_path)
    train_data = train_df.iloc[:, 1:]
    train_target = train_df["mpg"].values.reshape(-1, 1)
    train_features = pl.fit_transform(train_data)

    val_df = pd.read_csv(val_data_path)
    val_data = val_df.iloc[:, 1:]
    val_target = val_df["mpg"].values.reshape(-1, 1)
    val_features = pl.transform(val_data)

    # save features in output path with the container
    train_features = np.concatenate([train_target, train_features], axis=1)
    val_features = np.concatenate([val_target, val_features], axis=1)
    train_features_save_path = os.path.join(CONTAINER_TRAIN_OUTPUT_PATH, args.train_feats_filename)
    val_features_save_path = os.path.join(CONTAINER_VAL_OUTPUT_PATH, args.val_feats_filename)
    save_numpy(train_features, train_features_save_path)
    save_numpy(val_features, val_features_save_path)

    # save preprocessor model
    model_save_path = os.path.join(CONTAINER_OUTPUT_PATH, args.model_filename)

    # save model should be a tarfile so it can be loaded in the future
    joblib.dump(pl, "model.joblib")
    with tarfile.open(model_save_path, "w:gz") as tar_handle:
        tar_handle.add("model.joblib")
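
The archiving step at the end of the script can be tried locally. The sketch below uses the standard library only, with pickle standing in for joblib and a dictionary standing in for the fitted pipeline; both substitutions are assumptions made to keep the example self-contained. It writes the artifact, packs it into model.tar.gz, and lists the archive contents.

```python
import os
import pickle
import tarfile
import tempfile


def archive_model(model, workdir, artifact_name="model.joblib", archive_name="model.tar.gz"):
    """Serialize a model and pack it into a gzipped tar archive (hypothetical helper)."""
    artifact_path = os.path.join(workdir, artifact_name)
    with open(artifact_path, "wb") as f:
        pickle.dump(model, f)  # stand-in for joblib.dump

    archive_path = os.path.join(workdir, archive_name)
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname keeps the file at the archive root, without directory prefixes
        tar.add(artifact_path, arcname=artifact_name)
    return archive_path


with tempfile.TemporaryDirectory() as tmp:
    archive = archive_model({"demo": True}, tmp)
    with tarfile.open(archive, "r:gz") as tar:
        members = tar.getnames()

print(members)
```

Keeping the artifact at the archive root means an inference script can find it directly under the model directory once the archive is extracted.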

Stage 2: Model Training and Evaluation

A separate Python library, smexperiments, is used for experiment tracking in Sagemaker. A Trial in Sagemaker is analogous to an MLflow run. A trial could consist of multiple ML workflow stages, depending on the complexity of the solution. For example, model training and evaluation or just a single hyperparameter optimization step could be considered a trial. What's important is that metrics are logged for each trial run to enable the comparison of different trials.

I created a trial for just the training step and attributed it to the created experiment using the experiment_name parameter in the Trial.create call.


from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

current_time = datetime.now().strftime("%d-%b-%Y-%H:%M:%S").replace(":", "-")
experiment_name = "auto-mg-experiment"
# load the experiment if it exists, otherwise create it
try:
    auto_experiment = Experiment.load(experiment_name=experiment_name)
    print(f'experiment {experiment_name} was loaded')
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        auto_experiment = Experiment.create(experiment_name = experiment_name,
                                            description = "Regression on Auto MPG dataset",
                                            tags = [{'Key': 'Name', 'Value': f"auto-mg-experiment-{current_time}"},
                                                    {'Key': 'MLEngineer', 'Value': f"Temiloluwa Adeoti"},
                                                   ])
        print(f'experiment {experiment_name} was created')
    else:
        # surface any other failure instead of continuing without an experiment
        raise

from sagemaker.sklearn.estimator import SKLearn

current_time = datetime.now().strftime("%d-%b-%Y-%H:%M:%S").replace(":", "-")
n_estimators = 10
trial_name = f"auto-mg-{n_estimators}-estimators"
# create a trial for the training job
training_job_trial = Trial.create(trial_name = f"{trial_name}-{current_time}",
                              experiment_name = auto_experiment.experiment_name,
                              sagemaker_boto_client=sm_client,
                              tags = [{'Key': 'Name', 'Value': f"auto-mg-{current_time}"},
                                       {'Key': 'MLEngineer', 'Value': f"Temiloluwa Adeoti"}])

# configure the estimator
model = SKLearn(
    entry_point="train.py",
    source_dir="./scripts/model",
    framework_version="1.0-1", 
    instance_type="ml.m5.xlarge", 
    role=role,
    output_path = get_s3_path(ml_model_prefix), # model output path
    hyperparameters = {
        "n_estimators": n_estimators
    },
    # each regex must match a line logged by scripts/model/train.py,
    # which logs validation metrics with the val_ prefix
    metric_definitions=[
            {"Name": "train:mae", "Regex": "train_mae=(.*?);"},
            {"Name": "val:mae", "Regex": "val_mae=(.*?);"},
            {"Name": "train:mse", "Regex": "train_mse=(.*?);"},
            {"Name": "val:mse", "Regex": "val_mse=(.*?);"},
            {"Name": "train:rmse", "Regex": "train_rmse=(.*?);"},
            {"Name": "val:rmse", "Regex": "val_rmse=(.*?);"},
        ],
    enable_sagemaker_metrics=True
)

# fit the estimator
model.fit(job_name=f"auto-mpg-{current_time}",
          inputs = {"train": get_s3_path(pp_train_prefix), 
                    "test": get_s3_path(pp_val_prefix)
                   }, 
          experiment_config={
            "TrialName": training_job_trial.trial_name,
            "TrialComponentDisplayName": f"Training-auto-mg-run-{current_time}",
          },
          logs="All")


I find metric logging to be a bit more involved in Sagemaker than in other frameworks. This is how custom training metrics are captured:

  1. Create a logger that streams to standard output (logging.StreamHandler(sys.stdout)). The streamed logs are automatically captured by AWS CloudWatch.
  2. Log metrics based on your predetermined format, e.g. metric-name=metric-value.
  3. When creating the estimator that runs the training script, a regex pattern that matches your metric logging format must be given to the metric_definitions parameter.
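
The steps above hinge on the regex and the log format agreeing, which can be checked locally with Python's re module. The sketch below is only an approximation of the matching Sagemaker performs on the captured logs; the log line and the helper `extract_metrics` are made up for illustration.

```python
import re

metric_definitions = [
    {"Name": "train:mae", "Regex": "train_mae=(.*?);"},
    {"Name": "val:mae", "Regex": "val_mae=(.*?);"},
    # this pattern has no counterpart in the log line below
    {"Name": "test:mae", "Regex": "test_mae=(.*?);"},
]

# made-up log line in the metric-name=metric-value; format
log_line = "2022-10-12 10:00:00 __main__ INFO train_mae=1.25;  val_mae=2.5;"


def extract_metrics(line, definitions):
    """Return the metrics whose regex matches the line (hypothetical helper)."""
    found = {}
    for definition in definitions:
        match = re.search(definition["Regex"], line)
        if match:
            # the first capture group holds the metric value
            found[definition["Name"]] = float(match.group(1))
    return found


metrics = extract_metrics(log_line, metric_definitions)
print(metrics)
```

A definition whose regex never matches a logged line simply produces no metric, so a mismatch between the two is easy to miss.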

The training job is executed by running the scripts/model/train.py file within an Sklearn Estimator container on a compute instance (ml.m5.xlarge in this case). Pay attention to how inputs are supplied to estimators using the inputs parameter and how the training job is assigned to the created trial using the experiment_config parameter.

The script trains a RandomForestRegressor on the .npy preprocessed train features and the model is evaluated on validation features. I will explain in what follows why I did not save the model as a tarfile.

%%writefile scripts/model/train.py
import logging
import sys
import argparse
import os
import pandas as pd
import numpy as np
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error 

# configure logger to standard output
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s"))
logger.addHandler(stream_handler)


def get_metrics(train_y_true, train_y_pred, val_y_true, val_y_pred):
    """
    Return train and val metrics
    """

    # mae
    t_mae = mean_absolute_error(train_y_true, train_y_pred)
    ts_mae = mean_absolute_error(val_y_true, val_y_pred)

    # mse (squared=True returns the mean squared error)
    t_mse = mean_squared_error(train_y_true, train_y_pred, squared=True)
    ts_mse = mean_squared_error(val_y_true, val_y_pred, squared=True)

    # rmse (squared=False returns the root mean squared error)
    t_rmse = mean_squared_error(train_y_true, train_y_pred, squared=False)
    ts_rmse = mean_squared_error(val_y_true, val_y_pred, squared=False)

    return t_mae, ts_mae, t_mse, ts_mse, t_rmse, ts_rmse


if __name__=='__main__':
    parser = argparse.ArgumentParser()
    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    # location in container: '/opt/ml/model'
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    # location in container: '/opt/ml/input/data/train'
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    # location in container: '/opt/ml/input/data/test'
    parser.add_argument('--test', type=str, default=os.environ['SM_CHANNEL_TEST'])
    # model filename
    parser.add_argument('--model-filename', type=str, default="model.joblib")

    # Hyperparameters are described here.
    parser.add_argument('--n_estimators', type=int)
    args = parser.parse_args()

    logger.debug(f"Number of Estimators: {args.n_estimators}")

    # Load numpy features saved in S3 
    # Targets are the first column, features are other columns
    train_data = np.load(os.path.join(args.train, "train_feats.npy"))
    train_feats = train_data[:, 1:]
    train_target = train_data[:, 0]

    val_data = np.load(os.path.join(args.test, "val_feats.npy"))
    val_feats = val_data[:, 1:]
    val_target = val_data[:, 0]

    # Train random forest model
    model = RandomForestRegressor(n_estimators=args.n_estimators, random_state=0)
    model.fit(train_feats, train_target)
    logger.info("Model Trained ")

    train_pred = model.predict(train_feats)
    val_pred = model.predict(val_feats)

    # Evaluate Model
    train_mae, val_mae, train_mse, val_mse, train_rmse, val_rmse = \
        get_metrics(train_target, train_pred, val_target, val_pred)

    logger.info(f"train_mae={train_mae};  val_mae={val_mae};")
    logger.info(f"train_mse={train_mse};  val_mse={val_mse};")
    logger.info(f"train_rmse={train_rmse}; val_rmse={val_rmse};")

    # Save the Model
    joblib.dump(model, os.path.join(args.model_dir, args.model_filename))


Stage 3: Model Inferencing

Sagemaker provides the Model API for deploying a model to an endpoint and the Predictor API for making predictions with the endpoint. Since we have two models, the preprocessor and the regressor, we need a Sagemaker pipeline model (PipelineModel) to chain them into a single deployment.

I selected the SKLearnModel (a Model with Sklearn dependencies pre-installed) for the preprocessor. To prepare the Model, I supplied the path to the saved tar model in S3, the inference script that serves as its entry point, and custom_preprocessor.py as a dependency.

Repeating the CustomFeaturePreprocessor class definition in both the preprocessor training (scripts/preprocessor/train.py) and inference (scripts/preprocessor/inference.py) scripts did not work. I needed to import the class from a separate file (scripts/preprocessor/custom_preprocessor.py) for inferencing. This was the error I encountered:
Can't get attribute 'CustomFeaturePreprocessor' on .
It is a common problem during model deployment: a pickled (or joblib-saved) object stores only a reference to the module in which its class was defined, so that same module must be importable when the model is loaded. You can read more on this type of problem at this Stackoverflow post
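
The behaviour behind this error can be seen with the standard library alone. The sketch below pickles an instance of a standard-library class (fractions.Fraction, chosen only because it is importable everywhere) and shows that the payload names the module and the class rather than containing the class's code. joblib builds on pickle, so the same reasoning is assumed to apply to the saved pipeline.

```python
import pickle
from fractions import Fraction

value = Fraction(1, 2)
payload = pickle.dumps(value)

# the payload records where to import the class from, not its source code
has_module_name = b"fractions" in payload
has_class_name = b"Fraction" in payload
print(has_module_name, has_class_name)

# loading works because the fractions module can be imported here
restored = pickle.loads(payload)
print(restored == value)
```

A class defined inside a script that is run directly is recorded under the module name `__main__`. When the model is later loaded by a different program, that program's `__main__` has no such class, whereas a class imported from its own file is recorded under that file's module name and can be found again.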

I did not save the regressor as a tarfile because I chose not to import the regressor model from S3. Instead, I created a Sagemaker Model directly from the trained Estimator with the .create_model method (another way to create Models).

When the two models are provided in a list in the pipeline model definition as shown below, Sagemaker automatically serves the output of the preprocessor model as input to the regressor model. I deployed the model to an ml.c4.xlarge instance and used a CSVSerializer for input requests.

from sagemaker.sklearn.model import SKLearnModel
from sagemaker.pipeline import PipelineModel
from sagemaker.serializers import CSVSerializer
from datetime import datetime

current_time = datetime.now().strftime("%d-%b-%Y-%H:%M:%S").replace(":", "-")
model_name = f"inference-pipeline-{current_time}"
endpoint_name = f"inference-pipeline-{current_time}"
pp_model_path = get_s3_path(pp_model_prefix) + "/model.tar.gz"

print("preprocessor model path ", pp_model_path)

# preprocessor
sklearn_processor_model = SKLearnModel(
                             model_data=pp_model_path,
                             role=role,
                             entry_point="scripts/preprocessor/inference.py",
                             dependencies=["scripts/preprocessor/custom_preprocessor.py"],
                             framework_version="1.0-1",
                             sagemaker_session=sess)

# regression model
reg_model = model.create_model(entry_point="inference.py",
                               source_dir="./scripts/model")

# create a pipeline model with the two models    
inference_pipeline = PipelineModel(
    name=model_name, role=role, models=[sklearn_processor_model, reg_model],
    sagemaker_session=sess
)

# deploy model
predictor = inference_pipeline.deploy(initial_instance_count=1, 
                                      instance_type="ml.c4.xlarge", 
                                      endpoint_name=endpoint_name,
                                      serializer=CSVSerializer() # to ensure input is csv
                                     )

Sagemaker Inference Script Structure

In Sagemaker, the model server relies on four functions to serve a deployed model.

  1. The model_fn loads the model file, e.g. .joblib.
  2. The input_fn parses the input request. Data deserialization and transformation can occur here to prepare the request for the model.
  3. The predict_fn makes the actual prediction with the model, e.g. calls model.predict.
  4. The output_fn processes the response and delivers it in the desired format to the caller.

Our Scikit-learn model server already has default implementations of these functions which can be overridden in our inference script.
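
To show how the four functions fit together, here is a local sketch that chains them in the order described above. It is not the model server's actual code: `DummyModel` and `handle_request` are hypothetical, and the dummy model just sums each row so that the example runs without any trained artifact.

```python
import json


class DummyModel:
    """Hypothetical model that sums the values in each row."""

    def predict(self, rows):
        return [sum(row) for row in rows]


def model_fn(model_dir):
    # a real implementation would load a file from model_dir
    return DummyModel()


def input_fn(input_data, content_type):
    if content_type != "text/csv":
        raise ValueError(f"Unsupported content type: {content_type}")
    return [[float(v) for v in line.split(",")]
            for line in input_data.strip().splitlines()]


def predict_fn(input_data, model):
    return model.predict(input_data)


def output_fn(prediction, accept):
    return json.dumps([{"id": i, "prediction": p} for i, p in enumerate(prediction)])


def handle_request(body, content_type="text/csv", accept="application/json", model_dir="."):
    """Chain the four functions the way a request flows through them."""
    model = model_fn(model_dir)  # a server would typically load the model once
    data = input_fn(body, content_type)
    prediction = predict_fn(data, model)
    return output_fn(prediction, accept)


response = handle_request("1,2,3\n4,5,6\n")
print(response)
```

Running an inference script through a harness like this before deployment is a cheap way to catch parsing or serialization mistakes.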

Preprocessor Inference Script

In my inference script for the preprocessor, I imported the custom dependencies to avoid the error mentioned earlier. The model_fn loads the .joblib model, while the input_fn ensures the request is in csv format and transforms the data into a Pandas dataframe. By default, the predict_fn makes a .predict call on the model, but since the preprocessor is a transformer, the .transform method is used instead.

%%writefile scripts/preprocessor/inference.py
import os
import joblib
import argparse
import pandas as pd
from io import StringIO
# import the custom dependencies
from custom_preprocessor import original_features, CustomFeaturePreprocessor

def input_fn(input_data, content_type):
    """ Preprocess input data """
    if content_type == 'text/csv':
        df = pd.read_csv(StringIO(input_data), header=None)

        if len(df.columns) == len(original_features) + 1:
            df = df.iloc[:, 1:]

        if list(df.columns) != original_features:
            df.columns = original_features

        return df
    else:
        raise ValueError(f"Unsupported content type: {content_type}")


def predict_fn(input_data, model):
    """ Call the transform method instead of the default predict method"""
    predictions = model.transform(input_data)
    return predictions


def model_fn(model_dir):
    """Load the model"""
    model_path = os.path.join(model_dir, "model.joblib")
    loaded_model = joblib.load(model_path)
    return loaded_model

Regressor Inference Script

The regressor uses a much simpler inference script. Here the output_fn is implemented to provide a JSON response to the input request.

%%writefile scripts/model/inference.py
import sys
import os
import logging
import json
import joblib
from sagemaker_containers.beta.framework import worker

# configure logger to standard output
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s"))
logger.addHandler(stream_handler)

def model_fn(model_dir):
    """Deserialize fitted model
    """
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    return model


def output_fn(prediction, accept):
    """
    Preprocess numpy array to return JSON output
    """
    pred = []
    for i, row in enumerate(prediction.tolist()):
        pred.append({"id": i, "prediction": row})

    return worker.Response(json.dumps(pred))

Making Predictions

All the major work is done and making predictions is a very simple process with the Predictor API.

In the code snippet below, I download the raw test set from S3 and store each line of the csv, except the header, as a string in the list called test_data. After instantiating the Predictor with the endpoint name and sagemaker session, I make predictions by calling the .predict method on the predictor instance.

from pprint import pprint

# download raw test data and read text files
sess.download_data(path=".", bucket=bucket, key_prefix=raw_test_prefix)
with open("test.csv", "r") as f:
    test_data = f.readlines()[1:]

# make predictions with the deployed endpoint
from sagemaker.predictor import Predictor
from sagemaker.deserializers import JSONLinesDeserializer

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=sess, serializer=CSVSerializer(), deserializer=JSONLinesDeserializer()
)

num_of_samples = 1
response = predictor.predict(test_data[:num_of_samples])
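
The shape of the response can be explored offline. The sketch below assumes that a JSON Lines deserializer parses each line of the response body as a separate JSON document, and it uses a made-up payload in the format produced by the regressor's output_fn. The helper `parse_json_lines` is hypothetical and only mimics that behaviour.

```python
import json


def parse_json_lines(body):
    """Parse each line of the body as its own JSON document (assumed behaviour)."""
    return [json.loads(line) for line in body.rstrip().split("\n")]


# made-up payload: a single line holding a JSON array of predictions
raw_body = '[{"id": 0, "prediction": 23.5}]'

parsed = parse_json_lines(raw_body)

# one line in, so one element out; that element is the list of predictions
first_prediction = parsed[0][0]["prediction"]
print(first_prediction)
```

Under that assumption, the predictions sit one level deeper than with a plain JSON deserializer, which is worth keeping in mind when indexing into the response.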

Conclusion

In this post, we have explored the Estimator API, Model API, Preprocessor API, and Predictor API using the AWS Sagemaker Python SDK by training and deploying a regression model. These are the fundamental APIs that are required for developing solutions in Sagemaker.

Sagemaker comes with over 20 features that cover most of the ML life cycle, from data annotation and preprocessing to model deployment. A full feature list is found here. Some of these features, as we saw with Sagemaker Experiments, have separate Python libraries. This can make Sagemaker challenging to learn. In my experience, it helps to study the Sagemaker examples when working with unfamiliar Sagemaker features.

Since most Sagemaker jobs run within a Docker container, it's vital to have logging activated to debug errors. Logs streamed to standard output and default Sagemaker metrics are captured by CloudWatch. This will save you a lot of pain when working with Sagemaker.
