Amazon Web Services ブログ

Kubernetes で Spark パフォーマンスを最適化する

Apache Spark はオープンソースプロジェクトで、分析分野で幅広い人気を博しています。有名なビッグデータや、ストリーミングといったの機械学習ワークロード、幅広いデータセットの処理、ETL などで使用されています。

Kubernetes は、人気のあるオープンソースのコンテナ管理システムで、アプリケーションのデプロイ、メンテナンス、スケーリングのための基本的なメカニズムを提供します。Amazon EKS は、高可用性コントロールプレーンを提供するマネージド Kubernetes サービスで、AWS で本番環境レベルのワークロードを実行します。お客様は、EKS でマイクロサービス、バッチ、機械学習などのさまざまなワークロードを実行できます。

このブログは、Kubernetes で Spark ワークロードをプラットフォームとして実行することを好むエンジニアやデータサイエンティスト向けです。

Kubernetes で Spark を実行することを検討すべき理由

お客様が Kubernetes での Spark の実行を検討する理由を説明します。

Kubernetes は Spark リソースマネージャーのネイティブオプションです

Spark 2.3 以降では、Kubernetes を使用して Spark リソースを実行および管理できます。それ以前は、Hadoop Yarn、Apache Mesos を使用して Spark を実行するか、スタンドアロンクラスターで実行できました。Kubernetes で Spark を実行すると、実験にかかる時間が短くなります。さらに、最小限の複雑さでさまざまな最適化手法を使用できます。

コンテナと Kubernetes エコシステムで実行する利点

Spark アプリケーションをコンテナとしてパッケージ化することで、アプリケーションと一緒に依存関係を単一のエンティティとしてパッケージ化できるため、コンテナのメリットを享受できます。Hadoop バージョンとの互換性に関するライブラリバージョンの不一致に関する懸念がありますが、コンテナを使用することで維持が容易になります。もう 1 つの利点は、バージョン管理を行い、タグをコンテナイメージに適用できることです。このようにして、異なるバージョンの Spark またはその依存関係を使用して実験する必要がある場合に、そうすることが容易になります。

スタックで Kubernetes を活用することで、Kubernetes エコシステムの利点を活用できます。モニタリングやログ記録などに Kubernetes アドオンを活用できます。ほとんどの Spark 開発者は、Spark ワークロードを、より幅広く使われている既存の Kubernetes インフラストラクチャにデプロイすることを選択しました。これにより、開始するための維持とアップリフト作業が少なくなります。クラスタオペレーターは、Kubernetes 名前空間リソースクォータを使用してリソース制限を適用することにより、クラスタへのアクセスを提供できます。このようにして、独自のインフラストラクチャを取得し、他のチームのリソースには踏み入らないようにします。Kubernetes ノードセレクターを使用して、Spark ワークロード専用のインフラストラクチャを保護することもできます。さらに、ドライバーポッドはエグゼキューターポッドを作成するため、Kubernetes サービスアカウントを使用して、Role または ClusterRole でアクセス許可を制御できます。これにより細粒度のアクセス制御を定義して、他のワークロードとともにご自身のワークロードを安全に実行できます。

一時的な Spark ワークロードを実行する場合でも、リアルタイムの Spark ワークロードを実行するビジネス要件がある場合でも、マルチテナントの Kubernetes インフラストラクチャを活用すると、クラスターのスピンアップに時間を取られるのを回避できます。さらに、サイロ化されたクラスターを実行するよりも、データのライフサイクルを管理する他のデータ中心のアプリケーションと一緒に Spark を実行することをお勧めします。このようにすることで、単一のオーケストレーターを使用してエンドツーエンドのライフサイクルソリューションを構築し、他のリージョンでスタックを簡単に再現したり、オンプレミス環境で実行したりすることもできます。

そうは言っても、Spark に対する Kubernetes スケジューラーのサポートはまだ実験段階です。このドキュメントによると、エンドユーザーは、設定、コンテナイメージ、およびエントリポイントの動作に変更がある可能性があることに注意する必要があります。

