亚马逊AWS官方博客

通过使用 Amazon Graviton2 提升 EMR 的性价比

1.前言

从2020年10月开始,基于亚马逊云科技 Graviton2 的 Amazon EMR 实例逐步推出,现在已经有很多客户在不同的场景开始使用这种类型的实例。

Graviton2 处理器由亚马逊云科技使用 64 位 ARM Neoverse 内核定制,对第一代 Graviton 处理器进行了多种性能优化。这包括 7 倍的性能、4 倍的计算核心数量、每个内核 2 倍的私有内存、5 倍的内存速度和每个核心 2 倍的浮点性能。此外,Graviton2 处理器还具有全天候运行的全加密 DDR4 内存功能,且每核加密性能提高 50%。这些性能提升使得用了Graviton2 的实例成为大数据工作负载的上佳之选。

本文将向您展示在Amazon EMR大数据集群中的Graviton2 R6g实例对R5实例的性能对比测试。我们可以清楚地看到,在标准的TPC-DS基准测试环境(99个典型的SQL场景),R6g实例比之同等资源配置的R5实例性能获得提升,而且每小时单价有所下降,在这双重因素的叠加之下,在EMR大数据负载场景,选择新一代的R6g实例,具有更好的性价比。

2.环境准备

总体说明:

1.测试环境(含操作客户端,R5实例,R6g实例):均位于新加坡区域。

2.测试EMR组件为Spark 3.1.2(对应EMR版本为6.4.0);

3.所有EMR集群均使用Amazon Glue Data Catalog作为MDS;

4.提交测试任务方式:手工登录集群的Master节点使用Spark Shell提交;

5.测试过程中,每种场景测试多次,确保CPU(集群),内存(集群),网络(每一个Core节点),磁盘(每一个Core节点)等资源层面使用率不高于70%;

6.我们以100G的TPC-DS基准数据为测试数据,取对应查询结果做对比。

2.1环境信息

A.操作客户端(此客户端也是造数据的集群)

实例类型 vCPU 内存(GiB) 磁盘(GiB) 角色
r5.4xlarge 16 128 1500 1个Master

B.测试EMR集群:emr640-ic(基于Intel架构的R5机型)

实例类型 vCPU 内存(GiB) 磁盘(GiB) 角色
r5.4xlarge 16 128 500 1个Master
r5.8xlarge 32 256 1500 3个Core

C.测试EMR集群:emr640-ac(基于Graviton2架构的R6g机型)

实例类型 vCPU 内存(GiB) 磁盘(GiB) 角色
r6g.4xlarge 16 128 500 1个Master
r6g.8xlarge 32 256 1500 3个Core

注意:集群名称里面的ic表示intel cluster,ac表示Graviton2(arm) cluster的简称。

可能很多读者会问,为什么测试的时候只选择这一个机型?因为我们经过多轮测试以后,发现8xlarge的机型可以满足各项资源使用率不高于70%的标准。

2.2测试工具

我们使用GitHub上的TPC-DS源代码编译测试用的代码。

2.2.1 准备代码环境

登录测试客户端实例,编译如下代码(先使用sudo su – 到root用户):

yum install gcc make flex bison byacc git jq -y 

mkdir -p /opt/tpcds && cd /opt/tpcds
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make

准备造数据的对应代码:

cd /opt/tpcds

git clone https://github.com/databricks/spark-sql-perf.git
cd spark-sql-perf

wget https://github.com/sbt/sbt/releases/download/v0.13.18/sbt-0.13.18.tgz

tar zxvf sbt-0.13.18.tgz

cd build
mv sbt sbt.back
ln -s ../sbt/bin/sbt sbt
cd ..

编辑插件文件:

vi project/plugins.sbt

# 添加如下一行
# resolvers += "spark-packages" at "https://repos.spark-packages.org/"

可选操作(免得后续每一个环境都需要编译代码,我们把代码上传到S3):

cd /opt/tpcds/spark-sql-perf/

./build/sbt +package

cd /opt
tar zcvf tpcds.tgz tpcds

aws s3 cp tpcds.tgz s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz

2.2.2 准备测试数据

进入测试客户端机器

spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar

用如下的方式造100g数据

import com.databricks.spark.sql.perf.tpcds.TPCDSTables

val rootDir = "s3://dse-710469168206-sin/tpcds/data100g"
val dsdgenDir = "/opt/tpcds/tpcds-kit/tools"
val scaleFactor = "100"
val format = "parquet"
val databaseName = "tpcds100g"
val sqlContext = spark.sqlContext

val tables = new TPCDSTables(sqlContext,
dsdgenDir = dsdgenDir, 
scaleFactor = scaleFactor,
useDoubleForDecimal = true, 
useStringForDate = true)

