亚马逊AWS官方博客

在 Amazon SageMaker Autopilot 推理管道中部署您的自有数据处理代码

Original URL:  https://thinkwithwp.com/cn/blogs/machine-learning/deploying-your-own-data-processing-code-in-an-amazon-sagemaker-autopilot-inference-pipeline/

机器学习(ML)模型的构建过程,要求数据科学家们手动组织数据特征、选择适当的算法并优化其模型参数,这些任务需要投入大量精力并辅以艰深的专业知识。 Amazon SageMaker Autopilot 消除了机器学习模型构建过程中的种种繁重工作,它能够检查您的数据集、生成多条机器学习管道,比较不同管道的推理性能并推荐备选管道。每条候选管道都将数据预处理步骤、机器学习算法以及超参数优化整合起来,可供您轻松加以部署,借此快速实现实时预测或批量预测。

但是,如果我们希望在调用Amazon SageMaker Autopilot之前对数据进行预处理,又该如何操作?例如,大家可能拥有一套包含多项特征的数据集,并需要使用自定义特征选择方法以剔除其中的非相关变量,而后再使用这部分数据训练Autopilot作业中的模型。接下来,您还打算将这些自定义处理代码部署到实时端点或者批量处理管道当中。在本文中,我们将向大家展示如何使用自有数据处理代码建立自定义Autopilot推理管道,文中涉及的全部代码皆可通过此GitHub repo获取。

解决方案概述

使用自定义特征选择和Autopilot模型的自定义管道解决方案包括以下步骤:

  1. 准备一套包含100项特征的数据集,作为本文中的示例数据集,并将其上传至 Amazon Simple Storage Service (Amazon S3)。
  2. 训练特征选择模型,并使用 sagemaker-scikit-learn-container为Autopilot准备数据集。
  3. 配置并启动Autopilot作业。
  4. 创建一条推理管道,将特征选择与Autopilot模型整合起来。
  5. 使用推理管道生成预测结果。

下图所示,为这一工作流的基本架构。

准备并上传数据集

首先,我们使用sklearn.datasets.make_regression生成一套回归数据集。这里将特征数量设置为100,其中5项特征提供了有用的信息 。将这100个变量的名称索引为 x_i,目标变量命名为 y

X, y = make_regression(n_features = 100, n_samples = 1500, n_informative = 5, random_state=0)
df_X = pd.DataFrame(X).rename(columns=lambda x: 'x_'+ str(x))
df_y = pd.DataFrame(y).rename(columns=lambda x: 'y')
df = pd.concat([df_X, df_y], axis=1)

 

下面的列表显示了生成的数据。您可以将这套数据集上传至Amazon S3以供后续步骤使用。

训练特征选择模型并准备数据集

特征选择,是指为机器学习模型训练作业选择相关度最高的特征子集的过程。特征选择能够简化训练过程、缩短训练时长,并降低发生过度拟合问题的可能性。sklearn.feature_selection模块当中包含多种特征选择算法。在本文中,我们使用以下算法:

  • feature_selection.RFE – 递归特征消除(RFE)算法,通过递归方式对特征集进行逐步缩减,由此实现特征选择。我们首先需要在初始特征集上训练估计器,并提取每项特征的重要性指标。接下来,从当前特征集中删除重要性最低的特征。我们使用Epsilon支持向量回归(sklearn.svm.SVR)作为RFE的学习估计器。
  • feature_selection.SelectKBest – SelectKBest 算法能够选择特定指标中得分最高的k个特征。我们使用mutual information 与 f regression作为评分函数——这两种方法能够衡量不同变量之间的依赖性。关于mutual information 与 f regression的更多详细信息,请参阅特征选择

我们将这三种特征选择算法堆叠到同一sklearn.pipeline.Pipeline当中。在默认情况下,RFE能够将特征数量减少50%。我们使用SelectKBest通过 f_regression 方法选取前30项特征,而后使用mutual_info_regression方法进一步将特征数量降低至10项。请注意,此处使用的特征选择算法仅供演示。您可以更新脚本以并入您希望使用的其他特征选择算法。

