AWS Official Blog

Aurora MySQL 2 Upgrade: Handling Downstream Binlog Consumption – Flink CDC

With the community ending support for MySQL 5.7, standard support for Aurora MySQL 2 (compatible with MySQL 5.7) will officially end on October 31, 2024, and extended-support charges will begin on December 1, 2024.

To ensure a smooth database upgrade and to address the differing requirements of various scenarios, we have published a series of articles, each documenting the complete process for a specific scenario:

  1. Aurora MySQL 2 Upgrade Options
  2. Aurora MySQL 2 Upgrade Preparation
  3. Aurora MySQL 2 Upgrade Pre-checks – Part 1
  4. Aurora MySQL 2 Upgrade Pre-checks – Part 2
  5. Aurora MySQL 2 Upgrade: A Helper Tool for Traffic Compatibility Checks
  6. Aurora MySQL 2 Upgrade: Handling Global Database
  7. Aurora MySQL 2 Upgrade: Handling Downstream Binlog Consumption – Canal
  8. Aurora MySQL 2 Upgrade: Handling Downstream Binlog Consumption – Flink CDC (this article)

As an OLTP database, Aurora MySQL typically stores core business data, much of which needs to be captured via CDC into a data lake or warehouse for downstream analytics. When upgrading Aurora MySQL 2, keeping Flink CDC replication running without interruption is a common concern for architects. This article walks through, step by step, how to configure Flink CDC after upgrading Aurora MySQL 2 (compatible with MySQL 5.7) via a blue/green deployment, so as to minimize the impact of the upgrade.

Introduction to Flink CDC

Flink CDC is a streaming data integration tool. It defines ETL (Extract, Transform, Load) pipelines in YAML configuration files, automatically generates the corresponding Flink operators, and submits the Flink jobs on the user's behalf. Flink CDC optimizes the job submission process and adds advanced features such as automatic schema evolution, data transformation, full-database synchronization, and exactly-once semantics.

Deeply integrated with and powered by Apache Flink, Flink CDC provides the following core capabilities:

  • An end-to-end data integration framework
  • An API that makes it easy for data integration users to build jobs
  • Support for multiple tables in both sources and sinks
  • Full-database synchronization
  • Automatic synchronization of table schema changes (schema evolution)
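As a quick illustration of the YAML-based approach described above, a Flink CDC 3.x pipeline definition looks roughly like the following sketch. The hostnames, credentials, database name, and sink are placeholders for illustration only and are not part of this article's setup:

```yaml
# Minimal Flink CDC pipeline sketch: sync every table in `app_db`
# from MySQL to a downstream sink. All values are illustrative.
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: "<your-password>"
  tables: app_db.\.*          # regex: all tables in app_db

sink:
  type: doris                  # any supported pipeline sink
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

pipeline:
  name: Sync app_db to Doris
  parallelism: 2
```

Such a file is submitted with `bin/flink-cdc.sh pipeline.yaml` from the Flink CDC distribution. This article, however, drives Flink CDC through the Flink SQL Client rather than a YAML pipeline.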

1. Prepare the Flink Cluster

This article focuses on resuming Flink CDC from a breakpoint during an Aurora MySQL 5.7 upgrade, so we set up a simple Flink/Flink CDC environment on EC2. For production, we recommend deploying Flink on EMR.

1.1 Download Flink 1.18.0 and extract it to obtain the flink-1.18.0 directory. Use the following command to change into the Flink directory, and set FLINK_HOME to the directory where flink-1.18.0 resides.

cd flink-1.18.0

1.2 Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file, taking a checkpoint every 3 seconds (3000 ms). Also adjust the REST port and the bind address that controls which IPs can reach the UI.

execution.checkpointing.interval: 3000
rest.port: 8081
rest.address: localhost
rest.bind-address: 0.0.0.0

1.3 Start the Flink cluster with the following command

./bin/start-cluster.sh

1.4 Once the cluster has started, the Flink Web UI is available at http://localhost:8081/, as shown below

2. Submit Jobs via the Flink CDC CLI

2.1 Download the binary archive flink-cdc-3.1.0-bin.tar.gz and extract it to obtain the flink-cdc-3.1.0 directory, which contains four subdirectories: bin, lib, log, and conf

Download link:
wget https://dlcdn.apache.org/flink/flink-cdc-3.1.0/flink-cdc-3.1.0-bin.tar.gz

2.2 Download the dependency JARs and move them into Flink's lib directory

Flink JDBC Connector:
Version: flink-connector-jdbc-3.1.2-1.18.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.2-1.18/flink-connector-jdbc-3.1.2-1.18.jar

Flink MySQL CDC Connector:
Version: flink-sql-connector-mysql-cdc-2.3.0.jar
wget https://repo.maven.apache.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar

MySQL JDBC Driver:
Version: mysql-connector-java-8.0.27.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar

2.3 Restart the Flink cluster to apply the changes

./bin/stop-cluster.sh
./bin/start-cluster.sh

3. Prepare the Database Environment

Create two databases through the AWS console:

  • Source database: Aurora MySQL 5.7
  • Target database: RDS MySQL 5.7

We will use Flink CDC to replicate changes from the source database to the target database.

3.1 Prepare the Source Database

1. Connect to the database and create the source table source_table

mysql -haurora-57.cluster-***********.us-east-1.rds.amazonaws.com -u<username> -p<your-password>

Create the table source_table

CREATE DATABASE source;
USE source;
CREATE TABLE source_table (
    id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    ctime DATETIME
);

2. Insert sample data

INSERT INTO source_table (id, customer_name, ctime) VALUES
(1, 'John Doe', NOW()),
(2, 'Jane Smith', NOW()),
(3, 'Michael Johnson', NOW()),
(4, 'Emily Davis', NOW()),
(5, 'David Wilson', NOW()),
(6, 'Sophia Thompson', NOW()),
(7, 'William Anderson', NOW()),
(8, 'Olivia Martinez', NOW()),
(9, 'James Taylor', NOW()),
(10, 'Emma Hernandez', NOW());

3.2 Prepare the Target Database

1. Connect to the database and create the target table target_table

mysql -hrds57.***********.us-east-1.rds.amazonaws.com -u<username> -p<your-password>

2. Create the table target_table

CREATE DATABASE target;
USE target;
CREATE TABLE target_table (
    id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    ctime DATETIME
);

4. Create the Flink CDC Job

4.1 Create the sync job through the Flink SQL Client. First, start the SQL Client

./bin/sql-client.sh

4.2 Create the source table in the Flink SQL Client

CREATE TABLE source_table (
    id INT,
    customer_name VARCHAR(100),
    ctime TIMESTAMP(3),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'aurora-57.cluster-***********.us-east-1.rds.amazonaws.com',
    'port' = '3306',
    'username' = '<username>',
    'password' = '<your-password>',
    'database-name' = 'source',
    'table-name' = 'source_table'
);

4.3 Create the sink table in the Flink SQL Client

CREATE TABLE target_table (
    id INT,
    customer_name VARCHAR(100),
    ctime TIMESTAMP(3),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://rds57.***********.us-east-1.rds.amazonaws.com:3306/target', 
    'username' = '<username>',
    'password' = '<your-password>',  
    'table-name' = 'target_table'
);

4.4 List the tables in Flink

Flink SQL> show tables;
+--------------+
|   table name |
+--------------+
| source_table |
| target_table |
+--------------+
2 rows in set

4.5 Create the sync job

INSERT INTO target_table SELECT * FROM source_table;

4.6 The newly submitted sync job appears in the Flink Web UI

5. Flink CDC During the Upgrade

Flink CDC is now working normally and incrementally replicating data to the target database. Next, we verify how Flink CDC behaves during each phase of the Aurora blue/green deployment upgrade, and what adjustments are required.

5.1 Verify the Full Load

1. Data already inserted into the source database

MySQL [source]> select * from source_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
+----+------------------+---------------------+
10 rows in set (0.00 sec)

2. Query the target database. The data has been fully synchronized, confirming that the Flink CDC job is running correctly.

MySQL [target]> select * from target_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
+----+------------------+---------------------+
10 rows in set (0.00 sec)

5.2 Verify the CDC Status

We focus on Flink CDC's synchronization status during the following phases:

  • Before the upgrade
  • While the blue/green deployment is being created
  • During the blue/green switchover
  • After the upgrade, resuming from the breakpoint

Test each phase and record the results

1. Before the upgrade

Insert data into the source database

MySQL [source]> INSERT INTO source_table (id, customer_name, ctime) VALUES (11, 'Grace Kelly', NOW());
Query OK, 1 row affected (0.02 sec)

Query the target database

MySQL [target]> select * from target_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
| 11 | Grace Kelly      | 2024-07-02 19:40:05 |
+----+------------------+---------------------+
11 rows in set (0.01 sec)

2. While the blue/green deployment is being created

Start the blue/green deployment

Insert data into the source database

MySQL [source]> INSERT INTO source_table (id, customer_name, ctime) VALUES (12, 'Lucas Mitchell', NOW());
Query OK, 1 row affected (0.01 sec)

Query the target database

MySQL [target]> select * from target_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
| 11 | Grace Kelly      | 2024-07-02 19:40:05 |
| 12 | Lucas Mitchell   | 2024-07-02 19:53:10 |
+----+------------------+---------------------+
12 rows in set (0.00 sec)

3. Flink CDC status during the blue/green switchover

Before switching over, check the green environment and make sure its parameter group is in the in-sync state.

Reboot the green environment so the parameter group takes effect

Confirm that the parameter group has taken effect

4. Start the blue/green switchover

During the blue/green switchover, we inserted data into the source database several times to verify the synchronization status. At the moment just before the final switchover, the last insert into the source database failed.

MySQL [source]> INSERT INTO source_table (id, customer_name, ctime) VALUES (16, 'Mason Wood', NOW());
ERROR 2006 (HY000): MySQL server has gone away
No connection. Trying to reconnect...
Connection id:    147
Current database: source

MySQL [source]> select * from source_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
| 11 | Grace Kelly      | 2024-07-02 19:40:05 |
| 12 | Lucas Mitchell   | 2024-07-02 19:53:10 |
| 13 | Penelope Scott   | 2024-07-02 20:06:51 |
| 14 | Zoe Roberts      | 2024-07-02 20:40:09 |
| 15 | Henry Phillips   | 2024-07-02 20:39:38 |
| 16 | Mason Wood       | 2024-07-06 15:44:55 |
+----+------------------+---------------------+
16 rows in set (0.01 sec)

Query the target database; the last row has not been synchronized

MySQL [target]> select * from target_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
| 11 | Grace Kelly      | 2024-07-02 19:40:05 |
| 12 | Lucas Mitchell   | 2024-07-02 19:53:10 |
| 13 | Penelope Scott   | 2024-07-02 20:06:51 |
| 14 | Zoe Roberts      | 2024-07-02 20:40:09 |
| 15 | Henry Phillips   | 2024-07-02 20:39:38 |
+----+------------------+---------------------+
15 rows in set (0.00 sec)

6. Resuming Flink CDC from the Breakpoint

6.1 The Flink CDC job has failed, reporting that the binlog cannot be found

2024-07-03 04:44:24
java.lang.RuntimeException: One or more fetchers have encountered exception
......
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
......
Caused by: io.debezium.DebeziumException: Unexpected error while connecting to MySQL and looking for binary logs: 
......
Caused by: java.sql.SQLException: You are not using binary logging
......

6.2 Restart the job

To resolve this error, the job must be restarted manually so that it resumes synchronizing from the binlog file and position where it was interrupted.

Find the binlog file and position: on the RDS page of the AWS console, look for the "Binary log coordinates" keyword under "Events".
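If you script the recovery, the coordinates can also be pulled out of the RDS event text programmatically. Below is a small sketch in Python; the exact wording of the event message is an assumption based on what the console displays, so adjust the pattern to match the actual event text in your account:

```python
import re


def parse_binlog_coordinates(event_message: str):
    """Extract the binlog file name and position from an RDS event message.

    The message format is an ASSUMPTION for illustration, e.g.:
    "Binary log coordinates in green environment after switchover:
     file 'mysql-bin-changelog.000001' and position 853"
    """
    match = re.search(
        r"file\s+'?([\w.\-]+)'?\s+and\s+position\s+(\d+)",
        event_message,
    )
    if not match:
        raise ValueError("no binlog coordinates found in event message")
    return match.group(1), int(match.group(2))


# Example with a hypothetical event message:
msg = ("Binary log coordinates in green environment after switchover: "
       "file 'mysql-bin-changelog.000001' and position 853")
print(parse_binlog_coordinates(msg))  # ('mysql-bin-changelog.000001', 853)
```

The two returned values map directly to the `scan.startup.specific-offset.file` and `scan.startup.specific-offset.pos` options used below.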

6.3 Resubmit the Flink Job

Using the binlog file and position found above, update the DDL and recreate the source table.

Three key parameters must be added: scan.startup.mode, scan.startup.specific-offset.file, and scan.startup.specific-offset.pos.

CREATE TABLE source_table_new (
    id INT,
    customer_name VARCHAR(100),
    ctime TIMESTAMP(3),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'aurora-57.cluster-***********.us-east-1.rds.amazonaws.com',
    'port' = '3306',
    'username' = '<username>',
    'password' = '<your-password>',
    'database-name' = 'source',
    'table-name' = 'source_table',
    'scan.startup.mode' = 'specific-offset',
    'scan.startup.specific-offset.file' = 'mysql-bin-changelog.000001',
    'scan.startup.specific-offset.pos' = '853'
);

Recreate the target table in the Flink SQL Client

CREATE TABLE target_table (
    id INT,
    customer_name VARCHAR(100),
    ctime TIMESTAMP(3),
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://rds57.***********.us-east-1.rds.amazonaws.com:3306/target', 
    'username' = '<username>',
    'password' = '<your-password>',  
    'table-name' = 'target_table'
);

Resubmit the Flink CDC job

INSERT INTO target_table SELECT * FROM source_table_new;

6.4 Check the Flink CDC Sync Status

1. Insert data into the source database

MySQL [source]> INSERT INTO source_table (id, customer_name, ctime) VALUES (17, 'Abigail Cox', NOW());
Query OK, 1 row affected (0.00 sec)

2. Verify the synchronization status

In the target database, the data has been synchronized, and replication resumed from the pre-upgrade position rather than being reloaded via a full load.

MySQL [target]> select * from target_table;
+----+------------------+---------------------+
| id | customer_name    | ctime               |
+----+------------------+---------------------+
|  1 | John Doe         | 2024-06-30 16:41:53 |
|  2 | Jane Smith       | 2024-06-30 16:41:53 |
|  3 | Michael Johnson  | 2024-06-30 16:41:53 |
|  4 | Emily Davis      | 2024-06-30 16:41:53 |
|  5 | David Wilson     | 2024-06-30 16:41:53 |
|  6 | Sophia Thompson  | 2024-06-30 16:41:53 |
|  7 | William Anderson | 2024-06-30 16:41:53 |
|  8 | Olivia Martinez  | 2024-06-30 16:41:53 |
|  9 | James Taylor     | 2024-06-30 16:41:53 |
| 10 | Emma Hernandez   | 2024-06-30 16:41:53 |
| 11 | Grace Kelly      | 2024-07-02 19:40:05 |
| 12 | Lucas Mitchell   | 2024-07-02 19:53:10 |
| 13 | Penelope Scott   | 2024-07-02 20:06:51 |
| 14 | Zoe Roberts      | 2024-07-02 20:40:09 |
| 15 | Henry Phillips   | 2024-07-02 20:39:38 |
| 16 | Mason Wood       | 2024-07-06 15:44:55 |
| 17 | Abigail Cox      | 2024-07-06 16:07:16 |
+----+------------------+---------------------+
17 rows in set (0.00 sec)

6.5 Verify That Replication Resumed from the Breakpoint

In the target database's binlog, the most recent Query entries are single-row inserts. Had a full load occurred, we would instead see all of the earlier rows inserted or updated in one batch.

MySQL [target]> show binary logs;
+----------------------------+-----------+
| Log_name                   | File_size |
+----------------------------+-----------+
| mysql-bin-changelog.003109 |       578 |
| mysql-bin-changelog.003110 |      1533 |
| mysql-bin-changelog.003111 |       521 |
+----------------------------+-----------+
3 rows in set (0.00 sec)

MySQL [target]> show binlog events in 'mysql-bin-changelog.003110';
+----------------------------+------+----------------+------------+-------------+---------------------
| Log_name                   | Pos  | Event_type     | Server_id  | End_log_pos | Info                                                                                                       |
+----------------------------+------+----------------+------------+-------------+---------------------
| mysql-bin-changelog.003110 |  219 | Query          | 1344015823 |         302 | BEGIN                                                                                                       |
| mysql-bin-changelog.003110 |  302 | Query          | 1344015823 |         597 | use `target`; INSERT INTO `target_table`(`id`, `customer_name`, `ctime`) VALUES (17, 'Abigail Cox', '2024-07-06 16:07:16')  ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `customer_name`=VALUES(`customer_name`), `ctime`=VALUES(`ctime`)        |
| mysql-bin-changelog.003110 |  597 | Xid            | 1344015823 |         628 | COMMIT /* xid=5390337 */                                                                                                       |
| mysql-bin-changelog.003110 |  693 | Query          | 1344015823 |         774 | BEGIN                                                                                                       |
| mysql-bin-changelog.003110 | 1060 | Query          | 1344015823 |        1143 | BEGIN                                                                                                       |
| mysql-bin-changelog.003110 | 1143 | Query          | 1344015823 |        1445 | use `target`; INSERT INTO `target_table`(`id`, `customer_name`, `ctime`) VALUES (18, 'Michael Richardson', '2024-07-06 16:08:37')  ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `customer_name`=VALUES(`customer_name`), `ctime`=VALUES(`ctime`) |
+----------------------------+------+----------------+------------+-------------+-----------------------

The steps above fully demonstrate Flink CDC's synchronization status during each phase of an Aurora MySQL 5.7 blue/green deployment, and how to resume from the breakpoint after the switchover. We hope this article offers useful reference points for database upgrades in environments running Flink CDC, and helps you complete the upgrade smoothly.

References

https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/get-started/introduction/

Authors

Xu Xiaoliang

Solutions Architect at Amazon Web Services, responsible for consulting on and designing AWS cloud architectures and promoting the AWS cloud platform and its solutions in China. He specializes in databases and big data, designing efficient, stable, globally distributed systems for customers by leveraging cloud-native capabilities.

Tang Shijian

Data Analytics Solutions Architect at Amazon Web Services, responsible for consulting on and designing customers' big data solutions.