tables.genData(
location = rootDir,
format = format,
overwrite = true,
partitionTables = true, 
clusterByPartitionColumns = true, 
filterOutNullPartitionValues = false, 
tableFilter = "", 
numPartitions = 1024)

sql(s"create database $databaseName") 

tables.createExternalTables(rootDir, 
format, 
databaseName, 
overwrite = true, 
discoverPartitions = true)

tables.analyzeTables(databaseName, analyzeColumns = true)

2.3验证数据

2.3.1 控制台验证

登陆S3控制台,打开对应存储桶,查看对应数据目录内容(我们造了多个数据库,有10g的,有1000g的,但是此处我们只测试100g的场景)。

登录Glue控制台,找到对应数据库,查看对应数据库和表。

2.3.2 代码验证

使用如下代码执行查询并输出结果

spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar


import com.databricks.spark.sql.perf.tpcds.TPCDS

val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)

// name of database with TPCDS data.
val databaseName = "tpcds10g" 
sql(s"use $databaseName")

// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ic"

// how many iterations of queries to run.
val iterations = 1 

// queries to run.
val queries = tpcds.tpcds2_4Queries

// timeout per query, in seconds.
val timeout = 7200
 
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)

// wait for finish
experiment.waitForFinish(timeout)

验证运行99个查询后的结果输出(记得在这个master上执行aws configure配置ak/sk并设置profile为default)

pyspark

# 然后执行
from pyspark.sql import Row
resultLocation = "s3a://dse-710469168206-sin/result/tpcds100g/ic"

result = spark.read.format("json").load(resultLocation + "/timestamp=1631844109600").select("results.name","results.executionTime")

def getRowList(row):
            lst = []
            for i in range(0, len(row.name)):
                        lst.append(Row(name=row.name[i],executionTime=row.executionTime[i]))
            return lst

resultDF = result.rdd.flatMap(lambda row: getRowList(row)).toDF().show(104)

3.对比测试过程

我们在部署Intel和Graviton2集群时均做了如下所示的Spark优化配置(在部署EMR集群时作为参数带入):

[
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.cores": "5",
            "spark.driver.cores": "16",
            "spark.executor.memory": "32g",
            "spark.driver.memory": "32g",
            "spark.executor.instances": "16",
            "spark.default.parallelism": "144",
            "spark.sql.shuffle.partitions": "1024",
            "spark.network.timeout": "900s",
            "spark.driver.maxResultSize": "0",
            "spark.executor.heartbeatInterval": "60s",
            "spark.dynamicAllocation.enabled": "true",
            "spark.yarn.scheduler.reporterThread.maxFailures": "2000",
            "spark.sql.debug.maxToStringFields": "2000",
            "spark.sql.autoBroadcastJoinThreshold": "-1"
        }
    },
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.nodemanager.vmem-check-enabled": "false",
            "yarn.nodemanager.pmem-check-enabled": "false"
        }
    }
]

并使用之前单节点测试集群的Glue Data Catalog作为MDS。

3.1Intel集群测试

我们测试的过程中发现,不管测试多少轮,其实结果都类似,所以我们取3次正常的数据作为结果。

3.1.1准备测试代码

启动集群后,登录Master和Core节点复制代码(sudo su -)

cd /opt

yum install gcc make flex bison byacc git jq -y

aws s3 cp s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz tpcds.tgz

tar zxvf tpcds.tgz

3.1.2执行查询测试

登录集群Master,通过如下方式提交查询测试任务(此为100g数据源的代码,其他的请酌情修改)

spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar

import com.databricks.spark.sql.perf.tpcds.TPCDS

val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)

val databaseName = "tpcds100g" 
sql(s"use $databaseName")

// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ic"

// how many iterations of queries to run.
val iterations = 1

// queries to run.
val queries = tpcds.tpcds2_4Queries

// timeout per query, in seconds.
val timeout = 7200

//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)

// wait for finish
experiment.waitForFinish(timeout)

注意查询结束后输出的类似如下的结果地址

s3://dse-710469168206-sin/result/tpcds100g/ic/timestamp=1632029414066

多执行几次,并打开Ganglia查看对应监控信息,以99个SQL无任何错误,使用率不超过70%为判断依据。

3.1.3监控情况

从监控数据显示,系统CPU和内存使用几乎接近如下所示(截图仅供参考),我们尽可能保证在没有任何异常的情况下,CPU和内存使用率均保持在50%左右,避免因某些因素导致测试结果失去公平性。

CPU使用情况截图:

内存使用情况截图:

网络使用情况截图:

十五分钟集群负载:

十五分钟单机负载:

只要碰到监控异常或者99个SQL执行结果异常的数据就剔除不要。

