Amazon Web Services ブログ

cqlsh を使用して Amazon MCS にデータをロードする

Cassandra Query Language Shell (cqlsh) は、CQL コマンドの実行や、テーブルの作成や変更などのデータベース管理タスクの実行に使用できるオープンソースのコマンドラインシェルです。cqlsh を使用して、CSV ファイルから Amazon MCS テーブルにデータをロードすることにより、Cassandra と互換性があり、スケーラブルで可用性の高いマネージド型データベースである Amazon Managed Apache Cassandra Service (MCS) の使用を開始できます。詳細については、Amazon Managed Apache Cassandra Service とは何ですか? を参照してください。

この投稿では、cqlsh COPY コマンドを使用することによって、cqlsh を使用し、Amazon MCS テーブルにデータをロードする方法について説明します。また、データを準備するためのベストプラクティスと、COPY コマンドのパラメータを使用してデータ転送のパフォーマンスを調整する方法についても説明します。最後に、この投稿では、Amazon MCS テーブルの読み取り/書き込みスループット設定を構成して、データロードプロセスを最適化する方法について説明します。

前提条件

開始する前に、Amazon MCS キースペースおよびテーブル用の AWS アカウントが必要です。プログラムによって接続し、cqlsh を正しくセットアップしていることを確認してください。手順については、プログラムによる Amazon Managed Apache Cassandra Service への接続を参照してください。

この投稿のサンプルを実行するには、データを含む CSV ファイルが必要です。この投稿では、その CSV ファイルを export_keyspace_table.csv と呼んでいますが、別の名前に置き換えることができます。CSV ソースデータファイルの列ヘッダーがターゲットテーブルの列名と一致していることを確認してください。一致しない場合は、CSV ヘッダーをテーブルの列にマッピングする必要がありますが、この投稿では扱いません。

次のコードは、一致するテーブルの列を作成します。

CREATE TABLE "ks.blogexample" 
("id" int PRIMARY KEY, "columnvalue" text);

cat example.csv 
id,columnvalue
1,value1
2,value2
3,value3

cqlsh COPY TO からソース CSV ファイルを作成する

ソースデータが Apache Cassandra データベースにある場合は、cqlsh COPY TO コマンドを使用して CSV ファイルを生成できます。次のコードを参照してください。

cqlsh localhost 9042 -u "cassandra" -p "cassandra" —execute "COPY keyspace.table TO 'export_keyspace_table.csv' WITH HEADER=true"

別のソースから CSV を使用する

別のデータソースを使用して CSV を作成している場合は、最初の行に列名が含まれていること、およびデータがカンマで区切られていることを確認してください (必須ではありませんが、これにより cqlsh COPY のデフォルト設定を使用できます)。

さらに、すべてのデータ値が有効な Cassandra のデータ型であることを確認してください。詳細については、Apache Cassandra のウェブサイトのデータ型を参照してください。

ターゲットテーブルを作成する

Apache Cassandra が元のデータソースである場合、Amazon MCS テーブルを作成 (および CSV ヘッダーが一致することを確認) する簡単な方法は、ソーステーブルから CREATE TABLE ステートメントを生成することです。次のコードを参照してください。

cqlsh localhost 9042 -u "cassandra" -p "cassandra" —execute "DESCRIBE TABLE keyspace.table;”

まだ作成していない場合は、Amazon MCS でキースペースとテーブルを作成します。このテーブルを送信先で使用します。

データセットを準備する

効率的な転送のためにソースデータを準備するには、ランダム化する必要があります。次に、cqlsh パラメータ値とテーブル設定を決定するために、データを分析します。

データをランダム化する

cqlsh COPY FROM コマンドは、CSV ファイルに表示されるのと同じ順序でデータを読み書きします。cqlsh COPY TO コマンドを使用してソースファイルを作成すると、データはキーでソートされた順序で CSV に書き込まれます。内部的に、Amazon MCS はパーティションキーを使用してデータをパーティション分割します。Amazon MCS には、同じパーティションキーに対するリクエストのロードバランスを支援する組み込みロジックがありますが、異なるパーティションへの書き込みの組み込みロードバランシングを利用できるため、順序をランダム化すると、データのロードがより速く効率的になります。

書き込みをパーティション全体に均等に分散するには、ソースファイルのデータをランダム化する必要があります。これを行うアプリケーションを作成するか、Shuf などのオープンソースツールを使用できます。Shuf は、Linux ディストリビューションで、homebrew に coreutils をインストールして macOS で、および Windows Subsystem for Linux (WSL) を使用することで Windows で、それぞれ自由に利用できます。

