亚马逊AWS官方博客

利用 Apache Celeborn 减少 AWS EMR 集群中 Spot 实例回收导致的重算成本

Celeborn 基本介绍

AWS EMR 支持使用 Spot 实例来运行大数据工作负载,可以帮助降低成本。在创建 AWS EMR 集群时,您可以为核心节点和任务节点指定 Spot 实例,但 Spot 实例的中断特性给 AWS EMR 的运维带来了挑战。因此,合理利用 Spot 实例, 减轻实例中断的风险并设计相应的缓解措施是 AWS EMR 运维的一个重要的话题。

Spark 任务是 AWS EMR 中常见的 MapReduce 任务。在执行任务的时候,Spark 需要在不同的分区之间重新分配数据,这个过程通常被称作 shuffle。

当 AWS EMR 中的 Spot 实例中断时,可能带来的一个影响是计算中间的 shuffle 数据随着实例终止而一起丢失,这会导致 Spark 任务需要回到上一个 Stage 进行重算,从而导致任务时间变长等一系列问题,这里带来的隐形成本往往抵消了 Spot 带来的成本优势。为了解决这个问题,我们可以引入远程 Shuffle 服务 RSS(Remote Shuffle Service)。通过 RSS,shuffle 数据都在远程的 RSS 服务上写入和读取,提高了容错能力。

Apache Celeborn 是 2024 年 4 月毕业的一个 Apache 顶级项目,可以为 Spark、Flink 等计算引擎提供 RSS 服务,减少计算实例意外终止导致的重算开销并提升性能。因此,在 AWS EMR 中通过使用 Celebron 可以有效地应对 Spot 实例的回收,减少甚至避免重算,将 Spot 的成本优势最大化。

Apache Celeborn 最核心的设计是 Push Shuffle 和 Partition 聚合,它会把同一个 Partition 数据推送给同一个 Celeborn Worker。每一个 Partition 的数据都会最终形成一个文件,Reducer 在读取的时候只需要从 Worker 上读取这一个文件,因为 Shuffle 数据存在 Celeborn Cluster 里,不需要存放在 AWS EMR 集群节点的本地磁盘,所以可以更好地做存算分离。

Celebron 的架构分为 Master、Worker 和 Client 三个组成部分。Master 用来管理整个 Celeborn 集群的状态,并对资源进行分配,由 Raft 实现高可用。Worker 是集群的工作节点,主要提供对于 Shuffle 数据的存储、读写以及管理能力。Client由两种进程组成,LifecycleManager 和 Shuffle Client。在 Spark 任务中,LifecycleManger 运行在 Driver 上,而 Shuffle Client 运行在 executor 上。也就是说,Spark 引擎通过 Client 实现与 Celeborn 集群的交互,实现 Shuffle 数据的读写。同时,Client 也提供了对于应用 Shuffle 数据的生命周期管理能力。

Celeborn 的基本工作流程为:

① 通过 RegisterWorker 和 Heartbeat,集群的 Master 确认集群里资源的状态,并允许新的 worker 加入集群。Worker 与 Master 保持心跳,维护更新集群的状态。

② 在计算时,Mapper 通过 LifecycleManager 向 executor 进行 registerShuffle。注册成功之后,LifecycleManager 向 Master 去请求 slot 用来存储 shuffle 数据。

③ Master 根据资源情况指定提供 Slot 的 Worker。

④ Workers 根据资源情况把 Slot 信息提供给 LifecycleManager。

⑤ LifecycleManager 在 worker 上预留这些 Slot。

⑥ Mapper 将 shuffle 数据推送给指定的 Worker 中的 Slot。

⑦ Worker 会对 shuffle 的数据进行 Merge 并在其他的 peer worker 上进行 Replicate。Worker 周期性地把 shuffle 数据 Flush 到本地磁盘中。

⑧ 当 Mapper 任务完成后,触发 MapperEnd。

⑨ Worker 进行 commit files,将 shuffle 数据都存入本地磁盘,此时进入下一个 Reduce 阶段。Reducer 这时向 LifecycleManger 获取文件的位置,Reducer 开始读取文件。

