一、 业务背景
当客户端或 IoT 设备的应用程序需要传递日志等消息时,传统方式一般要通过 HTTP 的方式请求到服务端应用程序进行接受,处理,转发或存储,再进行分析处理(架构如下图)。此架构方案,对实时性、扩展性以及安全性上都有很大的挑战。
原始架构图
二、改造后架构图
改造后架构图
三、架构优势
- 客户端及 IoT 设备的应用程序,无需改造,可保持使用 HTTP 协议的方式。
- 不支持 REST API 调用,将 Kafka API 转换为 REST API。
- 从原始的离线处理方式处理转换为实时分析处理。
- 使用 Kafka REST Proxy 镜像部署于 Amazon ECS Fargate 的方式,使用 ELB 提供对外的 HTTP REST 服务解决上述问题,并且可以根据流量负载的情况实现自动伸缩,承载终端设备的海量请求。
- 通过使用托管服务提升安全性。
四、环境准备
1. 在 cn-northwest-1 控制台创建对应的 VPC(kafka-ecs-proxy)。
2.在 VPC(kafka-ecs-proxy)中准备一台 EC2 实例,并使用 aws configure 配置 Access Key 和 Secret Access Key。
五、MSK Cluster
1.在 VPC(kafka-ecs-proxy)中按需创建 MSK 集群。
2.创建完成后点击查看客户端信息获取集群 bootstrap-server。
3.配置 MSK 集群安全组以使 EC2 可访问。
4.在 EC2 中,安装并使用命令行工具创建对应 Topic。
$ sudo yum install java
$ wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
$ tar -zxf kafka_2.12-2.7.2.tgz
$ kafka_2.12-2.7.2/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}
六、ECR
1.在 ECR 控制台创建存储库。
2.在 EC2 中安装 docker,拉取 confluentinc/cp-kafka-rest 镜像并推送至 ECR(官方镜像地址:https://hub.docker.com/r/confluentinc/cp-kafka-rest)。
$ sudo yum update
$ sudo yum install docker
$ sudo systemctl start docker
$ sudo docker pull confluentinc/cp-kafka-rest
$ sudo docker tag confluentinc/cp-kafka-rest:latest <YOUR_ACCOUNT_ID>.dkr.ecr.cn-northwest-1.amazonaws.com.cn/kafka-rest:latest
$ sudo docker push <YOUR_ACCOUNT_ID>.dkr.ecr.cn-northwest-1.amazonaws.com.cn/kafka-rest:latest
七、ALB
1.在控制台中创建 ALB
2.创建目标组
3.返回 ALB 创建页面刷新目标组后完成创建
4.ALB 安全组需添加入站 8082 端口
八、ECS
1.在 ECS 控制台中创建 ECS 集群
2.创建任务定义
3.添加容器后,完成创建任务定义
4.创建 ECS 服务
a)创建基于 Fargate 的服务。
b)选择私有子网,并配置安全组。
c)配置步骤 5 中创建的 ALB 及注册目标组。
d)配置 AutoScaling,并选择基于哪项指标扩展(可配置多个策略)。
e)完成创建。
f)配置 MSK 安全组,添加 ECS Service 的安全组 Searvi-4828 入站规则。
九、测试
1.在公网环境下进行公网测试访问 ALB 地址:
# Get a list of topics
$ curl http://<YOUR_ALB_URL>:8082/topics
# Produce a message with JSON data
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"name": "testUser"}}]}' \
"http://<YOUR_ALB_URL>:8082/topics/http-logs"
2.使用压力测试工具 Siege 进行压测,并观测 AutoScaling 情况(默认为 2,最小任务数 2,最大任务数 20)。
#安装siege
$ sudo amazon-linux-extras install epel
$ sudo yum install siege
#并发200持60秒
$ siege -c200 -t60S --content-type "application/vnd.kafka.json.v2+json" 'http:// <YOUR_ALB_URL>:8082/topics/http-logs POST {"records":[{"value":{"name": "testUser"}}]}'
十、监控
1.打开 CloudWatch 控制台,导航 Container Insights,即可对 ECS 集群、服务以及任务进行监控。
总结
这篇文章呈现了使用 ALB 配合 Amazon ECS Fargate为Amazon MSK 设置 REST API 的快速构建方式。 该解决方案可以帮助您从任何 IoT 设备、客户端或任意编程语言向 Amazon MSK 生成或使用消息,并且可以使用 Amazon ECS Fargate 自动伸缩的能力灵活扩展,以满足海量实时消息的流失处理。
本篇作者