Amazon Web Services ブログ

Apache Flink バージョン 1.18 が Amazon Managed Service for Apache Flink でサポートされました

Apache Flink は、ストリームおよびバッチ処理向けの、パワフルなプログラミングインターフェースを提供するオープンソースの分散処理エンジンです。ステートフルな処理やイベントタイムセマンティクスをサポートしています。Apache Flink は、複数のプログラミング言語、Java、Python、Scala、SQL、および異なる抽象化レベルの複数の API をサポートしています。これらを単一のアプリケーション内で組み合わせて使用することも可能です。

Amazon Managed Service for Apache Flink は、Apache Flink アプリケーションをフルマネージド、サーバーレスで実行可能なサービスです。このたび、 Apache Flink 1.18.1 がサポートされたことをお知らせします。

本投稿では、直近のメジャーリリース 1.16、1.17、1.18 で導入され、かつ Managed Service for Apache Flink でサポートされた、Apache Flink の興味深い機能の一部について説明していきます。

新しいコネクタ

Apache Flink バージョン 1.18.1 で利用可能な新機能を詳しくみていく前に、新しいオープンソースコネクタについて説明させてください。

OpenSearch

専用の OpenSearch コネクタが利用可能になりました。本コネクタにより、Apache Flink アプリケーションは Elasticsearch の互換モードに頼らずに、直接 OpenSearch にデータを書き込むことができます。本コネクタは Amazon OpenSearch Service のプロビジョンドクラスター、および OpenSearch Service Serverless と互換性があります。

新しいコネクタは SQL と Table API をサポートし、Java と Python の両方で動作します。また、Java については DataStream API もサポートされています。本コネクタは Flink のチェックポインティングと同期して書き込みを行うため、追加設定なしで at-least-once を保証しています。一意の ID とアップサート方式を組み合わせることで、exactly-once を達成することも可能です。

デフォルトでは、コネクタは OpenSearch バージョン 1.x のクライアントライブラリを使用します。適切な依存関係を追加することで、バージョン 2.x に切り替えることができます。

Amazon DynamoDB

Apache Flink アプリケーション開発者は、Amazon DynamoDB にデータを書き込むための専用のコネクタを利用できるようになりました。
本コネクタは、AWS によって開発され、今や Apache Flink プロジェクトの不可欠なコンポーネントである Apache Flink AsyncSink をベースとしています。Apache Flink AsyncSink を活用することで、ノンブロッキングな書き込みリクエストとアダプティブバッチングにより、効率のよい出力コネクタを簡単に実装できます。

本コネクタは SQL と Table API (Java と Python)、および DataStream API (Java のみ) の両方をサポートします。デフォルトでは、スループットを最適化するためにバッチ書き込みを行います。SQL バージョンにおける注目すべき機能は、PARTITIONED BY 句のサポートです。1 つ以上のキーを指定することで、クライアント側の重複排除を実現でき、各バッチ書き込み時に最新のレコードのみを指定されたキーごとに送信することができます。DataStream API でも、各バッチ内で上書きするパーティションキーのリストを指定することで、同等の処理を実現できます。

このコネクタはシンクとしてのみ動作します。DynamoDB からのデータ読み取りには使用できません。DynamoDB のデータを検索するには、Flink Async I/O API を使って参照処理を実装するか、SQL の場合はカスタムユーザー定義関数 (UDF) を実装する必要があります。

MongoDB

興味深い別のコネクタとして、MongoDB 向けのものがあります。本コネクタはソースとシンクの両方で、SQL と Table API と DataStream API が利用可能です。新しいコネクタは、公式な Apache Flink プロジェクトの一部であり、コミュニティによってサポートされています。本コネクタは、MongoDB 自身が提供していた以前のコネクタに置き換わるものです。以前のコネクタでは、Flink の Sink および Source API のみをサポートしていました。

他のデータストアコネクタと同様に、ソースコネクタはバッチモードで bounded source として、または参照用として使用できます。シンクコネクタはバッチモードとストリーミングの両方で利用可能で、upsert および append の両モードをサポートします。

