亚马逊AWS官方博客
采用无服务架构归档 Amazon Timestream for InfluxDB 数据到 Amazon S3
背景
时序数据在企业运营中扮演着越来越关键的角色,借助 Amazon Timestream for InfluxDB,您可以轻松地在 AWS 上运行开源 InfluxDB 数据库,用于响应时间为毫秒的时序应用场景,例如金融交易和物联网设备的实时监控。然而,随着业务规模的扩大,企业面临着新的挑战:一方面,多个行业的合规要求长期保存历史时序数据,如汽车、医疗和金融领域;另一方面,物联网设备产生的海量时序数据日益增长,导致数据库的存储成本攀升。虽然可以通过 InfluxDB 的保留策略自动删除旧数据来控制成本,但这与长期保存数据的合规要求相矛盾。
因此,企业需要一个经济实用的解决方案,既能满足合规要求,又能有效管理存储成本。将历史时序数据归档到 Amazon S3 是一个理想选择:S3 提供几乎无限的低成本存储容量。本文将详细介绍如何使用无服务架构,将 Amazon Timestream for InfluxDB 中的历史数据归档到 S3,并使用 Amazon Athena 进行查询,从而实现长期数据存取和成本效益的平衡。
方案架构图
该方案包含下列步骤:
- 创建 Amazon S3 桶用于存储归档数据。
- 在 AWS Secrets Manager 中创建密钥,用于安全管理 Amazon Timestream for InfluxDB 的访问凭证。
- 构建基于 AWS Glue Python Shell 的 ETL 作业,配置定时触发机制,从 InfluxDB 提取历史数据并以 Parquet 格式存储至 S3。
- 使用 Amazon Athena 查询 S3 中归档的数据。Athena 提供了无服务器的查询体验,可以直接针对 S3 数据运行 SQL 查询,无需预先配置任何基础设施。
- 在 InfluxDB 中配置数据保留策略,定期删除指定时间段之前的历史数据,优化数据库存储空间。
1. 创建 S3 桶存放归档数据
在 InfluxDB 所在区域创建一个 S3 桶,为 S3 桶起一个名称,其他选项均保持默认。
2. 使用 Secrets Manager 管理 InfluxDB 密钥
如图所示,在Secrets Manager控制台,创建一个密钥,Secret type选择 Other type of secret,Key/value pairs输入下列信息:
- influxdb_url:InfluxDB 的 URL,例如 https://xxxxxxxxxxxx-us-east-1.timestream-influxdb.amazonaws.com:8086
- influxdb_token:InfluxDB 的 API token
- influxdb_org:要归档的 InfluxDB org 名称
- influxdb_bucket:要归档的 InfluxDB bucket 名称
为密钥取一个名称,其他配置保持默认,然后点击 Create。
3. 配置 Glue
3.1 创建 Connection
我们需要在 Glue 和 InfluxDB 之间创建网络连接。首先在 InfluxDB 实例所在的 VPC 中为 Glue 创建一个安全组,如下图,入站规则和出站规则均保持默认。
创建完成后,为该安全组添加一条自引用入站规则,以允许 Glue 各组件之间进行通信。
然后为 InfluxDB 所在的安全组添加下列入站规则,以允许 Glue 访问:
- Type:选择 Custom TCP
- Port range:输入 8086
- Source:选择 Custom,然后选中之前为 Glue 创建的安全组
在和 InfluxDB 相同的区域,访问 Glue 服务,点击 Create Connection,数据源请选择 Network。
在 Configure connection 界面:
- VPC:选择 InfluxDB 所在的 VPC
- Subnet:选择 InfluxDB 所在的任一子网
- Security groups:选择之前为 Glue 创建的安全组
为 Connection 命名,例如 InfluxDB Network connection,其他配置保持默认,然后创建 Connection。
除了需要和 InfluxDB 建立连接外,Glue Job 还需和其他 AWS 服务以及互联网资源进行交互,因此请确保您的 VPC 具有 NAT 网关。为了进一步增强安全性,您还可以参考文档为 Secrets Manager 创建 VPC Endpoint,参考文档创建 S3 Gateway Endpoint,使 Glue 能够安全地与 Secrets Manager 和 S3进行通信,而无需经过公共互联网。使用 S3 Gateway Endpoint 不会产生额外费用。
3.2 创建 IAM Role
我们需要为 AWS Glue Job 创建 IAM Role,确保其具有访问相关 AWS 服务的适当权限。请在 IAM 服务控制台,创建如下 IAM role:
- Trusted entity type:选择 AWS service
- Use case:选择 Glue
- Permission policies:选择 AWSGlueServiceRole
为此 IAM Role 命名,如 AWSGlueServiceRole-InfluxDB-Daily-Archive
为了使 Glue Job 能够访问 S3 桶和 Secrets Manager 中的密钥,我们还需为此 IAM Role 添加一个 inline policy。policy 如下所示,请用实际的 arn 替换掉 Resource 后的备注:
3.3 创建 Glue Database
Glue Database 用于组织和管理元数据表,元数据包括表结构、分区信息等。Glue Database 直接与 Amazon Athena 集成,使得后续查询 S3 中的归档数据变得简单高效。创建步骤如下:
3.4 创建 Glue Job
在 Glue 控制台,选择 ETL jobs → Script editor → Python shell → Start fresh 创建 Python 脚本。将下列 Python 代码粘贴到编辑器中,该脚本用于将 InfluxDB 数据库前一天的时序数据归档到 S3 存储桶。它用 influxdb-client-python 库查询指定时间范围内的数据,将其处理成 Pandas DataFrame 格式,然后使用 awswrangler(AWS SDK for Pandas)库将数据以 Parquet 格式写入 S3,并根据年、月、日进行分区存储,同时创建/更新 Glue 元数据表。
其中,influxdb-client-python 库支持多种数据返回格式(raw data, csv, flux_table structure, Pandas DataFrame),您可以根据业务需求选择合适的查询方法。通过时间窗口参数控制单次查询的数据量,可防止内存溢出。awswrangler 库简化了与 AWS 服务的交互,Parquet 格式和分区结构则有助于优化查询性能。此外,您还可以参考 Amazon Athena 的十大性能优化技巧,调整压缩格式和数据分区,以获得更好的查询性能。
接下来,在 Job details 配置如下参数:
- IAM Role:选择之前为 Glue 创建的 IAM Role
- Data processing units:选择 1 DPU
- Connections:选择之前为 Glue 创建的 Connection
- Job parameters:添加下列 Key 和 Value
之后请点击 Save 保存。
3.5 定时运行 Glue 归档任务
在已创建的 Glue Job 配置页面中,选择 Schedules → Create schedule 进行定时任务设置。
系统提供多种执行频率选项,选择 Daily 将使归档任务每天执行一次,上述 Python 脚本默认会归档前一天的数据,如果选择其他频次请按照实际情况修改脚本,调整归档的时间范围。此外请注意 Start hour 以 UTC+0 为基准,例如将 Start hour 设置为 1 点,对应北京时间(UTC+8)上午 9 点执行归档任务。
4. 使用 Athena 查询归档数据
访问 Athena 控制台的 Query Editor,您可以看到之前创建的 Glue Database,以及由 awswrangler 自动生成的表结构。
利用标准 SQL 语法,您可以直接查询和分析归档数据,结合之前设置的年、月、日分区结构,可以限制每次查询扫描的数据量,从而提升查询性能并降低成本。
5. 为 InfluxDB 设置数据保留策略
在上述步骤中,我们实现了将 InfluxDB 历史数据归档到 S3,并使用 Athena 进行查询。为了减少归档数据对 InfluxDB 磁盘空间的占用,降低存储成本,我们可以对 Bucket 创建保留策略,删除历史数据,如下图所示。
结论
在此方案中,我们成功地实现了Amazon Timestream for InfluxDB 数据到 S3 的自动归档。该方案满足了长期数据存储的合规要求,在降低了存储成本的同时保证数据易于查询。该方案的主要优势包括:
- 利用 AWS 无服务器服务,减少运维负担,同时通过按需付费模式(Glue Job按运行时间,Athena 按查询量)实现成本优化;
- 通过将历史数据存储在 S3 中,并为 InfluxDB 设置合理的保留策略,自动删除已归档的旧数据,降低了 InfluxDB 的存储成本;
- 使用 Parquet 格式存储数据并对其进行分区,优化查询性能;
- 保留了使用 SQL 查询历史数据的能力,满足了数据分析需求。
我们建议您根据具体业务场景对此方案进行相应调整和优化。
参考资料
- 手把手教你玩转 Timestream For influxDB 实现时序数据存储和分析
- Automate the archive and purge data process for Amazon RDS for PostgreSQL using pg_partman, Amazon S3, and AWS Glue
- Export data by using AWS Glue
- https://github.com/influxdata/influxdb-client-python
- Amazon Athena 的十大性能优化技巧