亚马逊AWS官方博客

构建一站式车联网数据分析平台

1. 介绍

1.1 背景

根据 IDC 的预测,到 2027 年,全球互联汽车数据量将达到 33,297 EB。这庞大的数据为车辆预警、电池管理、救援服务和车辆维护等实时互联服务提供了关键的技术基础。随着信息技术在新能源领域的飞速发展,车联网也为汽车行业带来了重大变革,车联网是通过将车辆与互联网连接起来,实现车辆与外界的信息交互和数据共享。将这些数据与营销、制造和其他多维度数据相结合,可应用在移动互联网、汽车营销、媒体推广和保险服务等场景,帮助车企实现业务增长。车联网技术的发展,不仅提升了驾驶体验,也为车企、监管部门和消费者带来了全新的数据价值。

车联网数据平台作为车联网生态中的关键基础设施,担负着数据采集、存储、分析和应用的重要功能。它可以整合来自车载设备、手机 APP、道路监控等多方面的数据源,为各方主体提供数据服务。

车联网数据平台具有以下几个典型特点:

  • 海量数据采集能力。车载设备可以实时采集车辆运行状态、驾驶行为、电池状态等大量数据采集到云端,为后续分析和应用奠定基础。
  • 强大的数据处理能力。平台需要具备高效的数据存储、计算和分析能力,才能应对海量、多样化的车联网数据。
  • 实时分析能力。对于一些关键场景,如充电时电池的情况预警、实时路况分析,可以在极短时间内完成数据的采集、分析和反馈。
  • 开放的数据共享机制。平台应当建立安全可靠的数据共享机制,满足不同主体和用户的个性化需求。
  • 丰富的应用场景。车联网数据可应用于交通管理、保险、维修等多个领域,为各方带来新的商业价值。

1.2 问题

车联网数据平台面临的一大挑战就是如何应对海量、实时的数据处理需求。传统的架构,在面对突发的数据峰值时,可能会出现以下问题:

  • 大量的数据移动。由于数据存放于多种存储介质,数据需要各个引擎中流动,增加了开发的工作量,也降低了数据分析的时效性。
  • 资源配置难题。需要预先购买足够的计算、存储资源,以应对车主早晚通勤带来的数据高峰,但在低峰时会造成大量资源浪费。
  • 运维负担重。需要持续监控资源使用情况,并对多种组件及时扩缩容以及对配置匹配的权限管控,对运维团队提出了较高要求。
  • 成本控制困难。难以根据实际负载动态调整资源配置,容易导致成本失控。

因此,车企需要升级车联网数据平台架构,以满足快速增长的交互式分析查询和实时业务数据可见性的需求。

2. 方案介绍

基于以上的问题,在设计方案时需要考虑的原则:存算分离,数据不丢,业务不断,因此拥有以下特点:

  • 一份数据,多处使用。
  • 保证数据接口一致,降低运维复杂度
  • 数据不丢失,保持可用可查
  • 计算按需定制,采用无服务方式节省开支,免运维

采用云原生架构为车联网数据平台提供弹性、可扩展、可靠的解决方案,帮助应对海量实时数据处理需求,提升整体运营效率和成本控制能力。

2.1 方案架构

在数据采集端选择了 Serverless 服务,例如 Amazon MSK Serverless,Managed Service Apache Flink(MSF),借助了 Serverless 服务的弹性扩缩容,能够根据实时的数据访问量自动扩展计算资源,无需预先规划。用户无需负责基础设施的运维和管理,大大降低了用户的运维负担;开发人员可以快速部署应用程序。另外在数据平台,采用数据湖技术(Amazon S3 + Iceberg)结合 Serverless 服务 Amazon Athena ,Amazon EMR Serverless 实现了数据一次存储多处使用。并且对于具有交互式分析场景,使用 Amazon EMR 结合 Apache Ranage + Open LDAP 的方案实现了对于多组织,多团队用户场景的身份认证和数据权限管控。

本文将分别从数据湖的选择、数据接入、数据加工、交互式分析、权限管控几个角度来介绍整体方案的实现。

2.2 数据湖的选择

