本文通過示例為您介紹如何使用MirrorMaker 2(簡稱MM2)on Kafka Connect跨集群同步數據。

背景信息

使用場景

Kafka MM2適用于下列場景:
  • 遠程數據同步:通過MM2,Kafka數據可以在不同地域的集群進行傳輸復制。
  • 災備場景:通過MM2,可以構建不同數據中心的主備兩個集群容災架構,MM2實時同步兩個集群的數據。當其中一個集群不可用時,可以將上面的應用程序切換到另一個集群,從而實現異地容災功能。
  • 數據遷移場景:在業務上云、混合云、集群升級等場景,存在數據從舊集群遷移到新集群的需求。此時,您可以使用MM2實現新舊數據的遷移,保證業務的連續性。
  • 聚合數據中心場景:通過MM2,可以將多個Kafka子集群的數據同步到一個中心Kafka集群,實現數據的匯聚。

功能

Kafka MM2作為數據復制工具,具有以下功能:
  • 復制topics數據以及配置信息。
  • 復制consumer groups及其消費topic的offset信息。
  • 復制ACLs。
  • 自動檢測新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平擴展的框架。

任務執行方式

MM2任務有以下執行方式:
  • Distributed Connect集群的connector方式(推薦):在已有Connect集群執行MM2 connector任務的方式。您可以參照本文使用Connect集群服務的功能來管理MM2任務。
  • Dedicated MirrorMaker集群方式:不需要使用Connect集群執行MM2 connector任務,而是直接通過Driver程序管理MM2的所有任務。具體操作,請參見使用MirrorMaker 2(Dedicated)跨集群同步數據
  • Standalone Connect的worker方式:執行單個MirrorSourceConnector任務,適合在測試場景下使用。
說明 推薦在Distributed Connect集群上啟動MM2 connector任務,可以借助Connect集群的Rest服務管理MM2任務。

MM2的詳細信息,請參見Apache Kafka

前提條件

已創建兩個Kafka集群,一個為源集群emrsource,一個為目標集群emrdest,并選擇了Kafka服務,創建DataFlow集群的具體操作,請參見創建集群
說明 本文示例的源集群和目標集群都以EMR-3.42.0版本,且在同一VPC下的DataFlow集群為例。

使用限制

目標集群的Kafka軟件版本為2.12_2.4.1及以上。

操作流程

  1. 步驟一:在目標集群創建Kafka Connect集群
  2. 步驟二:使用MirrorMaker2 connector

步驟一:在目標集群創建Kafka Connect集群

  1. 新增EMR Task機器組。
    在EMR控制臺目標集群emrdest的節點管理頁面,創建Task機器組。
    1. 單擊新增機器組
    2. 新增機器組面板,配置以下參數,其余參數請根據實際情況配置。
      參數說明
      節點組類型選擇TASK(任務實例組)
      節點組名稱本文示例為emr-task。
      存儲配置選擇一塊數據盤。
  2. 擴容Task機器組。
    1. 節點管理頁面,單擊新增的emr-task節點組操作列的擴容
    2. 在彈出的對話框中,選擇待增加的數量,勾選服務協議
      本示例增加的實例數量為1臺。您可以根據實際需要擴容Task實例的數量,如果需要高可用Connect集群,則建議擴容兩個以上實例。
    3. 單擊確定
  3. 查看KafkaConnect服務狀態,確保Kafka Connect集群已經啟動。
    1. 單擊上方的集群服務
    2. 單擊Kafka服務區域的狀態
    3. 組件列表區域,查看KafkaConnect的組件狀態,確保組件在運行中。
      KafkaConnect
  4. 使用SSH方式登錄目標集群emrdest,詳情請參見登錄集群
  5. 執行以下命令,檢查Kafka Connect Rest服務狀態。
    curl -X GET http://task-1-1:8083| jq .
    返回以下類似信息。
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
    {
      "version": "2.4.1",
      "commit": "42ce056344c5625a",
      "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
    }

步驟二:使用MirrorMaker2 connector

  1. 準備MM2 connector配置文件。
    您需要準備以下文件:
    • 準備MirrorSourceConnector配置文件
      本文示例MirrorSourceConnector配置文件命名為mm2-source-connector.json。按照如下示例并根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
      {
        "name": "mm2-source-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "topics": "^foo.*",
        "tasks.max": "4",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.acls.interval.seconds": "20",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "consumer.group.id": "mm2-mirror-source-consumer-group",
        "producer.enable.idempotence":"true",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
      說明 本文示例代碼中參數:
      • source.cluster.bootstrap.servers:該參數值的IP地址,需要替換為您實際環境源集群emrsource中Kafka服務的訪問地址,并且需要確保源Kafka集群和Kafka Connect集群的聯通性。
      • topics:該參數值表示會復制您源集群中以foo開頭的Topic。
    • 準備MirrorCheckpointConnector配置文件
      本文示例MirrorCheckpointConnector配置文件命名為mm2-checkpoint-connector.json。按照如下示例并根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
      {
          "name": "mm2-checkpoint-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "checkpoints.topic.replication.factor": "3",
          "emit.checkpoints.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
    • 準備MirrorHeartbeatConnector配置文件
      本文示例MirrorHeartbeatConnector配置文件命名為mm2-heartbeat-connector.json。按照如下示例并根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
      {
          "name": "mm2-heartbeat-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "heartbeats.topic.replication.factor": "3",
          "emit.heartbeats.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
  2. 使用MirrorSourceConnector。
    1. 通過Connect rest服務,使用mm2-source-connector.json文件,創建MirrorSourceConnector任務。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-source-connector.json http://task-1-1:8083/connectors/mm2-source-connector/config
    2. 執行以下命令,查看mm2-source-connector狀態。
      curl -s task-1-1:8083/connectors/mm2-source-connector/status | jq .
  3. 使用MirrorCheckpointConnector。
    1. 通過Connect rest服務,使用mm2-checkpoint-connector.json文件,創建MirrorCheckpointConnector任務。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-checkpoint-connector.json http://task-1-1:8083/connectors/mm2-checkpoint-connector/config
    2. 執行以下命令,查看mm2-checkpoint-connector狀態。
      curl -s task-1-1:8083/connectors/mm2-checkpoint-connector/status | jq .
  4. 使用MirrorHeartbeatConnector。
    1. 通過Connect rest服務,使用mm2-heartbeat-connector.json文件,創建MirrorHeartbeatConnector任務。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-heartbeat-connector.json http://task-1-1:8083/connectors/mm2-heartbeat-connector/config
    2. 執行以下命令,查看mm2-heartbeat-connector狀態。
      curl -s task-1-1:8083/connectors/mm2-heartbeat-connector/status | jq .
  5. 在目標集群執行以下命令,查看MM2相關topic。
    kafka-topics.sh --list --bootstrap-server core-1-1:9092
    此時,在目標集群中,您可以看到以下topic已經創建:
    • emrsource.foo開頭的topic:由MirrorSourceConnector創建。

      foo開頭的topic是您源集群上已有的,需要復制的topic。

    • emrsource.checkpoints.internal:由MirrorCheckpointConnector創建,用于存儲offset等信息。
    • heartbeats:由MirrorHeartbeatConnector創建。