
Distributed machine learning with Amazon ECS

Running distributed machine learning (ML) workloads on Amazon Elastic Container Service (Amazon ECS) allows ML teams to focus on creating, training, and deploying models rather than on managing the container orchestration engine. With a simple architecture, transparent control plane upgrades, and native AWS Identity and Access Management (IAM) authentication, Amazon ECS provides a great environment to run ML projects. Additionally, Amazon ECS supports workloads that use NVIDIA GPUs and provides optimized images with pre-installed NVIDIA kernel drivers and Docker runtime.

When using a distributed training approach, multiple GPUs in a single instance (multi-GPU) or multiple instances with one or more GPUs (multi-node) are used for training. There are several techniques to accomplish distributed training: pipeline parallelism (different layers of the model are loaded on different GPUs), tensor parallelism (a single layer is split across multiple GPUs), and distributed data parallel (a full copy of the model is loaded on each GPU, with the training data split and processed in parallel across the GPUs).
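
For reference, the following is a minimal sketch of the distributed data parallel setup in plain PyTorch; the model, dataset path, and launcher details are illustrative placeholders rather than the code used later in this post. Each process wraps its own full copy of the model in DistributedDataParallel, and a DistributedSampler gives each process a distinct shard of the data.

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms

# Each training process joins a common process group; the launcher
# (for example torchrun, or Ray Train later in this post) supplies the
# rank and world size through environment variables.
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

# A full copy of an (illustrative) model on this process's GPU; DDP keeps
# the copies in sync by all-reducing gradients during the backward pass.
model = DDP(torch.nn.Linear(28 * 28, 10).cuda(local_rank), device_ids=[local_rank])

# Each process reads a different shard of the training data.
dataset = datasets.FashionMNIST("/tmp/data", download=True, transform=transforms.ToTensor())
loader = DataLoader(dataset, batch_size=128, sampler=DistributedSampler(dataset))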

This post presents an implementation of distributed data parallel ML training in containers running on Amazon ECS, using the PyTorch and Ray Train libraries. It provides a working example to get you started on your distributed ML journey, which you can then adapt to your specific models, data, and use cases.

Solution overview

The solution consists of a single ECS cluster where the Ray cluster runs. A Ray cluster is a set of worker processes connected to a common Ray head process. To accomplish this, two services are deployed in the ECS cluster: Ray head and Ray worker. The Ray head service contains a single task, while the worker service runs two tasks to showcase a multi-node scenario. All of the tasks run a single container. Amazon Simple Storage Service (Amazon S3) is used for shared storage between all tasks.

Deployment architecture with Amazon ECS, a head service, a worker service and Amazon S3
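
Once both services are running, any process on the head node can attach to the Ray cluster and see the resources contributed by the head and worker tasks. A minimal sketch, assuming it is run inside the head container where Ray is already installed and started:

import ray

# Connect to the already running Ray cluster (head plus workers).
ray.init(address="auto")

# Aggregate CPU/GPU/memory across all nodes, similar to what the
# `ray status` command shows later in this walkthrough.
print(ray.cluster_resources())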

Prerequisites

You need the following to proceed with this post:

  • Amazon ECS: A fully managed container orchestration service that helps you more efficiently deploy, manage, and scale containerized applications. In Amazon ECS, applications are deployed in clusters. Each cluster can contain multiple services, which in turn manage tasks. Each task can run one or more containers.
  • Amazon S3: An object storage service offering industry-leading scalability, data availability, security, and performance.
  • PyTorch: An optimized library for deep learning using GPUs and CPUs. Includes datasets, model builders, and optimizers.
  • Ray Train: An ML library for distributed training and fine-tuning. It supports training code developed with PyTorch, among other frameworks. Train jobs run in Ray clusters, which consist of a head node (which manages cluster state) and worker nodes (which run the training code), as shown in the sketch after this list.
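
The pattern Ray Train uses, and the one this post relies on, is a per-worker training function that a TorchTrainer schedules across the cluster. The following is a minimal, self-contained skeleton with a toy model and synthetic data (not the training script used later): prepare_model wraps the model in DistributedDataParallel and prepare_data_loader shards batches across workers.

import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, prepare_model, prepare_data_loader

def train_loop_per_worker(config):
    # Runs once per Ray worker; Ray Train sets up the torch process group.
    model = prepare_model(nn.Linear(10, 2))  # wrapped in DistributedDataParallel
    dataset = TensorDataset(torch.randn(1024, 10), torch.randint(0, 2, (1024,)))
    loader = prepare_data_loader(DataLoader(dataset, batch_size=128))  # adds a DistributedSampler
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    loss_fn = nn.CrossEntropyLoss()
    for _ in range(config["epochs"]):
        for inputs, labels in loader:
            optimizer.zero_grad()
            loss_fn(model(inputs), labels).backward()  # gradients synchronized across workers
            optimizer.step()

trainer = TorchTrainer(
    train_loop_per_worker,
    train_loop_config={"epochs": 2},
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),  # one worker per GPU in the cluster
)
result = trainer.fit()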

Walkthrough

During this walkthrough, we create an S3 bucket for shared storage. Then, we set up the infrastructure for the Ray head and worker services in an ECS cluster with two Auto Scaling groups (ASGs): a capacity provider using an m5.xlarge instance for the head service and another one using g5.xlarge instances (with one NVIDIA A10G GPU each) for the worker service. Once the infrastructure is set up, we run a distributed training job with the resnet18 model using the FashionMNIST dataset. Since the model fits entirely on a single GPU, we use distributed data parallel.

Deploy training infrastructure

Consider the following before continuing with this post:

  • We follow the best practice of running this workload in a private subnet. Since internet connectivity is needed to download the datasets, we create a NAT gateway and an Internet Gateway (IGW).
  • We use a single subnet so that all instances launch in the same Availability Zone (AZ), which improves latency and avoids inter-AZ data transfer charges.
  • Worker nodes are deployed with a cluster placement group strategy, which gives the workload low-latency network performance between instances.
  • Ray autoscaling is not supported in Amazon ECS. However, autoscaling of tasks can be done using ECS service auto scaling (see the sketch after this list).
  • In distributed training, setups with more GPUs per node generally perform better. For example, if your training needs eight GPUs, then eight instances with one GPU each generally perform worse than a single instance with eight GPUs. The same holds true for containers: a single container accessing eight GPUs generally performs better than eight containers accessing one GPU each. This is due to the additional overhead incurred per node in the training cluster. To keep costs low, this deployment uses g5.xlarge instances (one GPU per instance).
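
To illustrate the service auto scaling point above, here is one hedged way to register the worker service's task count with Application Auto Scaling and attach a target-tracking policy using boto3. The cluster name, service name, and thresholds are hypothetical placeholders; adapt them to your deployment and to a metric that makes sense for your workers.

import boto3

autoscaling = boto3.client("application-autoscaling")

# Hypothetical ECS cluster and worker service names.
resource_id = "service/ecs-demo-distributed-ml-training/distributed-ml-training-worker"

# Let the worker service scale between two and four tasks.
autoscaling.register_scalable_target(
    ServiceNamespace="ecs",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    MinCapacity=2,
    MaxCapacity=4,
)

# Track the service's average CPU utilization and adjust the task count.
autoscaling.put_scaling_policy(
    PolicyName="worker-cpu-target-tracking",
    ServiceNamespace="ecs",
    ResourceId=resource_id,
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 70.0,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "ECSServiceAverageCPUUtilization"
        },
    },
)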

Steps

  1. Clone the ecs-blueprints repository:
git clone https://github.com/aws-ia/ecs-blueprints.git
  2. Deploy the core infra:
cd ./ecs-blueprints/terraform/ec2-examples/core-infra
terraform init
terraform apply -target=module.vpc \
-target=aws_service_discovery_private_dns_namespace.this
  3. Deploy the distributed ML training blueprint:
cd ../distributed-ml-training
terraform init
terraform apply

Note the bucket name in the output, as we use it in the next section.

Run training job

Once the infrastructure has been provisioned, we can run our distributed training script. For simplicity, we connect to a container and run the script directly in a bash shell. However, notebooks in Amazon SageMaker or AWS Cloud9 provide a better user experience for running training jobs in Python.

  1. Connect to the Ray head node Amazon Elastic Compute Cloud (Amazon EC2) instance.
HEAD_INSTANCE_ID=$(aws ec2 describe-instances \
  --filters 'Name=tag:Name,Values=ecs-demo-distributed-ml-training-head' 'Name=instance-state-name,Values=running' \
  --query 'Reservations[*].Instances[0].InstanceId' --output text --region us-west-2
)
aws ssm start-session --target $HEAD_INSTANCE_ID --region us-west-2
  2. In the head EC2 instance, retrieve the container ID and run an interactive shell. This lets us run commands inside the container.

It can take several minutes for the Docker image to be downloaded and for the containers to reach a running state. The following command fails if the container is not running yet.

CONTAINER_ID=$(sudo docker ps -qf "name=.*-rayhead-.*")
sudo docker exec -it $CONTAINER_ID bash
  3. Check the Ray cluster status:
ray status

Example output:

(...)
Node status
---------------------------------------------------------------
Healthy:
 1 node_a3d74b6d5089c52f9848c1529349ba5c4966edaa633374b0566c7d69
 1 node_a5a1aa596068c73e17e029ca221bfad7a7b0085a0273da3c7ad86096
 1 node_3ae0c0cabb682158fef418bbabdf2ea63820e8b68e4ae2f4b24c8e66
Pending:
 (no pending nodes)
Recent failures:
 (no failures)
(...)
Resources
---------------------------------------------------------------
Usage:
 0.0/6.0 CPU
 0.0/2.0 GPU
 0B/38.00GiB memory
 0B/11.87GiB object_store_memory
Demands:
 (no resource demands)

If you do not see 2.0 GPUs, then the workers have not started yet. It might take several minutes until they join the cluster.

  4. Run the distributed training script, passing the bucket name as a parameter. The bucket name is printed in the output of the terraform apply command executed previously.

You can look at the comments in the script to learn more about the code that is executed.
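
At a high level, the script takes the bucket name from the command line and points Ray Train's shared storage at it, so that results and checkpoints from both workers land in the same S3 location. The following is a condensed, illustrative sketch of that wiring (not the script's exact contents), where train_loop_per_worker stands for a per-worker training function like the skeleton shown in the Prerequisites section:

import sys
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

bucket_name = sys.argv[1]  # the bucket name passed on the command line

trainer = TorchTrainer(
    train_loop_per_worker,  # per-worker training function (defined elsewhere in the script)
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    run_config=RunConfig(storage_path=f"s3://{bucket_name}"),  # shared storage for results and checkpoints
)
result = trainer.fit()
print(result.metrics)

Download and run the actual script as follows: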

export RAY_DEDUP_LOGS=0 # Show the logs of every training process instead of deduplicating them
cd /tmp
wget https://raw.githubusercontent.com/aws-ia/ecs-blueprints/main/terraform/ec2-examples/distributed-ml-training/training_example.py
python training_example.py REPLACE_WITH_YOUR_BUCKET_NAME

Example output:

(...)
Wrapping provided model in DistributedDataParallel.
(...)

(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 0 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.660568237304688]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 0 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.65453052520752]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 1 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.172431230545044]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 1 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.17476797103882]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 2 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.807305574417114]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 2 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.807661056518555]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 3 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.16184115409851]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 3 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.164414882659912]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 4 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.43423628807068]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 4 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.430140495300293]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 5 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.319995880126953]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 5 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.331279277801514]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 6 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.402108669281006]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 6 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.385886192321777]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 7 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.865890741348267]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 7 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.86034846305847]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 8 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.0880389213562]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 8 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.094018697738647]
(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 9 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.191094160079956]
(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 9 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.189364910125732]

(..)
╭───────────────────────────────╮
│ Training result               │
├───────────────────────────────┤
│ checkpoint_dir_name           │
│ time_this_iter_s      182.976 │
│ time_total_s          182.976 │
│ training_iteration          1 │
│ accuracy               0.8852 │
│ loss                  0.41928 │
╰───────────────────────────────╯

(...) Total running time: 3min 7s

Result(
  metrics={'loss': 0.4192830347106792, 'accuracy': 0.8852},
  (...)
)

The preceding logs show:

  • The GPU ID as seen by the local instance. In this case, it is always GPU0 because there is a single GPU in each instance.
  • The global process rank (unique in the training cluster), either 0 or 1, since we have two GPUs total.
  • Epoch numbers from 0 to 9.
  • The batch size: the number of examples processed in each step.
  • Steps: the number of iterations needed to complete a pass over the entire dataset. In this case it is 235, but where does this number come from? The FashionMNIST dataset has 60,000 examples. With a batch size of 128, we need 469 steps per epoch to go over the entire dataset (60,000/128, rounded up). However, since we are using two GPUs, those 469 steps are divided across the GPUs: 469 divided by 2 (and rounded up) gives the 235 steps we see in the logs, as the short snippet after this list reproduces.
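
The step count can be reproduced with a couple of lines of arithmetic; the rounding up mirrors what happens when the dataset size is not an exact multiple of the batch size:

import math

dataset_size = 60_000  # FashionMNIST training examples
batch_size = 128
num_gpus = 2

steps_one_gpu = math.ceil(dataset_size / batch_size)  # 469 steps per epoch on a single GPU
steps_per_gpu = math.ceil(steps_one_gpu / num_gpus)   # 235 steps per epoch per GPU, as seen in the logs
print(steps_one_gpu, steps_per_gpu)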

Cleaning up

To avoid incurring additional costs, remember to terminate all the resources created previously with Terraform.

  1. Destroy the distributed ML blueprint:
terraform destroy
  2. Destroy the core infra:
cd ../core-infra
terraform destroy

Conclusion

In this post, we ran a distributed data parallel training job using Amazon ECS. We started with the infrastructure setup, which included the ECS cluster, capacity providers, task definitions, and service deployments. Then, we executed a training job for the resnet18 model using the FashionMNIST dataset, distributing the work across multiple GPUs with the Ray Train and PyTorch libraries. The distributed approach allowed us to train the model faster than we could with a single GPU.

Before you go, make sure to check the Amazon ECS GPU documentation to learn more about the different setups supported and additional capabilities of Amazon ECS.