亚马逊AWS官方博客

使用 AWS DMS 将数据从 Amazon S3 流式传输至 Amazon Kinesis Data

Original Link: https://thinkwithwp.com/cn/blogs/big-data/streaming-data-from-amazon-s3-to-amazon-kinesis-data-streams-using-aws-dms/

对于需要快速发现问题并根据数据改善结果的用例中,流处理已经成为一类重要工具,帮助我们解决生产线监控与供应链优化等实际需求。

本文将以Amazon Simple Storage Service (Amazon S3)为起点,以Amazon Kinesis为终点,引导您完成整个流程中对现有数据文件及变更操作的流式传输。流传输改造可通过AWS Database Migration Service (AWS DMS)实现。AWS DMS使您能够将数据从受支持的源处无缝迁移至AWS云内的关系数据库、数据仓库、流平台以及其他数据存储选项当中。

已经有众多SaaS第三方应用程序与Amazon S3相集成,能够将记录直接交付至S3存储桶。在某些使用场景下,您可能需要以近实时方式进一步处理这部分数据以生成警报。事实上,威胁检测与应用程序监控等用例往往要求在几秒钟之内生成洞见结论。对于这类需求,批处理流程带来的数据分析延迟将引发大麻烦,降低系统根据紧急情况做出快速响应的能力。因此,我们需要一种对应用程序同Amazon S3间的现有集成方式进行拓展的方法,有效将批处理转换为流处理。

您可以使用AWS DMS满足此类数据处理需求。通过AWS DMS,您可以将现有应用程序扩展至Amazon S3当中,并在Amazon Kinesis Data Streams中生成数据以进行实时分析,且全程无需编写及维护新的代码。AWS DMS还支持我们将Amazon S3指定为数据源,并将Kinesis及Amazon Managed Streaming of Kafka (Amazon MSK)等流服务指定为目标。AWS DMS能够将完整及变更数据捕捉(CDC)文件迁移至这些服务当中。更重要的是,AWS DMS提供现成的即用式功能选项,无需任何复杂的配置或代码开发操作。您还可以配置AWS DMS副本实例,以根据工作负载规模进行任意伸缩

在本文中,我们主要讨论如何将数据流传输至Kinesis。我们部署有一套AWS CloudFormation,能够在几分钟内快速上手并由此建立起流式传输管道。

架构概述

Web、API与数据集成服务等第三方应用程序会在S3存储桶中生成数据与日志文件。而建立在AWS上的数据湖则负责对这些数据进行分段处理,并将其保存在Amazon S3当中。AWS DMS支持将Amazon S3作为数据源、Amazon Kinesis作为传输目标,借此将S3存储桶内的数据流式传输至Kinesis当中。以此为基础,AWS LambdaAmazon Kinesis Data FirehoseAmazon Kinesis Data Analytics以及 Kinesis Consumer Library (KCL)等服务可以同时使用这些数据执行褚分析。这套架构中的每一项AWS服务,都能够根据需求进行独立扩展。

下图所示,为这套解决方案的基本架构。

部署AWS CloudFormation

首先,我们需要部署CloudFormation模板以构建架构的核心组件。AWS CloudFormation将以安全且可重复的方式,自动在各个区域与账户之间完成技术与基础设施部署,且将工作量与耗时控制在最低水平。要完成资源创建,请执行以下操作步骤:

    1. 登录至 AWS管理控制台 并选择us-west-2 区域。
    2. 选择 Launch Stack:

  1. 选择 Next

这时,模板会自动在您的AWS账户中启动AWS CloudFormation,并提示您根据需求进行登录。您可以在控制台上查看要使用的CloudFormation模板。

  1. Stack name部分,输入栈名称。
  2. 在下一屏中, 选择您的VPC与子网ID。
  3. 在 Does DMS VPC and Cloudwatch role Exists?部分,如果您的账户当中已经存在托管AWS身份与访问管理(AWS Identity and Access Management,简称IAM)角色dms-vpc-role 与 dms-cloudwatch-logs-role ,则直接按下Y键。如果尚不存在,请按N键。

