Amazon Web Services ブログ

Amazon EMR を使用して Amazon DynamoDB の Time to Live (TTL) 属性をバックフィルする

データベースへの一括更新は混乱を招き、ダウンタイム、ビジネスプロセスのパフォーマンスへの影響や、コンピューティングリソースとストレージリソースの過剰プロビジョニングを引き起こす可能性があります。一括更新を実行する場合、迅速に実行でき、ビジネスを中断することなく運営でき、コストを最小限に抑えるプロセスを選択する必要があります。Amazon DynamoDB などの NoSQL データベースを使ってこれを実現する方法を見てみましょう。

DynamoDB は、テーブル内の項目中、どの項目にも存在しない属性を持てる項目を含められる柔軟なスキーマ構造を提供する NoSQL データベースです (リレーショナルデータベースでは、他の行から除外される一方、一部の行にしか存在できない列もあります)。DynamoDB は極端なスケールで実行するように構築されているため、ペタバイトものデータと数兆もの項目を持つテーブルを使用できます。そのため、このようなテーブル全体の変更を行うには、スケーラブルなクライアントが必要です。このようなユースケースでは、通常、Amazon EMR を使用します。DynamoDB は伸縮自在な容量を提供するため、不定期の一括操作に対応できるように、通常の操作の過程で過剰にプロビジョニングする必要はありません。一括操作中にテーブルに容量を追加し、完了したらその容量を削除するだけです。

DynamoDB は Time to Live (TTL) と呼ばれる機能をサポートしています。TTL を使用すると、期限切れの項目を追加コストなしでテーブルから自動的に削除できます。通常、項目を削除すると書き込み容量が消費されるため、TTL を使用すると、特定のユースケースでは大幅なコスト削減を実現できます。たとえば、TTL を使用して、長期間保持するために Amazon Simple Storage Service (Amazon S3) バケットにすでにアーカイブしたセッションデータまたは項目を削除できます。

TTL を使用するには、タイムスタンプ (Unix エポックからの秒数としてエンコード) を含む項目の属性を指定します。この時点で、DynamoDB は項目が期限切れであると見なします。項目の有効期限が切れると、DynamoDB は通常、有効期限から 48 時間以内に項目を削除します。TTL の詳細については、「Time to Live を使用した項目の期限切れ」を参照してください。

DynamoDB テーブルにデータを配置する前に TTL 属性を選択するのが理想ですが、DynamoDB ユーザーは、テーブルにデータを含めた後に TTL を使い始めることがよくあります。アプリケーションを変更してタイムスタンプ付きの属性を新しい項目または更新された項目に追加するのは簡単ですが、古い項目すべての TTL 属性をバックフィルする最良の方法は何でしょうか? 通常、DynamoDB テーブルの一括更新には Amazon EMR を使用することをお勧めします。これは、Amazon EMR が、DynamoDB に接続するための組み込み機能を備えた非常にスケーラブルなソリューションであるためです。アプリケーションを変更してすべての新しい項目に TTL 属性を追加した後、この Amazon EMR ジョブを実行できます。

この記事では、EMR クラスターを作成し、Amazon EMR 内で Hive クエリを実行して、TTL 属性を持たない項目に TTL 属性をバックフィルする方法を示します。各項目にすでに存在する別のタイムスタンプ属性を使用して、項目ごとに新しい TTL 属性を計算します。

DynamoDB スキーマ

はじめに、次の属性を持つ単純なテーブルを作成します。

  • pk – パーティションキー。これは、汎用一意識別子 (UUID) 形式の文字列です。
  • creation_timestampISO 8601 形式で項目の作成タイムスタンプを表す文字列。
  • expiration_epoch_time – エポックを起点に項目の有効期限を秒単位で表す数値。これは、creation_timestamp から 3 年後です

この記事では、400 万項目を持つ TestTTL というテーブルを使用しています。TTL を使用することにした後、これらの項目 100 万個が挿入されました。つまり、300 万個の項目に expiration_epoch_time 属性がありません。次のスクリーンショットは、TestTTL テーブルの項目のサンプルを示しています。

DynamoDB での Hive の動作方法上、このメソッドは、Hive INSERT OVERWRITE クエリの実行中に変更されない項目を安全に変更することができます。アプリケーションで expiration_epoch_time 属性が欠落している項目を変更している可能性がある場合は、クエリの実行中にアプリケーションのダウンタイムをとるか、(Hive および基になる emr-dynamodb-connector は実行しない) 条件式に基づく別の手法を用いる必要があります。詳細については、「条件式」を参照してください。

