亚马逊AWS官方博客

采用无服务架构归档 Amazon Timestream for InfluxDB 数据到 Amazon S3

背景

时序数据在企业运营中扮演着越来越关键的角色,借助 Amazon Timestream for InfluxDB,您可以轻松地在 AWS 上运行开源 InfluxDB 数据库,用于响应时间为毫秒的时序应用场景,例如金融交易和物联网设备的实时监控。然而,随着业务规模的扩大,企业面临着新的挑战:一方面,多个行业的合规要求长期保存历史时序数据,如汽车、医疗和金融领域;另一方面,物联网设备产生的海量时序数据日益增长,导致数据库的存储成本攀升。虽然可以通过 InfluxDB 的保留策略自动删除旧数据来控制成本,但这与长期保存数据的合规要求相矛盾。

因此,企业需要一个经济实用的解决方案,既能满足合规要求,又能有效管理存储成本。将历史时序数据归档到 Amazon S3 是一个理想选择:S3 提供几乎无限的低成本存储容量。本文将详细介绍如何使用无服务架构,将 Amazon Timestream for InfluxDB 中的历史数据归档到 S3,并使用 Amazon Athena 进行查询,从而实现长期数据存取和成本效益的平衡。

方案架构图

该方案包含下列步骤:

  1. 创建 Amazon S3 桶用于存储归档数据。
  2. AWS Secrets Manager 中创建密钥,用于安全管理 Amazon Timestream for InfluxDB 的访问凭证。
  3. 构建基于 AWS Glue Python Shell 的 ETL 作业,配置定时触发机制,从 InfluxDB 提取历史数据并以 Parquet 格式存储至 S3。
  4. 使用 Amazon Athena 查询 S3 中归档的数据。Athena 提供了无服务器的查询体验,可以直接针对 S3 数据运行 SQL 查询,无需预先配置任何基础设施。
  5. 在 InfluxDB 中配置数据保留策略,定期删除指定时间段之前的历史数据,优化数据库存储空间。

1. 创建 S3 桶存放归档数据

在 InfluxDB 所在区域创建一个 S3 桶,为 S3 桶起一个名称,其他选项均保持默认。

2. 使用 Secrets Manager 管理 InfluxDB 密钥

如图所示,在Secrets Manager控制台,创建一个密钥,Secret type选择 Other type of secret,Key/value pairs输入下列信息:

为密钥取一个名称,其他配置保持默认,然后点击 Create。

3. 配置 Glue

3.1 创建 Connection

我们需要在 Glue 和 InfluxDB 之间创建网络连接。首先在 InfluxDB 实例所在的 VPC 中为 Glue 创建一个安全组,如下图,入站规则和出站规则均保持默认。

创建完成后,为该安全组添加一条自引用入站规则,以允许 Glue 各组件之间进行通信。

然后为 InfluxDB 所在的安全组添加下列入站规则,以允许 Glue 访问:

  • Type:选择 Custom TCP
  • Port range:输入 8086
  • Source:选择 Custom,然后选中之前为 Glue 创建的安全组

在和 InfluxDB 相同的区域,访问 Glue 服务,点击 Create Connection,数据源请选择 Network。

在 Configure connection 界面:

  • VPC:选择 InfluxDB 所在的 VPC
  • Subnet:选择 InfluxDB 所在的任一子网
  • Security groups:选择之前为 Glue 创建的安全组

为 Connection 命名,例如 InfluxDB Network connection,其他配置保持默认,然后创建 Connection。

除了需要和 InfluxDB 建立连接外,Glue Job 还需和其他 AWS 服务以及互联网资源进行交互,因此请确保您的 VPC 具有 NAT 网关。为了进一步增强安全性,您还可以参考文档为 Secrets Manager 创建 VPC Endpoint,参考文档创建 S3 Gateway Endpoint,使 Glue 能够安全地与 Secrets Manager 和 S3进行通信,而无需经过公共互联网。使用 S3 Gateway Endpoint 不会产生额外费用。

3.2 创建 IAM Role

