Amazon Web Services ブログ

AWS Glue を使用して非ネイティブ JDBC データソースに対して ETL ジョブを実行する

AWS Glue は、抽出、変換、およびロード (ETL) のための完全管理型サービスで、これで分析のためのデータの準備と読み込みが簡単になります。AWS マネジメントコンソールで数回クリックするだけで ETL ジョブを作成し実行することができます。AWS Glue をデータストアにポイントするだけです。AWS Glue はデータを検出し、関連付けられたメタデータ (テーブル定義やスキーマなど) を AWS Glue データカタログに保存します。

AWS Glue には、IP 接続がある限り、AWS やその他の場所で JDBC ドライバーを用いたデータソースへのネイティブコネクタがあります。この記事では、現在のところ AWS Glue でネイティブにサポートされていないデータソースに接続する方法を示します。IBM DB2 と SAP Sybase の 2 つのデータソースに対する ETL ジョブへの接続と実行をご紹介します。ただし、他の JDBC アクセス可能データベースと同じプロセスを使用することもできます。

AWS Glue データソース

AWS Glue は、JDBC プロトコルを使用して、次のデータストアをネイティブでサポートします。

  • 一般に公開されているデータベース
    • Amazon Aurora
    • MariaDB
    • Microsoft SQL Server
    • MySQL
    • Oracle
    • PostgreSQL

詳細については、「データストアに接続を追加する」を参照してください。これは「 AWS Glue 開発者ガイド」の中にあります。

データレイクは、AWS にデプロイされているアーキテクチャの中で最も急成長しているものの 1 つです。このアーキテクチャでは、データの取り込み、消去、変換、および構造化に使用される ETL 処理が非常に重要となります。より幅広いデータベースエンジンと相互運用できる柔軟性を確保することで、データレイクアーキテクチャを迅速に導入することが可能になります。

IBM DB2、Pivotal Greenplum、SAP Sybase、またはその他のリレーショナルデータベースマネジメントシステム (RDBMS) など、AWS Glue がネイティブにサポートしていないデータソースの場合でも、Amazon S3 から AWS Glue ジョブへカスタムデータベースコネクタをインポートすることができます。この場合、AWS Glue 接続を使用するのではなく、データを抽出するために AWS Glue スクリプトからデータソースへの接続を行う必要があります。詳細については、「独自のカスタムスクリプトを提供する」を参照してください。これは「AWS Glue 開発者ガイド」の中にあります。

IBM DB2 データソース用 ETL ジョブのセットアップ

最初の例は、AWS Glue ETL ジョブを IBM DB2 インスタンスに接続し、ソースからデータを変換し、Amazon S3 で Apache Parquet 形式で格納する方法を示しています。外部 JDBC ドライバーを使用して ETL ジョブを正常に作成するには、次の項目を定義する必要があります。

  • ジョブスクリプトの S3 の場所
  • 一時ディレクトリの S3 の場所
  • JDBC ドライバーの S3 の場所
  • Parquet データ (出力) の S3 の場所
  • ジョブの IAM ロール

デフォルトでは、AWS Glue はスクリプトと一時ディレクトリのバケット名は次の形式を使うことをお勧めしてします。

s3://aws-glue-scripts-<ACCOUNT_ID>-<REGION>/<USER>
s3://aws-glue-temporary-<ACCOUNT_ID>-<REGION>/<USER>

JDBC ドライバーの場合、同様の場所を作成できます。

s3://aws-glue-jdbc-drivers-<ACCOUNT_ID>-<REGION>/<USER>

また、Parquet データ (出力) についても、同様の場所を作成することができます。

s3://aws-glue-data-output-<ACCOUNT_ID>-<REGION>/<USER>

AWS Glue ジョブと S3 バケットを同じ AWS リージョンに配置すると、リージョン間のデータ転送料金が節約されることに留意してください。この記事では、米国東部 (オハイオ) リージョン (us-east-2) を使用する予定です。

IAM ロールを作成する

次の手順では、ETL ジョブが使用する IAM ロールを設定します。

  1. AWS マネジメントコンソールにサインインし、IAM を検索します。

  1. IAM コンソールで、左側のナビゲーションペインで [ Roles] を選択します。
  2. [Create role] を選択します。信頼できるエンティティのロールタイプは、AWS サービス、特に AWS Glue である必要があります。

  1. [Next:Permissions] を選択します。
  2. AWSGlueServiceRole ポリシーを検索して、選択します。

  1. 今度は SecretsManagerReadWrite をもう一度検索します。このポリシーにより、AWS Glue ジョブは AWS Secrets Manager に保存されているデータベース認証情報にアクセスできます。

注意 : このポリシーは公開されており、テスト目的でのみ使用されます。カスタムポリシーを作成して、ETL ジョブで使用するシークレットだけにアクセスを絞り込む必要があります。

  1. このポリシーを選択し、[Next: Review] 選択します。
  2. 例えば、GluePermissions のようにロール名を付けて、両方のポリシーが選択されていることを確認します。

  1. [Create role] を選択します。

