Amazon Web Services ブログ

Amazon EMR Serverless の可観測性、パート 1: Amazon CloudWatch を使用した Amazon EMR Serverless ワーカーのニアリアルタイムでのモニタリング

Amazon EMR Serverless を使用すると、クラスターやサーバーを管理することなく、Apache Spark や Apache Hive などのオープンソースのビッグデータフレームワークを実行できます。EMR Serverless では、変化するデータ量と処理要件に合わせて数秒でリソースのサイズを変更する自動スケーリング機能により、あらゆる規模の分析ワークロードを実行できます。

EMR Serverless 向けに、Amazon CloudWatch でジョブワーカーメトリクスを導入しました。この機能により、Spark および Hive ジョブの集計ワーカーレベルで、vCPU、メモリ、一時ストレージ、ディスク I/O の割り当てと使用状況のメトリクスをモニタリングできます。

このブログは、EMR Serverless の可観測性に関するシリーズの一部です。このブログでは、CloudWatch メトリクスを使用して EMR Serverless ワーカーをニアリアルタイムでモニタリングする方法について説明します。

EMR Serverless の CloudWatch メトリクス

Spark ジョブレベルでは、EMR Serverless はドライバーとエグゼキューターの両方について、以下の新しいメトリクスを CloudWatch に出力します。これらのメトリクスは、ジョブのパフォーマンス、ボトルネック、リソース使用率に関する詳細なインサイトを提供します。

WorkerCpuAllocated ジョブ実行で割り当てられた vCPU コアの総数
WorkerCpuUsed ジョブ実行でワーカーが使用した vCPU コアの総数
WorkerMemoryAllocated ジョブ実行でワーカーに割り当てられた総メモリ (GB)
WorkerMemoryUsed ジョブ実行でワーカーが使用した総メモリ (GB)
WorkerEphemeralStorageAllocated ジョブ実行でワーカーに割り当てられた一時ストレージのバイト数
WorkerEphemeralStorageUsed ジョブ実行でワーカーが使用した一時ストレージのバイト数
WorkerStorageReadBytes ジョブ実行でワーカーがストレージから読み取ったバイト数
WorkerStorageWriteBytes ジョブ実行でワーカーがストレージに書き込んだバイト数

CloudWatch を使用して EMR Serverless ジョブをモニタリングすることには、以下のメリットがあります:

  • リソース使用率の最適化 – リソース使用パターンに関するインサイトを得て、効率性とコスト削減のために EMR Serverless の設定を最適化できます。例えば、vCPU やメモリの利用率が低い場合は、リソースの無駄を示しており、ワーカーサイズを最適化してコスト削減を実現できます。
  • 一般的なエラーの診断 – ログの調査をせずにエラーの根本原因と対策を特定できます。例えば、一時ストレージの使用状況を監視し、ワーカーごとのストレージを事前に増やすことでディスクのボトルネックを軽減できます。
  • ニアリアルタイムでのインサイト – CloudWatch はほぼリアルタイムのモニタリング機能を提供するため、EMR Serverless ジョブの実行中にパフォーマンスを追跡でき、異常やパフォーマンスの問題をすばやく検出できます。
  • アラートと通知の設定 – CloudWatch では、定義済みのしきい値に基づいて Amazon Simple Notification Service (Amazon SNS) を使用したアラームを設定でき、特定のメトリクスが重要なレベルに達した場合に電子メールやテキストメッセージで通知を受け取ることができます。
  • 履歴分析の実施 – CloudWatch は履歴データを保存するため、時間の経過に伴う傾向を分析し、パターンを特定し、キャパシティプランニングとワークロードの最適化に関する十分な情報に基づいた意思決定を行うことができます。

ソリューションの概要

この可観測性エクスペリエンスをさらに向上させるため、EMR Serverless アプリケーションのすべてのメトリクスを単一の CloudWatch ダッシュボードに集約するソリューションを作成しました。EMR Serverless アプリケーションごとに 1 つの AWS CloudFormation テンプレートを起動する必要があります。同じ CloudWatch ダッシュボードを使用して、単一の EMR Serverless アプリケーションに送信されたすべてのジョブを監視できます。このダッシュボードの詳細とこのソリューションを自身のアカウントにデプロイする方法については、EMR Serverless CloudWatch Dashboard GitHub リポジトリを参照してください。

以下のセクションでは、このダッシュボードを使用して次のアクションを実行する方法について説明します:

  • ジョブのパフォーマンスに影響を与えることなくコストを削減するためのリソース使用率の最適化
  • ログの確認なしで一般的なエラーによる失敗の診断と最適な解決

前提条件

