AWS Machine Learning Blog

Preprocess input data before making predictions using Amazon SageMaker inference pipelines and Scikit-learn

Amazon SageMaker enables developers and data scientists to build, train, tune, and deploy machine learning (ML) models at scale. You can deploy trained ML models for real-time or batch predictions on unseen data, a process known as inference. However, in most cases, the raw input data must be preprocessed and can’t be used directly for making predictions. This is because most ML models expect the data in a predefined format, so the raw data needs to be first cleaned and formatted in order for the ML model to process the data.

In this blog post, we’ll show how you can use the Amazon SageMaker built-in Scikit-learn library for preprocessing input data and then use the Amazon SageMaker built-in Linear Learner algorithm for predictions. We’ll deploy both the library and the algorithm on the same endpoint using the Amazon SageMaker Inference Pipelines feature so you can pass raw input data directly to Amazon SageMaker. We’ll also show how you can make your ML workflow modular and reuse the preprocessing code between training and inference to reduce development overhead and errors.

In our example (which is also published on GitHub), we’ll use the abalone dataset from the UCI machine learning repository. The dataset includes various data on abalones (a type of shellfish), including sex, length, diameter, height, shell weight, shucked weight, whole weight, viscera weight, and age.  Since measuring the age of abalones is a time-consuming task, building a model to predict the age of the abalone enables us to estimate the abalone’s age based on physical measurements alone, removing the need to manually measure the abalone’s age.

To accomplish this, we’ll first do some simple preprocessing with the Amazon SageMaker built-in Scikit-learn library. We will use the SimpleImputer, StandardScaler, and OneHotEncoder transformers on the raw abalone data; these are commonly used data transformers included in Scikit-learn that process the data into a format required by ML models.  Then, we’ll use our processed data to train the Amazon SageMaker Linear Learner algorithm to predict the age of abalones.  Finally, we’ll create a pipeline that combines the data processing and model prediction steps using Amazon SageMaker Inference Pipelines. Using this pipeline, we can pass raw input data to a single endpoint, where it is first preprocessed and then used to make a prediction for a given abalone.

Step 1: Launch SageMaker notebook instance and set up exercise code

From the SageMaker landing page, choose Notebook instances in the left panel and choose Create notebook Instance.

Give your notebook instance a name and make sure you choose an AWS Identity and Access Management (IAM) role that has access to Amazon S3.  We’ll need to upload data to an Amazon S3 bucket for this project, so make sure you have a bucket you can access.  If you don’t have an Amazon S3 bucket, follow this guide to create one.

Leave all other fields as the default and choose Create notebook Instance to launch your notebook instance (the notebook will take a few minutes to spin up).  After the status reads “InService” your notebook instance is ready.  Click Open Jupyter to open the notebook environment.

When the notebook environment opens, choose New and then choose conda_python3 in the upper right corner to create a new Python notebook.  The subsequent steps will be fully contained within the notebook.

Step 2: Set up Amazon SageMaker role and download data

First we need to set up an Amazon S3 bucket to store our training data and model outputs.  Replace the ENTER BUCKET NAME HERE placeholder with the name of the bucket from Step 1.

# S3 prefix
s3_bucket = '< ENTER BUCKET NAME HERE >'
prefix = 'Scikit-LinearLearner-pipeline-abalone-example'

Now we need to set up the Amazon SageMaker execution role, so that Amazon SageMaker can communicate with other parts of AWS.

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

Finally, we download the abalone dataset to the Amazon SageMaker notebook instance.  This is the dataset used in this example:

!wget --directory-prefix=./abalone_data https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

Step 3:  Upload input data for Amazon SageMaker

We need to upload our dataset to Amazon S3 so Amazon SageMaker can access it later on:

WORK_DIRECTORY = 'abalone_data'

train_input = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'abalone.csv'), 
    bucket=s3_bucket,
    key_prefix='{}/{}'.format(prefix, 'train'))
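
For reference, upload_data returns the S3 URI of the uploaded object, which is what train_input holds afterward. The sketch below only mirrors how that URI is expected to be composed for a single file. The expected_s3_uri helper is hypothetical (it is not part of the SageMaker SDK), and the bucket name is a placeholder.

```python
import posixpath


def expected_s3_uri(bucket, key_prefix, local_path):
    """Hypothetical helper: compose the URI we expect upload_data to return
    for a single local file (s3://<bucket>/<key_prefix>/<file name>)."""
    file_name = posixpath.basename(local_path)
    return 's3://{}/{}/{}'.format(bucket, key_prefix, file_name)


prefix = 'Scikit-LinearLearner-pipeline-abalone-example'
uri = expected_s3_uri(
    'my-example-bucket',                  # placeholder bucket name
    '{}/{}'.format(prefix, 'train'),
    'abalone_data/abalone.csv')
print(uri)
```

Comparing this value with the actual train_input is a quick way to confirm that the file landed under the prefix you intended.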

Step 4: Create preprocessing script

The code described in this step already exists on the SageMaker notebook instance, so you do not need to run the code in this section – you will simply call the existing script in the next step.  However, we recommend that you take the time to explore how the pipeline is handled by reading through the code.

Now we are ready to create the container that will preprocess our data before it’s sent to the trained Linear Learner model.  This container will run the sklearn_abalone_featurizer.py script, which Amazon SageMaker will import for both training and prediction. Training is executed using the main method as the entry point, which parses arguments, reads the raw abalone dataset from the training channel, then runs the SimpleImputer and StandardScaler on the numeric features and SimpleImputer and OneHotEncoder on the categorical features. At the end of training, the script serializes the fitted ColumnTransformer to the model directory, which Amazon SageMaker uploads to Amazon S3 so that it may be used during inference.

from __future__ import print_function

import time
import sys
from io import StringIO
import os
import shutil

import argparse
import csv
import json
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
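# Note: sklearn.externals.joblib was removed in scikit-learn 0.23; on newer versions, use "import joblib" instead.
from sklearn.externals import joblib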
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    'sex', # M, F, and I (infant)
    'length', # Longest shell measurement
    'diameter', # perpendicular to length
    'height', # with meat in shell
    'whole_weight', # whole abalone
    'shucked_weight', # weight of meat
    'viscera_weight', # gut weight (after bleeding)
    'shell_weight'] # after being dried

label_column = 'rings'

feature_columns_dtype = {
    'sex': str,
    'length': np.float64,
    'diameter': np.float64,
    'height': np.float64,
    'whole_weight': np.float64,
    'shucked_weight': np.float64,
    'viscera_weight': np.float64,
    'shell_weight': np.float64}

label_column_dtype = {'rings': np.float64} # +1.5 gives the age in years

def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    raw_data = [ pd.read_csv(
        file, 
        header=None, 
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)) for file in input_files ]
    concat_data = pd.concat(raw_data)

    # We will fit our preprocessor on the following features:
    # Numeric Features:
    # - length:  Longest shell measurement
    # - diameter: Diameter perpendicular to length
    # - height:  Height with meat in shell
    # - whole_weight: Weight of whole abalone
    # - shucked_weight: Weight of meat
    # - viscera_weight: Gut weight (after bleeding)
    # - shell_weight: Weight after being dried
    # Categorical Features:
    # - sex: categories encoded as strings {'M', 'F', 'I'} where 'I' is Infant
    numeric_features = list(feature_columns_names)
    numeric_features.remove('sex')
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    categorical_features = ['sex']
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")

    preprocessor.fit(concat_data)

    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))
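
To make the three transformers concrete, here is a plain-Python illustration of what median imputation, standard scaling, and one-hot encoding compute. It is a sketch on made-up rows, not the Scikit-learn implementation; the helper names (impute_median, standardize, one_hot) are hypothetical.

```python
import statistics

# Toy rows: (sex, length, diameter); None marks a missing value.
rows = [
    ('M', 0.40, 0.30),
    ('F', None, 0.50),
    ('I', 0.60, 0.40),
]


def impute_median(values):
    """Replace None with the median of the observed values."""
    median = statistics.median(v for v in values if v is not None)
    return [median if v is None else v for v in values]


def standardize(values):
    """Center to zero mean and scale by the population standard deviation."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]


def one_hot(values, categories):
    """One column per known category; unknown values become all zeros."""
    return [[1.0 if v == c else 0.0 for c in categories] for v in values]


lengths = standardize(impute_median([r[1] for r in rows]))
diameters = standardize(impute_median([r[2] for r in rows]))
sex = one_hot([r[0] for r in rows], categories=['F', 'I', 'M'])

# Numeric columns first, then the one-hot columns, as in the ColumnTransformer.
features = [[l, d] + s for l, d, s in zip(lengths, diameters, sex)]
print(len(features[0]))  # 2 numeric + 3 one-hot columns
```

With the seven numeric abalone columns and three sex categories (assuming no missing sex values), the same layout yields 10 columns, which matches the feature_dim=10 hyperparameter used when training the Linear Learner model.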

The remaining functions in the script are used during inference. Amazon SageMaker uses the input_fn and output_fn functions to parse the request payload and format the response. In this example, input_fn accepts only ‘text/csv’ as the content type, but it can easily be modified to accept other input formats. It also checks the number of columns in the CSV to determine whether it is preprocessing training data, which includes the label, or prediction data, which does not. The output_fn function returns JSON by default, because the inference pipeline expects JSON between the containers; it also handles ‘text/csv’ and can be modified to add other output formats.

def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), 
                         header=None)

        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))
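
As a minimal sketch of the two conventions above, the following standard-library snippet builds the JSON body that output_fn produces and mirrors the column-count check from input_fn. The function names and the feature values are made up for illustration.

```python
import json


def to_instances_json(rows):
    """Wrap each feature row as {"features": [...]} under an "instances" key."""
    return json.dumps({"instances": [{"features": list(row)} for row in rows]})


def has_label(num_columns, num_features=8):
    """Mirror input_fn's check: one extra column means the label is present."""
    return num_columns == num_features + 1


# Made-up preprocessed values, for illustration only.
body = to_instances_json([[0.1, -0.2, 1.0], [0.3, 0.4, 0.0]])
print(body)
print(has_label(9), has_label(8))
```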

Our predict_fn will take the input data, which was parsed by our input_fn, and the deserialized  model from the model_fn (described in detail next) to transform the source data. The script also adds back labels if the source data had labels, which would be the case for preprocessing training data.

def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:

        label (first column, only if present in the input), followed by the
        features, each either standardized or one-hot encoded
    """
    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features

The model_fn takes the location of a serialized model and returns the deserialized model back to Amazon SageMaker. Note that this is the only method that does not have a default because the definition of the method will be closely linked to the serialization method implemented in training. In this example, we use the joblib library included with Scikit-learn.

def model_fn(model_dir):
    """Deserialize fitted model
    """
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor
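
The pairing between the training-time dump and model_fn can be sketched as a simple round trip. This example swaps in the standard-library pickle module as a stand-in for joblib and uses a plain dictionary of made-up parameters instead of a fitted ColumnTransformer; the save_model and load_model names are hypothetical.

```python
import os
import pickle
import tempfile


def save_model(model, model_dir):
    """Training side: write the fitted object into the model directory."""
    with open(os.path.join(model_dir, 'model.pkl'), 'wb') as f:
        pickle.dump(model, f)


def load_model(model_dir):
    """Inference side: the counterpart of model_fn for this toy format."""
    with open(os.path.join(model_dir, 'model.pkl'), 'rb') as f:
        return pickle.load(f)


fitted = {'mean': 2.0, 'scale': 0.5}  # hypothetical fitted parameters

with tempfile.TemporaryDirectory() as model_dir:
    save_model(fitted, model_dir)
    restored = load_model(model_dir)

print(restored == fitted)
```

Whatever file name and format the training code writes, the loading function has to read the same one, which is why this function has no default.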

Step 5: Fit the data preprocessor

We now create a preprocessor using the script we defined in step 4.  This will allow us to send raw data to the model and output the processed data.  To do this, we define an SKLearn estimator that accepts several constructor arguments:

  • entry_point: The path to the Python script that Amazon SageMaker runs for training and prediction (this is the script we defined in step 4).
  • role: Role Amazon Resource Name (ARN).
  • train_instance_type (optional): The type of Amazon SageMaker instances for training. Note: Because Scikit-learn does not natively support GPU training, Amazon SageMaker Scikit-learn does not currently support training on GPU instance types.
  • sagemaker_session (optional): The session used to train on Amazon SageMaker.

from sagemaker.sklearn.estimator import SKLearn

script_path = '/home/ec2-user/sample-notebooks/sagemaker-python-sdk/scikit_learn_inference_pipeline/sklearn_abalone_featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session)

sklearn_preprocessor.fit({'train': train_input})

It will take a few minutes (up to 5) for the preprocessor to be created.  After the preprocessor is ready, we can send our raw data to the preprocessor and store our processed abalone data back in Amazon S3. We’ll do this in the next step.
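
Inside the training container, the script's arguments default to environment variables that Amazon SageMaker sets; the 'train' key passed to fit corresponds to SM_CHANNEL_TRAIN. The sketch below simulates those variables so the argument parsing can be tried outside a training job. Unlike the script, it reads them with os.environ.get so that a missing variable does not raise, and the parse_training_args name is hypothetical.

```python
import argparse
import os


def parse_training_args(argv=None):
    """Build the same three arguments as the featurizer script."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output-data-dir', type=str,
                        default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model-dir', type=str,
                        default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str,
                        default=os.environ.get('SM_CHANNEL_TRAIN'))
    return parser.parse_args(argv)


# Simulate the variables that the training container would provide.
os.environ['SM_OUTPUT_DATA_DIR'] = '/opt/ml/output/data'
os.environ['SM_MODEL_DIR'] = '/opt/ml/model'
os.environ['SM_CHANNEL_TRAIN'] = '/opt/ml/input/data/train'

args = parse_training_args([])   # defaults come from the environment
override = parse_training_args(['--model-dir', '/tmp/other'])
print(args.train, args.model_dir, override.model_dir)
```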

Step 6: Batch transform training data

Now that our preprocessor is ready, we can use it to batch transform raw data into preprocessed data for training.  To do this, we create a transformer and point it to the raw data on Amazon S3:

# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge',
    assemble_with = 'Line',
    accept = 'text/csv')
# Preprocess training input
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

When the transformer is done, our transformed data will be stored in Amazon S3.  You can find the location of the preprocessed data by looking at the value of the preprocessed_train variable.
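
Batch transform writes one output object per input object, named after the input with an .out suffix. The sketch below composes the object URI we would expect for this example; the helper name, bucket, and job name are placeholders, so treat the result as an assumption to verify against your own bucket.

```python
import posixpath


def expected_transform_output(output_path, input_uri):
    """Hypothetical helper: '<output path>/<input file name>.out'."""
    file_name = posixpath.basename(input_uri)
    return '{}/{}.out'.format(output_path.rstrip('/'), file_name)


# Placeholder values standing in for transformer.output_path and train_input.
example_output_path = 's3://my-example-bucket/my-transform-job'
example_input = 's3://my-example-bucket/some-prefix/train/abalone.csv'

result_uri = expected_transform_output(example_output_path, example_input)
print(result_uri)
```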

Step 7: Fit the Linear Learner model with preprocessed data

Now that we’ve built the preprocessing container and processed the raw data, we need to create the model container that our processed data will be sent to.  This container will input the processed dataset and train a model to predict the age of a given abalone based on the (processed) feature values.  We’ll use the Amazon SageMaker Linear Learner for the prediction model.

First we define the Linear Learner image location using a helper function in the Python SDK:

import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri
ll_image = get_image_uri(boto3.Session().region_name, 'linear-learner')

Now we can fit the model.  Fitting the model has four main steps:

  1. Define the Amazon S3 location in which to store the model results.
  2. Create the Linear Learner Estimator.
  3. Set Estimator hyperparameters, including number of features, predictor type, and mini-batch size.
  4. Define a variable for the location of our transformed data (from step 6), and use it to train the Linear Learner model.

s3_ll_output_key_prefix = "ll_training_output"
s3_ll_output_location = 's3://{}/{}/{}/{}'.format(s3_bucket, prefix, s3_ll_output_key_prefix, 'll_model')

ll_estimator = sagemaker.estimator.Estimator(
    ll_image,
    role, 
    train_instance_count=1, 
    train_instance_type='ml.m4.2xlarge',
    train_volume_size = 20,
    train_max_run = 3600,
    input_mode= 'File',
    output_path=s3_ll_output_location,
    sagemaker_session=sagemaker_session)

ll_estimator.set_hyperparameters(feature_dim=10, predictor_type='regressor', mini_batch_size=32)

ll_train_data = sagemaker.session.s3_input(
    preprocessed_train, 
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')

data_channels = {'train': ll_train_data}
ll_estimator.fit(inputs=data_channels, logs=True)
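
For CSV training data, the built-in algorithms expect the label in the first column and no header row, which is why predict_fn puts the label first. Here is a small sketch of a sanity check on that layout, assuming 10 features as set in the hyperparameters; the check_training_csv helper and the sample row are made up for illustration.

```python
import csv
import io

FEATURE_DIM = 10  # matches the feature_dim hyperparameter above


def check_training_csv(text, feature_dim=FEATURE_DIM):
    """Return (labels, features) after checking each row has 1 + feature_dim columns."""
    labels, features = [], []
    for line_no, row in enumerate(csv.reader(io.StringIO(text)), start=1):
        if len(row) != feature_dim + 1:
            raise ValueError('row {} has {} columns, expected {}'.format(
                line_no, len(row), feature_dim + 1))
        values = [float(v) for v in row]
        labels.append(values[0])      # the label comes first
        features.append(values[1:])   # then the preprocessed features
    return labels, features


# Made-up preprocessed row: rings label, 7 scaled numerics, 3 one-hot values.
sample = '10.0,-0.7,-0.4,-0.3,-0.6,-0.6,-0.6,-0.6,0.0,0.0,1.0\n'
labels, features = check_training_csv(sample)
print(labels, len(features[0]))
```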

As with the previous training jobs, it will take a few minutes (up to 5) for the Estimator model to fit.

Step 8: Create inference pipeline

In step 5, we created an inference preprocessor that will take input data and preprocess our features.  We now combine this preprocessor with the Linear Learner model from step 7 to create an inference pipeline that processes the raw data and sends it to the prediction model for prediction.  Notice that setting up the pipeline is straightforward. After defining the models and assigning names, we simply create a “PipelineModel” that points to our preprocessing and prediction models.  We then deploy the pipeline model to a single endpoint:

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inference_model = sklearn_preprocessor.create_model()
linear_learner_model = ll_estimator.create_model()

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    models=[
        scikit_learn_inference_model, 
        linear_learner_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)
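
Conceptually, the endpoint passes each request through the containers in order, with the output of one becoming the input of the next. The toy model below illustrates only that data flow: both "containers" are stand-in functions with made-up logic, not the Scikit-learn preprocessor or the Linear Learner algorithm.

```python
import json


def preprocess_container(raw_csv):
    """Stand-in for the Scikit-learn container: CSV in, JSON out."""
    fields = [f.strip() for f in raw_csv.split(',')]
    numeric = [float(v) for v in fields[1:]]   # toy "preprocessing"
    return json.dumps({"instances": [{"features": numeric}]})


def model_container(body):
    """Stand-in for the model container: JSON in, JSON out (toy scoring rule)."""
    features = json.loads(body)["instances"][0]["features"]
    return json.dumps({"predictions": [{"score": sum(features)}]})


def invoke_pipeline(payload, containers):
    """Pass the payload through each container in order."""
    for container in containers:
        payload = container(payload)
    return payload


response = invoke_pipeline('M, 1.0, 2.0', [preprocess_container, model_container])
print(response)
```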

After the endpoint has been created, our pipeline is ready to use!

Step 9: Make a prediction using the inference pipeline

We can test our pipeline by sending data for prediction.  The pipeline will accept raw data, transform it using the preprocessor we created in steps 4 and 5, and create a prediction using the Linear Learner model we created in step 7.

First, we define a ‘payload’ variable that contains the data we want to send through the pipeline.  Then we define a predictor using our pipeline endpoint, send the payload to the predictor, and print the model prediction:

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

payload = 'M, 0.44, 0.365, 0.125, 0.516, 0.2155, 0.114, 0.155'
actual_rings = 10

predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

print(predictor.predict(payload))
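
Because no deserializer is set on the predictor, the response comes back as a raw JSON body. The following sketch extracts the predicted value from such a body, assuming it follows the Linear Learner JSON regression format with a "score" field per prediction. The sample body is hypothetical and reuses the 9.53 value reported in this post; rings_from_response is an illustrative name.

```python
import json


def rings_from_response(raw_response):
    """Extract the first predicted score from a JSON response body."""
    if isinstance(raw_response, bytes):
        raw_response = raw_response.decode('utf-8')
    return json.loads(raw_response)['predictions'][0]['score']


# Hypothetical response body, shaped like a JSON regression response.
sample_response = b'{"predictions": [{"score": 9.53}]}'
rings = rings_from_response(sample_response)
print(rings, rings + 1.5)   # predicted rings, and the age estimate in years
```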

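Our model predicts a value of about 9.53 for the abalone defined in our payload, close to the actual value of 10 rings. Because the label is the ring count, adding 1.5 to the prediction gives the estimated age in years. Notice that we sent raw data into our pipeline, which preprocessed this data before sending it to the linear model for scoring.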

Step 10: Delete endpoint

After we are finished, we can delete the endpoint used in this example:

sm_client = sagemaker_session.boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)
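
Deleting the endpoint leaves its endpoint configuration and the pipeline model in place. If you also want to remove those, a cleanup along the following lines may help. This is a sketch that assumes the endpoint configuration shares the endpoint's name; the RecordingClient class is a hypothetical stand-in used for a dry run, so nothing is deleted here. In the notebook you would pass sm_client, endpoint_name, and model_name instead.

```python
def delete_pipeline_resources(sm_client, endpoint_name, model_name,
                              endpoint_config_name=None):
    """Delete the endpoint, its configuration, and the pipeline model.

    Assumes the endpoint configuration shares the endpoint's name unless
    endpoint_config_name is given.
    """
    config_name = endpoint_config_name or endpoint_name
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    sm_client.delete_model(ModelName=model_name)


class RecordingClient:
    """Hypothetical stand-in for the boto3 SageMaker client (dry run only)."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, operation):
        def record(**kwargs):
            self.calls.append((operation, kwargs))
        return record


dry_run = RecordingClient()
delete_pipeline_resources(dry_run, 'inference-pipeline-ep-demo', 'inference-pipeline-demo')
print([name for name, _ in dry_run.calls])
```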

Conclusion

In this blog post, we built an ML pipeline that uses Amazon SageMaker and the built-in Scikit-learn library to process raw data. We trained an ML model on the processed data using the Amazon SageMaker built-in Linear Learner algorithm, and created predictions with the trained model.  This allows us to pass raw data to a single endpoint and get model predictions without having to repeat the intermediary data processing steps every time!

Citations

Dua, D. and Karra Taniskidou, E. (2017). UCI Machine Learning Repository. Irvine, CA: University of California, School of Information and Computer Science.


About the Authors

Matt McKenna is a Data Scientist focused on machine learning in Amazon Alexa, and is passionate about applying statistics and machine learning methods to solve real world problems. In his spare time, Matt enjoys playing guitar, running, craft beer, and rooting for Boston sports teams.

Eric Kim is an engineer in the Algorithms & Platforms Group of Amazon AI Labs. He helps support the AWS service SageMaker, and has experience in machine learning research, development, and application. Outside of work, he is an avid music lover and a fan of all dogs.

Urvashi Chowdhary is a Senior Product Manager for Amazon SageMaker. She is passionate about working with customers and making machine learning more accessible. In her spare time, she loves sailing, paddle boarding, and kayaking.