DynamoDB 項目の一部には、すでに expiration_epoch_time 属性が含まれています。また、3 年以上経過したデータに関する自身のルールに基づいて、項目の一部を期限切れと見なすことができます。AWS CLI の次のコードを参照してください。後で Hive クエリが実行されたときにこの項目を参照して、ジョブが期待どおりに動作したことを確認します。

aws dynamodb get-item --table-name TestTTL --key '{"pk" : {"S" : "02a8a918-69fd-4291-9b45-3802bf357ef8"}}'
{
    "Item": {
        "pk": {
            "S": "02a8a918-69fd-4291-9b45-3802bf357ef8"
        },
        "creation_timestamp": {
            "S": "2017-10-12T20:10:50Z"
        }
    }
} 

EMR クラスターの作成

クラスターを作成するには、以下の手順を実行します。

  1. Amazon EMR コンソールで、[クラスターの作成] を選択します。
  2. [クラスター名] には、「emr-ddb-ttl-update」など、クラスターの名前を入力します。
  3. オプションで、Amazon S3 ロギングフォルダを変更します。
    デフォルトの場所は、アカウント番号を使用するフォルダです。
  4. [ソフトウェア設定] セクションの [リリース] で、[emr-6.6.0] または入手可能な最新の Amazon EMR リリースを選択します。
  5. [アプリケーション] では、[Core Hadoop] を選択します。
    この設定には Hive が含まれており、TTL 属性を追加するために必要なものがすべて含まれています。
  6. [ハードウェア設定] セクションの [インスタンスタイプ] で、[c5.4xlarge] を選択します。
    このコアノード (Hive クエリが実行される場所) は、そのサイズのインスタンスが 1 分間に処理できる項目のおおよその数を測定します。
  7. Hive CLI を実行するにはマスターノードに SSH で接続する必要があるため、[EC2 キーペア] の [セキュリティとアクセス] セクションで、アクセス権のあるキーペアを選択します。
    これをさらに最適化し、より優れた費用対性能比を実現するには、高度なオプションに進み、マスターノードのインスタンスサイズを小さくする (m5.xlarge など) ことができます。これはコンピューティングを大規模に行うことを必要としておらず、クライアントがタスクを実行したり、Ganglia などの不要なサービスを無効にしたりするために使用しますが、これらの変更はこの記事では扱いません。EMR クラスターの作成の詳細については、「Amazon EMR によるビッグデータの分析」を参照してください。
  8. [クラスターの作成] を選択します。

Amazon EMR マスターノードへの SSH 接続

EMR クラスターを作成し、それが Waiting 状態になったら、そのクラスターのマスターノードに SSH 接続します。コンソールのクラスタービューで、SSH の手順を確認できます。EMR クラスターのマスターノードに SSH で接続する方法については、[概要] タブで [マスターパブリック DNS] の SSH リンクを選択してください。

マスターノードのセキュリティグループを編集して、IP アドレスからの SSH を許可する必要があるかもしれません。Amazon EMR コンソールは、[概要] タブのセキュリティグループにリンクしています。

詳細については、「Linux インスタンスの受信トラフィックの認証」を参照してください。

Hive CLI コマンドの実行