Linux または Windows でソースファイルをランダム化するには、次のコードを入力します。

shuf -o keyspace.table.csv< export_keyspace.table.csv

macOS でソースファイルをランダム化するには、次のコードを入力します。

gshuf -o keyspace.table.csv< export_keyspace.table.csv

Shuf は、データを keyspace.table.csv という新しい CSV ファイルに書き換えます。export_keyspace.table.csv ファイルを削除できます。もう必要ありません。

データを分析する

データを分析して、行の平均サイズと最大サイズを決定します。これを行う理由は次の 2 つです。

  • 行の平均サイズは、転送するデータの量を決定するのに役立ちます
  • すべての行のサイズが Amazon MCS の最大行サイズである 1 MB 未満であることを確認できます。

このクォータは、パーティションサイズではなく、行サイズを参照します。Apache Cassandra とは異なり、Amazon MCS パーティションのサイズは、事実上無制限にすることができます。さらに、パーティションキーとクラスタリング列には、インデックスを作成するための追加のストレージが必要です。これを行の RAW サイズに追加する必要があります。詳細については、クォータを参照してください。

1 MB を超える行の処理はこの投稿の焦点ではありませんが、そのような行がある場合は、次のオプションのいずれかを試すことができます。

  • データを小さな行に分割します (アクセスパターンによっては、複数のテーブルを使用する必要がある場合があります)
  • Amazon S3 などのオブジェクトストアにデータを保存し、Amazon MCS でオブジェクトへの参照を維持します

次のコードは、AWK を使用して CSV ファイルを分析し、行の平均サイズと最大サイズを出力します。

awk -F, 'BEGIN {samp=10000;max=-1;}{if(NR>1){len=length($0);t+=len;avg=t/NR;max=(len>max ? len : max)}}NR==samp{exit}END{printf("{lines: %d, average: %d bytes, max: %d bytes}\n",NR,avg,max);}' keyspace.table.csv

次の出力が表示されます。

using 10,000 samples:
{lines: 10000, avg: 123 bytes, max: 225 bytes}

テーブルスループットキャパシティーモードを設定する

Amazon MCS では、使用したリソースに対してのみ支払います。Amazon MCS は、オンデマンドとプロビジョニングの 2 つのスループットキャパシティーモードを提供します。オンデマンドモードでは、アプリケーションが実行する実際の読み取りと書き込みに基づいて支払います。プロビジョニングされたキャパシティーを使用すると、アプリケーションがテーブルから 1 秒あたりに読み書きできるデータ量を事前に設定することで、読み取りと書き込みのコストを最適化できます。どちらのモードでも、テーブルにデータをロードできます。詳細については、読み取り/書き込みキャパシティーモードを参照してください。

この投稿では、設定された時間範囲内でデータをロードするように cqlsh を調整する方法について説明します。事前に実行する読み取りと書き込みの数がわかっているため、プロビジョニングされたキャパシティーモードを使用します。データ転送が完了したら、アプリケーションのトラフィックパターンに一致するようにキャパシティーモードを設定する必要があります。

プロビジョニングされたキャパシティーモードでは、テーブルにプロビジョニングする読み取りと書き込みのキャパシティーを事前に指定します。書き込みキャパシティーは 1 時間ごとに請求され、書き込みキャパシティー単位 (WCU) で計測されます。各 WCU は、毎秒 1 KB のデータの書き込みをサポートするのに十分な書き込みキャパシティーです。データをロードするとき、書き込みレートは、ターゲットテーブルに設定された最大 WCU (パラメータ: write_capacity_units) 未満である必要があります。デフォルトでは、1 つのテーブルに最大 40,000 WCU を、アカウントのすべてのテーブル全体で最大 80,000 WCU を、それぞれプロビジョニングできます。追加のキャパシティーが必要な場合は、AWS サポートを通じてクォータの増加をリクエストできます。

挿入に必要な WCU の平均数を計算する

1 秒あたり 1 KB のデータを挿入するには、1 WCU が必要です。CSV ファイルに 360,000 行があり、すべてのデータを 1 時間でロードする場合は、毎秒 100 行を書き込む必要があります (360,000 行/60分/60秒 = 100 行/秒)。各行に最大 1 KB のデータがある場合、1 秒あたり 100 行を挿入するには、テーブルに 100 WCU をプロビジョニングする必要があります。各行に 1.5 KB のデータがある場合、1 秒あたり 1 行を挿入するには 2 WCU が必要です。したがって、1 秒あたり 100 行を挿入するには、200 WCU をプロビジョニングする必要があります。

