Amazon Web Services ブログ

Amazon Athena のパフォーマンスチューニング Tips トップ 10

2024 年 2 月に更新された原文を日本語版として 9 月に反映しました: この記事は、コストベースの最適化とクエリ結果の再利用を含む Amazon Athena エンジンバージョン 3 の変更を反映するために確認および更新されました。

Amazon Athena は、オープンソースのフレームワークに基づいた対話型分析サービスで、標準の SQL を使って Amazon Simple Storage Service (Amazon S3) に格納されたオープンテーブルおよびファイル形式のデータを簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要で、実行したクエリに対してのみ料金を支払います。Athena は使いやすく、Amazon S3 内のデータを指定し、スキーマを定義すれば、標準 SQL を使ってクエリを実行できます。

この投稿では、クエリのパフォーマンスを向上させるためのヒントのトップ10を紹介します。Amazon S3 へのデータ保存とクエリ特有のチューニングに関連する側面に焦点を当てます。

ストレージ

本セクションでは、Athena で最大限の効果を得るためのデータの構造化方法について紹介します。Amazon S3 にデータを保存する場合、同じ実践方法を Amazon EMR のデータ処理アプリケーション (Spark、Trino、Presto、Hive など) にも適用できます。以下のベストプラクティスについて説明します。

  1. データのパーティション分割
  2. データのバケット化
  3. 圧縮の利用
  4. ファイルサイズの最適化
  5. 列指向のファイル形式の利用

1. データのパーティション分割

パーティション分割は、テーブルを論理的なパーティションに分割し、日付、国、地域などのカラム値に基づいて関連するデータを同じパーティションに格納します。パーティションは仮想的な列として機能します。パーティションはパーティション値に基づいてデータが論理的に区分されます。テーブル作成時にパーティションを定義し、クエリごとにスキャンするデータ量を減らすことで、パフォーマンスが向上します。パーティションに基づいたフィルタを指定することで、クエリでスキャンするデータ量を制限できます。詳細は、Partitioning data in Athenaをご覧ください。

次の例は、S3 バケットに年ごとにパーティション分割されたデータセットを示しています。

$ aws s3 ls s3://athena-examples/flight/parquet/
 PRE year=1987/
 PRE year=1988/
 PRE year=1989/
 PRE year=1990/
 PRE year=1991/
 PRE year=1992/
 PRE year=1993/

このデータセットの場合、テーブルには PARTITIONED BY (year STRING) 句を含め、Athena に年単位でパーティション分割されていることを伝える必要があります。テーブル作成後、たとえばALTER TABLE ADD PARTITION を使用するか、AWS Glue クローラーを使用するか、MSCK REPAIR TABLE を実行して、各パーティションを追加する必要があります(Iceberg テーブルではテーブルレイアウト情報が追跡されるため、MSCK REPAIR TABLE の実行は必要なく、サポートもされていません)。また、テーブルはパーティションプロジェクションを使用するように構成することもできます (設定方法については、ボーナスチップのセクションを参照してください)。

パーティション化されたテーブルを問い合わせる際、WHERE句でパーティションキーを使用することで、スキャン対象となるパーティションを限定できます:

SELECT dest, origin FROM flights WHERE year = '1991'

このクエリを実行すると、Athena は年のパーティションキーにプレディケート (フィルター) があることを認識し、一致するパーティションからのみデータを読み込みます。この場合、s3://athena-examples/flight/parquet/year=1991/ のデータのみが読み込まれます。

データセットには複数のパーティションキーを持つことができます。以下は、AWS Open Data Registry の NOAA Global Historical Climatology Network データセットからの例です。このデータセットは、STATION と ELEMENT でパーティション分割されており、コマンドを実行すると、特定の STATION(=ASN00023351) の ELEMENT パーティションのリストを取得することができます。

$ aws s3 ls --no-sign-request s3://noaa-ghcn-pds/parquet/by_station/STATION=ASN00023351/
 PRE ELEMENT=DAPR/
 PRE ELEMENT=DWPR/
 PRE ELEMENT=MDPR/
 PRE ELEMENT=PRCP/

このデータセットのテーブルには、PARTITIONED BY (station STRING, element STRING) 句が含まれ、Athena にこのようにパーティション分割されていることを伝えます。

パーティション分割するカラムを決定する際は、以下の点を考慮してください。

  • クエリに適したパーティションキーを選択します。クエリから逆算して、データセットをフィルタリングするのによく使われるフィールドを見つけてください。
  • パーティションキーのカーディナリティ(パーティションキーが取りうるユニークな値の数を指します)は比較的低い方が良いです。テーブル内のパーティション数が増えるほど、パーティションメタデータの取得と処理のオーバーヘッドが高くなり、ファイルサイズが小さくなります。パーティションキーのカーディナリティが高すぎると、パーティション分割のメリットが失われる可能性があります。
  • データがある特定のパーティション値に偏っていて、ほとんどのクエリがその値を使用する場合、パーティション分割のメリットがオーバーヘッドで打ち消される可能性があります。

時間の経過とともに増大するデータセットは、一般的に日付でパーティションされるべきです。特定の期間、例えば過去 1 週間や過去 1 か月を見るクエリは一般的なパターンです。日付でパーティションすることで、全体のデータセットのサイズが時間とともに増大しても、これらのクエリが読み取るデータ量は一定に保たれます。