これで、IAM ロールが作成されました。ここで、Amazon S3 で定義された場所に JDBC ドライバーをアップロードします。この例では、IBM サポート のサイトで入手可能な DB2 ドライバーを使用します。

データベース認証情報を保存する

安全なストアにデータベースの認証情報を保存することをお勧めします。今回の場合、AWS Secrets Manager を使って、認証情報を安全に保管します。これらの認証情報を作成するには、次の手順を実行します。

  1. コンソールを開き、Secrets Manager を検索します
  2. AWS Secrets Manager コンソールで、[Store a new secret] を選択します。
  3. [Select a secret type] の下で、[Other type of secrets] を選択します
  4. [Secret key/value] で、次の各パラメータに対して 1 つの行を設定します。
    • db_username
    • db_password
    • db_url (for example, jdbc:db2://10.10.12.12:50000/SAMPLE)
    • db_table
    • driver_name (ibm.db2.jcc.DB2Driver)
    • output_bucket: (for example, aws-glue-data-output-1234567890-us-east-2/User)
  5. [Next] を選択します
  6. Secret nameの場合は、DB2_Database_Connection_Info を使用します。
  7. [Next] を選択します
  8. [Disable automatic rotation] チェックボックスを選択したままにします。
  9. [Next] を選択します
  10. [Store] を選択します

AWS Glue でジョブを追加する

今度は、次の手順に従って、AWS Glue ジョブを作成します。

  1. AWS マネジメントコンソールで、AWS Glue を検索します。

  1. 左側のナビゲーションペインで、[Jobs] を選択します。これは [ETL] の下にあります。
  2. [Add job] を選択します

  1. 基本的な Job properties を記入する
  2. ジョブに名前を付けます (例えば、db2-job など)。
  3. 前に作成した IAM ロールを選択します (GluePermissions)。
  4. This job runs には、[A new script to be authored by you] を選択します。
  5. ETL language には、[Python] を選択します。

  1. Script libraries and job parameters」セクション で、Dependent jars path の JDBC ドライバーの場所を選択します。

  1. [Next] を選択します。
  2. Connections」ページで、[Next] を選択します。
  3. 概要ページで、[Save job and edit script] を選択します。これでジョブが作成され、スクリプトエディタが開きます。

エディタで、既存のコードを次のスクリプトに置き換えます。重要 : スクリプトの 47 行目は、ソーステーブルのフィールドを宛先にマッピングすること、Parquet 宛先のスペースを節約するために null フィールドを削除すること、最後に Parquet フォーマットの Amazon S3 に書き込むことに対応しています。

import sys
import boto3
import json
awsglue.transformsからインポート *
awsglue.utilsからgetResolvedOptionsをインポート
pyspark.contextからSparkContextをインポート
awsglue.contextからGlueContextをインポート
from awsglue.dynamicframe import DynamicFrame
awsglue.jobからjobをインポート


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Getting DB credentials from Secrets Manager
client = boto3.client("secretsmanager", region_name="us-east-2")

get_secret_value_response = client.get_secret_value(
        SecretId="DB2_Database_Connection_Info"
)

secret = get_secret_value_response['SecretString']
secret = json.loads(secret)

db_username = secret.get('db_username')
db_password = secret.get('db_password')
db_url = secret.get('db_url')
table_name = secret.get('db_table')
jdbc_driver_name = secret.get('driver_name')
s3_output = "s3://" + secret.get('output_bucket') + "/" + table_name

# Connecting to the source
df = glueContext.read.format("jdbc").option("driver", jdbc_driver_name).option("url", db_url).option("dbtable", table_name).option("user", db_username).option("password", db_password).load()

df.printSchema()
print df.count()

datasource0 = DynamicFrame.fromDF(df, glueContext, "datasource0")

# Defining mapping for the transformation
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("EMPNO", "string", "EMPNO", "string"), ("FIRSTNME", "string", "FIRSTNME", "string"), ("MIDINIT", "string", "MIDINIT", "string"), ("LASTNAME", "string", "LASTNAME", "string"), ("WORKDEPT", "string", "WORKDEPT", "string"), ("PHONENO", "string", "PHONENO", "string"), ("HIREDATE", "date", "HIREDATE", "date"), ("JOB", "string", "JOB", "string"), ("EDLEVEL", "integer", "EDLEVEL", "integer"), ("SEX", "string", "SEX", "string"), ("BIRTHDATE", "date", "BIRTHDATE", "date"), ("SALARY", "double", "SALARY", "double"), ("BONUS", "double", "BONUS", "double"), ("COMM", "double", "COMM", "double")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

# Writing to destination
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": s3_output}, format = "parquet", transformation_ctx = "datasink4")

job.commit()

  1. [Save] を選択します。
  2. エディタを閉じるには、画面の右側にある黒い [X] を選択します。

ETL ジョブを実行する

これでジョブが作成されました。次の手順では、次のように実行します。

  1. Jobs」ページで、新しいジョブを選択します。[Action] メニューで、[Run job] を選択し、ジョブの実行を確認します。実行が終了するまで、しばらくお待ちください。

  1. ジョブが Succeeded のように表示されたら[Logs] を選択してジョブの出力を読み取ります。

  1. ジョブの出力では、df.printSchema()df.count() のメッセージを実行した結果が表示されます。

また、Amazon S3 で出力バケットに行くと、ETL ジョブの Parquet 結果が表示されます。

AWS Glue では、外部 JDBC ドライバーを用いて既存のデータベースに接続する ETL ジョブを作成できます。これにより、必要な変換を実行することができます。

SAP Sybase データソースの ETL ジョブを設定する

このセクションでは、SAP Sybase データソースに対して AWS Glue ETL ジョブを作成する方法について説明します。ジョブで必要ないくつかの変更を加えることで、前のセクションで説明したプロセスが Sybase データソースに対して機能するようになります。

  1. ジョブを作成する際に、JDBC 依存関係の正しい jar を選択します。
  2. スクリプトでは、AWS Secrets Manager から使用するシークレットへのリファレンスを変更します。
get_secret_value_response = client.get_secret_value(
        SecretId="Sybase_Database_Connection_Info"
)

次のように、47 行目のフィールドのマッピングを変更します。

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("au_id", "string", "au_id", "string"), ("au_lname", "string", "au_lname", "string"), ("au_fname", "string", "au_fname", "string"), ("phone", "string", "phone", "string"), ("address", "string", "address", "string"), ("city", "string", "city", "string"), ("state", "string", "state", "string"), ("country", "string", "country", "string"), ("postalcode", "string", "postalcode", "string")], transformation_ctx = "applymapping1")

 

