亚马逊AWS官方博客
通过集成Amazon Kinesis Service 在Amazon DocumentDB (兼容MongoDB)上实现实时数据同步
Amazon DocumentDB (兼容MongoDB)是一项快速、可扩展、具备高可用性的全托管文档数据库服务,可支持MongoDB工作负载。您可以使用相同的MongoDB应用程序代码、驱动程序与工具运行、管理并扩展Amazon DocumentDB上的工作负载,且不必分神于底层基础设施管理事务。作为一套文档数据库,Amazon DocumentDB极大简化了对JSON数据的存储、查询与索引流程。
实时数据同步是Amazon DocumentDB常见的需求场景。例如,场景1: 业务数据实时同步:有零售集团, 需要实时将产品目录更新信息, 从公司总部文档数据库下发和实时同步到子公司文档数据库, 使公司各层级销售部门, 实时获取产品最新信息, 上下协同,及时掌握和应对市场变化。场景2: 实时数据在线迁移:需要文档数据库之间在线数据迁移, 例如DocumentDB和MongoDB之间在线数据迁移, 可结合文档数据库离线方式数据导入和导出(Mongodump/resotre)和实时数据同步。
在本博文中,我们将向您介绍如何将Amazon DocumentDB与Amazon Kinesis相集成,实现Amazon DocumentDB之间或者Amazon DocumentDB和MongoDB之间的实时数据同步。具体来讲,我们将向您展示如何使用变化数据捕获Python程序将事件从Amazon DocumentDB集群的变更流传输至Amazon Kinesis Data Stream,再使用变化数据消费Python程序将事件从Amazon Kinesis Data Stream实时应用至目标文档数据库DocumentDB或者MongoDB。
演示概述
本文具体涵盖以下任务:
- 部署一套AWS CloudFormation模板以启动以下组件:
- Amazon DocumentDB集群
- Amazon EC2 BastionHost环境
- 设置Amazon EC2 BastionHost环境。
- 在Amazon DocumentDB上启用变更流。
- 创建Kinesis Data Stream
- 设置并部署实时变化数据捕获和应用Python 应用程序
- 演示实时数据同步。
部署一套CloudFormation模板
AWS CloudFormation提供一种通用语言,供您在云环境中对AWS资源进行建模及配置。在本演练中,您将部署一套CloudFormation模板,用于创建以下内容:
- Amazon DocumentDB集群 – 模拟源和目标DocumentDB环境
- Amazon EC2 BastionHost环境 – 部署和运行实时数据捕获和应用程序
要部署此模板,请完成以下操作步骤:
下载Github上Cloudformation Yaml文件:
git clone https://github.com/bingbingliu18/DocumentDB-Data-capture
DocumentDB-Data-capture目录中包含下面Cloudformation所用文件docdb_changestream.yaml
- 在AWS CloudFormation控制台上, 选择 Create stack。
- 选择 Upload a template file。
- 选择 choose file。
- 上传之前下载到本地docdb_changestream.yaml 文件。
- 选择 Next。
- 为您的Amazon DocumentDB集群输入名称、用户名、密码与标识符。
- 如果您已经拥有角色,请选择 true。如果还没有角色,请选择 false,AWS CloudFormation模板将为您创建新角色。
- 其他部分保留默认设置,之后选择 Next。
- 选中复选框,允许栈为您创建角色选择 Create stack。
CloudFormation一般在几分钟内完成。
CloudFormation 创建成功后, 请访问CloudFormation output Tab, 查看后续需要访问EC2 BastionHost 访问地址和DocumentDB Cluster Endpoint:
设置Amazon EC2 BastionHost环境
连接到新建的Ec2堡垒机:
修改证书文件权限
chmod 0600 [path to downloaded .pem file]
ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]
aws configure
default region name, 输入: “us-east-1”,其它选择缺省设置
安装Mongo Shell
安装4.0 mongo shell,请在命令提示符中使用以下命令创建repo文件:
echo -e "[mongodb-org-4.0] \nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/4.0/x86_64/\ngpgcheck=1 \nenabled=1 \ngpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc" | sudo tee /etc/yum.repos.d/mongodb-org-4.0.repo
完成之后,使用以下命令安装mongo shell:
sudo yum install -y mongodb-org-shell
下载DocumentDB数据库证书:
wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
安装Pymongo:
sudo pip3 install pymongo
安装boto3:
sudo pip3 install boto3
在Amazon DocumentDB上启用变更流
Amazon DocumentDB变更流提供一条按时间顺序排列的更新事件序列,用以囊括发生在集群内各集合与数据库中的更新事件。可以轮询DocumentDB集群上的变更流,并在发生变更事件(INSERTS, UPDATES以及DELETES)时进行读取。我们使用变更流,将变更事件从您的Amazon DocumentDB集群流式传输至Amazon Kinesis Data Stream。
要在集群上启用变更流,请输入以下代码(请将相应部分替换为您的集群值)。首先,我们使用mongo shell登录数据库:
export USERNAME=<DocumentDB cluster username>
echo "export USERNAME=${USERNAME}" >> ~/.bash_profile
export PASSWORD=<DocumentDB cluster password>
echo "export PASSWORD=${PASSWORD}" >> ~/.bash_profile
export CLUSTERENDPOINT= 【上图Cloudformation Outputs ClusterEndpoint】
echo "export CLUSTERENDPOINT=${CLUSTERENDPOINT}" >> ~/.bash_profile
登录至您的Amazon DocumentDB集群
mongo --ssl --host $CLUSTERENDPOINT:27017 --sslCAFile rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD
接下来,在集群上启用DocumentDB变更流:
在集群级别启用DocumentDB变更流, 集群上所有数据库上的数据变化都将被捕获至DocumentDB Change Stream:
db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
您应得到以下响应结果:
{ "ok" : 1 }
创建Kinesis Data Stream
aws kinesis create-stream --stream-name 【请输入您的Kinesis Data Stream Name】 --shard-count 1
设置并部署实时数据捕获和应用Python程序
登录到EC2 BastionHost, 下载Python程序:
install git
sudo yum install -y git
下载Github上Python程序:
git clone https://github.com/bingbingliu18/DocumentDB-Data-capture
cd DocumentDB-Data-capture
修改和设置操作系统环境变量
修改实时变化数据捕获应用相关操作系统变量:
操作系统变量名 | 操作系统变量值示例 | 备注 |
USERNAME | masteruser | 源端DocumentDB用户名 |
PASSWORD | password | 源端DocumentDB用户密码 |
CLUSTERENDPOINT | clusterendpoint | 源端DocumentDB终端节点 |
SSL | True or False | 源端DocumentDB SSL是否启用 |
WATCHED_DB_NAME | bar | 数据同步的源端DocumentDB 数据库 |
STATE_DB | bar_stat | 存放同步状态的源端DocumentDB 数据库 |
STATE_COLLECTION | bar_stat | 存放同步状态的源端DocumentDB collection |
KINESIS_STREAM | test | 前面新建Kinesis Data Stream名字 |
STATE_SYNC_COUNT | 1 | 源端同步n条记录后 记录同步状态 |
MAX_LOOP | 100000 | 执行数据捕获和数据应用的循环次数 |
修改实时数据应用程序相关操作系统变量:
操作系统变量名 | 操作系统变量值示例 | 备注 |
TARGET_USERNAME | masteruser | 目标端DocumentDB或者MongoDB用户名 |
TARGET_PASSWORD | password | 目标端DocumentDB或者MongoDB用户密码 |
TARGET_CLUSTERENDPOINT | clusterendpoint | 目标端DocumentDB或者MongoDB终端节点 |
Target SSL | True or False | 目标端DocumentDB或者MongoDB SSL是否启用 |
TARGET_DB_NAME | bar_new | 数据应用的目标端DocumentDB或者MongoDB数据库 |
SEQ_DB | bar_new | 目标端DocumentDB或者MongoDB存放同步状态的数据库 |
SEQ_COLLECTION | sequence | 目标端DocumentDB或者MongoDB存放同步状态的collection |
KINESIS_STREAM | test | 前面新建Kinesis Data Stream名字 |
MAX_LOOP | 100000 | 执行数据捕获和数据应用的循环次数 |
修改您实时数据捕获和实时数据应用相关的操作系统环境变量:
根据您的实际环境,修改.bash_profile sample文件
更改.bash_profile sample 为ec2-user用户下的.bash_profile
mv .bash_profile .bash_profile.bak
mv .bash_profile_sample .bash_profile
. ~/.bash_profile
演示实时数据同步
为了简化演示环境, 源和目标文档数据库均设置为同一套DocumentDB集群, 在实际环境,根据需求设置分别的源端DocumentDB数据库和目标端DocumentDB或者MongoDB数据库。
以下演示将模拟场景1: 业务数据实时同步:有零售集团, 需要实时将产品目录更新信息, 从公司总部文档数据库下发和实时同步到子公司文档数据库,将模拟总部文档数据库的数据的增删改如何实时同步到子公司文档数据库。关于场景2:在线数据迁移也可以采用同样的实时数据同步的方案,在此博文中不再花篇幅赘述。
演示环境的实时数据捕获和实时数据应用程序采用Python程序部署,在实际环境中部署也可以采用Lambda无服务架构,来进一步优化部署架构和成本。
演示环境实时消费和应用Kinesis数据采用Kinesis Data Stream API开发, 在实际开发和部署生产环境,也可参考Kinesis KCL实现实时消费和应用Kinesis数据。
运行实时数据捕获程序:
打开终端窗口1, 连接到Ec2堡垒机,运行实时数据捕获应用:
python3 changestream_capture.py
运行实时数据应用程序:
打开终端窗口2, 连接到Ec2堡垒机,运行实时数据应用程序:
python3 changestream_apply.py
打开终端窗口3, 连接到Ec2堡垒机,运行Mongo Shell模拟连接到源端和目标端文档数据库:
mongo --ssl --host $CLUSTERENDPOINT:27017 --sslCAFile rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD
在源端文档数据库插入一条产品信息:
运行mongo shell
use bar
db.products.insert({
"name" : "RayBan Sunglass Pro",
"sku" : "1590234",
"description" : "RayBan Sunglasses for professional sports people",
"inventory" : 100
})
连接到目标端文档数据库, 查看新插入产品信息是否被同步:
运行mongo shell
use bar_new
db.products.find()
{ "_id" : ObjectId("60d97230a2e692800e1a2b76"), "name" : "RayBan Sunglass Pro", "sku" : "1590234", "description" : "RayBan Sunglasses for professional sports people", "inventory" : 100 }
在源端文档数据库更改一条产品信息:
运行mongo shell
use bar
db.products.update(
{"sku":"1590234"},
{
$set: {
"reviews" : [{
"rating" :4,
"review":"perfect glasses"
},{
"rating" :4.5,
"review":"my priced posession"
},{
"rating" :5,
"review":"Just love it"
}]}
}
)
连接到目标端文档数据库, 查看更新产品信息是否被同步:
运行mongo shell
use bar_new
db.products.find()
{ "_id" : ObjectId("60d97230a2e692800e1a2b76"), "name" : "RayBan Sunglass Pro", "sku" : "1590234", "description" : "RayBan Sunglasses for professional sports people", "inventory" : 100, "reviews" : [ { "rating" : 4, "review" : "perfect glasses" }, { "rating" : 4.5, "review" : "my priced posession" }, { "rating" : 5, "review" : "Just love it" } ] }
在源端文档数据库删除产品信息:
运行mongo shell
use bar
db.products.remove({"sku":"1590234"})
连接到目标端文档数据库, 查看更新产品信息是否被同步:
运行mongo shell
use bar_new
db.products.find()
记录也被删除,没有记录被查询
如果在运行上述演示步骤,出现异常情况,可以重置演示环境,重新再执行上述演示步骤:
- 删除之前创建的Kinesis Data Stream
aws kinesis delete-stream --stream-name 【请输入您的Kinesis Data Stream Name】
- 重新创建Kinesis Data Stream和重置同步状态表
wget https://documentdb-zhy.s3.ap-southeast-1.amazonaws.com/change_stream/reinitiate-new.py .
python3 reinitiate-new.py
清理资源
要清理本次演练中创建的资源,请导航至AWS CloudFormation控制台,找到您为本次演练创建的栈,而后一一将其删除。再请删除之前创建的Kinesis Data Steam,操作之后,即可删除与演练相关的所有资源。
总结
在本博文中,我们将向您介绍如何将Amazon DocumentDB与Amazon Kinesis相集成,实现Amazon DocumentDB之间或者Amazon DocumentDB和MongoDB之间的实时数据同步。
参考资源
- DocumentDB 开发者指南:https://docs.thinkwithwp.com/documentdb/latest/developerguide/index.html
- Kinesis 开发者指南:https://docs.thinkwithwp.com/streams/latest/dev/introduction.html