如果您需要将AWS DMS端点部署在私有子网当中,请在部署模板之前为Kinesis与Amazon S3启用VPC端点。

    1. 选择 Next

  1. 在最后一屏的Capabilities下确认资源创建,而后选择 Create

整个栈创建过程大约需要5到10分钟,在此期间系统会执行以下操作:

  • 创建一个源S3存储桶,并创建具有两个分片的目标Kinesis数据流。
  • 创建一个AWS DMS副本实例, Amazon S3  端点,以及Kinesis 目标
  • 将S3存储桶与数据流同对应的端点映射起来。
  • 使用必要参数配置复制任务。
  • 创建一项AWS Lambda函数以及随附触发器,借此使用由Kinesis提供的记录。关于更多详细信息,请参阅将AWS Lambda与Amazon Kinesis配合使用

本文演示中使用的文件并未包含在模板当中。请下载blog_sample_file.zip并将其上传至您的源存储桶,之后再启动AWS DMS任务。

使用Amazon S3作为数据源

当您使用Amazon S3作为数据源时,数据文件(完整负载与CDC)必须为逗号分隔值(CSV)格式。

除数据文件之外,AWS DMS还需要配合外部表定义。所谓外部表定义,是指一份用于描述AWS DMS应如何解释Amazon S3数据的JSON文档。

要让AWS DMS正常运行,我们还需要指定关于完整负载与CDC文件的Amazon S3文件路径。请保证在文件名中正确使用有序编号,以正确顺序进行数据复制。此外,大家还可以在AWS DMS中使用额外的连接属性以指定列定界符、行定界符以及其他参数。

AWS DMS能够通过两种方式识别出每条负载记录所须执行的操作:记录关键字值INSERT或I。

关于更多详细信息,请参阅将Amazon S3作为AWS DMS数据源

使用Amazon Kinesis作为目标

AWS能够将记录以JSON的形式发布至Kinesis数据流。在转换期间,AWS DMS会将Amazon S3文件中的每条记录序列化为JSON格式的属性-值对。

无论源文件中指定怎样的操作,AWS DMS都会将源Amazon S3文件中的各条记录以JSON数据记录的形式发布至数据流当中。

此外,AWS DMS还支持对象映射,借此将数据从源文件选择至数据流内。对象映射负责确定流内数据记录的具体结构。

AWS DMS还支持对带有任务设置的全负载及CDC进行多线程迁移。您可以设置多条线程、缓冲区大小及并发队列以提高迁移性能。

关于更多详细信息,请参阅在AWS数据库迁移服务中使用Amazon Kinesis Data Streams作为目标

演练流程

AWS CloudFormation负责部署整体基础设施,现在我们着手为本次演练用例准备所需文件。

  1. 下载 blog_sample_file.zip,其中包含CSV格式的完整与CDC负载文件。

如果您的源文件并非CSV格式,请将文件格式转换为CSV。这里推荐大家使用AWS Glue。关于更多详细信息,请参阅AWS Glue中的ETL输入与输出格式选项

以下截屏所示,为此用例中的完整负载文件记录。

CDC文件还需要配合其他AWS DMS属性以正确识别操作、表与schema。

  1. 对文件进行格式转换,具体方法如下:
  • 操作——要执行的变更操作:INSERT或I、UPDATE或U,DELETE或D。
  • 表名称:源表的名称。
  • Schema名称——源schema的名称。
  • 数据——代表需要更改的一列或多列数据。

以下截屏所示,为CDC文件中的示例记录。

我们还需要在源端点配置中添加外部表定义。在本文中,该定义直接嵌入在AWS CloudFormation当中。

  1. 在表定义中,为完整及CDC负载文件输入以下代码:

{

“TableCount”: “1",

“Tables”: [{

“TableName”: “table01”,

“TablePath”: “schema01/table01/“,

“TableOwner”: “schema01",

“TableColumns”: [{

“ColumnName”: “ingest_time”,

“ColumnType”: “TIMESTAMP”,

“ColumnNullable”: “false”,

“ColumnIsPk”: “true”

}, {

“ColumnName”: “doi”,

“ColumnType”: “STRING”,

“ColumnLength”: “30”

}, {

“ColumnName”: “id”,

“ColumnType”: “INT8”

}, {

“ColumnName”: “value”,

“ColumnType”: “NUMERIC”,

“ColumnPrecision”: “5”,

“ColumnScale”: “2”

}, {

“ColumnName”: “data_sig”,

“ColumnType”: “STRING”,

“ColumnLength”: “10”

}],

“TableColumnsTotal”: “5”

}]

}

在由CloudFormation模板创建的源S3存储桶内,进一步创建文件夹结构

    1. 为完整负载创建schema01/table01/文件夹,并为CDC数据文件创建cdcfile/ 文件夹。
    2. 另外,文件名应采用增量形式,如以下CLI输出所示。

$aws s3 ls s3://blog-xxxxxxxx/schema01/table01 –recursive –human-readable –summarize2020-08-03 22:05:57   5.0 MiB schema01/table01/full_0002020-08-03 22:05:51   5.0 MiB schema01/table01/full_0012020-08-03 22:06:00   5.0 MiB schema01/table01/full_0022020-08-03 22:05:56   5.0 MiB schema01/table01/full_0032020-08-03 22:05:59   3.1 MiB schema01/table01/full_004 $aws s3 ls s3://blog-xxxxxxxx/cdcfile –recursive –human-readable –summarize2020-08-03 22:06:28   4.8 MiB cdc/cdc_0002020-08-03 22:06:28   4.8 MiB cdc/cdc_0012020-08-03 22:06:26   4.8 MiB cdc/cdc_0022020-08-03 22:06:19   4.8 MiB cdc/cdc_003

  1. 在文件复制完成之后,在 AWS DMS控制台上, 选择 Replication
  2. 验证实例状态与配置。
  3. 选择 Endpoints
  4. 验证Amazon S3源端点的状态与配置,确保已经正确接入副本实例。
  5. 同样的,请验证Kinesis目标端点的状态与配置,保证已经正确接入副本实例。
  6. 选择 Database migration task
  7. 验证源与目标是否已经正确映射。
  8. 在所有配置验证完成之后,重新启动AWS DMS任务。由于任务之前已经创建完成但尚未启动过,因此请选择Restart/Resume以启动完整负载与CDC。

在数据迁移开始之后,您可以在Table statistics下进行查看。关于更多详细信息,请参阅如何使用表统计信息监控AWS DMS任务

AWS DMS首先完成对完整负载的上传,而后将变更数据以文件形式上传至由cdcPath参数指定的存储桶对应位置。

  1. 在迁移进行中,大家可以通过Kinesis控制台的Monitoring选项卡下的IncomingBytes指标,确认数据正在流式传输至Kinesis Data Streams当中。
  2. 要确认Lambda一端正在使用流式数据,请参考GetRcords.Bytes指标。

现在,您已经验证了Lambda中的记录。根据配置,Lambda将通过触发器从Kinesis中读取数据。

本文中指定的Lambda数据使用方是一项示例函数,该函数使用Kinesis数据流内的记录、对base64编码数据进行解码,而后将记录输出至Amazon CloudWatch日志组处。

  1. 在Monitoring选项卡中,打开CloudWatch Log Insights下的recent logstream以查看输出的记录。

关于监控的更多细节信息,请参阅AWS Lambda控制台中的监控功能

大家可以根据需要向Lambda函数中添加处理逻辑,借此对记录进行合并或处理。您也可以配置Lambda目标以执行进一步处理。Lambda异步调用还可以在Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS) 或者 Amazon EventBridge上添加事件或消息。关于更多详细信息,请参阅AWS Lambda Destinations简介

