Amazon Web Services ブログ

AWS KMSを使用してAmazon Kinesisレコードを暗号化および復号する

コンプライアンスやデータセキュリティの要件が厳しいお客様は、AWSクラウド内での保存中や転送中など、常にデータを暗号化する必要があります。この記事では、保存中や転送中もレコードを暗号化しながら、Kinesisを使用してリアルタイムのストリーミングアプリケーションを構築する方法を示します。

Amazon Kinesisの概要

Amazon Kinesisプラットフォームを使用すると、要求に特化したストリーミングデータを分析または処理するカスタムアプリケーションを構築できます。 Amazon Kinesisは、ウェブサイトクリックストリーム、金融取引、ソーシャルメディアフィード、ITログ、トランザクショントラッキングイベントなど、何十万ものソースから1時間につき数テラバイトのデータを連続的にキャプチャして保存できます。
Amazon Kinesis Streamsは、HTTPSを使用してクライアント間でデータを暗号化し、転送されているレコードの盗聴を防止します。ただし、HTTPSで暗号化されたレコードは、データがサービスに入ると解読されます。このデータは24時間保管され(最大168時間まで延長可能)、アプリケーションの処理、再処理、処理遅延の際の巻き取りに対して十分なゆとりが確保されています。

ウォークスルー

Amazon Kinesis Producer Library(KPL)、Kinesis Consumer Library(KCL)、AWS KMSaws-encryption-sdkを使用してサンプルKinesisプロデューサおよびコンシューマアプリケーションへの暗号化と復号を行います。この記事で使用されているKinesisレコードの暗号化と復号に使用される方法とテクニックは、あなたのアーキテクチャに簡単に再現できます。いくつか制約があります:

  • AWSは、暗号化と復号のためのKMS APIリクエストの使用料金を請求します。詳しくは、「AWS KMSの料金」を参照してください。
  • Amazon Kinesis Analyticsを使用して、このサンプルアプリケーションのクライアントによって暗号化されたレコードのAmazon Kinesis Streamにクエリすることはできません。
  • アプリケーションでレイテンシの低い処理が必要な場合は、レイテンシに多少の上乗せがあることに注意してください。

次の図は、ソリューションのアーキテクチャを示しています。

プロデューサでレコードを暗号化する

PutRecordまたはPutRecords APIを呼び出す前に、KinesisEncryptionUtils.toEncryptedStringを呼び出すことによって文字列レコードを暗号化します。
この例では、サンプルの株式売買銘柄オブジェクトを使用しました。

example {"tickerSymbol": "AMZN", "salesPrice": "900", "orderId": "300", "timestamp": "2017-01-30 02:41:38"}. 

メソッド(KinesisEncryptionUtils.toEncryptedString)の呼び出しには、次の4つのパラメータがあります。

  • amazonaws.encryptionsdk.AwsCrypto
  • 株式売買銘柄オブジェクト
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • 暗号化コンテキスト用のutil.Map

暗号文はメイン呼び出し側に返され、KinesisEncryptionUtils.calculateSizeOfObjectを呼び出すことによってサイズもチェックされます。暗号化はオブジェクトのサイズを大きくします。オブジェクトがスロットルされないように、ペイロードのサイズ(1つ以上のレコード)は、1MBを超えないように検証されます。この例では、ペイロードが1MBを超える暗号化レコードサイズがワーニングとして記録されます。サイズが制限値より小さい場合は、それぞれKPLまたはKinesis Streams APIを使用している場合は、addUserRecordまたはPutRecordとPutRecordsが呼び出されます

例:KPLでレコードを暗号化する

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context);
log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000)
   log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
//Adding the encrypted record to stream
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);
Futures.addCallback(f, callback);

上記のコードでは、株式売買銘柄レコードのサンプルがKinesisEncryptionUtils.toEncryptedStringに渡され、暗号化されたレコードが返されます。 encryptedRecord値もKinesisEncryptionUtils.calculateSizeOfObjectに渡され、暗号化されたペイロードのサイズが返され、1MB未満であるかどうかが確認されます。そうであれば、ペイロードはUTF-8でエンコードされ(KinesisEncryptionUtils.toEncryptedByteStream)、処理のためにストリームに送信されます。

例:Streams PutRecordを使用してレコードを暗号化する

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
    log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encryptyed record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
//putting the record into the stream
kinesis.putRecord(putRecordRequest);

レコードが暗号化されていることを確認する

KinesisEncryptionUtils.toEncryptedStringを呼び出した後、UTF-8エンコーディングの直前に暗号化された文字列レコードを出力することができます。このサンプル・アプリケーションの実行時に標準出力に出力される内容の例を以下に示します。