次の表は、パーティション分割されたテーブルとパーティション分割されていないテーブルのクエリ実行時間を比較しています。このテーブルは、業界標準のベンチマークデータセット TPC-H から取得されています。テーブルの両バージョンには、74 GB の非圧縮テキストデータが格納されています。パーティション分割されたテーブルは、l_shipdate 列でパーティション分割され、2,526 のパーティションがあります。

クエリ パーティション分割されていないテーブル コスト パーティション分割されたテーブル コスト 節約分
. 実行時間 スキャンされたデータ . 実行時間 スキャンされたデータ . .
SELECT COUNT(*)
 FROM lineitem 
 WHERE l_shipdate = '1996-09-01'
4.8 秒 74.1 GB $0.36 0.7 秒 29.96 MB $0.0001

99% 安価

85% 高速

SELECT COUNT(*)
 FROM lineitem 
 WHERE l_shipdate >= '1996-09-01' 
 AND l_shipdate < '1996-10-01'
4.4 秒 74.1 GB $0.36 2.0 秒 898.58 MB $0.004

98% 安価

54% 高速

EXPLAIN コマンドを使用すると、クエリによってどのパーティションが読み取られるかを確認できます:

EXPLAIN 
 SELECT COUNT(*)
 FROM lineitem 
 WHERE l_shipdate = '1996-09-01'

出力で SOURCE 項目を探し、その中の PARTITION_KEY ラベルを確認してください。この行は、クエリによって読み取られるパーティションを示しています。