この記事で提供されているサンプルジョブを実行するには、AWS マネジメントコンソールまたは AWS Command Line Interface (AWS CLI) を使用してデフォルト設定で EMR Serverless アプリケーションを作成し、その後、EMR Serverless アプリケーション ID をテンプレートへの入力として提供して GitHub リポジトリから CloudFormation テンプレートを起動する必要があります。

この記事のすべてのジョブは、同じ EMR Serverless アプリケーションに送信する必要があります。別のアプリケーションを監視する場合は、このテンプレートを独自の EMR Serverless アプリケーション ID 用にデプロイできます。

リソース使用率の最適化

Spark ジョブを実行する場合、多くの場合デフォルトの設定から始めます。実際のリソース使用率に関する可視性がない状態では、ワークロードの最適化は困難な場合があります。お客様が最も頻繁に調整する設定の一部にはspark.driver.cores、spark.driver.memoryspark.executor.coresspark.executors.memory があります。

新しく追加された CloudWatch ダッシュボードのワーカーレベルメトリクスが、より良い価格性能とリソース使用率向上のためにジョブ設定を微調整するのにどのように役立つかを説明するために、NOAA Integrated Surface Database (ISD) データセットを使用して変換と集計を実行する次の Spark ジョブを実行してみましょう。

このジョブを EMR Serverless で実行するには、以下のコマンドを使用します。CloudFormation テンプレートを起動した EMR Serverless アプリケーション ID と Amazon Simple Storage Service (Amazon S3) バケットを指定してください。この記事のすべてのサンプルジョブを送信する際は、必ず同じアプリケーション ID を使用してください。また、AWS Identity and Access Management (IAM) ランタイムロールも指定してください。

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-1 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/windycity.py",
"entryPointArguments": ["s3://noaa-global-hourly-pds/2024/", "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-1/"]
} }''
Bash

それでは CloudWatch ダッシュボードからエグゼキューターの vCPU とメモリを確認してみましょう。

このジョブはデフォルトの EMR Serverless Spark 設定で送信されました。前述のスクリーンショットの Executor CPU Allocated メトリクスから、合計 396 vCPU (99 エグゼキューター × エグゼキューターあたり 4 vCPU) が割り当てられたことがわかります。しかし、Executor CPU Used に基づくと、ジョブは最大で 110 vCPU しか使用していませんでした。これは vCPU リソースの過剰割り当てを示しています。同様に、Executor Memory Allocated に基づくと、合計 1,584 GB のメモリが割り当てられました。しかし、Executor Memory Used メトリクスから、ジョブの実行中に使用されたメモリは 176 GB のみで、メモリの過剰割り当てを示しています。

ここで、以下の調整後の設定でこのジョブを再実行してみましょう。

元のジョブ (デフォルト設定) 再実行ジョブ (調整後の設定)
spark.executor.memory 14 GB 3 GB
spark.executor.cores 4 2
spark.dynamicAllocation.maxExecutors 99 30
総リソース使用量

6.521 vCPU-時間

26.084 メモリGB-時間

32.606 ストレージGB-時間

1.739 vCPU-時間

3.688 メモリGB-時間

17.394 ストレージGB-時間

課金対象のリソース使用量

7.046 vCPU-時間

28.182 メモリGB-時間

0 ストレージGB-時間

1.739 vCPU-時間

3.688 メモリGB-時間

0 ストレージGB-時間

以下のコードを使用します:

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-2 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/windycity.py",
"entryPointArguments": ["s3://noaa-global-hourly-pds/2024/", "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-2/"],
"sparkSubmitParameters": "--conf spark.driver.cores=2 --conf spark.driver.memory=3g --conf spark.executor.memory=3g --conf spark.executor.cores=2 --conf spark.dynamicAllocation.maxExecutors=30"
} }''
Bash

このジョブ実行について、CloudWatch ダッシュボードでエグゼキューターのメトリクスをもう一度確認してみましょう。

2 番目のジョブでは、想定通り vCPU (396 対 60) とメモリ (1,584 GB 対 120 GB) の両方で割り当てが少なくなり、リソースの使用効率が向上しています。元のジョブは 4 分 41 秒で実行されました。2 番目のジョブは 4 分 54 秒かかりました。この再設定により、ジョブのパフォーマンスに影響を与えることなく、79% のコスト削減を実現しました。

これらのメトリクスを使用して、ワーカー数や割り当てリソースを増減することで、ジョブをさらに最適化できます。

ジョブ失敗の診断と解決

CloudWatch ダッシュボードを使用すると、メモリ不足やデバイス上の空き容量不足などの CPU、メモリ、ストレージに関連する問題によるジョブ失敗を診断できます。これにより、ログを確認したり Spark History Server を参照することなく、一般的なエラーを迅速に特定して解決できます。さらに、ダッシュボードからリソース使用率を確認できるため、リソースを過剰に割り当てることなく、必要な分だけリソースを増やして設定を微調整でき、これによってさらなるコスト削減が可能です。

