亚马逊AWS官方博客

数据自由、分析无忧 – 字节跳动 ByteHouse 数据融合指南之对象存储 S3

ByteHouse 简介

随着数字化转型浪潮涌来,构建高效的数据分析平台已成为企业的必备战略。这类平台通常采用技术先进的分布式计算框架,可以实现海量异构数据的高速提取、转换和加载。平台还会集成各类高级分析算法,如生成式人工智能、传统机器学习等,通过模型构建洞察用户的需求和行为模式。但是,当企业试图将这样的数据平台推向全球化时,常面临诸如技术标准不统一、业务规则不兼容等问题。

字节跳动自主研发的分布式列式数据库 ByteHouse 与亚马逊云科技进行了深度整合,共同构建了新一代的云数据仓库解决方案。ByteHouse 基于列式存储和向量化查询执行机制,可以实现毫秒级的分析查询响应时间。它支持标准的 SQL 查询接口,并内置了各类高级分析函数,大幅简化了模型构建和维护。

通过与亚马逊云科技的无缝对接,ByteHouse 可以实现海量数据的弹性存储,并支持按需自动扩展计算资源。这种云原生的数据仓库架构,可帮助企业快速建立一个统一的、高性能的数据分析平台,以 SaaS 形式降低企业数字化国际化的门槛。ByteHouse 目前已在亚马逊云科技海外区域 Marketplace 完成上架,客户只需要在亚马逊云科技 Marketplace 页面搜索 ByteHouse 或者通过这个链接就可以进入产品的控制台页面。在亚马逊云科技平台上,ByteHouse 可以作为一个很好的数据解决方案选项,尤其是在使用或者计划使用类似 ClickHouse 的情况下,现在 ByteHouse 使得客户多了一种选择。

ByteHouse 是一个统一的数据平台,支持实时和批量数据导入,并且可以高效地分析 PB 级的数据,采用 ByteHouse 会给带来以下优势:

  • 能够连接本地和远程的数据,无论是实时还是批量
  • 获得近乎无限的可扩展性
  • 实时加速洞察和基于数据的业务决策
  • 近乎零的运维成本,可以完全专注于业务增长

同时,构建在亚马逊云科技平台上的 ByteHouse 架构简洁清晰(关于 ByteHouse 在亚马逊云科技平台构建 SaaS 服务的最佳实践,后续会开启另一个系列博客进行介绍),如下图所示:

整体架构主要包含以下三个模块:

  • 服务层 – 将一系列服务组合在一起,将 ByteHouse 的不同单元连接起来,包括执行元数据管理、访问控制和数据安全等。
  • 计算层 – 在 ByteHouse 中,虚拟仓库(Virtual Warehouse,即 VW)表示一个隔离的计算资源。每个虚拟仓库是一个 MPP 计算集群,用于执行查询。计算资源易于扩展和管理,例如调整 VW 的大小。它们都是相互独立的,不会相互影响,并且原生支持多租户。
  • 存储层 – ByteHouse 使用共享存储来实现高可用性并在 Amazon S3 上持久化数据,通过利用列式存储格式和压缩算法,ByteHouse 能够以闪电般的速度处理海量的数据。

想要开启 ByteHouse 的使用,首先要做的事情就是将已有的数据进行集成,目前 ByteHouse 和亚马逊云科技多种数据源都做好了紧密的集成,从对象存储 S3、关系型数据库 RDS 到流数据 Kinesis Data Streams(KDS)或者 MSK,以及通过 ETL/ELT 方式处理数据,都能在 ByteHouse 服务中快速地完成数据导入。

本文先从对象存储 S3 开始,后续本系列博客会陆续介绍 RDS、KDS 和 MSK 与 ByteHouse 集成的内容。

Amazon S3 数据导入

通常来说,S3 对应的场景需要(定时地)批量导入数据的情况,例如历史日志信息等。本文中的示例使用的是部署在新加坡区域 ap-southeast-1 的 ByteHouse 服务。

配置权限

在使用 ByteHouse 导入数据的时候,需要提供对应亚马逊云科技 IAM 用户的 Access Key(AK)和 Secret Key(SK),其中就需要对应的 S3 读取权限。这里有两种方式可以完成相应的权限设置任务,通过 IAM 策略或者 S3 桶策略,关于 IAM 策略和 S3 桶策略的对比和关系,可以参考 IAM Policies and Bucket Policies and ACLs! Oh, My! (Controlling Access to S3 Resources) 这篇博客,根据实际情况进行选择即可。这里,我们选择使用 IAM 策略,附加在一个示例 IAM 用户上。

