AWS Database Blog

Stream change data in a multicloud environment using AWS DMS, Amazon MSK, and Amazon Managed Service for Apache Flink

Customers fully realize cloud benefits when the majority of their workloads are on a single cloud service provider (CSP); however, some find themselves operating in a multicloud environment. To learn more about situations that may require a multicloud environment and key considerations, refer to Proven Practices for Developing a Multicloud Strategy.

When workloads and their corresponding transactional databases are distributed across multiple cloud providers, it can create challenges in using the data in near real time for advanced analytics.

In this post, we discuss architecture, approaches, and considerations for streaming data changes from transactional databases deployed in other cloud providers to a streaming data solution deployed on AWS. We also show how you can use AWS Database Migration Service (AWS DMS), Amazon Managed Streaming for Apache Kafka (Amazon MSK), and Amazon Managed Service for Apache Flink to stream change data capture (CDC) events from a Cloud SQL for MySQL database in Google Cloud. Furthermore, we provide references to supplementary Google Cloud documentation that offers additional context, and we discuss the setup steps relevant to Google Cloud.

Decision model for choosing the architecture components

Let’s assume you have an ecommerce application deployed with another CSP. As mentioned in Proven Practices for Developing a Multicloud Strategy, this scenario can occur due to mergers and acquisitions, where portfolio companies may have their own CSP strategies. As a result, workloads could be deployed across various CSPs. Meanwhile, the parent company or holding group, along with several portfolio companies, may have chosen AWS as their primary CSP.

In these cases, when you need to stream CDC data from an external database hosted in another CSP to a streaming solution deployed in AWS, your architecture may resemble the following figure.

Architecture components in a change data streaming solution

This architecture has the following key components:

  • Event sources – You can capture, enrich, and analyze real-time events, such as user-click events and backend API events, at all layers of your application. For this post, we focus on change data events from the database layer.
  • Source connector – Databases like MySQL and PostgreSQL support log-based CDC, and some streaming databases have native support for submitting events to streaming platforms. Depending on your source database, you may need a connector to stream the change data to the streaming solution. For instance, Debezium is an open source distributed platform for CDC that you can deploy as a Kafka Connect connector or as a server to stream change events from a source database; a sample connector registration sketch follows this list. Alternatively, you can use AWS DMS to stream change data from third-party managed database services in Microsoft Azure and Google Cloud without having to deploy and manage a custom connector. See Sources for AWS DMS for supported data sources. Note that not all third-party managed database services support CDC with AWS DMS; refer to Sources for data migration for up-to-date information.
  • Data stream – If your use case is to keep an AWS database in sync with the external database, AWS DMS allows you to directly create and use a target endpoint for the corresponding data store. See Target endpoints for data migration for supported targets. However, if you need to enrich, filter, or aggregate the real-time change data before sending it to the target (such as a transactional data lake), or if you need to replay the change data, you can select a data streaming service like Amazon Kinesis Data Streams or Amazon MSK as the target endpoint for AWS DMS.
  • Stream processor – With Managed Service for Apache Flink, you can transform and analyze streaming data in real time using Apache Flink and integrate applications with other AWS services. For an overview of data enrichment patterns and their implementation, refer to Common streaming data enrichment patterns in Amazon Managed Service for Apache Flink and Implement Apache Flink real-time data enrichment patterns.
  • Destinations – The architecture presented in the preceding diagram unlocks a wide array of use cases, such as the following:
    • Real-time analytics – Stream changes to enable real-time reporting, dashboards, and recommendations based on real-time change data.
    • Data lake – Create a low-latency source-to-data lake pipeline.
    • Strangler fig pattern – Implement the CDC-based strangler fig pattern to migrate and refactor applications to AWS.
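
As an example of the source connector option described earlier, registering a Debezium MySQL connector with a Kafka Connect cluster's REST API might look like the following minimal sketch. This assumes a self-managed Kafka Connect cluster; the endpoint URL, host names, credentials, topic prefix, and table list are placeholders rather than values from this post's solution, and some property names differ between Debezium versions.

    # Minimal sketch: register a Debezium MySQL source connector with a
    # Kafka Connect REST API. Host names, credentials, and table names are
    # placeholders; adjust property names to your Debezium version.
    import requests  # third-party HTTP client

    connector = {
        "name": "salesdb-cdc-connector",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "10.1.0.5",   # Cloud SQL private IP (placeholder)
            "database.port": "3306",
            "database.user": "cdc_user",
            "database.password": "REPLACE_ME",
            "database.server.id": "184054",    # unique ID within the MySQL cluster
            "topic.prefix": "salesdb-server",  # database.server.name on older Debezium versions
            "table.include.list": "salesdb.orders",
            "schema.history.internal.kafka.bootstrap.servers": "b-1.msk.example:9092",
            "schema.history.internal.kafka.topic": "schema-changes.salesdb",
        },
    }

    # Kafka Connect exposes its REST API on port 8083 by default.
    resp = requests.post("http://connect.example.internal:8083/connectors", json=connector)
    resp.raise_for_status()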

Solution overview

In the previous section, we introduced the key components of the architecture. In this section, we walk through the step-by-step process to implement the architecture. The solution presents only the steps to explore the streaming data in Flink Studio notebooks; however, you can extend it as discussed in Common streaming data enrichment patterns in Amazon Managed Service for Apache Flink and Implement Apache Flink real-time data enrichment patterns. We start by outlining the prerequisites and setup tasks. Then we guide you through deploying the solution with AWS CloudFormation. The following figure shows the solution architecture we deploy.

Change data streaming solution implementation

We deploy the following key resources, as numbered in the preceding figure:

  1. AWS Secrets Manager secrets for storing credentials for the Google Cloud MySQL database and SASL/SCRAM credentials for the MSK cluster (a boto3 example of reading such a secret follows this list)
  2. An Amazon MSK cluster
  3. An AWS DMS replication instance
  4. AWS Identity and Access Management (IAM) roles for AWS DMS and Managed Service for Apache Flink
  5. An Amazon Managed Service for Apache Flink Studio notebook
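
For reference, the Studio notebook reads the MySQL credentials from the first secret when it creates the AWS DMS source endpoint. The following is a minimal boto3 sketch of that lookup; the secret name and JSON keys are assumptions, so substitute the values from your CloudFormation stack.

    # Minimal sketch: read the MySQL credentials stored by the CloudFormation
    # stack from AWS Secrets Manager. The secret name and JSON keys are
    # placeholders; use the values from your stack outputs.
    import json
    import boto3

    secrets = boto3.client("secretsmanager")
    secret_value = secrets.get_secret_value(SecretId="MultiCloudCDC/ExternalDBSecret")
    credentials = json.loads(secret_value["SecretString"])

    db_username = credentials["username"]
    db_password = credentials["password"]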

Prerequisites

Complete the following steps to set up and configure the prerequisites for the solution described in this post:

  1. Create a Cloud SQL for MySQL instance and choose MySQL 8.0 as the database version.
  2. Download the SQL file mysqlsampletables.sql.
  3. Upload the downloaded SQL file to Cloud Shell and connect to the Cloud SQL instance from Cloud Shell.
  4. Run the following statement from the MySQL client session to create the salesdb database and load the sample data:
    source mysqlsampletables.sql;
  5. Choose Launch Stack to create a VPC, subnets, route tables, internet gateway, and NAT gateway in AWS.
  6. Choose Next.
  7. For Stack name, enter a unique name. The default value is MultiCloudCDCVPC.
  8. For VPCCIDR, enter the starting IP and the size of your VPC using IPv4 CIDR notation. The default value is 10.0.0.0/16.
  9. For PublicSubnetCIDR, enter the starting IP and the size of the subnet using IPv4 CIDR notation. The default value is 10.0.0.0/24.
  10. For PrivateSubnet1CIDR, enter the starting IP and the size of the subnet using IPv4 CIDR notation. The default value is 10.0.1.0/24.
  11. For PrivateSubnet2CIDR, enter the starting IP and the size of the subnet using IPv4 CIDR notation. The default value is 10.0.2.0/24.
  12. Choose Create stack.
    Wait for the stack to deploy completely before proceeding to the next step. Stack deployment takes about 5 minutes.
  13. When stack creation is complete, navigate to the Outputs tab and note down the public IPv4 address of the NAT gateway to use in the next section.
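
If you prefer to read the stack outputs programmatically instead of from the console, the following boto3 sketch shows one way to do it. The output key for the NAT gateway IP is a hypothetical name, so check your stack's Outputs tab for the exact key.

    # Minimal sketch: read the NAT gateway public IP from the CloudFormation
    # stack outputs. The output key is an assumption; confirm it on the
    # Outputs tab of your stack.
    import boto3

    cfn = boto3.client("cloudformation")
    stack = cfn.describe_stacks(StackName="MultiCloudCDCVPC")["Stacks"][0]

    outputs = {o["OutputKey"]: o["OutputValue"] for o in stack["Outputs"]}
    nat_public_ip = outputs["NATGatewayPublicIP"]  # hypothetical output key
    print(nat_public_ip)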

Connectivity between AWS and Google Cloud environments

You can connect to your Cloud SQL for MySQL instance with a private IP. For instructions to connect to the database instance on its private IP, refer to Create a VPN connection between Google Cloud and AWS. Note the following configurations:

  1. Use the VPC ID of the VPC created in the previous section for configuring the virtual private gateway and VPN connection.
  2. Update the route tables associated with both the public and private subnets to add a route that targets your virtual private gateway for the IPv4 CIDR range used in the Google Cloud environment, as shown in the sketch that follows these steps.
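
The route update in step 2 can also be scripted. The following boto3 sketch adds a route for the Google Cloud CIDR range (10.1.0.0/16 in our example) that targets the virtual private gateway; the route table ID and gateway ID are placeholders for your environment.

    # Minimal sketch: add a route for the Google Cloud CIDR range that targets
    # the virtual private gateway. The route table ID, gateway ID, and CIDR
    # block are placeholders.
    import boto3

    ec2 = boto3.client("ec2")
    ec2.create_route(
        RouteTableId="rtb-0123456789abcdef0",  # private (or public) subnet route table
        DestinationCidrBlock="10.1.0.0/16",    # IPv4 CIDR of the Google Cloud environment
        GatewayId="vgw-0123456789abcdef0",     # virtual private gateway
    )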

The following screenshot shows the details of the private route table as an example, where 10.1.0.0/16 is the IPv4 CIDR block for our Google Cloud environment.

Private subnet route table in AWS with route to virtual private gateway

Run a connectivity test in Google Cloud to confirm that you can reach the database instance from the AWS network. The following screenshot shows the results of the connectivity test for us, where 10.0.0.8 is a private IP address from the VPC’s IPv4 CIDR block.

AWS-to-GCP connectivity test results

Alternatively, if you configured your Cloud SQL instance with a public IPv4 address and you want to connect directly over its public IP address, configure the public IP address of the NAT gateway from the previous section in the authorized networks setting of the database instance.

Deploy the change data streaming solution with AWS CloudFormation

To deploy the solution using AWS CloudFormation, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a unique name. The default value is MultiCloudCDCStreaming.
  4. For ExternalDBPassword, enter the MySQL database password.
  5. For ExternalDBPort, enter the database port number. The default value is 3306.
  6. For ExternalDBServerName, enter the database server name or IP address. Depending on your network connectivity with Google Cloud, this is either the public IPv4 address or the private IPv4 address of the MySQL database.
  7. For ExternalDBUsername, enter the MySQL database user name.
  8. For PrivateSubnetIds, enter the private subnets that you created.
  9. For VpcId, select the VPC that you created.
  10. For CreateDMSVPCRole, enter true if you don’t have the dms-vpc-role set up already in your AWS account. The default value is false. AWS DMS requires this role to be present to use the AWS DMS API. See Creating the IAM roles to use with the AWS CLI and AWS DMS API for more information. Make sure you use true only if the role doesn’t already exist; otherwise, the deployment will fail with a non-recoverable error.
  11. For CreateDMSCloudWatchRole, enter true if you don’t have the dms-cloudwatch-logs-role set up already in your AWS account. The default value is false. AWS DMS requires this role to be present to use the AWS DMS API. Make sure you use true only if the role doesn’t already exist; otherwise, the deployment will fail with a non-recoverable error.
  12. Select the acknowledgement check box and choose Create stack.
    Stack deployment takes about 30 minutes.
  13. When stack creation is complete, navigate to the Outputs tab and note down the values for the resources created.
    CloudFormation outputs
  14. Open the Amazon MSK console and make sure your MSK cluster is in Active status before proceeding to the next section.
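
If you prefer to deploy the stack from code rather than the console, the following boto3 sketch shows an equivalent create_stack call. The template URL and resource IDs are placeholders, and the parameter keys mirror the console parameters described above.

    # Minimal sketch: deploy the streaming stack programmatically. The
    # template URL, subnet IDs, VPC ID, and credentials are placeholders.
    import boto3

    cfn = boto3.client("cloudformation")
    cfn.create_stack(
        StackName="MultiCloudCDCStreaming",
        TemplateURL="https://example-bucket.s3.amazonaws.com/multicloud-cdc-streaming.yaml",
        Parameters=[
            {"ParameterKey": "ExternalDBServerName", "ParameterValue": "10.1.0.5"},
            {"ParameterKey": "ExternalDBPort", "ParameterValue": "3306"},
            {"ParameterKey": "ExternalDBUsername", "ParameterValue": "cdc_user"},
            {"ParameterKey": "ExternalDBPassword", "ParameterValue": "REPLACE_ME"},
            {"ParameterKey": "VpcId", "ParameterValue": "vpc-0123456789abcdef0"},
            {"ParameterKey": "PrivateSubnetIds", "ParameterValue": "subnet-0123456789abcdef0,subnet-0fedcba9876543210"},
            {"ParameterKey": "CreateDMSVPCRole", "ParameterValue": "false"},
            {"ParameterKey": "CreateDMSCloudWatchRole", "ParameterValue": "false"},
        ],
        Capabilities=["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
    )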

Set up and start change data streaming

Complete the following steps to configure the change data streaming:

  1. On the Managed Service for Apache Flink console, choose Studio notebooks in the navigation pane.
  2. Select the notebook you deployed with AWS CloudFormation (the output for MSFStudioAppName) and choose Run.
    Run the deployed studio notebook
  3. Wait for the status to show as Running, then choose Open in Apache Zeppelin to open the Apache Zeppelin UI. Managed Service for Apache Flink Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine.
  4. Download SetupCDCStreaming.zpln.
  5. Choose Import note and import the downloaded SetupCDCStreaming.zpln file.
    Import the downloaded zeppelin notebook
  6. Run all the paragraphs in SetupCDCStreaming.zpln to perform the following actions:
    1. Define the target Kafka topics.
    2. Use the boto3 API to create a source endpoint for the Cloud SQL for MySQL database. The database credentials are retrieved from the Secrets Manager secret that was created with AWS CloudFormation.
    3. Create the AWS DMS target endpoint. By default, AWS DMS uses kafka-default-topic as the migration topic, which we created in a previous step. Alternatively, you can set auto.create.topics.enable = true in your MSK cluster configuration; we used the default configuration for the MSK cluster, which doesn’t allow automatic topic creation.
    4. Create a replication task to run an initial full load and stream subsequent CDC events. If you want to stream only the CDC events, choose cdc for the MigrationType parameter. We also map tables from the source database to the respective Kafka topics we created in a previous step. See Using object mapping to migrate data to a Kafka topic to learn how you can group and migrate source tables to separate Kafka topics (Kafka multitopic replication) using a single replication task.
    5. Start the replication after the replication task is in Ready status. A simplified boto3 sketch of these API calls follows this list.
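
The following is a simplified boto3 sketch of the AWS DMS calls described above. The identifiers, ARNs, broker string, credentials, and table mappings are placeholders for the values produced by the CloudFormation stack, and the notebook's actual object mapping rules additionally route each table to its own topic.

    # Simplified sketch of the AWS DMS API calls made by the Studio notebook.
    # Identifiers, ARNs, the broker string, and credentials are placeholders.
    import json
    import boto3

    dms = boto3.client("dms")

    # Source endpoint for the Cloud SQL for MySQL database.
    source = dms.create_endpoint(
        EndpointIdentifier="gcp-mysql-source",
        EndpointType="source",
        EngineName="mysql",
        ServerName="10.1.0.5",
        Port=3306,
        Username="cdc_user",
        Password="REPLACE_ME",  # the notebook reads this from Secrets Manager
    )

    # Target endpoint pointing at the MSK cluster.
    target = dms.create_endpoint(
        EndpointIdentifier="msk-kafka-target",
        EndpointType="target",
        EngineName="kafka",
        KafkaSettings={
            "Broker": "b-1.msk.example:9096,b-2.msk.example:9096",
            "Topic": "kafka-default-topic",
            "SecurityProtocol": "sasl-ssl",
            "SaslUsername": "msk-user",
            "SaslPassword": "REPLACE_ME",
        },
    )

    # Select the source tables; the notebook also adds object-mapping rules
    # to route each table to its own Kafka topic.
    table_mappings = {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "include-salesdb",
                "object-locator": {"schema-name": "salesdb", "table-name": "%"},
                "rule-action": "include",
            }
        ]
    }

    task = dms.create_replication_task(
        ReplicationTaskIdentifier="salesdb-cdc-task",
        SourceEndpointArn=source["Endpoint"]["EndpointArn"],
        TargetEndpointArn=target["Endpoint"]["EndpointArn"],
        ReplicationInstanceArn="arn:aws:dms:us-east-1:111122223333:rep:EXAMPLE",
        MigrationType="full-load-and-cdc",  # use "cdc" to stream only changes
        TableMappings=json.dumps(table_mappings),
    )

    # Start replication once the task reports Ready status.
    dms.start_replication_task(
        ReplicationTaskArn=task["ReplicationTask"]["ReplicationTaskArn"],
        StartReplicationTaskType="start-replication",
    )

The notebook performs these same steps through its paragraphs, so you don't need to run this sketch separately.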

Test the solution

Complete the following steps to test the solution:

  1. Download TestCDCStreaming.zpln.
  2. In the Apache Zeppelin UI, choose Import note and import the downloaded TestCDCStreaming.zpln file.
  3. Run all the paragraphs in TestCDCStreaming.zpln to perform the following actions:
    1. Define a Flink table with the Kafka topic ‘salesdb.orders’ as the source. For this example, we have configured IAM authentication in the Kafka connector options, as illustrated in the sketch after these steps.
    2. Define a Flink view that returns the most recent change for each order.
    3. Run a query against the Flink view defined in the previous step.
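
To give a sense of what these paragraphs contain, the following PyFlink sketch defines a Kafka-backed table over the salesdb.orders topic with IAM authentication and a deduplication view that keeps the latest change per order. The column list, broker string, and JSON layout are assumptions; the notebook defines the schema to match the records AWS DMS actually writes.

    # Sketch of the Flink SQL run by the notebook, expressed with PyFlink.
    # Broker string, column list, and message format are assumptions; in a
    # Studio notebook the pre-created table environment is used instead.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    t_env.execute_sql("""
        CREATE TABLE orders_cdc (
            orderNumber BIGINT,
            status      STRING,
            comments    STRING,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'salesdb.orders',
            'properties.bootstrap.servers' = 'b-1.msk.example:9098',
            'properties.security.protocol' = 'SASL_SSL',
            'properties.sasl.mechanism' = 'AWS_MSK_IAM',
            'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
            'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json',
            'json.ignore-parse-errors' = 'true'
        )
    """)

    # Keep only the most recent change event per order number.
    t_env.execute_sql("""
        CREATE VIEW latest_orders AS
        SELECT orderNumber, status, comments
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY orderNumber ORDER BY proctime DESC) AS rn
            FROM orders_cdc
        )
        WHERE rn = 1
    """)

    # Query the view; in Zeppelin this is typically a %flink.ssql paragraph.
    t_env.execute_sql("SELECT * FROM latest_orders").print()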

Before any changes are made to the test data that you uploaded in the prerequisites section, the result will be similar to that shown in the following screenshot.

  4. Run the following queries against the orders table in the source database:
    UPDATE orders SET comments = "Need expedited delivery" WHERE orderNumber = 10100;
    DELETE FROM orders WHERE orderNumber = 10102;

The Flink view results in TestCDCStreaming.zpln will update to reflect the changes in the source database.

The deployed solution presents only the steps to explore the streaming data in Flink Studio notebooks; however, if you need to build Flink applications in Java, Scala, or Python (and embedded SQL) using an IDE of your choice and the Apache Flink DataStream or Table APIs, you can use Managed Service for Apache Flink. Refer to Choosing Managed Service for Apache Flink or Managed Service for Apache Flink Studio for additional guidance.

Clean up

To avoid future charges for the deployed resources, delete the resources you created in this walkthrough if you don’t intend to use them further:

  1. Download CleanupCDCStreaming.zpln.
  2. In the Apache Zeppelin UI, choose Import note and import the downloaded CleanupCDCStreaming.zpln file.
  3. Run all the paragraphs in CleanupCDCStreaming.zpln to stop and delete the AWS DMS replication task and delete the source and target AWS DMS endpoints (a boto3 sketch of these cleanup calls follows this list).
  4. Stop the Flink Studio notebook (AWS CloudFormation output key MSFStudioAppName).
  5. Delete the CloudFormation stacks you deployed to delete the AWS resources.
  6. Delete the Cloud SQL instance and VPN configuration you deployed in Google Cloud.
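
For reference, the cleanup notebook issues AWS DMS teardown calls similar to the following boto3 sketch; the task and endpoint ARNs are placeholders for the resources created earlier.

    # Sketch of the AWS DMS cleanup performed by the notebook. ARNs are
    # placeholders; waiters give the stop/delete operations time to finish.
    import boto3

    dms = boto3.client("dms")

    task_arn = "arn:aws:dms:us-east-1:111122223333:task:EXAMPLE"

    dms.stop_replication_task(ReplicationTaskArn=task_arn)
    dms.get_waiter("replication_task_stopped").wait(
        Filters=[{"Name": "replication-task-arn", "Values": [task_arn]}]
    )

    dms.delete_replication_task(ReplicationTaskArn=task_arn)
    dms.get_waiter("replication_task_deleted").wait(
        Filters=[{"Name": "replication-task-arn", "Values": [task_arn]}]
    )

    # Delete the source and target endpoints after the task is gone.
    for endpoint_arn in [
        "arn:aws:dms:us-east-1:111122223333:endpoint:SOURCE",
        "arn:aws:dms:us-east-1:111122223333:endpoint:TARGET",
    ]:
        dms.delete_endpoint(EndpointArn=endpoint_arn)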

Conclusion

In this post, we discussed the architecture and considerations for streaming data changes from transactional databases in external cloud providers to streaming solutions on AWS. We also outlined a decision model for selecting components and services for the CDC streaming solution. Lastly, we showed how you can use AWS DMS to stream change data from a database hosted on another CSP, specifically Google Cloud, for further processing and analytics using AWS services.

If you have questions or need more information about the solution in this post, contact your AWS representative.


About the Authors

Simran Singh is a Senior Solutions Architect at AWS. In this role, he assists our large enterprise customers in meeting their key business objectives using AWS. His areas of expertise include artificial intelligence and machine learning, security, and improving the experience of developers building on AWS. He has also earned a coveted golden jacket for achieving all currently offered AWS certifications.

Deevanshu Budhiraja is a Senior Solutions Architect at AWS with a chronicle of success in designing business and technology strategy, cloud adoption framework, and data center migrations for enterprise customers across the globe.

Amandeep Bajwa is a Principal Solutions Architect at AWS supporting financial services enterprises. He helps organizations achieve their business outcomes by identifying the appropriate cloud transformation strategy based on industry trends and organizational priorities. Some of the areas Amandeep consults on are cloud migration, cloud strategy (including hybrid and multi cloud), digital transformation, data and analytics, and technology in general.

Sasha Luthra is a Solutions Architect at AWS and supports enterprise customers to build secure, resilient, and scalable architectures on the AWS platform. She has implemented software projects across diverse industries, including transportation, telecommunication, and finance, within global organizations. Her expertise lies in understanding customer requirements and implementing solutions that adhere to architectural best practices.