AWS Database Blog

Capture key source table headers data using AWS DMS and use it for Amazon S3 data lake operations

Migrating raw data from source systems into a central repository is usually the first step in establishing a data lake. Many systems store source data in relational database tables, so a mechanism is needed to ingest this data into the data lake and to capture relevant metadata about these tables so that the data in the lake can be consolidated for create, read, update, and delete (CRUD) operations.

Amazon Simple Storage Service (Amazon S3) is the storage of choice for setting up data lakes on AWS. Source data is first ingested into Amazon S3, transformed, and then consumed from Amazon S3 using purpose-built AWS services. The first step in this process is to ingest data from a variety of source systems, including relational databases such as Oracle Database and Microsoft SQL Server.

AWS Database Migration Service (AWS DMS) is a web service that you can use to migrate data from these source databases to your S3 data lake. AWS DMS can migrate the initial full load from the source database tables into Amazon S3, and it can also perform ongoing change data capture (CDC). The replication process is performed by creating and starting an AWS DMS task. The S3 endpoint you define as the target for this task accumulates data from the source tables in CSV or Parquet files. You can then transform and curate this raw data so different tools can consume it.

AWS DMS also provides certain transformation rules that you can apply to any selected schema, table, or view. For more information on specific transformation expressions, see Transformation rules and actions. In this post, I show how you can use AWS DMS to replicate headers from source tables to Amazon S3 as the target and how to use these columns to consolidate the data in the data lake for CRUD operations.

Architecture overview

The following architecture diagram shows AWS DMS being used for data ingestion from a relational database to Amazon S3. AWS DMS can connect to many of the most popular data engines as a source for data replication. For this post, I use Oracle as the data source for the sample data and let AWS DMS push this data into an S3 bucket designated to store raw source data. When AWS DMS runs for the first time, it creates initial bulk files containing all records from the selected table. When that full load is complete, it starts generating CDC files for that table. Because AWS DMS gets the CDC data from the redo logs of the database, over time the raw S3 bucket receives all the committed changes to the same record as separate rows. This raw data first needs to be consolidated and transformed before the consuming systems can use it. Amazon EMR is a popular choice for data transformation using Apache Spark, and it also comes with the Apache Hudi framework, which helps consolidate data on Amazon S3 for CRUD operations.

Replicate source table headers using expressions

Let’s look at the table headers that we can get from the source and how we can utilize them for our data lake operations in Amazon S3. By default, headers for source tables aren’t replicated to the target. To indicate which headers to replicate, use a transformation rule with an expression that includes the table column header. For more information about transformation rule expressions, see Replicating source table headers using expressions.

AR_H_CHANGE_SEQ

For ongoing replication, AR_H_CHANGE_SEQ represents a unique incrementing number from the source database that consists of a timestamp and an auto-incrementing number. The value depends on the source database system.

The transformation rule for it with an expression is as follows:

{
  "rule-type": "transformation",
  "rule-id": "2",
  "rule-name": "2",
  "rule-target": "column",
  "object-locator": {
    "schema-name": "%",
    "table-name": "%"
  },
  "rule-action": "add-column",
  "value": "transact_seq",
  "expression": "$AR_H_CHANGE_SEQ",
  "data-type": {
    "type": "string",
    "length": 50
  }
}

The following example data is from a seat type table in Oracle. A record was first inserted and then updated three times. The AR_H_CHANGE_SEQ value, exposed as the transact_seq column by the transformation, is a 35-digit number that is unique at the task level. The first 16 digits are part of a timestamp, and the last 19 digits are the record_id number incremented by the DBMS. This number always increases chronologically, so the largest value for a given record corresponds to the last change that record underwent in the source table. During the consolidation process, you can pass this column to Apache Hudi in Amazon EMR as the sort key so that Hudi uses the largest value to identify which row for a particular primary key is the latest one to persist in the target S3 bucket. In this example, Hudi persists the last record from the following table because the transact_seq ending in 21 is the largest value for this record.

Op | NAME | DESCRIPTION | transact_seq
I | recliner | This seat can recline | 20210525043936000000000000000000009
U | recliner | This seat can recline by 10 | 20210525043936000000000000000000013
U | recliner | This seat can recline by 20 | 20210525043936000000000000000000017
U | recliner | This seat can recline by 30 | 20210525043936000000000000000000021
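
To illustrate, the following PySpark sketch shows one way to pass this column to Hudi as the precombine (sort) field when consolidating the raw files on Amazon EMR. The bucket paths, table name, and record key column are hypothetical placeholders, and the exact options can vary by Hudi version.

from pyspark.sql import SparkSession

# Assumes an EMR cluster with the Hudi libraries on the Spark classpath.
spark = (SparkSession.builder
         .appName("dms-raw-to-hudi")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())

# Read the raw CSV files that AWS DMS wrote to the raw bucket (hypothetical path).
raw_df = spark.read.option("header", "true").csv("s3://my-raw-bucket/seat_type/")

hudi_options = {
    "hoodie.table.name": "seat_type",
    "hoodie.datasource.write.recordkey.field": "NAME",           # primary key of the source table
    "hoodie.datasource.write.precombine.field": "transact_seq",  # AR_H_CHANGE_SEQ header column
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.partitionpath.field": "",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
}

# Hudi keeps the row with the largest transact_seq value for each record key.
(raw_df.write.format("hudi")
       .options(**hudi_options)
       .mode("append")
       .save("s3://my-curated-bucket/seat_type/"))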

AR_H_STREAM_POSITION

For ongoing replication, AR_H_STREAM_POSITION represents the stream position value from the source. This value might be the system change number (SCN) or the log sequence number (LSN), depending on the source endpoint.

The transformation rule for it with an expression looks like the following code:

{
  "rule-type": "transformation",
  "rule-id": "3",
  "rule-name": "3",
  "rule-target": "column",
  "object-locator": {
    "schema-name": "%",
    "table-name": "%"
  },
  "rule-action": "add-column",
  "value": "transact_position",
  "expression": "$AR_H_STREAM_POSITION",
  "data-type": {
    "type": "string",
    "length": 50
  }
}

The following example shows the header values received from a sample Oracle table for the same record undergoing multiple updates. The transact_position values are the SCN values for these records in hexadecimal format. To extract the SCN, convert the first 16 digits from hexadecimal to decimal. The format or logic for the SCN may change in future releases of AWS DMS, so verify the logic before converting. Don't use the SCN to identify the latest record to persist, because on tables with high update rates, two commits of the same record may end up with the same value. Always use AR_H_CHANGE_SEQ for consolidation of records.

Op | NAME | DESCRIPTION | transact_position
I | recliner | This seat can recline | 000000000032360101000001000003A5000000340010000100000000003235DD
U | recliner | This seat can recline by 10 | 000000000032360101000001000003A50000003501C4000000000000003235DD
U | recliner | This seat can recline by 20 | 000000000032360101000001000003A500000036013C000000000000003235DD
U | recliner | This seat can recline by 30 | 000000000032360101000001000003A50000003700B8000000000000003235DD
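
As a minimal illustration of that conversion (assuming the current AWS DMS format still applies), the following Python snippet extracts the decimal SCN from the first 16 hexadecimal characters of transact_position:

def extract_scn(transact_position: str) -> int:
    """Convert the first 16 hex characters of the DMS stream position to a decimal SCN."""
    return int(transact_position[:16], 16)

# Example value from the preceding table: 0x0000000000323601 -> 3290625
print(extract_scn("000000000032360101000001000003A5000000340010000100000000003235DD"))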

AR_H_TIMESTAMP

For the initial full load, the value is a timestamp indicating the current time the data arrives at the target, whereas for ongoing replication, it's a timestamp indicating the current time of the change.

The transformation rule for it with an expression looks like the following:

{
  "rule-type": "transformation",
  "rule-id": "4",
  "rule-name": "4",
  "rule-target": "column",
  "object-locator": {
    "schema-name": "%",
    "table-name": "%"
  },
  "rule-action": "add-column",
  "value": "transact_change_timestamp",
  "expression": "$AR_H_TIMESTAMP",
  "data-type": {
    "type": "datetime",
    "precision": 7
  }
}

If the rate of updates at the source is high, you might want to use a data source with microsecond precision and the AR_H_TIMESTAMP internal header column, which captures the timestamp of when the change was made rather than the timestamp of the commit.

If you need to use the AR_H_TIMESTAMP internal header column with a data source that supports microsecond precision such as PostgreSQL, we recommend using the Hudi DataSource writer job instead of the Hudi DeltaStreamer utility. The reason for this is that although the AR_H_TIMESTAMP internal header column (in a source that supports microsecond precision) has microsecond precision, the actual value written by AWS DMS on Amazon S3 has a nanosecond format (microsecond precision with the nanosecond dimension set to 0). When you use the Hudi DataSource writer job, you can convert the AR_H_TIMESTAMP internal header column to a timestamp datatype in Spark with microsecond precision and use that new value as the PARTITIONPATH_FIELD_OPT_KEY.
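
Continuing the earlier sketch, the following PySpark fragment shows what that conversion and partitioning setup might look like; column names and paths are placeholders, and exact option names can differ between Hudi versions.

from pyspark.sql import functions as F

# Cast the AR_H_TIMESTAMP header column (transact_change_timestamp) to a Spark
# timestamp type, which retains microsecond precision.
curated_df = raw_df.withColumn(
    "transact_change_timestamp", F.to_timestamp("transact_change_timestamp"))

hudi_options = {
    "hoodie.table.name": "seat_type",
    "hoodie.datasource.write.recordkey.field": "NAME",
    "hoodie.datasource.write.precombine.field": "transact_seq",
    # The converted column supplied as PARTITIONPATH_FIELD_OPT_KEY of the DataSource writer.
    "hoodie.datasource.write.partitionpath.field": "transact_change_timestamp",
    "hoodie.datasource.write.operation": "upsert",
}

(curated_df.write.format("hudi")
           .options(**hudi_options)
           .mode("append")
           .save("s3://my-curated-bucket/seat_type/"))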

AR_H_COMMIT_TIMESTAMP

For the initial full load, the value is a timestamp indicating the current time, whereas for ongoing replication, it’s a timestamp indicating the time of the commit in the source.

The transformation rule for it with an expression is as follows:

{
  "rule-type": "transformation",
  "rule-id": "5",
  "rule-name": "5",
  "rule-target": "column",
  "object-locator": {
    "schema-name": "%",
    "table-name": "%"
  },
  "rule-action": "add-column",
  "value": "transact_commit_timestamp",
  "expression": "$AR_H_COMMIT_TIMESTAMP",
  "data-type": {
    "type": "datetime",
    "precision": 7
  }
}

You can also get the same value by setting the timestampColumnName option in the extra connection attributes when setting up the S3 target endpoint. Don’t use the AR_H_COMMIT_TIMESTAMP to identify the latest record to persist, because on tables that have high update rates, two commits of the same record may end up having the same values. Always use AR_H_CHANGE_SEQ for consolidation of records.
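
If you prefer the endpoint setting over the header expression, a minimal boto3 sketch of an S3 target endpoint that sets TimestampColumnName might look like the following; the endpoint identifier, bucket, and role ARN are placeholders.

import boto3

dms = boto3.client("dms")

# Hypothetical S3 target endpoint; TimestampColumnName adds a commit-timestamp
# column to every file that AWS DMS writes to the bucket.
dms.create_endpoint(
    EndpointIdentifier="s3-raw-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-s3-access",
        "BucketName": "my-raw-bucket",
        "TimestampColumnName": "transact_commit_timestamp",
    },
)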

AR_H_OPERATION

For the initial full load, the value is INSERT, whereas for ongoing replication, it’s either INSERT, UPDATE, or DELETE.

The transformation rule for it with an expression looks like the following code:

{
  "rule-type": "transformation",
  "rule-id": "6",
  "rule-name": "6",
  "rule-target": "column",
  "object-locator": {
    "schema-name": "%",
    "table-name": "%"
  },
  "rule-action": "add-column",
  "value": "transact_operation",
  "expression": "$AR_H_OPERATION",
  "data-type": {
    "type": "string",
    "length": 50
  }
}

These values play a critical role during consolidation of records in the S3 data lake. During the lifecycle of a record, an INSERT operation may be followed by an UPDATE operation. Identifying and consolidating the record using the latest update can easily be done in Hudi by using the primary key along with the AR_H_CHANGE_SEQ header value as the sort key. However, the DELETE operation could be a hard delete or a soft delete. For soft deletes, you can use the same DataFrame in Spark to update the data lake with the latest changed record, and the DELETE value serves as a soft delete flag to downstream consuming applications. For hard deletes, you can use the DELETE value from the AR_H_OPERATION to create a separate DataFrame in Spark and then let Hudi do the actual deletion of the record from the data lake by setting hoodie.datasource.write.payload.class to org.apache.hudi.EmptyHoodieRecordPayload.
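
A minimal PySpark sketch of that hard-delete path, reusing the hypothetical raw DataFrame and table from the earlier examples and the payload class named above, might look like the following:

# Split out the rows that AWS DMS flagged as deletes.
delete_df = raw_df.filter(raw_df.transact_operation == "DELETE")

delete_options = {
    "hoodie.table.name": "seat_type",
    "hoodie.datasource.write.recordkey.field": "NAME",
    "hoodie.datasource.write.precombine.field": "transact_seq",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.partitionpath.field": "",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    # The empty payload tells Hudi to remove the record for each matching key.
    "hoodie.datasource.write.payload.class": "org.apache.hudi.EmptyHoodieRecordPayload",
}

(delete_df.write.format("hudi")
          .options(**delete_options)
          .mode("append")
          .save("s3://my-curated-bucket/seat_type/"))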

AR_H_USER

For the initial full load, AR_H_USER is the transformation that you want to apply to the object, whereas for ongoing replication, it’s the user name, ID, or any other information that the source provides about the user that made the change. This header is supported on the SQL Server and Oracle (version 11.2.0.3 and higher) source endpoints only.

The transformation rule for it with an expression looks like the following:

{
  "rule-type": "transformation",
  "rule-id": "7",
  "rule-name": "7",
  "rule-target": "column",
  "object-locator": {
    "schema-name": "%",
    "table-name": "%"
  },
  "rule-action": "add-column",
  "value": "transact_user",
  "expression": "$AR_H_USER",
  "data-type": {
    "type": "string",
    "length": 50
  }
}

Data lake operation example

Let’s consider an example with sample data to show what it looks like before and after the consolidation operation in an S3 data lake.

After AWS DMS completes the initial data load from the tables, it switches to CDC mode, where it looks for any records that get committed to the tables. AWS DMS puts all the copies of the update in the raw bucket defined during the S3 target endpoint setup. AWS Step Functions gets invoked by an Amazon EventBridge scheduler, which in turn invokes an EMR job. The EMR job contains the data transformation logic, and it can also use Apache Hudi to consolidate the data in the data lake for CRUD operations.

I have a sample seat table in Oracle with two columns: Name and Description. To show all the pieces working together, I inserted a record into this table, followed by three rapid updates. AWS DMS captured the initial insert along with the three updates, and a data file was created in Amazon S3. Because I also enabled the AWS DMS transformations with all the header expressions described in this post, the raw copy of the data in Amazon S3 also contains the header information, as shown in the following table.

NAME | DESCRIPTION | transact_position | transact_seq | transact_change_timestamp | transact_commit_timestamp | transact_operation | transact_user
recliner | This seat can recline | 000000000032360101000001000003A5000000340010000100000000003235DD | 20210525043936000000000000000000009 | 5/25/21 4:39 | 5/25/21 4:39 | INSERT | DBMASTER
recliner | This seat can recline by 10 | 000000000032360101000001000003A50000003501C4000000000000003235DD | 20210525043936000000000000000000013 | 5/25/21 4:39 | 5/25/21 4:39 | UPDATE | DBMASTER
recliner | This seat can recline by 20 | 000000000032360101000001000003A500000036013C000000000000003235DD | 20210525043936000000000000000000017 | 5/25/21 4:39 | 5/25/21 4:39 | UPDATE | DBMASTER
recliner | This seat can recline by 30 | 000000000032360101000001000003A50000003700B8000000000000003235DD | 20210525043936000000000000000000021 | 5/25/21 4:39 | 5/25/21 4:39 | UPDATE | DBMASTER

As this example shows, when the same record changes rapidly, the SCN (the first 16 digits of the transact_position column) and the source commit timestamp (the transact_commit_timestamp column) are identical for all the updated rows, so we can't rely on those fields to identify which row contains the latest update. The transact_seq column, however, shows that the last row in the preceding table has the largest value of all the changes. If you pass the transact_seq column to Apache Hudi as the sort value, Hudi makes sure that this row persists as the final copy of the record.

The final consolidated record looks like the following, and you can query it using SQL in Amazon Athena. The SCN column value was derived by converting the first 16 digits of transact_position from hexadecimal to decimal.

NAME | DESCRIPTION | SCN | transact_seq | transact_change_timestamp | transact_commit_timestamp | transact_operation | transact_user
recliner | This seat can recline by 30 | 3290625 | 20210525043936000000000000000000021 | 5/25/21 4:39 | 5/25/21 4:39 | UPDATE | DBMASTER

If a delete operation is committed after this, the transact_operation column shows the value as DELETE, which can then be used in the downstream application as an indicator that this record was deleted from the source table.

Conclusion

In this post, I walked through the different source table headers that AWS DMS can capture and how you can use them to perform operations on your S3 data lake.

Try out these header transformations in AWS DMS to migrate your source database table data into Amazon S3 and use them to perform consolidation operations in the data lake.

If you have comments or feedback, please leave them below.


About the Author

Behram Irani, Senior Analytics Solutions Architect.