当我们开始设计车联网数据平台的时候,首先需要考虑的是数据存储的格式,因为车联网数据由于数据合规等要求,需要能对数据进行删除等数据变更的操作。而单纯使用 Hive + Parquet 的方式,不易做到对文件数据的删除。所以锁定了当前业界比较流行的几种数据湖表格式(Apache Hudi,Apache Iceberg,DeltaLake),它们都可以支持对文件数据进行事务性的操作。另外对数据湖表格式的需求,还希望它能满足兼容多引擎,例如支持 Spark,Flink,Trino,Hive。并且有比较活跃的社区生态。因此,在这几种数据湖表格式中进行了对比。在对比测试的过程中,考虑到车辆网的数据特性,它在写入时几乎没有 Update 的操作,并且写入数量大,这时候发现如 Hudi 具备的自动小文件合并(File Sizing)的能力在这种场景下并不合适,写入时频繁的合并操作,会导致写入性能降低。因而最终方案选择了 Apache Iceberg。

Apache Iceberg 的优势

  • Iceberg 不绑定某一特定引擎,使用通用的数据组织格式,可以方便地与不同引擎(如 Flink、Hive、Spark、Trino)对接;
  • 良好的架构和开放的格式。相比于其他数据格式,Iceberg 的架构实现更为优雅,同时对于数据格式、类型有完备的定义和可进化的设计;
  • 面向对象存储的优化。Iceberg 在数据组织方式上充分考虑了对象存储(Amazon S3)的特性,避免耗时的 listing 和 rename 操作,使其在基于对象存储的数据湖架构适配上更有优势。
  • 亚马逊云科技中的分析服务,例如 Amazon EMR、AWS Glue、Amazon Athena 等都支持了 Apache Iceberg。并且如果使用 AWS Glue Data Catalog 作为 Apache Iceberg 的 Catalog,可以使用 AWS Glue 对 Iceberg 表进行自动的小文件压缩,这个操作是异步的,也不会影响写入的流程。

2.3 数据接入

2.3.1 数据实时接入

基于车联网的特性,对于数据的收集和处理都是实时的。因此采用了 Amazon MSK 作为数据采集的缓冲层,使用 Flink 来实时消费 TSP 数据入湖。在平台建设初期,由于对于数据量的评估和流量的峰值都无法有一个准确的判断,因此可以选择使用 Serverless 服务。

1. Amazon MSK Serverless 的最佳实践

Amazon MSK Serverless 能帮助用户在平台搭建初期,对 TSP 产生的数量还无法预估时,提供比较好的扩展能力,避免在集群模式下对于存储空间的预估偏差,导致集群由于数量的增长而需要对磁盘扩展的操作,带来的一些额外的运维工作和稳定性的问题。Amazon MSK Serverless 可以通过它灵活扩展的能力,承接波峰波谷非常明显的业务场景。但是,在使用 Amazon MSK Serverless 时也需要关注它的一些限制:

(1) 认证方式

Amazon MSK Serverless 默认启用了 IAM 认证,而且这也是它唯一的一种认证方式。因此,在实现时需要兼容 IAM 认证的模式。

(2) 吞吐量限制

  • Amazon MSK Serverless 会限制整个集群的读写吞吐量(写 200MB/s,读 400 MB/s)。
  • 单个分区的读写吞吐量限制(写 5MB/s,读 10 MB/s),建议通过在创建 Topic 时来增加分区数量提升写入的吞吐量,对已创建的 Topic 也可以通过修改分区数量来提升吞吐量,并且 Amazon MSK Serverless 会自动重分布分区。

2. Managed Service Apache Flink 写入 Iceberg 的最佳实践

数据接入的流程是通过 Managed Service Apache Flink(以下简称 MSF)消费 Amazon MSK Serverless 中的数据,实时的写入 S3 ,以 Iceberg 表格式的方式存储。MSF 可以自动的帮助用户管理资源的弹性伸缩和 Flink 作业的 savepoint ,但是需要关注如下几点问题:

1 依赖问题处理

在使用 MSF 时,最多遇到的问题就是依赖问题,由于 MSF 并没有原生的集成 Iceberg 组件,因此在通过 MSF 实现数据实时摄入 Iceberg 并且使用 Glue Data Catalog 时,会遇到如下异常:Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration。这是因为 MSF 的 Flink 镜像中并没有包含和 hadoop 相关的依赖,导致这里方法加载默认配置的时候,找不到 org/apache/hadoop/conf/Configuration 类,因此需要在自己的代码工程中重写 org.apache.flink.runtime.util.HadoopUtils 类。

