亚马逊AWS官方博客

使用 Flink Hudi 处理变更数据流并通过 Redshift Spectrum 进行数据分析实践

1.背景介绍

这是一个典型的数据分析场景,分散在不同业务数据库里的表格数据,需要汇总到数据湖或数据仓库中进行统一分析。运营人员希望尽可能的实时查询到这些数据,他们需要第一手的订单数据以便快速做出决策。

为实现这类场景需求,有很多种办法,本文介绍通过可视化交互开发平台 Zeppelin 上使用 Flink SQL 实现 ETL 任务,并支持近实时高并发 Upsert 到数据湖,利用 Redshift Spectrum 进行快速分析查询。

2.架构设计及服务介绍

2.1 整体架构

  • 使用 DMS CDC 工具,将多个数据库的变更数据实时采集到 MSK(Kafka)中,并记录到 S3,作为备份使用;
  • 在 AWS EMR 中创建 zeppelin、flink 服务,提交 Flink 任务,消费 MSK 数据,写入到 S3 hudi 数据湖中;
  • 在 S3 的数据,可以分别通过 Athena、Redshift Spectrum 进行外表查询。

2.2 Flink Hudi 流式数据湖

Apache Hudi 是由 Uber 公司从 2014 年开始的内部实践项目,目标是解决 Hadoop 上的大数据更新问题。更新一直是 Hadoop 的痛点之一,在很长一段时间没有好的解决方案。Hudi 自 2016 年开源,在 2020 年成为 Apache 顶级项目,至今已成为最流行的开源数据湖格式之一,Hudi 为文件存储带来行级更新能力。通过中立开放的湖格式和不同的查询引擎对接,提供全量、增量、流读等查询视图。

Apache Hudi 的 table format 对流计算友好的特性使得 Flink On Hudi 成为 Apache Hudi 项目最值得探索和挖掘的方向之一,Flink 不仅为 Hudi 解锁了超大数据流的实时更新能力、更添加了流式消费和计算的能力,让端到端近实时 ETL 得以在低成本的文件存储上轻松实现。

Flink On Hudi 目前解决的经典场景是 CDC 数据的流写流读,用户可以通过 Flink CDC connector/format 将数据库数据同步入湖,下游既可以对接 presto、spark、Athena、Redshift 等 OLAP 引擎作即席查询,也可以对接 Flink 流读实现端到端的近实时 ETL。

2.3 Redshift Spectrum 介绍

通过使用 Amazon Redshift Spectrum,用户可以从 Simple Storage Service(Amazon S3)中的文件有效地查询和检索结构化和半结构化数据,而不必将数据加载到 Amazon Redshift 表中。Redshift Spectrum 查询采用了大规模并行以便针对大型数据集极快地运行。很多处理发生在 Redshift Spectrum 层中,而大多数数据位于 Simple Storage Service(Amazon S3)中。多个集群可同时查询 Simple Storage Service(Amazon S3)上的同一数据集,而无需为每个集群复制数据。

Redshift Spectrum 为 Lake House 架构提供支持,使用户可以在 Redshift、Lake House 和运营数据库中查询数据,而无需使用 ETL 或加载数据。Redshift Spectrum 支持开放数据格式,例如 Parquet、ORC、JSON 和 CSV。Redshift Spectrum 还支持查询具有复杂数据类型(例如 struct、array 或 map)的嵌套数据。同样也可以使用 Amazon Redshift 通过开源 Apache Hudi 或 Delta Lake 对Amazon S3 数据湖中的表格运行读取查询。

3.示例 Demo 操作

3.1 CDC 数据采集

使用 DMS 服务订阅数据库的 CDC 数据,将增量数据发送至 MSK 队列中;

消息格式如下(以 MongoDB doc 模式为示例):

{
    "data":{
        "_id":"6427cb5048c0877ec076e9aa",
        "_doc":"{ \"_id\" : { \"$oid\" : \"6427cb5048c0877ec076e9aa\" }, \"account_balance\" : 85.459999999999993747, \"country\" : \"FR\", \"email\" : \"cbrawleyn@auda.org.au\", \"favorite_color\" : \"#4be765\", \"first_name\" : \"Calypso\", \"gender\" : \"Female\", \"id\" : 3.0, \"ip_address\" : \"207.254.231.161\", \"last_login\" : \"2014-07-31T21:03:23Z\", \"last_name\" : \"Brawley\", \"test\" : 1.0, \"test2\" : 2.0 }"
    },
    "metadata":{
        "timestamp":"2023-04-20T02:34:16.170165Z",
        "record-type":"data",
        "operation":"load",
        "partition-key-type":"primary-key",
        "schema-name":"test",
        "table-name":"user_info"
    }
}