…
 Fragment 1 [SOURCE] 
    Output layout: [count_0] 
    Output partitioning: SINGLE [] 
    Aggregate[type = PARTIAL] 
    │   Layout: [count_0:bigint] 
    │   Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
    │   count_0 := count(*)
    └─ TableScan[table = awsdatacatalog:tpc_h:lineitem] 
           Layout: [] 
           Estimates: {rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           l_shipdate:string:PARTITION_KEY 
		   :: [[1996-09-01]]

テーブルに複数のパーティションキーがある場合、それぞれのキー値について行が出力されます。クエリがパーティションキーの複数の値に一致する場合、出力にはそれぞれの値が含まれます。たとえば、クエリを変更して l_shipdate の値の範囲を選択した場合、最後の 2 行は次のようになります。

l_shipdate:string:PARTITION_KEY 
     :: [[1996-09-01], [1996-09-02], [1996-09-03], [1996-09-04], [1996-09-05]]

次の表に示すように、パーティション分割にも、クエリでパーティションフィルターが使用されない場合にパフォーマンス低下が生じます。データを過剰にパーティション分割しないよう注意してください。過剰なパーティション分割は、多数の小さなファイルを生成するため、パフォーマンスが低下します。この点については、この記事の後半で詳しく説明します。

クエリ パーティション分割されていないテーブル パーティション分割されたテーブル 節約
. 実行時間 スキャンされたデータ 実行時間 スキャンされたデータ .
SELECT COUNT(*)
 FROM lineitem
3.4 秒 74.1 GB 8.9 秒 74.1 GB 62% 遅い

パーティション分割のもう一つのペナルティは、クエリに一致するパーティションを見つけるのにかかる時間です。この問題を軽減する方法の 1 つは、テーブルにパーティションインデックスを有効にすることです。これにより、テーブルにパーティションが数万個以上ある場合のパフォーマンスが向上する可能性があります。パーティションインデックスを使用すると、すべてのパーティションのメタデータを取得するのではなく、クエリのフィルタ内のパーティション値のメタデータのみがカタログから取得されます。その結果、このようにパーティションが多数あるテーブルに対するクエリが高速化されます。次の表は、パーティションインデックスがない場合とある場合のパーティション分割テーブルのクエリ実行時間を比較しています。このテーブルには約 10 万個のパーティションと非圧縮のテキストデータが含まれています。orders テーブルは o_custkey 列でパーティション分割されています。

クエリ パーティションインデックス = 無効 パーティションインデックス = 有効 高速化
. 実行時間 実行時間 .
SELECT COUNT(*)
 FROM orders 
 WHERE o_custkey BETWEEN 1 AND 100
19.5 秒 1.2 秒 16 倍

Athena での AWS Glue Data Catalog のパーティションインデックスの利点の詳細については、AWS Glue Data Catalog パーティションインデックスを使用して Amazon Athena クエリのパフォーマンスを向上を参照してください。

データをパーティション分割する方法の例については、この記事の後半にある最適化されたデータセットの作成に関するセクションを参照してください。

2. データのバケット化

クエリが読み取らなければならないデータ量を減らす別の方法は、各パーティション内のデータをバケット化することです。バケット化とは、ある列の値に基づいてレコードを別々のファイルに分散させる手法です。これにより、同じ値を持つすべてのレコードが同じファイルに入ります。バケット化は、高いカーディナリティを持つ列があり、多くのクエリがその列の特定の値を検索する場合に有用です。バケット化の良い候補は、ユーザーやデバイスの ID などの列です。

Athena で既にバケット化されたデータセットがある場合、CREATE TABLEステートメントの中で CLUSTERED BY (<バケット化されたカラム>) INTO <バケット数> BUCKETS と指定することで、バケット化されたカラムを指定できます。Athena は Hive または Spark でバケット化されたデータセットをサポートしており、Athena の CREATE TABLE AS (CTAS) でバケット化されたデータセットを作成できます。

次の表は、c_custkey 列を使用して 32 個のバケットを作成した場合の顧客テーブルの違いを示しています。顧客テーブルのサイズは 2.29 GB です。

クエリ 非バケット化テーブル コスト c_custkey をクラスター化カラムとしてバケット化したテーブル コスト 節約分
. 実行時間 スキャンしたデータ . 実行時間 スキャンしたデータ . .
SELECT COUNT(*)
 FROM customer 
 WHERE c_custkey = 12677856
1.3 秒 2.29 GB $0.01145 0.82 秒 72.94 MB $0.0003645 97% 安く
37% 高速

前述のクエリに対して EXPLAIN ANALYZE を実行すると、バケット化が customer テーブルから Amazon S3 からのデータ読み取り量を減らすのに役立ったことがわかります。バケット化されていないテーブルとバケット化されたテーブルのクエリに対する EXPLAIN ANALYZE 出力の抜粋から、入力行数とデータサイズの違いがわかります。

以下はバケット化されていないテーブルの出力です:

…
─ ScanFilterProject[table = awsdatacatalog:tpc_h:customer, filterPredicate = ("c_custkey" = 12677856), projectLocality = LOCAL, protectedBarrier = NONE] 
           Layout: [] 
           Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           CPU: 19.48s (99.94%), Scheduled: 37.43s (99.97%), Blocked: 0.00ns (0.00%), Output: 1 row (0B)
           Input avg.: 202702.70 rows, Input std.dev.: 4.83%
           c_custkey := c_custkey:int:REGULAR 
           Input: 15000000 rows (2.29GB), Filtered: 100.00%, Physical input: 2.29GB, Physical input time: 0.00ms

以下はバケット化されたテーブルの出力です:

…
─ ScanFilterProject[table = awsdatacatalog:tpc_h:customer buckets=32, filterPredicate = ("c_custkey" = 12677856), projectLocality = LOCAL, protectedBarrier = NONE] 
           Layout: [] 
           Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           CPU: 654.00ms (100.00%), Scheduled: 1.13s (100.00%), Blocked: 0.00ns (0.00%), Output: 1 row (0B)
           Input avg.: 156250.00 rows, Input std.dev.: 22.35%
           c_custkey := c_custkey:int:REGULAR 
           Input: 468750 rows (72.94MB), Filtered: 100.00%, Physical input: 72.94MB, Physical input time: 0.00ns

Athena でバケット化されたデータを扱う方法の詳細は、以下のリソースを参照してください。

データをバケットに分割する方法の例については、この記事の後半にある最適化されたデータセットの作成に関するセクションを参照してください。

3. 圧縮の利用

データを圧縮すると、クエリの実行速度が大幅に向上します。データサイズが小さくなることで、Amazon S3 からスキャンするデータ量が減るため、クエリの実行コストが下がります。また、Amazon S3 から Athena へのネットワークトラフィックも減少させることができます。

Athena は gzip、Snappy、zstd などの一般的な形式を含む、さまざまな圧縮形式をサポートしています。サポートされている形式の一覧については、Athena の圧縮サポートを参照してください。

JSON や CSV などの圧縮されたテキストデータを問い合わせるには、特別な考慮が必要です。Athena がデータを読み取る際、データの並列処理を最大化するために、ファイルの異なる範囲をさまざまなノードに割り当てます。各範囲は スプリット と呼ばれ、並列で読み取れるファイルは スプリット可能 と呼ばれます。一般的な圧縮形式のほとんどはスプリット不可能です。つまり、リーダーは圧縮ファイルの先頭から読み取る必要があります。これは、データセットが単一の圧縮された CSV ファイルである場合、クエリ処理に使用できるのは 1 つのノードだけであることを意味します。

テキストファイルを圧縮したデータセットを作成する際は、ファイル数とファイルサイズのバランスを心がけてください。ファイルサイズの最適化については、次のセクションで詳しく説明します。

Parquet ファイルと ORC ファイルは、これらの形式がファイルを構成する複数のセクションを個別に圧縮し、ファイル内の各セクションの場所を含むメタデータを持つため、分割が可能です(ファイル全体が1つのセクションで構成されている場合は分割できないので注意が必要です)。

gzip 形式は高い圧縮率を持ち、他のツールやサービスでも幅広くサポートされています。zstd (Zstandard) 形式は、パフォーマンスと圧縮率のバランスが良い新しい圧縮形式です。bzip2 と LZO 圧縮形式は分割可能ですが、パフォーマンスと互換性を重視する場合は推奨されません。

データを圧縮する方法の例については、この記事の後半にある最適化されたデータセットの作成に関するセクションを参照してください。

4. ファイルサイズの最適化

データを並列で読み取れ、1回の読み取り要求で可能な限り多くのデータを読み取れるときに、クエリは効率的に実行されます。各ファイルの読み取りにはオーバーヘッドがあり、例えばメタデータの取得、Amazon S3 への要求、圧縮辞書のセットアップなどが発生します。これは通常は気付かれませんが、ファイル数が増えるにつれて、オーバーヘッドが蓄積されます。クエリのパフォーマンスを最大化するには、ファイル数とサイズのバランスを取る必要があります。

一般的なガイドラインとして、128 MB 程度の分割を目指すことをお勧めします。分割とは、ファイルの一部を指します。たとえば、非圧縮テキストファイルのバイト範囲、または Parquet ファイルのページです。圧縮のセクションで説明したように、ほとんどの圧縮テキストファイルは分割できないため、一括して処理されます。Parquet や ORC などの分析用に最適化された形式は、いつでも分割可能です。

小さなファイルが多数生成される理由の 1 つは、前のセクションで説明したパーティション分割の過剰分割です。クエリのパフォーマンスが小さなファイルが多すぎるために低下していることを示す兆候は、クエリ統計の計画フェーズが全実行時間の数パーセントを超えることです。最悪の場合、クエリが「Please reduce your request rate.」という Amazon S3 エラーで失敗する可能性があります。これは、大量のファイルに対し、Athena が Amazon S3 のサービスクォータを超える数の参照リクエストを実行する場合に発生します。 詳細については、ベストプラクティスデザインパターン: Amazon S3 のパフォーマンス最適化を参照してください。

小さいファイルの問題を解決する 1 つの方法は、Amazon EMR の S3DistCP ユーティリティを使うことです。これを使って、小さなファイルを大きなオブジェクトにまとめることができます。また、S3DistCP を使えば、HDFS から Amazon S3 へ、Amazon S3 から Amazon S3 へ、Amazon S3 から HDFS へと、大量のデータを最適化された方法で移動できます。このセクションの最後で、Athena Spark を使ってデータを再処理する別の方法について説明します。

ファイル数が少なく、サイズが大きい方が、ディレクトリ内のファイルリスト取得が速くなり、Amazon S3 へのリクエスト数が減り、管理するメタデータも少なくなるというメリットがあります。

例えば、次の表は、100,000 個のファイルを読み取る必要があるクエリと、同じデータセットを単一のファイルとして格納したクエリの違いを比較しています。両方のファイルセットには同じデータが、圧縮されていないテキストファイルとして格納されています。データの総量は 74 GB です。

クエリ ファイル数 実行時間
SELECT COUNT(*)
 FROM lineitem
100,000 ファイル 11.5 秒
SELECT COUNT(*)
 FROM lineitem
1 ファイル 4.3 秒
処理速度向上率 . ~ 62%

同様に、次の表は、データが圧縮された場合にファイル数の違いがどのような影響を与えるかを比較しています。データは前の例と同じですが、それぞれ 10 ファイルと 100 ファイルに gzip 圧縮されています。

クエリ ファイル数 平均ファイルサイズ 実行時間
SELECT COUNT(*)
 FROM lineitem
10 ファイル 2.4 GB 60 秒
SELECT COUNT(*)
 FROM lineitem
100 ファイル 243 MB 6.8 秒
処理速度向上率 . . 約 88%

ファイルサイズ、ファイル数、ファイルが圧縮されているかどうかによって、クエリのパフォーマンスに大きな違いが生じます。データが圧縮されていない場合、Athena は最適なサイズでファイルを並列処理できます。これにより、単一の非圧縮テキストファイルのほうが 10 万個の非圧縮ファイルを処理するよりも効率的です。データが圧縮されている場合、ファイル数とサイズの影響はさらに大きくなりますが、この場合、Athena がデータセットを並列処理できるように十分な数のファイルが必要になります。

データセットを最適化する方法については、この記事の後半にある小さなファイルを結合してデータセットを書き換える例を参照してください。

5. 列指向のファイル形式の利用

Apache ParquetApache ORC は、分析ワークロードで人気のあるファイル形式です。これらは、行ごとではなく列ごとにデータを格納することから、列指向のファイル形式と呼ばれることが多いです。また、読み込む必要のあるデータ量を様々な方法で削減できる機能も備えています。たとえば、列を個別に格納・圧縮することで、より高い圧縮率を実現でき、クエリで参照される列のみを読み込めます。

列指向のファイル形式では、データに対して複数の圧縮戦略が使用されます。たとえば、多くの繰り返し値を含む列は、値を 1 回格納し、繰り返し回数を付加するランレングス圧縮を使用してエンコードすることも、各値を検索テーブルへのポインタに置き換える辞書符号化を使用してエンコードすることもできます。テキストデータは、gzip、Snappy、zstd などの標準的な圧縮形式で圧縮できます。詳細については、Athena の圧縮サポートを参照してください。

Parquet と ORC は、さまざまなデータセットに合わせて調整できます。たとえば、ブロック (Parquet) またはストライプ (ORC) サイズを大きくすると、状況によっては有益な場合があります。データセットに多数のカラムがある場合は、Parquet のデフォルト 128MB、ORC のデフォルト 64MB から大きくすることをお勧めします。これにより、各カラムの十分な値が一緒に格納され、読み取りの回数が減ります。

列指向のファイル形式を使ってデータセットを調整する別の方法は、クエリに頻繁に含まれる列でデータを並べ替えておくことです。Parquet と ORC は、各データブロックの列の最小値と最大値などのメタデータを格納しています。つまり、クエリエンジンは、そのブロックに含まれる値がクエリに一致しないと判断した場合、そのブロックのデータを読み込む必要がありません。これは述語プッシュダウンと呼ばれます。たとえば、データにはタイムスタンプのようなものがあり、この属性でファイル内のデータを並べ替えておくと、特定の時間範囲を探すクエリは、タイムスタンプの前後のブロックのデータを読み込む必要がなくなります。

タイムスタンプによる並べ替えとパーティション分割を組み合わせると、さらにパフォーマンスが向上し、コストを節約できます。たとえば、数時間の時間枠で集計を行うことが多い場合を考えてみましょう。時間単位でパーティション分割することは可能ですが、ファイルが多すぎたり小さすぎたりするリスクがあります。代わりに、日単位でパーティション分割し、データをタイムスタンプで並べ替えることができます。このようにすると、粗粒度のパーティション分割を使用してクエリに含まれるファイルセットを一致するパーティションのみに減らし、並べ替えを使用して残りのファイル内のブロックをスキップできます。パーティションキーとタイムスタンプ列の両方でフィルタリングを行うことを忘れないでください。

次の表は、テキスト gzip、ソート無しの Parquet gzip、および l_partkey でソートされた Parquet gzip の同じデータセットに対する実行時間とスキャンされたデータを比較しています。

クエリ

SELECT l_orderkey
FROM lineitem
WHERE l_partkey = 17766770

テキスト形式と比較した節約分
テキスト gzip データ 実行時間 11.9 秒 .
スキャンしたデータ 23.7 GB .
コスト $0.1 .
ソート無しの Parquet gzip データ 実行時間 2.1 秒 ~ 82% 高速化
スキャンしたデータ 2.0 GB .
コスト $0.009 ~ 91% 安価
l_partkey でソート済みの Parquet gzip データ 実行時間 1.1 秒 ~ 90% 高速化
スキャンしたデータ 38.8 MB .
コスト $0.0001 ~ 99.9% 安価

最適化データセットの作成

このセクションでは、Athena Spark を使用してデータセットを変換し、前のセクションで説明した最適化を適用する方法を示します。このコードは、Amazon EMR ServerlessAWS Glue ETL など、ほとんどの他の Spark ランタイムでも使用できます。Athena SQL を使ってデータを変換し、この記事で説明されている多くの最適化を適用することもできますが、Athena Spark の方がプロセスに対する設定オプションとコントロールが多くなります。

次のコードは最初に tpc_h データベースをデフォルトに設定します。このデータベースの場所は、データが書き込まれる場所を決定するために使用されます。次に、以下の操作を実行して customer_optimized という新しいテーブルを作成します:

  • customer テーブルのすべての行を読み込みます
  • coalesce を使用して、バケットごとにパーティションごとに書き込まれるファイル数を 4 に減らします
  • sortWithinPartitionsc_name によってレコードを並べ替えます
  • partitionByc_mktsegmentc_nationkey によってパーティション分割し、bucketByc_custkey によって 32 のバケットに分割し、zstd で圧縮された Parquet ファイルに書き込みます

次のコードを参照してください:

spark.sql("use tpc_h")
 spark\ 
    .read.table("customer")\ 
    .coalesce(4)\ 
    .sortWithinPartitions("c_name")\ 
    .write\ 
    .partitionBy("c_mktsegment", "c_nationkey")\ 
    .bucketBy(32, "c_custkey")
    .saveAsTable("customer_optimized", format="parquet", compression="gzip")

この例では、すべての最適化を同時に示しています。使用ケースによっては、1 つまたはいくつかの最適化だけが必要な場合があります。それぞれの最適化がもっとも役立つ場合については、この記事の前のセクションを参照してください。

Amazon EMR、EMR Serverless、AWS Glue ETL、または Athena SQL を使用してデータを処理する方法の詳細については、以下のリソースを参照してください。

クエリチューニング

Athena SQL エンジンは、オープンソースの分散クエリエンジン TrinoPresto 上に構築されています。その動作を理解することで、クエリを実行する際の最適化の手がかりが得られます。このセクションでは、以下のベストプラクティスについて解説します。

  1. ORDER BYの最適化
  2. 結合の最適化
  3. GROUP BYの最適化
  4. 近似関数の利用
  5. 必要なカラムのみを含める

6. ORDER BY の最適化

ORDER BY 句は、クエリの結果を並べ替えた順序で返します。Athena は分散ソートを使用して、複数のノードでソート操作を並列に実行します。上位または下位 N 件の値を見るために ORDER BY 句を使用する場合は、LIMIT 句を使用してソートのコストを削減し、クエリの実行時間を短縮できます。

例えば、次の表は、サイズが 7.25 GB の約 6,000 万行のテキスト形式非圧縮テーブルを使用したデータセットに対するクエリ実行時間をまとめたものです。

クエリ 実行時間
SELECT *
 FROM lineitem 
 ORDER BY l_shipdate
274 秒
SELECT *
 FROM lineitem 
 ORDER BY l_shipdate 
 LIMIT 10000
4.6 秒
高速化 98% 高速

7. 結合の最適化

適切な結合順序を選ぶことは、クエリのパフォーマンスを向上させるために重要です。2 つのテーブルを結合する際は、大きい方のテーブルを結合の左側に、小さい方のテーブルを右側に指定してください。等値条件を使う一般的な結合の場合、Athena は右側のテーブルから検索テーブルを構築し、ワーカーノードに配布します。次に左側のテーブルをストリーミングし、検索テーブル内の一致する値を探して行を結合します。これは分散ハッシュ結合と呼ばれます。右側のテーブルから構築された検索テーブルはメモリ内に保持されるため、そのテーブルが小さいほど、使用するメモリが少なく、結合の実行速度が速くなります。

ハッシュテーブルがワーカーノード間に分散されているため、データスキュー(テーブル内のデータがクラスター内で偏って保持される状態)がパフォーマンスに影響を与える可能性があります。結合条件で使用されるカラムの値に偏りがある場合、1 つのノードが結合の大部分を処理しなければならず、他のノードは遊休状態になってしまいます。最高のパフォーマンスを得るには、結合条件のカラムに値が均一に分布するようにしてください。

次の表は、テキスト形式で圧縮されていない合計 74 GB のデータセットに対する実行時間を示しています。lineitem テーブルには約 6 億行、part テーブルには約 2,000 万行があります。

クエリ 実行時間
SELECT COUNT(*)
 FROM lineitem, part
 WHERE l_partkey = p_partkey
6.4 秒
SELECT COUNT(*)
 FROM part, lineitem
 WHERE p_partkey = l_partkey
8.1 秒
高速化 約 20% 高速

Athena コンソールの実行詳細ビジュアライザーを使用すると、結合の実行順序を確認できます。ビジュアライザーには、各テーブルから結合された行数も表示されます。ビジュアライザーの使用方法の詳細については、完了したクエリの統計と実行詳細の表示を参照してください。

3 つ以上のテーブルを結合する場合は、中間結果を減らすために、最初に大きなテーブルと最も小さなテーブルを結合し、次に他のテーブルと結合することを検討してください。

コストベースの結合最適化

テーブルに AWS Glue Data Catalog でテーブル統計情報がある場合、Athena はこれらを使用して、コストベースの最適化 (ここでの「コスト」は計算コストを指します) によりジョイン順序の変更と集約プッシュダウンを実行します。テーブルに統計情報がある場合、クエリ最適化エンジンはテーブルの順序のどれが最も効率的かを把握でき、自動的に最適化を行えます。つまり、大きなテーブルを手動でジョインの左側に配置する必要がなくなります。

コストベースの最適化機能の使用方法の詳細については、Amazon Athena でコストベースの最適化機能を使ってクエリを高速化するおよびコストベースの最適化機能の使用を参照してください。

パーティションテーブルの結合

パーティションテーブルを問い合わせる際は、すべてのテーブルのすべてのパーティションキーでフィルタを含めることが最適です。これにより、クエリプランナーはファイルのリストと読み取りをできるだけ省略できるようになります。

次の例では、orders テーブルと lineitem テーブルが注文日 (orderso_orderdatelineiteml_orderdate) でパーティションされています。最初のクエリでは l_orderdate に条件がないため、エンジンは lineitem テーブルのすべてのパーティションをスキャンする必要があります。注文日を結合条件に追加すると、クエリプランナーは 2 つのテーブルに対して1つのパーティションのみを読み込めば良いことがわかり、スキャンされるデータ量が大幅に減少します。

クエリ スキャンされたデータ 実行時間
SELECT AVG(l_extendedprice)
 FROM lineitem 
 JOIN orders ON (l_orderkey = o_orderkey)
 AND o_orderdate = '1993-07-08'
68.1 GB 106 秒
SELECT AVG(l_extendedprice)
 FROM lineitem 
 JOIN orders ON (l_orderkey = o_orderkey 
 AND l_orderdate = o_orderdate)
 AND o_orderdate = '1993-07-08'
35.4 MB 2 秒

前述のように、EXPLAIN を使用してクエリによってどのパーティションが読み取られるかを確認できます。これは、複数のパーティション化されたテーブルを結合する際に特に重要です。

クエリに関わるテーブルの 1 つ以上のパーティションが、クエリの実行時に発見される情報に依存することがあります。最悪の場合、クエリプランナーがクエリを分析してパーティションを決定できないため、すべてのパーティションを読み取る必要があります。しかし、このような場合でも、Athena は 動的パーティションプルーニングと呼ばれるメカニズムを使用して、クエリの実行中にパーティションの読み取りをスキップできることがよくあります。たとえば、エンジンが結合条件にパーティションキーが関与していることを認識し、右側の値の数が少ない場合、ワーカーノード間でこの情報をブロードキャストできます。その後、ワーカーノードはこの情報を使用して、結合条件で後から除外されるパーティションのファイル読み取りをスキップできます。

次の例では、orders テーブルと lineitem テーブルが注文日 (orderso_orderdatelineiteml_orderdate) でパーティション分割されています。lineitem テーブルは合計で約 75 GB の CSV で、orders は約 16 GB です。このクエリは、パーティションの約 10% に出現する特定の条件セットで注文された品目の平均価格を計算します。これらのパーティションは事前に分からないため、最悪の場合は 90 GB のデータをスキャンする必要がありますが、実際にはわずか 26.5 GB しかスキャンしません。

SELECT AVG(l_extendedprice)
 FROM lineitem 
 JOIN orders ON (l_orderkey = o_orderkey AND l_orderdate = o_orderdate)
 WHERE o_clerk = 'Clerk#000094772'
 AND o_orderpriority = '1-URGENT'
 AND o_orderstatus = 'F'

クエリが実行されると、Athena は条件を満たす行の o_orderdate の値を収集します。これらの値をクラスター全体にブロードキャストし、ノードが lineitem テーブルの一致しないパーティションの読み込みをスキップできるようにします。

EXPLAIN コマンドを使用すると、Athena が動的パーティションプルーニングを実行するかどうかを確認できます。出力で dynamicFilterAssignment を探してください。この例のクエリでは、EXPLAIN プランは次のようになります。

…
 Fragment 1 [HASH] 
    Output layout: [avg_4] 
    Output partitioning: SINGLE [] 
    Aggregate[type = PARTIAL] 
    │   Layout: [avg_4:row(double, bigint)] 
    │   Estimates: {rows: 1 (55B), cpu: ?, memory: 55B, network: 0B}
    │   avg_4 := avg("l_extendedprice")
    └─ InnerJoin[criteria = ("l_orderkey" = "o_orderkey") AND ("l_orderdate" = "o_orderdate"), hash = [$hashvalue, $hashvalue_6], distribution = PARTITIONED] 
       │   Layout: [l_extendedprice:double] 
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   Distribution: PARTITIONED 
       │   dynamicFilterAssignments = {o_orderkey -> #df_562, o_orderdate -> #df_563}
       ├─ RemoteSource[sourceFragmentIds = [2]] 
       │      Layout: [l_orderkey:integer, l_extendedprice:double, l_orderdate:varchar, $hashvalue:bigint] 
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_6], arguments = ["o_orderkey", "o_orderdate"]] 
          │   Layout: [o_orderkey:integer, o_orderdate:varchar, $hashvalue_6:bigint] 
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [3]] 
                 Layout: [o_orderkey:integer, o_orderdate:varchar, $hashvalue_7:bigint] 
       …