我们需要为 AWS Glue Job 创建 IAM Role,确保其具有访问相关 AWS 服务的适当权限。请在 IAM 服务控制台,创建如下 IAM role:

  • Trusted entity type:选择 AWS service
  • Use case:选择 Glue
  • Permission policies:选择 AWSGlueServiceRole

为此 IAM Role 命名,如 AWSGlueServiceRole-InfluxDB-Daily-Archive

为了使 Glue Job 能够访问 S3 桶和 Secrets Manager 中的密钥,我们还需为此 IAM Role 添加一个 inline policy。policy 如下所示,请用实际的 arn 替换掉 Resource 后的备注:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "在Secrets Manager中为InfluxDB创建的密钥的arn"
            ]
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "存放InfluxDB归档数据的S3桶的arn/*"
            ]
        }
    ]
}

3.3 创建 Glue Database

Glue Database 用于组织和管理元数据表,元数据包括表结构、分区信息等。Glue Database 直接与 Amazon Athena 集成,使得后续查询 S3 中的归档数据变得简单高效。创建步骤如下:

3.4 创建 Glue Job

在 Glue 控制台,选择 ETL jobs → Script editor → Python shell → Start fresh 创建 Python 脚本。将下列 Python 代码粘贴到编辑器中,该脚本用于将 InfluxDB 数据库前一天的时序数据归档到 S3 存储桶。它用 influxdb-client-python 库查询指定时间范围内的数据,将其处理成 Pandas DataFrame 格式,然后使用 awswrangler(AWS SDK for Pandas)库将数据以 Parquet 格式写入 S3,并根据年、月、日进行分区存储,同时创建/更新 Glue 元数据表。

其中,influxdb-client-python 库支持多种数据返回格式(raw data, csv, flux_table structure, Pandas DataFrame),您可以根据业务需求选择合适的查询方法。通过时间窗口参数控制单次查询的数据量,可防止内存溢出。awswrangler 库简化了与 AWS 服务的交互,Parquet 格式和分区结构则有助于优化查询性能。此外,您还可以参考 Amazon Athena 的十大性能优化技巧,调整压缩格式和数据分区,以获得更好的查询性能。

import sys
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient
import pandas as pd
import awswrangler as wr
import boto3
import json

args = getResolvedOptions(sys.argv, ['INFLUXDB_SECRET_NAME','S3_OUTPUT_PATH','GLUE_DATABASE','GLUE_TABLE'])
db_secrets = json.loads(boto3.client('secretsmanager').get_secret_value(SecretId=args['INFLUXDB_SECRET_NAME'])['SecretString'])
s3_output_path = args['S3_OUTPUT_PATH']
glue_database = args['GLUE_DATABASE']
glue_table = args['GLUE_TABLE']
time_window = timedelta(hours=1) 

def query_influxdb_data(start_time, end_time):
    with InfluxDBClient(url=db_secrets['influxdb_url'], token=db_secrets['influxdb_token'], org=db_secrets['influxdb_org'], timeout=7200000) as client:
        query = f'''
        from(bucket: "{db_secrets['influxdb_bucket']}")
          |> range(start: {start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}, stop: {end_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
          |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
          |> drop(columns: ["_start","_stop"]) '''
        print(query)
        result = client.query_api().query_data_frame(query)
        df = pd.concat(result) if isinstance(result, list) else result
        df = df.drop(columns=['result', 'table'])
    
    return df

def backup_to_s3(start_time, end_time):
    current_time = start_time
    while current_time < end_time:
        chunk_end_time = min(current_time + time_window, end_time)
        df = query_influxdb_data(current_time, chunk_end_time)
        df['year'], df['month'], df['day'] = start_time.year, start_time.month, start_time.day
        wr.s3.to_parquet(
            df=df,
            path=s3_output_path,
            dataset=True,
            mode="append",
            database=glue_database,
            table=glue_table,
            partition_cols=["year","month","day"]
        )
        current_time = chunk_end_time
        