ドライバーのエラー

このユースケースを説明するために、数百万行を含む大きな Spark データフレームを作成する次の Spark ジョブを実行してみましょう。通常、この操作は Spark ドライバーによって実行されます。ジョブの送信時に、多数のカラムを持つデータフレームのタスクシリアル化に必要なため、spark.rpc.message.maxSize も設定します。

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-3 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/create-large-disk.py"
"sparkSubmitParameters": "--conf spark.rpc.message.maxSize=2000"
} }''
Bash

数分後、Job details セクションに示されているように、ジョブは「コンテナのリリース中にエラーが発生しました」というエラーメッセージで失敗しました。

理解の難しいエラーメッセージに遭遇した場合、さらなるトラブルシューティングのためにドライバーとエグゼキューターのログを調査することが重要になります。しかし、さらにログを確認する前に、まず CloudWatch ダッシュボードを確認しましょう。特にドライバーのメトリクスを確認します。コンテナのリリースは通常ドライバーによって実行されるためです。

Driver CPU UsedDriver Storage Used はそれぞれの割り当て値の範囲内であることがわかります。しかし、Driver Memory AllocatedDriver Memory Used を確認すると、ドライバーは割り当てられた 16 GB のメモリをすべて使用していたことがわかります。デフォルトでは、EMR Serverless のドライバーには 16 GB のメモリが割り当てられます。

より多くのドライバーメモリを割り当ててジョブを再実行してみましょう。デフォルトのワーカータイプでは spark.driver.memory + spark.driver.memoryOverhead が 30 GB 未満である必要があるため、開始点としてドライバーメモリを 27 GB に設定しましょう。spark.rpc.message.maxSize は変更しません。

aws emr-serverless start-job-run \
—name emrs-cw-dashboard-test-4 \
—application-id <APPLICATION_ID> \
—execution-role-arn <JOB_ROLE_ARN> \
—job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKETNAME>/scripts/create-large-disk.py"
"sparkSubmitParameters": "--conf spark.driver.memory=27G --conf spark.rpc.message.maxSize=2000"
} }''
Bash

今回はジョブは成功しました。CloudWatch ダッシュボードでドライバーメモリの使用状況を確認してみましょう。

ご覧の通り、割り当てられたメモリは現在 30 GB ですが、ジョブ実行中の実際のドライバーメモリ使用量は 21 GB を超えませんでした。したがって、spark.driver.memory の値を削減することで、ここでさらにコストを最適化できます。同じジョブを spark.driver.memory を 22 GB に設定して再実行したところ、より良いドライバーメモリ使用率でジョブは依然として成功しました。

エグゼキューターのエラー

可観測性のために CloudWatch を使用することは、ドライバー関連の問題の診断に理想的です。ジョブあたり 1 つのドライバーしかなく、使用されたドライバーリソースは単一のドライバーの実際のリソース使用量だからです。一方、エグゼキューターのメトリクスはすべてのワーカーにわたって集計されます。しかし、このダッシュボードを使用して、ジョブを成功させるために適切な量のリソースを提供し、リソースの過剰割り当てを避けることができます。

この説明のために、すべてのワーカーで均一なディスクの過剰使用をシミュレートする次の Spark ジョブを実行してみましょう。このジョブは、数年分の非常に大きな NOAA データセットを処理し、非常に大きなデータフレームをディスク上に一時的にキャッシュします。

aws emr-serverless start-job-run \ --name emrs-cw-dashboard-test-5 \ --application-id <APPLICATION_ID> \ --execution-role-arn <JOB_ROLE_ARN> \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://<BUCKET_NAME>/scripts/noaa-disk.py" } }'
Bash

数分後、Job details セクションで「No space left on device」エラーでジョブが失敗したことがわかります。これは、一部のワーカーのディスク容量が不足したことを示しています。

ダッシュボードの Running Executors メトリクスを確認すると、99 のエグゼキューターワーカーが実行されていたことがわかります。各ワーカーには、デフォルトで 20 GB のストレージが提供されています。

Spark タスクの失敗原因を確認するため、Executor Storage AllocatedExecutor Storage Used のメトリクスをダッシュボードから確認しましょう。(ドライバーでは Spark タスクが実行されないため、エグゼキューターのメトリクスを確認します。)