下面我们来看一下,如何通过重写 HadoopUtils 类解决依赖问题。首先,在代码工程中创建一个 package,并且添加一个名为 HadoopUtils 的类,加入如下代码:

package org.apache.flink.runtime.util;

import org.apache.hadoop.conf.Configuration;

public class HadoopUtils {

    public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {
        return new Configuration(false);
    }
}

然后修改 pom.xml 文件,找到 transformers,在它的后面添加 relocations 如下内容。利用 maven-shade-plugin 来解决包冲突的问题。目的就是将原来依赖包里的 org.apache.flink.runtime.util.HadoopUtils 迁移到一个指定包中,然后让程序运行的时候调用重写的类。

<!-- 这里是 main class -->
<transformers>
         <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                 <mainClass>com.amazonaws.emr.flink.mainclass</mainClass>
    </transformer>
</transformers>

<!-- 需要添加的内容 -->
<relocations>
   <relocation>
       <pattern>org.apache.hadoop.conf</pattern>
       <shadedPattern>shaded.org.apache.hadoop.conf</shadedPattern>
   </relocation>
   <relocation>
      <pattern>org.apache.flink.runtime.util.HadoopUtils</pattern>
      <shadedPattern>shadow.org.apache.flink.runtime.util.HadoopUtils</shadedPattern>
   </relocation>
 </relocations>

然后重新打包代码(也可以参考 github 上的样例代码)。

2 Iceberg 表文件的管理

由于车联网的场景,数据需要实时入湖,Iceberg 会因为写入的频率而生成比较多的小文件和元数据文件。因此,需要定期来维护 Iceberg 的表。最常用的场景是文件的合并和元数据文件与快照文件的过期。

Iceberg 小文件压缩

Iceberg 不会在写入时做小文件合并,所以它在写入数据时不会有频繁的 rewrite 操作。 因此产生的小文件需要通过手动或者定时任务的方式来优化 。下面介绍几种方式来减少小文件:

  • MSF 应用运行 Flink 作业默认的 checkpoint interval 是 5 秒,可以根据数据写入实际的实时性要求来增加这个时间,这样可以在每一次提交时,积累的数据更多,从而降低写入 Iceberg 表的文件数量。
  • 可以通过设置 Iceberg 表属性,在创建 Iceberg 表时添加如下参数,这样在写入 Iceberg 数据时,就从源头直接合并文件,一个 Task 会处理某个分区的数据,提交自己的 Datafile 文件。
    'write.distribution-mode'='hash'
  • 使用 Spark 提供的 procedure ,通过定期的调度任务执行 Spark SQL 来实现小文件压缩,它会根据建表参数中设置的 write.target-file-size-bytes 参数为单个文件的最大值来合并文件,默认 512 MB。 参考
    CALL <iceberg-catalog>.system.rewrite_data_files('<database>.<iceberg-table>')

Iceberg metadata 版本文件保留

在 Iceberg 表中另外一个会因为频繁提交而导致文件数量快速增加的就是 metadata 文件。可以在建表的时候来指定以下参数,设定 metadata 版本保留的最大数量,以及指定在提交的时候删除 metadata 文件,这样当 MSF 每次提交 checkpoint 写入数据文件的时候,会根据指定的保留版本数量来清理历史的 metadata 文件。

TBLPROPERTIES (
    ...
    'write.metadata.delete-after-commit.enabled' = 'true',
    'write.metadata.previous-versions-max' = '5',
    ...
)

清理 Iceberg 表中历史版本的数据文件

由于,数据文件是跟随 Iceberg Snapshot 保留下来的,如果一直保留,可以用于 Time Travel 等场景,也能通过这些 snapshot 回滚数据。但是,随着数据量越来越大,特别是在频繁提交,和频繁进行小文件合并操作后,会在目录下产生大量的冗余文件,会占用存储空间,这个时候是需要考虑清理一些不需要的历史数据的。可以使用 expire_snapshots 通过清理 snapshots 的方式来清理不需要的冗余历史文件。在 Spark SQL 中执行以下命令,设置过期的时间(或者在表参数中指定 snapshot 保留的时间 history.expire.max-snapshot-age-ms),即可根据设定保留快照的最大时间来清理历史数据文件。