これで、マスターノードで Hive CLI を実行する準備ができました。

  1. データベースがないことを確認し、外部 DynamoDB テーブルをホストするデータベースを作成します。
    このデータベースは実際には EMR クラスターにデータを格納しません。今後のステップで EXTERNAL TABLE を作成すると、それが実際の DynamoDB テーブルへのポインターになります。Hive CLI プロンプトで実行するコマンドは、次のコードで太字で示されています。まず、Bash プロンプトで hive コマンドを実行します。

    # hive
    hive> show databases;
    OK
    default
    Time taken: 0.483 seconds, Fetched: 1 row(s)
    
    hive> create database dynamodb_hive;
    OK
    Time taken: 0.204 seconds
    
    hive> show databases;
    OK
    default
    dynamodb_hive
    Time taken: 0.035 seconds, Fetched: 2 row(s)
    
    hive> use dynamodb_hive;
    OK
    Time taken: 0.029 seconds
    hive>
  2. 次のコードを入力して、Hive で外部 DynamoDB テーブルマッピングを作成します (属性名とスキーマに一致するようにこれを調整してください)。
    hive> CREATE EXTERNAL TABLE ddb_testttl (creation_timestamp string, pk string, expiration_epoch_time bigint) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "TestTTL", "dynamodb.column.mapping" = "creation_timestamp:creation_timestamp,pk:pk,expiration_epoch_time:expiration_epoch_time");
    OK
    Time taken: 1.487 seconds
    hive>

    詳細については、「Hive で外部テーブルを作成する」を参照してください。

  3. expiration_epoch_time 属性を含まない項目がいくつ存在するかを調べるには、次のコードを入力します。
    hive> select count(*) from ddb_testttl where expiration_epoch_time IS NULL;
    Query ID = hadoop_20200210213234_20b0bc7a-bbb3-4450-82ac-7ecdad9b1e85
    Total jobs = 1
    Launching Job 1 out of 1
    Tez session was closed.Reopening...
    Session re-established.
    Status: Running (Executing on YARN cluster with App id application_1581025480470_0002)
    
    VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
    
    Map 1 .......... container SUCCEEDED 14 14 0 0 0 0
    Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
    
    VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 20.80 s
    
    OK
    3000000
    Time taken: 19.783 seconds, Fetched: 1 row(s)
    hive>

    このユースケースでは、Hive クエリで 300 万個の項目を更新し、それぞれに expiration_epoch_time 属性を追加する必要があります。

  4. Hive クエリを実行して、expiration_epoch_time 属性をそれが欠落している行に追加します。項目が挿入されてから 3 年後に期限切れにしたいので、3 年の秒数を作成タイムスタンプに追加します。
    .
    この追加を実現するには、creation_timestamp 文字列値を変更する必要があります (次のサンプルコードを参照)。Hive ヘルパー関数 unix_timestamp() は、文字列形式で保存された日付を Unix エポック以降の秒の整数に変換します。
    .
    ただし、ヘルパー関数は yyyy-MM-dd HH:mm:ss の形式の日付を想定していますがこれらの項目の日付形式は、yyyy-MM-ddTHH:mm:ssZ の ISO 8601 バリアントです日 (dd) と時間 (HH) の間の T を削除し、末尾の UTC タイムゾーンを表す Z を削除するように Hive に指示する必要があります。そのために、regex_replace() ヘルパー関数を使用して、creation_timestamp 属性を unix_timestamp() 関数に変更できます。
    .
    データ内の文字列の正確な形式によっては、この regex_replace() を変更する必要がある場合があります。詳細については、Apache Hive マニュアルの「日付関数」を参照してください。

    hive> INSERT OVERWRITE TABLE ddb_testttl SELECT creation_timestamp, pk, (unix_timestamp(regexp_replace(creation_timestamp, '^(.+?)T(.+?)Z$','$1 $2')) + (60*60*24*365*3)) FROM ddb_testttl WHERE expiration_epoch_time IS NULL;
    Query ID = hadoop_20200210215256_2a789691-defb-4d98-a1e5-84b5b12d3edf
    Total jobs = 1
    Launching Job 1 out of 1
    Tez session was closed.Reopening...
    Session re-established.
    Status: Running (Executing on YARN cluster with App id application_1581025480470_0003)
    
    
    VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
    
    Map 1 .......... container SUCCEEDED 14 14 0 0 0 0
    
    VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 165.88 s
    
    OK
    Time taken: 167.187 seconds

    クエリを実行した前述の結果では、このジョブのマップ 1 フェーズで単一のコアノードで合計 14 のマッパーを起動しました。これは、c5.4xlarge インスタンスに 16 個の vCPU があるため、Hive ジョブがそのほとんどを使用したためです。Hive ジョブの実行には 167 秒かかりました。

  5. クエリをチェックして、expiration_epoch_time 属性がまだ不足している項目の数を確認します。次のコードを参照してください。
    hive> select count(*) from ddb_testttl where expiration_epoch_time IS NULL;
    Query ID = hadoop_20200210221352_0436dc18-0676-42e1-801b-6bd6882d0004
    Total jobs = 1
    Launching Job 1 out of 1
    Status: Running (Executing on YARN cluster with App id application_1581025480470_0004)
    
    
    VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
    
    Map 1 .......... container SUCCEEDED 18 18 0 0 0 0
    Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
    
    VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 12.35 s
    
    OK
    0
    Time taken: 16.483 seconds, Fetched: 1 row(s)
    hive>

    ジョブの OK ステータスの後の 0 回答からわかるように、すべての項目が新しい属性で更新されています。たとえば、前に調べた次の単一項目のコードをご覧ください。

    aws dynamodb get-item --table-name TestTTL --key '{"pk" : {"S" : "02a8a918-69fd-4291-9b45-3802bf357ef8"}}'
    {
        "Item": {
            "pk": {
                "S": "02a8a918-69fd-4291-9b45-3802bf357ef8"
            },
            "creation_timestamp": {
                "S": "2017-10-12T20:10:50Z"
            },
            "expiration_epoch_time": {
                "N": "1602447050"
            }
        }
    }

    expiration_epoch_time 属性が 1602447050 の値を持つ項目に追加されました。これは、EpochConverter によると、2020 年 10 月 11 日日曜日の午後 8 時 10 分 50 秒に相応します。これは、項目の creation_timestamp からちょうど 3 年後になります。