ご覧のように、99 のエグゼキューターは、割り当てられた合計 2,126 GB のストレージのうち、1,940 GB を使用しています。これには、エグゼキューターがシャッフルしたデータとデータフレームのキャッシュに使用されたストレージの両方が含まれます。このグラフでは 2,126 GB 全体が使用されているのが見えないのは、99 のエグゼキューターのうち、ジョブが失敗する前(これらのエグゼキューターがタスクの処理を開始してデータフレームのチャンクを保存する前)にデータをあまり保持していなかったエグゼキューターが数台あった可能性があるためです。

パラメータ spark.emr-serverless.executor.disk を使用して エグゼキューター のディスクサイズを増やして、同じジョブを再実行してみましょう。
まずは エグゼキューター あたり 40 GB のディスクから試してみます。

aws emr-serverless start-job-run \ --name emrs-cw-dashboard-test-6 \ --application-id <APPLICATION_ID> \ --execution-role-arn <JOB_ROLE_ARN> \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://<BUCKET_NAME>/scripts/noaa-disk.py" "sparkSubmitParameters": "--conf spark.emr-serverless.executor.disk=40G" } }'
Bash

今回は、ジョブは正常に実行されました。Executor Storage AllocatedExecutor Storage Used のメトリクスを確認してみましょう。

Executor Storage Allocatedspark.emr-serverless.executor.disk の値を 2 倍にしたため、4,251 GB になりました。エグゼキューターの合計ストレージが 2 倍になったにもかかわらず、ジョブは 4,251 GB のうち最大でも 1,940 GB しか使用しませんでした。これは、エグゼキューター のディスク容量が数 GB 程度不足していただけだったことを示しています。したがって、前のシナリオのようにストレージコストを節約するために、spark.emr-serverless.executor.disk を 40 GB ではなく、25 GB や 30 GB のような低い値に設定することができます。
さらに、Executor Storage Read BytesExecutor Storage Write Bytes を監視して、ジョブが I/O ヘビーかどうかを確認できます。この場合、EMR Serverless の シャッフル最適化ディスク 機能を使用して、ジョブの I/O パフォーマンスをさらに向上させることができます。

このダッシュボードは、ディスクへのスピルシナリオを含め、データフレームのキャッシュや永続化中に使用される一時的なストレージに関する情報を取得する際にも役立ちます。次のスクリーンショットに示すように、Spark History Server の Storage タブはキャッシュアクティビティを記録します。
ただし、このデータはキャッシュが削除されるかジョブが終了すると Spark History Server から失われます。そのため、一時的なストレージの問題による失敗したジョブの分析には、Executor Storage Used を使用できます。

この例では、データがエグゼキューター間で均等に分散されていました。しかし、データスキュー(たとえば、99 の エグゼキューター のうち 1-2 個だけが最も多くのデータを処理し、その結果、ディスク容量が不足する)がある場合、CloudWatch ダッシュボードはこのシナリオを正確に捉えることができません。これは、ストレージデータがジョブのすべてのエグゼキューター のメトリクスを集約して集計されるためです。個々のエグゼキューターレベルの問題を診断するには、エグゼキューターレベルごとのメトリクスを追跡する必要があります。私たちは EMR Serverless と Amazon Managed Service for Prometheus の統合を通じて、ワーカーレベルのメトリクスが発見が困難な問題の特定、緩和、解決にどのように役立つかについて、より高度な例を探ります。

まとめ

この投稿では、強化された EMR Serverless メトリクスを含む単一の CloudWatch ダッシュボードを使用して、EMR Serverless アプリケーションを効果的に管理および最適化する方法を学びました。これらのメトリクスは、EMR Serverless が利用可能なすべての AWS リージョンで利用できます。この機能の詳細については、ジョブレベルの監視を参照してください。

本記事は Amazon EMR Serverless observability, Part 1: Monitor Amazon EMR Serverless workers in near real time using Amazon CloudWatch を翻訳したものです。翻訳はソリューションアーキテクトの平川 大樹が担当しました。


著者について

Kashif Khan は AWS のシニアアナリティクススペシャリストソリューションアーキテクトで、Amazon EMR、AWS Lake Formation、AWS Glue、Amazon Athena、Amazon DataZone などのビッグデータサービスを専門としています。ビッグデータ分野で 10 年以上の経験を持ち、スケーラブルで堅牢なソリューションの設計に関する幅広い専門知識を有しています。彼の役割は、アーキテクチャのガイダンスを提供し、顧客と密接に協力して AWS アナリティクスサービスを使用したカスタマイズされたソリューションを設計し、データの可能性を最大限に引き出すことです。

Veena Vasudevan は AWS のプリンシパルパートナーソリューションアーキテクトおよびデータ&AI スペシャリストです。彼女は、顧客やパートナーが高度に最適化され、スケーラブルで安全なソリューションを構築し、アーキテクチャを最新化し、ビッグデータ、アナリティクス、AI/ML ワークロードを AWS に移行するのを支援しています。