根据 ByteHouse 任务的要求,同时尽可能缩小权限范围,这里建议设置以下三个权限:

  • GetBucketLocation:获取存储桶对应的区域
  • GetObject:获取存储对象
  • *ListBucket:后续需要通过列表或者通配符选择多个文件的时候使用

可以参考以下的 IAM 示例策略,其中<your-bucket-name>替换为对应的存储桶就可以,如果放开桶全部对象访问的情况下对象路径为“*”星号。将上面的策略附加给对应的 IAM 用户即可,并记录好 AK/SK。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::<your-bucket-name>/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<your-bucket-name>"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation"
            ],
            "Resource": [
                "arn:aws:s3:::<your-bucket-name>"
            ]
        }
    ]
}

准备数据

目前 ByteHouse 支持的对象数据类型包括:CSV、JSON 、Arvo、Parquet、Excel (.xls)。本文示例使用亚马逊开放数据集平台 Registry of Open Data 中的 NOAA Global Surface Summary of Day 气象数据,该数据集包含来自各个 NOAA 气象站的每日天气摘要,原始数据集主要是纯文本 CSV 格式。这里选取 2023 年的数据,默认存储在 s3://noaa-gsod-pds/2023 中,可以参考以下命令同步到自己的存储桶中。

$ aws s3 sync s3://noaa-gsod-pds/2023/ s3://<your-s3-bucket>/noaa-gsod-pds/2023/

数据基本上长成下面这个样子:

"STATION","DATE","LATITUDE","LONGITUDE","ELEVATION","NAME","TEMP","TEMP_ATTRIBUTES","DEWP","DEWP_ATTRIBUTES","SLP","SLP_ATTRIBUTES","STP","STP_ATTRIBUTES","VISIB","VISIB_ATTRIBUTES","WDSP","WDSP_ATTRIBUTES","MXSPD","GUST","MAX","MAX_ATTRIBUTES","MIN","MIN_ATTRIBUTES","PRCP","PRCP_ATTRIBUTES","SNDP","FRSHTT"
"A0705600464","2023-01-01","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  33.0","19","  31.8","19","9999.9"," 0","988.6","19","  6.9","19","  4.4","19","  9.9"," 15.0","  37.0","*","  27.3","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-02","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  30.1","24","  28.6","24","9999.9"," 0","995.1","24","  8.5","24","  1.6","24","  6.0","999.9","  33.8","*","  23.4","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-03","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  32.1","24","  31.5","24","9999.9"," 0","990.2","24","  8.7","24","  4.8","24"," 13.0"," 18.1","  34.2","*","  30.0","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-04","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  34.7","24","  34.7","24","9999.9"," 0","982.8","24","  4.1","24","  7.4","24"," 13.0"," 22.0","  36.1","*","  32.9","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-05","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  31.6","24","  31.6","24","9999.9"," 0","986.7","24","  4.4","24","  2.7","24","  8.0","999.9","  34.7","*","  28.9","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-06","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  28.2","24","  25.6","24","9999.9"," 0","994.6","24","  9.7","24","  3.4","24","  6.0","999.9","  30.4","*","  26.6","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-07","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  22.0","22","  20.3","22","9999.9"," 0","005.3","22"," 10.0","22","  2.2","22","  5.1","999.9","  25.5","*","  12.9","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-08","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  21.5","24","  19.1","24","9999.9"," 0","003.2","24"," 10.0","24","  5.8","24","  9.9","999.9","  26.2","*","  17.1","*"," 0.00","I","999.9","000000"
"A0705600464","2023-01-09","44.87405","-87.90977","184.4","J DOUGLAS BAKE MEMORIAL AIRPORT, WI US","  21.7","24","  18.7","24","9999.9"," 0","995.4","24","  8.0","24","  4.9","24","  9.9"," 14.0","  30.6","*","  16.7","*"," 0.00","I","999.9","000000"

创建任务

ByteHouse 批量导入任务主要分成以下 4 个步骤完成:

接下来,我们到 ByteHouse 控制台 https://console.bytehouse.cloud/ 创建数据导入的任务。进入控制台后,跳转到 Data Loading 页面,并点击创建新任务。