%%sql
CALL <iceberg-catalog>.system.expire_snapshots(table => '<database>.<iceberg-table>',older_than => TIMESTAMP '2024-08-01 08:00:00')

2.3.2 数据批量接入

针对于离线场景多种数据源的数据同步,采用 AWS Glue 作为数据同步的工具。AWS Glue 已经在 3.0 版本开始支持 Iceberg 的写入方式,通过 Glue 可以快速的接入多种数据源的数据,并且以 Iceberg 的存储格式存放在 S3 中。

元数据同步问题

由于 Glue 作为数据同步工具将数据以 Iceberg 的表格式方式写入 S3 时,必须依赖 Glue Data Catalog,而如果通过部署外部数据库作为 Hive metastore 的 Amazon EMR 集群对这些表进行查询时,就涉及到元数据的同步。如下图:

通过 Glue ETL 创建的 Iceberg 表,写入数据后,需要将表共享给使用 External hive metastore 作为元数据的引擎的 EMR 集群,则需要在 Amazon EMR 集群中新建一个 hive 外表来关联到这张 Iceberg 表在 S3 上存放数据的路径,需要在建表的时候指定表属性 current-snapshot-idmetadata_location

如果没有指定 current-snapshot-idmetadata_location 而创建的表,将是一张新表,并且它会自动创建一个新的 metastore.json 文件,这样就没有办法读到历史数据来。

  1. 通过 Athena Spark SQL,获取需要同步表的 metastore file 和 最新的 snapshotid,根据自己的实际情况,替换表名
    spark.sql("SELECT file,latest_snapshot_id from spark_catalog.icebergdb.<tablename>.metadata_log_entries where latest_snapshot_id = (SELECT snapshot_id from spark_catalog.icebergdb.<tablename>.snapshots order by committed_at desc limit 1)").show(truncate = False)
  1. 从 Glue Data Catalog 获取建表 DDL(Athean/Sparksql),以下命令通过 hive 引擎执行
    SET iceberg.catalog.hive_prod.type=hive;
    SET iceberg.catalog.hive_prod.uri=thrift://<ec2-server>:9083;
    SET iceberg.catalog.hive_prod.clients=10;
    SET iceberg.catalog.hive_prod.warehouse=s3://<s3bucket>/data/iceberg-folder/;
    
    add jar /usr/lib/hive/auxlib/iceberg-hive3-runtime.jar;
    
    CREATE EXTERNAL TABLE iceberg_db.<tablename> (
    `column1` STRING,
    ....
    )
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
    LOCATION 's3://<s3bucket>/data/iceberg-folder/icebergdb.db/tb_user_order_list_01'
    TBLPROPERTIES(
    'current-snapshot-id' = '<snapshotid>',
    'metadata_location'='s3://<s3bucket>/data/iceberg-folder/icebergdb.db/<tablename>/metadata/00047-22b52076-984b-4f25-8f30-7f009cf58b1d.metadata.json',
    'format' = 'iceberg/parquet',
    'format-version' = '2',
    'write.distribution-mode' = 'hash',
    'write.metadata.delete-after-commit.enabled' = 'true',
    'write.metadata.previous-versions-max' = '10',
    ......);
    

这样,通过 Glue ETL 创建的 Iceberg 表,也同样可以在使用 External hive metastore 作为元数据的引擎的 Amazon EMR 集群中访问。但是,需要注意的是在创建表之后,避免从 hive 引擎对表进行增删改操作,这样将造成两边元数据不同步,要遵循如下规则:

  • Glue/Athena:读/写
  • EMR(Hive):读

如果通过 AWS Glue 或者 Amazon Athena 对表进行 Insert/update/delete 或者是 DDL 的操作,仍然需要再获取一次最新的 metastore file 和 snapshotid,更新一次 hive 的表属性。

2.4 数据加工与交互式分析场景