動的パーティションプルーニングの詳細については、Trino ドキュメントの 動的フィルタリング をご覧ください。

クロスジョインに注意

結合条件は結合のパフォーマンスにも大きな影響を与えます。結合条件が複雑な場合、例えば LIKE> を使用する場合、計算コストがはるかに高くなります。最悪の場合、一方の結合側のすべてのレコードを、もう一方の結合側のすべてのレコードと比較する必要があります。これは クロス結合 と呼ばれます。可能な限り、等値条件を使用してください。

EXPLAIN コマンドを使用すると、Athena がどのようなジョインを実行するかを確認できます。たとえば、ON t1.name LIKE (t2.prefix || '%') のようなジョイン条件でクエリに対して EXPLAIN を実行すると、次のように出力されます。

Fragment 1 [HASH] 
…
└─ CrossJoin[]

クエリプランにクロスジョインが表示される場合は、代わりに等価条件を使用するようにクエリを書き換えることを検討してください。テーブルの 1 つが非常に小さい場合を除き、クロスジョインを含むクエリはクエリタイムアウト制限を超えるリスクが高くなります。

8. GROUP BY の最適化

集約を実行する際は、GROUP BY 句に含めるカラムを可能な限り少なくして、必要な CPU とメモリの量を減らすべきです。さらに、値の分布が可能な限り均一なカラムでグループ化することを確認してください。

