Amazon Web Services ブログ
新機能 ‐ Amazon EMR と Apache Hudi を使用して S3 でデータを挿入、更新、削除する
Amazon S3 のデータの保存により、スケール、信頼性、コスト効率の観点で多くの恩恵が得られます。 その上、Amazon EMR を利用して、Apache Spark、Hive、および Presto のようなオープンソースツールを使用してデータを処理し、分析することができます。 これらのツールと同じように強力ですが、増分データ処理、レコードレベルの挿入、更新、および削除を行うために必要なユースケースを処理するには引き続き課題があることが考えられます。
お客様と話して、個々のレコードに対する増分変更を処理することが必要なユースケースがあることがわかりました。たとえば、次のような場合です。
- データプライバシー規制に準拠する。この場合、ユーザーは忘れられる権利を行使するか、データを使用する方法に関する同意を変更するかを選択することができます。
- ストリーミングデータを操作する。この場合、特定のデータの挿入とイベントの更新を取り扱う必要があります。
- エンタープライズデータウェアハウスまたは運用データストアからデータベースの変更ログを追跡し取り込むために、変更データ取り込み (CDC) アーキテクチャを使用する。
- 後で到着するデータを復元するか、特定の時点でのデータを分析する。
本日より、EMRリリース5.28.0にはApache Hudi (incubating)が搭載され、レコードレベルの挿入、更新、削除の操作を行うためのカスタムソリューションを構築する必要がなくなりました。Hudiの開発は、インジェストとETLパイプライン全体の非効率性に対処するため、2016年にUberで始まりました。ここ数カ月、EMRチームはApache Hudiコミュニティと緊密に連携し、HudiをSpark 2.4.4にアップデート(HUDI-12)、Spark Avroをサポート(HUDI-91)、AWS Glue Data Catalogのサポートを追加(HUDI-306)、さらに複数のバグフィックスを含むパッチを提供してきました。
Hudi を使用して、S3 でレコードレベルの挿入、更新、削除を実行して、データプライバシー法を順守し、リアルタイムストリームを消費してデータキャプチャを変更し、遅れて到着するデータを復元し、ベンダーに依存しないオープンな形式で履歴とロールバックを追跡できます。データセットとテーブルを作成し、Hudi は基礎のデータ形式を管理します。 Hudi は Apache Parquet および Apache Avro をデータストレージに使用し、Spark、Hive、および Presto との組み込み統合が含まれており、現在使用しているものと同じツールを使用して Hudi データセットをクエリし、最新のデータにほぼリアルタイムでアクセスできます。
EMR クラスターを開始するときに、Hive、Spark、または Presto のいずれか少なくとも 1 つのコンポーネントが選択されるときにはいつでも、Hudi のライブラリとツールがインストールされ、自動的に設定されます。 Spark を使用して、新しい Hudi データセットを作成し、データを挿入、更新、および削除できます。それぞれの Hudi データセットはクラスターの設定された メタストア (AWS Glue Data Catalog を含む) で登録され、Spark、Hive、および Presto を使用してクエリできるテーブルとして表示されます。
Hudi はデータの書き込み、インデックス作成、S3 からの読み取り方法を定義する 2 つのストレージタイプをサポートします。
- Copy on Write – データは列形式 (Parquet) で保存され、書き込み中に更新によりファイルの新しいバージョンが作成されます。データセットの最新バージョンが常に効率的な列形式のファイルで使用できるため、このストレージタイプは読み取りの多いワークロードに最適です。
- Merge on Read – データは列 (Parquet) と行ベース (Avro) の形式の組み合わせで保存されます。更新は行ベースの「デルタファイル」に記録され、後で圧縮されて新しいバージョンの列形式のファイルが作成されます。 このストレージタイプでは、新しいコミットはデルタファイルに迅速に書き込まれますが、データセットの読み取りでは、デルタファイルに圧縮した列形式のファイルを結合することが必要であるため、書き込みが多いワークロードに最適です。
EMR クラスターで Hudi データセットをセットアップして、使用する方法を素早く見てみましょう。
Amazon EMR での Apache Hudi の使用
EMR コンソール からクラスターの作成を開始します。アドバンストオプションで、EMR リリース 5.28.0 (Hudi を含む最初のリリース) とSpark、Hive、および Tez のアプリケーションを選択します。ハードウェアオプションでは、3 つのタスクノードを追加して、Spark と Hive の両方を実行するのに十分な容量があることを確認します。
クラスターの準備ができたら、セキュリティオプションで選択したキーペアを使用して、マスターノードに SSH を実行し、Spark Shell にアクセスします。次のコマンドを使用して、Spark Shell を開始して、それを Hudi で使用します。
$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
--conf "spark.sql.hive.convertMetastoreParquet=false"
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
次に、以下のScalaコードにより、Copy on Writeストレージタイプを使って、ELBログのサンプルをHudiデータセットにインポートしています。
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
//Set up various input values as variables
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName
// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb",
HoodieWriteConfig.TABLE_NAME -> hudiTableName,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb",
DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
classOf[MultiPartKeysValueExtractor].getName)
// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)
// Write data into the Hudi dataset
inputDF.write
.format("org.apache.hudi")
.options(hudiOptions)
.mode(SaveMode.Overwrite)
.save(hudiTablePath)
Spark Shell では、Hudi データセットでレコードをカウントすることができるようになりました。
scala> inputDF2.count()
res1: Long = 10491958
このオプションでは、クラスタに対して設定された Hive メタストアとの統合を使用することができ、テーブルは デフォルト
データベースで作成されるようになります。このようにして、Hive を使用して、Hudi データセットのデータをクエリすることができます。
hive> use default;
hive> select count(*) from elb_logs_hudi_cow;
...
OK
10491958
...
データセットの単一レコードを更新または削除できるようになりました。Spark Shell で、いくつかの変数を準備して、更新するレコードと変更する列の値を選択するために SQL ステートメントを検索します。
val requestIpToUpdate = "243.80.62.181"
val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
SQL ステートメントを実行して、列の現在値を確認します。
scala> spark.sql(sqlStatement).show()
+------------+
| elb_name|
+------------+
|elb_demo_003|
+------------+
次に、レコードを選択して更新します。
// Create a DataFrame with a single record and update column value
val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)
.withColumn("elb_name", lit("elb_demo_001"))
ここで、作成するために使用した構文に類似する構文で、Hudi データセットを更新します。しかし今回は、作成する DataFrame には 1 つのレコードしか含まれません。
// Write the DataFrame as an update to existing Hudi dataset
updateDF.write
.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(hudiTablePath)
In the Spark Shell, I check the result of the update:
scala> spark.sql(sqlStatement).show()
+------------+
| elb_name|
+------------+
|elb_demo_001|
+------------+
ここで、同じレコードを削除します。それを削除するために、書き込みオプションで EmptyHoodieRecordPayload
ペイロードを渡します。
// Write the DataFrame with an EmptyHoodieRecordPayload for deleting a record
updateDF.write
.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
"org.apache.hudi.EmptyHoodieRecordPayload")
.mode(SaveMode.Append)
.save(hudiTablePath)
Spark Shell で、このレコードはもはや使用できないことを確認できます。
scala> spark.sql(sqlStatement).show()
+--------+
|elb_name|
+--------+
+--------+
これらのすべての更新と削除はどのように Hudiにより管理されていますか ? データセットに接続するために Hudi のコマンドラインインターフェイス (CLI) を使用し、コミットとしてこれらの変更を解釈してみましょう。
このデータセットは Copy on Write です。これは、レコードが更新されるたびに、そのレコードを含むファイルは更新された値を含むように書き換えられることを意味します。それぞれのコミットで書き込まれたレコードの数を確認できます。テーブルの一番下の行は、データセットの最初の作成を示し、その上には単一レコードの更新があり、一番上には単一レコードの削除があります。
Hudi では、各コミットにロールバックできます。たとえば、次を利用して削除オペレーションをロールバックできます。
hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031
Spark Shell では、更新後すぐに、レコードは元の場所に戻されます。
scala> spark.sql(sqlStatement).show()
+------------+
| elb_name|
+------------+
|elb_demo_001|
+------------+
Copy on Write はデフォルトのストレージタイプです。上記のステップを繰り返して、次を hudiOptions
に追加することにより、Merge on Reset データセットタイプを作成して更新できます。
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"
Merge on Read データセットを更新し、Hudi CLI でコミットを見た場合、Merge on Read が Copy on Write と比較してどのように異なるのかを確認できます。Merge on Read では、更新行のみを書き込みます。Copy on Write のようにファイル全体を書き込みません。このため、Merge on Read は読み取り数が少なく、より多くの書き込みが必要であるか、または更新 / 削除操作の多いワークロードをもつユースケースに役立ちます。Delta コミットは Avro レコード(行ベースのストレージ)としてディスクに書き込まれ、圧縮されたデータは Parquet ファイル(列状ストレージ)として書き込まれます。作成するデルタファイルが多くならないように、Hudi は読み取りの性能ができるだけ高くなるように、データセットを自動的に圧縮します。
Merge On Read データセットが作成されると、次の 2 つの Hive テーブルが作成さmれあす。
- 最初のテーブルは、データセットの名前と一致します。
- 2 番目のテーブルでは
_rt
という文字が名前の後にアペンドされます。たとえば、_rt
ポストフィックスは real-time を表します。
クエリされると、最初のテーブルは圧縮されたデータを返し、最新のデルタコミットを表示しません。このテーブルを使用することで、最高のパフォーマンスが得られますが、最新のデータは省略されます。リアルタイムテーブルのクエリは、圧縮されたデータが読み取り時のデルタコミットと結合されるため、このデータセットは「Merge on Read」と呼ばれます。これにより、最新のデータが使用可能になりますが、パフォーマンスのオーバーヘッドが発生し、圧縮されたデータのクエリほどパフォーマンスが高くならなくなります。このように、データエンジニアとアナリストには、パフォーマンスとデータの最新性の間で選択する柔軟性が与えられます。
今すぐ利用可能
この新機能は現在、EMR 5.28.0 と共に、すべてのリージョンで利用可能です。 EMR で Hudi を使用するのに、追加コストはありません。EMR ドキュメントで Hudi に関する詳細を学習することができます。この新しいツールは、S3 のデータを処理、更新、削除するための方法を簡素化できます。それに対してどのユースケースを使用するかを教えてください。
— Danilo