亚马逊AWS官方博客

基于亚马逊云科技托管 Flink 的开发系列 — 写入 Amazon S3 篇

1. 概述

上文讲述了如何建立 Apache Flink(以下简称 Flink)的本地开发环境,完成了从 Amazon Kinesis Data Streams 读取数据,然后经过统计之后再输出的屏幕。这篇文章将继续来讲述 Flink 的开发:写入结果到 Amazon Simple Storage Service(Amazon S3)。Amazon S3(下文简称 S3)存储服务是众多用户建立企业数据湖的基础,通过 Flink 来完成实时数据入湖的操作是非常常见的需求。

2. 本地开发环境设置

Flink 写入 S3 需要有相应的 flink-s3-fs-hadoop 文件系统插件。在亚马逊云科技托管 Flink 环境中,这个 S3 的包文件已经存在了,用户无需再打包进程序包中,但是在本地环境中,我们还是需要自行下载,具体链接可以参考附录中[1]。

虽然按照 Flink 官方文档中,flink-s3-fs-hadoop.jar 需要放在 Flink 安装目录 [2] 所在的 plugins/ 下,但经过实际测试后,发现需要放在 Flink 的 lib/ 下才能生效,这可能只是对于 PyFlink 的特别之处。

另外还有一个特别设置,默认情况下,S3 插件会去访问亚马逊云科技海外区域的 S3 Bucket,所以如果要访问国内北京区或者宁夏区,需要下面特别设置。这个设置也只是针对本地开发环境需要,在托管的 Flink 中无需设置。

在 Flink 安装目录[2]所在的 conf/ 下,新建文件 core-site.xml,贴入下面内容。其中 endpoint 地址是被进去的 S3;如果是访问宁夏区的 S3,请修改相应的 endpoint 地址为 s3.cn-northwest-1.amazonaws.com.cn。

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
      <name>fs.s3a.endpoint</name>
      <value>s3.cn-north-1.amazonaws.com.cn</value>
    </property>
</configuration>

有关访问 S3 的 AKSK 设置,可以参考本系列上一篇文章的设置。

3. 开始测试程序

现在我们可以开始准备尝试写入 S3 了。在 Github 的示例代码中,pyflink-examples/FileSink [3] 是一个以 Amazon Kinesis Data Streams 为数据源,然后再写入到 S3 的程序。

3.1 生成源数据

我们可以按照上一篇文章中的方式创建一个 Amazon Kinesis Data Stream,然后使用 stock.py 程序来产生测试数据。这个测试数据是模拟股票价格。

3.2 准备相关 Java 包

Flink 访问 Amazon Kinesis Data Streams 需要相应的 jar 包,从 Maven 仓库中下载 flink-sql-connector-kinesis-1.15.2.jar [4],然后放入到 FileSink 的 lib 目录(需要创建)。

3.3 更新应用属性

修改 FileSink/application_properties.json 文件中配置,分别是第 12 行的 Data Stream 名称,第 14 行的 aws.region,以及第 20 行 S3 Bucket 的名称。

3.4 更改输出文件路径

程序默认输出在 Bucket 的根目录下,为了后续方便其它测试,建议增加一层目录,可以按照下面方法来修改 streaming-file-sink.py 中 96 行,增加 {0}/ ,这样会用表名来作为目录,示例代码中即 output_table:

3.5 修改运行配置

在 streaming-file-sink.py 文件上右键选择 Modify Run Configuration…

在 Run 下面的 interpreter 中选择之前配置的 flink-env,在 Environment variables 中增加两个变量,中间用分号相隔,其中 HADOOP_CONF_DIR 后面路径即是 PyFlink 安装目录的 conf 路径,请按照实际目录修改:

IS_LOCAL=true;HADOOP_CONF_DIR=/home/ec2-user/miniconda3/envs/flink-env/lib/python3.8/site-packages/pyflink/conf

最后如果配置了额外的 Amazon Command Line Interface(Amazon CLI)profile,要选择相应的 profile。

其它配置可以参考上篇文章的截图。

3.6 运行程序

现在我们就可以开始运行程序了,在右上角的运行栏中选择 streaming-file-sink,点击右边绿色三角形按钮就可以开始运行程序了。

如果一切顺利,下方 Run 框里面没有错误信息,同时可以看到 S3 插件的相关信息:

3.7 验证结果

运行一段时间后,转到亚马逊云科技的控制台,选择 S3 服务,在之前设置的 Bucket 里面,可以看到有 JSON 文件生成,这就是 Flink 写入成功了。

4. 部署上云

部署上云时不用考虑之前 S3 的 endpoint 地址问题,因为托管的 Flink 中已经做好相应的配置,直接按照上篇文章的方式打包部署就可以了。

5. 结束语

本文针对 Flink 写入 S3 的一些设置做了说明,特别是在本地开发时要写入国内亚马逊云科技的 S3 上而增加的 core-site.xml。

接下来一篇将是 Flink 读取需要 TLS 证书访问 Kafka 的内容,敬请期待。

附录参考网址:

[1] https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.15.2

[2] <home directory>/miniconda3/envs/<environment name>/lib/python3.8/site-packages/pyflink/

[3] https://github.com/aws-samples/pyflink-getting-started/tree/main/pyflink-examples/FileSink

[4] https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/1.15.2

本篇作者

周平

西云数据高级技术客户经理,致力于大数据技术的研究和落地,为亚马逊云科技中国客户提供企业级架构和技术支持。