亚马逊AWS官方博客

如何通过互联网安全地访问Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群(二)- mTLS认证

背景

承接上文如何通过互联网安全地访问Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群(一) ,我们介绍了 Amazon MSK 为用户提供了公开访问的特性,可以帮助用户设计更加安全和灵活的方式访问 MSK,无论访问请求是从本地网络、自建 IDC 或外部云环境中发起的。通过启用 Public Access 选项,Amazon Virtual Private Cloud(VPC)外部的授权客户端可以通过安全加密的通道进行数据传入和传出到对应的 Amazon MSK 集群。在上文中,我们介绍了三种访问启用 Public Access 选项的 MSK 集群的第一种方式,即通过 IAM 访问控制来完成,本文会演示第二种方式 – mTLS 身份验证的方法。

通过 mTLS 身份验证来访问 MSK 集群的架构图如下所示。

接下来,我们先通过以下步骤来完成Amazon MSK集群的公开访问配置和验证:

  1. 通过Amazon ACM 设置 Private CA;
  2. 创建 MSK 自定义集群配置;
  3. 创建Amazon MSK集群;
  4. 开启Public Access权限;
  5. 创建公开访问的客户端;
  6. 配置证书连接Amazon MSK集群;
  7. 验证公开访问;
  8. Python 客户端验证。

 

通过Amazon ACM 设置 Private CA

为了使用客户端证书认证,首先需要配置 Private CA。根据需求,填写相应的证书信息,例如 OU、CN 等。

等待创建完成,记录下对应的ARN,稍后创建 MSK 集群时会使用到。

点击右上角 Actions 选择安装证书。

确认并安装,之后可以看到 PCA 的状态变为了 Active 激活状态。

 

创建MSK自定义集群配置

如果使用mTLS 或者 SASL/SCRAM 身份验证的方法,必须配置启用 Kafka ACL 功能,因此这里我们首先创建一个自定义的集群配置。此外,我们也配置上auto.create.topics.enable集群选项,并设置为true(默认为false),这样可以启用在服务器上自动创建topic 便于演示,更多信息可以参考文档自定义MSK集群配置

auto.create.topics.enable=true
allow.everyone.if.no.acl.found=false

 

创建Amazon MSK集群

参考上文,Amazon MSK 允许用户选择 Apache Kafka 2.6.0 或更高版本的 MSK 集群开启Public Access选项,并且我们需要在创建 MSK 集群后才能启用Public Access。这里仍以us-east-1美东1区域为例进行演示。

集群配置选项选择刚刚创建的自定义配置。

要启用对集群的Public Access选项,与集群关联的子网必须是公有子网Public Subnet,这意味着子网必须具有连接 Internet 网关的关联路由表,并确保选择的安全规则组配置了合适的入站规则:

对于公开访问的MSK集群,可以参考以下集群端口信息:

  • 如果是通过TLS 加密与Broker进行通信,需要通过9194 端口公开访问;
  • 如果是通过 SASL/SCRAM 与Broker进行通信,需要通过9196 端口公开访问;
  • 如果是通过IAM 访问控制与Broker进行通信,需要通过9198 端口公开访问;
  • 如果是通过TLS 加密与Apache Zookeeper进行通信,需要通过2182 端口公开访问。

接下来是安全设置,要想使用Public Access选项,集群必须关闭Unauthenticated access未经身份验证的访问控制选项,并且这里在示例中我们开启 mTLS 访问控制的选项,并选择对应的 Private CA。

如果开启Public Access选项,集群内加密默认是必选项;另外,客户端和Broker之间的明文流量也会被关闭,必须使用TLS加密的方式进行。

保持其他参数为默认选项,然后创建MSK集群。

开启Public Access权限

在集群创建完成后,在集群的网络设置里选择配置公开访问。勾选开启,同时注意这次再次提醒需要确保集群的子网具有连接 Internet 网关的关联路由表,并且安全规则组限制了访问对象来源。

创建公开访问的客户端

在等待集群公开访问配置生效的过程中,我们来创建一个示例Kafka客户端,来模拟Internet访问请求。此处参照文档创建客户端实例,并根据文档创建Topic来安装Java环境。本文使用CloudShell来模拟客户端环境,更多CloudShell的详细信息可以参考文档

配置证书连接Amazon MSK集群

