1. 背景
在一些大型的面向 C 端用户的应用程序中,由于业务交易频繁,数据量大,往往超出了数据库单表存储和处理的能力。这种情况下,对这些大表往往采用分表甚至分库分表的方式进行数据存储,并在应用与数据库之间,通过分库分表中间件可以将这些物理上的分库分表变成逻辑上的一张表,达到数据分散、均衡存储的目的,消除单表存储大量数据带来的性能问题。同时,这些分库/分表存储的业务数据有被抽取到数据仓库中进行数据计算、分析的需求,如何将这些分开存储的数据高效地、整合抽取到数据仓库是一件非常具有挑战的事情。本文将介绍在亚马逊云科技云平台中,将以分库/分表形式存储在 Amazon Aurora MySQL 中的数据抽取到 Amazon Redshift 数仓的方案以及实际操作。
2. 分库分表数据抽取方案
在使用亚马逊云科技服务同步数据到 Amazon Redshift 的方案与实践这篇文章中,介绍了在亚马逊云科技平台中,数据同步到 Amazon Redshift 数仓的方案。总体来讲,数据同步到 Amazon Redshift 有以下几种方式:
- 方式 1:通过 AWS Database Migration Service(AWS DMS)直接将业务数据库中的数据(全量+增量)同步到 Amazon Redshift
- 方式 2:将业务数据库的全量以及变更数据同步到 Amazon MSK 中,对 MSK 中的 CDC 数据进行解析后,写入到 Amazon Redshift
- 方式 3:通过 Amazon Aurora 和 Amazon Redshift 的 zero-ETL 特性,无缝地将 Aurora 中的数据同步到 Amazon Redshift
在分库/分表存储的情况下,一张逻辑上的数据表分散存储在多个物理表甚至多个库的多个物理表中。数据同步到 Amazon Redshift 时,这些数据希望能同步到一张 Redshift 表中,以方便后续的数据作业。尽管通过 zero-ETL 可以非常方便的把 Amazon Aurora 数据同步到 Redshift 中,但由于目前 zero-ETL 数据同步时还不支持转换操作,Aurora 中的数据表会原样的同步到 Redshift 中,需要后续进一步的整合才能形成统一的业务表。因此,在分库分表数据同步场景下,zero-ETL 目前还不是一个完美的方案。
AWS DMS 支持数据库、表以及字段级别的自定义选择规则和数据转换规则,利用这些自定义规则,可以方便地把分库/分表的数据映射到 Redshift 的一张目标数据表中。因此,分库分表情况下可选的数据同步方案有:
方案 1:通过 AWS DMS 全量+ 增量进行数据同步,架构图如下
- 在 AWS DMS 定义选择规则,选择哪些数据库和表需要进行数据同步
- 在 AWS DMS 定义转换规则,将物理上分散存储多个库和表中的数据转换映射到同一目标库和表中
- 在 AWS DMS 中应用 Batch Apply 和 Parallel Apply,以提升数据同步的性能
分库分表场景下,数据变更一般比较频繁,使用 DMS 的 Batch Apply 和 Parallel Apply 特性可以提升数据同步的性能。Batch Apply 的工作流程如下:
- AWS DMS 以批(Batch)的方式从原库中读取变更日志
- AWS DMS 在复制实例的内存中维护一张净变更表,将批量获取的变更日志应用净变更算法,获取到批次的净变更记录。比如,在一批变更中,对同一个主键的数据进行多次更新,DMS 将合并这些更新,取最后一次变更记录
- AWS DMS 将这些净变更数据应用到目标数据库中
当使用 Amazon Redshift 作为 DMS 的目标时,DMS 将自动默认使用 Batch Apply 特性,也就是 BatchApplyEnabled 参数被设置为 true。同时,在变更捕获阶段(CDC),可以指定多个线程(Parallel Apply)来推送变更数据到目标数据库 Redshift。以下参数可以适当调整以获得更好的性能:
MaxFileSize
:Redshift 目标端点属性,指定传输到 Redshift 的最大 CSV 文件大小,默认值 32MB
BatchApplyTimeoutMin
:批次变更应用等待的最小时间,默认值 1 秒
BatchApplyTimeoutMax
:批次变更应用等待的最大时间,默认值 30 秒。可以将 BatchApplyTimeoutMin
设置成与该值一样,并且适当提高该值,可以减少向目标数据库提交的次数,一次尽可能多的应用更多变更数据,但数据延迟会提高
ParallelApplyThreads
:CDC 阶段,指定使用多少线程推送变更数据到目标数据库,默认是 0,可以适当提高该值来加快变更数据的应用
ParallelApplyBufferSize
:指定每个缓冲队列(buffer queue)存储记录的最大值,默认 100
BatchApplyMemoryLimit
:批次变更预处理可以使用的最大内存数,默认此值等于 ParallelApplyThreads
* MaxFileSize
,需要根据设置并行线程数和最大文件大小进行相应的调整
方案 2:通过AWS DMS、Amazon MSK(托管的 Kafka)以及 Glue 进行全量+增量数据同步,架构图如下
- 在 AWS DMS 中定义选择和转换规则,将物理上分散存储在多个库和表中的数据转换映射到同一个库和表
- 使用 Amazon MSK 作为目标端点,AWS DMS 同步这些物理表的 CDC 数据到 Amazon MSK
- 通过 AWS Glue 执行 Spark Streaming 程序,解析 CDC 数据,并将这些变更数据写入到 Redshift 中
使用 Kafka 作为 AWS DMS 的目标时,默认是不支持 Batch Apply 的,也就是 BatchApplyEnabled 参数为 false,但是可以使用多线程也就是 Parallel Apply 的方式来提升 CDC 阶段的性能。使用 Amazon MSK 作为 DMS 的目标时,以下参数可以适当调整以获得更好的性能:
CommitTimeout
:批量获取源端事务日志的超时时间,默认值 1 秒。提升该值会使数据延迟,但会提升吞吐率
ParallelApplyThreads
:CDC 阶段,指定使用多少线程推送变更数据到目标数据库,默认是 0,可适当提升该值来提升性能
ParallelApplyBufferSize
:每个缓冲队列(buffer queue)存储记录的最大值,默认 100,最大 1000
另外,不管是否采用 Batch Apply 模式,下面两个参数可以根据实际情况适当调整:
- MemoryLimitTotal:事务数据在复制实例中最大可占用的内存大小,超过这个值时,会将数据溢写到磁盘,默认 1024 MB。当数据同步任务日志经常出现“Reading from source is paused, Total storage used by swap files exceeded the limit 1048576000 bytes” ,并且复制实例的内存利用率并不高时,可以适当提升该值,使得所有数据都在内存中处理完成,减少数据溢写到磁盘的情况。
- MemoryKeepTime:事务数据可留存在内存中的最大时间,超过该时间将会写入到磁盘,默认 60 秒。
使用 Amazon MSK 作为 DMS 的目标端可以充分利用 Kafka 的高吞吐特性,当源端变更非常频繁时,可以有效缓冲对 Redshift 应用变更的压力。同时,通过在 Glue 中执行 Spark Structured Streaming 程序,可以自由控制对变更数据的处理逻辑,比如控制不对源端的删除操作做变更,对源端数据做转换操作等,控制相对灵活。并且 Glue 提供了无服务器的 Spark 执行环境,用户可以不用关心底层计算资源的管理,非常方便。接下来我们对两种方案来进行实际的配置。
3. 分库分表数据同步实践
在实际配置前,准备一个 Aurora 数据库实例,创建两个数据库:my_db01,my_db02,在其中分别建立 100 张表,item_0,item_1,… item_99,目标是要将这些分库分表数据同步到 Redshift 一张 dms_item 表中。
在这篇博客中,已经详细介绍了 AWS DMS 的配置过程和使用方法,本文接下来只对一些关键步骤进行配置展示。
3.1 使用 AWS DMS 同步分库分表数据配置
配置 AWS DMS Redshift 端点时,设置 MaxFileSize
参数为 65536KB(64MB),该值默认为 32MB。该值为传输数据到 Redshift 的 CSV 文件大小,适当提升该值可以提高 DMS 的吞吐率。
配置 DMS 迁移任务时,目标表准备模式选择不执行任何操作,以便不同的分表同步到同一个 Redshift 目标表中。
设置 Batch Apply 相关参数:
设置 Parallel Apply 相关参数:
定义选择规则,选择需要同步的分库/分表:
定义转换规则,将分库/分表名称转换成统一的逻辑名称。
转换数据库名的设置:
转换数据表名的设置:
使用 dms.c5.2xlarge 作为复制实例,按上述配置执行分库分表数据同步任务,100 个 Go 协程执行并发执行插入和更新操作,3 分钟内执行 100 万次变更,目标端使用 8 RPU 的 Serverless Redshift。DMS 监控目标端延迟和吞吐如下图所示:
多次执行百万次的源端更新,可以看到目标延迟基本稳定在 150 秒内。DMS 的 CDC 目标吞吐最高可达到 7000 行/秒,端到端延迟在 60-120 秒内。
3.2 使用 AWS DMS + Amazon MSK + AWS Glue 同步分库分表数据配置
在 DMS 中配置使用 Amazon MSK 作为目标端点,并设置端点输出格式为 json-unformatted
,这种输出格式将 CDC 的 JSON 格式数据在一行输出,方便后续 Spark 程序处理。
DMS 输出的 CDC 数据格式主要包括两个部分:metadata,data 或者 control,其中:
- metadata:包含 CDC 的元数据信息,主要包含以下字段
- record-type:记录类型,有 data 和 control 两个值
- operation:对于 data 记录类型,有 load, insert, update, delete 四种操作类型;对于 control 记录类型,有 create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column, column-type-change 这些操作
- schema-name:数据变更所属的 schema 名
- table-name:数据变更所属的表名
- timestamp:JSON message 产生的时间戳,ISO 8601 格式
- data;变更数据的具体内容
- control:控制信息的具体内容,具体字段请参考文档
从 Aurora MySQL 到 Amazon MSK 的 DMS 同步任务选择规则和转换规则前文已有配置,这里不再赘述。配置好同步任务后,即可实现将分库分表的数据同步到 Amazon MSK 中。接下来将介绍如何使用 Spark 消费 DMS 产生的 CDC 数据,并写入 Redshift。其主要逻辑如下。
首先读取 Kafka 消息流,过滤空行的数据,将文本格式的数据转换成 JSON,并过滤出 metadata.record-type
为 data 类型的数据,然后以微批的方式进行数据处理,代码如下:
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", msk_conf["brokers"]) \
.option("subscribe", msk_conf["topic"]) \
.option("startingOffsets", msk_conf["startingOffsets"]) \
.load() \
.selectExpr("CAST(value AS STRING)") \
.filter(col("value").isNotNull()) \
.select(from_json(col("value"), dms_cdc_schema).alias("json")) \
.select(col("json.*")) \
.filter(col("metadata.record-type") == "data")
glueContext.forEachBatch(frame=df,
batch_function=processBatch,
options=spark_batch_conf
)
在批次数据中,解析出数据变更时间戳、所属数据库和表名以及操作类型,为支持分库分表数据和非分库分表数据同时输出到同一个 topic 的情况,计算出批次数据中包含的独立数据库和表名数据,然后以特定数据库和表为维度进行后续数据处理, 代码如下:
df = df.select(to_timestamp(col("metadata.timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'").alias('ts'), \
col("metadata.schema-name").alias("db"),
col("metadata.table-name").alias("tb"),
col("metadata.operation").alias("op"),
col("data").alias("data"))
# get db and tb name from the data
win = Window.partitionBy("db", "tb").orderBy(col("ts"))
db_tabs = df.withColumn("row_num", row_number().over(win)) \
.where(col("row_num") == 1) \
.select(col("db"), col("tb")).collect()
在以数据库和表名为维度的批次数据里,以主键为分区键,通过开窗处理获得每条数据的最新纪录,相当于 DMS 的 Batch Apply 操作,然后根据不同的 operation,通过 Spark Redshift Connector 将数据写入到 Redshift。不过数据写入或者删除时,并不直接写入到 Redshift 的目标表,而是先写入到 stage 表,然后通过 Spark Redshift Connector DataFrame 写操作的 postactions
执行基于 stage 表的 MERGE INTO 或者 DELTE FROM SQL,代码如下:
def get_save_redshift_options(stage_table: str, table_alias: str,
target_table: str, op: str) -> dict:
post_action_sql = ""
if op != 'delete':
post_action_sql = f"""
BEGIN;
CREATE TABLE IF NOT EXISTS {target_table} (PRIMARY KEY(id), LIKE {stage_table});
MERGE INTO {target_table} USING {stage_table} s ON ({table_alias}.id = s.id) REMOVE DUPLICATES;
TRUNCATE TABLE {stage_table};
COMMIT;
"""
else:
post_action_sql = f"""
BEGIN;
DELETE FROM {target_table} USING {stage_table} WHERE {target_table}.id = {stage_table}.id;
TRUNCATE TABLE {stage_table};
COMMIT;
"""
return {
"url": redshift_conf["url"],
"user": redshift_conf["user"],
"password": redshift_conf["password"],
"dbtable": stage_table,
"tempdir": redshift_conf["tmpDir"],
"aws_iam_role": redshift_conf["aws_iam_role"],
"tempformat": "CSV",
"postactions": post_action_sql
}
dedup_win = Window.partitionBy("id").orderBy(col("ts").desc())
to_save_df = to_save_df.withColumn("row_num", row_number().over(dedup_win)) \
.where(col("row_num") == 1)
# apply load / insert / update changes
to_save_df.filter(col("op") != "delete") \
.drop("row_num", "ts", "op") \
.write \
.format("io.github.spark_redshift_community.spark.redshift") \
.options(**get_save_redshift_options(rf_stage_table, tab, rf_target_table, "load|insert|update")) \
.mode("append") \
.save()
# apply delete changes if needed
if apply_delete:
del_df = to_save_df.filter(col("op") == "delete")
if del_df.count() > 0:
logger.info(f"apply delete changes for table: {db}.{tab}")
del_df.drop("row_num", "ts", "op") \
.write \
.format("io.github.spark_redshift_community.spark.redshift") \
.options(**get_save_redshift_options(rf_stage_table, tab, rf_target_table, "delete")) \
.mode("append") \
.save()
完整的代码如下:
import sys
from concurrent.futures import ThreadPoolExecutor
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
import boto3.s3
from pyspark.context import SparkContext
from pyspark.sql import DataFrame, Window
from awsglue.job import Job
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructField, StructType, LongType, StringType, Row
from pyspark.sql.functions import row_number
import boto3
import json
args = getResolvedOptions(sys.argv, ['JOB_NAME',"S3_CONF_BUCKET", "S3_CONF_KEY"])
conf_bucket = args["S3_CONF_BUCKET"]
conf_key = args["S3_CONF_KEY"]
sc = SparkContext(appName="dmscdc-msk-redshift")
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glue_context=glueContext)
job.init(job_name=args['JOB_NAME'], args=args)
# global configs
record_id_key = "id"
cdc_timestamp_format = "yyyy-MM-dd'T'HH:mm:ss'Z'"
apply_delete = True
msk_conf = {}
spark_batch_conf = {}
redshift_conf = {}
# dms cdc data format
dms_cdc_schema = StructType([
StructField("data", StringType(), True),
StructField("control", StringType(), True),
StructField("metadata", StructType([
StructField("timestamp", StringType(), True),
StructField("record-type", StringType(), True),
StructField("operation", StringType(), False),
StructField("partition-key-type", StringType(), True),
StructField("schema-name", StringType(), True),
StructField("table-name", StringType(), False)
]), False)
])
# read config from s3, and init the global config
def init_config(bucket=conf_bucket, key = conf_key):
s3 = boto3.client("s3")
config = s3.get_object(Bucket=bucket, Key=key).get("Body").read()
config_json = json.loads(config)
global msk_conf, redshift_conf, spark_batch_conf, record_id_key, apply_delete, cdc_timestamp_format
msk_conf = config_json["msk_conf"]
redshift_conf = config_json["redshift_conf"]
spark_batch_conf = config_json["spark_batch_conf"]
record_id_key = config_json["record_id_key"]
apply_delete = config_json["apply_delete"]
cdc_timestamp_format = config_json["cdc_timestamp_format"]
logger.info(f"{msk_conf=},\n {redshift_conf=},\n {spark_batch_conf=}, \n {record_id_key=}, {apply_delete=}, {cdc_timestamp_format=}")
def run():
init_config()
# read cdc data from msk, filter the emplty text line,
# covert to json schema and select the data field
logger.info("read cdc data from msk.....")
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", msk_conf["brokers"]) \
.option("subscribe", msk_conf["topic"]) \
.option("startingOffsets", msk_conf["startingOffsets"]) \
.load() \
.selectExpr("CAST(value AS STRING)") \
.filter(col("value").isNotNull()) \
.select(from_json(col("value"), dms_cdc_schema).alias("json")) \
.select(col("json.*")) \
.filter(col("metadata.record-type") == "data") \
logger.info(f"msk df schema: {df.schema}")
logger.info("start batch processing.....")
glueContext.forEachBatch(frame=df,
batch_function=processBatch,
options=spark_batch_conf
)
job.commit()
def get_save_redshift_options(stage_table: str, table_alias: str,
target_table: str, op: str) -> dict:
post_action_sql = ""
if op != 'delete':
post_action_sql = f"""
BEGIN;
CREATE TABLE IF NOT EXISTS {target_table} (PRIMARY KEY(id), LIKE {stage_table});
MERGE INTO {target_table} USING {stage_table} s ON ({table_alias}.id = s.id) REMOVE DUPLICATES;
TRUNCATE TABLE {stage_table};
COMMIT;
"""
else:
post_action_sql = f"""
BEGIN;
DELETE FROM {target_table} USING {stage_table} WHERE {target_table}.id = {stage_table}.id;
TRUNCATE TABLE {stage_table};
COMMIT;
"""
return {
"url": redshift_conf["url"],
"user": redshift_conf["user"],
"password": redshift_conf["password"],
"dbtable": stage_table,
"tempdir": redshift_conf["tmpDir"],
"aws_iam_role": redshift_conf["aws_iam_role"],
"tempformat": "CSV",
"postactions": post_action_sql
}
def processBatch(df: DataFrame, batchId):
if df.count() == 0:
logger.info(f"batch data is empty, skip this batch: {batchId}")
return
logger.info(f"start processing batch: {batchId}")
# filter the data which is not delete record type and spread out the data json
# df = df.filter(col("metadata.operation") != 'delete') \
df = df.select(to_timestamp(col("metadata.timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'").alias('ts'), \
col("metadata.schema-name").alias("db"),
col("metadata.table-name").alias("tb"),
col("metadata.operation").alias("op"),
col("data").alias("data"))
# get db and tb name from the data
win = Window.partitionBy("db", "tb").orderBy(col("ts"))
db_tabs = df.withColumn("row_num", row_number().over(win)) \
.where(col("row_num") == 1) \
.select(col("db"), col("tb")).collect()
# per tabel save function
def tableSaveFunc(db_tab: Row) -> tuple:
db = db_tab["db"]
tab = db_tab["tb"]
logger.info(f"start to process the table: {db}.{tab}")
rf_stage_table = f"{redshift_conf['schema']}.stage_{tab}"
rf_target_table = f"{redshift_conf['schema']}.{tab}"
to_save_df = df.filter((col("db") == db) & (col("tb") == tab)) \
.select(col("data"), col("ts"), col("op"))
data_schema = spark.read.json(to_save_df.rdd.map(lambda r: r.data)).schema
to_save_df = to_save_df.select(from_json(col("data"), data_schema).alias("d"), col("ts"), col("op")) \
.select("d.*", "ts", "op") \
.withColumn("gmt_created", to_timestamp("gmt_created", cdc_timestamp_format)) \
.withColumn("gmt_modified", to_timestamp("gmt_modified", cdc_timestamp_format))
logger.info(f"to_save_df: {to_save_df.columns}")
dedup_win = Window.partitionBy("id").orderBy(col("ts").desc())
to_save_df = to_save_df.withColumn("row_num", row_number().over(dedup_win)) \
.where(col("row_num") == 1)
# apply load / insert / update changes
to_save_df.filter(col("op") != "delete") \
.drop("row_num", "ts", "op") \
.write \
.format("io.github.spark_redshift_community.spark.redshift") \
.options(**get_save_redshift_options(rf_stage_table, tab, rf_target_table, "load|insert|update")) \
.mode("append") \
.save()
if apply_delete:
del_df = to_save_df.filter(col("op") == "delete")
if del_df.count() > 0:
logger.info(f"apply delete changes for table: {db}.{tab}")
del_df.drop("row_num", "ts", "op") \
.write \
.format("io.github.spark_redshift_community.spark.redshift") \
.options(**get_save_redshift_options(rf_stage_table, tab, rf_target_table, "delete")) \
.mode("append") \
.save()
return (db, tab, True)
with ThreadPoolExecutor(max_workers=3, thread_name_prefix="spark_batch") as executor:
ret = executor.map(tableSaveFunc, db_tabs)
for r in ret:
logger.info(
f"======>db: {r[0]} , table: {r[1]} finished batch:{batchId} processing <======")
if __name__ == "__main__":
run()
配置文件格式如下:
{
"record_id_key": "id",
"cdc_timestamp_format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"apply_delete": true,
"msk_conf": {
"brokers": "MSK Brokers",
"topic": "MSK topic",
"startingOffsets": "earliest"
},
"spark_batch_conf": {
"windowSize": "30 seconds",
"checkpointLocation": "Spark checkpoint S3路径"
},
"redshift_conf": {
"url": "Redshift JDBC URL",
"user": "Redshift 用户名",
"password": "Redshift 用户密码",
"schema": "Redshift schema, 如my_db",
"tmpDir": "任务执行用到临时目录",
"aws_iam_role": "具有Redshift命令执行权限的IAM Role"
}
}
利用上述代码,在 AWS Glue 中创建 Glue Job,具体方法同样可以参考这篇博客。
Glue Job 创建完成后,通过 AWS CLI 命令启动 Glue 任务:
aws glue start-job-run \
--job-name your_job_name\
--region your_region \
--arguments='{"--S3_CONF_BUCKET": "Your config bucket","--S3_CONF_KEY": "Your config json s3 object key"}'
Glue Job 执行后,可以查看 Redshift 监控,可以看到 Redshift 以稳定的 Spark 微批设置的 windowSize 时间间隔进行变更数据的写入,如下图所示:
查看 DMS 的监控可以看到,CDCThroughputRowsSource 和 CDCThroughputRowsTarget 这两个指标的形状和数值基本保持一致,说明源端获取的变更以及变更数据同步到 MSK 的速率能保持匹配。
4. 总结
本文介绍了同步分库分表数据到 Amazon Redshift 的两种方法,通过 AWS DMS 的库和表的选择规则以及转换规则,将物理上分开存储的分库/分表数据以逻辑上统一的方式同步到 Redshift 中,方便后续的数据分析和处理,免去在数据仓库中进行表合并的操作。另外,本文还探讨了分库/分表高变更率情况下,两种数据同步方法各自的调优参数,以指导不同场景下进行 DMS 任务调优。最后,本文对两种数据同步方法进行了实际的配置,尤其是对第二种方案,通过编写 Spark Streaming 程序,消费 DMS 任务同步到 MSK 的 CDC 数据,经转换处理后,再将数据写入到 Redshift,用户可以根据业务需求,在此程序的基础上进行修改,以满足实际业务需求。
参考资料
- 使用亚马逊云科技服务同步数据到 Amazon Redshift 的方案与实践:https://thinkwithwp.com/cn/blogs/china/solutions-and-practices-to-synchronize-data-to-amazon-redshift/
- 使用 DMS Batch Apply 特性提升 CDC 复制性能:https://repost.aws/knowledge-center/dms-batch-apply-cdc-replication
- 理解和优化使用 DMS 复制数据到 Redshift:https://thinkwithwp.com/blogs/database/understand-and-optimize-replication-for-amazon-redshift-with-aws-dms/
- Change processing tuning settings:https://docs.thinkwithwp.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.ChangeProcessingTuning.html
- 使用 Amazon Redshift 作为 DMS 的目标:https://docs.thinkwithwp.com/dms/latest/userguide/CHAP_Target.Redshift.html
本篇作者