新しい ETL ジョブを正常に実行すると、DB2 データソースで生成されたのと同じタイプの情報が出力に含まれます。

これらの JDBC ドライバーはそれぞれ独自のニュアンスおよび異なるライセンス条項を持っていますので、使用する前に知っておく必要があります。

JDBC の並列読み取りを最大化する

メモリ使用量は、ビッグデータソースを扱う際に留意すべきものです。場合によっては、すべてのデータが 1 つのエグゼキュータに読み込まれると、「Out of Memory」エラーが出ることがあります。これを最適化する方法の 1 つは、Apache Spark と AWS Glue で実装できる並列読み取りに依存することです。詳細は、「Apache Spark SQL module」を参照してください。

次のオプションを使用できます。

  • partitionColumn: パーティション化に使用される整数列の名前。
  • lowerBound: パーティションストライドを決定するために使用される partitionColumn の最小値。
  • upperBound: パーティションストライドを決定するために使用される partitionColumn の最大値。
  • numPartitions: パーティションの数。これは、lowerBound (包括的) および upperBound (排他的) とともに、partitionColumn を分割するのに使われる生成された WHERE 節表現のためのパーティションストライドを形成します。これが SparkContext.defaultParallelism のデフォルトになります。
  • これらのオプションは、テーブルの並列読み取りを指定します。lowerBoundupperBound はパーティションストライドを決定しますが、テーブルの行をフィルタリングしません。したがって、Spark はパーティションを作成し、テーブル内のすべての行を返します。以下に例を挙げます。
df = glueContext.read.format("jdbc").option("driver", jdbc_driver_name).option("url", db_url).option("dbtable", table_name).option("user", db_username).option("password", db_password).option("partitionColumn", <column_name>).option("lowerBound", 1).option("upperBound", 100).option("numPartitions", 2).load()

パーティションが多すぎると Spark が外部データベースシステムをクラッシュさせる可能性があるので、パーティションの数に注意しましょう。

結論

この記事で説明したプロセスを使用すると、JDBC ドライバーを使って到達できる任意のデータソースに対し、AWS Glue ETL ジョブに接続して実行することが可能になります。これには、Greenplum などのよくある分析データベースの新世代が含まれます。

パーティショニングと pushdown predicates を用いると、これらのデータセットのクエリ効率を向上させることができます。詳細については、「AWS Glue での ETL 出力のパーティションの管理」を参照してください。この技術は、ハイブリッド環境でのデータの移動とデータレイクへの送信を可能にします。

 


その他の参考資料

この投稿がお役に立ったなら、「Work with partitioned data in AWS Glue」をぜひ確認してみてください。


著者たちについて

Kapil Shardha はテクニカルアカウントマネージャーです。AWS を導入した企業顧客をサポートしています。インフラストラクチャの自動化と DevOps 開発が専門です。

 

 

 

William Torrealba は AWS ソリューションアーキテクトで、AWS を導入したお客様をサポートしています。アプリケーション開発、高可用性分散システム、自動化、および DevOps 開発が専門です。