以下操作是在客户端实例上进行。在示例中,我们使用JVM Truststore与MSK集群进行通信,所以我们需要对应的Truststore文件,这里在客户端实例上将Java中的证书复制到当前目录下,根据用户不同的Java安装方式和路径,这里的文件路径需要进行适当的替换。

cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.342.b07-1.amzn2.0.1.x86_64/jre/lib/security/cacerts kafka.client.truststore.jks

执行以下命令创建private key,其中CN和Alias可以根据实际情况进行设置,并且设置对应的密码。

keytool -genkey -keystore kafka.client.keystore.jks -validity 365 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=<Your-CN-Name>" -alias <Your-Alias-Name> -storetype pkcs12

*注:针对PKCS12,storepass和keypass需要设置一致,否则可能会遇到警告提示Warning:  Different store and key passwords not supported for PKCS12 KeyStores. Ignoring user-specified -keypass value。

执行以下命令创建证书请求。

keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias <Your-Alias-Name> -storepass Your-Store-Pass -keypass Your-Key-Pass

通过以下命令替换证书请求中的内容,将“BEGIN NEW CERTIFICATE REQUEST”替换为“BEGIN CERTIFICATE REQUEST”,以及” END NEW CERTIFICATE REQUEST”替换为” END CERTIFICATE REQUEST”。

sed -i s/"NEW CERTIFICATE"/"CERTIFICATE"/g client-cert-sign-request

执行以下命令进行证书请求的签名,这里需要替换 Private-CA-ARN 为之前创建 PCA 的ARN。另外,有效期根据实际情况进行修改,这里以365天为例。记录返回结果中 CertificateArn 的证书ARN信息。

aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=365,Type="DAYS"

执行以下命令获取签名后的证书信息,这里需要替换 Private-CA-ARN 为之前创建 PCA 的ARN,证书为刚刚的ARN。

CERT=$(aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN)

通过以下命令将证书的内容写入到signed-certificate-from-acm文件中。

echo $CERT | jq '.Certificate, .CertificateChain' | tr -d '"' | sed 's/\\n/\n/g' > signed-certificate-from-acm

通过以下命令将证书的内容写入到 kafka.client.keystore.jks 文件中,注意替换其中的 Example-Alias、Your-Store-Pass 以及 Your-Key-Pass。

keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass

示例中我们以 Kafka 自带的脚本工具为例演示如何进行配置,切换到 Kafka 文件夹中,并创建名为 client.properties 的配置文件,复制以下内容到配置文件中并根据实际环境进行调整:

security.protocol=SSL
ssl.truststore.location=/home/cloudshell-user/kafka.client.truststore.jks
ssl.keystore.location=/home/cloudshell-user/kafka.client.keystore.jks
ssl.keystore.password=Your-Store-Pass
ssl.key.password=Your-Key-Pass

验证公开访问

首先,查看MSK集群的bootstrap连接信息:

*注:公开访问启用成功后,可以到EC2控制台查看MSK集群配置的网卡信息,可以看到MSK集群对网卡进行了公网IP对应集群的域名。

然后,通过以下参考命令生产示例消息:

bin/kafka-console-producer.sh --broker-list BootstrapBroker-String --topic mytopic --producer.config client.properties

如下图所示:

这时候输入消息,会弹出报错:

ERROR [Producer clientId=console-producer] Topic authorization failed for topics [mytopic] (org.apache.kafka.clients.Metadata)

因为此时还没有配置 ACL 规则。首先,我们查看证书设置的CN,并以“User: <Your-CN>”的方式进行授权。

keytool --list -v -keystore ../kafka.client.keystore.jks -storepass Your-Store-Pass | grep -i owner

接下来,通过 ACL 工具进行授权。这里需要查看 Zookeeper 的连接信息:

因为安全原因 MSK 不允许公开访问 Zookeeper,所以这里需要能通过 VPC 连接到集群的实例进行设置。此外,这里有两种连接方式,加密和非加密的。如果使用加密信息,需要创建对应的配置文件供 kafka-acls.sh 的 –zk-tls-config-file 参数使用,如下所示:

zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/home/cloudshell-user/kafka.client.keystore.jks
zookeeper.ssl.keystore.password=Your-Store-Pass
zookeeper.ssl.truststore.location=/home/cloudshell-user/kafka.client.truststore.jks
zookeeper.ssl.truststore.password=Your-Truststore-Pass