我们还创建一用于特征选择的Python脚本。在以下示例代码中,我们构建了一个sklearn管道对象,用于实现我们描述的方法:

'''Feature selection pipeline'''
feature_selection_pipe = pipe = Pipeline([
                 ('svr', RFE(SVR(kernel="linear"))),
                 ('f_reg',SelectKBest(f_regression, k=30)),
                ('mut_info',SelectKBest(mutual_info_regression, k=10))
                ])
feature_selection_pipe.fit(X_train,y_train)

 

为了更清晰的展示过程中选定的特征,我们使用以下脚本提取所选定特征的名称,并将其保存为列表形式:

  '''Save selected feature names'''
    feature_names = concat_data.columns[:-1]
    feature_names = feature_names[pipe.named_steps['svr'].get_support()]
    feature_names = feature_names[pipe.named_steps['f_reg'].get_support()]
    feature_names = feature_names[pipe.named_steps['mut_info'].get_support()]

 

我们使用带有特征选择脚本的Amazon SageMaker SKLearn Estimator作为入口点。该脚本与大家在Amazon SageMaker外部运行的训练脚本非常相似,您可以通过各种环境变量(例如SM_MODEL_DIR)访问训练环境的各项相关属性,其中代表容器内用于写入模型工件的目录路径。这些工件将通过Amazon SageMaker训练作业被上传至Amazon S3输出路径。训练完成之后,我们保存模型工件并将推理过程中使用的列名称选定为SM_MODEL_DIR。详见以下代码:

    joblib.dump(feature_selection_pipe, os.path.join(args.model_dir, "model.joblib"))
    ...
    joblib.dump(feature_selection_pipe, os.path.join(args.model_dir, "selected_feature_names.joblib"))

 

本文示例使用特征选择算法,但大家也可以使用其他自定义算法并向此入口点脚本添加其他数据预处理代码,例如数据插补或其他形式的数据清洗代码。

现在,我们的特征选择模型已经正确完成拟合,接下来将原始输入数据转换为选定特征的训练数据集。要使用Amazon SageMaker对原始数据进行批量转换,并将结果存储回Amazon zS3,请输入以下代码:

# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer_output = os.path.join('s3://',bucket, prefix, 'Feature_selection_output/')
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge',
    output_path = transformer_output,
    assemble_with = 'Line',
    accept = 'text/csv')
    
transformer.transform(train_input, content_type='text/csv') 

 

此notebook当中包含一个附加步骤,用于将选定的列名称作为标题添加到所生成的CSV数据文件当中。

配置并启动Autopilot作业

批量转换的输出结果,即为Autopilot所使用的全新训练数据集。这套新数据集包含10项特征。要使用Autopilot,我们只需提供新数据集并将目标列选择为y即可。Autopilot会自动检查我们的数据集并运行多个候选对象,以确保数据预处理的具体步骤、机器学习算法与超参数最佳组合。在启动Autopilot作业之前,我们需要首先定义作业的输入配置、输出配置与停止条件:

input_data_config = [{
      'DataSource': {
        'S3DataSource': {
          'S3DataType': 'S3Prefix',
          'S3Uri': 's3://{}/{}/training_data_new'.format(bucket,prefix)
        }
      },
      'TargetAttributeName': 'y'
    }
  ]

output_data_config = {
    'S3OutputPath': 's3://{}/{}/autopilot_job_output'.format(bucket,prefix)
  }

AutoML_Job_Config = {
    'CompletionCriteria': {
            'MaxCandidates': 50,
            'MaxAutoMLJobRuntimeInSeconds': 1800
        }
  }

 

完成之后,调用create_auto_ml_job API以启动Autopilot作业:

ssm = boto3.Session().client(service_name='sagemaker',region_name=region)
timestamp_suffix = strftime('%d-%H-%M-%S', gmtime())

auto_ml_job_name = 'automl-blog' + timestamp_suffix
print('AutoMLJobName: ' + auto_ml_job_name)

