亚马逊AWS官方博客

Amazon Managed Streaming for Apache Kafka 故障转移和扩容时间测试报告

1. 测试背景

在实际的生产环境中,随着数据量和流量的增长,Kafka 集群的性能和容量需求也会随之增加。因此,对于 Kafka 集群的扩容需求成为了一个常见的问题。为了解决这一问题,Amazon Web Services(AWS)推出了 Amazon Managed Streaming for Apache Kafka(Amazon MSK),这是一项托管式的 Kafka 服务,可帮助用户轻松地管理和扩展 Kafka 集群。为了测试 Amazon MSK 的扩容性能,我们对其进行了一系列的测试,包括扩容分区数量、扩容代理数量和扩容磁盘容量等方面的测试。此外,我们还模拟了故障转移场景,观察了集群在故障转移过程中的性能表现和响应时间,以评估 Amazon MSK 在故障发生时的自动恢复能力。这些测试旨在评估 Amazon MSK 在扩容过程中的性能表现和响应时间,并为用户提供参考,以便他们更好地了解 Amazon MSK 的扩容能力,并做出更好的决策。在本测试报告中,我们将详细介绍测试的环境和方法,并分析测试结果。

2. 相关名词解释

  • Topic(主题):Topic 是 Kafka 中消息的逻辑分类或主题。它是消息发布和订阅的中心概念,用于将消息按照特定的主题进行组织和存储。
  • Producer(生产者):Producer 是将消息发布到 Kafka 集群的组件或应用程序。它负责将消息发送到指定的 Topic。
  • Consumer(消费者):Consumer 是从 Kafka 集群中订阅和读取消息的组件或应用程序。它负责从指定的 Topic 接收和处理消息。
  • Partition(分区):Partition 是 Kafka 中 Topic 的分片或分段。每个 Topic 可以被分成多个 Partition,每个 Partition 在不同的 Broker 上进行存储和处理。Partition 允许消息的并行处理和提供高吞吐量。
  • Offset(偏移量):Offset 是指向 Partition 中消息的唯一标识符。消费者可以通过 Offset 来跟踪已读取的消息位置,并支持按照特定顺序读取消息。
  • Broker(代理服务器):Broker 是 Kafka 集群中的服务器,负责接收、存储和复制消息。一个 Kafka 集群可以由多个 Broker 组成,它们协同工作来提供高可用性和容错性。
  • Consumer Group(消费者组):Consumer Group 是一组具有相同 Group ID 的消费者的集合。在一个 Topic 中,每个 Partition 只能由一个 Consumer Group 中的一个消费者进行消费。消费者组允许消息的并行处理和提供负载均衡。
  • Consumer Group Balancing(消费者组平衡/重新平衡):当消费者加入或离开消费者组时,或者分区的分配发生变化时,会触发重新平衡操作。重新平衡是指协调者根据分区分配策略,将分区重新分配给消费者,以确保消费者组内各个消费者的负载均衡。

3. 测试环境

被测服务 Amazon Managed Streaming for Apache Kafka
测试区域 弗吉尼亚北部(us-east-1)
测试版本 Apache Kafka 2.8.1(MSK 推荐版本)
测试机型 kafka.m5.large
集群模式 集群采用已预置模式
压测实例 c5.large*3
压测软件 kafka-producer-perf-test.sh(Kafka 自带压测工具,请查看下一章节)

4. Kafka 常用命令

由于 MSK 测试环境使用 Apache Kafka 2.8.1 版本,因此下载客户端时请选择 2.8.1 版本,下载链接:https://kafka.apache.org/downloads,以下命令均来包含在客户端中。

1)创建当前 Topic

bin/kafka-topics.sh --create --zookeeper z-3.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181,z-1.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181,z-2.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181 --replication-factor 3 --partitions 5 --topic MSKTopic

2)删除当前 Topic

bin/kafka-topics.sh --zookeeper z-3.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181,z-1.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181,z-2.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181 --delete --topic MSKTopic

3)修改当前 Topic

bin/kafka-topics.sh --alter --partitions 10 --topic MSKTopic --bootstrap-server b-1.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092,b-2.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092,b-3.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092

