Containers

Fault tolerant distributed machine learning training with the TorchElastic Controller for Kubernetes

Introduction

Kubernetes enables machine learning teams to run training jobs distributed across fleets of powerful GPU instances like Amazon EC2 P3, reducing training time from days to hours. However, distributed training comes with limitations compared to the more traditional microservice based applications typically associated with Kubernetes. Distributed training jobs are not fault tolerant, and a job cannot continue if a node failure or reclamation interrupts training. Further, jobs cannot start without acquiring all requested resources, or scale up and down without being restarted. This lack of resiliency and flexibility results in increased training time and costs.

In this post, we cover a new open source collaboration between the Kubernetes team at AWS and the PyTorch team at Facebook, the TorchElastic Controller for Kubernetes, which addresses these limitations and unlocks new capabilities with PyTorch built models and Kubernetes distributed training, including the ability to train on EC2 Spot instances, run jobs that are resilient to hardware failures, and dynamically scale jobs based on cluster resource availability.

PyTorch Elastic integration with Kubernetes

PyTorch Elastic is a library for training large-scale deep learning models where it’s critical to scale compute resources dynamically based on availability. It provides the primitives and interfaces for you to write your PyTorch job in such a way that it can be run on multiple machines with elasticity. That is, your distributed job is able to start as soon as min number of workers are present and allowed to grow up to max number of workers without being stopped or restarted. Elasticity is the ability for the framework to scale nodes up or down during a training job. Fault tolerance is the ability for the framework to detect and replace failed nodes during a training job without requiring job restart.

PyTorch Elastic includes a component called Rendezvous that is used to gather participants of a training job (i.e. workers) such that they all agree on the same list of participants and everyone’s roles, as well as make a consistent collective decision on when training can begin/resume. To learn more visit the open source project on GitHub.

The TorchElastic Controller for Kubernetes (TECK) is a native Kubernetes implementation of the PyTorch Elastic interface that automatically manages the lifecycle of the Kubernetes pods and services required for TorchElastic training. Using a simple custom resource definition, you specify your Elastic compatible training image along with the desired, min, and max number of workers. It allows you to start mission critical distributed training jobs on Kubernetes clusters with a portion of the requested compute resources, and dynamically scale later as more resources become available, without having to stop and restart the jobs. In addition, jobs can recover from nodes that are replaced due to node failures or reclamation. With the TorchElastic Controller for Kubernetes, you can reduce distributed training time and cost by limiting idle cluster resources and training on Amazon EC2 Spot Instances.

How does it work?

The TorchElastic Controller for Kubernetes watches for a specific custom resource type ElasticJob, which is created as part of submitting a TorchElastic job. When a relevant event occurs on this resource, a reconcile loop is started to ensure that current state matches the desired state.

All ElasticJob workers are equivalent and the controller creates a pod and a headless service for pod to pod communication, as well as pass the required arguments to the PyTorch Elastic training launcher. You can dynamically change the desired worker size between the [min, max] range, and the controller scales up and down workers correspondingly without any job interruption. Each time there is a change in membership in the set of pods, PyTorch Elastic runs a rendezvous and continues training.

Walkthrough

TECK works with any Kubernetes cluster version 1.12 or newer. In this example, we use Amazon EKS, which has become a leading choice for machine learning workloads, as it combines the scalability of Kubernetes with powerful accelerated instances types available on AWS. Further, the EKS-Optimized GPU AMI makes it easy to run machine learning based applications without managing complicated dependencies.

To get started, create a cluster of GPU instances using eksctl. If you don’t already have eksctl installed, follow the instructions here.

eksctl create cluster \
    --name=torchelastic \
    --region=us-west-2 \
    --ssh-access \
    --ssh-public-key=~/.ssh/id_rsa.pub \
    --node-type=p3.2xlarge \
    --nodes=2 \
    --nodes-min=1 \
    --nodes-max=3

Next, install the NVIDIA device plugin on your cluster so the Kubernetes scheduler understands the GPU devices on your worker nodes.

kubectl apply -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/1.0.0-beta4/nvidia-device-plugin.yml

Now, clone the PyTorch Elastic repository, and install the TorchElastic Controller for Kubernetes onto your cluster.

git clone https://github.com/pytorch/elastic.git
cd elastic/kubernetes

kubectl apply -k config/default 

You should see output similar to below.

namespace/elastic-job created
customresourcedefinition.apiextensions.k8s.io/elasticjobs.elastic.pytorch.org created
role.rbac.authorization.k8s.io/leader-election-role created
clusterrole.rbac.authorization.k8s.io/elastic-job-k8s-controller-role created
rolebinding.rbac.authorization.k8s.io/leader-election-rolebinding created
clusterrolebinding.rbac.authorization.k8s.io/elastic-job-k8s-controller-rolebinding created
deployment.apps/elastic-job-k8s-controller created

Run the following command to verify that the controller is ready.

kubectl get pods -n elastic-job

NAME                                          READY   STATUS    RESTARTS   AGE
elastic-job-k8s-controller-845ff64bc7-6lcqj   1/1     Running   0          3m45s