3.2Graviton2集群测试

3.2.1准备测试代码

启动集群后,登录Master和Core节点复制代码(sudo su -)

cd /opt

yum install gcc make flex bison byacc git jq -y

aws s3 cp s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz tpcds.tgz

tar zxvf tpcds.tgz

3.2.2执行查询测试

登录集群Master,通过如下方式提交查询测试任务(此为100g数据源的代码,其他的请酌情修改)

spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar

import com.databricks.spark.sql.perf.tpcds.TPCDS

val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)

val databaseName = "tpcds100g" 
sql(s"use $databaseName")

// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ac"

// how many iterations of queries to run.
val iterations = 1

// queries to run.
val queries = tpcds.tpcds2_4Queries

// timeout per query, in seconds.
val timeout = 7200

//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)

// wait for finish
experiment.waitForFinish(timeout)

注意查询结束后输出的类似如下的结果地址

s3://dse-710469168206-sin/result/tpcds100g/ac/timestamp=1632029414066

多执行几次,并打开Ganglia查看对应监控信息,以99个SQL无任何错误,使用率不超过70%为判断依据。

3.2.3监控情况

从监控数据显示,系统CPU和内存使用几乎接近如下所示(截图仅供参考),我们尽可能保证在没有任何异常的情况下,CPU和内存使用率均保持在50%左右,避免因某些因素导致测试结果失去公平性。

CPU使用情况截图:

内存使用情况截图:

网络使用情况截图:

十五分钟集群负载:

十五分钟单机负载:

只要碰到监控异常或者99个SQL执行结果异常的数据就剔除不要。

3.3测试结果

3.3.1取结果的方式

登录单Master的集群的Master节点,查询对应结果

pyspark

# 然后执行
from pyspark.sql import Row
resultLocation = "s3a://dse-710469168206-sin/result/tpcds100g/ac"

result = spark.read.format("json").load(resultLocation + "/timestamp=1632029414066").select("results.name","results.executionTime")

def getRowList(row):
            lst = []
            for i in range(0, len(row.name)):
                        lst.append(Row(name=row.name[i],executionTime=row.executionTime[i]))
            return lst


resultDF = result.rdd.flatMap(lambda row: getRowList(row)).toDF().show(104)

3.3.2结果汇总

Intel集群(emr640-ic)测试数据:

Graviton2集群(emr640-ac)测试数据

4.对比分析

因为一共有99个SQL,所以没办法简单的通过单个SQL(不管是执行时间最长的还是最短的或者中位线)的执行时间来评判性能,所以我们先把多次(此处为3次)的执行结果取平均值,然后根据99个SQL的执行结果的评价值算出来百分比,根据平均延时(秒)和百分比的乘积算出总数,并各自基于总数折算单成本(此处的成本不是指钱,是指计算成本),最后加和得出一个总值作为测试对比结果。

4.1性能对比

按照上述计算方式我们算出的性能成本如下:

平台 总时间成本 单位成本 比例
Intel 1102.26 30.5217
Graviton2 1067.36 26.2709 16.18%

从结果看,基于TPC-DS的100G数据和99个SQL查询,Graviton2(ARM)架构的EMR集群性能同比约优胜16%。

4.2性价比对比

根据官网的报价(https://thinkwithwp.com/cn/emr/pricing/),我们选择新加坡区域,列表价如下:

类型 EC2列表价($) EMR列表价($) 合计 比例
r5.4xlarge 1.2160 0.2520 1.4680
r6g.4xlarge 0.9728 0.2016 1.1744 25.00%
r5.8xlarge 2.4320 0.2700 2.7020
r6g.8xlarge 1.9456 0.4032 2.3488 15.03%

此处我们不考虑预留实例,企业折扣等优惠方式,纯从列表价看,合并后(包括EC2实例和EMR)的总单价,Graviton2架构明显优于Intel架构。

4.3结论

细心的读者可能会留意到,我们并没有额外再针对Graviton2(ARM)架构重新编译代码,同样的代码在两套架构上完全通用。当然,因为我们全部用的是Spark SQL方式所以架构差异较少,如果是客户的实际业务环境还是可能会有差异的,建议读者朋友们以测试结果为准。

从我们的测试结果看,从Intel架构迁移到Graviton2(ARM)架构,不仅可以获得性能的提升,还可以获得更好的性价比。

本篇作者

陈卫琼

亚马逊云科技资深解决方案架构师,负责协助客户业务系统上云的解决方案架构设计和咨询,现致力于大数据和机器学习相关领域的研究。

龙斌

亚马逊云科技解决方案架构师,负责协助客户业务系统上云的解决方案架构设计和咨询,现致力于容器和机器学习相关领域的研究。