Amazon Web Services ブログ
Goodreads はどのように Amazon DynamoDB テーブルを Amazon S3 にオフロードし、Amazon Athena を使用してクエリを実行するのか
Goodreads では現在、モノリシックな Rails アプリケーションをマイクロサービスに分解している途中です。これらのサービスの大半は、Amazon DynamoDB をプライマリデータストアとして使用することに決めました。DynamoDB はストレージとスループットのさまざまなニーズに対応して、一貫した、数ミリ秒のパフォーマンスを提供できるため気に入っています。
しかし DynamoDB は高スループットの読み書きワークロードで優れていますが、1 回限りの、アドホックなクエリやデータウェアハウスワークロードをサポートするようには最適化されていません。しかし、AWS Data Pipeline、Amazon S3、AWS Glue、Amazon Athena を組み合わせることで、DynamoDB から S3 にデータセットをエクスポートし、Athena を使用してデータセットで SQL クエリを実行できます。
アーキテクチャの概要
このプロセスの当社のアーキテクチャは次のとおりです。
- AWS Data Pipeline は DynamoDB テーブルからデータをスキャンし、JSON として S3 に書き込みます。終了したら、Amazon SNS トピックで配信します。
- AWS Lambda で呼び出された AWS Glue ジョブは、JSON データを Parquet に変換します。
- AWS Glue クローラーは S3 の Parquet データを AWS Glue データカタログに追加し、クエリ用に Athena で利用可能にします。
JSON 形式のデータをクエリして、Parquet 変換の余計なステップを排除できました。しかし、Parquet はスペース効率が高く、高性能であるため、追加の努力を行うことにしました。大規模なデータセットでは、このアプローチはより高速なクエリに変換するだけでなく、コストも節約します。
主要なアーキテクチャコンポーネント
アーキテクチャの主なコンポーネントは次のとおりです。
Data Pipeline は Apache Pig、Apache Hive、または Apache Sparkis などのビッグデータテクノロジーを使用して Amazon EMR クラスターをスピンアップし、フォールトトレラントジョブを実行するためのオーケストレーションサービスです。Data Pipeline は任意の DynamoDB テーブルから Amazon S3 へデータをエクスポートするためのテンプレートを提供します。標準のエクスポートテンプレートを少し変更したものを使用します。
このバージョンでは、Amazon SNS で成功または失敗のメッセージを送信する機能を追加しました。これにより、Lambda を使用して Data Pipeline サービス以外の処理を開始することができます。
AWS Glue はこのアーキテクチャでは 3 つの異なる方法で使用されます。
- サーバーレス Apache Spark 環境では、JSON エクスポートを Data Pipeline から Apache Parquet 形式に変換するジョブが実行されます。
- AWS Glue クローラーはデータセットのスキーマを自動的にクロールして推論し、AWS Glue データカタログに追加します。
- AWS Glue データカタログは、データセットのメタデータストアであるため、Athena でデータをクエリできます。
Athena はデータが AWS Glue データカタログに入った後に使用されます。この時点で、ANSI SQL を使用して Athena でクエリできます。
インフラストラクチャの設定
このプロセスでは、AWS CloudFormation を使用して AWS リソースを管理しています。さまざまな AWS リソースを 3 つのスタックに分割して、それらをより構成しやすくしました。
reviews.yaml テンプレートはレビューと呼ばれる DynamoDB テーブルの例を定義します。common.yaml テンプレートには、スタック全体で共有する IAM と S3 リソースが含まれています。dynamodb-exports.yaml テンプレートは、Data Pipeline、Lambda 関数、AWS Glue ジョブ、AWS Glue クローラーを定義します。
レビュースタックの操作
reviews.yaml CloudFormation テンプレートには、書籍のユーザーレビューを格納するための単純な DynamoDB テーブル定義が含まれています。特定のユーザーが書籍の各レビューをネストするハッシュキーとソートキー構造を使用しています。この構造により、アプリケーションは、単純な get 操作でユーザーが書籍のレビューがあるかを確認したり、ユーザーがすべてのレビューを一覧表示することができます。
DynamoDB テーブルスキーマの操作
reviews.yaml で定義されたテーブルは、ハッシュキーとソートキーテーブルです。User 属性はハッシュキーで、Book 属性はソートキーです。このテーブルにアプリケーションを構築する場合は、他のアクセスパターン (例えば書籍のレビューを最高に重み付けするなど) に対応するために、グローバルセカンダリインデックス (GSIs) を追加することがあります。
まず、CloudFormation スタックを作成します。
- この [Launch Stack] ボタンをクリックします。
- 画面の下部にある [Next] ボタンを選択します。
- Options 画面で、すべてをデフォルトのまま、画面の下部の [Next] を選択します。
- [Review] で [Create] を選択します。
次に、Reviews テーブルでテスト項目を作成します。ReviewsStack ステータスが CREATE_COMPLETE になった後で、DynamoDB コンソールを開いて、テーブルを参照します。テーブルにいくつかの項目を追加しましょう。
- AWS マネジメントコンソールで DynamoDB を開きます。
- 左側ナビゲーションペインから [Tables] を選択します。
- Reviews テーブルを選択し、[Items] を選択します。
- [Create item] を選択し、[Create item] ボックスで、[Tree] に [Text] を選択します。
- 既存のテキストを削除し、次の項目をコピーしてテキストボックスに貼り付けます。
- [Save] を選択します。
さらに、もう 1 項目追加します。
テーブルスキーマで指定されていない、いくつかの異なるフィールドを追加したことがわかります。特に評価、レビュー、著者です。DynamoDB は NoSQL データベースなので、アプリケーションの進化に合わせて新しい属性を追加できます。しかし、これらの属性を効率的に集計するには、テーブル作成時にプライマリキースキーマであるか、インデックスで定義済みであることが必要です。
Goodreads のレビューテーブルは、当社のサンプルテーブルと似ていないわけではありません。しかし、ユーザーが最も必要とするアクセスパターンをサポートするために、最大 5 つの グローバルセカンダリインデックス (GSIs) を使用しています。当社のデータについての任意の質問に答えるために、短時間しか存在しない GSI を作成することは、もはや選択肢にはなりません。それが可能であったとしても、GSI を作成するためのデータが大きすぎるため、数日かかります。
当社の製品チームが任意の著者のレビューデータに対してクエリを実行することを想定してください。さらに GSI を追加することはできず、アクセスパターンは本番環境では必要ありません。しかし、このブログ記事に記載されているアーキテクチャを使用することで、製品チームのデータセットのロックを解除することができます。
Athena で実行できるさらに興味深い SQL クエリをエクスポートするとき、さらに多くのデータをテーブルに格納するため、テーブルにさらに項目を自由に追加できます。
共通スタックの作成
common.yaml CloudFormation テンプレートは、Data Pipeline、Lambda、AWS Glue が使用するさまざまな IAM と EC2 パーミッションを作成します。さらに、テンプレートは DynamoDB エクスポートを格納するための S3 バケットを作成します。スタック全体で参照する必要があるリソースは、出力セクションで宣言されています。
CloudFormation スタックを次のように作成します。
- この [Launch Stack] ボタンをクリックします。
- 画面の下部にある [Next] ボタンを選択します。
- Options 画面で、すべてをデフォルトのまま、画面の下部の [Next] を選択します。
- Review の Capabilities セクションで、I acknowledge that AWS CloudFormation might create IAM resources with custom names を選択します。
- [Create] を選択します。
DynamoDB エクスポートスタックの作成
dynamodb-exports.yaml テンプレートは、任意の DynamoDB テーブル用の Data Pipeline、SNS トピック、Lambda トリガー、AWS Glue ジョブ、AWS Glue クローラーを作成するために自己完結型テンプレートです。エクスポートしたい複数の DynamoDB テーブルがあれば、dynamodb-exports.yaml テンプレートを再利用して、テーブルごとにスタックを作成できます。
このスタックの最も興味深い部分は、Data Pipeline タスクで作成された任意の DynamoDB エクスポートファイルを Parquet に変換するAWS Glue ジョブ スクリプトです。また Boto3 を使用して、PySpark AWS Glue 環境で使用可能な、raw JSON から DynamoDB タイプ情報を削除します。このコードは十分に文書化されており、カスタム AWS Glue ジョブの書き方に興味がある場合は、ためらわず始めましょう。
CloudFormation スタックを次のように作成します。
ExportTimeout: 1
MaxConsumedReadThroughput: 0.5
TableName: Reviews
- Options 画面で、すべてをデフォルトのまま、[Next] を選択します。
- Review セクションで、下にスクロールして [Create] を選択します。
DynamoDB から AWS Glue データカタログへのデータフローの監視
DynamoDB から Apache Hive カタログへのパイプラインは完全に自動化されています。レビューをエクスポートする CloudFormation スタックがデプロイ後、データパイプラインが開始されます。すぐに Athena のデータをクエリすることができます。
データパイプラインを監視します。
- コンソールで AWS Data Pipeline を開きます。
- ReviewsExport という名前のパイプラインを選択します。
- クラスターのプロビジョニングからジョブの実行まで、さまざまな段階でパイプラインを監視します。
- ステータスが Finished の場合、データは S3 にあります。
パイプラインはエクスポート成功 SNS トピックにメッセージを送信します。こうすることで、Lambda は AWS Glue ジョブを呼び出して、JSON エクスポートを Parquet に変換します。
AWS Glue ジョブを監視します。
- コンソールで AWS Glue を開きます。
- 左側ナビゲーションペインの ETL ヘッダーにある [Jobs] を選択します。
- ジョブの履歴とその他詳細を表示するには、ReviewsExportToParquet の横のチェックボックスを選択します。この時点で Run Status は Running です。
- AWS Glue ジョブは、Run ステータスが Succeeded に達すると終了します。
次に、AWS Glue クローラーを実行します。
- AWS Glue コンソールページから、左側のナビゲーションペインで [Crawlers] を選択します。
- ReviewsParquetCrawler の横にあるチェックボックスを選択します。
- ページの上部にある [Run crawler] を選択します。
クローラーを初回実行時、AWS Glue データカタログの dynamodb-exports データベースに reviews テーブルが追加されます。クローラーを実行後、さらにエクスポートスナップショットが蓄積されると、それ以降のクローラーの実行によって、新しいパーティションがテーブルに追加され、
AWS Glue データカタログのレビューテーブルの検査
次に、レビューテーブルを見てみましょう。
- AWS Glue コンソールページから、[Tables] を選択します。
- [reviews] を選択します。
AWS Glue データカタログは、Apache Hive 互換のメタストアであり、データセットのスキーマを格納します。S3 には、特にオブジェクト数やデータセットの場所などのプロパティが格納されます。
スキーマを見てみると、元々 DynamoDB の項目に追加した属性の一部ではない、ddb_export_timestamp 列があることに気付きます。キー列には、ddb_export_timestamp が Partition (0) としてマークされています。 パーティション列は通常の列と似ていますが、Athena の WHERE 句で使用されるときは、スキャンされるデータの量を制限できます。Athena パーティションドキュメント は詳しく知りたい場合に役に立ちます。
Parquet 変換スクリプトを呼び出す Lambda 関数は、この余分なメタデータを提供します。そのため、AWS Glue クローラーがスキーマを推論するとき、パーティションは名前が存在しない場合に与えられるデフォルトの partition_N 名とは対象的に、便利な名前が与えられます。
Athena を使用してデータセットをクエリする
Athena を使用してデータセットをクエリするには、次の手順を実行します。
- コンソールの Athena を開きます。
- まだの場合には、Hive Catalog を使用して Athena をアップグレードしてください。
- 左側ナビゲーションペインの Database で、[dynamodb-exports] を選択します。
- Tables で [reviews] が表示されます。
- reviews の右側の省略記号を選択し、[Preview table] を選択します。
あなたはたった今 DynamoDB データセットに対して SQL クエリを実行しました! 集計を実行して、J.K. Rowling のレビュー数を数えましょう。覚えているかもしれませんが、このアクセスパターンは、DynamoDB テーブル設計では十分にサポートされていません。
さらに項目を追加した場合、結果が異なる場合がありますが、ここでは表に示した結果が表示されます。
Athena を使用すると、データサイズや複雑さが増しても、ANSI SQL を使用して DynamoDB のデータから正しい情報を引き出すことができます。
次のステップ
この作業を拡張するには、いくつかの方法があります。
- 毎晩、あなたのローカルタイムの真夜中午前 0 時に、DynamoDB のエクスポートを実行するようにデータパイプラインを変更します。
- ローカルタイムの午前 4 時に毎日 AWS Glue クローラーを実行すると、常に最新のスナップショットが DynamoDB に保存されます。
- エクスポート成功トピックを使用して、より複雑なパイプラインまたは集約をトリガーします。
- この手法と S3 のデータレイクで構築を組み合わせます。
結論
この投稿記事では、DynamoDB テーブルからデータをエクスポートし、AWS Glue でより効率的なフォーマットに変換し、Athena でデータをクエリする方法を示します。この手法により、DynamoDB に格納されているデータから正しい情報を引き出すことができます。
その他の参考資料
この記事が役立つと思いましたら、以下の記事も参照ください。
- リアルタイム予測のために Amazon SageMaker を使用して、Amazon DynamoDB のデータを分析する
- AWS Step Functions と AWS Lambda を使って複数の ETL ジョブの統合を行う
- Amazon Kinesis Data Firehose、Amazon Athena、Amazon Redshift を使用して Apache Parquet 最適化データを分析する
- リアルタイム予測のために Amazon SageMaker を使用して、Amazon DynamoDB のデータを分析する
著者について
Joe Feeney は Amazon Author チームのソフトウェアエンジニアで、Amazon のすべてのデータを活用して、作家にユニークで実用的な正しい情報を提供します。彼は妻や子どもたちとマリオカートを楽しみ、ギターを作ったり、改造することが趣味です。