本コネクタには多くの注目すべき機能がありますが、その中でも言及しておきたいのは、ソースにおける参照時のキャッシュ有効化と、シンクにおける追加設定なしでの at-least-once 保証の二点です。プライマリキーが定義されている場合、シンクは idempotent upserts により、exactly-once をサポートすることもできます。

コネクタのバージョニング

新機能ではありませんが、コネクタのバージョン管理が新しくなった点は、以前の Apache Flink アプリケーションを更新する際に考慮すべき重要な要素となります。

Apache Flink バージョン 1.17 以降、ほとんどのコネクタがメインの Apache Flink ディストリビューションから外部化され、独立したバージョン管理に従うようになりました。

依存関係を正しく含めるためには、-の形式でアーティファクトバージョンを指定する必要があります。

例えば、本投稿の執筆時点で最新の Kafka コネクタは、Amazon Managed Streaming for Apache Kafka (Amazon MSK) とも連携する、バージョン 3.1.0 です。Apache Flink 1.18 で本コネクタを使用する場合は、次の依存関係を使用する必要があります。

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

Amazon Kinesis の新しいコネクタバージョンは 4.2.0 です。Apache Flink 1.18 における依存関係は以下のとおりです。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

次のセクションでは、Apache Flink 1.18 から利用可能となった強力な新機能の中で、Amazon Managed Service for Apache Flink でサポートされているものについて、さらに説明していきます。

SQL

Apache Flink SQL において、オプティマイザに効果的なクエリプランを提案するために、hints を結合クエリに付与可能となりました。特にストリーミングアプリケーションでは、外部システム (一般的にはデータベース) からクエリされたデータを使用して、ストリーミングデータを表すテーブルのエンリッチのために、lookup joins が使用されます。バージョン 1.16 以降、lookup joins にいくつかの改善が導入され、結合の動作を調整してパフォーマンスを向上できるようになりました。

  • ルックアップキャッシュは、最も頻繁に使用されるレコードをメモリにキャッシュすることで、データベースの負荷を軽減する強力な機能です。以前は、ルックアップキャッシュはいくつかのコネクタのみの専用機能でした。Apache Flink 1.16 以降、このオプションはルックアップをサポートしているすべてのコネクタで利用可能になりました (FLIP-221)。執筆時点では、JDBCHive、および HBase コネクタがルックアップキャッシュをサポートしています。ルックアップキャッシュには 3 つのモードがあります。メモリ上にすべて保持できる小さなデータセットの場合は FULL、大きなデータセットの場合は最新のレコードのみをキャッシュする PARTIAL、キャッシュを完全に無効にする NONE です。PARTIAL キャッシュでは、バッファ行数と有効期限を設定可能です。
  • 非同期ルックアップはパフォーマンスを大幅に改善するための異なるアプローチです。非同期ルックアップは、Apache Flink SQL における DataStream API で利用可能な非同期 I/Oと同様の機能を提供します。これにより、Apache Flink は、前のルックアップへの応答を受信するまでスレッドをブロックすることなく、データベースに新しい要求を送信できます。非同期 I/O と同様に、結果の順序付けを強制や、順不同な結果の許容、バッファ容量やタイムアウトの調整が可能です。
  • PARTIAL または NONE ルックアップキャッシュと組み合わせて、外部データベースのルックアップ失敗時の動作を設定する lookup retry strategy を設定することもできます。

これらの動作はすべて LOOKUP ヒントを使用して制御できます。以下に非同期ルックアップを使用したルックアップ結合を示します。

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address 
 FROM Orders AS O 
 JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

PyFlink

このセクションでは、PyFlink の新機能と改善点について説明します。

Python 3.10 のサポート

Apache Flink 1.18 では、PyFlink ユーザー向けのいくつかの改善が導入されました。
最も重要なのは、Python 3.10 がサポートされ、Python 3.6 のサポートが完全に削除されたことです (FLINK-29421)。
Managed Service for Apache Flink も、Python 3.10 ランタイムを使用して PyFlink アプリケーションを実行します。

機能差異の解消