TPC-DS ベンチマーク

TPC-DS は、意思決定支援ソリューションのパフォーマンスを測定するための事実上の標準ベンチマークです。TPC-DS は、意思決定支援システムを、大量のデータを調査し、実際のビジネスの質問に答え、さまざまな運用要件と複雑さ (アドホック、レポート、反復 OLAP、データマイニングなど) の SQL クエリを実行するシステムとして定義し、CPU と I/O の負荷が高いのが特徴です。

TPC-DS ベンチマークを Amazon EKS で実行し、Apache Yarn と比較しました。 このベンチマークには、SQL 2003 標準の大部分を使用する 104 のクエリが含まれています。これらのクエリの 99 は TPC-DS ベンチマークからのもので、そのうち 4 つには 2 つのバリアント (14、23、24、39) と、最大のテーブル (store_sales) のフルスキャンと集計を実行する「s_max」クエリがあります。

こちらのリンクでベンチマーク結果を確認できます。

要約すると、1 TB のデータセットを使用して TPC-DS ベンチマークを実行したところ、この設定で Kubernetes と Yarn の間で同等のパフォーマンス (完了までの時間が約 5% 短縮) が得られました。

これで納得できました! Kubernetes で Spark を実行する方法

Kubernetes で Spark を実行するには 2 つの方法があります。Spark-submit を使用する方法と、Spark オペレーターを使用する方法です。spark-submit CLI を使用すると、Kubernetes でサポートされているさまざまな設定オプションを使用して Spark ジョブを送信できます。

Spark-submit

./bin/spark-submit \
--master k8s://https://<KUBERNETES_CLUSTER_ENDPOINT> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.container.image=aws/spark:2.4.5-SNAPSHOT \
--conf spark.kubernetes.driver.pod.name=sparkpi-test-driver \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5-SNAPSHOT.jar

Spark-submit は、Kubernetes で Spark を実行する最も簡単な方法です。コードスニペットを確認すると、2 つの小さな変更点に気付くことでしょう。1 つ目は、EKS コンソールから (または AWS CLI を介して) 取得できる Kubernetes クラスターエンドポイントの変更です。2 つ目は、Spark アプリケーションをホストするコンテナイメージです。ジョブを送信するには、クライアントモードまたはクラスターモードの 2 つの方法があります。2 つの間に微妙な違いがあります。クライアントモードを使用する場合、ドライバーに専用のインフラストラクチャ (エグゼキューターとは別) で実行するように指示できますが、クラスターモードを選択する場合、ドライバーとエグゼキューターの両方が同じクラスターで実行されます。コマンド内で Spark 設定と Kubernetes 固有のオプションを使用できます。

Spark オペレーター

Kubernetes で Spark を実行するためのより好ましい方法は、Spark オペレーターを使用することです。主な理由は、Spark オペレーターが Spark ワークロードにネイティブの Kubernetes エクスペリエンスを提供するためです。さらに、kubectl および sparkctl を使用して Spark ジョブを送信できます。

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
spec:
mode: cluster
  image: “aws/spark:2.4.5"
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5-SNAPSHOT.jar"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1024m”
    memory: "512m"
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: "512m"

最適化のヒント

以下は、Spark ワークロードのパフォーマンスを向上させることができる最適化のヒントのリストで、一考に値します。 調整対象の特定の設定については、eks-spark-benchmark repo をご覧ください。

Kubernetes クラスター

Kubernetes は汎用のコンテナオーケストレーションプラットフォームであるため、システムに必要なパフォーマンスを実現するには特定のパラメータを微調整する必要があるかもしれません。大まかに、微調整が必要なパラメータは 3 つのカテゴリに分類できます。インフラストラクチャレイヤー (EC2 インスタンス、ネットワーク、ストレージ、ファイルシステムなど)、プラットフォームレイヤー (Kubernetes、アドオン) とアプリケーションレイヤー (Spark、S3A コミッター) です。お客様は、これらのヒントを、パフォーマンスを向上させるために利用できるオプションのセットとして評価するだけでなく、システムの信頼性についても評価し、特定のワークロードに費やしたい金額と比較するのがよいでしょう。

