AWS Glue 是一项无服务器数据集成服务,可让使用分析功能的用户轻松发现、准备、移动和集成来自多个来源的数据。您可以将其用于分析、机器学习和应用程序开发。它还包括用于编写、运行任务和实施业务工作流程的额外生产力和数据操作工具。
通过使用 AWS Glue,您可以发现并连接到 70 多个不同的数据来源,将主要数据集成功能整合到一项服务中。其中包括数据发现、现代 ETL、清理、转换和集中式编目。这也是一项无服务器服务,即无需管理基础设施。通过在一项服务中灵活支持 ETL、ELT 和流式传输之类的所有工作负载,AWS Glue 可为不同工作负载和类型的用户提供支持。另外 AWS Glue 可按需扩展,可针对任何数据大小进行扩展,并支持所有数据类型和架构变化。
AWS Glue Data Catalog 是您在 AWS 云中的持久性技术元数据存储。AWS Glue 使用 AWS Glue Data Catalog 来存储有关数据源、转换和目标的元数据。数据目录是 Apache Hive 元存储的简易替代。AWS Glue Data Catalog 提供了一个统一的存储库,不同的系统可以在其中存储和查找元数据来跟踪数据孤岛中的数据。然后,您可以使用元数据在各种应用程序中以统一的方式查询和转换该数据。
在实际的项目开发过程中,我们经常会需要在不同的数据库系统中进行数据的同步,以满足不同业务的需求。数据同步模式基本上有三种类型:全量同步,增量同步,实时同步。根据实际的业务场景和需求选择不同的同步模式。
本文将以实际的案例讲解通过 Glue + Glue Data Catalog + S3 来实现基于时间戳的 RDS 到 Redshift 的灵活增量同步。
背景介绍
在一般的业务场景中,应用程序写入线上业务库,为了对数据进行 OLAP 分析,会将业务库中的数据按需导入到 Redshift 中以对接 BI 工具和提供跟运营人员进行数据的统计分析和出报表之用。
本案例来源于客户实际项目需求。客户应用场景需要每天定时的将存储在 RDS Postgres 数据库中的上百张表,完全一模一样地复制到 Redshift 中,以满足客户分析和出报表的需求,同时需要适度的控制成本。由于 Glue Bookmark 机制存在一定的限制,因此本方案采用通过 S3 缓存中间状态的方式来实现增量同步。
考虑到客户需求对实时性的要求没那么高,我们采用了 Glue + Glue Catalog(表结构定义)+ S3(缓存中间状态)增量同步的方式实现复制,并且通过多线程并行 job 的方式,实现单个 Glue Job 代码完成多表复制的功能。
部署方案
前提条件
1. 提前在环境中准备 RDS for PostgresSQL 数据库实例、Redshift 数据库实例和 Amazon S3 存储桶,本文对于数据库的创建部分内容不做详细描述。
2. RDS 中的表结构和 Redshift 中的表结构完全一致或者 Redshift 表的字段是 RDS 表的子集,如果不一致需要根据情况调整代码逻辑。
3. RDS 中需要增量同步的表中,必须有能够反应记录修改的时间戳信息。
4. RDS 中需要增量同步的表不存在硬删除记录的情况,设计者可以通过在表里设置删除标志位来表示记录是否被删除。
数据准备
1. RDS Postgres 创建 user 表
我们以简单的用户表为例(纯粹演示用),如下:
CREATE SEQUENCE IF NOT EXISTS public.users_id_seq
INCREMENT 1
START 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 1;
CREATE TABLE IF NOT EXISTS public.users
(
id bigint NOT NULL DEFAULT nextval('users_id_seq'::regclass),
username character varying ,
age integer,
country character varying ,
created_at timestamp without time zone NOT NULL,
updated_at timestamp without time zone NOT NULL,
CONSTRAINT users_pkey PRIMARY KEY (id)
)
WITH (
OIDS = FALSE
)
TABLESPACE pg_default;
源表必须包含用来反应记录最后被更新的时间戳,例如 users 表的 updated_at 字段用来跟踪记录的最新一次变化情况。
2. 在 Redshift 创建相同的 user 表,如下:
CREATE SEQUENCE IF NOT EXISTS public.users_id_seq
INCREMENT 1
START 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 1;
CREATE TABLE IF NOT EXISTS public.users
(
id bigint NOT NULL DEFAULT nextval('users_id_seq'::regclass),
username character varying ,
age integer,
country character varying ,
created_at timestamp without time zone NOT NULL,
updated_at timestamp without time zone NOT NULL,
CONSTRAINT users_pkey PRIMARY KEY (id)
)
WITH (
OIDS = FALSE
)
TABLESPACE pg_default;
3. 大致数据表样例
Id |
username |
age |
country |
created_at |
updated_at |
1 |
James |
26 |
China |
2020-06-05 13:41:04.323918 |
2020-06-06 14:11:05.322816 |
2 |
Banny |
32 |
UK |
2020-07-08 08:20:05.322936 |
2020-07-08 08:20:05.322936 |
Glue Catalog 元数据创建
1. 在 Glue Catalog 里创建 2 个 databse,分别是 rds_db,redshift_db
2. 创建 connection,分别连接 RDS(rds_connection)和 Redshift(redshift_connection)
具体创建流程,可参考相应文档:https://docs.thinkwithwp.com/zh_cn/glue/latest/dg/glue-connections.html
3. 创建 2 个 Crawlers 分别爬 rds 和 redshift 里的 users 表结构:rds_db_crawler,redshift_db_crawler, 以 rds_db_crawler 为例 (Crawler 要用到第 2 步创建的连接器)
爬完后结果,进入 Databases rds_db 可以看到生成的表:
以相同的方式构建 reshift_db_crawler 并运行,将 redshift 表爬到 redshift_db 中:
注意 crawler 要有相应权限的 role,可以构建一个 glue-common-role。
构建 Endpoint
1. 构建 S3 Gateway Endpoint 以便 Glue 能够在 Copy 数据到 Redshift 的时候,Redshift 可以通过 VPC 访问 S3 数据
具体参考文档:https://docs.thinkwithwp.com/zh_cn/glue/latest/dg/connection-S3-VPC.html
2. 构建 Glue Endpoint,以便 Glue Job 能够通过 Boto3 访问 Glue catalog
构建 Glue job
打开 Glue Studio,选择 Spark Script editor 创建一个 Glue job,因为我们需要动态访问 Glue Catalog 去获取 RDS 和 Redshift 下的元数据,所以可视化创建 Job 不能完全满足我们的要求。
点击 Create 后,创建如下 Job,Job 命名为 demo-incremental-sync,glue 会产生样例代码如下:
Job Detail 配置
先进入 Job Detail 将 job 的参数配置好:Glue 版本选择 4.0,Role 需要有 S3、Redshift、Glue Catalog、RDS 等访问权限,Language 选择 Python
Disable bookmark 功能,将 retry 设置成 0,job timeout 设置成 30 分钟(retry 、timeout、worker 等参数可以根据实际情况进行设置)
进入 Advanced Properties 部份,大部份选用缺省设置,以下几部份需要修改:
1. Connections 部分选择你之前创建的两个 connections,一个针对 RDS,一个针对 Redshift。
2. Referenced files path 和任务参数
fairscheduler.xml 是用来定义 job 多任务的,具体后面涉及到代码的时候还会具体说明。
任务参数说明如下:
–sync_rds_database:同步的源数据库,在 Glue Catalog 里定义
–sync_rds_tables_exp:数据库下要同步的表,可以通过正则表达式,* 代表全部
–sync_redshift_database:同步的目标数据库,在 Glue Catalog 里定义
–sync_redshift_tables_exp:数据库下要同步的目标表,可以通过正则表达式,* 代表全部
–sync_start_time:同步的起始时间点,该参数不是必须的,如果有将会从该时间点开始同步数据,如果不设置,会从系统保存的上一次同步时间点开始同步,初次同步如果没有设置该值,会从 1970 开始同步,相当于全量同步
Job 代码分析
1. 主逻辑
该部分逻辑是 Job 的入口定义整个增量同步的主逻辑,为了让单个 Spark Job 能够支持多任务执行,同时实现多表同时增量同步,需要定义调度器的配置,具体可以参考 Spark 文档:https://spark.apache.org/docs/latest/job-scheduling.html
fairscheduler_file = "fairscheduler.xml"
init_start_sync_time= '1970-01-08 00:00:00'
redshift_tables_map = {}
args = getResolvedOptions(sys.argv, ["JOB_NAME","sync_rds_database", "sync_rds_tables_exp","sync_redshift_database","sync_redshift_tables_exp","sync_start_time"])
#配置 Spark ,调度模式采用公平模式
conf1 = SparkConf().setAppName("demo-incremental-sync").set('spark.scheduler.mode', 'FAIR').set("spark.scheduler.allocation.file", fairscheduler_file)
sc = SparkContext(conf=conf1)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# rds parameters
rds_db = args["sync_rds_database"]
rds_tables_exp = args["sync_rds_tables_exp"]
#从 glue catalog 获取要同步的 rds 端的表
rds_tables = getSyncTablesFromGlueCatalog(rds_db, rds_tables_exp)
# redshift parameters
redshift_db = args["sync_redshift_database"]
redshift_tables_exp = args["sync_redshift_tables_exp"]
#从 glue catalog 获取要同步的目标端的 Redshift 的表
redshift_tables = getSyncTablesFromGlueCatalog(redshift_db, redshift_tables_exp)
# build redshift table mapping,原表的表名需要和目标表的表名一致,以便能够匹配
for table in redshift_tables:
tmp = table.get('StorageDescriptor').get('Location').split('.')
table_name = tmp[-1]
redshift_tables_map[table_name] = table
#通过多线程方式提交任务,具体根据实际同步表的数量控制线程数,另外实际起的 glue worker 数和总的 executors 数量也会限制能够同时执行的任务数
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
for table in rds_tables:
pool.submit(sync,args["sync_start_time"],table)
print("----waiting tables to sync completely-----")
job.commit()
2. getSyncTablesFromGlueCatalog 获取要处理的表,通过 boto3 访问 glue catalog
这里使用了 token 来分页读取数据库下的表,如果表的数量很多,需要采用分页模式读取,否则可能会漏掉表。
def getSyncTablesFromGlueCatalog(db, expression):
glue_client = boto3.client('glue')
tables = []
next_token = ""
while True:
response = glue_client.get_tables(
DatabaseName=db,
Expression=expression,
NextToken=next_token
)
for table in response.get('TableList'):
tables.append(table)
next_token = response.get('NextToken')
if next_token is None:
break
return tables;
3. sync 主程序
def sync(sync_time_parameter, table):
location = table.get('StorageDescriptor').get('Location')
name = table.get('Name')
try:
last_sync_time = sync_time_parameter
if not last_sync_time :
# 获取上一次同步的时间
last_sync_time = readLastSyncTimeFromS3(name)
now_timestamp = str(datetime.now())
# 复制数据
copy(table, "bigdata","2",last_sync_time,now_timestamp)
updateLastSyncTimeToS3(name,now_timestamp)
except:
#TODO: exception code here
pass
4. sync time 时间 bookmark
Bookmark 上一次同步时间,每张同步的表,在每次同步完后,都会有记录同步的时间,并将该时间信息储存在 Glue Job TempDir S3 目录下的 last_sync_times 目录,该目录不存在的话,需要自建。
def readLastSyncTimeFromS3(table_name):
bucket = args["TempDir"].replace('s3://','').replace('/temporary/','')
s3 = boto3.resource('s3')
last_sync_time = init_start_sync_time
try:
timestamp_obj = s3.Object(bucket, 'last_sync_times/'+table_name).get()['Body'].read().decode('utf-8')
last_sync_time = timestamp_obj
except:
pass
return last_sync_time
def updateLastSyncTimeToS3(table_name, lastsync_timestamp):
s3 = boto3.client("s3")
bucket = args["TempDir"].replace('s3://','').replace('/temporary/','')
s3.put_object(
Bucket=bucket,
Key='last_sync_times/'+table_name,
Body=lastsync_timestamp
)
5. 主复制程序
def copy(table, pool_name, hashpartitions, last_sync_timestamp, now_timestamp):
location = table.get('StorageDescriptor').get('Location')
src_catalog_table_name = table.get('Name')
src_table_name = location
#设置线程池的名字,该名字应该和 fairscheduler.xml 定义的相应的 pool 名字一致。
sc.setLocalProperty("spark.scheduler.pool", pool_name)
# Script generated for node rds
sql = "select * from {} where updated_at >= '{}' and updated_at < '{}' ".format(src_table_name,last_sync_timestamp,now_timestamp)
# 使用 sampleQuery 去构建 spark Dataframe,因此这里 updated_at 时间戳字段必须要和实际表里定义的时间戳字段一致,否则会有问题。database 是之前创建的 rds_db。
rds_node1 = glueContext.create_dynamic_frame.from_catalog(
database="rds_db",
table_name=src_catalog_table_name,
# dbtable = sql,
transformation_ctx="rds_node1",
additional_options={
"sampleQuery":sql
}
)
#获取字段 Mapping,这里也可以进行源表和目标表的映射,目前认为两边的表是一致的,你也可以控制这个映射过程。
mapping = getTableMapping(table)
# Script generated for node ApplyMapping
ApplyMapping_node1 = ApplyMapping.apply(
frame= rds_node1,
mappings=mapping,
transformation_ctx="ApplyMapping_node1",
)
# 选择需要复制的字段,通过上一步的映射,根据 Redshift 的相同表的定义,选择 Redshift 表里的字段,如果存在 Redshift 表是原同表子集的情况,可以通过这个方式进行控制。
tmp = table.get('StorageDescriptor').get('Location')
table_name = tmp.split('.')[-1]
redshift_table = redshift_tables_map[table_name]
select_fields = getTableSelectFields(redshift_table)
SelectFields_node1 = SelectFields.apply(
frame=ApplyMapping_node1,
paths=select_fields,
transformation_ctx="SelectFields_node1",
)
# 复制表, catalog_connection 是之前构造的 redshift_connection
target_table_name = redshift_table.get('StorageDescriptor').get('Location');
pre_query = "drop table if exists public.stage_table_{};create table public.stage_table_{} as select * from {} where 1=2;".format(src_catalog_table_name,src_catalog_table_name,target_table_name)
post_query = "begin;delete from {} using public.stage_table_{} where public.stage_table_{}.id = {}.id; insert into {} select * from public.stage_table_{}; drop table public.stage_table_{}; end;".format(target_table_name,src_catalog_table_name,
src_catalog_table_name,target_table_name,target_table_name,src_catalog_table_name,src_catalog_table_name)
Redshift_node1 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=SelectFields_node1,
catalog_connection="redshift_connection",
connection_options={
"database": "dev",
"dbtable": "public.stage_table_" + src_catalog_table_name,
"preactions": pre_query,
"postactions": post_query,
"tempformat":"CSV GZIP",
"extracopyoptions":"COMPUPDATE OFF STATUPDATE OFF",
},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="Redshift_node1",
)
6. fairscheduler.xml
<?xml version="1.0"?>
<allocations>
<pool name="bigdata">
<schedulingMode>FIFO</schedulingMode>
<weight>1</weight>
<minShare>10</minShare>
</pool>
</allocations>
完整代码请参考附录。
总结
本文讨论利用 Glue 进行增量数据的离线加载,需要具备条件:原表和目标表需要一致,或者目标表的字段定义是原表的子集;需要有表示记录更新或插入的时间戳字段;表不应该有硬删除的情况。更新数据的同步,我们利用 Glue 脚本在目标库执行 Merge 语句实现 UPSERT 语义;如果源数据库表无法满足上述同步对应的条件时,可以利用 Glue 进行全量数据加载或者利用 Glue streaming job 流式处理源数据 CDC 日志的方式,具体可以参考官方文档。
附录代码
本代码是样例代码,如果用于生产环境,需根据实际情况进行调整。
import sys
import boto3
import time
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.context import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import concurrent.futures
from datetime import datetime
def getSyncTablesFromGlueCatalog(db, expression):
glue_client = boto3.client('glue')
tables = []
next_token = ""
while True:
response = glue_client.get_tables(
DatabaseName=db,
Expression=expression,
NextToken=next_token
)
for table in response.get('TableList'):
tables.append(table)
next_token = response.get('NextToken')
if next_token is None:
break
return tables;
def readLastSyncTimeFromS3(table_name):
bucket = args["TempDir"].replace('s3://','').replace('/temporary/','')
s3 = boto3.resource('s3')
last_sync_time = INIT_START_SYNC_TIME;
try:
timestamp_obj = s3.Object(bucket, 'last_sync_times/'+table_name).get()['Body'].read().decode('utf-8')
last_sync_time = timestamp_obj
except:
pass
return last_sync_time
# create timestamp for this table
def updateLastSyncTimeToS3(table_name, lastsync_timestamp):
s3 = boto3.client("s3")
bucket = args["TempDir"].replace('s3://','').replace('/temporary/','')
s3.put_object(
Bucket=bucket,
Key='last_sync_times/'+table_name,
Body=lastsync_timestamp
)
def getTableMapping(table):
mappings = []
for field in table.get('StorageDescriptor').get('Columns'):
fieldType = field.get('Type')
mappings.append((field.get('Name'), fieldType, field.get('Name'), fieldType));
return mappings
def getTableSelectFields(table):
selectFields = []
for field in table.get('StorageDescriptor').get('Columns'):
selectFields.append(field.get('Name'))
return selectFields
def copy(table, pool_name, hashpartitions, last_sync_timestamp, now_timestamp):
location = table.get('StorageDescriptor').get('Location')
src_catalog_table_name = table.get('Name')
src_table_name = location
sc.setLocalProperty("spark.scheduler.pool", pool_name)
# Script generated for node rds
sql = "select * from {} where updated_at >= '{}' and updated_at < '{}' ".format(src_table_name,last_sync_timestamp,now_timestamp)
rds_node1 = glueContext.create_dynamic_frame.from_catalog(
database="rds_db",
table_name=src_catalog_table_name,
# dbtable = sql,
transformation_ctx="rds_node1",
additional_options={
"sampleQuery":sql,
"hashexpression": "id",
'hashpartitions': hashpartitions
}
)
mapping = getTableMapping(table)
# Script generated for node ApplyMapping
ApplyMapping_node1 = ApplyMapping.apply(
frame= rds_node1,
mappings=mapping,
transformation_ctx="ApplyMapping_node1",
)
# Script generated for node Select Fields
tmp = table.get('StorageDescriptor').get('Location')
table_name = tmp.split('.')[-1]
redshift_table = redshift_tables_map[table_name]
select_fields = getTableSelectFields(redshift_table)
SelectFields_node1 = SelectFields.apply(
frame=ApplyMapping_node1,
paths=select_fields,
transformation_ctx="SelectFields_node1",
)
# Script generated for node Redshift
target_table_name = redshift_table.get('StorageDescriptor').get('Location');
pre_query = "drop table if exists public.stage_table_{};create table public.stage_table_{} as select * from {} where 1=2;".format(src_catalog_table_name,src_catalog_table_name,target_table_name)
post_query = "begin;delete from {} using public.stage_table_{} where public.stage_table_{}.id = {}.id; insert into {} select * from public.stage_table_{}; drop table public.stage_table_{}; end;".format(target_table_name,src_catalog_table_name,
src_catalog_table_name,target_table_name,target_table_name,src_catalog_table_name,src_catalog_table_name)
Redshift_node1 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=SelectFields_node1,
catalog_connection="redshift_connection",
connection_options={
"database": "dev",
"dbtable": "public.stage_table_" + src_catalog_table_name,
"preactions": pre_query,
"postactions": post_query,
"tempformat":"CSV GZIP",
"extracopyoptions":"COMPUPDATE OFF STATUPDATE OFF",
},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="Redshift_node1",
)
def sync(last_time, table):
location = table.get('StorageDescriptor').get('Location')
name = table.get('Name')
try:
last_sync_time = last_time
if not last_sync_time :
last_sync_time = readLastSyncTimeFromS3(name)
now_timestamp = str(datetime.now())
copy(table, "bigdata","2",last_sync_time,now_timestamp)
updateLastSyncTimeToS3(name,now_timestamp)
except:
#TODO: exception code here
pass
fairscheduler_file = "fairscheduler.xml"
INIT_START_SYNC_TIME = '1970-01-08 00:00:00'
redshift_tables_map = {}
args = getResolvedOptions(sys.argv, ["JOB_NAME","sync_rds_database", "sync_rds_tables_exp","sync_redshift_database","sync_redshift_tables_exp","sync_start_time"])
conf1 = SparkConf().setAppName("yunchen_poc").set('spark.scheduler.mode', 'FAIR').set("spark.scheduler.allocation.file", fairscheduler_file)
sc = SparkContext(conf=conf1)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# rds parameters
rds_db = args["sync_rds_database"]
rds_tables_exp = args["sync_rds_tables_exp"]
rds_tables = getSyncTablesFromGlueCatalog(rds_db, rds_tables_exp)
# redshift parameters
redshift_db = args["sync_redshift_database"]
redshift_tables_exp = args["sync_redshift_tables_exp"]
redshift_tables = getSyncTablesFromGlueCatalog(redshift_db, redshift_tables_exp)
# build redshift table mapping
for table in redshift_tables:
tmp = table.get('StorageDescriptor').get('Location').split('.')
table_name = tmp[-1]
redshift_tables_map[table_name] = table
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
for table in rds_tables:
pool.submit(sync,args["sync_start_time"],table)
print("----waiting tables to sync completely-----")
job.commit()
本篇作者