亚马逊AWS官方博客

基于 Flink on Kinesis Data Analytics 对数据进行流式处理

1. 概述

Flink on Kinesis Data Analytics(Amazon Managed Service for Apache Flink,以下简称 KDA)是适用于 Apache Flink 的托管服务,它可以帮助用户实时处理和分析流式数据,并支持以低延迟和高吞吐量的方式进行数据处理和转换。基于 KDA,用户可以通过使用 Flink 的流式处理引擎和 Kinesis Data Analytics 的实时数据分析和可视化工具来构建复杂的数据处理管道,这些管道可以从多个数据源中汇聚数据,并将其转换为有价值的信息。同时,基于 KDA 的解决方案还可以支持监控和告警等业务场景,以帮助用户快速识别和解决潜在的问题,并高效地处理和分析实时数据。

本文我们将讲述基于 Flink on KDA 为某制造行业客户的设备数据进行流式处理的方案,即通过对设备运行数据进行分析和规则匹配,最终产生设备告警信息的整个过程,并使用 Terraform 将整个方案中所使用到的资源部署在一个 AWS 账号中。通过参考本文中提到的技术方案,您也可以对类似的业务场景进行同样的处理来提高数据处理的时效性。

2. 架构说明

该方案的原始数据是 Amazon Managed Streaming for Apache Kafka(以下简称 MSK) 中的设备运行数据,我们使用 KDA 对消息进行处理之后,将输出的设备告警信息下发到告警的 MSK topic 中(本方案范围之外的下游告警服务会消费该 topic 的数据并进行下一步动作),具体的处理过程及说明如下:

  1. 在 Generate/revoke result 的 KDA 应用程序中筛选出符合告警规则的消息,根据规则进行处理,生成告警消息或撤销告警消息,并将产生的消息放入 alert topic。
  2. Dispatcher 的 KDA 应用程序从 alert topic 中读取消息,无撤回告警消息时下发至最后的 dispatch topic。
  3. 告警规则以及设备运行数据的格式文件存放在 S3 桶中,如果告警规则有更新,由 EventBridge 每天在业务低峰期触发一个 Lambda 函数,去重启两个 KDA 的应用程序,使得告警规则生效。
  4. 和设备有关的配置信息,如设备的级别、版本等,存放在 Configs RDS PostgreSQL 中,使得 KDA 的应用程序能在很短的时间内查到这些信息,提高 KDA 应用程序的运行效率。

AWS 每个服务在该架构中的作用如下:

  • Amazon Lambda:重启现在正在运行的 KDA applications,使规则生效
  • Event Bridge:在该项目中担任 Scheduler,每天定时触发更新规则及重启 application 的 Lambda function
  • S3:存储 Flink SQL 脚本文件
  • RDS:使用 PostgreSQL 数据库存放设备的配置信息
  • Kinesis Data Analytics(KDA):流式数据处理,其中的 application 实现从上游 MSK topic 进来的消息处理逻辑
  • Managed Streaming for Apache Kafka(MSK):使用该服务,用于输出 KDA 最终产生的下发到下游服务的设备故障消息,本方案的输入和输出都对应 MSK 中的不同的 topic
  • Cloud Watch:日志管理及查看
  • IAM:权限管理
  • SNS:重启 KDA applications 失败的消息接受,可订阅 SNS 的 topic 接收告警

3. 业务背景

生成告警消息

设备产生的故障、移动、告警等事件消息,发送到对应的 MSK topic。因为网络波动,设备不稳定等问题,数据可能存在乱序和迟到的现象。从 MSK topic 里获取原始的设备事件,根据这些事件的组合判断是否产生告警。有些告警对设备事件的产生顺序和时间间隔有要求,有些没有要求。典型的判断逻辑如下:

  • 设备在一分钟之内产生一条或多条设备故障码 A 和故障码 B,并且其中没有发生故障码 A 的恢复事件。
  • 设备首先产生一条故障码 C,之后一分钟之内产生告警事件,并且其中没有接收到设备恢复运行事件。

撤销告警消息

告警事件产生之后,当收到一些特定的事件时,如某个设备运行事件,某个故障码恢复等,需要对上一个产生的告警事件进行撤销。

下发告警消息

当收到告警事件时,需要对设备状态作进一步判断,决定是否下发告警消息:

  • 当收到一条新的告警事件时,需要根据设备状态做去重,避免频繁发送相同的告警消息。
  • 当前告警事件有没有被其他优先级更高的告警事件覆盖。
  • 对于需要撤销的告警,需要根据之前工单的状态去判断是否需要发送撤销工单的事件。

下发的告警消息需要在 KDA 中写入告警的 MSK topic 中。

4. 技术方案