接下来我们将在一个 AWS EMR 集群中,部署一套 Celebron 服务。通过模拟 Spark 任务运行过程中,Spot 实例回收的场景,来验证作为 RSS 服务,Celebron 避免了重算。

为了便于测试,系统的架构很简单。在同一个 VPC 的子网内,用三台 AWS EC2 部署一个 Celeborn 集群和一个 AWS EMR 集群。在 AWS EMR 上提交 Spark 任务时,指定该 Celeborn 外部集群用作 remote shuffle service。

测试方法

  • 创建一个指定 spot 任务组的 AWS EMR 集群
  • 通过 AWS Resilience Hub,创建一个模拟 Spot 中断的环境
  • 创建 Celeborn 集群
  • 不使用 Celeborn,在 AWS EMR 内提交一个 Spark 任务,并评估 Spot 中断带来的影响
  • 使用 Celeborn,提交一个 Spark 任务,并评估 Spot 中断带来的影响

具体步骤

我们使用脚本 https://github.com/hortonworks/hive-testbench 生成 3TB 测试数据,并通过 Hive 生成元数据,这里我们选取 TPC-DS 脚本 https://github.com/hortonworks/hive-testbench/blob/hdp3/spark-queries-tpcds/q77.sql 以便模拟在 Spark 任务中有大量的 shuffle 数据出现。

创建一个指定 Spot 任务组的 AWS EMR 集群

1. 集群版本推荐选择 7.1

2. 推荐选择 Glue Data Catalog 作为元数据管理

3. 创建节点组。这里我们对 Master,Core 和 Task 节点组均选择 xlarge 实例类型。EBS volume 选择 100GB 的 GP3 类型

4. 使用 EMR-managed scaling 作为集群扩缩容的方式,仅扩展全部为 Spot 实例的 Task 集群

5. 集群填加标签 Key:Value = Name:celeborn_demo 以便接下来的测试

6. 创建集群

通过 FSI,创建一个模拟 Spot 中断的环境

1. 进入到 AWS Resilience Hub

2. 创建一个实验模版

3. 选择应用于测试的账号,添加如下 action

4. 对 Targets 进行编辑,选择通过 AWS EMR 创建的标签来定位参与试验的资源

5. 对其他配置如下,保存试验模版

创建 Celeborn 集群

我们用三台 Amazon EC2 组建一个 Master 两个 Worker 节点的 Celeborn 集群。

使用 AWS CloudFormation 模板及 Shell 脚本创建 Celeborn 集群

CloudFormation 模板代码:

AWSTemplateFormatVersion: '2010-09-09'
Description: Launch 3 EC2 instances with an existing key pair, security group, and an IAM role with permissions for S3 and DynamoDB operations. Execute a script from an S3 path during instance launch.

Parameters:
  KeyName:
    Description: Name of an existing EC2 key pair for SSH access
    Type: AWS::EC2::KeyPair::KeyName
  SecurityGroupId:
    Description: ID of an existing security group to associate with the instances
    Type: AWS::EC2::SecurityGroup::Id
  InstanceType:
    Description: EC2 instance type (M series)
    Type: String
    Default: m6g.large
    AllowedValues:
      - m6g.large
      - m6g.xlarge
      - m6g.2xlarge
  ScriptS3Path:
    Description: S3 path to the script to execute during instance launch
    Type: String