4)查询当前 Topic

bin/kafka-topics.sh --describe --zookeeper z-3.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181,z-1.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181,z-2.democluster1.7qpb2k.c2.kafka.us-east-1.amazonaws.com:2181 --topic MSKTopic

5)分区重新分配

bin/kafka-reassign-partitions.sh --bootstrap-server b-1.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092,b-2.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092,b-3.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092 --reassignment-json-file partition-replica-reassignment.json --execute

6)生产测试数据

bin/kafka-producer-perf-test.sh --topic MSKTopic --record-size 100 --num-records 60000 --throughput -1 --producer-props bootstrap.servers=b-1.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092,b-2.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092,b-3.democluster1.cm7d9o.c2.kafka.us-east-1.amazonaws.com:9092

5. 测试前提

本次测试基于以下两个前提:

  1. 压测实例、Amazon Managed Streaming for Apache 均在一个 VPC
  2. Amazon Managed Streaming for Apache 使用生产模板的默认配置

6. 测试架构图

7. 测试用例

7.1 增加磁盘容量时间测试用例,测试基于以下配置模式,分别进行测试

1)无工作负载情况下的增加磁盘容量时间。

2)有工作负载情况下的增加磁盘容量时间、消费者因重新平衡被阻塞时间、以及生产者和消费者是否中断、偏移量是否延迟。

配置模式 增加磁盘容量 无工作负载 有工作负载

集群已预置模式

kafka.m5.large

3 个代理  5 个分区

磁盘容量由 100G 增加至 200G 增加磁盘容量时间 增加磁盘容量时间
消费者因重新平衡被阻塞时间
生产者是否中断
消费者是否中断
偏移量是否延迟

*分区最佳实践:请参考官网分区数量最佳实践

*模拟数据存储:发送 600,000,000 条数据,数据总量约为 60G

*工作负载场景:MSK 的代理节点平均 10% CPU 的工作负载(由 1 个 Producer 和 10 个 Consumer 同时工作产生)

*工作负载指标:当集群有工作负载时,集群的环境变化可能会导致消费者组重新平衡或消费异常等问题,因此增加以下指标:消费者因重新平衡被阻塞时间、以及生产者和消费者是否中断、偏移量是否延迟。

7.2 增加代理数量时间测试用例,测试基于以下配置模式,分别进行测试

1)无工作负载情况下的增加代理数量时间、分区重新分配时间。

2)有工作负载情况下的增加代理数量时间、分区重新分配时间、消费者因重新平衡被阻塞时间、以及生产者和消费者是否中断、偏移量是否延迟。

配置模式 增加代理数量 无工作负载 有工作负载

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

10 个分区

代理数量由 3 个增加至 6 个

(增加代理数量期间)

增加代理数量时间 增加代理数量时间
消费者因重新平衡被阻塞时间
生产者是否中断
消费者是否中断
偏移量是否延迟

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

10 个分区

代理数量由 3 个增加至 6 个

(分区重新分配期间)

分区重新分配时间 分区重新分配时间
消费者因重新平衡被阻塞时间
生产者是否中断
消费者是否中断
偏移量是否延迟

7.3 增加分区数量时间测试用例,测试基于以下两种配置模式,分别进行测试

1)无工作负载情况下的增加分区数量时间。

2)有工作负载情况下的增加分区数量时间、消费者因重新平衡被阻塞时间、以及生产者和消费者是否中断、偏移量是否延迟。

配置模式 增加分区数量 无工作负载 有工作负载

集群已预置模式kafka.m5.large

每个代理磁盘容量 100G

3 个代理

分区数量由 10 个增加至 100 个 增加分区数量时间 增加分区数量时间
消费者因重新平衡被阻塞时间
生产者是否中断
消费者是否中断
偏移量是否延迟

7.4 故障转移时间测试用例,测试基于以下配置模式,并在有工作负载的情况下分别进行测试

1)在代理 Broker 宕机的情况下,消费者因重新平衡被阻塞时间、以及生产者和消费者是否中断、偏移量是否延迟。