集約クエリのパフォーマンス問題の一因はデータスキューです。これは、多くの行が GROUP BY 句のカラムに同じ値を持っている場合に発生する可能性があります。集約中、行は GROUP BY 句のカラムのハッシュに基づいてワーカーノードに分散されます。データにスキューがある場合、1 つのノードが集約の大部分を処理しなければならず、他のノードはアイドル状態になる可能性があります。

冗長なカラムは、SQL 言語が式を GROUP BY 句に含めるか集約関数を使用することを要求するため、しばしば GROUP BY 句に追加されます。たとえば、customer_idcustomer_name カラムがあるテーブルの場合、1 つの customer_id に対して 1 つの名前しかないにもかかわらず、顧客ごとに集計する場合は GROUP BY c_custkey, c_name と書く必要があります。GROUP BY 句に多くの冗長なカラムがある場合にクエリを高速化する方法の 1 つが ARBITRARY 関数です。この関数は、名前が示すように、そのグループから任意の値を返す集約関数です。

この例では、顧客ごとの注文数を知りたいと考えています。顧客テーブルと orders テーブルを結合すると、注文ごとに 1 行になるので、GROUP BY c_custkey を使って顧客ごとに集約します。結果に顧客名を含めたいので、GROUP BY 句に c_name 列を追加する代わりに、ARBITRARY(c_name) を使用します。