Resources:
  IAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess

  InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Roles:
        - !Ref IAMRole

  EC2Instance1:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: ami-063f1c6928a072c56  # Replace with your desired AMI
      InstanceType: !Ref InstanceType
      KeyName: !Ref KeyName
      SecurityGroupIds:
        - !Ref SecurityGroupId
      IamInstanceProfile: !Ref InstanceProfile
      UserData:
        Fn::Base64:
          !Sub |
            #!/bin/bash
            aws s3 cp ${ScriptS3Path} /tmp/script.sh
            chmod +x /tmp/script.sh
            /tmp/script.sh /tmp/output.log 2>&1

  EC2Instance2:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: ami-063f1c6928a072c56 # Replace with your desired AMI
      InstanceType: !Ref InstanceType
      KeyName: !Ref KeyName
      SecurityGroupIds:
        - !Ref SecurityGroupId
      IamInstanceProfile: !Ref InstanceProfile
      UserData:
        Fn::Base64:
          !Sub |
            #!/bin/bash
            aws s3 cp ${ScriptS3Path} /tmp/script.sh
            chmod +x /tmp/script.sh
            /tmp/script.sh  /tmp/output.log 2>&1

  EC2Instance3:
    Type: AWS::EC2::Instance
    Properties:
      ImageId: ami-063f1c6928a072c56  # Replace with your desired AMI
      InstanceType: !Ref InstanceType
      KeyName: !Ref KeyName
      SecurityGroupIds:
        - !Ref SecurityGroupId
      IamInstanceProfile: !Ref InstanceProfile
      UserData:
        Fn::Base64:
          !Sub |
            #!/bin/bash
            aws s3 cp ${ScriptS3Path} /tmp/script.sh
            chmod +x /tmp/script.sh
            /tmp/script.sh >  /tmp/output.log 2>&1

Outputs:
  InstanceIds:
    Description: The IDs of the launched EC2 instances
    Value: !Join [',', [!Ref EC2Instance1, !Ref EC2Instance2, !Ref EC2Instance3]]

AWS CloudFormation 中引用的 script 脚本代码,将此脚本放到 AWS S3 桶中,在启动 Cloudformation 模版时将要求提供脚本的 S3 路径:

#!/bin/bash -xe
cd /home/ec2-user
sudo yum install java-1.8.0 -y

wget https://downloads.apache.org/celeborn/celeborn-0.4.1/apache-celeborn-0.4.1-bin.tgz

tar -xvf apache-celeborn-0.5.0-bin.tgz

cd /home/ec2-user/apache-celeborn-0.5.0-bin/conf

cp log4j2.xml.template log4j2.xml

cp celeborn-env.sh.template celeborn-env.sh

echo "celeborn.worker.storage.dirs=/home/ec2-user/apache-celeborn-0.5.0-bin/shuffle" > celeborn-defaults.conf


TABLE_EXISTS=$(aws dynamodb list-tables --query 'TableNames[?contains(@, `celeborn`)]' --region us-east-1 --output text)

if [ -n "$TABLE_EXISTS" ]; then
    echo "Table 'celeborn' exists."
    sleep 20
     IP_ADDRESSES=$(aws dynamodb scan \
    --table-name celeborn \
    --projection-expression "IPAddress" \
    --query "Items[*].IPAddress.S" \
    --region us-east-1 \
    --output text)
   sudo  /home/ec2-user/apache-celeborn-0.5.0-bin/sbin/start-worker.sh celeborn://$IP_ADDRESSES:9097
else
    echo "Table 'celeborn' does not exist. Creating table..."

    # 创建表
    aws dynamodb create-table \
        --table-name celeborn \
        --attribute-definitions \
            AttributeName=IPAddress,AttributeType=S \
        --key-schema \
            AttributeName=IPAddress,KeyType=HASH \
        --provisioned-throughput \
            ReadCapacityUnits=5,WriteCapacityUnits=5 \
        --region us-east-1

    if [ $? -eq 0 ]; then
        echo "Table 'celeborn' created successfully."
        LOCAL_PRIVATE_IP=$(sudo hostname -I | awk '{print $1}')
        sleep 20
        aws dynamodb put-item \
            --table-name celeborn \
            --item '{"ID": {"S": "LocalPrivateIP"}, "IPAddress": {"S": "'"$LOCAL_PRIVATE_IP"'"}}' \
            --region us-east-1
    sudo /home/ec2-user/apache-celeborn-0.5.0-bin/sbin/start-master.sh
    else
        echo "Failed to create table 'celeborn'."
        exit 1
    fi
fi

不使用 Celeborn,向 AWS EMR 集群提交 Spark 任务