车联网数据平台有两大类的使用场景,一是开发或者数据分析人员,使用数据平台进行一些交互式分析,通常把它定位为一个开发环境。另一种场景就是比较传统的批量数据处理场景,把它定位为生产环境。从资源利用率的角度来看,开发环境是需要有一个常驻集群,等待用户不定期的作业提交和数据探索的需求,而生产环境运行的都是已知的定期作业任务。因此从这两种场景上分析,选择使用不同的产品组合方式来来提升数据分析平台的资源利用率。

通常情况下,在建设大数据平台时,为了隔离开发与生产环境,需要部署多个大数据集群。这样增加了运维的复杂度,并且也有资源的浪费。因此,可以通过 Amazon EMR 集群和 Amazon EMR Serverless 相结合的模式。开发环境可以使用小规模的 Amazon EMR 集群,提供给数据开发人员,并且在集群上部署 OpenLDAP + Apache Ranger 来控制开发人员的身份认证与数据访问权限,并且利用 Amazon EMR 动态扩缩的能力,可以在资源不足时自动扩展集群,也可以使用 Amazon EMR 集群中的 Trino 作为数据探索的计算引擎。而生产环境,不允许开发用户访问,可以使用 Amazon EMR Serverless,在其上运行每天定时的跑批生产作业,充分利用了 Serverless 的计费灵活性,降低了使用 Spark/Hive 调用资源的成本。

使用 Apache Kyuubi 解决 Spark SQL 资源隔离问题

在使用 Hue 执行 Spark SQL 作业时,通常会使用 Spark Thrift Server 服务作为提交的入口,但是 Spark Thrift Server 存在 Driver 单点问题,随着 Executor 数量的增加,Driver 需要处理更多控制面的 RPC 消息,另外它不支持多租户,启动的 Application 只有全局唯一的用户名,在资源上,占用单个资源队列,导致多个开发用户同时访问 Hue ,提交 SQL 作业时无法细粒度的控制用户使用的资源大小。因此,选择了 Apache Kyuubi。

Apache Kyuubi 可以支持多种编程语言的接口,包括 JDBC/ODBC、Thrift、REST 等,可以更好地适应不同应用场景的需求。更重要的是它针对性地进行了性能优化,如支持动态资源分配、缓存管理等机制,可以提供更稳定和可靠的服务。另外,整个方案中使用了 Apache Ranger 作为权限控制的服务,而其中使用了 Hive metastore plugin,即可以控制 hive 的访问权限,也可以控制 sparksql,Kyyubi 同样有效。

Apache Kyuubi 集成 Amazon EMR Hue 的部署方式如下,在 Amazon EMR 主节点上部署:

cd /mnt
wget https://archive.apache.org/dist/kyuubi/kyuubi-1.8.1/apache-kyuubi-1.8.1-bin.tgz
tar -zxvf apache-kyuubi-1.8.1-bin.tgz
mv apache-kyuubi-1.8.1-bin kyuubi
rm -rf apache-kyuubi-1.8.1-bin.tgz
cp /mnt/kyuubi/conf/kyuubi-env.sh.template /mnt/kyuubi/conf/kyuubi-env.sh
 
cat <<EOF >> /mnt/kyuubi/conf/kyuubi-env.sh
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
EOF
 
cp /mnt/kyuubi/conf/kyuubi-defaults.conf.template /mnt/kyuubi/conf/kyuubi-defaults.conf
 
node='Master'
cluster_id=`cat /mnt/var/lib/info/job-flow.json | jq -r ".jobFlowId"`
igid=$(cat /mnt/var/lib/info/job-flow.json | jq --arg node $node -r '.instanceGroups[] |select(.instanceRole == $node).instanceGroupId')
emr_region=`curl -s http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region`
zk_cluster=`aws emr list-instances --region $emr_region --cluster-id $cluster_id --instance-group-id $igid|grep -Po '"PrivateDnsName": *\K"[^"]*"'|sed 's/"//g' | tr '\n' ',' | sed 's/,$//'`
 
cat <<EOF >> /mnt/kyuubi/conf/kyuubi-defaults.conf
kyuubi.ha.enabled true
kyuubi.ha.zookeeper.quorum ${zk_cluster}
kyuubi.ha.zookeeper.client.port 2181
kyuubi.session.engine.idle.timeout=PT1M
EOF
 
