AWS Machine Learning Blog

Analyzing open-source ML pipeline models in real time using Amazon SageMaker Debugger

Open-source workflow managers are popular because they make it easy to orchestrate machine learning (ML) jobs for production. Taking models into production following a GitOps pattern, a practice commonly referred to as MLOps, is best managed by a container-friendly workflow manager. Kubeflow Pipelines (KFP) is one of the Kubernetes-based workflow managers in use today. However, it doesn’t provide all the functionality you need for a best-in-class data science and ML engineering experience. A common issue when developing ML models is having access to tensor-level metadata about how the job is performing. For extremely large models, such as those for natural language processing (NLP) and computer vision (CV), this visibility can be critical to avoid wasting GPU resources. However, most training frameworks become a black box after you start training a model.

Amazon SageMaker is a managed ML platform from AWS to build, train, and deploy ML models at scale. SageMaker Components for Kubeflow Pipelines offer the flexibility to run steps of your KFP workflows on SageMaker instead of on your Kubernetes cluster, giving you access to SageMaker’s additional capabilities for developing high-quality models. SageMaker Debugger lets you debug ML models during training by detecting problems with the models in near-real time. You can use this feature when training models within Kubeflow Pipelines through the SageMaker Training component. Combined, they let you end a training job early if its loss isn’t decreasing as expected, saving both cost and time.

SageMaker Debugger allows you to capture and analyze the state from training with minimal code changes. The state is composed of the following:

  • The parameters being learned by the model, such as weights and biases for neural networks
  • The changes applied to these parameters by the optimizer, called gradients
  • The optimization parameters themselves
  • Scalar values, such as accuracies and losses
  • The output of each layer

The monitoring of these states is done through rules. SageMaker includes a variety of predefined rules, and you can also make custom rules using Python. For more information, see Amazon SageMaker Debugger – Debug Your Machine Learning Models.
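As a rough illustration of these concepts outside of Kubeflow Pipelines, the following sketch configures a hook and two built-in rules with the SageMaker Python SDK (v2). The image URI, role ARN, and bucket name are placeholders; later in this post, the KFP training component expresses the same settings as plain dictionaries.

from sagemaker.debugger import CollectionConfig, DebuggerHookConfig, Rule, rule_configs
from sagemaker.estimator import Estimator

# Save the "metrics" and "losses" tensor collections at the given step intervals
hook_config = DebuggerHookConfig(
    s3_output_path='s3://<bucket-name>/debug-output',
    collection_configs=[
        CollectionConfig(name='metrics', parameters={'save_interval': '3'}),
        CollectionConfig(name='losses', parameters={'save_interval': '10'}),
    ],
)

# Built-in rules that analyze the saved tensors while the job runs
rules = [
    Rule.sagemaker(rule_configs.loss_not_decreasing()),
    Rule.sagemaker(rule_configs.overtraining()),
]

estimator = Estimator(
    image_uri='<xgboost-image-uri>',          # placeholder
    role='<sagemaker-execution-role-arn>',    # placeholder
    instance_count=1,
    instance_type='ml.m5.2xlarge',
    debugger_hook_config=hook_config,
    rules=rules,
)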

In this post, we go over how to deploy a simple pipeline featuring a training component with SageMaker Debugger enabled.

Using SageMaker Debugger for Kubeflow Pipelines with XGBoost

This post demonstrates how adding a few parameters to configure SageMaker Debugger in the training component allows us to easily find issues within a model. We train a gradient-boosting model on the Modified National Institute of Standards and Technology (MNIST) dataset using Kubeflow Pipelines. The MNIST dataset contains images of handwritten digits from 0–9 and is a popular ML problem, with 60,000 training images and 10,000 test images.

This post walks through the following steps:

  • Generating your data
  • Cloning the sample repository
  • Creating the training pipeline
  • Adding debugger parameters
  • Compiling the pipeline
  • Deploying the training pipeline through Kubeflow Pipelines
  • Reading the debugger output

Prerequisites

To run the example in this post, you need the following prerequisites:

You can run this example from any instance that has Python installed and access to the Kubernetes cluster where Kubeflow Pipelines is installed.

Generating your training data

This post uses a SageMaker prebuilt container to train an XGBoost model on the MNIST dataset. We include a Python file that uploads the MNIST dataset to an S3 bucket in the format that the XGBoost prebuilt container expects.

  1. Create an S3 bucket. This post uses the us-east-1 Region.
  2. Create a new file named s3_sample_data_creator.py with the following code:
    import pickle, gzip, numpy, urllib.request, json
    from urllib.parse import urlparse
    
    ###################################################################
    # This is the only thing that you need to change to run this code 
    # Give the name of your S3 bucket 
    bucket = '<bucket-name>' 
    
    # If you are going to use the default values of the pipeline then 
    # give a bucket name which is in us-east-1 region 
    ###################################################################
    
    
    # Load the dataset
    urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", "mnist.pkl.gz")
    with gzip.open('mnist.pkl.gz', 'rb') as f:
        train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
    
    
    # Upload dataset to S3
    from sagemaker.amazon.common import write_numpy_to_dense_tensor
    import io
    import boto3
    
    train_data_key = 'mnist_kmeans_example/train_data'
    test_data_key = 'mnist_kmeans_example/test_data'
    train_data_location = 's3://{}/{}'.format(bucket, train_data_key)
    test_data_location = 's3://{}/{}'.format(bucket, test_data_key)
    print('training data will be uploaded to: {}'.format(train_data_location))
    print('test data will be uploaded to: {}'.format(test_data_location))
    
    # Convert the training data into the format required by 
    # the SageMaker XGBoost algorithm
    buf = io.BytesIO()
    write_numpy_to_dense_tensor(buf, train_set[0], train_set[1])
    buf.seek(0)
    
    boto3.resource('s3').Bucket(bucket).Object(train_data_key).upload_fileobj(buf)
    
    # Convert the test data into the format required by the XGBoost algorithm
    buf = io.BytesIO()
    write_numpy_to_dense_tensor(buf, test_set[0], test_set[1])
    buf.seek(0)
    
    boto3.resource('s3').Bucket(bucket).Object(test_data_key).upload_fileobj(buf)
    
    # Convert the valid data into the format required by XGBoost algorithm
    numpy.savetxt('valid-data.csv', valid_set[0], delimiter=',', fmt='%g')
    s3_client = boto3.client('s3')
    input_key = "{}/valid_data.csv".format("mnist_kmeans_example/input")
    s3_client.upload_file('valid-data.csv', bucket, input_key)
    
  3. Replace <bucket-name> with the name of the bucket you created.

This script requires Python 3 with boto3, NumPy, and the SageMaker Python SDK (which provides write_numpy_to_dense_tensor) installed.

  4. Run the script: python3 s3_sample_data_creator.py.
  5. Verify that the data was successfully uploaded.

In your S3 bucket, you should now see a folder called mnist_kmeans_example, and under input, there should be a CSV file named valid_data.csv.
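If you’d rather check programmatically than in the Amazon S3 console, the following minimal sketch lists the uploaded objects with boto3. It assumes your default AWS credentials can read the bucket, and <bucket-name> is a placeholder for the bucket you created.

import boto3

# List everything uploaded under the mnist_kmeans_example/ prefix
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket='<bucket-name>', Prefix='mnist_kmeans_example/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])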

Cloning the sample repository

In a terminal window, clone the Kubeflow Pipelines repository and navigate to the directory with the sample code:

git clone https://github.com/kubeflow/pipelines
cd pipelines/samples/contrib/aws-samples/sagemaker_debugger_demo

We now go over how to create the training pipeline debugger-component-demo.py. This folder already contains the final version of the pipeline for reference.

Creating a training pipeline

Create a Python file named debugger-component-demo.py as our training pipeline. The pipeline below deliberately uses poor hyperparameters, which results in a poor model. It doesn’t yet have the debugger configured, but it can still be compiled and submitted as a training job, and it outputs a model.

See the following code:

#!/usr/bin/env python3

import kfp
import json
import os
import copy
from kfp import components
from kfp import dsl


cur_file_dir = os.path.dirname(__file__)
components_dir = os.path.join(cur_file_dir, '../../../../components/aws/sagemaker/')

sagemaker_train_op = components.load_component_from_file(components_dir + '/train/component.yaml')

def training_input(input_name, s3_uri, content_type):
    return {
        "ChannelName": input_name,
        "DataSource": {"S3DataSource": {"S3Uri": s3_uri, "S3DataType": "S3Prefix"}},
        "ContentType": content_type
    }

bad_hyperparameters = {
    'max_depth': '5',
    'eta': '0',
    'gamma': '4',
    'min_child_weight': '6',
    'silent': '0',
    'subsample': '0.7',
    'num_round': '50'
}


@dsl.pipeline(
    name='XGBoost Training Pipeline with bad hyperparameters',
    description='SageMaker training job test with debugger'
)
def training(role_arn="", bucket_name="my-bucket"):
    train_channels = [
        training_input("train", f"s3://{bucket_name}/mnist_kmeans_example/input/valid_data.csv", 'text/csv')
    ]

    training = sagemaker_train_op(
        region='us-east-1',
        # Refer this link for xgboost Registry URLs: https://docs.thinkwithwp.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html
        image='683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3',
        hyperparameters=bad_hyperparameters,
        channels=train_channels,
        instance_type='ml.m5.2xlarge',
        model_artifact_path=f's3://{bucket_name}/mnist_kmeans_example/output/model',
        role=role_arn,
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(training, __file__ + '.zip')

Adding debugger parameters

To enable SageMaker Debugger in your training jobs, you need to define the additional parameters to configure the debugger.

First, use debug_hook_config to select the tensor groups you want to collect for analysis and specify the frequency at which you want to save them. debug_hook_config takes in two parameters:

  • S3OutputPath – Points to the Amazon S3 URI where we intend to store our debugging tensors. SageMaker takes care of uploading these tensors transparently during the run.
  • CollectionConfigurations – Enumerates named collections of tensors we want to save. Collections are a convenient way to organize relevant tensors under the same umbrella to make it easy to navigate them during analysis. In this particular example, one of the collections we instruct SageMaker Debugger to save is named metrics. We also instruct SageMaker Debugger to save metrics every three iterations.
    # Collections of tensors we want to save
    collections = {
        'feature_importance' : {
            'save_interval': '5'
        },
        'losses' : {
            'save_interval': '10'
        },
        'average_shap': {
            'save_interval': '5'
        },
        'metrics': {
            'save_interval': '3'
        }
    }
    
    
    # Helper method to format CollectionConfigurations
    def format_collection_config(collection_dict):
        output = []
        for key, val in collection_dict.items():
            output.append({'CollectionName': key, 'CollectionParameters': val})
        return output
    
    
    # Helper method to format debug_hook_config
    def training_debug_hook(s3_uri, collection_dict):
        return {
            'S3OutputPath': s3_uri,
            'CollectionConfigurations': format_collection_config(collection_dict)
        }
        
        
    # Provide the debug_hook_config input to the pipeline
    @dsl.pipeline(...)
    def training(role_arn="", bucket_name="my-bucket"):
        ...
        # debug_hook_config containing S3OutputPath and collections to be saved
        training = sagemaker_train_op(
            debug_hook_config=training_debug_hook(f's3://{bucket_name}/mnist_kmeans_example/hook_config', collections),
            ...
        )

We also need to specify which rules we want to activate for automatic analysis using debug_rule_config. In this example, we use two SageMaker built-in rules: Overtraining and LossNotDecreasing. As the names suggest, the rules evaluate whether the loss is failing to decrease in the tensors captured by the debugging hook during training, and whether the model is being overtrained (the validation loss should not increase). See the following code:

# Helper method to format debug_rules_config
def training_debug_rules(rule_name, parameters):
    return {
        'RuleConfigurationName': rule_name,
        # Refer this link for Debugger Registry URLs: https://docs.thinkwithwp.com/sagemaker/latest/dg/debugger-docker-images-rules.html
        'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
        'RuleParameters': parameters
    }

# Provide the debug_rule_config input to the pipeline
@dsl.pipeline(...)
def training(role_arn="", bucket_name="my-bucket"):
    ...
    # Rules and rule parameters
    train_debug_rules = [
        training_debug_rules("LossNotDecreasing", {"rule_to_invoke": "LossNotDecreasing", "tensor_regex": ".*"}),
        training_debug_rules("Overtraining", {'rule_to_invoke': 'Overtraining', 'patience_train': '10', 'patience_validation': '20'}),
    ]
    training = sagemaker_train_op(
        # Provide the debug_rule_config as input to the pipeline
        debug_rule_config=train_debug_rules,
        ...
    )

For more information about SageMaker rules and the configurations best suited for using them, see Amazon SageMaker Debugger RulesConfig.

The following code shows what the pipeline looks like after configuring the debug hook and rules:

#!/usr/bin/env python3

import kfp
import json
import os
import copy
from kfp import components
from kfp import dsl


cur_file_dir = os.path.dirname(__file__)
components_dir = os.path.join(cur_file_dir, '../../../../components/aws/sagemaker/')

sagemaker_train_op = components.load_component_from_file(components_dir + '/train/component.yaml')

def training_input(input_name, s3_uri, content_type):
    return {
        "ChannelName": input_name,
        "DataSource": {"S3DataSource": {"S3Uri": s3_uri, "S3DataType": "S3Prefix"}},
        "ContentType": content_type
    }


def training_debug_hook(s3_uri, collection_dict):
    return {
        'S3OutputPath': s3_uri,
        'CollectionConfigurations': format_collection_config(collection_dict)
    }


def training_debug_rules(rule_name, parameters):
    return {
        'RuleConfigurationName': rule_name,
        # Refer this link for Debugger Registry URLs: https://docs.thinkwithwp.com/sagemaker/latest/dg/debugger-docker-images-rules.html
        'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
        'RuleParameters': parameters
    }


def format_collection_config(collection_dict):
    output = []
    for key, val in collection_dict.items():
        output.append({'CollectionName': key, 'CollectionParameters': val})
    return output


collections = {
    'feature_importance' : {
        'save_interval': '5'
    },
    'losses' : {
        'save_interval': '10'
    },
    'average_shap': {
        'save_interval': '5'
    },
    'metrics': {
        'save_interval': '3'
    }
}


bad_hyperparameters = {
    'max_depth': '5',
    'eta': '0',
    'gamma': '4',
    'min_child_weight': '6',
    'silent': '0',
    'subsample': '0.7',
    'num_round': '50'
}


@dsl.pipeline(
    name='XGBoost Training Pipeline with bad hyperparameters',
    description='SageMaker training job test with debugger'
)
def training(role_arn="", bucket_name="my-bucket"):
    train_channels = [
        training_input("train", f"s3://{bucket_name}/mnist_kmeans_example/input/valid_data.csv", 'text/csv')
    ]
    train_debug_rules = [
        training_debug_rules("LossNotDecreasing", {"rule_to_invoke": "LossNotDecreasing", "tensor_regex": ".*"}),
        training_debug_rules("Overtraining", {'rule_to_invoke': 'Overtraining', 'patience_train': '10', 'patience_validation': '20'}),
    ]
    training = sagemaker_train_op(
        region='us-east-1',
        # Refer this link for xgboost Registry URLs: https://docs.thinkwithwp.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html
        image='683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3',
        hyperparameters=bad_hyperparameters,
        channels=train_channels,
        instance_type='ml.m5.2xlarge',
        model_artifact_path=f's3://{bucket_name}/mnist_kmeans_example/output/model',
        debug_hook_config=training_debug_hook(f's3://{bucket_name}/mnist_kmeans_example/hook_config', collections),
        debug_rule_config=train_debug_rules,
        role=role_arn,
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(training, __file__ + '.zip')

Compiling the pipeline

Our pipeline is now complete and ready to be compiled using the following command:

dsl-compile --py debugger-component-demo.py --output debugger-component-demo.tar.gz

This creates debugger-component-demo.tar.gz in the same folder; this is the file we upload to Kubeflow Pipelines as our pipeline definition. Because the script’s __main__ block also calls kfp.compiler.Compiler(), running python3 debugger-component-demo.py directly produces an equivalent archive named debugger-component-demo.py.zip.

Deploying the pipeline

We now use kubectl to port-forward the KFP UI so we can access it in a browser and upload the pipeline.

  1. In a new terminal window, run the following command (it’s also possible to create pipelines and submit training jobs from the AWS Command Line Interface (AWS CLI); a programmatic alternative using the KFP SDK is sketched after these steps):
    $ kubectl port-forward -n kubeflow service/ml-pipeline-ui 8080:80
  2. Access the KFP UI by navigating to http://localhost:8080/ in your browser.
  3. Create a new pipeline and upload the compiled specification (.tar.gz file) as a new pipeline template.
  4. Provide the role_arn and bucket_name you created as pipeline inputs.
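As an alternative to the UI, the following minimal sketch submits a run through the KFP SDK client. It assumes the port-forward from step 1 is still running, the compiled debugger-component-demo.tar.gz is in the current directory, and the role ARN and bucket name placeholders are replaced with your values.

import kfp

# Connect to the KFP API through the existing port-forward
client = kfp.Client(host='http://localhost:8080')

# Submit a one-off run of the compiled pipeline package
client.create_run_from_pipeline_package(
    'debugger-component-demo.tar.gz',
    arguments={
        'role_arn': '<sagemaker-execution-role-arn>',
        'bucket_name': '<bucket-name>',
    },
)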

Reading the debugger output

When the training is complete, the logs display the status of each debugger rule.
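If you prefer to inspect the rule statuses programmatically instead of reading the logs, a sketch like the following works with boto3. The training job name is a placeholder that you can copy from the training component’s output or the SageMaker console.

import boto3

# Look up the debugger rule evaluation statuses for a completed training job
sm_client = boto3.client('sagemaker', region_name='us-east-1')
job = sm_client.describe_training_job(TrainingJobName='<training-job-name>')
for rule_status in job.get('DebugRuleEvaluationStatuses', []):
    print(rule_status['RuleConfigurationName'], rule_status['RuleEvaluationStatus'])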

The following screenshot shows an example of what the status of each debugger rule should be when the training job is complete.

We see here that the Overtraining rule hasn’t found any issues with the model. However, the LossNotDecreasing rule indicates that our loss isn’t decreasing over time as it should.

The following screenshot shows the Amazon CloudWatch Logs, also printed on the Logs tab, which indeed show that the train-rmse is staying steady at 0.5 and isn’t decreasing.

Our loss isn’t decreasing because the hyperparameters were initialized suboptimally; specifically, eta was set to a poor value. eta determines the model’s learning rate and is currently 0, which is clearly erroneous because it means subsequent boosting rounds make no progress beyond the initial step. To address this, use a non-zero learning rate, for example, by setting eta in hyperparameters to 0.2, as shown below. When you rerun the pipeline with this fix, the LossNotDecreasing rule isn’t triggered because train-rmse keeps decreasing steadily throughout the entire training duration, and the job completes with no issues found.
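For reference, the corrected hyperparameters in debugger-component-demo.py would look like the following; only eta changes, and the other values stay the same.

fixed_hyperparameters = {
    'max_depth': '5',
    'eta': '0.2',            # non-zero learning rate so each boosting round makes progress
    'gamma': '4',
    'min_child_weight': '6',
    'silent': '0',
    'subsample': '0.7',
    'num_round': '50'
}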

Conclusion

Model debugging tools are critical for reducing the total time, cost, and resources spent on creating a model. Using SageMaker Debugger in your Kubeflow Pipelines lets you go beyond just looking at scalars like losses and accuracies during training: you get full visibility into all tensors flowing through the graph. Furthermore, it helps you monitor your training in near-real time using rules, and it provides alerts if it detects an inconsistency in the training flow, which ultimately reduces costs and improves your company’s effectiveness with ML.

To get started using Kubeflow Pipelines with SageMaker, see the GitHub repo. You can also explore our native integration of SageMaker Operators for Kubernetes for MLOps.


About the Authors

Alex Chung is a Senior Product Manager with AWS in Deep Learning. His role is to make AWS Deep Learning products more accessible and cater to a wider audience. He’s passionate about social impact and technology, getting his regular gym workout, and cooking healthy meals.

 

 

Suraj Kota is a Software Engineer specialized in Machine Learning infrastructure. He builds tools to easily get started and scale machine learning workload on AWS. He worked on the Amazon Deep Learning Containers, Deep Learning AMI, SageMaker Operators for Kubernetes, and other open source integrations like Kubeflow.

 

 

Dustin Luong is a Software Development Engineering Intern with AWS in Deep Engines. He works on developing SageMaker integrations with open source platforms like Kubernetes and Kubeflow Pipelines. He’s currently a student at UC Berkeley and in his spare time he enjoys playing basketball, hiking, and playing board games.