2)在消费者宕机的情况下,消费者因重新平衡被阻塞时间、以及生产者和消费者是否中断、偏移量是否延迟。

配置模式 Broker 宕机 Consumer 宕机

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

3 个代理

5 个分区

消费者因重新平衡被阻塞时间 消费者因重新平衡被阻塞时间
生产者是否中断 生产者是否中断
消费者是否中断 消费者是否中断
偏移量是否延迟 偏移量是否延迟

8. 测试方法

8.1 增加磁盘容量时间测试方法

* 磁盘扩容冷却期:在进行存储扩展时,两次事件之间的存储扩展具有至少六个小时的冷却期。尽管该操作可以立即提供额外的存储空间,但该服务对集群执行的优化可能需要 24 小时或更长时间。请参考:https://docs.thinkwithwp.com/zh_cn/msk/latest/developerguide/msk-update-storage.html

1)在 Producer 压测实例 large 上进行 Topic 的创建并写入模拟数据,步骤如下:

  • 按照如下命令,在压测 large 实例上安装 Kafka 客户端
    sudo yum install java-1.8.0
    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
    tar -xzf kafka_2.13-2.8.1.tgz
    cd kafka_2.13-2.8.1
    
  • 按照如下命令,进行 Topic 的创建和分区数量的指定
    bin/kafka-topics.sh --create --zookeeper z-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181,z-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181,z-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181 --replication-factor 3 --partitions 1000 --topic MSKTopic
  • 按照如下命令,向 MSK 的 Topic 中写入模拟数据
    bin/kafka-producer-perf-test.sh --topic MSKTopic --record-size 100 --num-records 600000000 --throughput -1 --producer-props bootstrap.servers=b-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092

2)在 Amazon Managed Streaming for Apache Kafka 控制台选择集群后,点击“操作”后点击“编辑”。

3)在打开新对话框中填写“每个代理的 Amazon EBS 存储”。由于两次事件之间的存储扩展具有至少 6 个小时的冷却期,因此建议更新的大小提前规划好,避免输入过小,6 小时内无法重新设置,而导致容量更新不足,仍满足不了生产。

4)利用控制台的“集群操作”来记录增加存储的开始时间和结束时间。如下图所示:

5)在模拟工作负载的场景下重新执行以上步骤,并统计时间,模拟工作负载的命令如下:

  • 在 1 台 Producer 压测实例 large 上进行数据的生产和发送,执行以下命令:
    bin/kafka-producer-perf-test.sh --topic MSKTopic --record-size 100 --num-records 1000000000 --throughput 10000 --producer-props bootstrap.servers=b-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092
  • 在 5 台 Consumer 压测实例 large 上采用 java 客户端模拟生产,并记录消费者阻塞时间,每台实例上同时运行两个 jar 包进行数据的消费和接收,共计 10 个消费者,关键代码如下:
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("group.id", args[1]);
    properties.put("enable.auto.commit", "true");
    properties.put("auto.commit.interval.ms", "1000");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("MSKTopic"));
    System.out.println("开始消费......");
    long startTimeLag = 0;
    long endTimeLag = 0;
    int isFirst = 0;
    int isEnd = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            if (records.count() == 0 && isFirst == 0) {
                isFirst = 1;
                isEnd = 0;
                startTimeLag = System.currentTimeMillis();
            }
            if (records.count() > 0 && isEnd == 0) {
                isFirst = 0;
                isEnd = 1;
                endTimeLag = System.currentTimeMillis();
                long minus = endTimeLag - startTimeLag;
                System.out.println("当前消费者被阻塞时间为:" + minus + "ms");
            }
            System.out.println("Processed: " + "topic=" + record.topic() +
                    ", partition=" + record.partition() + ", offset=" + record.offset() + "}");
            Thread.sleep(1);
        }
    }
    
  • 使用 CloudWatch 观察消费偏移量是否有延迟,进入 CloudWatch 后,选择全部指标,在搜索框中”Consumer Group”=”consumerGroup”和 Topic=”MSKTopic”,观察 MaxOffsetLag、EstimatedMaxTimeLag、SumOffsetLag 的斜率是否发生变化,若斜率无变化表明偏移未产生延迟。

