概述
在本教程中,您将学习如何通过 AWS Trainium 实例使用 Amazon SageMaker 训练一个机器学习 (ML) 模型。由 AWS Trainium 加速器提供支持的 Amazon EC2 Trn1 实例专为高性能的深度学习 (DL) 训练构建,与同类 Amazon EC2 实例相比,Amazon EC2 Trn1 实例最多可节省 50% 的训练成本。Amazon SageMaker 是一项全托管服务,可以为每个开发人员和数据科学家提供大规模构建、训练和部署机器学习模型的能力。
在本教程中,您将使用 Amazon SageMaker Studio 这个提供全托管 Jupyter 笔记本界面的机器学习集成开发环境 (IDE),通过 AWS Trainium 构建和运行训练作业。我们将使用 IMDB 数据构建一个基于 BERT 的情感分析模型。BERT 是一个在大型英文数据语料库上进行自监督预训练的 Transformer 模型。该模型的主要目的是对使用整个语句(可能做了掩码处理)做决策的任务进行微调,例如序列分类、令牌分类和问答。该数据集包含 IMDB 的评论,这些评论可能是正面评论,也可能是负面评论。这次练习的目标是构建一个模型,用于预测给定的评论是正面评论还是负面评论。
要完成的目标
在本指南中,您将:
- 使用 AWS Trainium (Trn1) 实例上的 Amazon SageMaker Training 训练一个用于文本分类的 BERT 模型
Prerequisites
开始本指南之前,您需要先满足以下条件:
- AWS 账户:如果您还没有 AWS 账户,请遵循设置 AWS 环境指南中的说明获取快速概览。
AWS 使用经验
新手
完成时间
15 分钟
所需费用
请参考 Amazon SageMaker 定价来估算本教程所需成本。
前提条件
您必须登录 AWS 账户。
使用的服务
Amazon SageMaker Training
上次更新时间
2023 年 5 月 2 日
步骤 1:创建 AWS 账户
AWS 账户在每个 AWS 区域中只能拥有一个 SageMaker Studio 域。如果您已经拥有一个美国东部(弗吉尼亚州北部)区域的 SageMaker Studio 域,请按照 SageMaker Studio 设置指南将所需的 AWS IAM 策略附加到您的 SageMaker Studio 账户,然后跳过步骤 1,直接执行步骤 2 以设置 SageMaker Studio 笔记本。
如果没有现有的 SageMaker Studio 域,请继续执行步骤 1,运行 AWS CloudFormation 模板,创建 SageMaker Studio 域并添加本教程后续步骤所需的权限。
选择 AWS CloudFormation 堆栈链接。您将通过此链接打开 AWS CloudFormation 控制台,并创建 SageMaker Studio 域和名为 studio-user 的用户。您还将为您的 SageMaker Studio 账户添加所需的权限。在 CloudFormation 控制台上,确认右上角显示的区域是美国东部(弗吉尼亚州北部)。堆栈名称应为 CFN-SM-IM-Lambda-catalog,不应更改。系统需要 10 分钟左右来创建此堆栈的所有资源。
此堆栈假定您已在账户中设置了公共 VPC。如果没有公共 VPC,请参阅使用单个公有子网的 VPC 了解如何创建公共 VPC。
勾选“I acknowledge that AWS CloudFormation might create IAM resources”(我了解 AWS CloudFormation 可能会创建 IAM 资源),然后点击 Create stack(创建堆栈)。
在 CloudFormation 窗格中,选择 Stacks(堆栈)。创建堆栈后,堆栈的状态应从 CREATE_IN_PROGRESS 更改为 CREATE_COMPLETE。
步骤 2:设置 SageMaker Studio 笔记本
在此步骤中,您将启动一个新的 SageMaker Studio 笔记本、安装必要的开源库并设置与其他服务(包括 Amazon Simple Storage Service 即 Amazon S3)交互所需的 SageMaker 变量。
在控制台搜索栏中输入 SageMaker Studio,然后选择 SageMaker Studio。
从 SageMaker 控制台右上角的区域下拉菜单中选择 US East (N. Virginia)(美国东部(弗吉尼亚州北部))。从 Launch app(启动应用程序)下拉菜单中选择 Studio,可使用 studio-user 配置文件打开 SageMaker Studio。
打开 SageMaker Studio 用户界面。在导航栏中,依次选择 File(文件)> New(新建)> Notebook(笔记本)。
在 Set up notebook environment(设置笔记本环境)对话框中,在 Image(镜像)下选择 Data Science 2.0。自动选择的内核是 Python 3。点击 Select(选择)。
这时笔记本右上角的内核应当显示 Python 3 (Data Science 2.0)。
步骤 3:准备数据
在此步骤中,您将使用 Amazon SageMaker Studio 笔记本预处理训练机器学习模型所需的数据,然后将数据上传到 Amazon S3。
我们将使用 Hugging Face 的 IMDB 数据集来训练模型。为了下载数据集,我们需要安装 datasets 和 transformers 库。要安装这些库的特定版本,请复制以下代码段并将其粘贴到笔记本的单元格中,然后按下 Shift+Enter 运行当前单元格。请忽略任何重启内核的警告和依赖冲突错误。
!pip install transformers==4.21.3 datasets==2.5.2
将下列导入代码添加到笔记本的单元格中
from datasets import load_dataset
from datasets.filesystems import S3FileSystem
from tqdm.auto import tqdm
现在,我们可以使用 load_dataset 函数从 Hugging Face 数据集下载数据。我们将把数据集分为训练集和测试集。复制以下代码段并将其粘贴到笔记本的单元格中。
dataset = load_dataset("imdb",split="train",ignore_verifications=True)
dataset = dataset.train_test_split()
最后,我们将把数据集上传到 Amazon S3。 您可以指定要使用的 S3 存储桶,如果没有指定,代码将使用默认存储桶。复制以下代码并将其粘贴到笔记本的单元格中,然后执行代码。单元格输出应打印角色、存储桶和区域。
import sagemaker
from sagemaker.pytorch import PyTorch
sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
# set to default bucket if a bucket name is not given
sagemaker_session_bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")
我们现在有一个存储桶,我们将使用 Hugging Face 数据集 API 将数据集上传到 S3。复制并粘贴以下代码,将数据集上传到 S3。
s3 = S3FileSystem()
s3_prefix = 'HFDatasets/imdb'
# save train_dataset to s3
training_input_path = f's3://{sagemaker_session_bucket}/{s3_prefix}'
dataset.save_to_disk(training_input_path,fs=s3)
步骤 4:构建训练脚本
借助 SageMaker,您可以在用于训练的 Python 脚本中引入自己的逻辑。通过在脚本中封装训练逻辑,您可以在使用 PyTorch 等常用机器学习框架容器的同时,纳入自定义训练例程和模型配置。在本教程中,您将准备一个训练脚本,该脚本使用 Hugging Face Transformers 库中的 BERT Transformer 模型,借助该脚本,您可以使用在上一步中上传的 IMDB 数据来训练文本分类模型。
脚本模式的第一级功能是在独立的自定义 Python 脚本中定义自己的训练过程,并将其作为定义 SageMaker 估算器时的入口。复制并粘贴以下代码块,以编写封装模型训练逻辑的 Python 脚本。
%%writefile train.py
import argparse
import os
import torch
import torch_xla.core.xla_model as xm
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_backend
from datasets import load_from_disk
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm.auto import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from datasets import load_from_disk
# Initialize XLA process group for torchrun
import torch_xla.distributed.xla_backend
import random
import evaluate
device = "xla"
torch.distributed.init_process_group(device)
world_size = xm.xrt_world_size()
def parse_args():
parser = argparse.ArgumentParser(description="Finetune a transformers model on a text classification task")
parser.add_argument(
"--train_data", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
parser.add_argument(
"--max_length",type=int,default=128)
parser.add_argument(
"--model_name_or_path",
type=str,
help="Path to pretrained model or model identifier from huggingface.co/models.",
required=True,
)
parser.add_argument(
"--per_device_train_batch_size",
type=int,
default=8,
help="Batch size (per device) for the training dataloader.",
)
parser.add_argument(
"--per_device_eval_batch_size",
type=int,
default=8,
help="Batch size (per device) for the evaluation dataloader.",
)
parser.add_argument(
"--learning_rate",
type=float,
default=5e-5,
help="Initial learning rate (after the potential warmup period) to use.",
)
parser.add_argument("--num_train_epochs", type=int, default=2, help="Total number of training epochs to perform.")
parser.add_argument(
"--max_train_steps",
type=int,
default=2000,
help="Total number of training steps to perform. If provided, overrides num_train_epochs.",
)
parser.add_argument("--output_dir", type=str, default=os.environ["SM_MODEL_DIR"], help="Where to store the final model.")
parser.add_argument("--seed", type=int, default=100, help="A seed for reproducible training.")
args = parser.parse_args()
# Sanity checks
if args.train_data is None:
raise ValueError("Need a training file.")
args.local_rank = int(os.environ["LOCAL_RANK"])
args.world_rank = int(os.environ["RANK"])
args.world_size = int(os.environ["WORLD_SIZE"])
print("Local rank {} , World Rank {} , World Size {}".format(args.local_rank,args.world_rank,args.world_size))
return args
def gather(tensor, name="gather tensor"):
return xm.mesh_reduce(name, tensor, torch.cat)
def main():
# Retrieve args passed to the training script
args = parse_args()
dataset = load_from_disk(args.train_data)
tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path)
# tokenizer helper function
def tokenize(batch):
return tokenizer(batch['text'], max_length=args.max_length, padding='max_length', truncation=True)
# load dataset
train_dataset = dataset['train'].shuffle()
eval_dataset = dataset['test'].shuffle()
# tokenize dataset
train_dataset = train_dataset.map(tokenize, batched=True)
eval_dataset = eval_dataset.map(tokenize, batched=True)
xm.rendezvous("wait_for_everyone_to_reach")
# Log a few random samples from the training set:
# set format for pytorch
train_dataset = train_dataset.rename_column("label", "labels")
train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
if args.world_rank == 0:
for index in random.sample(range(len(train_dataset)), 3):
print(f"Sample {index} of the training set: {train_dataset[index]}.")
eval_dataset = eval_dataset.rename_column("label", "labels")
eval_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
if args.world_rank == 0:
for index in random.sample(range(len(eval_dataset)), 3):
print(f"Sample {index} of the training set: {eval_dataset[index]}.")
# Set up distributed data loader
train_sampler = None
if world_size > 1: # if more than one core
train_sampler = DistributedSampler(
train_dataset,
num_replicas = args.world_size,
rank = args.world_rank,
shuffle = True,
)
train_loader = DataLoader(
train_dataset,
batch_size = args.per_device_train_batch_size,
sampler=train_sampler,
shuffle=False if train_sampler else True,
)
if world_size > 1: # if more than one core
eval_sampler = DistributedSampler(
eval_dataset,
num_replicas = args.world_size,
rank = args.world_rank,
shuffle = True,
)
eval_loader = DataLoader(
eval_dataset,
batch_size = args.per_device_eval_batch_size,
sampler=eval_sampler,
shuffle=False if eval_sampler else True,
)
train_device_loader = pl.MpDeviceLoader(train_loader, device)
eval_device_loader = pl.MpDeviceLoader(eval_loader, device)
num_training_steps = args.num_train_epochs * len(train_device_loader)
progress_bar = tqdm(range(num_training_steps))
model = AutoModelForSequenceClassification.from_pretrained(args.model_name_or_path)
model.to(device)
optimizer = AdamW(model.parameters(), lr=args.learning_rate)
# Get the metric function
metric = evaluate.load("accuracy")
for epoch in range(args.num_train_epochs):
model.train()
for batch in train_device_loader:
batch = {k: v.to(device) for k, v, in batch.items()}
outputs = model(**batch)
optimizer.zero_grad()
loss = outputs.loss
loss.backward()
xm.optimizer_step(optimizer) #gather gradient updates from all cores and apply them
if args.world_rank == 0:
progress_bar.update(1)
if args.world_rank == 0:
print(
"Epoch {}, rank {}, Loss {:0.4f}".format(epoch, args.world_rank, loss.detach().to("cpu"))
)
# Run evaluation after each epochs
model.eval()
if args.world_rank == 0:
print("Running evaluation for the model")
for eval_batch in eval_device_loader:
with torch.no_grad():
batch = {k: v.to(device) for k, v, in eval_batch.items()}
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
xm.rendezvous("wait_for_everyone_to_reach")
# Gather predictions and labels from all workers to compute accuracy.
predictions = gather(predictions)
references = gather(batch["labels"])
metric.add_batch(
predictions=predictions,
references=references
)
eval_metric = metric.compute()
if args.world_rank == 0:
print(f"epoch {epoch} : Validation Accuracy -: {eval_metric}")
# Save checkpoint for evaluation (xm.save ensures only one process save)
if args.output_dir is not None:
xm.save(model.state_dict(), f"{args.output_dir}/checkpoint.pt")
if args.world_rank == 0:
tokenizer.save_pretrained(args.output_dir)
if args.world_rank == 0:
print('----------End Training ---------------')
if __name__ == '__main__':
main()
1. 在训练脚本中,有几个重要细节值得一提:
较小的 Trainium 实例 (trn1.2xlarge) 包含 2 个神经元内核,而 trn1.32xlarge 包含 32 个神经元内核。为了高效地进行训练,我们需要一种将训练分配到可用神经元内核的机制。我们使用 Pytorch XLA 来达成该目标。PyTorch/XLA 是一个 Python 软件包,使用 XLA 深度学习编译器连接 PyTorch 深度学习框架和 AWS Trainium 等云加速器。您只需几行 XLA 专用代码就可以构建一个新的 PyTorch 神经网络或将现有网络切换到 XLA 设备上运行。
- 这里,Pytorch/XLA 设备取代了 GPU 设备。由于我们使用的是概率分布,因此需要将 XLA 用作设备来初始化训练,如下图所示。
device = "xla"
torch.distributed.init_process_group(device)
- PyTorch/XLA MpDeviceLoader 用于数据摄取管道。Pytorch/XLA MpDeviceLoader 可以同时执行以下三个步骤:跟踪、编译和数据批量加载到设备。这有助于提高性能。我们需要用 MpDeviceDataLoader 包装 PyTorch 数据加载器,如下所示。
train_device_loader = pl.MpDeviceLoader(train_loader, "xla")
- 使用 XLA 提供的 API 运行优化步骤,如下所示。这样就可以巩固核心之间的梯度并发出 XLA 设备步长计算。
torch_xla.core.xla_model.optimizer_step(optimizer)
2. S3 中的数据将被复制到训练实例中,其路径将以 SM_CHANNEL_TRAIN 通道下的环境变量形式提供。
3. 传递给训练任务的超参数以参数形式提供。我们将使用 Argparser 在代码中读取参数,如下所示。
parser = argparse.ArgumentParser(description="Finetune a transformers model on a text classification task")
parser.add_argument(
"--train_data", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
4. 使用数据集 API 从通道路径 SM_CHANNEL_TRAIN 加载数据集。
dataset = load_from_disk(args.train_data)
5. 训练好的模型配置和权重存储在环境变量 SM_MODEL_DIR 提供的路径下。训练完成后,Amazon SageMaker 会将 SM_MODEL_DIR 路径下的文件复制到 S3 存储桶。然后,我们可以使用该模型,将其部署到我们选择的任何硬件上。我们需要确保将模型文件存储在 SM_MODEL_DIR 环境变量提供的路径下。
SageMaker 还提供了一种机制:在提供训练脚本的同时也提供 requirements.txt 文件,从而方便地安装训练所需的其他库。在本例中,我们还需要安装一些库才能使用 transformers 库。复制以下代码段并将其粘贴到笔记本的单元格中,然后执行代码以创建 requirements.txt 文件。
%%writefile requirements.txt
datasets==2.5.2
evaluate==0.3.0
transformers==4.21.0
我们成功创建了训练脚本和 requirements.txt 文件。
步骤 5:训练机器学习模型
接下来,您将实例化一个 SageMaker 估算器。您将使用 AWS 托管的 PyTorch 估算器来运行自定义脚本。若要实例化 PyTorch 估算器,请复制并粘贴以下代码。
base_job_name = "imdb-sentiment-classification"
hyperparameters = {}
hyperparameters["model_name_or_path"] = "bert-base-uncased"
hyperparameters["seed"] = 100
hyperparameters["max_length"] = 128
hyperparameters["per_device_train_batch_size"] = 8
hyperparameters["per_device_eval_batch_size"] = 8
hyperparameters["learning_rate"] = 5e-5
hyperparameters["max_train_steps"] = 2000
hyperparameters["num_train_epochs"] = 1
接下来,您将实例化一个 SageMaker 估算器。您将使用 AWS 托管的 PyTorch 估算器来运行自定义脚本。若要实例化 PyTorch 估算器,请复制并粘贴以下代码。
pt_estimator = PyTorch(
entry_point="train.py",
source_dir="./",
role=sagemaker.get_execution_role(),
instance_count=1,
instance_type="ml.trn1.2xlarge",
framework_version="1.11.0",
py_version="py38",
disable_profiler=True,
base_job_name=base_job_name,
hyperparameters=hyperparameters,
distribution={"torch_distributed": {"enabled": True}},
)
这里有一些细节需要注意。entry_point 将指向我们在步骤 4 中创建的脚本。 还要注意有一个 source_dir 属性指向当前目录。这是必需的,因为我们需要复制 requirements.txt 文件并将其安装到训练实例中。对于实例类型,我们将其指定为 AWS Trainium,它有两种类型:trn1.2xlarge 和 trn1.32xlarge。在本练习中,我们将使用 trn1.2xlarge 实例。
我们现已创建估算器,需要使用 S3 数据路径调用 fit 方法来启动训练作业。
复制、粘贴并执行下面的单元格,启动训练作业。
pt_estimator.fit({"train": training_input_path})
这将使用 Trn1.2xlarge 实例运行训练。您可以在 Studio 笔记本中查看培训日志。
结论
恭喜您!您已完成使用 AWS Trainium 和 Amazon SageMaker 训练机器学习模型教程。
在本教程中,您使用了 AWS Trainium 实例,利用 Amazon SageMaker 训练基于 BERT 的分类模型。AWS Trainium 为大规模机器学习模型训练提供了经济高效的机制,结合 Amazon SageMaker 易于构建机器学习模型的特点,您应当能够快速进行实验和扩展。
后续步骤
若要了解如何在 Service Quotas 控制台上设置 CloudWatch 报警以及如何创建配额请求模板,请查看以下资源。