if __name__ == "__main__": 
    end_time = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_time = end_time - timedelta(days=1)
    backup_to_s3(start_time, end_time)

接下来,在 Job details 配置如下参数:

  • IAM Role:选择之前为 Glue 创建的 IAM Role
  • Data processing units:选择 1 DPU
  • Connections:选择之前为 Glue 创建的 Connection
  • Job parameters:添加下列 Key 和 Value

之后请点击 Save 保存。

3.5 定时运行 Glue 归档任务

在已创建的 Glue Job 配置页面中,选择 Schedules → Create schedule 进行定时任务设置。

系统提供多种执行频率选项,选择 Daily 将使归档任务每天执行一次,上述 Python 脚本默认会归档前一天的数据,如果选择其他频次请按照实际情况修改脚本,调整归档的时间范围。此外请注意 Start hour 以 UTC+0 为基准,例如将 Start hour 设置为 1 点,对应北京时间(UTC+8)上午 9 点执行归档任务。

4. 使用 Athena 查询归档数据

访问 Athena 控制台的 Query Editor,您可以看到之前创建的 Glue Database,以及由 awswrangler 自动生成的表结构。

利用标准 SQL 语法,您可以直接查询和分析归档数据,结合之前设置的年、月、日分区结构,可以限制每次查询扫描的数据量,从而提升查询性能并降低成本。

5. 为 InfluxDB 设置数据保留策略

在上述步骤中,我们实现了将 InfluxDB 历史数据归档到 S3,并使用 Athena 进行查询。为了减少归档数据对 InfluxDB 磁盘空间的占用,降低存储成本,我们可以对 Bucket 创建保留策略,删除历史数据,如下图所示。

结论

在此方案中,我们成功地实现了Amazon Timestream for InfluxDB 数据到 S3 的自动归档。该方案满足了长期数据存储的合规要求,在降低了存储成本的同时保证数据易于查询。该方案的主要优势包括:

  • 利用 AWS 无服务器服务,减少运维负担,同时通过按需付费模式(Glue Job按运行时间,Athena 按查询量)实现成本优化;
  • 通过将历史数据存储在 S3 中,并为 InfluxDB 设置合理的保留策略,自动删除已归档的旧数据,降低了 InfluxDB 的存储成本;
  • 使用 Parquet 格式存储数据并对其进行分区,优化查询性能;
  • 保留了使用 SQL 查询历史数据的能力,满足了数据分析需求。

我们建议您根据具体业务场景对此方案进行相应调整和优化。

参考资料

本篇作者

何培培

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的云计算方案架构咨询和设计。

刘冰冰

亚马逊云科技数据库解决方案架构师,负责基于亚马逊云科技的数据库解决方案的咨询与架构设计,同时致力于大数据方面的研究和推广。在加入亚马逊云科技之前曾在 Oracle 工作多年,在数据库云规划、设计运维调优、DR 解决方案、大数据和数仓以及企业应用等方面有丰富的经验。

安在军

亚马逊云科技数据库高级产品经理,负责亚马逊云科技的数据库产品 Go-To-Market, 特别是 DocumentDB 和时序数据库 Timestream for InfluxDB。在过去的 10 多年一直从事数据库领域的方案以及技术在各个行业推广的工作,曾在 Oracle,Sybase 等数据库公司工作。

章平

亚马逊云科技数据库架构师。2014 年起就职于亚马逊云科技,先后加入技术支持和解决方案团队,致力于客户业务在云上高效落地。对于各类云计算产品和技术,特别是在数据库和大数据方面,拥有丰富的技术实践和行业解决方案经验。此前曾就职于 Sun,Oracle,Intel 等 IT 企业。

金川

亚马逊云科技数据库解决方案架构师,负责基于亚马逊云科技的数据库的解决方案咨询与架构设计。在加入亚马逊云科技之前曾在华为、阿里云等公司工作多年,在数据库选型与架构设计、数据库优化、数据迁移、大数据和数仓建设方面有丰富的技术经验,在金融、互联网、通信等行业有丰富的设计和实施经验。