亚马逊AWS官方博客

Amazon Kinesis 更新& Amazon Elasticsearch Service 集成,分片级指标和基于时间的迭代器

Original URL:https://thinkwithwp.com/blogs/aws/amazon-kinesis-update-amazon-elasticsearch-service-integration-shard-level-metrics-time-based-iterators/

Amazon Kinesis 让您在云中轻松实现流数据处理。Amazon Kinesis 平台由三种不同的服务组成:Kinesis Streams 允许开发人员构建自己的流处理应用程序;Kinesis Firehose 简化了将流数据加载到 AWS 以进行存储和分析的过程;Kinesis Analytics 支持分析人员使用标准 SQL 查询分析流数据。许多 AWS 客户使用Kinesis Streams Kinesis Firehose 作为实时流数据提取和处理系统的组件。他们能感受到托管服务带来的易用性,并将开发时间投入到应用程序中,而不是花时间管理自己的流数据基础设施。

他们感受完全托管服务带来的易用性,并将开发时间投入到应用程序中,而不是花时间管理自己的流数据基础设施。

今天我们宣布了 Amazon Kinesis Streams 和 Amazon Kinesis Firehose 的三个新功能:

  • Elasticsearch 集成 – Amazon Kinesis Firehose 现在可以将数据流传输到 Amazon Elasticsearch Service 集群。
  • 增强的指标 – Amazon Kinesis 现在可以每分钟向 CloudWatch 发送分片级指标。
  • 灵活性 – Amazon Kinesis 现在允许您使用基于时间的分片迭代器检索记录。

 

Amazon Elasticsearch Service 集成

Elasticsearch 是一个流行的开源搜索和分析引擎。Amazon Elasticsearch Service 是一项托管服务,让您可以在 AWS 云中轻松地部署、运行和扩展 Elasticsearch。您现在可以将 Kinesis Firehose 数据流传输到 Amazon Elasticsearch 集群索引和分析服务器日志、点击流和社交媒体流量。

传入的记录(Elasticsearch 文档)将根据您指定的配置在 Kinesis Firehose 中缓存,然后自动添加到集群并批量索引多个文档。在将数据发送到 Firehose 之前,数据必须已进行 UTF-8 编码并合并为单个 JSON 对象(请参阅我最近的博文,Amazon Kinesis 代理更新新数据预处理功能,以了解有关如何执行此操作的更多信息)。

以下是如何使用 AWS 管理控制台进行设置的方法。选择目标 (Amazon Elasticsearch Service) 并设置传输流名称,然后选择一个 Elasticsearch 域 (本示例中为livedata),设置索引,并选择索引轮换 (无、每小时、每天、每周或每月)。同时指定一个 S3 存储桶,它将接收所有文档或失败文档的备份 (供参考):

然后设置缓冲区大小,为将要发送到 S3 存储桶的数据选择一些压缩和加密选项,设置日志记录 (如果有需要),并选择适当的 IAM 角色:

流将在一分钟左右准备就绪:

然后就可以在控制台中查看传输指标:

一旦数据开始到达 Elasticsearch,就可以使用 Kibana 或使用 Elasticsearch 查询语言来可视化查询数据。

综上所述,这种集成大大简化了捕获流数据并将其传输到 Elasticsearch 集群的过程。无需编写任何代码或构建自己的数据提取工具就可以实现。

分片级指标

每个 Kinesis 流由一个或多个分片组成,每个分片提供固定数量的读写容量。当我们向流平台添加分片时,就会增加流平台的容量。

为了使您能够更好地了解每个分片的性能,现在可以启用一组分片级指标。每个分片有 6 个指标,每个指标每分钟报告一次,并按标准的 CloudWatch 定价收费。这些指标将允许您查看特定分片是否比其他分片负载更高,并查找和根除端到端流数据传输管道中的任何效率底下的情况。例如,您可以识别以过高的速率接收记录的分片,以及应用程序以低于预期的吞吐量读取的分片。

以下是新指标:

IncomingBytes – 已成功放置到分片的字节数。

IncomingRecords – 已成功放置到分片的记录数。

IteratorAgeMilliseconds – 针对一个分片调用 GetRecords 所返回的最后一条记录的年龄 (以毫秒为单位)。值为 0 表示正在读取的记录完全被流捕获。

OutgoingBytes – 从分片中检索的字节数。

OutgoingRecords – 从分片中检索的记录数。

ReadProvisionedThroughputExceeded -已超过每秒 5 次读取或每秒 2 MB 分片限制而受限的GetRecords 调用数。

WriteProvisionedThroughputExceeded – 由于超过每秒 1000 条记录或每秒 1 MB 的分片限制而被拒绝的记录数。

您可以通过调用 EnableEnhancedMonitoring 函数来启用这些指标。与以往一样,您可以使用 CloudWatch APIs 在任何所需的时间段内聚合它们。

基于时间的迭代器

您的应用程序通过使用 GetShardIterator 函数在所需分片上创建迭代器并指定所需的起始点来从 Kinesis 流中读取数据。除了现有的起始点选项 (在序列号、最旧记录或最新记录处或之后),您现在还可以指定时间戳。指定的值 (Unix 时间戳) 表示您要读取和处理的最旧记录的时间点。

 

本篇作者

Jeff Barr

AWS 首席布道师; 2004年开始发布博客,此后便笔耕不辍。