8.2 增加代理数量时间测试方法

1)在压测实例 large 上进行 Topic 的创建并写入模拟数据,步骤如下:

  • 按照如下命令,在压测 large 实例上安装 Kafka 客户端
    sudo yum install java-1.8.0
    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
    tar -xzf kafka_2.13-2.8.1.tgz
    cd kafka_2.13-2.8.1
    
  • 按照如下命令,进行 Topic 的创建和分区数量的指定
    bin/kafka-topics.sh --create --zookeeper z-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181,z-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181,z-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181 --replication-factor 3 --partitions 5 --topic MSKTopic
  • 按照如下命令,向 MSK 的 Topic 中写入模拟数据
    bin/kafka-producer-perf-test.sh --topic MSKTopic --record-size 100 --num-records 600000000 --throughput -1 --producer-props bootstrap.servers=b-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092

2)在 Amazon Managed Streaming for Apache Kafka 控制台选择集群后,点击“操作”后点击“编辑”。

3)在打开新对话框中填写“每个区的代理数”。由于默认选择 3 个可用区,因此当每个区的代理数为 2 时,MSK 集群的代理总数将为 6。

4)利用控制台的“集群操作”来记录增加代理的开始时间和结束时间。如下图所示:

5)代理数量增加后,开始进行分区重新分配,同时记录分区重新分配时间,命令如下:

  • 创建 reassign.json
    cat > reassign.json <<EOF
    {"topics": [{"topic": "MSKTopic"}],"version": 1}
    EOF
    
  • 使用以下命令生产当前的分区情况以及建议的分区情况
    bin/kafka-reassign-partitions.sh --zookeeper z-1.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:2181,z-3.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:2181,z-2.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:2181 --generate --topics-to-move-json-file reassign.json --broker-list 1,2,3,4,5,6
  • 将生成的 Proposed partition reassignment configuration 内容复制到文件 partition-replica-reassignment.json 中,命令如下:
    cat > partition-replica-reassignment.json <<EOF
    {"version":1,"partitions":[{"topic":"MSKTopic","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"MSKTopic","partition":1,"replicas":[5,3,1],"log_dirs":["any","any","any"]},{"topic":"MSKTopic","partition":2,"replicas":[4,1,5],"log_dirs":["any","any","any"]},{"topic":"MSKTopic","partition":3,"replicas":[6,5,4],"log_dirs":["any","any","any"]},{"topic":"MSKTopic","partition":4,"replicas":[2,4,6],"log_dirs":["any","any","any"]}]}
    EOF
    
  • 使用以下脚本进行分区的重新分配,并记录时间,步骤如下:
  • 使用 vim 命令将下列语句创建脚本,并命名为 partition-reassignment-timer.sh
    #!/bin/bash
    
    # Kafka集群地址和主题名称
    bootstrap_servers="b-2.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:9092,b-1.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:9092"
    topic_name="MSKTopic"
    completed_partitions=0
    # 实际分区数量
    total_partitions=$(grep -o -w "partition" partition-replica-reassignment.json | wc -l)  
    # 记录分区重新分配的起始时间
    start_time=$(date +%s)
    
    bin/kafka-reassign-partitions.sh --bootstrap-server "$bootstrap_servers" --reassignment-json-file partition-replica-reassignment.json --execute
    
    # 等待分区重新分配完成
    while [[ $completed_partitions -lt $total_partitions ]]; do
      sleep 1
      reassignment_status=$(bin/kafka-reassign-partitions.sh --bootstrap-server "$bootstrap_servers" --reassignment-json-file partition-replica-reassignment.json --verify | grep "complete")
      completed_partitions=$(echo "$reassignment_status" | wc -l)
      echo "Number of partitions completed: $completed_partitions"
    done
    
    # 记录分区重新分配的结束时间
    end_time=$(date +%s)
    
    # 计算分区重新分配的总耗时
    duration=$((end_time - start_time))
    
    # 将分区重新分配的起始时间、结束时间和总耗时记录到适当的位置
    echo "Partition reassignment start time: $(date -d @$start_time)"
    echo "Partition reassignment end time: $(date -d @$end_time)"
    echo "Partition reassignment duration: $duration seconds"
    
  • 按照如下命令,使得 command-timer.sh 获得执行权限
    chmod +x partition-reassignment-timer.sh
  • 执行脚本进行分区的重新分配,并记录时间
    sh partition-reassignment-timer.sh

6)在模拟工作负载的场景下重新统计时间,模拟工作负载、测试和记录工作负载指标步骤,以及记录偏移量是否延迟,请参考 8.1.5。

8.3 增加分区数量时间测试方法

1)在压测实例 large 上进行 Topic 的创建并写入模拟数据,步骤如下:

  • 按照如下命令,在压测 large 实例上安装 Kafka 客户端
    sudo yum install java-1.8.0
    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
    tar -xzf kafka_2.13-2.8.1.tgz
    cd kafka_2.13-2.8.1
    
  • 按照如下命令,进行 Topic 的创建和分区数量的指定
    bin/kafka-topics.sh --create --zookeeper z-1.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:2181,z-3.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:2181,z-2.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:2181 --replication-factor 3 --partitions 10 --topic MSKTopic
  • 按照如下命令,向 MSK 的 Topic 中写入模拟数据
    bin/kafka-producer-perf-test.sh --topic MSKTopic --record-size 100 --num-records 600000000 --throughput -1 --producer-props bootstrap.servers=b-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092

2)对在压测实例 large 上进行分区的修改,步骤如下:

  • 使用 vim 命令将下列语句创建脚本,并命名为 command-timer.sh
    #!/bin/bash
    # 获取命令
    command=$1
    # 记录开始时间
    start_time=$(date +%s.%N)
    # 执行命令
    eval $command
    # 记录结束时间
    end_time=$(date +%s.%N)
    # 计算执行时间(以秒为单位)
    execution_time=$(echo "$end_time - $start_time" | bc)
    # 输出结果
    echo "Command: $command"
    echo "Execution Time: $execution_time seconds"
    
  • 按照如下命令,使得 command-timer.sh 获得执行权限
    chmod +x command-timer.sh
  • 按照如下命令,进行增加分区的增加,并记录分区增加的时间
    ./command-timer.sh "bin/kafka-topics.sh --alter --partitions 10 --topic MSKTopic --bootstrap-server b-1.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:9092,b-2.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.sk91xz.c2.kafka.us-east-1.amazonaws.com:9092"

