亚马逊AWS官方博客

多库多表场景下 CDC 数据实时写入 Redshift 数仓方案

一、背景说明

1.1 Redshift 简要说明

Redshift 作为当今时代性价比最优的企业级云上数仓,不仅采用 code generation,vectorized execution 等技术优化其执行引擎,提供卓越的性能表现。同时作为云原生的企业级数仓,Redshift 在稳定性、安全性、弹性扩展、容灾能力等方面也不断创新,为企业客户的智能数据分析保驾护航。这里列举几个 Redshift的功能特性。

  1. 存算分离的架构,底层数据以专用格式存储在 S3,计算资源不受存储限制可以灵活弹性扩展。
  2. DataSharing 可以做到跨账号,跨 Region 的数据秒级共享,无需移动数据,底层存储是同一份数据。
  3. Redshift Serverless 无需预置管理维护集群,随查询并发自动扩展 RPU 计算资源,按 RPU 计算时长付费,不查询不收费。
  4. Redshift ML 可以 SQL 的方式创建机器学习模型,自动选择最优的算法,自动生成推理的 UDF 函数,在仓中直接完成对数据的深度探索。
  5. Streaming Ingestion 可以直接将 MSK 和 KDS 的数据摄入到 Redshift,无需部署任何其它组件,秒级别延迟。
  6. zero-ETL 可以一键整库同步 Aurora MySQL 数据到 Redshift,实现 TP 数据在 AP 引擎的高效分析。到 2023 年,Redshift 历经 11 年的众多客户检验和技术创新,始终追求卓越,为数万企业保驾护航。

1.2 CDC 工具和 Redshift 结合

关于 CDC 的基本介绍及相关的工具对比在该篇数据入湖的文章中已经详细说明,这里不再赘述,唯一需要说明的是对于 MSK Connector 和 Flink CDC 其内核都是 Debezium。总体而言当前 CDC 实时写入 Redshift 方式有三种模式。

第一种模式,CDC 工具解析 Binlog(比如 MySQL)数据直接 Sink 到 Redshift。能以这种方式实现的工具,AWS 的托管服务目前可以使用 DMS(AWS Database Migartion Service),其它商业的公司的付费工具比如 Fivetran 也可以实现。开源的工具 Flink CDC 通过 DataStream API 做深度的定制也可以做到。直接使用 Flink JDBC 的 Connector Sink 数据是不可以的,这里多做一些解释,Flink JDBC Connector Sink 的原理是封装 Upsert(CDC 变更必须设置为 Upsert 模式)的 SQL 语法,对于不同的 Database Flink 对于幂等的实现有不同方式。比如 MySQL 是 INSERT .. ON DUPLICATE KEY UPDATE ..,PostgreSQL 是 INSERT .. ON CONFLICT .. DO UPDATE SET ..,目前 Redshift 是不支持 PostgreSQL 这个 Upsert 的语法的(Redshift 原生支持 Merge 语法), Flink 检查到语法不兼容时,程序是会报错的。如果自己重写 Flink Sink Redshift 的逻辑,如果不存在就 Insert,存在执行 Update,这个逻辑执行对于 Redshift 而言是比较低效的,此外 Redshift 最高效的更新逻辑是 Merge 的方式,实现 Merge 可以使用 Merge 语法,也可以 Staging 表做,核心都是将 Update 转换为 Delete+Insert,因此对于 Flink 而言将 CDC 这种有更新的数据直接通过 JDBC Sink 到 Redshift 现阶段是不可行的。如果通过 DataStream API 自己封装逻辑,将数据 Sink 到 S3 再 Merge 的方式是最高效的,但是代码的开发是有挑战的,一些客户本身 CDC 的数据量不大,也会使用 DataStream API 自己 JDBC 封装 Batch 的 Insert 到 Staging 表然后 Merge;如果 CDC 数据量较大,JDBC 的 Insert 将会成为性能瓶颈。Flink 有一个 FLIP-307 是关于 Flink Redshift Connector 的实现计划的说明。

第二种模式,CDC 工具解析 Binlog 将数据发送到 MSK(Amazon Managed Streaming for Apache Kafka),再用工具消费 MSK 中的数据写入到 Redshift。此种方式下写到 Redshift 目前比较好的选择是 Spark,因为亚马逊云科技官方提供了 Spark Redshift Connector。这个 Connector 是开源的,亚马逊云科技对 Spark Redshift Connector 在 Glue 和 EMR 环境里也有深度的优化。该 Connector 原理是将数据 Sink 到 S3,然后 Copy 数据到 Redshift,对于更新可以在写入 Staging 表后执行 postactions 在事务里封装 Delete+Insert 逻辑,此种方式的更新是非常高效的。Spark Structured Streaming 设置 Checkpoint Interval 控制 Batch 延迟,可以做到端到端小于 30s 延迟(延迟与 Spark 资源、同时并发同步表数据、Redshift 是用资源相关)。目前有多个客户生产中使用,其中有客户已实现千张表的整库同步。

第三种模式,如果您使用的是 Aurora MySQL version 3.03.1(compatible with MySQL 8.0.26)或更高版本,您可以使用 zero-ETL 的功能,将数据一键整库自动化同步到 Redshift,无需任何中间组件,非常方便高效,可以做到秒级别同步。