[main] INFO kinesisencryption.streams.EncryptedProducerWithStreams - String Record is TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', orderId='2a0358f1-9f8a-4bbe-86b3-c2929047e15d', timeStamp='2017-01-30 02:41:38'} and Encrypted Record String is AYADeMf6zmVg9JvIkGNv5M39rhUAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFpUkpCaG1UOFQ3UTZQZ253dm9FSU9iUDZPdE1xTHdBZ1JjNlZxN2doMDZ3QlBEWUZndWdJSEFKaXNvT0ZPUGsrdz09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDCCYBk+hfB3tOGVx7QIBEIA7FqaEcOWpic+gKNeT+dUe4yttB9dsZSFPAUTlz2L2zlyLXSLMh1otRH24SO485ov+TCTtRCgiA8a9rYQCAAAAAAwAABAArlGWPO8BavNSJIpJOtJekRUhOwbM+WM1NBVXB/////8AAAABXNZnRND3J7u8EZx3AAAAkfSxVPMYUv0Ovrd4AIUTmMcaiR0Z+IcJNAXqAhvMmDKpsJaQG76Q6pYExarolwT+6i87UOi6TGvAiPnH74GbkEniWe66rAF6mOra2JkffK6pBdhh95mEOGLaVPBqs2jswUTfdcBJQl9NEb7wx9XpFX8fNDF56Vly7u6f8OQ7lY6fNrOupe5QBFnLvwehhtogd72NTQ/yEbDDoPKUZN3IlWIEAGYwZAIwISFw+zdghALtarsHSIgPMs7By7/Yuda2r3hqSmqlCyCXy7HMFIQxHcEILjiLp76NAjB1D8r8TC1Zdzsfiypi5X8FvnK/6EpUyFoOOp3y4nEuLo8M2V/dsW5nh4u2/m1oMbw=

また、getRecords APIコールの直後にUTF-8でデコードされた受信レコードを出力することで、レコードがStreamsで暗号化されたままであることを確認することもできます。サンプルアプリケーションを実行しているときのプリントアウトの例を以下に示します。

[Thread-2] INFO kinesisencryption.utils.KinesisEncryptionUtils - Verifying object received from stream is encrypted. -Encrypted UTF-8 decoded : AYADeBJz/kt7Fm3L1lvS8Wy8jhAAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFrM2N4K2s1ODJuOGVlNWF3TVJ1dk1UUHZQc2FHeGoxQisxb09kNWtDUExHYjJDS0lMZW5LSnlYakRmdFR4dzQyUT09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDAGI3oWLlIJ2p6kffQIBEIA7JVUOTsLtEyNK8vS4GIS9iyTejuB2xhIpRXfG8o0lUfHawcrCbNbNH8XLm/8RW5JbgXo10EpOs8dSjkICAAAAAAwAABAAy64r24sGVKWN4C1gXCwJYHvZkLpJJj16SZlhpv////8AAAABg2pPFchIiaM7D9VuAAAAkwh10ul5sZQ08KsgkFszOOvFoQu95CiY7cK8H+tBloVOZglMqhhhvoIIZLr9hmI8/lQvRXzGDdo7Xkp0FAT5Jpztt8Hq/ZuLfZtNYIWOw594jShqpZt6uXMdMnpb/38R3e5zLK5vrYkM6NS4WPMFrHsOKN5tn0CDForgojRcdpmCJ8+cWLNltb2S+EJiWiyWS+ibw2vJ/RFm6WZO6nD+MXn3vyMAZzBlAjAuIUTYL1cbQ3ENxDIeXHJAWQguNPqxq4HgaCmCEI9/rn/GAKSc2nT9ln3UsVq/2dgCMQC7yNJ3DCTnppavfxTbcVS+rXaDDpZZx/ZsluMqXAFM5/FFvKRqr0dVML28tGunxmU=

コンシューマでのレコードの復号

レコードをコンシューマがリストとして受け取ったら、record.getDataを呼び出すことでByteBufferとしてデータを取得できます。次に、KinesisEncryptionUtils.decryptByteStreamを呼び出すことによって、byteBufferをデコードおよび復号します。このメソッドは5つのパラメータをとります:

  • amazonaws.encryptionsdk.AwsCrypto
  • レコードByteBuffer
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • キーarn文字列
  • 暗号化コンテキスト用のjava.util.Map

株式売買銘柄オブジェクトの文字列表現は、後続の処理のために呼び出し元に返されます。この例では、この表現は単に標準出力に出力されます。

[Thread-2] INFO kinesisencryption.streams.DecryptShardConsumerThread - Decrypted Text Result is TickerSalesObject{tickerSymbol='AMZN', salesPrice='304.958313333', orderId='50defaf0-1c37-4e84-85d7-bc15597355eb', timeStamp='2017-01-30 02:41:38'}

例:KCLおよびStreams APIによるレコードの復号

ByteBuffer buffer = record.getData();
//Decrypting the encrypted record data
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto,buffer,prov,this.getKeyArn(), this.getContext());
log.info("Decrypted Text Result is " + decryptedResult);

上記のコードでは、Kinesisストリームのレコードは、以前はプロデューサ側で暗号化するために使用されていた同じキーARNおよび暗号化コンテキストを使用して復号されます。

Mavenの依存関係

この記事で概説した実装を使用するには、以下のpom.xmlでBouncy Castleライブラリと一緒に説明されているいくつかの依存関係を使用する必要があります。 Bouncy Castleは、Java用の暗号APIを提供します。

 <dependency>
        <groupId>org.bouncycastle</groupId>
        <artifactId>bcprov-ext-jdk15on</artifactId>
        <version>1.54</version>
    </dependency>
<dependency>
   <groupId>com.amazonaws</groupId>
   <artifactId>aws-encryption-sdk-java</artifactId>
   <version>0.0.1</version>
</dependency>

まとめ

上記のサンプルコードスニペットを組み込んだり、アプリケーションコードのガイドとして使用して、Amazon Kinesisストリームとの間でレコードの暗号化と復号を始めることができます。
完全なプロデューサとコンシューマのサンプルアプリケーションと、AWS上でAmazon Kinesisのプロデューサとコンシューマアプリケーションを暗号化されたレコードで開発する詳細な段階的な例は、kinesisencryption githubリポジトリから入手できます。
ご質問やご提案などがありましたら、ぜひご意見をお寄せ下さい。

原文:Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS(翻訳:半場光晴)