进入到任务页面后,填写对应的任务名称和描述,并且选择 S3 作为数据源。

之后,点击创建新数据源,并填写对应的 AK/SK。

如果按照上述 IAM 策略进行配置的情况下,会弹出以下提示信息,说明无法列出所有 S3 存储桶,后续需要自己写上存储桶的名称,也可以根据实际情况加上 ListAllMyBuckets 和 ListBucket,这里选择继续手动添加。

注:填写好对应的存储桶后,注意还需要填写一个 path 参数,其主要目的是要获取数据 schema 信息,注意这里并不是说只会导入这一个文件,在任务执行前还会再填写要导入的路径信息,可以包含多个文件。

对于 CSV 文件,还可以再配置其他如表头、分隔符等参数。

对于目标来说,这里可以选择导入到已经创建的表或者创建新表,示例中采用创建新表的方式,ByteHouse 会根据数据判断出数据类型等相关信息,并根据这些内容创建数据表,建议查看识别出的数据列和类型,或者直接使用 SQL 创建数据表,这里可以根据实际需求配置 Order By Key 和 Partition Key 等。

接下来选择对应的数据库和数据表,然后可以配置数据加载类型,目前 ByteHouse 支持三种加载类型:追加数据、分区内覆写和全量数据替换。

点击创建任务,创建完成后并不会自动执行任务,首次会弹出一个选项是否执行。填写对应的路径,注意可以使用通配符的方式,然后开始任务。注意这里的路径需要从 Path Prefix 之后开始写,否则会导致任务失败,示例为了演示采用了通配符的方式加载了特定前缀的对象。

任务开始后就可以查看详细的状态和日志信息:

执行成功后可以查看到具体数据表信息:

也可以通过 SQL Worksheet 进行查询:

后续可以在对应的数据导入任务对其进行执行和编辑等管理操作。

另外,从日志中可以看到 ByteHouse 后台是通过 Spark 来完成数据加载任务的,并且会根据数据的规模进行弹性扩展。

2023-11-07 16:35:06,478 [mainn] INFO  com.bytedance.bytehouse.loader.connection.ConnRetry          null - Executing Query: select partition_key, sorting_key, uuid, primary_key, definition, cluster_key from system.cnch_tables  where database = '205840333.my_db' and name = 'yt8m' | QueryID spark-aebb540b-e18e-46f2-a3ea-7268a2a95341-1 
2023-11-07 16:35:06,718 [mainn] INFO  com.bytedance.bytehouse.loader.schema.jdbc.ChJdbcMethods     null - CnchKeys: CnchKeys(partitionKey=, clusterKey=, sortingKey=id, primaryKey=id, uniqueKey=, tableUUID=bb7fa79b-cfcb-4177-b236-7942127f2eb6, engine=) 
2023-11-07 16:35:06,719 [mainn] INFO  com.bytedance.bytehouse.loader.connection.CnchConnFactory    null - Establishing ByteHouse JDBC Connection at: jdbc:bytehouse://10.0.91.61:9010/ 
2023-11-07 16:35:06,719 [mainn] INFO  com.bytedance.bytehouse.loader.schema.jdbc.ChJdbcMethods     null - ChJdbcMethods::getColumns: describe table `205840333.my_db`.`yt8m` 
2023-11-07 16:35:06,722 [mainn] INFO  com.bytedance.bytehouse.loader.connection.ConnRetry          null - Executing Query: describe table `205840333.my_db`.`yt8m` | QueryID spark-05a9bb68-6be2-4b72-b0dd-c4839ed127dc-2 
2023-11-07 16:35:06,741 [mainn] INFO  com.bytedance.bytehouse.loader.sink.partwriter.PwQueryOpsImpl null - Finding VW: null 
2023-11-07 16:35:06,789 [mainn] INFO  com.bytedance.bytehouse.common.crypto.SecretsDecryptor       null - Reading file from /opt/spark/work-dir/secret/secret.json without decrypting

Amazon S3 外表

除了通过任务直接把数据导入到 ByteHouse 之外,ByteHouse 也支持 S3 的外表模式,可以直接对 S3 中的数据进行查询,对应的引擎是 CnchS3,并且 ByteHouse 也支持外表和普通表进行 Join 操作。