1 秒あたり 1 行を挿入する場合に必要な WCU 数を知るには、バイト単位の平均行サイズを 1024 で割り、最も近い整数に切り上げます。

たとえば、平均行サイズが 3000 バイトの場合、1 秒あたり 1 行を挿入するには 3 WCU が必要です。

ROUNDUP(3000 / 1024) = ROUNDUP(2.93) = 3 WCU

データロード時間とキャパシティーを計算する

CSV ファイルの行の平均サイズと数がわかったので、指定した時間内にデータを読み込むために必要な WCU 数と、さまざまな WCU 設定を使用して CSV 内のすべてのデータを読み込むためにかかるおおよその時間を計算できます。

たとえば、ファイルの各行が 1 KB で、CSV ファイルに 1,000,000 行ある場合、1 時間でデータをロードするには、その時間に少なくとも 278 WCU をテーブルにプロビジョニングする必要があります。

1,000,000 行 * 1 KB = 1,000,000 KB
1,000,000 KB/3600 秒 = 277.8 KB/秒 = 278 WCU

プロビジョニングされたキャパシティー設定を構成する

テーブルを作成するとき、または ALTER TABLE コマンドを使用して、テーブルの書き込みキャパシティーを設定できます。ALTER TABLE コマンドを使用してテーブルのプロビジョニングされたキャパシティー設定を変更するための構文は次のとおりです。

ALTER TABLE keyspace.table WITH custom_properties={'capacity_mode':{'throughput_mode': 'PROVISIONED', 'read_capacity_units': 100, 'write_capacity_units': 278}} ; 

詳細については、Amazon MCS 開発者ガイドの「ALTER TABLE」を参照してください。

cqlsh COPY 設定を構成する

ここで、cqlsh COPY のパラメータ値を決定します。cqlsh COPY コマンドは、以前に準備した CSV ファイルを読み取り、CQL を使用してデータを Amazon MCS に挿入します。cqlsh は行を分割し、INSERT 操作を一連のワーカーに分散します。各ワーカーは Amazon MCS との接続を確立し、このチャネルに沿って INSERT リクエストを送信します。cqlsh COPY コマンドには、ワーカー間で作業を均等に分散するための内部ロジックはありませんが、手動で設定して、作業が均等に分散されるようにすることができます。主要な cqlsh パラメータを確認することから始めます。

  • DELIMITER – コンマ以外の区切り文字を使用した場合、このパラメータを設定できます。デフォルトはコンマです。
  • INGESTRATEcqlsh COPY が 1 秒間に処理しようとする目標行数。設定されていない場合、デフォルトは 100,000 です。
  • NUMPROCESSES – 並列リクエストを処理するために cqlsh が作成する子ワーカープロセスの数。これを最大で num_cores-1 に設定できます。ここで、num_cores は、cqlsh を実行しているホスト上の処理コアの数です。
  • MINBATCHSIZE および MAXBATCHSIZE – ワーカーが作業のチャンクを受け取るとき、バッチサイズは、1 つの作業のチャンクを構成する挿入の数を決定します。設定しない場合、cqlsh は 20 のバッチを使用します。これは、1 つのチャンクが 20 の挿入に等しいことを意味します。
  • CHUNKSIZE – 子ワーカーに渡す作業単位のサイズ。デフォルトでは、1,000 に設定されています。これは、ワーカーが作業のチャンクを (行数で) CHUNKSIZE * MAXBATCHSIZE として受け取ることを意味します。デフォルトでは、各ワーカーには 20,000 行が送信されます。
  • MAXATTEMPTS – 失敗したワーカーチャンクを再試行する最大回数。最大試行回数に達すると、失敗したレコードが新しい CSV ファイルに書き込まれ、失敗を調査した後で再度実行できます。

ターゲットの送信先テーブルにプロビジョニングした WCU 数に基づいて INGESTRATE を設定します。COPY コマンドの INGESTRATE は制限ではなく、ターゲットとする平均です。つまり、設定した数を超えるとバーストする可能性があります (そして、多くの場合はバーストします)。バーストを許容し、データロードリクエストを処理するのに十分なキャパシティーがあることを確認するには、INGESTRATE をテーブルの書き込み容量の 90% に設定します。

INGESTRATE = WCU * 0.90

次に、NUMPROCESSES パラメータをシステムのコア数より 1 少ない値に設定します。たとえば、16 個のコンピューティングコアを持つホストからデータロードを実行している場合は、NUMPROCESSES = 15 に設定します