SELECT c_custkey,
	 ARBITRARY(c_name) AS c_name,
	 COUNT(*) AS order_count 
 FROM customer 
 JOIN orders ON (customer.c_custkey = orders.o_custkey)
 GROUP BY c_custkey

可能な限り、GROUP BY 句から不要なカラムを削除する必要があります。前の例のように 1 つのカラムしかない場合、パフォーマンスの向上は目立ちません。しかし、GROUP BY 句に多数の列がある大規模なデータセットに対するクエリのパフォーマンスにとっては非常に重要です。

9. 近似関数の利用

大規模なデータセットを探索する際の一般的なユースケースは、COUNT(DISTINCT column) を使用して特定の列の重複のない値の数をカウントすることです。例としては、Web ページにアクセスしたユニークユーザーの数を調べることが挙げられます。

正確な数値が必要ない場合 (例えば、どのウェブページを詳しく調べるかを探している場合)、approx_distinct(column) の使用を検討してください。この関数は、完全な文字列ではなく、値のユニークなハッシュをカウントすることで、メモリ使用量を最小限に抑えようとします。注意点は、標準誤差が 2.3% あることです。

次の表は、テキスト形式で圧縮されていない約 6 億行の 74 GB のテーブルを使用したときの高速化の概要をまとめたものです。