仍然以上面的示例数据集为例,这里数据集被上传到了美东 us-east-1 区域的 S3 存储桶中,而 ByteHouse 集群是在新加坡 ap-southeast-1 区域,因此需要使用带有 S3 Endpoint 地址信息的链接来支持跨区域的访问,相同区域的情况下不需要额外进行处理,参考建表语句如下所示:

CREATE TABLE my_db.noaa_gsod_pds (
`STATION` String, 
`DATE` Date, 
`LATITUDE` Float64, 
`LONGITUDE` Float64, 
`ELEVATION` Float64, 
`NAME` String, 
`TEMP` String, 
`TEMP_ATTRIBUTES` String, 
`DEWP` String, 
`DEWP_ATTRIBUTES` String, 
`SLP` String, 
`SLP_ATTRIBUTES` String, 
`STP` String, 
`STP_ATTRIBUTES` String, 
`VISIB` String, 
`VISIB_ATTRIBUTES` String, 
`WDSP` String, 
`WDSP_ATTRIBUTES` String, 
`MXSPD` String, 
`GUST` String, 
`MAX` String, 
`MAX_ATTRIBUTES` String, 
`MIN` String, 
`MIN_ATTRIBUTES` String,
`PRCP` String, 
`PRCP_ATTRIBUTES` String, 
`SNDP` String, 
`FRSHTT` Int64) 
ENGINE=CnchS3('https://s3.us-east-1.amazonaws.com/<your-bucket-name>/noaa-gsod-pds/2023/*', 'CSVWithNames', 'none', '<AK>', '<SK>');

目前,支持的外表数据类型包括:

  • TabSeparated
  • TabSeparatedRaw
  • TabSeparatedWithNamesAndTypes
  • CSV
  • CSVWithNames
  • JSONEachRow
  • TSKV
  • Protobuf
  • Parquet
  • RowBinary
  • RowBinaryWithNamesAndTypes
  • CapnProto

ByteHouse 也支持 S3 中的压缩数据,具体包括以下几种,括号中的是内容是创建表时候压缩格式的写法,注意如果没有使用压缩格式的话对应的是“none”:

  • Gzip (gzip)
  • Zlib (deflate)
  • Brotli (br)
  • Xz (xz)
  • Zstd (zst)
  • Lz4 (lz4)
  • Bzip2 (bz2)
  • Snappy (snappy)
  • HadoopSnappy (snappy)

针对上面的数据表,可以使用正常的 SQL 查询数据,除此之外这里也支持两个额外的虚拟列“_path”和“_file”,它们分别提供有关 S3 桶路径和文件的相关信息,在查询的时候也可以作为条件使用:

SELECT  _file,
        _path,
        STATION,
        `DATE`
FROM    my_db.noaa_gsod_pds
WHERE   _file LIKE '041620999%.csv'
LIMIT   10

查询结果如下所示:

和 CnchS3 引擎对应的还有 ChchS3 函数,使用起来也是类似的方式,参考 SQL 语句如下所示:

SELECT  _file,
        _path,
        *
FROM    CnchS3(
            'https://s3.us-east-1.amazonaws.com/bytehouse-shtian/noaa-gsod-pds/2023/*',
            '`STATION` String, `DATE` Date, `LATITUDE` Float64, `LONGITUDE` Float64, `ELEVATION` Float64, `NAME` String, `TEMP` String, `TEMP_ATTRIBUTES` String, `DEWP` String, `DEWP_ATTRIBUTES` String, `SLP` String, `SLP_ATTRIBUTES` String, `STP` Float64, `STP_ATTRIBUTES` String, `VISIB` String, `VISIB_ATTRIBUTES` String, `WDSP` String, `WDSP_ATTRIBUTES` String, `MXSPD` String, `GUST` String, `MAX` String, `MAX_ATTRIBUTES` String, `MIN` String, `MIN_ATTRIBUTES` String, `PRCP` String, `PRCP_ATTRIBUTES` String, `SNDP` String, `FRSHTT` Int64',
            'CSVWithNames',
            'none',
            '<AK>',
            '<SK>'
        )
WHERE   _file LIKE '041620999%.csv'
LIMIT   10

查询结果如下所示:

基于 CnchS3 引擎和 CnchS3 函数,用户也可以通过 INSERT INTO destination_table SELECT * FROM source_table 的方式插入数据。