1. 运行下列命令,用于生成数据集, 请注意替换 S3 数据集的位置为您自己的桶位置

sudo yum install git gcc
git clone https://github.com/hortonworks/hive-testbench.git
wget https://dlcdn.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
export MAVEN_HOME=/home/hadoop/apache-maven-3.8.8
export PATH=$PATH:$MAVEN_HOME/bin
cd hive-testbench/
sudo ./tpcds-build.sh
cd /usr/bin/hive-testbench/ddl-tpcds/text
hive -d DB=‘tpcds_20000' -d LOCATION=‘s3://<your S3 bucket store the tcpds data>’ -f alltables.sql

2. 提交 Spark 任务

为 q77.sql 文件添加一行

use tpcds_20000;

提交 spark sql 任务,运行这个 sql 文件

spark-sql -f q77.sql —num-executors=12 —conf spark.default.parallelism=50

通过观察 Spark History Server,我们可以看到有大量的 shuffle 过程和大量的 shuffle 数据产生。

3. 模拟中断

重新提交 spark 任务,通过 Spark History 服务持续观察 Spark Stage 中运行状态,当大数据量大 shuffle 数据运行到一半的时候,我们通过 FSI 模拟 Spot 实例中断。

中断后,我们会发现部分 Stage 执行失败

在 Spark UI 中寻找 Failed Stages

从中可以看到一些 Failed Stages 有这样的报错

通过详细日志 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 partition 21,我们可以得知失败的原因为 shuffle 数据丢失。Spot 实例上 Shuffle 数据丢失会导致 Spark 运行过程中 Stage 的部分步骤执行失败。如果多次重试都未成功,还可能导致整个任务运行失败。如果任务失败,不但影响了预期的任务进度,还会导致长达几个小时的计算资源浪费。

使用 Celeborn,向 EMR 集群提交 Spark 任务

从第四步的 AWS S3 桶中,在 EMR 的 Master 节点安装 Spark 的 Celeborn 依赖

sudo aws s3 cp s3:/<your S3 bucket>/celeborn-client-spark-3-shaded_2.12-0.5.0.jar /usr/lib/spark/jars/

提交新的 spark 任务,并由 celeborn 处理 shuffle 数据

spark-sql -f q77.sql --num-executors=12 --conf spark.default.parallelism=50 --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager --conf spark.shuffle.service.enabled=false --conf spark.sql.adaptive.localShuffleReader.enabled=false —conf spark.celeborn.master.endpoints=<celeborn master ip>:9097

通过 Spark History 服务持续观察 Spark Stage 中运行状态,当大数据量大 shuffle 数据运行到一半的时候,我们再次通过 FSI 模拟 Spot 实例中断。 持续观察 Stage 情况,我们可以发现,即使发生了 Spot 实例中断,任务也正常执行完成,没有发生重算。

在 Job 详情中,我们可以看到 Spot 实例终止时 Executor 发生的变化

通过以上测试我们可知,通过 Celeborn 将 Spark shuffle 数据与执行计算的 EMR 集群解耦,在 Spot 实例被回收时,Spot 实例上 Shuffle 数据因为存放在 Celeborn 集群中没有丢失,进而避免了任务重算。

总结

Celeborn 作为 Apache 的顶级项目,通过对 shuffle 阶段的存算分离,可以解决计算过程中 shuffle 数据中丢失的问题,减少重算的成本。与 Spots 实例结合,可以更有效的利用 AWS EC2 Spot 实例的价格优势,减少 AWS EMR 集群的运行成本。

参考链接

https://github.com/aws-samples/emr-remote-shuffle-service

https://celeborn.apache.org/

本篇作者

陈泽军

亚马逊云科技技术客户经理,近十年专注云上大数据服务。

唐冠群

亚马逊云科技客户技术经理。主要负责 Auto 行业客户的架构优化、成本管理、技术咨询等工作。拥有超过 10 年项目实施和客户支持经验。有丰富的大数据项目优化经验。在加入 AWS 前曾就职于 IBM,主要服务于大型金融类客户。