プログラミング API の観点から見ると、PyFlink は、バージョンを重ねるごとに Java に近づいています。DataStream API では、サイド出力やブロードキャスト状態などの機能がサポートされるようになり、ウィンドウ API における不足も解消されました。また PyFlink は DataStream API から直接 Amazon Kinesis Data Streams などの新しいコネクタをサポートするようになりました。

スレッドモードの改善

PyFlink は非常に効率的です。PyFlink で Flink API オペレータを実行するオーバーヘッドは、Java や Scala に比べて最小限に抑えられています。これは、アプリケーションの言語に関係なく、ランタイムが実際にオペレータの実装を JVM で直接実行するためです。しかし、ユーザー定義関数の場合は少し異なります。lambda x: x + 1 のような単純な Python コードでも、複雑な Pandas 関数でも、Python ランタイムで実行する必要があるためです。

デフォルトでは、Apache Flink は JVM の外部で、各 Task Manager 上で Python ランタイムを実行します。各レコードはシリアライズされ、プロセス間通信を介して Python ランタイムに渡され、デシリアライズされて処理されます。その結果は再度シリアライズされ、JVM に戻され、デシリアライズされます。これが PyFlink の PROCESS モードです。非常に安定していますが、オーバーヘッドが発生し、場合によってはパフォーマンスのボトルネックになる可能性があります。

Apache Flink バージョン 1.15 以降では、PyFlink 向けに THREAD モード もサポートしています。このモードでは、Python のユーザー定義関数が JVM 内で実行されるため、シリアライズ/デシリアライズおよび、プロセス間通信のオーバーヘッドが取り除かれます。THREAD モードには いくつかの制限 があります。たとえば、Pandas や UDAF (多数の入力レコードから 1 つの出力レコードを生成するユーザー定義集約関数) では使用できません。しかしながら、PyFlink アプリケーションのパフォーマンスを大幅に向上させることができます。

バージョン 1.16 では、THREAD モードのサポートが大幅に拡張され、Python DataStream API もカバーされるようになりました。

Managed Service for Apache Flink は THREAD モードをサポートしており、PyFlink アプリケーションから直接有効化が可能です

Apple Silicon サポート

Apple Silicon ベースのマシンで PyFlink アプリケーションの開発をする際に、PyFlink 1.15 においては、Apple Silicon における既知の Python 依存関係の問題に遭遇したかもしれません。これらの問題はついに解決されました (FLINK-25188)。これらの制限は、Apache Flink の Managed Service 上で実行される PyFlink アプリケーションには影響しませんでした。バージョン 1.16 より前は、M1、M2、M3 チップセットを使用するマシン上で PyFlink アプリケーションを開発する場合、PyFlink 1.15 以前をマシン上に直接インストールすることができないため、回避策を適用する必要がありました。

アンアラインドチェックポイントの改善

Apache Flink 1.15 ではすでに Incremental Checkpoint と Buffer Debloating がサポートされています。特にこれらの機能を組み合わせて使用することで、チェックポイントのパフォーマンスを改善し、バックプレッシャーが発生している場合でもチェックポインティングの期間をより予測可能とすることができます。これらの機能の詳細については、Amazon Managed Streaming for Apache Flink アプリケーション向けのバッファデブロートとアンアラインドチェックポイントをご覧ください。

バージョン 1.16 および 1.17 では、安定性とパフォーマンスを向上させるためにいくつかの変更が導入されました。

データスキューの処理

Apache Flink はウォーターマークによるイベントタイムのセマンティクスをサポートしています。ウォーターマークは、通常はソースオペレーターからフローに挿入される特別なレコードで、イベントタイムウィンドウ集計などのオペレーターのイベントタイムの進行をマークします。一般的な手法は、最新の観測されたイベントタイムからウォーターマークを遅延させ、ある程度の範囲でイベントが順不同になることを許容することです。

しかし、ウォーターマークの使用には課題があります。アプリケーションが複数のソースを持つ場合、例えば Kafka トピックの複数のパーティションからイベントを受信する場合、ウォーターマークは各パーティションごとに独立して生成されます。内部的には、各オペレーターは常に、すべての入力パーティションで同じウォーターマークを待機します。これは実質的に最も遅いパーティションに合わせることを意味します。これがネックとなり、あるパーティションがデータを受信していない場合は、ウォーターマークが進まず、エンドツーエンドのレイテンシが増加します。このため、多くのストリーミングソースにオプションとしてアイドルタイムアウトが導入されています。設定したタイムアウト後は、レコードを受信していないパーティションを無視してウォーターマークの生成を行い、ウォーターマークを進めることができます。