各プロセスはワーカーを作成し、各ワーカーは Amazon MCS への接続を確立します。Amazon MCS は、すべての接続で 1 秒あたり最大 3,000 の CQL リクエストをサポートできます。つまり、各ワーカーが毎秒 3,000 未満のリクエストを処理するようにする必要があります。INGESTRATE と同様に、ワーカーは多くの場合、設定した数を超えるとバーストし、クロック秒数による制限はありません。したがって、バーストを許容するには、cqlsh パラメータを設定して、1 秒あたり 2,500 リクエストを処理する各ワーカーをターゲットにします。ワーカーに分配される作業量を計算するには、INGESTRATENUMPROCESSES で割ります。INGESTRATE/NUMPROCESSES が 2,500 を超える場合は、INGESTRATE を下げて、INGESTRATE/NUMPROCESSES <= 2,500 の式が真になるようにします。

この投稿では、NUMPROCESSES が 4 (デフォルト) に設定されていることを前提とするため、データロードの処理に使用できるワーカーが 4 つあることになりますcqlsh は、CHUNKSIZE * MAXBATCHSIZE の式を使用して作業のチャンク (INSERT ステートメント) を作成し、ワーカーに配布します。cqlsh はワーカー間で作業を均等に分散しないため、ワーカーがアイドル状態にならないように、CHUNKSIZEMAXBATCHSIZE、および INGESTRATE を設定する必要があります。

次のコードは主にデフォルトを使用しており、アイドルワーカーがあります。

INGESTRATE = 10,000
NUMPROCESSES = 4 (default)
CHUNKSIZE = 1,000 (default)
MAXBATCHSIZE. = 20 (default)
Work Distribution:
Connection 1 / Worker 1 : 10,000 Requests per second
Connection 2 / Worker 2 : 0 Requests per second
Connection 3 / Worker 3 : 0 Requests per second
Connection 4 / Worker 4 : 0 Requests per second

上記のサンプルコードでは、最初のワーカーがすべての作業を取得し、他のワーカーはアイドル状態です。これは、CHUNKSIZE (1,000) * MAXBATCHSIZE (20) = 20,000 であり、INGESTRATE (10,000) よりも大きいためです。これらの設定により、各ワーカーは 20,000 行のチャンクを処理するように設定されます。cqlsh は、INGESTRATE 設定に基づいて、一度に 10,000 行をプルするように設定されています。cqlsh が CSV ファイルから 10,000 行をプルバックすると、最初のワーカーは最大 20,000 行を要求するため、cqlsh は 10,000 行すべてを最初のワーカーに送信し、残りのワーカーに作業を残しません。ワークロードが不均衡であることに加えて、最初のワーカーは最大 3,000 リクエスト/秒をはるかに超えています。

入力パラメータを変更することで、ワーカー全体に負荷を均等に分散し、各ワーカーを最適な毎秒 2,500 リクエストの速度に保つことができます。次のコードを参照してください。

INGESTRATE = 10,000
NUMPROCESSES = 4 (default)
CHUNKSIZE = 100
MAXBATCHSIZE. = 25
Work Distribution:
Connection 1 / Worker 1 : 2,500 Requests per second
Connection 2 / Worker 2 : 2,500 Requests per second
Connection 3 / Worker 3 : 2,500 Requests per second
Connection 4 / Worker 4 : 2,500 Requests per second

データロード中のネットワークトラフィックの使用率を最適化するには、MAXBATCHSIZE の値を最大値の 30 に近い値に設定します。CHUNKSIZE を 100 に、MAXBATCHSIZE を 25 に変更すると、各ワーカーは 2,500 行 (100 * 25) を受け取るようになります。つまり、10,000 行が 4 つのワーカーに均等に分散されます (10,000/2500 = 4)。

要約すると、cqlsh COPY パラメータを設定するときは、次の式を使用します。

  • INGESTRATE = write_capacity_units * .90
  • NUMPROCESSES = 1-16 or num_of_cores -1
  • INGESTRATE / NUMPROCESSES <= 2,500 (this must be a true statement)
  • MAXBATCHSIZE <= 30 (defaults to 20; MCS accepts batches up to 30)
  • CHUNKSIZE = (INGESTRATE / NUMPROCESSES) / MAXBATCHSIZE

NUMPROCESSESINGESTRATEおよび CHUNKSIZE を計算したので、データをロードする準備が整いました。

cqlsh COPY FROM コマンドを実行する