由于设备告警规则较多,并且需要较为频繁的新增和更改,我们希望方案足够灵活,可扩展,减少开发维护的成本。考虑到开发效率和成本,我们决定尽可能使用 Flink SQL,只有当 Flink SQL 无法满足业务需求时才使用 Flink stream API。在我们本次的场景中,设备的一些设备数据存储在 RDS PostgreSQL 数据库中。对于 MSK 来说,在 Flink 中使用 kafka connector 来将 topic 转换成表,Flink 的 Apache Kafka Connector 使用请参考官方文档

对于上述提到的所有数据源使用 SQL 定义,如带密码验证的 MSK topic 数据源和用来存放配置的 RDS postgreSQL 表。MSK Topic 在 Flink SQL 中的定义示例如下:

create table err_kafka (
`Message` string,
`Param` row<`ErrCode` int, `Timestamp` timestamp_ltz(3), `ID` string>,
`DeviceNumber` int,
`ErrCode` as `Param`.`ErrCode`,
`MessageTime` as `Param`.`Timestamp`,
`ID` as `Param`.`ID`,
`ProcTime` as proctime(),
`RowTime` as `Param`.`Timestamp`,
watermark for `RowTime` as `RowTime`
)
with (
'connector' = 'kafka',
'topic' = 'your-device-fault-topic-name',
'properties.bootstrap.servers' = 'your-own-value',
'properties.group.id' = 'your-own-value',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="your-own-value" password="your-own-value";',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);

RDS postgreSQL 配置表在 Flink SQL 中的定义如下:

create table device_config_jdbc (
  device_id int,
  version int,
  info string,
  primary key (device_id, version) not enforced
) with (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://host:5432/db',
   'username' = 'your-username',
   'password' = 'your-password',
   'table-name' = 'device_config',
   'lookup.cache.max-rows' = '1000',
   'lookup.cache.ttl' = '60s'
);

我们对比了三种生成告警的代码实现:

  1. CEP
  2. interval join
  3. regular join

Complex Event Processing(CEP)复杂事件处理可以根据特定模式实时检测数据流,类似于使用正则表达式进行字符串的检测。CEP 的好处是写法简单直观,资源占用少,但是存在一些限制,不适用所有场景,比如:

  • 不要求事件发生的顺序的场景,不容易用 CEP 表达
  • watermark 设置过小会丢失迟到数据,设置过大会导致过高的事件生成延迟

Join 更为通用灵活,但是资源消耗更大,有些场景需要在 join 之后需要增加去重等逻辑。

  • interval join 适合 watermark 准确的场景,由框架本身根据流的 watermark 自动清理历史状态
  • regular join 适合不需要保存过长时间历史状态的场景,通过设置 exec.state.ttl 参数,限制状态大小,防止内存溢出

SQL 脚本模板

为了提高开发效率和后期维护的灵活性,我们采用模板化的方式管理规则。在系统启动时,先从数据库的配置表读取规则配置,根据每条规则的模板名,从 S3 下载相应的 SQL 模板(模板需要提前部署到指定的 S3 路径),使用 FreeMarker 处理模板,对模板中的变量进行替换(板中所有${paramName}的内容会用存储在 RDS postgreSQL 配置表中的相应字段来替换),生成最终的 SQL 语句。alert.ftl 示例如下:

create view alert as
<#list expression as ex>${ex}
union all
</#list>
select Message, DeviceNumber, MessageTime, '', '', 0, 0
from scenario1;

其中 scenario1 为一个 view,创建语句如下:

create view `scenario1` as
select * from `err_kafka` 
where `ErrCode` in (1, 2, 3)
and `MessageTime` < now() + interval '5' minute;

采用 SQL 模版的好处是:

  • 模版复用,类似的规则可以使用同一个模版
  • 增加,修改规则只需要上传新模版,更改配置表参数,无需改动代码
  • 模板基于 SQL,可读性强,开发速度快

在 Flink application 的 main class 对 SQL 模板进行处理并替换里面的变量,main class 的示例代码如下,  注意其中的 alert_kafka 和 dispatch_kafka 您需要参考 err_kafka 进行创建:

import org.apache.flink.table.api.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.io.*;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import java.util.Map;
import java.util.List;
import java.util.Properties;
import freemarker.template.*;
import java.util.ArrayList;
import java.util.HashMap;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.commons.io.FileUtils;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;


public class DataStreamJob {

    static void executeSqlFile(String fileName, StreamTableEnvironment tableEnv) throws IOException {
        InputStream sqlStream = new FileInputStream(new File(fileName));
        executeSql(sqlStream, tableEnv);
    }

    static void executeSqlString(String sqlString, StreamTableEnvironment tableEnv) throws IOException {
        InputStream sqlStream = IOUtils.toInputStream(sqlString, StandardCharsets.UTF_8);
        executeSql(sqlStream, tableEnv);
    }

    static void executeSql(InputStream inputStream, StreamTableEnvironment tableEnv) throws IOException {
        String newLine = System.getProperty("line.separator");
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        StringBuilder sb = new StringBuilder();
        for (String line; (line = reader.readLine()) != null; ) {
            line = line.split("--")[0];
            line = line.strip();
            if (line.toLowerCase().startsWith("set")) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append(newLine);
            }
            sb.append(line);
            if (line.endsWith(";")) {
                tableEnv.executeSql(sb.toString());
                sb.setLength(0);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FileUtils.deleteDirectory(new File("/tmp/templates"));
        FileUtils.deleteDirectory(new File("/tmp/scripts"));
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
        .withCredentials(new DefaultAWSCredentialsProviderChain())
        .withRegion("cn-north-1")
        .build();
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties properties = applicationProperties.get("AppConfigProperties");
        if (properties == null) {
            properties = new Properties();
            properties.load(DataStreamJob.class.getClassLoader().getResourceAsStream("local.properties"));
        }
        TransferManager transferManager = TransferManagerBuilder.standard().withS3Client(s3).build();
        String bucketName = properties.getProperty("s3_bucket");
        MultipleFileDownload xfer = transferManager.downloadDirectory(bucketName, "templates", new File("/tmp/"));
        xfer.waitForCompletion();
        xfer = transferManager.downloadDirectory(bucketName, "scripts", new File("/tmp/"));
        xfer.waitForCompletion();

        System.out.print(properties);
        String appName = properties.getProperty("app_name");
        switch (appName) {
            case "alert":
                processAlert(properties);
                break;
            case "dispatch":
                processDispatch();
                break;
        }
    }

    static void processAlert(Properties properties) throws Exception {
        final StreamExecutionEnvironment alertStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment alertTableEnv = StreamTableEnvironment.create(alertStreamEnv);
        TableConfig tableConfig = alertTableEnv.getConfig();
        tableConfig.set("table.exec.state.ttl", "1h");

        Configuration cfg = new Configuration(Configuration.VERSION_2_3_31);
        cfg.setDirectoryForTemplateLoading(new File("/tmp/templates"));
        cfg.setDefaultEncoding("UTF-8");
        cfg.setNumberFormat("computer");

        StringWriter writer = new StringWriter();

        Template template = cfg.getTemplate("ddl.ftl");
        template.process(properties, writer);
        String ddlStatement = writer.toString();
        System.out.println(ddlStatement);
        executeSqlString(ddlStatement, alertTableEnv);
        
        Map<String, List<String>> expressionMap = new HashMap<String, List<String>>();
        expressionMap.put("expression", new ArrayList<String>());
        // Get config from RDS table.
        List<ServiceConfig> configs = ServiceConfig.getConfig(properties);
        for (ServiceConfig config: configs) {
            writer = new StringWriter();
            template = cfg.getTemplate(config.getTemplateName());
            template.process(config, writer);
            expressionMap.get("expression").add(writer.toString());
        }
        writer = new StringWriter();
        template = cfg.getTemplate("alert.ftl");
        template.process(expressionMap, writer);
        System.out.println(writer.toString());
        executeSqlString(writer.toString(), alertTableEnv);

        Table alertTable = alertTableEnv.from("alert");
        DataStream<Alert> alertStream = alertTableEnv.toDataStream(alertTable, Row.class)
        .keyBy(value -> (Integer) value.getFieldAs("DeviceNumber")).process(new AlertFunction());
        
        alertTableEnv.createTemporaryView("processed_alert", alertStream);
        executeSqlFile("/tmp/scripts/insert_alert.sql", alertTableEnv);
    }

    static void processDispatch() throws Exception {
        final StreamExecutionEnvironment dispatchStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment dispatchTableEnv = StreamTableEnvironment.create(dispatchStreamEnv);
        executeSqlFile("ddl.sql", dispatchTableEnv);
        Table alert = dispatchTableEnv.from("alert_kafka");
        DataStream<Row> alertStream = dispatchTableEnv.toDataStream(alert);
        DataStream<Row> delayedAlertStream = alertStream.keyBy(
            value -> (Integer) value.getFieldAs("DeviceNumber")
            ).process(new DelayFunction());
        DataStream<Dispatch> dispatchDataStream = delayedAlertStream.keyBy(value -> (Integer) value.getFieldAs("DeviceNumber")).process(new DispatchFunction());
        dispatchTableEnv.createTemporaryView("dispatch", dispatchDataStream);
        executeSqlFile("insert_dispatch.sql", dispatchTableEnv);
        dispatchStreamEnv.execute("Dispatch application");
    }
}

其中 insert_alert.sql 的示例代码如下:

insert into alert_kafka(`DeviceNumber`, `AlertType`, `AlertID`, `AlertTime`, `FaultTime`, `INFO`, `Overrides`, `Delay`)
select a.DeviceNumber, a.alertType, a.alertUUID, a.alertTime, a.faultTime, e.info, e.overrides, a.delay
from (select *, proctime() as proctime from processed_alert) a
left join device_sla_jdbc for system_time as of a.proctime e
on a.DeviceNumber = e.device_id;

Table API 和 Stream API 的转换

在需要保存设备状态,然后根据设备状态做逻辑判断的场景下,更适合用 stream API 编写 process function 处理。我们可以根据具体的业务场景选择用 stream API 还是 table API 处理。

  • Table 转 stream
Table alertTable = alertTableEnv.from("alert");
DataStream<Alert> alertStream = alertTableEnv.toDataStream(alertTable, Row.class)
        .keyBy(value -> (Integer) value.getFieldAs("DeviceNumber")).process(new AlertFunction());
  • Stream 转 table
alertTableEnv.createTemporaryView("processed_alert", alertStream);

5. 方案建议

建议

  1. 有一个可以运行 terraform 的 EC2 或 local laptop。
  2. 一个用来部署 KDA/Lambda function/RDS 的 VPC,至少有两个 private subnet,一个 security group。
  3. 配置好 AWS credentials,准备一个用来执行部署的 IAM user/role。
  4. 已有支持 SASL 认证方式的 MSK cluster 及 raw data 的 topic,如:device_err_topic。
  5. MSK 和 RDS 的 secret key 使用 Secret Manager 进行管理。
  6. MSK cluster 提前创建好 topic。
  7. 您可以考虑使用控制台/CDK/Terraform 创建 RDS postgreSQL/KDA application。
  8. KDA application 的 KPU 数量设置建议根据您实际的数据量以及处理逻辑的复杂度来做评估。

测试

生成设备运行数据的消息,按照告警 rule 的不同组合,将消息打进 MSK 的 Raw Data 的 input topic 中,  消费两个 alert/dispatch output topic 查看告警消息或撤销告警消息是否生成。

6. 总结及扩展方法

该方案通过对 Flink CEP 的试验,并结合具体的业务逻辑,经过 CEP 和 interval join 的对比,使用 Flink SQL interval join,通过对输入设备运行数据的处理,并结合设备分级的配置信息,产生不同级别的告警信息,同时对于乱序的消息也采用 watermark 进行一定程度的处理,通过合理设置 table.exec.state.ttl,也可以对时间范围内延迟的消息进行处理。该方案提供的内容主要有:

  1. Flink 处理告警处理及分级的相关业务逻辑。
  2. Terraform 部署 KDA/RDS/Lambda 的代码。
  3. 支持 Flink SQL 在 S3 上更新,重启 application 后生效。

通过对该方案的理解,您可以在此基础之上结合具体的业务逻辑参考用 MSK+KDA 来实践一个高时效性的数据分析或信息处理场景。该方案的设计及实现思路不仅可以适用于制造业的设备告警信息处理,同样也适用于任何需要采用 Flink SQL 进行流式数据处理的其他各个行业的业务场景。

本篇作者

郝亮

AWS Analytics 快速原型解决方案架构师,负责根据客户实际的业务场景,利用最新最适用于场景的大数据技术,基于 AWS 服务快速搭建核心系统,解决客户的关键业务诉求,验证方案的可行性。

贾婷

AWS 快速原型解决方案架构师,致力于帮助客户设计和构建大数据方向的快速原型方案,有游戏、汽车等行业的大数据技术经验。

曹琪

Andy Cao 是亚马逊云科技的 Senior Customer Solutions Manager,在亚马逊云科技主要支持制造业,游戏和 OTA 等行业的用户。专注于在亚马逊云科技用户上云期间运用云相关解决方案帮助亚马逊云科技用户实现自身的业务价值。他始终坚持运用亚马逊云科技已有的数据分析,机器学习和 AIGC 的能力帮助用户在业务上做出更多的创新。

邹勇军

亚马逊云科技资深商务拓展经理。专注于服务支持大型 MNC 制造业企业客户数字化转型,助力客户通过数字化转型和数字化创新取得业务上的成功。

叶骏

亚马逊云科技资深解决方案架构师。拥有超过 18 年的零售行业、制造行业以及数字营销领域的技术产品研发和解决方案架构经验。目前专注于将 AWS 云平台技术应用于实际解决方案,为客户实现技术创新和成功的技术落地。

巫佳杰

西云数据技术客户经理,拥有超过 10 年企业通信和云技术领域的客户成功和技术咨询经验。目前负责为亚马逊云科技中国区客户提供企业级技术支持和专业指导。协助客户在亚马逊云平台上安全稳定运行业务以及做好云财务管理。