An implementation of the Rendezvous component based on etcd is already provided in PyTorch Elastic, and is recommended for most users. Here, we install a single-member etcd instance in the cluster. For production usage, it’s recommended to use a highly available etcd cluster.

kubectl apply -f config/samples/etcd.yaml

For this example, we are training an ImageNet dataset. Update the rdzvEndpoint value in the ElasticJob spec config/samples/imagenet.yaml to the etcd service DNS name “etcd-service:2379.” Notice some of the other parameters in this job definition.

apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet
  namespace: elastic-job
spec:
  rdzvEndpoint: "etcd-service:2379"
  minReplicas: 1
  maxReplicas: 3
  replicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: ExitCode
      template:
        apiVersion: v1
        kind: Pod
        spec:
          containers:
            - name: elasticjob-worker
              image: torchelastic/examples:0.2.0rc1
              imagePullPolicy: Always
              args:
                - "--nproc_per_node=1" # set nproc_per_node = # Num GPUs on instance type
                - "/workspace/examples/imagenet/main.py"
                - "--arch=resnet18"
                - "--epochs=20"
                - "--batch-size=32"
                # data loader workers, not trainers.
                # zero means load the data on the same process as the trainer
                - "--workers=0"
                - "/workspace/data/tiny-imagenet-200"
              resources:
                limits:
                  nvidia.com/gpu: 1

Right now, our cluster has 2 Amazon P3 Instances, each with one GPU. So to start, we set the desired number of replicas in our training job to use the maximum available resources. The minimum amount of workers is set to 1, and the maximum is set to 3. Now let’s submit the training job the cluster.

kubectl apply -f config/samples/imagenet.yaml

Check to ensure the workers are running. At this point, our PyTorch training job is happily chugging along.

kubectl get pods -n elastic-job

NAME                                          READY   STATUS    RESTARTS   AGE
elastic-job-k8s-controller-845ff64bc7-6lcqj   1/1     Running   0          3h32m
imagenet-worker-0                             1/1     Running   0          2m43s
imagenet-worker-1                             1/1     Running   0          2m43s

We are running our worker nodes on On-Demand Instances, but TECK also supports training on EC2 Spot Instances, and we can simulate a Spot Instance interruption by scaling down our node group, using the following commands.

eksctl get nodegroup --cluster torchelastic

# record the name of your node group and use in the command below
eksctl scale nodegroup --cluster=torchelastic --nodes=1 --name=ng-a345f4e1

Once the worker node is removed from the cluster, we check our pods again and see only one remaining.

kubectl get pods -n elastic-job

NAME                                          READY   STATUS        RESTARTS   AGE
elastic-job-k8s-controller-845ff64bc7-vldbf   1/1     Running       0          13s
imagenet-worker-0                             1/1     Running       0          2m16s

In a typical distributed training scenario, this occurrence would mean the job needs to be restarted, but we describe our training job, and see that the job has survived the node removal and is still running!

kubectl describe elasticjob imagenet -n elastic-job

Message:               ElasticJob imagenet is running.
    Reason:                ElasticJobRunning
    Status:                True
    Type:                  Running
  Replica Statuses:
    Worker:
      Active:  1

Now let’s take it a step further, and increase the capacity in our cluster. In a real world scenario, this may occur when another training job completes, and GPU instances are freed up.

eksctl scale nodegroup --cluster=torchelastic --nodes=3 --name=ng-a345f4e1

Our cluster now has 3 worker nodes, but our training job is only using one of them. With a non-elastic distributed training job, those extra GPUs would be left sitting idle. But with PyTorch Elastic and TECK, we can easily configure the job to scale dynamically and take advantage of the newly available capacity.

kubectl edit elasticjob imagenet -n elastic-job

# set .spec.replicaSpecs[Worker].replicas to 3, and save.

Let’s describe our job one last time.

kubectl describe elasticjob imagenet -n elastic-job

Message:               ElasticJob imagenet is running.
    Reason:                ElasticJobRunning
    Status:                True
    Type:                  Running
  Replica Statuses:
    Worker:
      Active:  3

And sure enough, the training job has dynamically scaled to 3 workers. Your data scientists are happy because training jobs will run to completion faster, and your finance department is pleased that the GPUs in your cluster are always being put to good use.

Conclusion

In this blog post, we introduced a new AWS and Facebook developed open source project, TorchElastic Controller for Kubernetes, which allows you to reduce distributed training time and costs by limiting idle cluster resources and enabling job recovery from failures. We covered some under the hood details of the controller, as well as walked through an example of how to run a simple fault tolerant distributed training job on an Amazon EKS cluster. Elastic and fault-tolerant training with the TorchElastic Controller for Kubernetes can help you take ML models to production faster and adopt state-of-the-art approaches to model exploration as architectures continue to increase in size and complexity.

Some ideas we have for future improvements include automatic worker resizing, as well as adding a concept of job priority and preemption. Let us know what you think about this new, exciting machine learning addition to the Kubernetes community and how you plan to use it, as well as consider contributing to the open source project at pytorch/elastic/kubernetes.