1.3 CDC 同步到 Redshift 要解决的问题

CDC 数据同步到 Redshift,我们会面临如下的挑战,以上第一种和第二种模式技术上要解决的问题如下:

  1. Full Load 阶段的性能。在全量数据阶段,是向 MySQL 发送的 Select 查询,对于单个大表的加载应该做合理的分片,并行加载数据。对于多张表要并行执行全量加载,而不是串行执行。
  2. 源端的 Schema 变更的支持程度。添加列,删除列是常规的变更,列类型的变更对于列存的引擎而言都是需要比较大的代价的,一般都不会支持,字符串类型定义的长度变更也是有些场景会遇到的。
  3. CDC 阶段的并行。我们知道 Binlog 的解析是单线程做的,但是对于解析的数据 Sink 到多张 Redshift 表是可以并行的,比如有 30 张表同时同步到 Redshift,并行可以降低多表的同步时间。
  4. 数据顺序的保证。如果我们 CDC 数据 Sink 到的 Kafka,Kafka 可以保证单分区有序,但是多分区是无序的,对于 CDC 数据而言,我们要保证同一张表相同主键的变更数据发送到同一分区,如果单分区对于大表会成为性能瓶颈,所以我们要自定义分区 Key 来保证相同主键 Sink 到同一分区,这样的做的还可以有两个优势,第一,单线程的 Binlog 解析的下游算子可以重新设定并行度,当我们想在 Binlog 解析阶段增加逻辑,可以多并行度处理提升性能。第二,多个表的数据 Sink 到同一个 Kafka 的 Topic 时,可以多分区保证性能,这对于一些客户有上千张表同步时,非常关键。
  5. 数据 Sink 到 Redshift 前合并优化。如果相同主键的数据在一个时间段内有大量的变更,比如同一条数据在 10s 内做了 20 次的 Update。我们没有必要将这 20 条变更应用到 Redshift,Sink 之前我们只需要按照时间做开窗函数取最后一次的 Update 就应用到 Redshift 就可以。这种方式的处理在变更频繁的场景下可以很好的提升同步性能。

我们从各方面总结了两种不同数据入仓(Redshift)的优缺点。对于需要更高的自主可控性,以及 CDC 数据要下游多端复用时,建议选择 MSK 解耦上下游的模型。

同步模式 架构 灵活性 开放性 监控 Full  Load 性能 Schema 变更 CDC 并行 合并优化 数据回溯 延迟 成本
1 DMS->Redshift 简单 稍差 内部封装,用户不可见 完善的监控体系 大表稍差 增加,删除列 可以(1~32 并行度) BatchApply 支持 创建新Task 指定 Binlog 位点 秒级别
2 CDC 工具→MSK→Spark→Redshift 复杂 上下游解构,较灵活 开放代码,自主可控 无,需要自己构建 自己控制并行度 增加,删除列,字符串列长度变更 自定义 开窗函数支持 指定 Kafka 消费位置或时间即可 秒级别 稍高

二、架构方案

本篇博客里,我们会重点说明 MSK 解耦模式的架构方案及部署实施。对于 DMS 直接 Sink 到 Redshift 在 AWS Console 配置即可,会在后面小结中说明使用时的注意事项。

2.1 架构说明

  • CDC 数据获取并写入 Kafka 主要有三种方式:DMS、MSK Connect for Debezium 或亚马逊托管的 Flink 服务 Amazon Managed Service for Apache Flink(MSF)。
    • DMS 和 MSK Connect 可以通过控制台进行部署。
    • MSF 需要代码实现,主要基于 CDC Connectors for Apache Flink ,我们根据客户的常用场景做了一些自定义,例如支持所有表单 topic,按库划分。由于都实现了主键作为 partition key,topic 都可以设置多分区。
  • Spark Streaming 消费 Kafka CDC 数据,多线程 Sink 多张表到 Redshift。Kafka 解耦上下游,链路更稳定,数据回溯更方便。Spark Streaming 有两种选择,EMR Serverless 和 Glue。