1 つのソースがほかよりもイベントを大幅に早く受信している場合、逆の課題に直面する可能性があります。ウォーターマークはもっとも遅いパーティションに揃えられるため、ウィンドウ集約のためにウォーターマークを待つ必要が生じます。高速なソースからのレコードは、バッファリングされながら待たされることになります。オペレータ状態が制御できない程に、バッファリングされたデータが肥大化する恐れがあります。

より高速なソースの問題に対処するため、Apache Flink 1.17 以降では、ソース分割におけるウォーターマークアライメントを有効化できます (FLINK-28853)。デフォルトでは無効になっているこのメカニズムにより、特定のパーティションが他のパーティションと比べてウォーターマークを速く進めすぎないようになります。複数のソース (複数の入力トピックなど) を 1 つのアライメントグループ ID に割り当て、現在のウォーターマークからの最大ドリフト期間を指定することで、これらを束ねることができます。特定のパーティションがイベントを高速で受信する場合、ソースオペレーターはドリフトが指定されたしきい値を下回るまで、そのパーティションの消費を一時停止します。それぞれのソースごとに有効化できます。必要なのは、同じ ID を持つすべてのソースを結びつける整列グループ ID と、現在の最小ウォーターマークからの最大ドリフト時間を指定することです。これにより、あまりにも速く進んでいるソースサブタスクの消費が一時停止し、ドリフトが指定された閾値を下回るまで待機します。

以下のコードスニペットは、Kafka ソースから協会のない順序性のウォーターマークを出力するために、ソース分割のウォーターマークアラインメントを設定する方法を示しています。

KafkaSource kafkaSource = ...
 DataStream stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

この機能は、FLIP-217 に準拠した、ソース分割のウォーターマークアラインメントをサポートしているソースでのみ利用可能です。執筆時点では、主要なストリーミングソースコネクタのうち、Kafka ソースのみがこの機能をサポートしています。

Protobuf フォーマットの直接サポート

現在、SQL と Table API は、Protobuf 形式を直接サポートしています。この形式を利用するには、.proto スキーマ定義ファイルから Protobuf の Java クラスを生成し、アプリケーションの依存関係に含める必要があります。

Protobuf フォーマットは SQL およびテーブル API でのみ利用可能で、Protobuf でシリアルライズされたデータをソースまたはシンクから読み書きする場合にのみ機能します。現在、Flink はステートを直接シリアルライザブルする Protobuf を直接サポートしておらず、Avro などのように schema evolution もサポートしていません。アプリケーション用にカスタムシリアライザーを登録する必要がありますが、オーバーヘッドを伴います。

Apache Flink をオープンソースのまま維持

Apache Flink は、サブタスク間のデータ送信を、内部的に Akka に依存してきました。 2022 年、Akka を開発している企業である Lightbend は、今後の Akka バージョンのライセンスを Apache 2.0 からより制限的なライセンスに変更し、Apache Flink で使用されているバージョンである Akka 2.6 にはこれ以上のセキュリティアップデートや修正が提供されないことを発表しました。

Akka は従来から非常に安定しており、頻繁な更新は必要ありませんでしたが、このライセンスの変更は Apache Flink プロジェクトにとってリスクとなりました。Apache Flink コミュニティの決定は、Akka 2.6 のフォークである Apache Pekko (FLINK-32468) に置き換えることでした。このフォークは Apache 2.0 ライセンスを維持し、コミュニティによって必要な更新が加えられます。それまでの間、Apache Flink コミュニティは、Akka ならびに Pekko への依存関係を完全に削除するかどうかを検討しています。

ステート圧縮

