Amazon Web Services ブログ

AWS Step Functions コールバックを利用した外部システム連携

Shared Delivery Teams で Sr. Cloud App Architect を務める Zach Abrahamson による記事です。

AWS Step Functions を使用すると、複数の AWS サービスを調整してサーバーレスワークフローに落とし込めるため、アプリケーションの開発と更新を迅速に行うことができます。Step Functions を使用すると、AWS Lambda や Amazon ECS のようなサービスを機能豊富なアプリケーションに統合するワークフローを設計および実行できます。

ワークフローは一連のステップで構成され、あるステップの出力が次のステップの入力として動作します。Step Functions は各ステップを自動的にトリガーおよび追跡し、エラーが発生した場合はリトライを実行することで、ワークフローに回復性をもたらします。

この直感的な性質を利用して、Step Functions と外部システムを同期または非同期で統合したいというお客様もいらっしゃるかもしれません。コールバックタスク統合パターン を使うことで、Amazon SQS, AWS Lambda, Amazon ECS または他の AWS サービス を介して Step Functions のワークフローから外部システムにトークンを送信することができます。

ターゲットの外部システムが AWS 上で稼働している場合は、SQS と Amazon SNS の連携によってトークンをシステムに転送して処理することが可能です。しかし、これらのサービスは別の AWS アカウントや他のプラットフォームにホストされていることもあるでしょう。その場合は、Lambda 関数を使ってサービスへの呼び出しを仲介することができます。

このブログでは、Lambda、DynamoDB、API Gateway を使用して、Step Functions ワークフローを外部システムと統合する方法について説明します。これらのサービスを使って抽象化レイヤーを導入することで、Step Functions の実装と外部システムを疎結合に保つことができます。

外部システムは、タスクトークンとして保存されている一意の ID またはタプルを使用して API にコールバックします。トークンは DynamoDB テーブル内の値と照合され、Step Functions のワークフローが開始されます。

概要

このアーキテクチャには、2つの Lambda 関数が含まれています。最初の関数は、ID の生成と外部サービスへの呼び出しを処理し、もう1つは、API Gateway へのコールバックの処理、DynamoDB に対するトークンの検索、そして Step Functions API の呼び出しを行います。

最初の関数は、外部サービスに転送される Step Functions タスクトークンとペイロードを受信できるワークフローの1ステップとして定義されます。このステップが完了し、外部サービスがリクエストを処理する間、ワークフローブランチは一時停止してコールバックを待ちます。

一意の ID は、Step Functions のペイロードに含まれるビジネスケース固有の値 (注文 ID やタスクトークンなど) にすることもできます。外部サービスはタスクを完了すると、この ID、タスクステータス、および任意の出力をパラメータとして API Gateway エンドポイントにコールバックします。

{
  "order_id": "a96cbeed-cbd7-4711-815c-0913113c6064",
  "task_type": "ORDER_SHIPPING_SERVICE",
  "task_status": "SUCCESS",
  "task_output": {
    "shipping_status": "PROCESSING",
    "tracking_number": "1ZA1B2C3D4"
  }
}

ハンドラーはこの ID を使用して、対応する Step Functions タスクトークンを DynamoDB テーブルから検索し、Step Functions の SendTaskSuccess API を呼び出します。

外部システムは、Step Functions または 他の AWS サービス API に依存する必要はありません。ペイロードを処理し、その結果とともに REST API を呼び出すだけです。バックエンドサービスを Step Functions から EKS ベースのカスタムソリューションに移行するとなった場合でも、同じ外部サービスとエンドポイントを使用して 2つのシステムを抽象化することができます。外部サービスは何も変更する必要はありません。

このコードは GitHub リポジトリ の README の指示に従ってデプロイできます。このセクションでは、デプロイされるリソースの概要を説明していきます。

Step Functions ワークフロー

{
  "StartAt": "Get Order Metadata",
  "States": {
    "Get Order Metadata": {
      "Type": "Task",
      "Resource": "${GetOrderMetadataFunction.Arn}",
      "ResultPath": "$.order_contents",
      "Next": "Shipping Service Callback"
    },
    "Shipping Service Callback": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${SNSCallbackFunction.Arn}",
        "Payload": {
          "token.$": "$$.Task.Token",
          "input.$": "$",
          "callback": "true"
        }
      },
      "ResultPath": "$.shipping_info",
      "Next": "Process Shipping Results"
    },
    "Process Shipping Results": {
      "Type": "Task",
      "Resource": "${ProcessShippingResultFunction.Arn}",
      "ResultPath": "$",
      "End": true
    }
  }
}

これは、注文サービスワークフローのシンプルな例です。これは GetOrderMetadataFunction 関数から始まります。この関数は、データベースから注文情報を読み取って、後続のステップでの処理のためにイベントに保存します。これは、ビジネスロジックを使用して、外部システムを呼び出す次のステップのワークフローに結び付けます。この例では、このシステムがすでに存在することを前提としています。