NVMe インスタンスストア

Spark ワークロードの実行には AWS Nitro EC2 インスタンスを使うことをお勧めします。これは、ストレージをブロックするためのより高速な I/O、強化されたセキュリティ、軽量ハイパーバイザーなどの AWS の革新に支えられているためです。EC2 Nitro インスタンスへのブロックレベルのストレージは、 2 つの方法で提供されます。その 2 つとは、EBS のみ、および NVMe ベースの SSD です。たとえば、r5.24xlarge には EBS のみの SSD ボリュームが備わり、EBS 帯域幅が大きく (19,000 Mbps)、r5d.24xlarge には 4 つの 900 GiB NVMe SSD ボリュームが備わっています。TPC-DS ベンチマークテストでは EBS バックアップ SSD ボリュームを使いましたが、NVMe ベースのインスタンスストアと比較して評価することが重要です。それらはホストサーバーに物理的に接続されており、スクラッチスペースとして使用すると、より多くの I/O を駆動できるためです。ローカルの NVMe SSD を Spark のスクラッチスペースとして使用する方法については、「シャッフルパフォーマンス」セクションで説明します。デフォルトではインスタンスにマウントされていないため、ローカル NVMe SSD を使用するには、それらを設定する必要があることに注意してください。eksctl を使用してクラスターを構築する場合は、サンプルの cluster config を使用して、EKS ワーカーノードをブートストラップできます。

nodeGroups:
  - name: spark-nodes
    instanceType: r5d.xlarge
    availabilityZones: ["us-west-2b"]
    desiredCapacity: 1
    minSize: 0
    maxSize: 4
    volumeSize: 50
    ssh:
      allow: true
      publicKeyPath: '~/.ssh/id_rsa.pub'
    preBootstrapCommands:
      - IDX=1
      - for DEV in /dev/disk/by-id/nvme-Amazon_EC2_NVMe_Instance_Storage_*-ns-1; do  mkfs.xfs ${DEV};mkdir -p /local${IDX};echo ${DEV} /local${IDX} xfs defaults,noatime 1 2 >> /etc/fstab; IDX=$((${IDX} + 1)); done
      - mount -a

シングル AZ ノードグループ

通常、ベストプラクティスの 1 つは、可用性を高めるためにマルチ AZ 全体でマイクロサービスを実行することです。デフォルトでは、AWS の Kubernetes は、マルチ AZ によってバインドされたノードにワークロードを起動しようとします。Spark ワークロードのこのパターンには、2 つの潜在的な問題があります。最初に、クロス AZ レイテンシーは通常 1 桁のミリ秒単位であり、シングル AZ 内のノード (マイクロ秒のレイテンシー) と比較すると、シャッフルサービスのパフォーマンスに影響します。Spark シャッフルは、ディスク I/O、データのシリアル化、ネットワーク I/O を伴う高価な操作であり、シングル AZ でノードを選択するとパフォーマンスが向上します。次に、クロス AZ 通信にはデータ転送コストがかかります。Amazon EC2 で「受信」およびそこから「発信」して転送されたデータは、各方向で 0.01 USD/GB の料金が発生します。Spark シャッフルは高いネットワーク I/O 操作であるため、お客様はデータ転送コストを考慮する必要があります。

したがって、シングル AZ を実行してもシステムの可用性が損なわれないかどうかを検証する必要があります。本質的に一時的な Spark ワークロードの場合、シングル AZ は非常に簡単であり、シングル AZ に限定されている Kubernetes ノードグループを実行することを選択できます。マルチテナント Kubernetes クラスターがある場合は、ノードアフィニティなどの高度なスケジューリングを使用したり、後のセクションで説明する Volcano スケジューラーでサポートされている高度な機能を検討したりできます。

自動スケーリングの考慮事項