最佳实践与注意事项

在实施这套解决方案时,请注意以下最佳实践:

  • 完整负载旨在将现有数据从S3存储桶流式传输至Kinesis。您可以在流式传输CDC数据之前,首先使用完整负载迁移原有存储数据。在迁移开始之前,完整负载数据应该已经存在。对于新的CDC文件,数据会在文件传送期间被实时传输至Kinesis。
  • 要加载多个表,您可以在外部表定义文件中指定表数量与表属性。CDC路径保持不变,且AWS DMS将以元数据字段为基础将记录与表映射起来。
  • 在高工作负载期间,我们可以对AWS DMS实例中的CPU、内存、存储及I/O等资源进行限制。为了获取最佳传输速度,请监控CoudWatch指标并扩展副本实例。
  • 要对大量表进行迁移,可以上调多线程参数的值以加快传输速度。
  • CloudFormation模板会创建带有两个分片的数据流。随着数据流传输速率的增加,您可以进一步扩展分片数量以适应需求变化。请使用CloudWatch提供的IncomingRecords 与 WriteProvisionedThroughputExceeded指标监控Kinesis,借此获取关于分片扩展需求的洞见结论。
  • AWS DMS任务中的对象映射会定义分区键。此分区键用于按传输流内的分片对数据进行分组。AWS DMS使用的默认分区键为TableName,您可以使用属性映射将分区键更改为JSON内某一字段的值,或者将其更改为源数据库中表的主键。或者,您也可以将分区键设置为恒定值,保证所有数据都被流式传输至单一分片处。
  • 在默认情况下,当流中包含默认时,Lambda会立即调用对应函数。要避免因存在少量记录而调用函数,请配置批处理窗口将事件源缓冲记录周期配置为最多5分钟。关于更多详细信息,请参阅将AWS Lambda与Amazon Kinesis配合使用
  • 您可以通过将Kinesis配置为Lambda触发器以增加并发性,即以并发方式处理各个分片中的多批次数量。Lambda每个分片最多可以同时处理10个批次。关于并发机制的更多详细信息,请参阅面向Kinesis与DyanmoDB事件源的全新AWS Lambda规模伸缩控件

资源清理

在成功完成测试与验证之后,请删除由CloudFormation模板部署的所有资源,以避免产生不必要的运营费用。请首先清空S3存储桶,而后停止AWS DMS任务,最后在AWS CloudFormation控制台上删除本次演练中使用的栈。

总结

本文介绍了如何使用AWS DMS,将批处理方案转换为近实时解决方案。这套解决方案大大简化了将记录由Amazon S3迁移至Kinesis并加以分析的过程。Kinesis作为AWS DMS指定的目标,可为多种其他系统提供数据资源。这样一条近实时管道将帮助您快速了解系统内的各类变化,最终提升组织的实际决策能力。该解决方案中使用的全部资源皆可无缝扩展,保证您能够专注于实施分析、警报、报告与欺诈检测,而不必分神于平台本身的设置及维护。这不仅降低了运营负担,同时也将显著提高成本效益。

本篇作者

Mahesh Goyal

AWS公司大数据数据架构师。他协助客户推进云迁移之旅,主要负责大数据与数据仓库方面的工作。在业余时间,Mahesh喜欢听音乐以及同家人一道探索新的美食。

Charishma Makineni

AWS公司技术客户经理。她与企业客户合作,帮助他们在AWS云上构建起安全且可扩展的解决方案。她专注于大数据与分析技术方向。在工作之余,Charishma喜欢客户活动、园艺和烹饪。

Suresh Patnam

AWS公司解决方案架构师。他以大数据及AI/ML为基础构建起高度可用、可扩展且安全可靠的架构,帮助客户在AWS平台上实施各类创新。在业余时间,Suresh喜欢打网球和陪伴家人。