3)在模拟工作负载的场景下重新统计时间,模拟工作负载、测试和记录工作负载指标步骤,以及记录偏移量是否延迟,请参考 8.1.5。

8.4 故障转移时间测试方法

1)在 Producer 压测实例 large 上进行 Topic 的创建并写入模拟数据,步骤如下:

  • 按照如下命令,在压测 c5.large 实例上安装 Kafka 客户端
    sudo yum install java-1.8.0
    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
    tar -xzf kafka_2.13-2.8.1.tgz
    cd kafka_2.13-2.8.1
    
  • 按照如下命令,进行 Topic 的创建和分区数量的指定
    bin/kafka-topics.sh --create --zookeeper z-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181,z-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181,z-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:2181 --replication-factor 3 --partitions 5  --topic MSKTopic
  • 按照如下命令,向 MSK 的 Topic 中写入模拟数据
    bin/kafka-producer-perf-test.sh --topic MSKTopic --record-size 100 --num-records 600000000 --throughput -1 --producer-props bootstrap.servers=b-1.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-2.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092,b-3.democluster2.46ut9u.c14.kafka.us-east-1.amazonaws.com:9092

2)使用 Java 客户端模拟工作负载,关键代码请参考 8.1.5。

3)模拟 Broker 宕机后,记录消费者因重新平衡被阻塞时间,并观察生产者和消费者是否中断、偏移量是否延迟(请参考 8.1.5),重启 Broker 命令如下:

aws kafka reboot-broker --cluster-arn arn:aws:kafka:us-east-1:999364333834:cluster/demo-cluster-2/c11160e7-881b-4359-8ed2-0ae5c18bded3-2 --broker-ids 1