cqlsh COPY FROM コマンドを実行するには、以下の手順を実行します。

  1. cqlsh 経由で Amazon MCS に接続します。
  2. 次のコードでキースペースに切り替えます。
    use keyspace_name;
  3. 書き込み整合性を LOCAL_QUORUM に設定します (データの耐久性のため、Amazon MCS では他の書き込み整合性設定を許可していません)。次のコードを参照してください。
    CONSISTENCY LOCAL_QUORUM;
  4. cqlsh COPY FROM 構文を準備します。次のサンプルコードを参照してください。

    COPY yourmcstablename FROM ‘./keyspace.table.csv' WITH HEADER=true AND INGESTRATE=<calculated ingestrate> AND NUMPROCESSES=<calculated numprocess> AND MAXBATCHSIZE=20 AND CHUNKSIZE=<calculated chunksize>;

    cqlsh は、設定したすべての設定をエコーバックします。

  1. 設定が入力と一致していることを確認します。次のコードを参照してください。
    Reading options from the command line: {'chunksize': '120', 'header': 'true', 'ingestrate': '36000', 'numprocesses': '15', 'maxbatchsize': '20'}
    Using 15 child processes

    cqlsh は、転送した行数と現在の平均レートを出力します。次のコードを参照してください。

    Processed: 57834 rows; Rate: 6561 rows/s; Avg. rate: 31751 rows/s

    次に、cqlsh は完了するまでファイルを処理し、データロード統計の概要 (読み取ったファイルの数、ランタイム、スキップされた行) を提供します。次のコードを参照してください。

    15556824 rows imported from 1 files in 8 minutes and 8.321 seconds (0 skipped).

    これで、Amazon MCS にデータが読み込まれました。

クリーンアップ

データを転送したので、アプリケーションの通常のトラフィックパターンに一致するようにキャパシティーモードの設定を調整します。プロビジョニングされたキャパシティーを変更するまで、時間単位の料金が発生します。

ソース CSV ファイルのディレクトリを確認します。データロード中にスキップされた行がある場合、それらは import_yourcsvfilename.err.timestamp.csv という名前の新しい CSV ファイルに書き込まれます。そのファイルが存在し、その中にデータがある場合、これらの行は Amazon MCS に転送されませんでした。これらの行を再試行するためにプロセスを再実行できます。他の理由でエラーが発生した場合は、再試行する前にデータを調整してください。

一般的なエラー

行が読み込まれない最も一般的な理由は、キャパシティーエラーと解析エラーです。

cqlsh クライアントがサーバーから任意のタイプの 3 つの連続したエラーを受信した場合、次のコードが表示されます。

Failed to import 1 rows: NoHostAvailable - , will retry later, attempt 3 of 100

次に、クライアントは接続の再確立を試みます。

キャパシティーエラーを解決する

次のコードはキャパシティーエラー (WriteTimeout) です。

Failed to import 1 rows: WriteTimeout - Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 2, 'write_type': 'SIMPLE', 'consistency': 'LOCAL_QUORUM'}, will retry later, attempt 1 of 100

Apache Cassandra は、一連のノードで実行するように設計されたクラスターベースのソフトウェアであるため、スループットキャパシティーなどのサーバーレス機能に関連する例外メッセージはありません。ほとんどのドライバーは、Apache Cassandra で利用可能なエラーコードのみを理解するため、Amazon MCS は同じ一連のエラーコードを使用して互換性を維持します。

たとえば、Amazon MCS は ReadTimeout および WriteTimeout の例外を使用して、不十分なスループットキャパシティーが原因で書き込みリクエストが失敗したことを示します。キャパシティー不足の例外を診断するために、Amazon MCS は Amazon CloudWatchWriteThrottleEvents および ReadThrottledEvents メトリクスを発行します。

データロード中のキャパシティー不足のエラーを解決するには、ワーカーあたりの書き込み率または総取り込み率を下げて、行を再試行します。

解析エラーを解決する

次のコードは解析エラー (ParseError) です。

Failed to import 1 rows: ParseError - Invalid ... – 

インポートするデータがテーブルスキーマと一致していることを確認してください。cqlsh は、解析エラーのある行を CSV ファイルに書き込みます。そのファイルからデータを取得し、その単一行に INSERT ステートメントを使用して、問題をよりよく確認できるようにします。

まとめ

この投稿では、cqlsh COPY コマンドを使用してデータをロードするように cqlsh と Amazon MCS を設定する方法を示し、データ転送を実行するためのベストプラクティスについて説明しました。詳細については、Amazon Managed Apache Cassandra Service とは何ですか? を参照してください。

ご質問やご意見はコメント欄にお寄せください。

 


著者について

 

Steve Mayszak 氏は、アマゾン ウェブ サービスのソフトウェア開発マネージャーです。

 

 

 

 

Michael Raney 氏は、アマゾン ウェブ サービスのソリューションアーキテクトです。