本文為您介紹如何在Kafka集群運維中對Kafka運維流量進行限制,以避免由于運維流量影響到正常的業務流量。本文以EMR Kafka 2.4.1版本為例。

背景信息

由于運維操作而出現的IO流量稱為運維流量。在以下運維場景中需要對運維流量進行限制:
  • Partition Reassign場景。
  • 節點內副本移動到不同目錄的場景。
  • 集群Broker恢復時,副本數據同步的場景。

注意事項

  • 由于Kafka流量構成不同、業務場景不同和運維場景不同,您需要綜合判斷是否需要對運維流量進行限制。
  • 限流的閾值需要根據具體的業務場景來確定。通常運維流量閾值過小將導致運維操作無法完成,運維流量閾值過大將造成IO爭搶或帶寬滿載等問題,從而影響到正常業務流量,您應該合理的評估限流閾值。
  • 限流閾值設定需要考慮Topic業務流量的大小、業務可以承受的延遲、業務場景是否允許Kafka服務中斷、Kafka集群自身的磁盤IO與網絡IO的帶寬能力等因素。
  • 通常情況下,建議您在業務低峰期間進行此類運維操作。

Kafka運維流量限制

Kafka限流相關參數

參數描述
leader.replication.throttled.replicasTopic級別,表示需要限流的分區leader副本列表。

格式為[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... 或者用星號(*)表示該Topic的所有leader副本限流。

follower.replication.throttled.replicasTopic級別,表示需要限流的分區follower副本列表。

格式為[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... 或者用星號(*)表示該Topic的所有follwer副本限流。

leader.replication.throttled.rateBroker級別,leader節點復制讀流量。
follower.replication.throttled.rateBroker級別,follower節點復制寫流量。

限流參數查看方式

您可以通過kafka-configs.sh命令來查看限流參數的值。

  • 查看指定節點的Broker參數。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <your broker id> --describe
  • 查看指定Topic的參數。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <your topic name> --describe

Partition Reassign場景限流

重要
  • 限流速度不能過小,如果限流速度過小,將不能觸發實際的reassign復制過程。
  • 限流參數不會對正常的副本fetch流量進行限速。
  • 任務完成后,您需要通過verify參數移除Topic和Broker上的限速參數配置。
  • 如果剛開始已經設置了throttle參數,則可以通過execute命令再次修改throttle參數。
  • 如果剛開始沒有設置throttle參數,則需要使用kafka-configs.sh命令修改Topic上的leader.replication.throttled.replicas和follower.replication.throttled.replicas參數、修改Broker上的leader.replication.throttled.rate和follower.replication.throttled.rate參數。

通常使用kafka-reassign-partitions.sh工具來進行Partition Reassign操作,通過使用throttle參數來設置限流的大小。示例如下所示:

  1. 創建測試Topic。
    1. 以SSH方式登錄到Kafka集群的Master節點,詳情請參見登錄集群
    2. 執行以下命令,創建測試Topic。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      您可以通過以下命令查看Topic詳情。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. 執行以下命令,模擬數據寫入。
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. 設置throttle參數并執行reassign操作。
    1. 創建reassignment-json-file文件reassign.json,寫入如下內容。
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
    2. 執行reassign操作。
      由于模擬的寫入速度為10 Mbit/s,所以將reassign限流速度設置為30 Mbit/s。
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
  4. 查看限流參數。
    • 查看指定節點的Broker參數。
      kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe
    • 查看指定Topic的參數。
      kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
  5. 查看reassign任務執行情況。
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    說明 任務完成后,您需要重復執行上述命令以移除限流參數。

節點內副本移動到不同目錄的場景限流

通過kafka-reassign-partitions.sh工具可以進行Broker節點內的副本遷移,參數replica-alter-log-dirs-throttle可以對節點內的遷移IO進行限制。示例如下所示:

  1. 創建測試Topic。
    1. 以SSH方式登錄到Kafka集群的Master節點,詳情請參見登錄集群
    2. 執行以下命令,創建測試Topic。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      您可以通過以下命令查看Topic詳情。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. 執行以下命令,模擬數據寫入。
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. 設置參數replica-alter-log-dirs-throttle并執行reassign操作。
    1. 創建文件reassign.json,將目標目錄寫入reassignment文件中,內容如下。
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}
    2. 執行replicas movement操作。
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  4. 查看限流參數。
    Broker節點內目錄間移動副本會在Broker上配置限流參數,參數名為Brokerreplica.alter.log.dirs.io.max.bytes.per.second。
    執行以下命令,查看指定節點的Broker參數。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  5. 查看reassign任務執行情況。
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    說明 任務完成后,您需要重復執行上述命令以移除限流參數。

集群Broker恢復時,副本數據同步場景限流

重要
  • 限流速度不能過小,如果限流速度過小,將不能觸發實際的reassign復制過程。
  • 限流參數不會對正常的副本fetch流量進行限速。
  • 數據恢復完成后,需要使用kafka-configs.sh命令刪除相應的參數。

當Broker重啟時,需要從leader副本進行副本數據的同步。在Broker節點遷移、壞盤修復重新上線等場景時,由于之前的副本數據完全丟失、副本數據恢復會產生大量的同步流量,有必要對恢復過程進行限流避免恢復流量過大影響正常流量。示例如下所示:

  1. 創建測試Topic。
    1. 以SSH方式登錄到Kafka集群的Master節點,詳情請參見登錄集群
    2. 執行以下命令,創建測試Topic。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      您可以通過以下命令查看Topic詳情。
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. 執行以下命令,寫入測試數據。
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. 通過kafka-configs.sh命令設置限流參數。
    //設置Topic上的限流參數。
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"
    //設置Broker上的限流參數。
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
    ......
  4. 在EMR控制臺停止Broker 1節點。
  5. 刪除Broker 1上的副本數據,模擬數據丟失的場景。
    rm -rf /mnt/disk2/kafka/log/test-throttled-0/
  6. 在EMR控制臺啟動Broker 1節點,觀察限流參數是否起作用。
  7. 待Broker 1相應的副本恢復到ISR列表后,使用kafka-configs.sh命令刪除限流參數的配置。
    //刪除Topic上的限流參數。
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled
    //刪除Broker上的限流參數
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
    ......