4)模拟 Consumer 宕机后,记录消费者因重新平衡被阻塞时间,并观察生产者和消费者是否中断、偏移量是否延迟(请参考 8.1.5),方法如下:选择一个正在消费 MSK 的控制台,使用 Command+C 进行任务的中止。

9. 测试数据

9.1 增加磁盘容量时间测试数据

配置模式 增加磁盘容量 测试指标 无工作负载 有工作负载

集群已预置模式

kafka.m5.large

3 个代理  10 个分区

磁盘容量由100G 增加至 200G 增加磁盘容量时间 00:05:00 00:06:00
消费者因重新平衡被阻塞时间 0ms
生产者是否中断
消费者是否中断
偏移量是否延迟

测试结论:增加 Amazon Managed Streaming for Apache Kafka 的磁盘容量与集群工作负载之间存在一定的相关性。通常情况下,增加磁盘容量需要约 5 至 6 分钟的时间,有工作负载时可能会增加大约 1 分钟,但不会对当前工作负载造成任何影响。总体来看,增加磁盘容量不会对当前工作负载产生影响,也不会因增加磁盘容量而阻塞生产者和消费者的操作。然而需要注意的是,两次存储扩展事件之间需要至少六小时的冷却时间,这意味着在此期间无法再次进行存储扩展操作。

9.2 增加代理数量时间测试数据

配置模式 增加代理数量 测试指标 无工作负载 有工作负载

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

10 个分区

代理数量由 3 个增加至 6 个

(增加代理数量期间)

增加代理数量时间 00:19:00 00:22:00
消费者因重新平衡被阻塞时间 0ms
生产者是否中断
消费者是否中断
偏移量是否延迟

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

10 个分区

代理数量由 3 个增加至 6 个

(分区重新分配期间)

分区重新分配时间 00:18:19 00:21:40
消费者因重新平衡被阻塞时间 0ms
生产者是否中断
消费者是否中断
偏移量是否延迟

测试结论:增加 Amazon Managed Streaming for Apache Kafka 的代理数量和集群的工作负载之间存在一定的相关性。总体而言,增加代理数量的时间范围大约是 19 分钟到 22 分钟,有工作负载时会增加大约 3 分钟。由于增加代理后需要重新分配分区,分区的分配时间大约在 18 分钟到 21 分钟左右,有工作负载时可能会略有增加。由于增加代理后,需要重新进行选举,过程中会导致生产者有一次中断。根据记录的的消费者重新平衡情况来看,增加代理数量和分区重新分配时的指标没有发生变化,表明消费者组重新平衡与增加代理数量、分区重新分配之间没有明显关联。

9.3 增加分区数量时间测试数据

配置模式 增加分区数量 测试指标 无工作负载 有工作负载

集群已预置模式kafka.m5.large

每个代理磁盘容量 100G

3 个代理

分区数量由 10 个增加至 100 个 增加分区数量时间 00:00:02.464 00:00:02.833
消费者因重新平衡被阻塞时间 00:00:02.791
生产者是否中断
消费者是否中断 部分消费者被阻塞
偏移量是否延迟

集群已预置模式kafka.m5.large

每个代理磁盘容量 100G

3 个代理

分区数量由 10 个增加至 1000 个 增加分区数量时间 00:00:04.645 00:00:04.978
消费者因重新平衡被阻塞时间 00:00:46.463
生产者是否中断
消费者是否中断 部分消费者被阻塞
偏移量是否延迟 有轻微延迟

测试结论:增加 Amazon Managed Streaming for Apache Kafka 的分区数量与集群的工作负载之间没有明显的关联,但在分区数量变大时,增加分区的时间由两秒增加到 4 秒。在测试中观察到,由于消费者重新平衡的原因,部分消费者在一段时间内可能无法接收数据。随着增加的分区数量变多,消费者因重新平衡而被阻塞的时间也逐渐增加,从最初的 2 秒增加到大约 46 秒左右,同时消费偏移量会有轻微延迟。然而,从生产者的角度来看,并没有观察到生产者因此而出现错误或中断。