Apache Flink は、すべてのチェックポイントとセーブポイントに対してオプションの圧縮 (デフォルト: オフ) を提供します。 Apache Flink は、スナップショット圧縮が有効になっている場合にオペレーターの状態を適切に復元できないという Flink 1.18.1 のバグを特定しました。これにより、データが失われるか、チェックポイントから復元できなくなる可能性があります。これを解決するために、Managed Service for Apache Flink は、Apache Flink の将来のバージョンに含まれる修正をバックポートしました。

Managed Service for Apache Flink におけるインプレースバージョンアップグレード

Apache Flink 1.15 以前のバージョンを使用して Managed Service for Apache Flink で現在アプリケーションを実行している場合、AWS コマンドラインインターフェース (AWS CLI)、AWS CloudFormationAWS Cloud Development Kit (AWS CDK) または AWS API を使用するツールのいずれかを使用して、ステートを失うことなくバージョン 1.18 にインプレースアップグレードが可能です。

UpdateApplication API アクションによって、既存の Managed Service for Apache Flink アプリケーションの Apache Flink ランタイムバージョンを更新できるようになりました。実行中のアプリケーションに直接 UpdateApplication を使用できます。

インプレースアップグレードを続行する前に、アプリケーションに含まれる依存関係を検証したうえで更新し、新しい Apache Flink バージョンと互換性があることを確認する必要があります。特に、Apache Flink ライブラリ、コネクタ、場合によっては Scala バージョンを更新する必要があります。

また、更新を続行する前に、更新されたアプリケーションをテストすることをお勧めします。リグレッションが発生していないことを確認するために、ターゲットの Apache Flink ランタイムバージョンを使用して、ローカルもしくは本番以外の環境でテストすることをお勧めします。

最後に、アプリケーションがステートフルである場合は、実行中のアプリケーションの状態のスナップショットを取得することをお勧めします。これにより、以前のアプリケーションバージョンにロールバック可能となります。

準備ができたら、UpdateApplication API アクションまたは update-application AWS CLI コマンドを使用して、アプリケーションのランタイムバージョンを更新し、更新された依存関係を含む新しいアプリケーションアーティファクト、JAR、または zip ファイルをポイントできるようになります。

プロセスと API の詳細については、Apache Flink のインプレース バージョン アップグレードを参照してください。ドキュメントには、ステップバイステップの手順とアップグレードプロセスを説明する動画が含まれています。

まとめ

この投稿では、Apache Flink の Amazon マネージドサービスでサポートされている Apache Flink の新機能のいくつかを調べました。このリストは包括的なものではありません。 Apache Flink は、SQL およびテーブル API のオペレーターレベル TTL [FLIP-292] やタイムトラベル [FLIP-308] など、非常に有望な機能もいくつか導入しましたが、これらは API ではまだサポートされておらず、ユーザーが実際にアクセスできる状態にありません。そのため、本投稿で取り上げる対象から外しました。

Apache Flink 1.18 で利用できる興味深い新機能と新しいコネクタのいくつかと、Apache Flink のマネージド サービスが既存のアプリケーションのアップグレードにどのように役立つかを見てきました。
最近のリリースの詳細については、Apache Flink ブログとリリースノートをご覧ください。

Apache Flink を初めて使用する場合は、適切な API と言語を選択するガイドを参照し、スタートガイドに従って Apache Flink のマネージド サービスの使用を開始することをお勧めします。

著者について

Lorenzo NicoraLorenzo Nicora は AWS でシニア ストリーミング ソリューション アーキテクトとして働いており、EMEA 全体の顧客をサポートしています。彼は 25 年以上にわたってクラウドネイティブでデータ集約型のシステムを構築しており、コンサルティング会社や FinTech 製品会社の両方を通じて金融業界で働いています。彼はオープンソース テクノロジーを幅広く活用し、Apache Flink などのいくつかのプロジェクトに貢献してきました。

Francisco MorilloFrancisco Morillo は AWS のストリーミング ソリューション アーキテクトです。 Francisco は AWS の顧客と協力し、AWS のサービスを使用したリアルタイム分析アーキテクチャの設計を支援し、Amazon MSK および Amazon Managed Service for Apache Flink をサポートしています。

本記事は、Amazon Managed Service for Apache Flink now supports Apache Flink version 1.18 を翻訳したものです。翻訳は Solutions Architect の榎本が担当しました。