Spark アーキテクチャには、ドライバーポッドとエグゼキューターポッドが含まれています。これらが、分散コンテキストで連携して動作し、ジョブの結果を出します。ドライバーポッドは、ワーカーノードのエグゼキューターを取得する、アプリケーションコード (JAR または Python で定義) をエグゼキューターに送信する、タスクをエグゼキューターに送信するなど、いくつかのアクティビティを実行します。アプリケーションが完了すると、エグゼキューターポッドは終了しますが、ドライバーポッドは存続し、ガベージコレクションまたは手動でクリーンアップするまで「完了」状態のままになります。ドライバーポッドからログにアクセスして、結果を確認できます。したがって、クラスターのスケールインアクションが原因でドライバーポッドが失敗しないようにすることが重要です。以下のように、ドライバーポッドマニフェストにクラスターオートスケーラー (CA) に関する注釈を追加することで、これを回避できます。

"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"

さらに、マルチテナントクラスタで Spark ワークロードの使用状況に基づいてノードを自動スケーリングすることを選択した場合、Kubernetes Cluster Autoscaler (CA) を使用してこれを行うことができます。CA は、要求に対応するためのリソースが不足しているために失敗したポッドリクエストに基づいて、ターゲットクラスターの容量を推測します。これにより、エグゼキューターポッドがスケジュールできるようになると、スケーリングのレイテンシーが増加します。この問題を軽減するには、優先度の低いポーズポッドを実行して、クラスターをオーバープロビジョニングします (「優先度の高いプリエンプション」を参照)。リソースが限られているシナリオでは、これらの一時停止ポッドは、エグゼキューターポッドを配置するためにスケジューラーによってプリエンプトされます。これにより、一時停止ポッドに対応するために追加のノードを加える CA スケーリングも発生します。

メモリ管理

Spark ワークロードのリソースをより適切に管理するには、Kubernetes がメモリ管理をどのように処理するかを理解することが重要です。Kubernetes ノードは通常、Kubernetes デーモンに加えて多くの OS システムデーモンを実行します。システムデーモンはささいな量を超えたリソースを使用し、その可用性は Kubernetes ノードの安定性にとって重要です。Kubelet は、重要なデーモン用にシステムリソースを予約できるように、Node Allocatable を公開します。たとえば、kube-reserved を使用すると、kubelet、コンテナランタイムなどの Kubernetes システムデーモンのコンピューティングリソースを予約できます。system-reserved を使用すると、sshd、udev などの OS システムデーモン用にリソースを予約できます。詳細については、「システムデーモン用にコンピューティングリソースを予約する」を参照してください。Allocatable 設定を使用して、ポッドのコンピューティングリソースを予約できます。

デフォルトでは、Kubernetes はポッド定義で定義されたリクエスト/制限に基づいて cgroup を使用してメモリ割り当てを行います。これは、spark.executor.memory として知られています。さらに、Kubernetes は spark.kubernetes.memoryOverheadFactor * spark.executor.memory または最小 384MiB を非 JVM メモリの追加クッションとして考慮します。これには、オフヒープメモリ割り当て、非 JVM タスク、さまざまなシステムプロセスが含まれます。デフォルト設定を変更する場合は、spark.executor.memoryOverhead 値を割り当てることにより、この動作をオーバーライドできます。

ヒープ設定に関して、Kubernetes 上の Spark は -Xms (最小ヒープ) と -Xmx (最大ヒープ) の両方を spark.executor.memory と同じになるように割り当てます 。この場合、Xmx は、メモリ不足 (OOM) エラーが原因でエグゼキューターが強制終了されるのを防ぐのに役立つため、ポッドのメモリ制限よりわずかに小さくなります。

OOM エラーが原因でポッドが強制終了される主な理由は 3 つあります。

  1. Spark アプリケーションがより多くのヒープメモリを使用する場合、コンテナ OS カーネルは Java プログラム (xmx<usage<pod.memory.limit) を強制終了します。
  2. メモリ使用量が pod.memory.limit よりも多い場合、ホスト OS の cgroup がコンテナを強制終了します。Kubelet は、OOMKilled コンテナを同じホストまたは別のホストで再起動しようとします。
  3. ワーカーノードでメモリの負荷が発生した場合、Kubelet はノードを保護しようとし、メモリを解放するまでランダムなポッドを強制終了します。これは、より多くのメモリを消費するポッドのみが強制終了されることを必ずしも意味しません。