9.4 故障转移时间测试数据

配置模式 不同分区和消费者 测试指标 Broker宕机 Consumer宕机

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

3 个代理

10 个分区 消费者因重新平衡被阻塞时间 465ms
生产者是否中断
消费者是否中断 部分消费者被短暂阻塞
偏移量是否延迟

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

3 个代理

100 个分区 消费者因重新平衡被阻塞时间 484ms
生产者是否中断
消费者是否中断 部分消费者被短暂阻塞
偏移量是否延迟

集群已预置模式

kafka.m5.large

每个代理磁盘容量 100G

3 个代理

1000 个分区 消费者因重新平衡被阻塞时间 450ms
生产者是否中断
消费者是否中断 部分消费者被短暂阻塞
偏移量是否延迟

测试结论:通过使用命令进行 Broker 重启以模拟宕机情况,我们观察到在有负载的情况下,消费者组并没有因 Broker 宕机而中断,也没有发生因重新平衡而导致消费者被阻塞的情况。同时,我们也观察到生产者只发生了短暂的中断,但由于中断时间很短,消费偏移量并没有出现延迟。在消费者宕机的情况下,我们进行了不同分区大小下的消费者宕机测试,发现消费者会因重新平衡而被短暂地阻塞,阻塞时间大约在 450 毫秒左右。然而,我们观察到生产者并没有发生中断,而在偏移量方面,由于阻塞时间很短,没有观察到偏移量延迟的情况。

10. 测试结论

1)当增加磁盘容量时,工作负载对磁盘容量增加的时间影响较小,大约需要 5-6 分钟的时间。而当增加代理数量时,工作负载对其会有一定的影响,有工作负载要比无工作负载的情况多耗时 1 分钟左右。

2)在集群增加代理数量后,需要手动进行分区重新分配,而重新分配的时间与工作负载和分区个数相关性较大,在有负载的情况下,要比无负载的情况下多出一倍的时间,长达 20 分钟。

3)当分区数量或消费者数量发生变化时,消费者组会进行重新平衡[1]。在分区数量和消费者数量较大时,这导致消费者被阻塞的时间会相对较长

4)生产者会在代理数量发生变化时,即在增加 Broker 数量和 Broker 宕机的情况下产生短暂中断,但并没有影响整体工作负载。

5)从整体测试情况看,偏移量并没有明显延迟,但由于在生产环境中数据量变化多样,并且消费者处理性能也和下游组件相关,因此偏移量延迟情况可能会在不同的生产环境下有不同的结果,在部分生产实践中,在分区个数和消费者实例个数较多的情况,会导致消费者长时间处于重平衡状态数小时,导致数据严重积压在 Kafka,该情况可以通过使用 Consumer 端使用渐进式黏性重平衡分配分区策略 – CooperativeStickyAssignor[2]改进, 该方法通过多轮重平衡但保证在重平衡期间部分分区依然可以被读取的方式减少带来的影响。另外,在 Kafka 3.0 中,CooperativeStickyAssignor 已被加入默认重平衡策略列表中。

[1]在实际的生产环境中,消费者因重新平衡被阻塞时间和其他测试时间可能会更长,本 Blog 中的测试仅为理想状态下结果。

[2] 使用 CooperativeStickyAssignor 改进重平衡:https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol

本篇作者

刘轶君

AWS 解决方案架构师,负责基于 AWS 云平台的解决方案咨询和设计,在系统架构、大数据、网络、应用研发领域有丰富的研发和实践经验。

郭立

于 2019 年加入亚马逊云科技,担任资深解决方案架构师和解决方案架构师经理,负责带领架构师团队支持战略客户和企业客户,同时为亚马逊云科技机器学习、数据分析与安全合规技术专家。

张冠霖

于 2020 年加入西云数据,担任亚马逊云科技中国区资深云计算技术支持工程师,Amazon OpenSearch Service 服务主题专家(Subject Matter Expert),专注于搜索引擎,流数据处理和消息队列领域,具有多年大数据应用架构设计与复杂问题排查经验。