sm.create_auto_ml_job(AutoMLJobName=auto_ml_job_name,
                      InputDataConfig=input_data_config,
                      OutputDataConfig=output_data_config,
                      AutoMLJobConfig = AutoML_Job_Config,
                      RoleArn=role)

 

创建推理管道,将特征选择与Autopilot模型加以整合

到这里,我们已经创建出一套模型,其使用包含100项特征的原始数据集并选择其中相关度最高的10项特征。我们还使用Autopilot创建数据处理管道与机器学习模型,用以预测出y。现在,我们需要将特征选择模型与Autopilot模型相结合,借此创建出推理管道。在完成模型定义并为其分配名称之后,我们即可创建指向预处理与预测模型的PipelineModel。大家可以在GitHub上获取pipeline.py文件,具体参见以下代码:

sklearn_image = sklearn_preprocessor.image_name
container_1_source = os.path.join("s3://", 
                                  sagemaker_session.default_bucket(), 
                                  sklearn_preprocessor.latest_training_job.job_name,
                                  "sourcedir.tar.gz"
                                 )
inference_containers = [
        {
            'Image': sklearn_image,
            'ModelDataUrl': sklearn_preprocessor.model_data,
            'Environment': {
                'SAGEMAKER_SUBMIT_DIRECTORY':container_1_source,
                'SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT': "text/csv",
                'SAGEMAKER_PROGRAM':'sklearn_feature_selection.py'
            }
        }]

inference_containers.extend(best_candidate['InferenceContainers'])

response = sagemaker.create_model(
        ModelName=pipeline_name,
        Containers=inference_containers,
        ExecutionRoleArn=role)

 

接下来,我们将这套管道模型部署至单一端点:

response = sagemaker.create_endpoint(
        EndpointName=pipeline_endpoint_name,
        EndpointConfigName=pipeline_endpoint_config_name,
    )

 

使用推理管道进行预测

我们可以将数据发送至推理管道,借此测试其预测性能。管道接收原始数据,使用特征选择模型对其进行转换,并使用自动生成的模型创建预测结果。

首先,我们定义一项payload变量,其中包含我们希望通过管道发送的数据。我们使用训练数据中的前五千作为payload。接下来,我们使用管道端点定义预测器,将payload发送至预测器,并输出模型预测结果:

from sagemaker.predictor import RealTimePredictor, csv_serializer
from sagemaker.content_types import CONTENT_TYPE_CSV
predictor = RealTimePredictor(
    endpoint=pipeline_endpoint_name,
    serializer=csv_serializer,
    sagemaker_session=sagemaker_session,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_CSV)

predictor.content_type = 'text/csv'
predictor.predict(test_data.to_csv(sep=',', header=True, index=False)).decode('utf-8')

 

我们的Amazon SageMaker端点将针对所发送数据的每一对应行返回一项预测,具体参见以下代码:

'-102.248855591\n-165.823532104\n115.50453186\n111.306632996\n5.91651535034'

删除端点

在端点运行完成之后,请注意将其删除以节约运营成本:

sm_client = sagemaker_session.boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=pipeline_endpoint_name)

 

总结

在本文中,我们演示了如何使用您自己的数据处理代码构建起自定义Autopilot推理管道。我们首先训练出特征选择模型,而后使用经过训练的特征选择模型对原始数据进行转换。接下来,我们启动Amazon SageMaker Autopilot作业,针对我们的回归问题自动训练并优化出最佳机器学习模型。我们还构建起一套将特征选择与Autopilot模型加以结合的推理管道。最后,我们使用推理管道获取预测结果。关于Amazon SageMaker Autopilot的更多细节信息,请参阅Amazon SageMaker Autopilot

本篇作者

Qingwei Li

Amazon Web Services机器学习专家。在超出研究补助预算却未能成功拿下预想中的诺贝尔奖之后,他开始转向运筹学领域。目前,他帮助金融服务与保险业客户在AWS上构建机器学习解决方案。在业余时间,他喜欢阅读和教学。

Piali Das

AWS SageMaker Autopilot团队高级软件工程师。她曾为SageMaker的算法构建做出贡献。她热爱科学编程,并对机器学习与分布式系统抱有浓厚兴趣。