Amazon Web Services ブログ

AWS IoT SiteWise Edgeでデータを様々な場所に保存する方法

この記事は Philipp Sacha によって投稿された How to store data with AWS IoT SiteWise Edge in many locations を翻訳したものです。

はじめに

この記事では AWS IoT SiteWise と AWS IoT SiteWise Edge を使用して AWS IoT SiteWise のデータストアだけでなく、他の多くの場所にデータを保存する方法について解説します。デフォルトでは、データは AWS 上の AWS IoT SiteWise データストアに保存されます。

お客様は AWS IoT SiteWise を使用して OPC-UA データソースから産業用データを収集したいと考えています。しかし、すべてのお客様が AWS IoT SiteWise のデータストアにのみデータを保存したいわけではありません。このブログ記事では、 Amazon S3Amazon Timestream のような他のサービスにデータを格納する方法、またはお客様のオンプレミス環境でデータを活用する方法について解説します。

AWS IoT SiteWise は、産業機器からのデータを大規模に収集、モデル化、分析、可視化することができるマネージドサービスです。 AWS IoT SiteWise ゲートウェイは、産業機器からデータの収集を行い、クラウド上の AWS IoT SiteWise データストアにデータを格納します。

AWS IoT SiteWise Edge は、クラウド上の AWS IoT SiteWise の機能をお客様の施設環境に導入するものです。ローカルの AWS IoT SiteWise ゲートウェイ内でデータを処理し、 AWS IoT SiteWise ゲートウェイで提供されるローカル用の AWS IoT SiteWise Monitor ダッシュボードを使用して機器データを可視化することが可能となります。

デフォルトでは、データは AWS 上の AWS IoT SiteWise データストアに保存されるようになっています。

このブログ記事では、お客様が AWS IoT SiteWise Edge ゲートウェイでデータを収集しながら AWS IoT SiteWise データストア以外のデータストアにデータを保存する方法について解説します。

読了目安時間: 8分
学習レベル: 300
使用サービス: AWS IoT SiteWise Edge, AWS IoT Greengrass, Amazon Kinesis Data Streams, Amazon Timestream

ソリューション

AWS IoT SiteWise Edge ゲートウェイを AWS IoT Greengrass Version 2 にデプロイする

AWS IoT SiteWise Edge ゲートウェイを AWS IoT Greengrass Version 2 に導入する方法を説明します。

AWS IoT SiteWise Edge ゲートウェイは、 AWS IoT Greengrass Version 2 上のコンポーネントという形で動作します。 データ収集パック には SiteWiseEdgeCollectorOpcuaSiteWiseEdgePublisher という2つのコンポーネントが含まれています。データ処理パックには SiteWiseEdgeProcessor というコンポーネントが1つ含まれています。

データ収集パックは、お客様の産業データを収集し AWS の送信先にルーティングします。データ処理パックは、エッジ処理設定されたアセットモデルやアセットとのゲートウェイ上でのデータの取り扱いを可能にします。エッジでの処理設定により、ローカルで計算および処理するアセットデータを制御することができます。その後、データを AWS IoT SiteWise やクラウド上の他のAWSサービスに送信することができます。

次のスクリーンショットは、データ収集パックデータ処理パックをデプロイした AWS IoT Greengrass V2 のデプロイメントです。

図1: AWS IoT Greengrass V2 のデプロイメント

AWS IoT SiteWise のゲートウェイアーキテクチャの理解

AWS IoT SiteWise データストア以外の場所にデータを送信するには、まず AWS IoT SiteWise ゲートウェイ標準のアーキテクチャを理解する必要があります。

データが AWS IoT SiteWise データストアに取り込まれるまでの流れとしては、データは SiteWiseEdgeCollectorOpcua によって OPC-UA ソースから収集され、ゲートウェイ上の AWS IoT Greengrass ストリーム(デフォルトで SiteWise_Stream )に取り込まれます。 SiteWiseEdgePublisher は、ストリームからデータを読み出し、 AWS 上の SiteWise データストアに転送します。

図2:AWS IoT SiteWise ゲートウェイのアーキテクチャ

AWS IoT SiteWise ゲートウェイに送信先を設定し、様々な場所にデータを保存する

AWS IoT SiteWise データストア以外の宛先にデータを送信するには、ゲートウェイ構成で、SiteWiseEdgeCollectorOpcua がデータを格納する AWS IoT Greengrass ストリーム名を設定することができます。AWS IoT SiteWise ゲートウェイの各データソース毎に格納先ストリーム名を設定します。ストリーム名を設定するには AWS IoT SiteWise コンソール、 AWS CLI 、または AWS SDK を使用できます。