したがって、OOM エラーを回避するには、このことを念頭に置くことが重要です。これらの設定に慣れていない場合は、java docsSpark on Kubernetes configuration のドキュメントをご覧ください。Spark のメモリ消費をプロアクティブにモニタリングする場合は、Prometheus または同様の監視ソリューションの cadvisor からメモリメトリクス (container_memory_cache および container_memory_rss) をモニタリングすることをお勧めします。

スポットインスタンスの使用

スポットインスタンスは、未使用の EC2 インスタンスで、オンデマンド料金より大幅な割引 (最大 90%) で利用できるインスタンスです。一時的で長時間実行される Spark ワークロードは、スポットインスタンスの魅力的なユースケースです。スポットインスタンスは割り込み可能であるため、適切な緩和策を Spark ワークロードに使用して、タイムリーに完了するようにする必要があります。オンデマンドインスタンスでドライバーポッドを実行することは重要です。ドライバーポッドが中断された場合、ジョブ全体を最初から再開する必要があるためです。オンデマンドとスポットの 2 つのノードグループを実行でき、ノードアフィニティを使用して、オンデマンドノードグループのドライバーポッドとスポットノードグループのエグゼキューターポッドをスケジュールできます。

Amazon FSx for Lustre

Amazon FSx for Luster は、機械学習、高性能コンピューティング (HPC)、ビデオ処理、財務モデリング、電子設計自動化 (EDA) などのワークロードの高速処理用に最適化された高性能ファイルシステムを提供します。このようなワークロードでは、通常、高速でスケーラブルなファイルシステムインターフェイスを介してデータを提示する必要があり、通常、Amazon S3 のような長期データストアにデータセットが保存されています。Github の手順に従って、Kubernetes クラスターに CSI ドライバーをインストールできます。Amazon FSx for Luster は、Amazon S3 と深く統合されています。S3 に保存されているデータのストレージクラスを定義し、Kubernetes ポッドにアクセスできるようにしながら、インポートパスとエクスポートパスを指定できます。Spark ワークロードの場合、ドライバーとエグゼキューターは S3 と直接対話して、I/O 操作の複雑さを最小限に抑えることができます。Amazon S3 にファイルをエクスポートする場合は、コンテナに Lustre クライアントを構築することをお勧めします。

Amazon S3 を使用する

データは「バケット」と呼ばれるリソース内のオブジェクトとして保存されるため、Amazon S3 はファイルシステムではありません。 このデータには、Amazon S3 API または Amazon S3 コンソールを使用してアクセスできます。一方、Hadoop は、ファイルロック、名前の変更、ACL などの機能がその操作にとって重要なファイルシステムとして利用可能な分散ストレージ用に作成されました。

Hadoop に関する S3 の制限

Hadoop の S3A クライアントは、S3 バケットを Hadoop 互換のファイルシステムのように見せることができますが、それでもオブジェクトストアであり、Hadoop 互換のファイルシステムとして機能するときに制限があります。注意すべき重要な点は次のとおりです。

  • ディレクトリでの操作は潜在的に遅く、非アトミックです。
  • rename() など、すべてのファイル操作がサポートされているわけではありません。
  • 出力ストリーム全体が書き込まれるまで、データはオブジェクトストアに表示されません。
  • Amazon S3 は、上書き PUTS と DELETES の結果整合性と、新しいオブジェクトの PUTS の読み取り後の整合性を実現します。オブジェクトは可用性を高めるためにサーバー間でレプリケートされますが、レプリカへの変更は他のレプリカに反映されるまでに時間がかかります。このプロセスの間、オブジェクトストアは不整合です。ファイルを一覧表示、読み取り、更新、または削除すると、不整合の問題が表面化します。不整合の問題を軽減するために、S3Guard を設定できます。詳細については、「S3Guard: S3A の整合性とメタデータキャッシング」を参照してください。
  • HDFS でサポートされているファイル単位およびディレクトリ単位の権限も、さらに高度な ACL メカニズムも、サポートされていません。
  • ワークロードクラスターと Amazon S3 の間の帯域幅は制限されており、ネットワークと VM の負荷に応じて大幅に変動する可能性があります。

