Use MSK Connect for managed MirrorMaker 2 deployment with IAM authentication
In this post, we show how to use MSK Connect for MirrorMaker 2 deployment with AWS Identity and Access Management (IAM) authentication. We create an MSK Connect custom plugin and IAM role, and then replicate the data between two existing Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters. The goal is to have replication successfully running between two MSK clusters that are using IAM as an authentication mechanism. Note that although we use IAM authentication in this solution, you can accomplish the same with no authentication selected as the MSK authentication mechanism.
Solution overview
This solution can help Amazon MSK users run MirrorMaker 2 on MSK Connect, which eases the administrative and operational burden because the service handles the underlying resources, enabling you to focus on the connectors and data to ensure correctness. The following diagram illustrates the solution architecture.
Apache Kafka is an open-source platform for streaming data. You can use it to build various workloads like IoT connectivity, data analytics pipelines, or event-based architectures.
Kafka Connect is a component of Apache Kafka that provides a framework to stream data into and out of Kafka, between systems like databases, object stores, and even other Kafka clusters. Connectors are the executable applications that you can deploy on top of the Kafka Connect framework to stream data into or out of Kafka.
MirrorMaker is the cross-cluster data mirroring mechanism that Apache Kafka provides to replicate data between two clusters. You can deploy this mirroring process as a connector in the Kafka Connect framework to improve the scalability, monitoring, and availability of the mirroring application. Replication between two clusters is common when you need to improve data availability, migrate to a new cluster, aggregate data from edge clusters into a central cluster, copy data between Regions, and more. KIP-382 documents MirrorMaker 2 (MM2) with all the available configurations, design patterns, and deployment options. It's worthwhile to familiarize yourself with the configurations because there are many options that can impact your unique needs.
MSK Connect is a managed Kafka Connect service that allows you to deploy Kafka connectors into your environment with seamless integrations with AWS services like IAM, Amazon MSK, and Amazon CloudWatch.
In the following sections, we walk you through the steps to configure this solution:
- Create an IAM policy and role.
- Upload your data.
- Create a custom plugin.
- Create and deploy connectors.
Create an IAM policy and role for authentication
IAM helps users securely control access to AWS resources. In this step, we create an IAM policy and role that has two critical permissions:
- MSK Connect can assume the created role (see the following trust policy example)
- MSK Connect is authorized to create topics, publish events, and so on
A common mistake made when creating an IAM role and policy needed for common Kafka tasks (publishing to a topic, listing topics) is to assume that the AWS managed policy `AmazonMSKFullAccess` (`arn:aws:iam::aws:policy/AmazonMSKFullAccess`) will suffice for permissions.
The following is an example of a policy with both full Kafka and Amazon MSK access. This is a sketch: the Region, account ID, and exact control plane actions are placeholders to adapt to your environment:
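```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "MSKControlPlane",
      "Effect": "Allow",
      "Action": [
        "kafka:*",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DescribeSecurityGroups",
        "logs:CreateLogDelivery"
      ],
      "Resource": "*"
    },
    {
      "Sid": "KafkaDataPlane",
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": [
        "arn:aws:kafka:us-east-1:123456789012:cluster/*/*",
        "arn:aws:kafka:us-east-1:123456789012:topic/*/*/*",
        "arn:aws:kafka:us-east-1:123456789012:group/*/*/*"
      ]
    }
  ]
}
```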
This policy supports the creation of the cluster within the AWS account infrastructure and grants access to the components that make up the cluster anatomy like Amazon Elastic Compute Cloud (Amazon EC2), Amazon Virtual Private Cloud (Amazon VPC), logs, and `kafka:*`. There is no managed policy for a Kafka administrator to have full access on the cluster itself.
After you create the `KafkaAdminFullAccess` policy, create a role and attach the policy to it. You need two entries on the role's Trust relationships tab:
- The first statement allows Kafka Connect to assume this role and connect to the cluster.
- The second statement follows the pattern `arn:aws:sts::(YOUR ACCOUNT NUMBER):assumed-role/(YOUR ROLE NAME)/(YOUR ACCOUNT NUMBER)`. Your account number should be the same account where MSK Connect and the role are being created. This role is the role you're editing the trust entity on. In the following example code, I'm editing a role called `MSKConnectExample` in my account. This is so that when MSK Connect assumes the role, the assumed user can assume the role again to publish and consume records on the target cluster.
In the following example trust policy, provide your own account number and role name:
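```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::123456789012:assumed-role/MSKConnectExample/123456789012"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```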
Now we’re ready to deploy MirrorMaker 2.
Upload data
MSK Connect custom plugins accept a file or folder with a .jar or .zip ending. For this step, create a dummy folder or file and compress it. Then upload the .zip object to your Amazon Simple Storage Service (Amazon S3) bucket:
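```bash
# Create a dummy file, compress it, and upload it to S3
# (the bucket name is a placeholder -- use your own)
touch mm2-dummy.txt
zip mm2-plugin.zip mm2-dummy.txt
aws s3 cp mm2-plugin.zip s3://my-mm2-plugin-bucket/mm2-plugin.zip
```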
Create a custom plugin
On the Amazon MSK console, follow the steps to create a custom plugin from the .zip file. Enter the object's Amazon S3 URI and, for this post, name the plugin `Mirror-Maker-2`.
Create and deploy connectors
You need to deploy three connectors for a successful mirroring operation:
- MirrorSourceConnector
- MirrorHeartbeatConnector
- MirrorCheckpointConnector
Complete the following steps for each connector:
- On the Amazon MSK console, choose Create connector.
- For Connector name, enter the name of your first connector.
- Select the target MSK cluster that the data is mirrored to (the destination).
- Choose IAM as the authentication mechanism.
- Pass the config into the connector.
Connector config files are JSON-formatted config maps for the Kafka Connect framework to use in passing configurations to the executable JAR. When using the MSK Connect console, we must convert the config from JSON to key=value pairs, one per line, with no spaces around the equals sign.
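As a hypothetical illustration, a JSON fragment such as:

```json
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
  "tasks.max": "1"
}
```

becomes the following key=value lines:

```
connector.class=org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
tasks.max=1
```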
You need to change some values within the configs for deployment, namely `bootstrap.servers`, `sasl.jaas.config`, and `tasks.max`. Note the placeholders in the following code for all three configs.
The following code is for `MirrorHeartbeatConnector`. This is a representative sketch: the angle-bracket values are placeholders for your bootstrap servers and the ARN of the IAM role created earlier, and your exact property set may differ:
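```
connector.class=org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
source.cluster.alias=source
target.cluster.alias=target
clusters=source,target
source.cluster.bootstrap.servers=<SOURCE-CLUSTER-BOOTSTRAP-SERVERS>
target.cluster.bootstrap.servers=<TARGET-CLUSTER-BOOTSTRAP-SERVERS>
tasks.max=1
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=AWS_MSK_IAM
source.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="<YOUR-ROLE-ARN>";
source.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=AWS_MSK_IAM
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="<YOUR-ROLE-ARN>";
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```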
The following code is for `MirrorCheckpointConnector`. This sketch differs from the heartbeat config mainly in the connector class and the consumer group settings:
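```
connector.class=org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias=source
target.cluster.alias=target
clusters=source,target
groups=.*
emit.checkpoints.interval.seconds=60
tasks.max=1
# ...plus the same bootstrap server, converter, and source.cluster.*/
# target.cluster.* IAM (SASL_SSL/AWS_MSK_IAM) settings as the heartbeat config
```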
The following code is for `MirrorSourceConnector`. In this sketch, `topics=.*` mirrors every topic, and `tasks.max` follows the sizing guideline discussed next:
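```
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source
target.cluster.alias=target
clusters=source,target
topics=.*
refresh.topics.interval.seconds=60
sync.topic.configs.enabled=true
replication.factor=3
offset-syncs.topic.replication.factor=3
tasks.max=<ONE-TASK-PER-10-PARTITIONS>
# ...plus the same bootstrap server, converter, and source.cluster.*/
# target.cluster.* IAM (SASL_SSL/AWS_MSK_IAM) settings as the heartbeat config
```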
A general guideline for the number of tasks for a `MirrorSourceConnector` is one task per up to 10 partitions to be mirrored. For example, if a Kafka cluster has 15 topics with 12 partitions each for a total partition count of 180 partitions, we deploy at least 18 tasks for mirroring the workload.
Exceeding the recommended number of tasks for the source connector may lead to offsets that aren’t translated (negative consumer group offsets). For more information about this issue and its workarounds, refer to MM2 may not sync partition offsets correctly.
- For the heartbeat and checkpoint connectors, use provisioned scale with one worker, because there is only one task running for each of them.
- For the source connector, we set the maximum number of workers to the value decided for the `tasks.max` property.
Note that we use the default auto scaling threshold settings for now.
- Although it’s possible to pass custom worker configurations, let’s leave the default option selected.
- In the Access permissions section, we use the IAM role that we created earlier, which has a trust relationship with `kafkaconnect.amazonaws.com` and `kafka-cluster:*` permissions. Warning signs display above and below the drop-down menu. These are to remind you that IAM roles and attached policies are a common reason why connectors fail. If you never get any log output upon connector creation, that is a good indicator of an improperly configured IAM role or policy permission problem.
On the bottom of this page is a warning box telling us not to use the aptly named `AWSServiceRoleForKafkaConnect` role. This is an AWS managed service role that MSK Connect needs to perform critical, behind-the-scenes functions upon connector creation. For more information, refer to Using Service-Linked Roles for MSK Connect.
- Choose Next.
Depending on the authentication mechanism chosen when aligning the connector with a specific cluster (we chose IAM), the options in the Security section are preset and unchangeable. If no authentication was chosen and your cluster allows plaintext communication, that option is available under Encryption – in transit.
- Choose Next to move to the next page.
- Choose your preferred logging destination for MSK Connect logs. For this post, I select Deliver to Amazon CloudWatch Logs and choose the log group ARN for my MSK Connect logs.
- Choose Next.
- Review your connector settings and choose Create connector.
A message appears indicating either a successful start to the creation process or immediate failure. You can now navigate to the Log groups page on the CloudWatch console and wait for the log stream to appear.
The CloudWatch logs show whether a connector has succeeded or failed faster than the Amazon MSK console does. You should see a log stream in your chosen log group get created within a few minutes after you create your connector. If your log stream never appears, that is an indicator of a misconfiguration in your connector config or IAM role, and the connector won't work.
Verify that the connector launched successfully
In this section, we walk through two confirmation steps to determine a successful launch.
Check the log stream
Open the log stream that your connector is writing to. In the log, you can check if the connector has successfully launched and is publishing data to the cluster. In the following screenshot, we can confirm data is being published.
Mirror data
The second step is to create a producer to send data to the source cluster. We use the console producer and consumer that Kafka ships with. You can follow Step 1 from the Apache Kafka quickstart.
- On a client machine that can access Amazon MSK, download Kafka from https://kafka.apache.org/downloads and extract it:
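```bash
# Example using the 3.1.0 release referenced later in this post
wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
```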
- Download the latest stable JAR for IAM authentication from the repository. As of this writing, it is 1.1.3:
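```bash
# v1.1.3 as of this writing; the URL assumes the standard naming of the
# aws-msk-iam-auth GitHub releases. Place the JAR in Kafka's libs directory.
wget -P libs https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.3/aws-msk-iam-auth-1.1.3-all.jar
```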
- Next, we need to create our `client.properties` file that defines our connection properties for the clients. For instructions, refer to Configure clients for IAM access control. You can place this properties file anywhere on your machine; for ease of use and simple referencing, I place mine inside `kafka_2.13-3.1.0/bin`. Copy the following example of the `client.properties` file, which matches the standard IAM client configuration from the Amazon MSK documentation:
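```
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```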
After we create the `client.properties` file and place the JAR in the `libs` directory, we're ready to create the topic for our replication test.
- From the `bin` folder, run the `kafka-topics.sh` script. The following sketch uses placeholder bootstrap servers and illustrative partition and replication counts:
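```bash
# Placeholder bootstrap servers; IAM clients use port 9098
./kafka-topics.sh \
  --bootstrap-server <SOURCE-CLUSTER-BOOTSTRAP-SERVERS> \
  --topic MirrorMakerTest \
  --create \
  --replication-factor 3 \
  --partitions 3 \
  --command-config client.properties
```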
The details of the command are as follows:
- `--bootstrap-server` – Your bootstrap server of the source cluster.
- `--topic` – The topic name you want to create.
- `--create` – The action for the script to perform.
- `--replication-factor` – The replication factor for the topic.
- `--partitions` – Total number of partitions to create for the topic.
- `--command-config` – Additional configurations needed for successful running. Here is where we pass in the `client.properties` file we created in the previous step.
- We can list all the topics to see that it was successfully created:
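```bash
./kafka-topics.sh \
  --bootstrap-server <SOURCE-CLUSTER-BOOTSTRAP-SERVERS> \
  --list \
  --command-config client.properties
```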
When defining bootstrap servers, it’s recommended to use one broker from each Availability Zone. For example:
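```
# Placeholder broker hostnames -- one per Availability Zone, port 9098 for IAM
b-1.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:9098,b-2.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:9098,b-3.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:9098
```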
Similar to the create topic command, the preceding step simply calls `list` to show all topics available on the cluster. We can run this same command on our target cluster to see if MirrorMaker has replicated the topic.

With our topic created, let's start the consumer. This consumer consumes from the target cluster. When the topic is mirrored with the default replication policy, it will have a `source.` prefix.
- For our topic, we consume from `source.MirrorMakerTest` as shown in the following code (again with placeholder bootstrap servers):
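```bash
# Placeholder target cluster bootstrap servers
./kafka-console-consumer.sh \
  --bootstrap-server <TARGET-CLUSTER-BOOTSTRAP-SERVERS> \
  --topic source.MirrorMakerTest \
  --consumer.config client.properties
```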
The details of the code are as follows:
- `--bootstrap-server` – Your target MSK bootstrap servers
- `--topic` – The mirrored topic
- `--consumer.config` – Where we pass in our `client.properties` file again to instruct the client how to authenticate to the MSK cluster
After this step is successful, it leaves a consumer running on the console until we either close the client connection or close our terminal session. You won't see any messages flowing yet because we haven't started producing to the source topic on the source cluster.
- Open a new terminal window, leaving the consumer open, and start the producer:
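```bash
# Placeholder source cluster bootstrap servers
./kafka-console-producer.sh \
  --bootstrap-server <SOURCE-CLUSTER-BOOTSTRAP-SERVERS> \
  --topic MirrorMakerTest \
  --producer.config client.properties
```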
The details of the code are as follows:
- `--bootstrap-server` – The source MSK bootstrap servers
- `--topic` – The topic we're producing to
- `--producer.config` – The `client.properties` file indicating which IAM authentication properties to use

After this is successful, the console returns `>`, which indicates that it's ready to produce what we type. Let's produce some messages, as shown in the following screenshot. After each message, press Enter to have the client produce to the topic.

Switching back to the consumer's terminal window, you should see the same messages being replicated and now showing in your console's output.
Clean up
We can now close the client connections by pressing Ctrl+C or by simply closing the terminal windows.
We can delete the topics on both clusters by running the following code:
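```bash
# Placeholders -- substitute each cluster's bootstrap servers
./kafka-topics.sh \
  --bootstrap-server <SOURCE-CLUSTER-BOOTSTRAP-SERVERS> \
  --topic MirrorMakerTest \
  --delete \
  --command-config client.properties

./kafka-topics.sh \
  --bootstrap-server <TARGET-CLUSTER-BOOTSTRAP-SERVERS> \
  --topic source.MirrorMakerTest \
  --delete \
  --command-config client.properties
```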
Delete the source cluster topic first, then the target cluster topic.
Finally, we can delete the three connectors via the Amazon MSK console by selecting them from the list of connectors and choosing Delete.
Conclusion
In this post, we showed how to use MSK Connect for MM2 deployment with IAM authentication. We successfully deployed the Amazon MSK custom plugin, and created the MM2 connector along with the accompanying IAM role. Then we deployed the MM2 connector onto our MSK Connect instances and watched as data was replicated successfully between two MSK clusters.
Using MSK Connect to deploy MM2 eases the administrative and operational burden of Kafka Connect and MM2, because the service handles the underlying resources, enabling you to focus on the connectors and data. The solution removes the need for dedicated Kafka Connect cluster infrastructure hosted on services like Amazon EC2, AWS Fargate, or Amazon EKS. The solution also automatically scales the resources for you (if configured to do so), which eliminates the need for administrators to check if the resources are scaling to meet demand. Additionally, using the Amazon managed service MSK Connect allows for easier compliance and security adherence for Kafka teams.
If you have any feedback or questions, please leave a comment.
About the Authors
Tanner Pratt is a Practice Manager at Amazon Web Services. Tanner leads a team of consultants focusing on Amazon streaming services like Amazon Managed Streaming for Apache Kafka, Kinesis Data Streams, Kinesis Data Firehose, and Kinesis Data Analytics.
Ed Berezitsky is a Senior Data Architect at Amazon Web Services. Ed helps customers design and implement solutions using streaming technologies, and specializes in Amazon MSK and Apache Kafka.