AWS IoT Greengrass V2 上で独自のカスタムストリームを作成し、データソースの送信先をそのストリームに向けることができます。ストリームはエクスポート定義を持つことができ、これにはデータのエクスポート先となる AWS のサービスを設定できます。現在 AWS IoT SiteWise 、 AWS IoT AnalyticsAmazon S3Amazon Kinesis Data Streams がエクスポート先としてサポートされています。 Amazon Kinesis Data Streams にデータをエクスポートした場合 Amazon Kinesis Data Streams からデータを読み込んで別のサービスに転送するための多くの選択肢があります。 Amazon Kinesis Data Streams からデータを読み取るコンシューマを使用すると、データをさまざまな場所に送信することができます。

例えば Amazon Timestream にデータを保存したい場合 AWS Lambda 関数Amazon Kinesis Data Analytics for Apache Flink を Amazon Kinesis Data Streams のコンシューマとして使用し Amazon Timestream テーブルにデータを書き込むことが可能です。

このようなアーキテクチャにより Amazon Timestream だけでなく Amazon Kinesis Data Streams コンシューマからアクセス可能な任意の場所にデータを保存することができるようになります。

カスタムストリームのエクスポート設定を必要としない場合でも、カスタムストリームからデータを取得する独自の AWS IoT Greengrass コンポーネントを開発することができます。

図3: AWS IoT SiteWise で多拠点にデータを保存するアーキテクチャ

AWS IoT SiteWise Edge のゲートウェイアーキテクチャの理解

AWS IoT SiteWise Edge ゲートウェイのアーキテクチャは、エッジで AWS IoT SiteWise Monitor ポータルを提供し、エッジでデータ処理もできる SiteWiseEdgeProcessor を含む点で AWS IoT SiteWise ゲートウェイのアーキテクチャと異なっています。

図4: AWS IoT SiteWise Edge のゲートウェイアーキテクチャ

AWS IoT SiteWise Edge から様々な場所にデータを送信するには、 AWS IoT SiteWise と同じ方法を使用することはできません。データソースのカスタムストリームは SiteWiseEdgeCollectorOpcua がデータを送信する場所として定義しており、データ処理パックではすでに SiteWise_Edge_Stream というカスタムストリーム名を使用しています。ストリーム名をカスタム・ストリームに変更した場合、データは SiteWiseEdgeProcessor に到達しません。

AWS IoT SiteWise Edge を構成し、様々な場所にデータを保存する

AWS IoT SiteWise Edge から様々な場所にデータを送信するための複数の選択肢があります。 AWS IoT SiteWise データストアにデータを送信したくない場合は SiteWiseEdgePublisherSiteWise_Stream からデータを読み取り AWS IoT SiteWise データストアに保存するので AWS IoT Greengrass デプロイから SiteWiseEdgePublisher を削除する必要があります。

エッジ側のAPIを使用してデータを取得し、それを AWS IoT Greengrass 上のストリームに保存して、さらに処理することができます。このオプションでは、アセットプロパティごとにAPIを照会する必要があり、アセットプロパティが変更された場合は、アプリケーションまたはアプリケーションの構成も変更する必要があります。

もう一つの選択肢としては SiteWise_Stream からデータを読み取るためのコンポーネントを開発することです。このコンポーネントは、別のストリームやオンプレミス環境のターゲットなど、別の宛先にデータを転送するように実装します。

図5:AWS IoT SiteWise Edge で多拠点にデータを保存するアーキテクチャ

以下の例では、 SiteWise_Stream からデータを読み取り AWS に転送するカスタムストリームにデータを取り込み、更にはローカルの MQTT メッセージブローカーにデータをパブリッシュする方法についても説明します。カスタムストリームは AWS 上の Amazon Kinesis Data Streams へのエクスポート構成で作成されています。

以下のコードスニペットは、Python で書かれた AWS IoT Greengrass V2 コンポーネントをベースにしています。このコードでは AWS Greengrass Stream Manager SDK for PythonPaho Python Client を使用しています。

カスタムコンポーネントでは、以下の変数が使用されます。

  • STREAM_NAME_SOURCE は、データを読み込むストリームの名前です。
  • STREAM_NAME_TARGET は、データの送信先となるカスタムストリームの名前です。
  • STREAM_NAME_CLOUD は AWS 上の Amazon Kinesis Data Streams の名前です。ストリームSTREAM_NAME_TARGETSTREAM_NAME_CLOUD へのエクスポート設定により作成されます。

例:

STREAM_NAME_SOURCE = "SiteWise_Stream" 
STREAM_NAME_TARGET = "SiteWise_Anywhere_Stream" 
STREAM_NAME_CLOUD = "SiteWiseToKinesisDatastream"

コンポーネントを起動する前に、 AWS 上にストリーム名 STREAM_NAME_CLOUD で Amazon Kinesis Data Stream のストリームを作成しておく必要があります。