2.2 架构其他细节说明

  • 三种 CDC 方式均支持按照 MySQL 主键值作为 Kafka Partition Key,保证 CDC 顺序。DMS 和 MSK Connector 本身提供支持。KDA Flink CDC 代码里通过自定义 Kafka Partition 实现,由于做主键分区,所以可以在 CDC 阶段 Rebalance,下游算子不在需要设置单并行度保序,保证高性能。
  • 多线程并行写多表支持 insert,update,delete,秒级别延迟,更新删除逻辑比较轻,使用 Staging 转换 UPDATE = DELETE+INSERT。
  • Schema 自动变更支持增加列,删除列,修改字符串列的长度,使用 from_json 动态生成每批次的 DataFrame 的 Schema,以 DataFrame 的 Schema 为标准更新 Redshift 的 Schema。Full Load 阶段可以根据数据量调整资源快速加载。
  • Spark Structured Streaming 消费数据对单批次数据做了开窗处理,相同记录的多次更新在同一批次过来,可以做到数据合并为单条,这对于更新频繁的场景有很好性能提升,减小了 Sink 到下游的 Redshift 的压力。
  • 延迟可以做到秒级别,压测 50 线程 50 表并行写(包含 CIUD 操作),端到端数据延迟 40~60 秒,50 线程 100 张表 90~120 秒。延迟和数据量、Streaming 作业的资源及 Redshift 的集群资源相关,可在自己环境测试看延迟,可以调整 Checkpoint Interval 调整 Streaming 作业每批次间隔时间,一般 30 秒之内延迟没有问题。
  • 支持忽略 DDL 模式,用户自己控制建表和控制 Schema变更,如果设置之后,不再自动创建表,用户自己预先创建表,同样不会自动添加删除列,源端变更需要用户控制。
  • 支持 delete 数据单独写到一张表,表名自动以_delete 结尾,支持只同步 delete 数据,不同步原表数据,表名字自动以_delete 结尾。
  • Spark 根据消费的 CDC DataFrame 在 Redshift 自动创建表结构,维护 Schema 变更,需要注意的是自动创建的是根据 CDC 的 Json 种的数据类型映射的,而非 MySQL 表中的元数据类型,表中的列的顺序是按照字母顺序排序的。如果想自己创建表,指定类型和字段顺序也是支持的,开启忽略 DDL 模式即可。
  • 对于 Mongodb,Postgres,Sqlserver,Oracle,TiDB,Db2,Oceanbase,Flink CDC 都是支持的,但是当前 Sink 到 Redshift 的代码中只适配了 MySQL 的 CDC Format,对于其它数据库,如果有需要可以参考代码,自己加入逻辑,比较简单。
  • DMS 在 Full Load 阶段支持多表并行,单表多线程;Flink CDC Full Load 阶段支持单表多线程,但不支持多表并行;Debezium 2.2 版本之后可以设置 max.threads=4 支持多表并行,但是对于单表还是单线程,Debezium 2.2 之前版本是单线程同步整个库,所以在 Full Load 阶段会比较慢。

三、CDC Sink MSK 部署实施

我们会在这里展示从数据库 CDC Sink 到 MSK, Spark Structured Streaming 消费 MSK 数据写入到 Redshift 中。对于 CDC Sink 到 MSK 这里使用两种方式给大家做个示例,第一个方式是 Flink CDC 的方式,Flink 作业部署到 MSF 中;第二个是 MSK Connector 配置 Sink 到 MSK。对于 DMS 发送到 MSK,控制台上配置即可,只会说明注意事项。

3.1 CDC 方式 1 – Flink CDC

  • 代码支持说明

1. MSF Flink(Flink 版本 1.15)
2. Flink CDC DataStream API 解析 MySQL Binlog 发送到 Kafka,支持按库发送到不同 Topic,也可以发送到同一个 Topic
3. 自定义 FlinkKafkaPartitioner,数据库名,表名,主键值三个拼接作为 partition key,保证相同主键的记录发送到 Kafka 的同一个分区,保证消息顺序
4. Flink CDC 支持增量快照算法,全局无锁,Batch 阶段的 checkpoint,但需要表有主键,如果没有主键列增量快照算法就不可用,无法同步数据,需要设置 scan.incremental.snapshot.enabled=false 禁用增量快照
5. 当前只加入 MySQL,Mongo 的支持,如需其它数据库,可以自行修改
6. 支持 MySQL 指定从 binlog 位置或者 binlog 时间点解析数据*
7. EMR on EC2支持(flink 1.15.x version)

  • 准备 JAR 包
    # 代码地址: https://github.com/yhyyz/flink-cdc-msk 
    # 使用编译好的 JAR
    wget https://dxs9dnjebzm6y.cloudfront.net/tmp/flink-cdc-msk-1.0-SNAPSHOT-202310101435.jar
    # 上传到 S3
    aws s3 cp flink-cdc-msk-1.0-SNAPSHOT-202310101435.jar s3://xxxx/jars/
    # 自己编译
    git clone https://github.com/yhyyz/flink-cdc-msk
    mvn clean package -Dscope.type=provided
    
  • AWS 控制台或者 AWS CLI 创建 MSF 应用,填写 JAR 包地址和相关参数说明
    # local 表示本地运行,prod 表示 MSF 运行
    project_env local or prod
    # 是否禁用 flink operator chaining 
    disable_chaining false or true 
    # kafka 的投递语义 at_least_once or exactly_once,建议 at_least_once,不用担心 Redshift 写入重复因为已经做了幂等
    delivery_guarantee at_least_once
    # mysql 地址
    host localhost:3306
    # mysql 用户名
    username xxx
    # mysql 密码
    password xxx
    # 需要同步的数据库,支持正则,多个可以逗号分隔
    db_list test_db,cdc_db_02
    # 需要同步的表支持正则,多个可以逗号分隔
    tb_list test_db.product.*,test_db.user.*,cdc_db_02.sbt.*
    # 在快照读取之前,Source 不需要数据库锁权限。每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。
    server_id 10000-10010
    # mysql 时区
    server_time_zone Etc/GMT
    # latest 从当前 CDC 开始同步,initial 先快照再 CD
    position latest or initial
    # kafka 地址
    kafka_broker localhost:9092
    # topic 名称,如果所有的数据都发送到同一个topic,设定要发送的 topic 名称
    topic test-cdc-1
    # 如果按照数据库划分 topic,不同的数据库中表发送到不同 topic,可以设定 topic 前缀,topic 名称会被设定为前缀+数据库名。 设定了-topic_prefix 参数后,topic 参数不再生效
    topic_prefix flink_cdc_
    # 需要同步的表的主键,这次表的正则
    table_pk [{"db":"test_db","table":"product","primary_key":"pid"},{"db":"test_db","table":"product_01","primary_key":"pid"},{"db":"cdc_db_02","table":"sbt.*","primary_key":"id"}]
    kafka_properties 'max.request.size=1073741824' 
    chunk_size 8090 # 默认值 8096,全量阶段如果表比较大,表的单行数据比较大,产生 OOM 时,可以调小该值
    # max.request.size 默认 1MB,这里设置的 10MB
  • 查看运行状态