クエリ 実行時間
SELECT COUNT(DISTINCT l_comment)
 FROM lineitem
7.7 秒
SELECT approx_distinct(l_comment)
 FROM lineitem
4.6 秒
高速化 約 40% 高速

詳細については、Trino ドキュメントの概算集約関数を参照してください。

10. 必要なカラムのみを含める

クエリを実行する際は、SELECT ステートメントで必要なカラムのみを選択し、すべてのカラムを選択しないようにしてください。カラム数を削減することで、クエリ全体のパイプラインを通して処理する必要があるデータ量が減り、最終的な結果に書き込まれるデータ量も減ります。これは特に、多数の文字列ベースのカラムを持つテーブルを照会したり、複数の結合や集約を実行する場合に特に効果的です。参照データが列指向のファイル形式の場合は、特定のカラムのデータのみが Amazon S3 から読み取られるため、スキャンされるデータ量が減ります。

次の表は、約 6000 万行のテキスト形式の非圧縮 7.25 GB のテーブルを使用したときの高速化の概要をまとめたものです。

クエリ 実行時間
SELECT *
 FROM lineitem, orders, customer 
 WHERE l_orderkey = o_orderkey 
 AND c_custkey = o_custkey
19.7 秒
SELECT c_name, l_quantity, o_totalprice 
 FROM lineitem, orders, customer 
 WHERE l_orderkey = o_orderkey 
 AND c_custkey = o_custkey
