Amazon Web Services ブログ
AWS Data Wrangler v1.0がリリースされました
2019年9月、当ブログでもご紹介したAWS Data Wrangler(以下、Data Wrangler)のv1.0がリリースされました。
以前紹介したときと比べて、主にAmazon Redshift(以下、Redshift)・AWS Glueデータカタログとの連携強化、その他Amazon EMR・Amazon CloudWatch Logsとの連携が追加されており、全体的により使いやすくなりました。v1.0になったことに伴い、改めて「AWS Data Wranglerとは?」および「簡単なチュートリアル」についてご紹介します。
(*)ブログ下段に、2019年9月のブログ投稿以降に追加された主要アップデートのサマリーを記載しています。
AWS Data Wranglerとは?
Data WranglerはPandasライブラリの機能をAWSに拡張するオープンソースのPythonライブラリです。DataFrameとAWSデータ関連サービス(Redshift, AWS Glue(以下、Glue), Amazon Athena(以下、Athena), Amazon EMRなど)と接続します。
Pandas、Apache Arrow、Boto3、s3fs、SQLAlchemy、Psycopg2、PyMySQLなどのオープンソースライブラリを用いて、データレイク、データウェアハウス、データベースからのデータのロード/アンロードといった通常のETLタスクに必要となる抽象化された関数を提供します。
(公式ドキュメント:「What is AWS Data Wrangler?」より翻訳・引用)
チュートリアル
公式ドキュメントにいくつかサンプルチュートリアルがあります。本ブログではAmazon SageMaker(以下、SageMaker)ノートブックを用いて、公式チュートリアル“01-Introduction.ipynb”、“06-Amazon Athena.ipynb”を参考に、簡易版をご紹介します。
1.S3の設定
1-1.AWSマネジメントコンソールにログインして、サービス一覧から”S3″を選択します。
1-2.[バケットを作成]ボタンをクリックし、[バケット名]に任意の名前(※世界で一意)を入力、リージョンが「アジアパシフィック(東京)」になっていることを確認し、[作成]ボタンをクリックします。
1-3.バケットが作成されたら、[フォルダの作成]をクリックし、フォルダ名に「data」といれてフォルダを作成します。
2.Glueデータカタログのデータベース作成
2-1.サービス一覧から”Glue”のコンソールを開き、左のメニューバーから[データベース]を選択します。
2-2.[データベースの追加]ボタンをクリックし、データベース名に「awswrangler_test」と入力し、[作成]ボタンをクリックします。
3.SageMakerノートブックの起動
※SageMakerノートブックの起動からコード実行までの手順は簡略化したものとなっています。詳細については、下記URLのステップ2およびステップ3をご確認ください。
https://docs.thinkwithwp.com/ja_jp/sagemaker/latest/dg/gs-setup-working-env.html
3-1.サービス一覧から”SageMaker”のコンソールを開き、左のメニューバーから[ノートブックインスタンス]を選択し、[ノートブックインスタンスの作成]を選択します。
3-2.ノートブックインスタンス名に任意の名前を入力し、ノートブックを作成します。(※ここでは新規にAWS IAMロールを作成します。)
3-3.サービス一覧から”IAM”を選択します。
3-4.手順“3-2”で作成したAWS IAMロールに対して、「AmazonS3FullAccess」と「AmazonAthenaFullAccess」を付与します。
3-5.作成したノートブックインスタンスから[Jupyterを開く]を選択し、ノートブックを起動します。
4.サンプルチュートリアルの実行
以下チュートリアルのシナリオはAmazon S3(以下、S3)上にあるCSVファイルをParquetファイルに変換、その後、Athenaからクエリを実行する例です。
4-1.[New]タグから[conda_python3]を選択し、新規のファイルを作成します。
4-2.下記コマンドを実行し、Data Wranglerをインストールします。
!pip install awswrangler
4-3.バージョン確認のコードを実行して、バージョンが表示されることを確認します。(※本ブログ投稿時はV1.0.0で実行しています)
import awswrangler as wr
wr.__version__
4-4.初期設定として、S3バケットの設定を行います。実行後、テキスト入力が可能です。そこで、1-2で作成したS3バケット名を指定します。
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
4-5.サンプルのCSVファイルを読み込みます。(※データ量が多いため、少し時間がかかります。)
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/189",
names=cols,
parse_dates=["dt", "obs_time"])
df
4-6.CSVファイルをParquetファイルに変換します。
res = wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
)
[補足]
上記コードの「database」には2-2で作成したGlueのデータベース名が入ります。この例では、CSVからParquetへの変換とともに、Glueデータカタログの「awswrangler_test」データベースに対して、「noaa」テーブルを作成しています。処理完了後に1-3で作成したS3のPrefix以下にParquetファイル作成されていること、Glueデータカタログに「noaa」というテーブルが作成されていることが確認できます。詳細はこちらのAPIリファレンスをご参照ください。
4-6.Glueのデータカタログからテーブルを検索します。
wr.catalog.table(database="awswrangler_test", table="noaa")
4-7.Athenaからクエリを実行します。
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
公式ドキュメントではCTASを有効有無によるクエリ速度、メモリに制限がある環境での処理方法について言及しております。興味がある方は公式ドキュメントからお試しください。
4-8.S3のdataフォルダを削除します。
wr.s3.delete_objects(path)
4-9. Glueのデータカタログを削除します。
for table in wr.catalog.get_tables(database="awswrangler_test"):
wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])
以上で終了です。
今回、Athenaからのクエリ実行のご紹介のみでしたが、他にもData Wranglerの基本的な使い方からRedshiftを使ったチュートリアルなどいくつか用意されています。
前回からのアップデートサマリー
2019年9月から多くのアップデートがあったため、主要なものを絞って記述します。アップデートの詳細はGithubページをご確認ください。
- S3 から直接 Parquetファイルを Pandas DataFrame に読み込む(Link)
- Parquet出力経由でRedshiftの結果をPandas DataFrameに読み込む(Link)
- Redshiftの結果をParquetとしてS3に保存(Link)
- CTASを使って、Athenaの結果をPandas DataFrameに読み込む(Link)
- PandasからRedshiftへのUpsert(Link)
まとめ
1コマンドで簡単にインストールができ、ETL処理を手軽に実行できることはデータを扱う方々にとって大きなメリットではないでしょうか。今後も機能拡張される予定です。ぜひご利用ください。
著者について
倉光 怜(Satoshi Kuramitsu)はAWSのソリューションアーキテクトです。好きなAWSサービスはAWS Glue、Amazon Kinesis、Amazon S3です。