このような理由により、Amazon S3 は永続データのソースおよびストアとしては使用できますが、HDFS などのクラスター全体のファイルシステムの直接の代替として使用したり、fs.defaultFS として使用したりすることはできません。これらの問題に対処するために、S3A コミッターと呼ばれる hadoop-aws モジュールの S3A ファイルシステムクライアントを介して S3 に作業をコミットするための明確なサポートが追加されました。

S3A コミッター

コミッターには、ステージングとマジックの 2 つのタイプがあります。ステージングコミッターは Netflix によって開発され、ディレクトリとパーティションの 2 つの形式で提供されます。マジックコミッターは Hadoop コミュニティによって開発され、一貫性を保つために S3Guard が必要です。

  • ディレクトリコミッター: 作業データをローカルディスクにバッファリングし、HDFS を使用してタスクからジョブコミッターにコミット情報を伝達し、宛先ディレクトリツリー全体の競合を管理します。
  • パーティション化されたコミッター: 競合がパーティションごとに管理されることを除いて、ディレクトリコミッターと同じです。これにより、既存のデータセットのインプレース更新に使用できます。これは、Spark での使用にのみ適しています。
  • マジックコミッター: データは直接 S3 に書き込まれますが、「魔法のように」最終的な宛先に再ターゲットされます。競合はディレクトリツリー全体で管理されます。これには一貫した S3 オブジェクトストアが必要です。つまり、S3Guard は必須の前提条件です。

たとえば、Hadoop 3.1 で Spark 2.4.5 のディレクトリ S3A コミッターを設定する場合は、このオプションを使用できます。

"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"

"spark.hadoop.fs.s3a.committer.name": "directory"
"spark.hadoop.fs.s3a.committer.staging.conflict-mode": "append"

特定の Spark および Hadoop バージョン用に S3A コミッターを設定する方法については、こちらを参照してください。

シャッフルパフォーマンス

ほとんどの Spark 操作は、シャッフルフェーズ中に費やされます。これは、多数のディスク I/O、シリアル化、ネットワークデータ送信、およびその他の操作が含まれるためです。Spark ドライバーとエグゼキューターはどちらも、一時ファイルを保存するためにポッド内のディレクトリを使用します。このようなポッドディレクトリのストレージオプションを定義する方法に関連して、最適化のヒントがいくつかあります。調整対象の特定のシャッフル設定については、eks-spark-benchmark repo をご覧ください。

スクラッチスペースとして EmptyDir を使用する

Docker は、コンテナの書き込み可能なレイヤーに新しいデータが書き込まれるたびに、copy-on-write (CoW) を使用します。理想的には、パフォーマンスへの影響から、この層に書き込まれるデータはほとんどありません。ベストプラクティスは、書き込みを Docker ストレージドライバーにオフロードすることです。Kubernetes は、emptyDir ボリュームを使用してコンテナに提示するために使用できるストレージオプションの抽象化を実現します。EmptyDir は、ホストに接続されているボリューム、ネットワークファイルシステム、またはホスト上のメモリによってサポートされます。

tmpfs でバックアップされた emptyDir ボリュームを使用する

emptyDir 仕様で tmpfs を定義すると、Kubernetes はメモリをスクラッチスペースとしてポッドに割り当てます。内蔵メモリを使用すると、Spark のシャッフルフェーズが大幅に向上し、全体的なジョブパフォーマンスが向上します。これは、Spark ジョブがホストに割り当てられたメモリに制限されることも意味しますが、ディスクレスホストまたはストレージフットプリントが低いホストに最適です。spark-operator の場合、以下の設定オプションを使用して tmpfs を使用するように Spark を設定できます。