3.2 CDC 方式 2 – MSK Connector CDC

  • 创建 Debezium MySQL Plugin
    # debezium mysql plugin
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.2.1.Final/debezium-connector-mysql-2.2.1.Final-plugin.zip
    # 上传到 S3
    aws s3 cp debezium-connector-mysql-2.2.1.Final-plugin.zip s3://xxxx/debezium2/
    
  • 创建 Worker 配置
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    
  • 创建 Connector 并启动
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.user=admin
    database.server.id=11000
    tasks.max=1
    time.precision.mode=adaptive_time_microseconds
    schema.history.internal.kafka.bootstrap.servers=<Your_MSK_Bootstrap_Servers>
    include.schema.changes=true
    topic.prefix=debezium2
    schema.history.internal.kafka.topic=debezium-dbbase-1-schema-history
    database.hostname=<Your_Database_Hostname>
    database.password=<Your_Database_Password>
    database.include.list=<Your_Database_Name>
    
    # 注意 Debezium2.x 和 1.x 版本参数是有差异的,这里使用的是 2.2.1
    # Connector 的配置可参考 https://debezium.io/documentation/reference/2.2/connectors/mysql.html
    # MSK connector 不能修改只能重建,重建时只要指定的名字和之前一样,同时配置和之前 connector 相同的 offset.storage.topic,就可以从该 topic 中获取之前的 offset(对于 sink connector)
    # 对于 debezium connector binlog 信息和 schema 信息在 schema.history.internal.kafka.topic 中存储,重建时指定同样的名字,就可以从之前的 binlog 位置消费

3.3 CDC 方式 3  – DMS CDC

DMS 控制台直接配置 Endpoint 和 Task 即可,比较简单,不再赘述。需要注意的是创建 Endpoint 是指定参数。

MessageFormat: json-unformatted
IncludePartitionValue: true
PartitionIncludeSchemaTable: true
NoHexPrefix: NoHexPrefix
IncludeNullAndEmpty: true
# endpoint 参数说明看这里 https://docs.thinkwithwp.com/AWSCloudFormation/latest/UserGuide/aws-properties-dms-endpoint-kafkasettings.html

3.4 验证 MSK CDC 数据

# 根据自己 kafka 版本选择
wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.12-2.8.2.tgz
tar -xvzf /kafka_2.12-2.8.2.tgz && cd kafka_2.12-2.8.2
export bs="xxxxx.kafka.us-east-1.amazonaws.com:9092"
export topic="xxx"
# 从头消费数据
./bin/kafka-console-consumer.sh --bootstrap-server $bs --topic $topic --from-beginning | jq .
# 从头消费数据,key 也输出
./bin/kafka-console-consumer.sh --bootstrap-server $bs --topic $topic --from-beginning --property print.key=true
# cdc 格式样例
https://github.com/yhyyz/kafka-cdc-redshift#mysql-cdc%E6%A0%BC%E5%BC%8F%E6%A0%B7%E4%BE%8B

四、Spark Sink Redshift 方式 1 – Glue

4.1 下载依赖

该程序使用 Spark Structured Streaming 实现。在 AWS 上部署有三个选择 Glue EMR Serverless,EMR on EC2,EMR on EKS,代码是兼容的,这里以 Glue 部署作为样例。

# 下载依赖的 JAR,上传到 S3
# spark redshift connector,对开源做了点修改,开源默认将 String 类型映射为了 Text,默认长度是 256,调整为了 varchar(65535)。代码地址 https://github.com/yhyyz/emr-spark-redshift。 可以使用如下编译好的 
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-spark-redshift-1.0-SNAPSHOT.jar
# 可以将 offset 通过 listener 提交到 Kafka,方便 Kafka 测监控消费
# glue 3.0
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/spark-sql-kafka-offset-committer-1.0.jar
# glue 4.0
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/spark3.3-sql-kafka-offset-committer-1.0.jar
# clone 代码,cdc_util build 成 whl,方便再在多个环境中使用,直接执行如下命令 build 或者下载 build 好的
# 编译好的
https://dxs9dnjebzm6y.cloudfront.net/tmp/cdc_util_202304251252-1.1-py3-none-any.whl
# 自己编译
git clone https://github.com/yhyyz/kafka-cdc-redshift
python3 -m venv cdc_venv
source ./cdc_venv/bin/activate
python3 setup.py bdist_wheel

4.2 创建 Job 配置参数