/mnt/kyuubi/bin/kyuubi start

当 Kyuubi 启动成功之后,就可以通过 beeline -u 'jdbc:hive2://<ec2-server>:10009/' -n hadoop 进入 beeline 命令行来验证,然后执行 show databases,可以成功显示 database 信息,如下图所示:

接下来,在 Hue 中配置 Kyuubi,将以下配置添加到 Amazon EMR 的配置中:

[
    {
        "Classification":"spark",
        "Properties":{
            "sql_server_host":"<emr-master-node-server>",
            "sql_server_port":"10009"
        }
    },
    {
        "Classification":"notebook",
        "Properties":{

        },
        "Configurations":[
            {
                "Classification":"interpreters",
                "Properties":{

                },
                "Configurations":[
                    {
                        "Classification":"sql",
                        "Properties":{
                            "name":"SparkSql",
                            "interface":"hiveserver2"
                        },
                        "Configurations":[

                        ]
                    }
                ]
            }
        ]
    }
]

待 Amazon EMR 中各个实例组的配置更新后,就可以在 Hue 中验证查询:

2.5 权限管控

作为企业统一的数据分析平台,多个团队/部门共同使用,因此需要在平台之上构建一套权限管控的机制。对于使用传统 Hadoop 生态大数据组件的用户,可以通过 Open LDAP + Apache Ranger 与 Amazon EMR 的集成来实现身份认证和权限管理的。这里用户具有多入口接入大数据平台的使用习惯,例如通过边缘节点,使用 beeline,hive cli,trino cli 等工具来访问大数据平台。因此需要在 Amazon EMR 上实现对使用的这些开源组件都开启身份认证。

参考如上架构,通过在 Amazon EMR 外部搭建独立的 Open LDAP 服务和 Apache Ranger 服务来对用户的身份认证和数据访问权限进行管控。

1. 身份认证

使用 LDAP 来实现身份认证,通过在 EC2 部署 Open LDAP。

2. 数据权限控制

使用 Apache Ranger 来控制用户访问数据的权限,整体方案围绕 Hive Metastore 作为元数据的统一入口来控制用户的数据访问权限,因此选择了开源的 Hive Metastore Plugin ,这样凡是使用 hive metastore 作为元数据的引擎,包括 Hive、Spark、Trino,都会统一由 Hive Ranger Plugin 来控制权限,避免了多个引擎在多个 Plugin 中配置权限的问题。

关于身份认证和权限管控的技术实现,可以参考博客:在多主节点的 Amazon EMR 集群中实现用户身份认证与细粒度访问控制,我们就不在这里做详细探讨。

3. 总结

在部署海外的车联网数据平台时,满足当地的合规要求、运营需求和服务至关重要。从数据合规、数据处理和生态丰富度等方面考虑,车联网数据平台的选型涉及数据摄入、数据开发、交互式分析、湖仓选型、数据安全等话题。本文介绍了围绕 Amazon EMR 和统一数据湖(S3)在构建车联网数据平台的实践,用户无需在不同数据分析引擎之间移动数据,通过 Amazon EMR 让开发人员可以继续沿用以往在 hadoop 生态体系下的数据平台的操作习惯,实现细粒度的数据权限和身份认证控制,另外针对车联网数据具有实时性、数据流量存在波峰波谷不确定等特性,利用亚马逊云科技 Serverless 服务的优势,在通过弹性资源优化资源利用率,也可以快速完成整个车联网数据平台架构的验证与运营,为助力车企向“服务和数据驱动”转型变革保驾护航。

本篇作者

黄霄

亚马逊云科技数据分析解决方案架构师,专注于大数据解决方案架构设计,具有多年大数据领域开发和架构设计经验。

潘浏阳

亚马逊云科技解决方案架构师,主要负责 Auto 行业客户的架构优化和迁移等工作,拥有超过 10 年的项目实施和运维经验。

沈金

亚马逊云科技数据分析高级产品经理,15+ 年 data 相关经验,现负责数据分析产品 GTM,拥有多年在CDP(客户数据平台)/ VDP(车联网数据平台)架构选型,集中及联邦数据治理的经验。