INSERT  INTO
        `my_db`.`noaa_s3`
SELECT  *
FROM    `my_db`.`noaa_gsod_pds`
WHERE   _file LIKE '041620999%.csv'

可以执行 SQL 查询检查数据插入是否成功(示例中是比源表多插入一个 station 站点的数据):

通过 ByteHouse 的 S3 外表模式,用户也可以使用查询亚马逊云科技上各个服务的日志,例如 ALB/NLB/ELB、CloudFront、CloudTrail、Network Firewall、WAF、VPC Flow Logs 等。

定时执行数据导入任务即将在 23 年底前开启,欢迎届时使用。如果想在亚马逊云平台上完成任务的规划和编排,可以通过 EventBridge(定时任务)或者 Amazon MWAA/Airflow/DolphinScheduler(或者可选项,和其他数据任务一起编排),加上 Lambda 或者 EC2(计算)组合的方式完成,然后通过 INSERT INTO SQL 语句或者 Job API 完成,同时可选地将成功和失败的记录在 DynamaDB 中记录,如下图所示。

此外,也可以选择在 Lambda 或者 EC2(计算)中直接读取数据,然后通过 ByteHouse 的 SDK 或者 API 插入数据,如下图所示。

性能指标

本文对数据导入任务和 INSERT INTO 两种加载方法进行了一系列测试(S3 数据和 ByteHouse 集群都在新加坡区域),获取了对应数据导入(单表)的性能数据,用户可以在下表中进行查看并作为一个参考,实际效果还是强烈建议结合自己的数据存储情况进行测试。

数据导入任务:

数据格式 是否压缩 文件数量 大小 数据行数 导入时间(单位 s
CSV 1000 56M 244,451 85
CSV 10000 632M 2,757,810 293
Parquet 1000 4G 1,011,180 361
Parquet 3000 12G 3,034,505 408

CSV 文件采用的是上面提到的 NOAA 数据集,Parquet 文件采用的是 YouTube 8 Million – Data Lakehouse Ready 数据集。另外,在使用 INSERT INTO 方式进行插入的时候,由于是轻量的外表功能,只消耗 1s 时间就完成了数据插入。

使用建议

这里列举一些使用过程中的一些要点,希望对数据导入有所帮助:

  • 源数据根据实际需求使用对应的格式,如果是新开启的业务或者负载,格式上可以优先考虑使用 JSON,另一方面由于 JSON 的体积通常比较大(包含 schema 信息),所以建议使用文件压缩。
  • 压缩算法建议使用可切割的格式,比如 bzip2,ByteHouse 数据导入服务可自动进行切割导入。如果用非可切割的压缩算法,比如 gz, ByteHouse 需要将源文件切分成多个文件,比如每个 64MB 大小。
  • 对于分区表(Partition)或分桶表(Bucket),源文件需要做预聚合(根据Partition/Bucket Key),提高导入效率。

小结

本文首先介绍了ByteHouse在亚马逊云科技平台上可以无缝地和多种数据源进行集成,然后重点展开讨论了从对象存储Amazon S3导入数据的方式,介绍了通过任务和专有引擎/函数的方式加载数据。如果有批量对象存储数据的导入,这种方式较为合适。在实际应用中,更多关键业务数据会存储在数据库和消息队列中,后续几篇内容我们还会继续介绍ByteHouse和亚马逊云服务之间的紧密结合,包括关系型数据库RDS MySQL CDC和流数据MSK/Kinesis实时摄入,以及ETL和ELT等相关内容,敬请关注。

参考资料

https://docs.bytehouse.cloud/en/bytehouse/docs/batch-loading

https://docs.byteplus.com/en/bytehouse/docs/s3-external-table

本篇作者

史天

亚马逊云科技资深解决方案架构师。拥有丰富的云计算、数据分析和机器学习经验,目前致力于数据科学、机器学习、无服务器等领域的研究和实践。译有《机器学习即服务》《基于 Kubernetes 的 DevOps 实践》《Kubernetes 微服务实战》《Prometheus 监控实战》《云原生时代的 CoreDNS 学习指南》等。

Aelfric Lin

字节跳动 ByteHouse 高级产品经理。在数据产品和 AI 产品领域拥有深厚的经验,目前专注于拓展国际市场,并在湖仓一体化、数据生态系统、大型模型等专业领域进行深入研究和实际操作。