AWS Big Data Blog

Ten new visual transforms in AWS Glue Studio

AWS Glue Studio is a graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. It allows you to visually compose data transformation workflows using nodes that represent different data handling steps, which are later converted automatically into code to run.

AWS Glue Studio recently released 10 more visual transforms that allow you to create more advanced jobs visually, without coding skills. In this post, we discuss potential use cases that reflect common ETL needs.

The new transforms that will be demonstrated in this post are: Concatenate, Split String, Array To Columns, Add Current Timestamp, Pivot Rows Into Columns, Unpivot Columns Into Rows, Lookup, Explode Array Or Map Into Rows, Derived Column, and Autobalance Processing.

Solution overview

In this use case, we have some JSON files with stock option operations. We want to make some transformations before storing the data to make it easier to analyze, and we also want to produce a separate dataset summary.

In this dataset, each row represents a trade of option contracts. Options are financial instruments that provide the right—but not the obligation—to buy or sell stock shares at a fixed price (called the strike price) before a defined expiration date.

Input data

The data has the following schema:

  • order_id – A unique ID
  • symbol – A code, generally based on a few letters, that identifies the corporation that issues the underlying stock shares
  • instrument – The name that identifies the specific option being bought or sold
  • currency – The ISO currency code in which the price is expressed
  • price – The amount that was paid for the purchase of each option contract (on most exchanges, one contract allows you to buy or sell 100 stock shares)
  • exchange – The code of the exchange center or venue where the option was traded
  • sold – A list of the number of contracts that were allocated to fill the sell order when this is a sell trade
  • bought – A list of the number of contracts that were allocated to fill the buy order when this is a buy trade

The following is a sample of the synthetic data generated for this post:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL requirements

This data has a number of peculiar characteristics, as is often found in older systems, that make the data harder to use.

The following are the ETL requirements:

  • The instrument name has valuable information that is intended for humans to understand; we want to normalize it into separate columns for easier analysis.
  • The attributes bought and sold are mutually exclusive; we can consolidate them into a single column with the contract numbers and have another column indicating whether the contracts were bought or sold in this order.
  • We want to keep the information about the individual contract allocations, but as individual rows instead of forcing users to deal with an array of numbers. We could add up the numbers, but we would lose information about how the order was filled (indicating market liquidity). Instead, we choose to denormalize the table so each row has a single number of contracts, splitting orders with multiple numbers into separate rows (see the worked example after this list). In a compressed columnar format, the extra dataset size of this repetition is often small when compression is applied, so it’s acceptable to make the dataset easier to query.
  • We want to generate a summary table of volume for each option type (call and put) for each stock. This provides an indication of the market sentiment for each stock and the market in general (greed vs. fear).
  • To enable overall trade summaries, we want to provide for each operation the grand total and standardize the currency to US dollars, using an approximate conversion reference.
  • We want to add the date when these transformations took place. This could be useful, for instance, to have a reference for when the currency conversion was made.
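
To illustrate the second and third requirements, the first sample row shown earlier (order 1679931512485, with bought contracts [18, 38]) would end up as two rows along these lines, using the action and contracts column names adopted later in this post:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "action": "bought", "contracts": 18}
{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "action": "bought", "contracts": 38}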

Based on those requirements, the job will produce two outputs:

  • A CSV file with a summary of the number of contracts for each symbol and type
  • A catalog table to keep a history of the orders, after applying the transformations indicated
    Data schema

Prerequisites

You will need your own S3 bucket to follow along with this use case. To create a new bucket, refer to Creating a bucket.

Generate synthetic data

To follow along with this post (or experiment with this kind of data on your own), you can generate this dataset synthetically. The following Python script can be run on a Python environment with Boto3 installed and access to Amazon Simple Storage Service (Amazon S3).

To generate the data, complete the following steps:

  1. In AWS Glue Studio, create a new job with the option Python shell script editor.
  2. Give the job a name and on the Job details tab, select a suitable role and a name for the Python script.
  3. In the Job details section, expand Advanced properties and scroll down to Job parameters.
  4. Enter a parameter named --bucket and set its value to the name of the bucket you want to use to store the sample data.
  5. Enter the following script into the AWS Glue shell editor:
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys
    
    # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket:
        raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated")
    
    data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000
    
    # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO()
    
    sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"),
                     ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file):
        stock = random.choice(sample_stocks)
        symbol = stock[0]
        ref_price = stock[1]
        currency = stock[2]
        strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3))
        sample = {
            "order_id": int(datetime.now().timestamp() * 1000) + i,
            "symbol": stock[0],
            "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}",
            "currency": currency,
            "price": round(random.uniform(0.5, 20.1), 2),
            "exchange": "EDGX" if currency == "usd" else "XETR"
        }
        sample[random.choice(operations)] = [random.randrange(1,100) for i in range(random.randrange(1,5))]
        buff.write(json.dumps(sample).encode())
        buff.write("\n".encode())
    
    s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")
  6. Run the job and wait until it shows as successfully completed on the Runs tab (it should take just a few seconds).

Each run will generate a JSON file with 1,000 rows under the bucket specified and prefix transformsblog/inputdata/. You can run the job multiple times if you want to test with more input files.
Each line in the synthetic data is a data row representing a JSON object like the following:

{
 "order_id":1681986991888,
 "symbol":"AMZN",
 "instrument":"AMZN APR 28 23 100 PUT",
 "currency":"usd",
 "price":2.89,
 "exchange":"EDGX",
 "sold":[88,49]
}
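
If you want to confirm what the generator produced, a quick check from any environment with Boto3 and access to the bucket could look like the following minimal sketch (the bucket name is a placeholder you need to replace):

import boto3

bucket = "<your bucket name>"          # same bucket passed to the job as --bucket
prefix = "transformsblog/inputdata/"   # prefix used by the generator script

# List the synthetic data files and their sizes
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])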

Create the AWS Glue visual job

To create the AWS Glue visual job, complete the following steps:

  1. Go to AWS Glue Studio and create a job using the option Visual with a blank canvas.
  2. Edit Untitled job to give it a name and assign a role suitable for AWS Glue on the Job details tab.
  3. Add an S3 data source (you can name it JSON files source) and enter the S3 URL under which the files are stored (for example, s3://<your bucket name>/transformsblog/inputdata/), then select JSON as the data format.
  4. Select Infer schema so it sets the output schema based on the data (a rough code equivalent of this source node follows this list).
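
Under the hood, visual jobs generate PySpark code. The source node you just configured corresponds roughly to the following sketch; this is a simplified assumption for illustration, not the exact code AWS Glue Studio generates, and the bucket name is a placeholder:

from pyspark.sql import SparkSession

# Read all the JSON files under the input prefix and let Spark infer the schema
spark = SparkSession.builder.getOrCreate()
orders_df = spark.read.json("s3://<your bucket name>/transformsblog/inputdata/")
orders_df.printSchema()  # bought and sold show up as array columns; the rest as scalars

The later sketches in this post build on this orders_df variable in the same spirit: rough, hedged equivalents of what each visual transform does.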

From this source node, you’ll keep chaining transforms. When adding each transform, make sure the selected node is the last one added so it gets assigned as the parent, unless indicated otherwise in the instructions.

If you didn’t select the right parent, you can always change it by selecting the node and choosing another parent in the configuration pane.

Node parent configuration

For each node added, you’ll give it a specific name (so the node purpose shows in the graph) and configuration on the Transform tab.

Every time a transform changes the schema (for instance, add a new column), the output schema needs to be updated so it’s visible to the downstream transforms. You can manually edit the output schema, but it’s more practical and safer to do it using the data preview.
Additionally, that way you can verify that the transformations so far are working as expected. To do so, open the Data preview tab with the transform selected and start a preview session. After you have verified the transformed data looks as expected, go to the Output schema tab and choose Use data preview schema to update the schema automatically.

As you add new kinds of transforms, the preview might show a message about a missing dependency. When this happens, choose End Session and then start a new one, so the preview picks up the new kind of node.

Extract instrument information

Let’s start by dealing with the information on the instrument name to normalize it into columns that are easier to access in the resulting output table.

  1. Add a Split String node and name it Split instrument, which will tokenize the instrument column using a whitespace regex: \s+ (a single space would do in this case, but this way is more flexible and visually clearer).
  2. We want to keep the original instrument information as is, so enter a new column name for the split array: instrument_arr.
    Split config
  3. Add an Array To Columns node and name it Instrument columns to convert the array column just created into new fields, except for symbol, for which we already have a column.
  4. Select the column instrument_arr, skip the first token, and tell it to extract the output columns month, day, year, strike_price, type using indexes 2, 3, 4, 5, 6 (the spaces after the commas are for readability; they don’t impact the configuration). A code sketch of this split-and-extract logic follows this list.
    Array config
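
If it helps to reason about these transforms in code, the Split String and Array To Columns steps map roughly onto the following PySpark sketch, assuming orders_df is the DataFrame from the earlier source sketch (again, not the code Glue Studio actually generates):

from pyspark.sql import functions as F

# Split String: tokenize the instrument name on whitespace into a new array column
df = orders_df.withColumn("instrument_arr", F.split(F.col("instrument"), r"\s+"))

# Array To Columns: Spark arrays are 0-based, so the transform's indexes 2-6
# correspond to getItem(1) through getItem(5); the symbol token is skipped
# because it's already a column
df = (df
      .withColumn("month", F.col("instrument_arr").getItem(1))
      .withColumn("day", F.col("instrument_arr").getItem(2))
      .withColumn("year", F.col("instrument_arr").getItem(3))
      .withColumn("strike_price", F.col("instrument_arr").getItem(4))
      .withColumn("type", F.col("instrument_arr").getItem(5)))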

The extracted year is expressed with only two digits; as a stopgap, let’s assume it’s in this century whenever just two digits are used.

  1. Add a Derived Column node and name it Four digits year.
  2. Enter year as the derived column name so it overrides the existing column, and enter the following SQL expression:
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    Year derived column config

For convenience, we build an expiration_date field that a user can use as a reference for the last date the option can be exercised.

  1. Add a Concatenate Columns node and name it Build expiration date.
  2. Name the new column expiration_date, select the columns year, month, and day (in that order), and a hyphen as the spacer (the year fix and this concatenation are sketched in code after this list).
    Concatenated date config
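
In code, the Derived Column and Concatenate Columns steps correspond roughly to the sketch below, continuing from the df variable of the previous sketch and reusing the node's own SQL expression:

from pyspark.sql import functions as F

# Derived Column: widen two-digit years using the same SQL expression as the node
df = df.withColumn(
    "year",
    F.expr("CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END"))

# Concatenate Columns: build expiration_date as year-month-day with a hyphen spacer
df = df.withColumn("expiration_date", F.concat_ws("-", "year", "month", "day"))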

The diagram so far should look like the following example.

DAG

The data preview of the new columns so far should look like the following screenshot.

Data preview

Normalize the number of contracts

Each of the rows in the data indicates the number of contracts of each option that were bought or sold and the batches in which the orders were filled. Without losing the information about the individual batches, we want to have each amount on an individual row with a single amount value, while the rest of the information is replicated in each row produced.

First, let’s merge the amounts into a single column.

  1. Add an Unpivot Columns Into Rows node and name it Unpivot actions.
  2. Choose the columns bought and sold to unpivot and store the names and values in columns named action and contracts, respectively.
    Unpivot config
    Notice in the preview that the new column contracts is still an array of numbers after this transformation.
  3. Add an Explode Array Or Map Into Rows node and name it Explode contracts.
  4. Choose the contracts column and enter contracts as the new column to override it (we don’t need to keep the original array). A code sketch of this step follows below.

The preview now shows that each row has a single contracts amount, and the rest of the fields are the same.

This also means that order_id is no longer a unique key. For your own use cases, you need to decide how to model your data and whether or not you want to denormalize it.
Explode config
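
The steps above map onto code in several possible ways; a general unpivot would typically use a stack-style expression, but because bought and sold are mutually exclusive in this dataset, the following simplified PySpark sketch (continuing from the df of the earlier sketches) achieves the same result:

from pyspark.sql import functions as F

# Unpivot Columns Into Rows (simplified): derive action from whichever side is present
# and keep its array of amounts; this relies on bought/sold being mutually exclusive
df = (df
      .withColumn("action", F.when(F.col("bought").isNotNull(), F.lit("bought"))
                             .otherwise(F.lit("sold")))
      .withColumn("contracts", F.coalesce(F.col("bought"), F.col("sold")))
      .drop("bought", "sold"))

# Explode Array Or Map Into Rows: one output row per element of the contracts array
df = df.withColumn("contracts", F.explode("contracts"))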

The following screenshot is an example of what the new columns look like after the transformations so far.
Data preview

Create a summary table

Now you create a summary table with the number of contracts traded for each type and each stock symbol.

Let’s assume for illustration purposes that the files processed belong to a single day, so this summary gives the business users information about what the market interest and sentiment are that day.

  1. Add a Select Fields node and select the following columns to keep for the summary: symbol, type, and contracts.
    Selected fields
  2. Add a Pivot Rows Into Columns node and name it Pivot summary.
  3. Aggregate on the contracts column using sum and choose to convert the type column (see the code sketch after this list).
    Pivot config
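
As a rough code equivalent, the Select Fields and Pivot Rows Into Columns steps correspond to a grouped pivot like the following sketch, again continuing from the df variable used earlier:

# Select Fields + Pivot Rows Into Columns: total contracts per symbol, one column per option type
summary_df = (df
              .select("symbol", "type", "contracts")
              .groupBy("symbol")
              .pivot("type", ["CALL", "PUT"])  # listing the values avoids an extra pass over the data
              .sum("contracts"))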

Normally, you would store it in some external database or file for reference; in this example, we save it as a CSV file on Amazon S3.

  1. Add an Autobalance Processing node and name it Single output file.
  2. Although that transform type is normally used to optimize the parallelism, here we use it to reduce the output to a single file. Therefore, enter 1 in the number of partitions configuration.
    Autobalance config
  3. Add an S3 target and name it CSV Contract summary.
  4. Choose CSV as the data format and enter an S3 path where the job role is allowed to store files. A code sketch of this single-file output follows the list.
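
In code terms, using Autobalance Processing with one partition followed by a CSV target is roughly equivalent to coalescing to a single partition before writing, as in this sketch (the output path is a placeholder, not a path defined in this post):

# Autobalance Processing with 1 partition + CSV target: force a single output file
(summary_df
 .coalesce(1)
 .write
 .mode("overwrite")              # a choice made for this sketch; adjust to your needs
 .option("header", "true")       # also a choice, to make the sketch's output readable
 .csv("s3://<your bucket name>/transformsblog/summary/"))

Funneling everything through one partition is fine for a small summary like this one, but it would become a bottleneck for large outputs, which is why this transform normally targets parallelism rather than single files.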

The last part of the job should now look like the following example.
DAG

  1. Save and run the job. Use the Runs tab to check when it has finished successfully.
    You’ll find a file under that path that is a CSV, despite not having that extension. You’ll probably need to add the extension after downloading it to open it.
    On a tool that can read the CSV, the summary should look something like the following example.
    Spreadsheet

Clean up temporary columns

In preparation for saving the orders into a historical table for future analysis, let’s clean up some temporary columns created along the way.

  1. Add a Drop Fields node with the Explode contracts node selected as its parent (we are branching the data pipeline to generate a separate output).
  2. Select the fields to be dropped: instrument_arr, month, day, and year.
    The rest we want to keep so they are saved in the historical table we’ll create later.
    Drop fields

Currency standardization

This synthetic data contains fictional operations on two currencies, but in a real system you could get currencies from markets all over the world. It’s useful to standardize the currencies handled into a single reference currency so they can easily be compared and aggregated for reporting and analysis.

We use Amazon Athena to simulate a table with approximate currency conversions that gets updated periodically (here we assume we process the orders in a timely enough manner that the conversion is reasonably representative for comparison purposes).

  1. Open the Athena console in the same Region where you’re using AWS Glue.
  2. Run the following query to create the table by setting an S3 location where both your Athena and AWS Glue roles can read and write. Also, you might want to store the table in a different database than default (if you do that, update the table qualified name accordingly in the examples provided).
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';
  3. Enter a few sample conversions into the table:
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. You should now be able to view the table with the following query:
    SELECT * FROM default.exchange_rates
  5. Back on the AWS Glue visual job, add a Lookup node (as a child of Drop Fields) and name it Exchange rate.
  6. Enter the qualified name of the table you just created, using currency as the key, and select the exchange_rate field to use.
    Because the field is named the same in both the data and the lookup table, we can just enter the name currency and don’t need to define a mapping.
    Lookup config
  7. Add a Derived Column node and name it Total in usd.
  8. Name the derived column total_usd and use the following SQL expression:
    round(contracts * price * exchange_rate, 2)
    Currency conversion config
  9. Add an Add Current Timestamp node and name the column ingest_date.
  10. Use the format %Y-%m-%d for your timestamp (for demonstration purposes, we are just using the date; you can make it more precise if you want to). This branch of the job is sketched in code after the list.
    Timestamp config
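
Taken together, the branch from Drop Fields through Add Current Timestamp corresponds roughly to the following PySpark sketch, assuming df is the exploded DataFrame from the earlier sketches, spark is the same SparkSession, and the job can read the exchange_rates table through the Glue Data Catalog:

from pyspark.sql import functions as F

# Drop Fields: remove the temporary columns on this separate branch
orders_branch = df.drop("instrument_arr", "month", "day", "year")

# Lookup: roughly a join against the exchange_rates table on the shared currency column
rates_df = spark.table("default.exchange_rates")
orders_branch = orders_branch.join(rates_df, on="currency", how="left")

# Derived Column: grand total of the operation converted to US dollars
orders_branch = orders_branch.withColumn(
    "total_usd", F.round(F.col("contracts") * F.col("price") * F.col("exchange_rate"), 2))

# Add Current Timestamp: the date when the transformation ran
# (Spark's date_format uses the yyyy-MM-dd pattern for the same result as %Y-%m-%d)
orders_branch = orders_branch.withColumn(
    "ingest_date", F.date_format(F.current_timestamp(), "yyyy-MM-dd"))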

Save the historical orders table

To save the historical orders table, complete the following steps:

  1. Add an S3 target node and name it Orders table.
  2. Configure Parquet format with snappy compression, and provide an S3 target path under which to store the results (separate from the summary).
  3. Select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  4. Enter a target database and a name for the new table, for instance: option_orders (a code sketch of this sink follows the list).
    Table sink config
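
A plain Spark approximation of this sink is sketched below; note that the visual node also creates and updates the option_orders table in the Data Catalog, which this simplified sketch does not attempt, and the output path is a placeholder:

# S3 target: Parquet with snappy compression, appending new data on each run
(orders_branch
 .write
 .mode("append")
 .option("compression", "snappy")
 .parquet("s3://<your bucket name>/transformsblog/orders/"))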

The last part of the diagram should now look similar to the following, with two branches for the two separate outputs.
DAG

After you run the job successfully, you can use a tool like Athena to review the data the job has produced by querying the new table. You can find the table in the Athena table list and choose Preview table, or just run a SELECT query (updating the table name to the name and catalog you used):

SELECT * FROM default.option_orders limit 10

Your table content should look similar to the following screenshot.
Table content

Clean up

If you don’t want to keep this example, delete the two jobs you created, the two tables in Athena, and the S3 paths where the input and output files were stored.

Conclusion

In this post, we showed how the new transforms in AWS Glue Studio can help you do more advanced transformations with minimal configuration. This means you can implement more ETL use cases without having to write and maintain any code. The new transforms are already available in AWS Glue Studio, so you can start using them in your visual jobs today.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.