*注:kafka.client.truststore.jks 的默认密码是 changeit,可以通过以下命令进行修改:

keytool -keystore kafka.client.truststore.jks -storepass changeit -storepasswd -new 123456

通过以下命令设置对应的 ACL 规则,根据实际情况进行相应的替换:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ZooKeeper-Connection-String --add --allow-principal "User:CN=david " --operation Write --topic mytopic

再次执行producer命令,可以看到能够成功发送消息:

再同时打开另一个客户端连接窗口,通过以下参考命令消费示例消息:

bin/kafka-console-consumer.sh --bootstrap-server BootstrapBroker-String --topic mytopic --consumer.config client.properties

 

会遇到以下报错信息:

[2022-10-18 09:41:07,268] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-87555
Processed a total of 0 messages

 

再补充读权限:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect= ZooKeeper-Connection-String --add --allow-principal "User:CN=david" --operation Read --group=* --topic mytopic

再次运行可以正常查看消息:

以上,验证成功。

Python客户端验证

除了上述验证之外,我们也是用Python客户端进行基本的消息消费验证。其中,需要将上面的证书做一些转换以便在程序中使用。

首先,生成truststore的pem形式证书(默认密码为changeit,也可以先修改密码再生成):

keytool --list -rfc -keystore kafka.client.truststore.jks &gt;/tmp/truststore.pem

 

然后,将Private Key转换为pem形式证书:

openssl pkcs12 -nodes -in kafka.client.keystore.jks -out /tmp/private_key.pem

 

最后,拷贝一份ACM签名后的证书:

 

mv signed-certificate-from-acm /tmp/client_cert.pem

 

Python代码可以参考以下内容:

 

from kafka import KafkaConsumer
from kafka import TopicPartition
TOPIC = "mytopic"
       
consumer = KafkaConsumer(bootstrap_servers=BootstrapBroker-String,
                                 security_protocol='SSL',
                                 ssl_cafile='/tmp/truststore.pem',
                                 ssl_check_hostname=True,
                                 ssl_certfile='/tmp/client_cert.pem',
                                 ssl_keyfile='/tmp/private_key.pem')
       
# Read and print all messages from test topic
parts = consumer.partitions_for_topic(TOPIC)
if parts is None:
    exit(1)
partitions = [TopicPartition(TOPIC, p) for p in parts]
consumer.assign(partitions)
for  partition in partitions:
    consumer.seek_to_end(partition)
for msg in consumer:
    print(msg)

 

查看消息:

以上,验证成功。

 

小结

Amazon MSK提供了多种可以公开访问集群的机制,本文手把手地介绍了如何配置Amazon MSK集群的Public Access选项,以及通过 mTLS 身份验证的方式来安全地访问集群。之后,我们会通过另一篇博客来介绍公开访问Amazon MSK集群的最后一种模式:SASL/SCRAM,敬请期待!

参考资料

https://docs.thinkwithwp.com/msk/latest/developerguide/public-access.html

https://docs.thinkwithwp.com/msk/latest/developerguide/msk-update-security.html

https://docs.thinkwithwp.com/msk/latest/developerguide/msk-authentication.html

https://docs.thinkwithwp.com/msk/latest/developerguide/msk-working-with-encryption.html

https://catalog.us-east-1.prod.workshops.aws/workshops/c2b72b6f-666b-4596-b8bc-bafa5dcca741/en-US/securityencryption/tlsmauth/authorization

https://thinkwithwp.com/cn/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk/

https://thinkwithwp.com/cn/blogs/china/how-to-safely-access-amazon-managed-streaming-for-apache-kafka-amazon-msk-cluster-through-the-internet-i/

本篇作者

史天

亚马逊云科技资深解决方案架构师。拥有丰富的云计算、数据分析和机器学习经验,目前致力于数据科学、机器学习、无服务器等领域的研究和实践。译有《机器学习即服务》《基于Kubernetes的DevOps实践》《Kubernetes微服务实战》《Prometheus监控实战》《云原生时代的CoreDNS学习指南》等。

齐海澎

齐海澎,亚马逊云科技资深解决方案架构师,亚马逊云科技游戏技术社区中国区负责人,有多年Devops经验,擅长各种类型游戏架构与运维解决方案,致力于Amazon GameTech的推广,在Gamelift等游戏服务上有丰富的实践经验。