3.2 hudi Flink 处理

使用 Flink SQL(1.16)on EMR(6.10)创建 kafka 、hudi(0.13)表,把数据实时从 kafka 消费出 upsert 写入 hudi 表。

INSERT INTO hudiTable
(_id, account_balance, country, email, first_name, gender, ip_address, last_login, last_name, test, test2, `timestamp`, `partition`)
SELECT
    data._id,
    CAST(JSON_VALUE(data._doc, '$.account_balance') AS DOUBLE) AS account_balance,
    CAST(JSON_VALUE(data._doc, '$.country') AS STRING) AS country,
    CAST(JSON_VALUE(data._doc, '$.email') AS STRING) AS email,
    CAST(JSON_VALUE(data._doc, '$.first_name') AS STRING) AS first_name,
    CAST(JSON_VALUE(data._doc, '$.gender') AS STRING) AS gender,
    CAST(JSON_VALUE(data._doc, '$.ip_address') AS STRING) AS ip_address,
    CAST(JSON_VALUE(data._doc, '$.last_login') AS STRING) AS last_login,
    CAST(JSON_VALUE(data._doc, '$.last_name') AS STRING) AS last_name,
    CAST(JSON_VALUE(data._doc, '$.test') AS DOUBLE) AS test,
    CAST(JSON_VALUE(data._doc, '$.test2') AS DOUBLE) AS test2,
    metadata.`timestamp` AS `timestamp`,
    DATE_FORMAT(TO_TIMESTAMP(metadata.`timestamp`, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z'''), 'yyyy-MM-dd')  AS `partition`
FROM KafkaTable
WHERE data._doc IS NOT NULL
AND metadata.`table-name` = 'user_info'


3.3 创建 Redshift Spectrum 关联 hudi 表并加载分区数据

drop table  mongo.mongo_cdc;
CREATE EXTERNAL TABLE mongo.mongo_cdc(
   _id varchar(255),
  account_balance double precision,
  country varchar(255),
  email varchar(255),
  first_name varchar(255),
  gender varchar(255),
  ip_address varchar(255),
  last_login varchar(255),
  last_name varchar(255),
  test double precision,
  test2 double precision,
  "timestamp" varchar(255)
)
PARTITIONED BY(
    "partition" varchar(255)
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://kafka-connect-data-2023/mongo-hudi/' ;

ALTER TABLE mongo.mongo_cdc
ADD IF NOT EXISTS PARTITION(partition='2023-04-20')
LOCATION 's3://kafka-connect-data-2023/mongo-hudi/2023-04-20';

3.4 使用 Redshift 进行数据查询并做后续数仓的处理


4.总结

本文提供了一个简单构建使用 Flink Hudi 将实时的变更流数据采集写入数据湖的架构,并使用 Redshift 进行数仓处理及 OLAP 分析的方式,解决了以下问题:

1)稳定的 CDC 数据捕获机制,通过 DMS 服务将 CDC 数据写到 MSK 和 S3 备份,利用服务本身高可用性,为采集管道提供高度可靠保障;

2)使用托管在 EMR 的 Flink HUDI 提供流式数据湖方案,将变更数据近实时反馈到数据湖中;

3)结合使用 Redshift Spectrum 功能,为数据湖提供高性能的查询能力。并可直接将湖中数据作为数仓原始数据层生成数仓高级层数据。

参考文档

Redshift Spectrum:https://docs.thinkwithwp.com/zh_cn/redshift/latest/dg/c-using-spectrum.html

Flink HUDI:https://hudi.apache.org/cn/docs/flink-quick-start-guide

HUDI 设计:https://hudi.apache.org/docs/record_payload

CDC 数据采集 DMS:https://docs.thinkwithwp.com/zh_cn/dms/latest/userguide/CHAP_Task.CDC.html

本篇作者

张鑫

AWS 解决方案架构师,负责基于 AWS 云平台的解决方案咨询和设计,在系统架构、数仓和实时离线计算领域有丰富的研发和架构实践经验。