2番目のステップは外部サービスのコールバックで、この例では 出荷サービス(shipping service)です。出荷サービスは、ワークフローの後続のステップのために shipping_info として tracking_number(追跡番号)を含む処理の結果を返します。Lambda 関数は、外部システムがサブスクライブできるように SNS トピックにメッセージを送信します。また、コールバック処理のために DynamoDB テーブルにこのトークンを永続化します。

最後のステップでは、コールバック後にイベントに保存された出荷サービスの出力を表示します。

Orders DynamoDB table

OrderTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      TableName: "OrderTable"
      AttributeDefinitions:
        - AttributeName: "order_id"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "order_id"
          KeyType: "HASH"

CallbackTasks DynamoDB Table

CallbackTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      TableName: "CallbackTable"
      AttributeDefinitions:
        - AttributeName: "order_id"
          AttributeType: "S"
        - AttributeName: "task_type"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "order_id"
          KeyType: "HASH"
        - AttributeName: "task_type"
          KeyType: "RANGE"

このテーブルは、Step Functions タスクトークンをタスクに関するメタデータとともに保持します。項目のキー属性としては、ビジネス固有のロジックを格納します。これによって、注文 ID とタスクタイプからコールバックトークンを導き出し、Step Functions のワークフローを再開することができます。

API Gateway API

API 定義 には、コールバックハンドラ用の POST ハンドラがあります。この API 仕様では、列挙型と必須フィールドを使用するリクエストのタイプを定義しています。これにより、ハンドラが基本的なデータのバリデーションを行う必要がなくなります。

Lambda ハンドラの例外は、レスポンスコードを決定するために使用されます。たとえば、メッセージに「内部エラー」を含む例外は、本文に例外メッセージを含んだ HTTP 500 ステータスコードを自動的に返します。

このエンドポイントは、次のようなペイロードのリクエストを処理します。

{
  "order_id": "a96cbeed-cbd7-4711-815c-0913113c6064",
  "task_type": "ORDER_SHIPPING_SERVICE",
  "task_status": "SUCCESS",
  "task_output": {
    "shipping_status": "PROCESSING",
    "tracking_number": "1ZA1B2C3D4"
  }
}

Lambda 関数 GetOrderMetadataFunction は DynamoDB の OrderTable テーブルに対してクエリを実行し、 注文内容をイベントに保存します。これは、Step Functions ワークフローで最初に呼び出される関数で、ワークフローの入力値として ‘order_id’ を使用します。

Lambda 関数 SNSCallbackFunction は、イベントから取り出したペイロードを SNS トピックに送信する汎用的なコールバックタスクハンドラです。まず、コールバックタスクトークンをイベントのトップレベルに差込みます。そして、Lambda 関数に設定した環境変数を使用して、ペイロードとトピックを決定します。

  • SNS_TOPIC_ARN: 送信先の SNS トピックの ARN
  • CALLBACK_TABLE: コールバックトークンの永続化に使用する DynamoDB テーブル
  • TASK_TYPE: 最終的なコールバックの検索のために DynamoDB に保存するタスクタイプ
  • PAYLOAD_EVENT_KEY: SNS トピックに送信するイベント内のキー

Lambda 関数 ExternalCallbackFunction は、環境変数 ‘CALLBACK_TABLE’ を使用して、どの DynamoDB テーブルに対してトークンの照合を行えばよいかを決定します。この関数は、API Gateway の externalCallback エンドポイントのバックエンドとして機能し、イベントとして ExternalCallbackTaskStatusRequest オブジェクトを受け取ります。フィールドはパースされ、出荷サービスのデータを Step Functions ワークフローイベントに保持するために使用されます。

Lambda 関数 ProcessShippingResultFunction は、後続のステップのイベントで外部サービスからのペイロードがどのように利用できるかを示します。また、出荷サービスのイベントから tracking_number(追跡番号)を取り出して、OrderTable に保存します。

一連のワークフローの確認

  1. 注文が OrderTable に以下の属性とともに登録されている
  2. 以下の入力値でワークフローが開始される
  3. 出荷サービスのコールバックのステップでワークフローが一時停止する。イベントには注文内容が保存されている
  4. 注文メッセージが SNS トピックに送信される
  5. CallbackTable にタスクトークンとともにタスクが保存される
  6. サンプルのペイロードで API Gateway の externalCallback エンドポイントを呼び出す
  7. ワークフローが完了する
  8. tracking_number(追跡番号)が OrderTable に保存される

まとめ

この記事では、実装と AWS サービスの詳細を外部システムに公開することなく、Step Functions ワークフローコールバックを使用できるアーキテクチャを紹介しました。

サンプルの出荷サービスは、SNS トピックをサブスクライブし、出荷情報を入力として API Gateway エンドポイントを呼び出してワークフローを再開および tracking_number(追跡番号)の保存を行います。

このパターンは、AWS 外部のシステムと同期的にやり取りする必要がある場合や、AWS サービスと通信できないワークフローにおいて活用できます。

サーバーレスについてさらに学習したい場合は、Serverless Land をご覧ください。

 

翻訳は Partner SA 櫻谷が担当しました。原文は こちら です。