# spark-operator example
spec:
  sparkConf:
    "spark.kubernetes.local.dirs.tmpfs": "true"
    ...

emptyDir ボリュームに hostPath を使用する

同様に、前述したように、emptyDir をホストにマウントするボリュームに設定できます。NVMe SSD インスタンスストアボリュームでバックアップされた EC2 インスタンスの場合、このような設定を行うと、EBS によってバックアップされたボリュームを大幅に強化できます。このユースケースは、Spark ジョブのスクラッチスペースとしてインスタンスストアを使用しているため、このシナリオでは完全に機能します。これは、TPCDS など、大規模で大量の I/O を必要とするジョブにも役立ちます。インスタンスストアボリューム (または tmpfs) に保存されているデータは、ノードが再起動/終了されていない限り利用できます。インスタンスストアボリュームを使用した Spark-operator の例を次に示します。

# spark-operator example
spec:
  ...
  mainApplicationFile: local:///opt/spark/examples/jars/eks-spark-examples-assembly-1.0.jar
  volumes:
    - name: "spark-local-dir-1"
      hostPath:
        path: "/tmp/spark-local-dir"
  executor:
    instances: 10
    cores: 2
    ....
    volumeMounts:
      - name: "spark-local-dir-1"
        mountPath: "/tmp/spark-local-dir"

emptyDir ボリュームに複数のディスクを使用する

インスタンスに複数のディスクがある場合は、設定でそれらを指定して、I/O パフォーマンスを向上させることができます。複数のディスクの設定は、性質は似ていますが、多少の違いがあります。

Volcano スケジューラーを使用したジョブスケジューリング

Kubernetes スケジューラはデフォルトでポッドごとのスケジューリングを行います。バッチワークロード向けにはうまく設計されていません。Volcano スケジューラーは、下記の機能でギャップを埋めるのに役立ちます。

ギャングスケジューリング

クラスターに十分なリソースがない場合、Spark ジョブは Kubernetes がスケーリングしてクラスターにノードを追加するのを常に待機しているデッドロックを経験する可能性があります。ギャングスケジューリングは、多数のポッドを一度にスケジュールする方法です。Kubernetes が部分的なアプリケーションを起動しないようにします。これにより、さまざまなジョブのリソースのデッドロック問題が解決されます。たとえば、X ポッドを必要とするジョブがリクエストされ、X-2 ポッドをスケジュールするのに十分なリソースしかない場合、そのジョブは、すべての X ポッドに対応するリソースが利用可能になるまで待機します。同時に、Y ポッドを含むジョブがあり、クラスターに Y ポッドをスケジュールするリソースがある場合、そのジョブはスケジュール可能です。ギャングスケジューリングは、Volcano スケジューラで使用できます。Volcano スケジューラーは、GitHub リポジトリの指示に従ってインストールできます。

タスクトポロジと高度なビンパッキング

タスクトポロジと高度なビンパッキング戦略を使用して、ジョブスケジューリングをさらに強化できます。この戦略を使用すると、インスタンス間の通信とリソースの断片化のネットワークオーバーヘッドを減らすことができます。単一の EC2 で実行プログラムをビンパッキンぐすることがネットワークパフォーマンスのボトルネックにつながるため、これは一部のユースケースでは機能する可能性がありますが、すべてではありません。

まとめ

Spark ワークロードを実行するには、コンピューティング、ネットワーク、ストレージリソース間の高 I/O が必要であり、お客様は常に、このワークロードを最大のパフォーマンスと低コストでクラウドで実行する最良の方法を知りたいと思っています。Kubernetes には、チューニングするための複数の選択肢があり、このブログでは、ご利用いただける最適化手法もいくつかご紹介しました。読者がこのブログの恩恵を受け、ベストプラクティスを適用して Spark のパフォーマンスを向上させられることを願っています。何かご要望等がございましたら是非お聞かせください。フィードバックは、eks-spark-benchmark GitHub リポジトリでイシューを作成してお寄せください。