Celeborn 基本介绍
AWS EMR 支持使用 Spot 实例来运行大数据工作负载,可以帮助降低成本。在创建 AWS EMR 集群时,您可以为核心节点和任务节点指定 Spot 实例,但 Spot 实例的中断特性给 AWS EMR 的运维带来了挑战。因此,合理利用 Spot 实例, 减轻实例中断的风险并设计相应的缓解措施是 AWS EMR 运维的一个重要的话题。
Spark 任务是 AWS EMR 中常见的 MapReduce 任务。在执行任务的时候,Spark 需要在不同的分区之间重新分配数据,这个过程通常被称作 shuffle。
当 AWS EMR 中的 Spot 实例中断时,可能带来的一个影响是计算中间的 shuffle 数据随着实例终止而一起丢失,这会导致 Spark 任务需要回到上一个 Stage 进行重算,从而导致任务时间变长等一系列问题,这里带来的隐形成本往往抵消了 Spot 带来的成本优势。为了解决这个问题,我们可以引入远程 Shuffle 服务 RSS(Remote Shuffle Service)。通过 RSS,shuffle 数据都在远程的 RSS 服务上写入和读取,提高了容错能力。
Apache Celeborn 是 2024 年 4 月毕业的一个 Apache 顶级项目,可以为 Spark、Flink 等计算引擎提供 RSS 服务,减少计算实例意外终止导致的重算开销并提升性能。因此,在 AWS EMR 中通过使用 Celebron 可以有效地应对 Spot 实例的回收,减少甚至避免重算,将 Spot 的成本优势最大化。
Apache Celeborn 最核心的设计是 Push Shuffle 和 Partition 聚合,它会把同一个 Partition 数据推送给同一个 Celeborn Worker。每一个 Partition 的数据都会最终形成一个文件,Reducer 在读取的时候只需要从 Worker 上读取这一个文件,因为 Shuffle 数据存在 Celeborn Cluster 里,不需要存放在 AWS EMR 集群节点的本地磁盘,所以可以更好地做存算分离。
Celebron 的架构分为 Master、Worker 和 Client 三个组成部分。Master 用来管理整个 Celeborn 集群的状态,并对资源进行分配,由 Raft 实现高可用。Worker 是集群的工作节点,主要提供对于 Shuffle 数据的存储、读写以及管理能力。Client由两种进程组成,LifecycleManager 和 Shuffle Client。在 Spark 任务中,LifecycleManger 运行在 Driver 上,而 Shuffle Client 运行在 executor 上。也就是说,Spark 引擎通过 Client 实现与 Celeborn 集群的交互,实现 Shuffle 数据的读写。同时,Client 也提供了对于应用 Shuffle 数据的生命周期管理能力。
Celeborn 的基本工作流程为:
① 通过 RegisterWorker 和 Heartbeat,集群的 Master 确认集群里资源的状态,并允许新的 worker 加入集群。Worker 与 Master 保持心跳,维护更新集群的状态。
② 在计算时,Mapper 通过 LifecycleManager 向 executor 进行 registerShuffle。注册成功之后,LifecycleManager 向 Master 去请求 slot 用来存储 shuffle 数据。
③ Master 根据资源情况指定提供 Slot 的 Worker。
④ Workers 根据资源情况把 Slot 信息提供给 LifecycleManager。
⑤ LifecycleManager 在 worker 上预留这些 Slot。
⑥ Mapper 将 shuffle 数据推送给指定的 Worker 中的 Slot。
⑦ Worker 会对 shuffle 的数据进行 Merge 并在其他的 peer worker 上进行 Replicate。Worker 周期性地把 shuffle 数据 Flush 到本地磁盘中。
⑧ 当 Mapper 任务完成后,触发 MapperEnd。
⑨ Worker 进行 commit files,将 shuffle 数据都存入本地磁盘,此时进入下一个 Reduce 阶段。Reducer 这时向 LifecycleManger 获取文件的位置,Reducer 开始读取文件。
接下来我们将在一个 AWS EMR 集群中,部署一套 Celebron 服务。通过模拟 Spark 任务运行过程中,Spot 实例回收的场景,来验证作为 RSS 服务,Celebron 避免了重算。
为了便于测试,系统的架构很简单。在同一个 VPC 的子网内,用三台 AWS EC2 部署一个 Celeborn 集群和一个 AWS EMR 集群。在 AWS EMR 上提交 Spark 任务时,指定该 Celeborn 外部集群用作 remote shuffle service。
测试方法
- 创建一个指定 spot 任务组的 AWS EMR 集群
- 通过 AWS Resilience Hub,创建一个模拟 Spot 中断的环境
- 创建 Celeborn 集群
- 不使用 Celeborn,在 AWS EMR 内提交一个 Spark 任务,并评估 Spot 中断带来的影响
- 使用 Celeborn,提交一个 Spark 任务,并评估 Spot 中断带来的影响
具体步骤
我们使用脚本 https://github.com/hortonworks/hive-testbench 生成 3TB 测试数据,并通过 Hive 生成元数据,这里我们选取 TPC-DS 脚本 https://github.com/hortonworks/hive-testbench/blob/hdp3/spark-queries-tpcds/q77.sql 以便模拟在 Spark 任务中有大量的 shuffle 数据出现。
创建一个指定 Spot 任务组的 AWS EMR 集群
1. 集群版本推荐选择 7.1
2. 推荐选择 Glue Data Catalog 作为元数据管理
3. 创建节点组。这里我们对 Master,Core 和 Task 节点组均选择 xlarge 实例类型。EBS volume 选择 100GB 的 GP3 类型
4. 使用 EMR-managed scaling 作为集群扩缩容的方式,仅扩展全部为 Spot 实例的 Task 集群
5. 集群填加标签 Key:Value = Name:celeborn_demo 以便接下来的测试
6. 创建集群
通过 FSI,创建一个模拟 Spot 中断的环境
1. 进入到 AWS Resilience Hub
2. 创建一个实验模版
3. 选择应用于测试的账号,添加如下 action
4. 对 Targets 进行编辑,选择通过 AWS EMR 创建的标签来定位参与试验的资源
5. 对其他配置如下,保存试验模版
创建 Celeborn 集群
我们用三台 Amazon EC2 组建一个 Master 两个 Worker 节点的 Celeborn 集群。
使用 AWS CloudFormation 模板及 Shell 脚本创建 Celeborn 集群
CloudFormation 模板代码:
AWSTemplateFormatVersion: '2010-09-09'
Description: Launch 3 EC2 instances with an existing key pair, security group, and an IAM role with permissions for S3 and DynamoDB operations. Execute a script from an S3 path during instance launch.
Parameters:
KeyName:
Description: Name of an existing EC2 key pair for SSH access
Type: AWS::EC2::KeyPair::KeyName
SecurityGroupId:
Description: ID of an existing security group to associate with the instances
Type: AWS::EC2::SecurityGroup::Id
InstanceType:
Description: EC2 instance type (M series)
Type: String
Default: m6g.large
AllowedValues:
- m6g.large
- m6g.xlarge
- m6g.2xlarge
ScriptS3Path:
Description: S3 path to the script to execute during instance launch
Type: String
Resources:
IAMRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- ec2.amazonaws.com
Action:
- 'sts:AssumeRole'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
InstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
Roles:
- !Ref IAMRole
EC2Instance1:
Type: AWS::EC2::Instance
Properties:
ImageId: ami-063f1c6928a072c56 # Replace with your desired AMI
InstanceType: !Ref InstanceType
KeyName: !Ref KeyName
SecurityGroupIds:
- !Ref SecurityGroupId
IamInstanceProfile: !Ref InstanceProfile
UserData:
Fn::Base64:
!Sub |
#!/bin/bash
aws s3 cp ${ScriptS3Path} /tmp/script.sh
chmod +x /tmp/script.sh
/tmp/script.sh /tmp/output.log 2>&1
EC2Instance2:
Type: AWS::EC2::Instance
Properties:
ImageId: ami-063f1c6928a072c56 # Replace with your desired AMI
InstanceType: !Ref InstanceType
KeyName: !Ref KeyName
SecurityGroupIds:
- !Ref SecurityGroupId
IamInstanceProfile: !Ref InstanceProfile
UserData:
Fn::Base64:
!Sub |
#!/bin/bash
aws s3 cp ${ScriptS3Path} /tmp/script.sh
chmod +x /tmp/script.sh
/tmp/script.sh /tmp/output.log 2>&1
EC2Instance3:
Type: AWS::EC2::Instance
Properties:
ImageId: ami-063f1c6928a072c56 # Replace with your desired AMI
InstanceType: !Ref InstanceType
KeyName: !Ref KeyName
SecurityGroupIds:
- !Ref SecurityGroupId
IamInstanceProfile: !Ref InstanceProfile
UserData:
Fn::Base64:
!Sub |
#!/bin/bash
aws s3 cp ${ScriptS3Path} /tmp/script.sh
chmod +x /tmp/script.sh
/tmp/script.sh > /tmp/output.log 2>&1
Outputs:
InstanceIds:
Description: The IDs of the launched EC2 instances
Value: !Join [',', [!Ref EC2Instance1, !Ref EC2Instance2, !Ref EC2Instance3]]
AWS CloudFormation 中引用的 script 脚本代码,将此脚本放到 AWS S3 桶中,在启动 Cloudformation 模版时将要求提供脚本的 S3 路径:
#!/bin/bash -xe
cd /home/ec2-user
sudo yum install java-1.8.0 -y
wget https://downloads.apache.org/celeborn/celeborn-0.4.1/apache-celeborn-0.4.1-bin.tgz
tar -xvf apache-celeborn-0.5.0-bin.tgz
cd /home/ec2-user/apache-celeborn-0.5.0-bin/conf
cp log4j2.xml.template log4j2.xml
cp celeborn-env.sh.template celeborn-env.sh
echo "celeborn.worker.storage.dirs=/home/ec2-user/apache-celeborn-0.5.0-bin/shuffle" > celeborn-defaults.conf
TABLE_EXISTS=$(aws dynamodb list-tables --query 'TableNames[?contains(@, `celeborn`)]' --region us-east-1 --output text)
if [ -n "$TABLE_EXISTS" ]; then
echo "Table 'celeborn' exists."
sleep 20
IP_ADDRESSES=$(aws dynamodb scan \
--table-name celeborn \
--projection-expression "IPAddress" \
--query "Items[*].IPAddress.S" \
--region us-east-1 \
--output text)
sudo /home/ec2-user/apache-celeborn-0.5.0-bin/sbin/start-worker.sh celeborn://$IP_ADDRESSES:9097
else
echo "Table 'celeborn' does not exist. Creating table..."
# 创建表
aws dynamodb create-table \
--table-name celeborn \
--attribute-definitions \
AttributeName=IPAddress,AttributeType=S \
--key-schema \
AttributeName=IPAddress,KeyType=HASH \
--provisioned-throughput \
ReadCapacityUnits=5,WriteCapacityUnits=5 \
--region us-east-1
if [ $? -eq 0 ]; then
echo "Table 'celeborn' created successfully."
LOCAL_PRIVATE_IP=$(sudo hostname -I | awk '{print $1}')
sleep 20
aws dynamodb put-item \
--table-name celeborn \
--item '{"ID": {"S": "LocalPrivateIP"}, "IPAddress": {"S": "'"$LOCAL_PRIVATE_IP"'"}}' \
--region us-east-1
sudo /home/ec2-user/apache-celeborn-0.5.0-bin/sbin/start-master.sh
else
echo "Failed to create table 'celeborn'."
exit 1
fi
fi
不使用 Celeborn,向 AWS EMR 集群提交 Spark 任务
1. 运行下列命令,用于生成数据集, 请注意替换 S3 数据集的位置为您自己的桶位置
sudo yum install git gcc
git clone https://github.com/hortonworks/hive-testbench.git
wget https://dlcdn.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
export MAVEN_HOME=/home/hadoop/apache-maven-3.8.8
export PATH=$PATH:$MAVEN_HOME/bin
cd hive-testbench/
sudo ./tpcds-build.sh
cd /usr/bin/hive-testbench/ddl-tpcds/text
hive -d DB=‘tpcds_20000' -d LOCATION=‘s3://<your S3 bucket store the tcpds data>’ -f alltables.sql
2. 提交 Spark 任务
为 q77.sql 文件添加一行
提交 spark sql 任务,运行这个 sql 文件
spark-sql -f q77.sql —num-executors=12 —conf spark.default.parallelism=50
通过观察 Spark History Server,我们可以看到有大量的 shuffle 过程和大量的 shuffle 数据产生。
3. 模拟中断
重新提交 spark 任务,通过 Spark History 服务持续观察 Spark Stage 中运行状态,当大数据量大 shuffle 数据运行到一半的时候,我们通过 FSI 模拟 Spot 实例中断。
中断后,我们会发现部分 Stage 执行失败
在 Spark UI 中寻找 Failed Stages
从中可以看到一些 Failed Stages 有这样的报错
通过详细日志 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 partition 21,我们可以得知失败的原因为 shuffle 数据丢失。Spot 实例上 Shuffle 数据丢失会导致 Spark 运行过程中 Stage 的部分步骤执行失败。如果多次重试都未成功,还可能导致整个任务运行失败。如果任务失败,不但影响了预期的任务进度,还会导致长达几个小时的计算资源浪费。
使用 Celeborn,向 EMR 集群提交 Spark 任务
从第四步的 AWS S3 桶中,在 EMR 的 Master 节点安装 Spark 的 Celeborn 依赖
sudo aws s3 cp s3:/<your S3 bucket>/celeborn-client-spark-3-shaded_2.12-0.5.0.jar /usr/lib/spark/jars/
提交新的 spark 任务,并由 celeborn 处理 shuffle 数据
spark-sql -f q77.sql --num-executors=12 --conf spark.default.parallelism=50 --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager --conf spark.shuffle.service.enabled=false --conf spark.sql.adaptive.localShuffleReader.enabled=false —conf spark.celeborn.master.endpoints=<celeborn master ip>:9097
通过 Spark History 服务持续观察 Spark Stage 中运行状态,当大数据量大 shuffle 数据运行到一半的时候,我们再次通过 FSI 模拟 Spot 实例中断。 持续观察 Stage 情况,我们可以发现,即使发生了 Spot 实例中断,任务也正常执行完成,没有发生重算。
在 Job 详情中,我们可以看到 Spot 实例终止时 Executor 发生的变化
通过以上测试我们可知,通过 Celeborn 将 Spark shuffle 数据与执行计算的 EMR 集群解耦,在 Spot 实例被回收时,Spot 实例上 Shuffle 数据因为存放在 Celeborn 集群中没有丢失,进而避免了任务重算。
总结
Celeborn 作为 Apache 的顶级项目,通过对 shuffle 阶段的存算分离,可以解决计算过程中 shuffle 数据中丢失的问题,减少重算的成本。与 Spots 实例结合,可以更有效的利用 AWS EC2 Spot 实例的价格优势,减少 AWS EMR 集群的运行成本。
参考链接
https://github.com/aws-samples/emr-remote-shuffle-service
https://celeborn.apache.org/
本篇作者