因为需要传递给 JOB 参数较多,将其定义到了一个文件中,启动作业会解析该配置文件,可以使用该文件作为参数配置。GitHub 上 README 注意看,里面会有各种配置的更新以及支持的特性。

# for glue job,请仔细看参数说明
aws_region = us-east-1
s3_endpoint = s3.us-east-1.amazonaws.com
# spark checkpoint 目前作业重启是会自动从 checkpoint 位置消费
checkpoint_location = s3://xxxx/checkpoint/
# 每批次的消费间隔,控制者最小的延迟,建议值 10s~60s 之间,或者配置为 availableNow 表示消费到当下停止(适用定时执行)spark3.3+才支持
checkpoint_interval = 30 seconds
# kafka 地址
kafka_broker = xxxx1.us-east-1.amazonaws.com:9092,xxxx2.us-east-1.amazonaws.com:9092
# 消费 CDC 数据的 Kafka 的 topic,多个以逗号分隔
topic = flink_mysql_cdc
# 启始的消费位置,支持 latest earliest 和指定时间戳,例如 1682682682000。需要注意的是此参数只在第一此启动作业时生效,当作业重启时会从 checkpoint 启动,如果重启时生效,需要删除 checkpoint 目录再启动。Spark 作业如果有 checkpoint 会优先从 checkpoint 启动,如果没有才会从该参数指定的位置启动
startingOffsets = latest
# 并发写的线程数,如果要 50 张表同时写就配置成 50,更多并发所有表的同步延迟就会更短,消耗的资源就会更多,不建议超过 50
thread_max_workers = 30
# 是否启动 Debug 日志,启动 Debug 会影响性能,主要用来排查错误
disable_msg = true
# CDC 的格式,目前支持三个值,FLINK-CDC or DMS-CDC or MSK-DEBEZIUM-CDC
cdc_format = FLINK-CDC
# 每个批次最大从 Kafka 拉取多少条数据,当回溯数据时,防止一个 batch 消费太多数据,资源开的不够,Spark 内存溢出
max_offsets_per_trigger = 1000000
# 消费者组
consumer_group = cdc-redshift-glue-g1
# Redshift 链接信息配置
# 如果用 secret manager 管理链接,填 secret_id 一个就可以
redshift_secret_id =
redshift_host = xxxx.us-east-1.redshift.amazonaws.com
redshift_port = 5439
redshift_username = xxx
redshift_password = xxx
redshift_database = dev
redshift_schema = public
# 数据写 S3 的临时路径
redshift_tmpdir = s3://xxxx/glue-cdc/tmpdir/
# Redshift 关联的 IAM role,需要有 S3 权限
redshift_iam_role = arn:aws:iam::xxxx:role/<your-role-name>

# 同步表的配置,db 和 table 都支持正则表达式适用于分库分表的合并,target_tabel 指定合并后的表名
sync_table_list = [\
{"db": "test_db", "table": "product", "primary_key": "pid"},\
{"db": "test_db", "table": "user", "primary_key": "id"},\
{"db": "test_db", "table": "product_02", "primary_key": "pid"},\
{"db": "test_db", "table": "product_multiple_key_01", "primary_key": "pid,pname"},\
{"db": "cdc_db_02", "table": "sbtest1", "primary_key": "id"},\
{"db": "cdc_db_02", "table": "sbtest2", "primary_key": "id"},\
{"db": "cdc_db_02", "table": "sbtest3", "primary_key": "id"},\
....
{"db": "cdc_db_02", "table": "sbtest100", "primary_key": "id"}
]

# 其它同步方式的配置例子
# save_delete 设置为 true,表示同步原表同时,将delete数据单独写一张表
# only_save_delete 设置为 true,表示只同步 delete 数据,不同步原表数据
sync_table_list = [\
{"db": "test_db", "table": "product", "primary_key": "pid","ignore_ddl":"true","save_delete":"true"},\
{"db": "test_db", "table": "user", "primary_key": "id","only_save_delete":"true"}\
]

# ignore_ddl,忽略 ddl 变更,表需要用户自己创建,创建表名如果不配置,请用源端的表名字创建 Redshift 表
# target_table 配置在 Redshift 创建表名称
sync_table_list = [\
{"db": "test_db", "table": "product", "primary_key": "pid","ignore_ddl":"true"},\
{"db": "test_db", "table": "product", "primary_key": "pid","ignore_ddl":"true","target_table":"t_product"},\
{"db": "test_db", "table": "user", "primary_key": "id"}\
]

4.3 Glue Job 配置