5.2 秒
高速化 73% 高速

ボーナスのヒント

このセクションでは、追加のパフォーマンスチューニングのヒントと、この記事の最初のバージョン以降にリリースされた新しいパフォーマンス指向の機能について説明します。

パーティションプロジェクションを使用したパーティション処理の最適化

パーティション数が非常に多く、AWS Glue のパーティションインデックスを使用していない場合、パーティション情報の処理が Athena クエリのボトルネックになる可能性があります。Athena のパーティションプロジェクションを使用すると、パーティション数が非常に多いテーブルのクエリ処理を高速化し、パーティション管理を自動化できます。パーティションプロジェクションは、パーティション情報をメタストアから取得するのではなく、パーティション情報を計算してクエリするため、このオーバーヘッドを最小限に抑えることができます。AWS Glue テーブルにパーティションのメタデータを追加する必要がなくなります(この機能は Athena でのみ利用することができる機能である点にご注意ください)。

パーティションプロジェクションでは、AWS Glue Data Catalog のようなリポジトリから読み取るのではなく、構成から計算されたパーティション値とロケーションが使用されます。メモリ内の操作はリモート操作よりも通常高速であるため、パーティションプロジェクションはパーティション数が非常に多いテーブルに対するクエリの実行時間を短縮できます。クエリと基盤となるデータの特性によっては、パーティションメタデータの取得で遅延が生じるクエリの実行時間を大幅に短縮できる可能性があります。

パーティションプロジェクションを使用するのは、パーティションのスキーマが同じである場合や、テーブルのスキーマがパーティションのスキーマを正確に記述している場合に理想的です。ID のような非常に値の種類が多い列や、非常に細かい粒度の日付範囲でパーティション分割するのに使用できます。

詳細については、Amazon Athena でのパーティションプロジェクションをご覧ください。

大規模な結果セットを生成するクエリの高速化 (UNLOAD の利用)

Athena で SELECT クエリを実行すると、非圧縮 CSV 形式の単一の結果ファイルが Amazon S3 に生成されます。クエリの出力結果が大きくなると予想される場合、単一のファイルへの書き込みに多くの時間がかかります。UNLOAD を使用すると、結果を Amazon S3 の複数のファイルに分割できるため、書き込み段階で費やされる時間が短縮されます。また、結果セットの形式 (ORC、Parquet、Avro、JSON、TEXTFILE) と圧縮タイプ (Parquet、JSON、TEXTFILE の場合はデフォルトで gzip、ORC の場合は zlib) を指定できます。

次の表は、SELECTUNLOADステートメントの比較を示しています。このクエリでは、約 13 GB の非圧縮データが出力されることが予想されます。

クエリ SELECT * FROM lineitem LIMIT 85700000 UNLOAD (SELECT * FROM lineitem LIMIT 85700000) to with (format=’TEXTFILE’) 節約
実行時間 362 秒 33.2 秒 ~ 90% 高速化
結果セット 13 GB (CSV、非圧縮) 3.2 GB (CSV、gzip 圧縮) ~ 75% ストレージ削減

データが変更されていない場合のクエリ結果の再利用

データレイクのデータセットは、多くの場合、1 日に 1 回、または 1 日に数回しか更新されませんが、より頻繁にクエリが実行されることもよくあります。ダッシュボードを更新するためのクエリや、アプリケーションのビューにアクセスするたびに実行されるクエリがある可能性があります。前回実行した時から、データが変更されていない場合は、結果を再計算する必要はありません。実際、再計算するとより時間がかかり、コストも高くなります。このような状況では、クエリ結果の再利用を活用できます。これは、同じクエリが例えば過去 15 分以内に実行された場合、Athena がその実行結果を返すように指示する機能です。そのような結果がある場合、Athena はすぐにその結果を返し、データのスキャンは行われません。

クエリ結果の再利用の詳細については、Amazon Athena のクエリ結果再利用によるコスト削減とクエリパフォーマンス向上およびクエリ結果の再利用を参照してください。

結論

この投稿では、Athena SQL での対話型分析を最適化するためのトップ 10 のヒントを紹介しました。これらの実践方法は、Amazon EMR 上の Trino を使用する際にも適用できます。

この記事のトルコ語翻訳版もご覧いただけます。


著者について


Mert Hocanin は、AWS Lake Formation の Principal Big Data Architect です。


Pathik Shah は、Amazon Athena の Sr. ビッグデータアーキテクトです。2015 年に AWS に入社し、以来ビッグデータ分析の分野に注力し、AWS の分析サービスを使用してスケーラブルで堅牢なソリューションを構築するお客様をサポートしています。


Theo Tolv はスウェーデン、ストックホルムに拠点を置くシニアアナリティクスアーキテクトです。彼はキャリアの大半を小さなデータと大きなデータで仕事をしてきました。そして 2008 年から AWS 上で動作するアプリケーションを構築しています。余暇時間には、電子工作をいじったり、宇宙オペラを読むのが好きです。


監査履歴

2024 年 2 月に Theo Tolv 氏 (シニアアナリティクスアーキテクト) による最終確認と更新が行われました。翻訳はアナリティクススペシャリストソリューションアーキテクトの川村が担当しました。原文はこちらです。