AWS Database Blog

Remediate object change notifications from Oracle to Amazon Aurora PostgreSQL or Amazon RDS for PostgreSQL

An Oracle to Amazon Aurora PostgreSQL-Compatible Edition or Amazon Relational Database Service (Amazon RDS) for PostgreSQL migration is a multi-stage process involving different technologies and skills, from the assessment stage through the cutover stage. For more information about the database migration process, refer to the related posts on the AWS Database Blog.

With an Oracle database, if a query is registered for object change notification (OCN), the database notifies the application whenever a transaction changes an object that the query references and commits, regardless of whether the query result changed.

If a query is registered for query result change notification (QRCN), the database notifies the application whenever a transaction changes the result of the query and commits.

The database adds a message that describes the change to an internal queue. A client-side application can listen for these notifications, transform the data, and publish it to downstream applications. This feature is built into Oracle; PostgreSQL doesn't have an equivalent built-in capability, so it's a common gap that can't be closed by schema migration alone.

In this post, we discuss the options to remediate object change notifications when migrating from Oracle to Amazon Aurora PostgreSQL-Compatible Edition or Amazon RDS for PostgreSQL.

Prerequisites

To test this solution, you need the following prerequisites:

  • An Aurora PostgreSQL-Compatible Edition or Amazon RDS for PostgreSQL database
  • A client machine, such as an Amazon EC2 instance, with the psql client installed to connect to the database

Solution overview

When evaluating solutions, take the following aspects into account:

  • The volume of changes and messages that need to be processed within a transaction
  • High availability of the client application that connects to the database server to consume messages and events, and durability of the change data
  • The client-side application changes needed to support modernization

Based on these aspects, consider the following solutions:

  • A database trigger with a queuing table for persisting the change data
  • LISTEN/NOTIFY
  • A logical decoding output plugin

In the following sections, we discuss each solution in more detail.

Database trigger with a queuing table for persisting the change data

A database trigger is procedural code that automatically runs in response to certain events on a particular table or view in a database. With the trigger approach, we can track DML (INSERT, UPDATE, and DELETE) changes on a list of tables and persist them in a database table (which we refer to as a queuing table) for resiliency and data durability. This queuing table can be constructed using the following structure.

The following is the sample code for tracking the change data on tables and persisting it to a queuing table.

Create a test table and the queuing table:

CREATE TABLE EMPLOYEE (EMPID INTEGER, FNAME VARCHAR(50), LNAME VARCHAR(50), MGRID INTEGER );

CREATE TABLE MESSAGEQUEUE (MSGID  SERIAL, OPERATION VARCHAR(6), TABLENAME VARCHAR(63), CHANGEDATA JSON, MSGDATE TIMESTAMP, CONSUMED BOOLEAN DEFAULT FALSE NOT NULL );

Create a trigger function to stream changes to a queuing table:

CREATE OR REPLACE FUNCTION TG_EMPLOYEE() RETURNS trigger AS
$$
BEGIN
IF (TG_OP = 'DELETE') THEN
    -- For deletes, only OLD carries the row data
    INSERT INTO MESSAGEQUEUE (OPERATION, TABLENAME, CHANGEDATA, MSGDATE, CONSUMED)
    VALUES (TG_OP, TG_TABLE_NAME, ROW_TO_JSON(OLD.*), NOW(), FALSE);
    RETURN OLD;
ELSIF (TG_OP = 'UPDATE') THEN
    INSERT INTO MESSAGEQUEUE (OPERATION, TABLENAME, CHANGEDATA, MSGDATE, CONSUMED)
    VALUES (TG_OP, TG_TABLE_NAME, ROW_TO_JSON(NEW.*), NOW(), FALSE);
    RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
    INSERT INTO MESSAGEQUEUE (OPERATION, TABLENAME, CHANGEDATA, MSGDATE, CONSUMED)
    VALUES (TG_OP, TG_TABLE_NAME, ROW_TO_JSON(NEW.*), NOW(), FALSE);
    RETURN NEW;
END IF;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER TG_EMPLOYEE
AFTER INSERT OR UPDATE OR DELETE ON EMPLOYEE
   FOR EACH ROW EXECUTE FUNCTION TG_EMPLOYEE();

Perform DML operations:

mydb3=> insert into employee values ('99999','Ranga','C',10);
INSERT 0 1
mydb3=> insert into employee values ('11111','Nava','T',10);
INSERT 0 1
mydb3=> select * from employee;
 empid | fname | lname | mgrid
-------+-------+-------+-------
 99999 | Ranga | C     |    10
 11111 | Nava  | T     |    10
(2 rows)

Query the messagequeue table:

mydb3=> select * from messagequeue;
 msgid | operation | tablename |                       changedata                       |          msgdate           | consumed
-------+-----------+-----------+--------------------------------------------------------+----------------------------+----------
     1 | INSERT    | employee  | {"empid":99999,"fname":"Ranga","lname":"C","mgrid":10} | 2022-09-23 03:17:08.552784 | f
     2 | INSERT    | employee  | {"empid":11111,"fname":"Nava","lname":"T","mgrid":10}  | 2022-09-23 03:17:22.350557 | f
(2 rows)

The client application polls the queuing table for change data, extracts it, and publishes it to downstream applications for further processing. In this approach, the client application is responsible for tracking its last checkpoint and extracting only the data that arrived after it.
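
The following is a minimal polling sketch (not part of the original setup) that claims a batch of unconsumed messages and marks them consumed in a single statement. FOR UPDATE SKIP LOCKED lets multiple consumer instances poll concurrently without processing the same message twice; the batch size of 100 is an arbitrary example value:

-- Claim up to 100 unconsumed messages and mark them consumed atomically.
-- SKIP LOCKED lets concurrent consumers take disjoint batches.
WITH batch AS (
    SELECT MSGID
    FROM MESSAGEQUEUE
    WHERE CONSUMED = FALSE
    ORDER BY MSGID
    LIMIT 100
    FOR UPDATE SKIP LOCKED
)
UPDATE MESSAGEQUEUE m
SET CONSUMED = TRUE
FROM batch
WHERE m.MSGID = batch.MSGID
RETURNING m.MSGID, m.OPERATION, m.TABLENAME, m.CHANGEDATA;

The RETURNING clause hands the claimed messages to the application in the same round trip; if the consumer fails before publishing downstream, it would need to reset CONSUMED for the unacknowledged messages.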

Considerations

Keep in mind the following when considering this solution:

  • Track data changes on only select tables instead of the entire database
  • Consider the volume of data changes within a single transaction; if the changes are large, consider a statement-level trigger (see the sketch after this list)
  • Use this solution for a poll-based methodology
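
For the statement-level option, a trigger with a transition table queues all rows affected by a statement in one invocation instead of firing once per row. The following is a minimal sketch for inserts only (hypothetical names, not from the original setup); PostgreSQL allows transition relations only on single-event triggers, so you would create similar triggers for UPDATE (NEW TABLE) and DELETE (OLD TABLE):

CREATE OR REPLACE FUNCTION TG_EMPLOYEE_STMT() RETURNS trigger AS
$$
BEGIN
    -- newtab holds every row inserted by the triggering statement
    INSERT INTO MESSAGEQUEUE (OPERATION, TABLENAME, CHANGEDATA, MSGDATE)
    SELECT TG_OP, TG_TABLE_NAME, ROW_TO_JSON(n.*), NOW()
    FROM newtab n;
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER TG_EMPLOYEE_STMT
AFTER INSERT ON EMPLOYEE
REFERENCING NEW TABLE AS newtab
FOR EACH STATEMENT EXECUTE FUNCTION TG_EMPLOYEE_STMT();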

A benefit of this solution is that the change data is persisted in a queuing table for durability and fault tolerance. This allows the client application to resume and replay changes after the last checkpoint even in case of failure. However, this method has the following drawbacks:

  • Triggers are expensive and can slow down transactions because they’re synchronous
  • The queuing table needs regular maintenance (see the purge sketch after this list)
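
For the maintenance point, a scheduled job can purge consumed messages that are past a retention window; the 7-day interval below is an arbitrary example:

-- Purge messages that were consumed and are older than the retention window
DELETE FROM MESSAGEQUEUE
WHERE CONSUMED = TRUE
  AND MSGDATE < NOW() - INTERVAL '7 days';

Autovacuum then reclaims the dead rows; for a very high-churn queue, you could instead partition the queuing table by date and drop old partitions.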

LISTEN/NOTIFY

With LISTEN/NOTIFY, multiple listeners can watch a single channel and receive messages as soon as a session publishes a notification on it. pg_notify is a built-in PostgreSQL function that generates an asynchronous notification, and the client application uses LISTEN to consume these event notifications. You can create a trigger that tracks the change data and calls the pg_notify function with a JSON payload to generate event notifications.

The following is the sample code for tracking the change data on tables and publishing it to a notification channel.

Create a test table and a sequence for numbering the messages:

CREATE TABLE EMPLOYEE (EMPID INTEGER, FNAME VARCHAR(50), LNAME VARCHAR(50), MGRID INTEGER );

CREATE SEQUENCE MSGID_SEQ;

Create a trigger function to publish changes to a channel:

CREATE OR REPLACE FUNCTION TG_EMPLOYEE() RETURNS trigger AS
$$
BEGIN
-- pg_notify channel names are case-sensitive strings; use lowercase
-- 'employee' so that LISTEN employee (an identifier, which PostgreSQL
-- folds to lowercase) receives these notifications.
IF (TG_OP = 'DELETE') THEN
    PERFORM PG_NOTIFY('employee', (SELECT ROW_TO_JSON(msg)::TEXT FROM (
        SELECT NEXTVAL('MSGID_SEQ') AS msgid, TG_OP AS operation, TG_TABLE_NAME AS tablename,
               ROW_TO_JSON(OLD.*) AS changedata, NOW()::TIMESTAMP AS msgdate) msg));
    RETURN OLD;
ELSIF (TG_OP = 'UPDATE') THEN
    PERFORM PG_NOTIFY('employee', (SELECT ROW_TO_JSON(msg)::TEXT FROM (
        SELECT NEXTVAL('MSGID_SEQ') AS msgid, TG_OP AS operation, TG_TABLE_NAME AS tablename,
               ROW_TO_JSON(NEW.*) AS changedata, NOW()::TIMESTAMP AS msgdate) msg));
    RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
    PERFORM PG_NOTIFY('employee', (SELECT ROW_TO_JSON(msg)::TEXT FROM (
        SELECT NEXTVAL('MSGID_SEQ') AS msgid, TG_OP AS operation, TG_TABLE_NAME AS tablename,
               ROW_TO_JSON(NEW.*) AS changedata, NOW()::TIMESTAMP AS msgdate) msg));
    RETURN NEW;
END IF;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER TG_EMPLOYEE
AFTER INSERT OR UPDATE OR DELETE ON EMPLOYEE
FOR EACH ROW EXECUTE FUNCTION TG_EMPLOYEE();

Perform DML operations:

mydb3=> insert into employee values ('99999','Ranga','C',10);
INSERT 0 1
mydb3=> insert into employee values ('11111','Nava','T',10);
INSERT 0 1
mydb3=> select * from employee;
 empid | fname | lname | mgrid
-------+-------+-------+-------
 99999 | Ranga | C     |    10
 11111 | Nava  | T     |    10
(2 rows)

The client application listens on the channel and receives messages as they're published (for this post, we use the psql client to consume the notifications):

mydb3=> listen employee;
LISTEN
Asynchronous notification "employee" with payload "{"msgid":1,"operation":"INSERT","tablename":"employee","changedata":{"empid":99999,"fname":"Ranga","lname":"C","mgrid":10},"msgdate":"2022-09-23T03:57:50.301805"}" received from server process with PID 30191.
Asynchronous notification "employee" with payload "{"msgid":2,"operation":"INSERT","tablename":"employee","changedata":{"empid":11111,"fname":"Nava","lname":"T","mgrid":10},"msgdate":"2022-09-23T03:57:53.407565"}" received from server process with PID 30191.

Considerations

Keep in mind the following when considering this solution:

  • Track data changes on only select tables instead of the entire database
  • Consider the volume of data changes within a single transaction; if the changes are large, consider a statement-level trigger (see the sketch in the previous solution)
  • Use this solution for a push-based methodology

A benefit of this solution is that notifications are delivered asynchronously after the transaction commits. However, it has the following drawbacks:

  • Triggers are expensive and can slow down transactions because they’re synchronous
  • Notifications are lost if the client application isn't listening when they're delivered (see the mitigation sketch after this list)
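
To mitigate lost notifications, a common PostgreSQL pattern is to combine this solution with the previous one: persist the change to the queuing table first, then notify listeners with the queued row. Live listeners get push delivery, and a consumer that was offline can replay unconsumed rows from the table. The following sketch assumes the MESSAGEQUEUE table from the first solution exists; the function name is hypothetical:

CREATE OR REPLACE FUNCTION TG_EMPLOYEE_HYBRID() RETURNS trigger AS
$$
DECLARE
    queued MESSAGEQUEUE;
BEGIN
    -- Persist first so the change survives even if no one is listening
    INSERT INTO MESSAGEQUEUE (OPERATION, TABLENAME, CHANGEDATA, MSGDATE)
    VALUES (TG_OP, TG_TABLE_NAME,
            ROW_TO_JSON(CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END),
            NOW())
    RETURNING * INTO queued;
    -- Then push the queued row to any live listeners (payloads must stay under 8,000 bytes)
    PERFORM PG_NOTIFY('employee', ROW_TO_JSON(queued)::TEXT);
    RETURN NULL; -- the return value is ignored for AFTER triggers
END;
$$
LANGUAGE plpgsql;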

Logical decoding output plugin

Logical decoding is the process of extracting persistent changes to a database’s tables into a coherent, simple-to-understand format that can be interpreted without detailed knowledge of the database’s internal state.

In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client application in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. The output plugins transform the data from the write-ahead log’s internal representation into the format the consumer of a replication slot desires. The format in which those changes are streamed is determined by the output plugin used.

In this example, we use the wal2json plugin to stream the table changes in JSON format. For more information about logical decoding and output plugins, refer to Understanding logical replication and logical decoding and Logical Decoding Plugins.
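
As a quick way to see what an output plugin emits, you can also exercise a slot entirely in SQL before setting up pg_recvlogical. The following sketch uses a hypothetical slot name and assumes the wal2json plugin, which is available on Aurora PostgreSQL and Amazon RDS for PostgreSQL:

-- Create a logical replication slot that uses the wal2json output plugin
SELECT pg_create_logical_replication_slot('employee_slot_sql', 'wal2json');

-- Peek at pending changes without consuming them from the slot
SELECT * FROM pg_logical_slot_peek_changes('employee_slot_sql', NULL, NULL, 'pretty-print', '1');

-- Drop the slot when you're done so it doesn't retain WAL
SELECT pg_drop_replication_slot('employee_slot_sql');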

Create a test table:

CREATE TABLE EMPLOYEE (EMPID INTEGER, FNAME VARCHAR(50), LNAME VARCHAR(50), MGRID INTEGER );

Prerequisites:

  • An Aurora PostgreSQL-Compatible Edition database cluster with logical replication enabled (set the rds.logical_replication parameter to 1 in the cluster parameter group)
  • An Amazon EC2 instance with the PostgreSQL source code compiled to obtain the pg_recvlogical binary
  • A user with the rds_superuser role to create replication slots

pg_recvlogical is a client utility that can continuously read the contents from the slot and stream them. It’s the client’s responsibility to filter the required events before they’re consumed further.

Create a replication slot using the pg_recvlogical utility:

pg_recvlogical -h mydb3.c24eylcynj2u.us-east-1.rds.amazonaws.com -d mydb3 -U postgres -p 5432 -S employee_slot --create-slot -P wal2json

mydb3=> select * from pg_replication_slots;
   slot_name   |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
---------------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 employee_slot | wal2json | logical   |  16397 | mydb3    | f         | t      |      31892 |      |         1247 | 5/10000028  | 5/10000060          | reserved   |               | f
(1 row)

Run the pg_recvlogical client utility in one terminal, run the DML statements in a second terminal, and validate the streamed changes:

mydb3=> insert into employee values ( 9999 , 'Ranga','C',10);
INSERT 0 1

sh-4.2$ ./pg_recvlogical -h mydb3.c24eylcynj2u.us-east-1.rds.amazonaws.com -d mydb3 -U postgres -p 5432 -S employee_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
Password:
{
    "change": [
    ]
}
{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "employee",
            "columnnames": ["empid", "fname", "lname", "mgrid"],
            "columntypes": ["integer", "character varying(50)", "character varying(50)", "integer"],
            "columnvalues": [9999, "Ranga", "C", 10]
        }
    ]
}

Considerations

Keep in mind the following when considering this solution:

  • The client application needs to filter the DML change events for the tables of interest and stream them onward
  • Consider this solution when the additional overhead of triggers isn't acceptable

A benefit of this solution is that you can stream data changes in the order they are generated. However, it has the following drawbacks:

  • Logical replication must be enabled at the cluster level; it can't be scoped to a single table
  • Orphaned replication slots retain WAL and can consume your disk space (see the sketch after this list)
  • Every slot streams all changes, so each consumer must filter for the events it needs
  • You can't re-read changes after they've been consumed (confirmed) from the replication slot
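
For the orphaned-slot drawback, you can monitor how much WAL each slot is retaining and drop any slot that no longer has a consumer. The following is a minimal sketch:

-- Show how much WAL each slot is holding back from removal
SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Drop a slot that no longer has a consumer
SELECT pg_drop_replication_slot('employee_slot');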

Clean up

To avoid future charges and to remove the components created while testing this use case, complete the following steps:

  1. On the Amazon RDS console, select the database you set up, and on the Actions menu, choose Delete.
  2. On the Amazon EC2 console, select the EC2 instance that you used, and on the Actions menu, choose Terminate.

Summary

In this post, we walked through multiple solutions for remediating object change notifications when migrating from Oracle to Amazon Aurora PostgreSQL-Compatible Edition or Amazon RDS for PostgreSQL. Based on your business requirements, such as the number of tables to notify on, the volume of change data, and the effort involved in building the client application, you can choose the solution that's most feasible.

If you have any questions or suggestions, leave a comment.


About the Authors

Ranga Cherukuri is a Cloud Database Architect with the Professional Services team at AWS. Ranga focuses on helping customers build highly available, cost-effective database solutions and migrate their large-scale SQL Server databases to AWS. He is passionate about databases and analytics.

Navakanth Talluri is a Database Migration Architect with the Professional Services team at AWS. He works with internal and external Amazon customers to provide guidance and technical assistance on database projects, helping them migrate from commercial database engines to Amazon RDS.