--extra-jars s3://xxxx/jars/emr-spark-redshift-1.0-SNAPSHOT.jar,s3://xxxxx/tmp/spark-sql-kafka-offset-committer-1.0.jar
--additional-python-modules  redshift_connector,jproperties,s3://xxxx/tmp/cdc_util-1.1-py3-none-any.whl
--aws_region us-east-1
# 注意这个参数 --conf 直接写后边内容,spark.executor.cores 调成了 8,表示一个 worker 可以同时运行的 task 是 8
# --conf spark.sql.shuffle.partitions=1  --conf spark.default.parallelism=1 设置为 1,这是为了降低并行度,保证当多个线程同时写多张表时,都尽可能有资源执行,设置为 1 时,最终生产的数据文件也是 1 个,如果数据量很大,生产的一个文件可能会比较大,比如 500MB,这样 Redshift copy 花费的时间就会长一些。如果想要加速,就把这两个值调大一些,比如 4,这样就会生产 4 个 125M 的文件,Redshift 并行 copy 就会快一些,但 Glue 作业的资源对应就要设置多一些,可以观察执行速度评估
--conf spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener  --conf spark.executor.cores=8 --conf spark.sql.shuffle.partitions=1  --conf spark.default.parallelism=1 --conf spark.speculation=false --conf spark.dynamicAllocation.enabled=false
--config_s3_path  s3://xxxx/kafka-cdc-redshift/job-4x.properties
# 关闭 spark event log, streaming 模式 event log 增长会将 driver 端的磁盘打满
--enable-spark-ui false
# Glue 选择 3.x,作业类型选择 Spark Streaming 作业,worker 个数根据同步表的数量和大小选择,Number of retries 在 Streaming 作业下可以设置大些,比如 100。失败自动重启,且会从 checkpoint 自动重启 
  • 启动作业运行即可

4.3 Redshift 中查看数据

上述配置我并行同步了 MySQL 中 50 张表到 Redshift,使用 Sysbench 压测 MySQL 保护(insert,update,delete)操作。

4.4 监控

监控是非常重要的,Kafka 的 Offset 延迟,Glue 作业的运行状态,同步到 Redshift 表的数据和元表的数据的一致性检查。这些重要的指标都需要监控报警出来,当前需要将这些监控纳入到自己的监控体系中。最简单的方式是 CloudWatch 拿到 Kafka,Glue 的监控指标定义报警规则。对于数据一致性的校验,可以写一个脚本,执行双端 SQL 对比检查,这是相对简单高效的方式。

五、Spark Sink Redshift 方式 2 –  EMR Serverless

5.1 导入环境变量

EMR Serveless 选择 6.11.0 版本

# S3 存储桶,例如:s3://<my-bucket>
export APP_S3_BUCKET='s3://<my-bucket>'
export AWS_REGION="us-east-1"
# EMR Serveless application id
export EMR_SERVERLESS_APP_ID='00fcvtfb3pisut09'
# execution role 需要有对应的 S3 桶权限
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::xxxxx:role/<your-role-name>'

export APP_NAME='emr-serverless-cdc-redshift'
export APP_LOCAL_HOME="/home/ec2-user/$APP_NAME"

export CDC_UTIL_WHL_NAME="cdc_util_202310201517-1.1-py3-none-any.whl"
export CDC_UTIL_WHL_LOCATION="https://dxs9dnjebzm6y.cloudfront.net/tmp/$CDC_UTIL_WHL_NAME"

export APP_S3_HOME="$APP_S3_BUCKET/$APP_NAME"
export MAIN_SCRIPT_PATH="$APP_S3_HOME/script/cdc_redshift.py"
export JOB_CONFIG_PATH="$APP_S3_HOME/script/job-ec2-canal.properties"
export PYTHON_VENV_PATH="$APP_S3_HOME/venv/cdc_venv.tar.gz"

5.2 打包 python venv 及依赖

# python 3.7+
mkdir -p $APP_LOCAL_HOME/venv/
cd $APP_LOCAL_HOME/venv/
deactivate
rm -rf ./cdc_venv
rm -rf ./cdc_venv.tar.gz

python3 -m venv cdc_venv
source cdc_venv/bin/activate
pip3 install --upgrade pip
pip3 install redshift_connector jproperties 
# cdc_util 是封装好的 Spark CDC Redshift 的包,源代码在 cdc_util中
wget $CDC_UTIL_WHL_LOCATION
pip3 install $CDC_UTIL_WHL_NAME

pip3 install venv-pack
venv-pack -f -o cdc_venv.tar.gz
# 上传到 S3
aws s3 cp $APP_LOCAL_HOME/venv/cdc_venv.tar.gz $APP_S3_HOME/venv/

5.3 依赖包