起動時にこのコンポーネントはストリーム STREAM_NAME_TARGET が存在するかどうかをチェックします。ストリームが存在しない場合、 AWS 上の Amazon Kinesis Data Streams へのエクスポート構成とともに作成されます。

try:
    response = stream_manager_client.describe_message_stream(STREAM_NAME_TARGET)
    logger.info("stream_name: %s details: %s", STREAM_NAME_TARGET, response)
except ResourceNotFoundException as error:
    logger.info("create message stream: %s error: %s", STREAM_NAME_TARGET, error)
    
    exports = ExportDefinition(
        kinesis=[KinesisConfig(
            identifier=f"{STREAM_NAME_CLOUD}",
            kinesis_stream_name=STREAM_NAME_CLOUD,
            batch_size=10,
            batch_interval_millis=60000
            )]
        )
    
    stream_manager_client.create_message_stream(
        MessageStreamDefinition(
            name=STREAM_NAME_TARGET,
            strategy_on_full=StrategyOnFull.OverwriteOldestData,
            persistence=Persistence.File,
            max_size=1048576,
            export_definition=exports
        )
    )
except Exception as error:
        logger.error("%s", error)

このコンポーネントは STREAM_NAME_SOURCE からメッセージを読み取ります。メッセージが利用可能になると、メッセージ内のエントリを順に処理し、スレッドを起動してカスタムストリームにエントリを格納し MQTT メッセージブローカーにそれらをパブリッシュします。

response = stream_manager_client.read_messages(
            STREAM_NAME_SOURCE,
            ReadMessagesOptions(
                desired_start_sequence_number=LAST_READ_SEQ_NO + 1,
                min_message_count=MIN_MESSAGE_COUNT,
                read_timeout_millis=1000
            )
        )

for entry in response:
    logger.info("stream_name: %s payload: %s",
                STREAM_NAME_SOURCE, entry.payload)

   # send data to another stream at the edge
    thread_stream = Thread(
        target=store_message_to_stream,
        args=[entry.payload])
    thread_stream.start()
    logger.info('thread_stream started: %s', thread_stream)
    
   # send data to a local MQTT message broker
    thread_mqtt = Thread(
        target=publish_message_to_mqtt_broker,
        args=[entry.payload])
    thread_mqtt.start()
    logger.info('thread_mqtt started: %s', thread_mqtt)

以下の関数コードは、カスタムストリーム STREAM_NAME_TARGET にデータを書き込んでいます。このカスタムストリームに取り込まれたデータは、 AWS 上の Amazon Kinesis Data Streams に自動的に転送されます。

def store_message_to_stream(payload):
    try:
        sequence_number = stream_manager_client.append_message(stream_name=STREAM_NAME_TARGET, data=payload)
        logger.info('appended message to stream: %s sequence_number: %s message: %s',
                    STREAM_NAME_TARGET, sequence_number, payload)
    except Exception as error:
        logger.error("append message to stream: %s: %s",
                     STREAM_NAME_TARGET, error)

次の関数コードは、MQTTメッセージブローカーでトピックsitewiseにデータをパブリッシュします。

def publish_message_to_mqtt_broker(payload):
    try:
        logger.info('MQTT: publish message: %s', payload)
        c_mqtt = paho.Client()
        c_mqtt.mqtt_on_publish = mqtt_on_publish
        c_mqtt.mqtt_on_disconnect = mqtt_on_disconnect
        c_mqtt.connect(MQTT_BROKER, MQTT_PORT)
        ret = c_mqtt.publish("sitewise", payload) 
        logger.info('MQTT: publish: ret: %s', ret)
        c_mqtt.disconnect()
    except Exception as error:
        logger.error("MQTT: publish message: %s", error)

最後に

このブログでは、 AWS IoT SiteWise ゲートウェイを使用して産業用機器からデータを収集し、様々な場所にデータを送信する方法を学びました。 AWS IoT SiteWise または AWS IoT SiteWise Edge からカスタムの宛先にデータを送信するためにゲートウェイを構成する方法について学びました。サンプルコードに基づいて AWS 上のカスタムロケーションにデータを転送したり、オンプレミス環境で活用する方法を確認しました。詳細は AWS IoT SiteWise の製品ページまたは AWS IoT SiteWise のワークショップでご確認ください。

著者について

Philipp Sacha
Philipp は、 Amazon Web Services の IoT 分野のスペシャリスト・ソリューション・アーキテクトとして、 IoT 分野のお客様をサポートしています。2015年にソリューションアーキテクトとして AWS に入社し、2018年にIoT領域のスペシャリストに移行しました。

この記事はソリューションアーキテクトの渡邉が翻訳しました。