本文通過示例為您介紹如何使用MirrorMaker 2(簡稱MM2)on Kafka Connect跨集群同步數據。
背景信息
使用場景
Kafka MM2適用于下列場景:
- 遠程數據同步:通過MM2,Kafka數據可以在不同地域的集群進行傳輸復制。
- 災備場景:通過MM2,可以構建不同數據中心的主備兩個集群容災架構,MM2實時同步兩個集群的數據。當其中一個集群不可用時,可以將上面的應用程序切換到另一個集群,從而實現異地容災功能。
- 數據遷移場景:在業務上云、混合云、集群升級等場景,存在數據從舊集群遷移到新集群的需求。此時,您可以使用MM2實現新舊數據的遷移,保證業務的連續性。
- 聚合數據中心場景:通過MM2,可以將多個Kafka子集群的數據同步到一個中心Kafka集群,實現數據的匯聚。
功能
Kafka MM2作為數據復制工具,具有以下功能:
- 復制topics數據以及配置信息。
- 復制consumer groups及其消費topic的offset信息。
- 復制ACLs。
- 自動檢測新的topic以及partition。
- 提供MM2的metrics。
- 高可用以及可水平擴展的框架。
任務執行方式
MM2任務有以下執行方式:
- Distributed Connect集群的connector方式(推薦):在已有Connect集群執行MM2 connector任務的方式。您可以參照本文使用Connect集群服務的功能來管理MM2任務。
- Dedicated MirrorMaker集群方式:不需要使用Connect集群執行MM2 connector任務,而是直接通過Driver程序管理MM2的所有任務。具體操作,請參見使用MirrorMaker 2(Dedicated)跨集群同步數據。
- Standalone Connect的worker方式:執行單個MirrorSourceConnector任務,適合在測試場景下使用。
說明 推薦在Distributed Connect集群上啟動MM2 connector任務,可以借助Connect集群的Rest服務管理MM2任務。
MM2的詳細信息,請參見Apache Kafka。
前提條件
已創建兩個Kafka集群,一個為源集群emrsource,一個為目標集群emrdest,并選擇了Kafka服務,創建DataFlow集群的具體操作,請參見創建集群。說明 本文示例的源集群和目標集群都以EMR-3.42.0版本,且在同一VPC下的DataFlow集群為例。
使用限制
目標集群的Kafka軟件版本為2.12_2.4.1及以上。
操作流程
步驟一:在目標集群創建Kafka Connect集群
- 新增EMR Task機器組。在EMR控制臺目標集群emrdest的節點管理頁面,創建Task機器組。
- 擴容Task機器組。
- 查看KafkaConnect服務狀態,確保Kafka Connect集群已經啟動。
- 使用SSH方式登錄目標集群emrdest,詳情請參見登錄集群。
- 執行以下命令,檢查Kafka Connect Rest服務狀態。
curl -X GET http://task-1-1:8083| jq .
返回以下類似信息。% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
步驟二:使用MirrorMaker2 connector
- 準備MM2 connector配置文件。您需要準備以下文件:
- 準備MirrorSourceConnector配置文件本文示例MirrorSourceConnector配置文件命名為mm2-source-connector.json。按照如下示例并根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
{ "name": "mm2-source-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "topics": "^foo.*", "tasks.max": "4", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "offset-syncs.topic.replication.factor": "3", "sync.topic.acls.interval.seconds": "20", "sync.topic.configs.interval.seconds": "20", "refresh.topics.interval.seconds": "20", "refresh.groups.interval.seconds": "20", "consumer.group.id": "mm2-mirror-source-consumer-group", "producer.enable.idempotence":"true", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
說明 本文示例代碼中參數:source.cluster.bootstrap.servers
:該參數值的IP地址,需要替換為您實際環境源集群emrsource中Kafka服務的訪問地址,并且需要確保源Kafka集群和Kafka Connect集群的聯通性。topics
:該參數值表示會復制您源集群中以foo開頭的Topic。
- 準備MirrorCheckpointConnector配置文件本文示例MirrorCheckpointConnector配置文件命名為mm2-checkpoint-connector.json。按照如下示例并根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
{ "name": "mm2-checkpoint-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "tasks.max": "1", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "checkpoints.topic.replication.factor": "3", "emit.checkpoints.interval.seconds": "20", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
- 準備MirrorHeartbeatConnector配置文件本文示例MirrorHeartbeatConnector配置文件命名為mm2-heartbeat-connector.json。按照如下示例并根據實際情況修改相應的參數值。更多配置項詳情,請參見KIP-382的相關章節。
{ "name": "mm2-heartbeat-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "tasks.max": "1", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "heartbeats.topic.replication.factor": "3", "emit.heartbeats.interval.seconds": "20", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
- 準備MirrorSourceConnector配置文件
- 使用MirrorSourceConnector。
- 使用MirrorCheckpointConnector。
- 使用MirrorHeartbeatConnector。
- 在目標集群執行以下命令,查看MM2相關topic。
kafka-topics.sh --list --bootstrap-server core-1-1:9092
此時,在目標集群中,您可以看到以下topic已經創建:- emrsource.foo開頭的topic:由MirrorSourceConnector創建。
foo開頭的topic是您源集群上已有的,需要復制的topic。
- emrsource.checkpoints.internal:由MirrorCheckpointConnector創建,用于存儲offset等信息。
- heartbeats:由MirrorHeartbeatConnector創建。
- emrsource.foo開頭的topic:由MirrorSourceConnector創建。