mkdir $APP_LOCAL_HOME/jars/
rm -rf $APP_LOCAL_HOME/jars/*
cd $APP_LOCAL_HOME/
aws s3 rm --recursive $APP_S3_HOME/jars/

wget -P ./jars  https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.2/spark-sql-kafka-0-10_2.12-3.3.2.jar
wget -P ./jars  https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.2/kafka-clients-2.8.2.jar
wget -P ./jars  https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.2/spark-token-provider-kafka-0-10_2.12-3.3.2.jar
wget -P ./jars  https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar

# emr 6.10 之后 emr or emr serverless 默认会在 spark.executor/driver.extraClassPath 中配置/usr/share/aws/redshift/spark-redshift/lib/* 加入自带的 redshift connector。所以下面的 emr-spark-redshift-1.3-SNAPSHOT.jar 在 emr serverless 中并不生效,可以不使用,如果使用自定义的在 emr serverless 中要通过自定义 image 实现。emr on ec2 中,需要在/etc/spark/conf/spark-defaults.conf 中去掉依赖
# wget -P  ./jars https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-spark-redshift-1.3-SNAPSHOT.jar

wget -P  ./jars https://dxs9dnjebzm6y.cloudfront.net/tmp/spark3.3-sql-kafka-offset-committer-1.0.jar

aws s3 sync $APP_LOCAL_HOME/jars ${APP_S3_HOME}/jars/

5.4 执行脚本及配置文件

cd $APP_LOCAL_HOME/
mkdir $APP_LOCAL_HOME/script/
rm -rf $APP_LOCAL_HOME/script/*
wget -P  ./script https://raw.githubusercontent.com/yhyyz/kafka-cdc-redshift/main/emr_ec2/cdc_redshift.py
aws s3 cp ./script/cdc_redshift.py $APP_S3_HOME/script/

wget -P ./script https://raw.githubusercontent.com/yhyyz/kafka-cdc-redshift/main/config/job-ec2-canal.properties
# 注意修改 job-ec2-canal.properties 配置, 这里使用的是 canal cdc 格式,也支持 Flink CDC,Debezium CDC,DMS CDC 格式
aws s3 cp ./script/job-ec2-canal.properties $APP_S3_HOME/script/

5.5 配置 submit

cd $APP_LOCAL_HOME/
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name":"$APP_NAME",
    "applicationId":"$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"$MAIN_SCRIPT_PATH",
        "entryPointArguments":[
        
  
  "$AWS_REGION",
        
  
  "$JOB_CONFIG_PATH"
        ],
         "sparkSubmitParameters":"--conf spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener --conf spark.executor.cores=16 --conf spark.executor.memory=16g --conf spark.driver.cores=8 --conf spark.driver.memory=16g --conf spark.executor.instances=10 --conf spark.sql.shuffle.partitions=2  --conf spark.default.parallelism=2 --conf spark.dynamicAllocation.enabled=false --conf spark.emr-serverless.driver.disk=150G --conf spark.emr-serverless.executor.disk=150G --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//'),/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar --conf spark.archives=${PYTHON_VENV_PATH}#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri":"$APP_S3_HOME/logs"
            }
        }
   }
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

5.6 提交作业

export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
    --no-paginate --no-cli-pager --output text \
    --region $AWS_REGION \
    --name $APP_NAME \
    --application-id $EMR_SERVERLESS_APP_ID \
    --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
    --execution-timeout-minutes 0 \
    --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
    --query jobRunId)

5.7 监控作业

now=$(date +%s)sec
while true; do
    jobStatus=$(aws emr-serverless get-job-run \
                    --no-paginate --no-cli-pager --output text \
                    --application-id $EMR_SERVERLESS_APP_ID \
                    --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                    --query jobRun.state)
    if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
        for i in {0..5}; do
            echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
            sleep 1
        done
    else
        echo -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"
        break
    fi
done

5.8 检查错误

JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

六、DMS Sink Redshift

6.1 DMS Sink Redshift 时的注意事项

如果使用 DMS 直接 CDC Sink 到 Redshift 时,我们在配置时需要注意如下事项:

  • DMS 实例的大小影响同步的性能,对于生产环境中要同步的表比较多,表的数据量较大,变更较多时,非常建议直接选择大类型的实例。大类型实例在 Full Load 阶段能够提升性能。
  • DMS 当遇到不支持的源端的 Schema 变更,比如修改类类型,或者修改字符类型长度时,DMS 任务会失败,你需要手动调整 Redshift 表的列,让其和源端匹配,再重新 Resume 作业。
  • 如果 DMS 同步过程中某张表处于 Suspend 状态,你可以开启 Debug 日志,然后在 DMS 控制台可以单独 Reload 这张表,查看错误原因,再对应解决。Reload 这张表时,会重新拉取该表的数据。
  • DMS 任务在创建的时候如果是 Full Load+ongoing replication 模式,是不能直接从指定的 Binlog 位点重新启动作业的,需要新建一个 ongoing replication 的 Task,这时可以从指定的 MySQL Binlog 位点启动作业,也可以从上个作业的 Checkpoint 启动作业。
  • DMS 作业一定要加上监控报警,比较重要的磁盘使用率的指标,同步延迟的指标,表同步状态的指标,一旦出问题我们可以及时发现并修复。建议创建 DMS TASK 时开启 DMS 自身日志表(awsdms_statusawsdms_suspended_tablesawsdms_history)信息的同步到 Redshift,以便我们定位异常问题。
  • 如果表中有大量的 Lob 字段,DMS 对于 Lob 类字段的处理性能不是特别高,它的逻辑是先将非 Lob 类型的列同步到 Redshift,Lob 列此时为空,然后单独执行 Update 语句更新 Lob 列。同时在同步 Lob 和 Text 字段时,不建议开启 ParallelApplyThreads 参数。
  • 关于 DMS 如何启动失败的作业,以及如何从 Binlog 位点及 checkpoint 恢复作业,后面会以 API 操作举例,也可以在 AWS 控制台点击配置。当指定位点恢复作业时,只要指定的位点比当前的位点早就可以,不用担心 Redshift 数据重复,因为在 Redshift 端数据是会被更新的,会保证最终一致性。有时你可能不方便找到之前消费的 Binlog 是哪个,DMS 提供了 From a custom CDC start time 通过指定一个 timestamp(UTC)来标识从哪个位置消费数据,同样不需要十分准确的时间,只要比当前停止的时间早一些就可以。

6.2 DMS CLI 启动恢复作业

# 对于 Full load,ongoing replication 类型作业,使用 API 可以从上次停止的的位点恢复,console 上的 resume 也可以, 但此类型作业不可以从任意指定的 checkpoint 点或者 native LSN 恢复作业。因此对于此类型的作业,如果想要用新的 Task 启动作业,可以新启动一个 cdc 类型作业
# resume-processing 和 console 上 resume 等同。reload-target 和 console 上的 restart 等同。此类型作业 start-replication 只能首次启动作业才可以用
aws dms start-replication-task \
    --replication-task-arn arn:aws:dms:ap-southeast-1:xxx:task:xxx \
    --start-replication-task-type resume-processing \
    
# 指定位点,from native LSN 如何果获取这个位点,在下方查看 mysql binlog 位点说明
aws dms start-replication-task \
    --replication-task-arn arn:aws:dms:ap-southeast-1:xxx:task:xxx \
    --start-replication-task-type start-replication  \
    --cdc-start-position mysql-bin-changelog.038818:568
    
# 指定 checkpoint from checkpoint 
aws dms start-replication-task \
    --replication-task-arn arn:aws:dms:ap-southeast-1:xxx:task:xxx \
    --start-replication-task-type start-replication  \
    --cdc-start-position checkpoint:V1#264#mysql-bin-changelog.039022:2167:-1:2198:167598213826489:mysql-bin-changelog.039022:2052#0#0#*#0#825

# 从上一次失败恢复
aws dms start-replication-task \
    --replication-task-arn arn:aws:dms:ap-southeast-1:xxx:task:xxx \
   --start-replication-task-type resume-processing

# 根据情况,binlog 保留时间,防止 binlog 删除,cdc 恢复数据可能找不到 binlog 位点
call mysql.rds_show_configuration;
call mysql.rds_set_configuration('binlog retention hours', 72);
# cdc 需要 mysql 设置 binlog_format 为 ROW

# 查看 mysql binlog 位点
show master status;
show binary logs;
show binlog events in 'mysql-bin-changelog.xxxx';

6.3 DMS 参数优化

DMS 写入到 Redshift 时,常用的参数调整一般保持默认即可,如果遇到性能问题,再对应调整参数。

# cdc 阶段
BatchApplyEnabled=true 这个默认就是 True
BatchSplitSize=0 0 表示没有限制,设为 0 即可
BatchApplyTimeoutMax=30 等待的最小值,默认是 1s
BatchApplyTimeoutMin=60 等待的最大值,默认是 30s
ParallelApplyThreads=32 默认值是 0,调整为 32,表中有 Lob 和 Text 列,不建议调整,保持默认值
ParallelApplyBufferSize=1000 每个 buffer queue 中最大记录数默认是 100,调整为 1000
BatchApplyMemoryLimit=1500 默认值 500MB,MaxFileSize(默认 32MB)*ParallelApplyThreads=32*32=1024MB,调成 1500
MemoryLimitTotal=6144 默认值 1024MB,事务在内存中超过这个值开始写磁盘
MemoryKeepTime=180 默认值 60s,事务在内存中超过这个时间没有写到 target 开始写磁盘
# full load 阶段
MaxFullLoadSubTasks=8 默认值 8,最大 49。表示全量加载阶段最多同时并行同步多少张表
ParallelLoadThreads=16 表示对于一张表,可以启动多少个线程来同步,增大该值,对大表的全量数据同步会更快,划分表中数据的方式可通过指定范围字段或者分区方式,具体可以参考 https://docs.thinkwithwp.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.html#CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.ParallelLoad

七、总结

本篇文章介绍了多库多表实时同步到 Redshift 的方案选择及各自方案的适用场景。最简单的方式是使用 DMS 同步到 Redshift;如果 Aurora MySQL version 3.03.1(compatible with MySQL 8.0.26)或更高版本可以使用 zero-ETL;当需要更高的自主可控性,以及 CDC 数据要下游多端复用时,可以选择 CDC 到 MSK 的上下游解耦的方案。DMS 和 Kafka 解耦方案同样可以结合使用,当遇到 DMS 在某些场景不能满足需求时,比如包含 Lob 列的表同步遇到性能瓶颈时,或者某些过亿的表 Full Load 阶段时间较长时,可以考虑两者结合,这部分表可以使用 Spark Structured Streaming 的方式做同步。

本篇作者

潘超

亚马逊云科技数据分析解决方案架构师。负责客户大数据解决方案的咨询与架构设计,在开源大数据方面拥有丰富的经验。工作之外喜欢爬山。

冯秋爽

AWS 解决方案架构师,负责跨国企业级客户基于 AWS 的技术架构设计、咨询和设计优化工作。在加入 AWS 之前曾就职于 IBM、甲骨文等 IT 企业,积累了丰富的程序开发和数据库的实践经验。

汤市建

亚马逊云科技数据分析解决方案架构师,负责客户大数据解决方案的咨询与架构设计。

郭韧

AWS AI 和机器学习方向解决方案架构师,负责基于 AWS 的机器学习方案架构咨询和设计,致力于游戏、电商、互联网媒体等多个行业的机器学习方案实施和推广。在加入 AWS 之前,从事数据智能化相关技术的开源及标准化工作,具有丰富的设计与实践经验。