サイズ設定とテストに関する考慮事項

このユースケースでは、単一の c5.4xlarge EMR コアインスタンスを使用して Hive クエリを実行しました。そしてインスタンスは 400 万のドキュメントをスキャンし、そのうち 300 万のドキュメントを約 3 分間で変更しました。デフォルトでは、Hive は DynamoDB テーブルの読み取りおよび書き込み容量の半分を消費して、Hive ジョブの実行中に運用プロセスが機能できるようにします。EMR クラスターで適切なコアまたはタスクインスタンスの数を選択し、Hive で使用できる DynamoDB 容量を適切な割合に設定して、テーブルのプロビジョニングのために選択した容量を使いすぎて本番ワークロードでスロットリングが発生しないようにする必要があります。Hive DynamoDB 容量の調整の詳細については、「DynamoDB のプロビジョニングされたスループット」を参照してください。

ジョブをより速く実行するには、テーブルがプロビジョニングされた容量モードを使用していることを確認し、Hive クエリの実行中にプロビジョニングされた RCU と WCU を一時的に増やします。これは、通常の操作で大量のデータがあり、テーブルのスループットが低い場合に特に重要です。Hive クエリが完了したら、プロビジョニングされた容量をスケールダウンするか、テーブル用のオンデマンド容量に戻すことができます。

さらに、EMR クラスター内のコアインスタンスまたはタスクインスタンスの数を増やすか、異なるインスタンスタイプを使用して、Hive タスクの並列処理を増やします。Hive は、クラスター内の vCPU ごとに約 1 つのマッパータスクを起動します (いくつかの vCPU はシステムのために予約されています)。たとえば、前述の Hive クエリを 3 つの c5.4xlarge EMR コアインスタンスで実行すると、46 のマッパーが使用され、クエリの実行時間が 3 分から 74 秒に短縮されます。10 個の c5.4xlarge インスタンスでクエリを実行すると、158 個のマッパーが使用され、クエリの実行時間が 24 秒に短縮されます。コアノードとタスクノードの詳細については、「マスターノード、コアノード、タスクノードについて」を参照してください。

フルデータセットに対して Hive クエリをテストするオプションの 1 つに、[DynamoDB オンデマンドバックアップおよび復元] を使用してテーブルの一時コピーを作成することがあります。その一時コピーに対して Hive クエリを実行して、本番テーブルに対してクエリを実行する前に、適切な EMR クラスターサイズを決定できます。ただし、オンデマンドバックアップと復元を実行するにはコストがかかります。

さらに、中断が発生した場合は、この Hive クエリを安全に再起動できます。なんらかの理由でジョブが早期に終了した場合は、TTL 属性が欠落している行にのみ変更を加えるため、いつでもクエリを再起動できます。ただし、再起動すると、各 Hive クエリがテーブルスキャンを再起動するため、読み取りキャパシティーユニットが余分に消費されます。

クリーンアップ

コストが不必要に発生しないようにするために、そのリソースが不要になった場合は、Hive クエリの実行後に EMR クラスターを終了することを忘れないでください。

まとめ

Hive クエリは、新しい計算された属性を、その属性が欠落している DynamoDB 項目に簡単で柔軟な方法で追加することができます。すでに存在する項目の TTL 属性をバックフィルする必要がある場合は、今すぐご利用を開始しましょう。

 


著者について

 

Chad Tindel はニューヨーク勤務の DynamoDB スペシャリストソリューションアーキテクトです。Chad は大企業のお客様と連携して DynamoDB ベースのソリューションの評価、設計、およびデプロイメントを行っています。Amazon に入社する前は、Red Hat、Cloudera、MongoDB、および Elastic で同じような役割を担っていました。

 

 

 

Andre Rosa は、アマゾン ウェブ サービスのパートナートレーナーです。彼は IT 業界で 20 年務めてきた経験があり、主にデータベースの分野を専門にしています。AWS トレーニングと認定チームのメンバーとして、彼は AWS パートナーがクラウドへの移行の過程でプロジェクトに取り組む際に学習し、知識を共有することに情熱を傾けています。

 

 

 

Suthan Phillips は AWS のビッグデータアーキテクトです。Suthan はアーキテクチャ面でのガイダンスを提供するためにお客様と連携し、Amazon EMR 上の複雑なアプリケーションのパフォーマンス強化を達成するお手伝いをしています。余暇には、太平洋岸北西地区でのハイキングと探検を楽しんでいます。