亚马逊AWS官方博客

使用 Amazon Athena 的联合查询和由用户定义的函数简化 ETL 数据管道

Original URL:https://thinkwithwp.com/blogs/big-data/simplify-etl-data-pipelines-using-amazon-athenas-federated-queries-and-user-defined-functions/

Amazon Athena 最近在预览版中增加了对联合查询及由用户定义的函数 (UDF) 的支持。请参阅使用 Amazon Athena 的全新联合查询对任何数据源进行查询,以了解更多详细信息。Jornaya 帮助营销人员智能的与市场中购买大型生活用品(如房屋、抵押贷款、汽车、保险和教育等)的消费者建立联结。

Jornaya 从各种数据源收集数据。我们的主要挑战是,清理这些数据并将其提取到 Amazon S3,以使数据分析师和数据科学家可以进行访问。

传统 ETL 和分析解决方案

2012 年,Jornaya 从 MySQL 迁移到 Amazon DynamoDB作为我们的实时数据库。DynamoDB 让我们这种规模的公司可以享受到创建、读取、更新和删除 (CRUD) 操作的好处,并提供可预测的低延迟、高可用性和出色的数据持久性,而且无需管理数据库。这让我们的技术团队能够专注于解决业务问题,并且快速开发可以推到市场的新产品。

在 NoSQL 数据库上运行分析查询有时非常棘手。我们决定从 DynamoDB 提取数据,然后在此类数据上运行查询。但这并不简单。

Jornaya 使用以下几种从 DynamoDB 获取数据的方法:

  • 利用 EMR:我们临时为DynamoDB 表预置额外的读取容量,创建瞬态 EMR 集群,以便从 DynamoDB 读取数据并写入到 Amazon S3。
    • 我们的 Jenkins 作业会触发启动集群的管道,通过 EMR 提取数据,并使用 Amazon Redshift copy 命令将数据加载到 Amazon Redshift。这是一个昂贵的流程,因为我们会使用了过多的读取容量。为了降低 EMR 成本,我们开始采用竞价型实例。
  • 启用 DynamoDB 流:我们借助内部开发的名为 Dynahose 的 Python AWS Lambda 函数来使用来自流的数据,然后将其写入到 Amazon Kinesis Firehose delivery stream。然后,我们会配置 Kinesis Firehose delivery stream将数据写入 Amazon S3 位置。最后,我们使用另一项内部开发的名为 Partition 的 Python Lambda 函数以确保与写入 Amazon S3 的数据位置对应的分区已被添加到 AWS Glue 数据目录,使其可以通过 AWS Glue、Amazon Redshift Spectrum、EMR 等工具进行读取。

该流程如下图所示。

我们之所以采用这样的管道,因为我们希望使用 SQL 以自然的方式查询与运营数据有关的问题。

利用 Amazon Athena 简化 ETL 工作流并实现更快的分析

Athena 是一项完全托管的无服务器交互服务,可使用 SQL 对 Amazon S3 中的数据进行查询。我们的组织在多个部门都已快速采用这项服务。在我们的用例中,我们不需要始终在线并等待分析查询的 EMR 集群。Athena 的无服务器性质非常适合我们的用例。在此过程中,我们发现可以使用 Athena 运行提取、转换和加载 (ETL) 作业。

但是,Athena 远不止是一项在 Amazon S3 中查询数据的交互服务。我们认为 Athena 还是稳定、有效、可靠、可扩展和高成本效益的 ETL 工具。调度 SQL 语句的能力,及其对 从查询结果创建表 (CTAS)INSERT INTO 语句的支持,帮助我们加速运行 ETL 工作负载。

在 Athena 之前,组织内的业务用户必须依靠工程资源来构建管道。Athena 的推出彻底改变了这种状况。Athena 使软件工程师和数据科学家能够处理原本不可访问,或者需要数据工程师帮助才能使用的数据。

通过Athena新增的联合查询和 UDF 功能,Jornaya 可以将很多不稳定的数据管道替换为 Athena,以便从 DynamoDB 提取与转换数据,并将其写入到 Amazon S3。Jornaya 的产品和工程团队注意到我们的 ETL 失败率降低了。财务部门发现 EMR 和 DynamoDB 的成本都有所下降,随时待命的轮班员工(和他们的家人)也能好好地睡个安稳觉。

例如,一个使用 EMR 的 ETL 管道的构建历史记录看起来可能是这样的(在此图表中,ETL 管道执行历史记录的作业执行 id 为在x 轴上,执行时间在 y 轴上,并以分钟为单位):

在将此管道迁移到 Athena 并使用联合查询对 DynamoDB 进行查询以后,我们现在可以轻松访问数据源,而在过去使用类似于下方的查询是无法做到的:

CREATE TABLE "__TABLE_NAME__"
WITH (
  external_location = '__S3_LOCATION__'
, format = 'PARQUET'
, orc_compression = 'SNAPPY'
, partitioned_by = ARRAY['create_day']
) AS
SELECT DISTINCT
  d.key.s AS device_id
, CAST(d.created.n AS DECIMAL(14, 4)) AS created
, d.token.s AS token
, c.industry AS industry_code
, CAST(CAST(FROM_UNIXTIME(CAST(d.created.n AS DECIMAL(14, 4))) AS DATE) AS VARCHAR) AS create_day
FROM "rdl"."device_frequency_stream" d
  LEFT OUTER JOIN "lambda::dynamodb"."default"."campaigns" c ON c.key = d.campaign_key
WHERE d.insert_ts BETWEEN TIMESTAMP '__PARTITION_START_DATE__' AND TIMESTAMP '__PARTITION_END_DATE__'
  AND d.created.n >= CAST(CAST(TO_UNIXTIME(DATE '__START_DATE__') AS DECIMAL(14, 4)) AS VARCHAR)
  AND d.created.n < CAST(CAST(TO_UNIXTIME(DATE '__END_DATE__') AS DECIMAL(14, 4)) AS VARCHAR);

我们的流程性能大幅提升,其构建历史记录如下图表所示:

结论

使用一个SQL 查询,我们能够处理来自 DynamoDB 的数据,将此类数据转换成 Parquet,应用 Snappy 压缩,在 AWS Glue 数据目录中创建正确的分区,以及将数据提取到 Amazon S3。我们的 ETL 流程执行时间从数小时缩短到几分钟,成本也得到了大幅降低,而且新的流程更简单而且更可靠。在扩展性方面,使用Athena进行ETL的新流程也是面向未来。如果需要从另一个特定用途的数据存储导入数据集,但该数据存储没有现成的数据源连接器,我们可以简单地使用数据源连接器 SDK 编写属于自己的连接器,并在生产中进行部署,而这只需要一次性投入不到一天时间。

除此以外,Athena 联合查询还让 Jornaya 可以对不仅连接来自不同数据源,甚至来自不同数据范式的数据运行查询! 我们可以通过运行一次查询实现无缝链接 NoSQL 数据存储、RDS RDBMS 和 Amazon S3 数据湖的数据。

本博文中的内容和意见属于第三方作者,AWS 不对本博文的内容或准确性负责。


本篇作者

Manny Wald

是 Jornaya 的一名技术联合创始人。他持有多项专利,对于研究云、大数据和 AI 如何加快公司将产品投入市场及解决现实问题情有独钟。他具有 BI、应用程序开发、数据仓库、网络服务,以及构建工具管理交易和分析信息的背景。此外,Manny 成立了互联网上第一个每周嘻哈唱盘主义混音节目,获得过州和联邦的法律执业许可,而且一有机会就跑到篮球场上大显身手。

Janak Agarwal

AWS 的 Athena 产品经理