本文通過示例為您介紹如何通過EMR的集群腳本功能,快速部署使用MirrorMaker 2.0(MM2)服務同步數據。

背景信息

本文的業務場景以EMR DataFlow集群作為目的集群,并且在目的集群中以Dedicated MirrorMaker集群的方式部署MM2,即EMR DataFlow集群既作為目的集群又作為Dedicated MirrorMaker集群。在實際業務場景中,您可以將MirrorMaker集群部署到單獨的服務器上。

Kafka MM2適用于下列場景:
  • 遠程數據同步:通過MM2,Kafka數據可以在不同地域的集群進行傳輸復制。
  • 災備場景:通過MM2,可以構建不同數據中心的主備兩個集群容災架構,MM2實時同步兩個集群的數據。當其中一個集群不可用時,可以將上面的應用程序切換到另一個集群,從而實現異地容災功能。
  • 數據遷移場景:在業務上云、混合云、集群升級等場景,存在數據從舊集群遷移到新集群的需求。此時,您可以使用MM2實現新舊數據的遷移,保證業務的連續性。
  • 聚合數據中心場景:通過MM2,可以將多個Kafka子集群的數據同步到一個中心Kafka集群,實現數據的匯聚。
Kafka MM2作為數據復制工具,具有以下功能:
  • 復制topics數據以及配置信息。
  • 復制consumer groups及其消費topic的offset信息。
  • 復制ACLs。
  • 自動檢測新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平擴展的框架。
MM2任務有以下執行方式:
  • Distributed Connect集群的connector方式(推薦):在已有Connect集群執行MM2 connector任務的方式。具體操作,請參見使用MirrorMaker 2(on Connect)跨集群同步數據
  • Dedicated MirrorMaker集群方式:不需要使用Connect集群執行MM2 connector任務,而是直接通過Driver程序管理MM2的所有任務。

    您可以參照本文通過Driver程序來管理MM2任務。

  • Standalone Connect的worker方式:執行單個MirrorSourceConnector任務,適合在測試場景下使用。
說明 推薦在Distributed Connect集群上啟動MM2 connector任務,可以借助Connect集群的Rest服務管理MM2任務。

前提條件

  • 已創建兩個Kafka集群,一個為源集群,一個為目的集群(EMR DataFlow集群),并選擇了Kafka服務,創建DataFlow集群詳情請參見創建集群。
    說明 本文示例的源和目的集群都以EMR-3.42.0版本的DataFlow集群為例。
  • 已在OSS上創建存儲空間,詳情請參見創建存儲空間。

使用限制

EMR DataFlow集群的Kafka軟件的版本為2.12_2.4.1及以上。

操作步驟

  1. 準備MM2配置文件mm2.properties并上傳到您的OSS存儲。
    以下配置內容僅作為參考,您需要替換文本中的源集群和目標集群的src.bootstrap.serversdest.bootstrap.servers,并根據實際業務需求進行相應的配置。MM2配置的詳細信息請參見Configuring Geo-Replication。
    # see org.apache.kafka.clients.consumer.ConsumerConfig for more details
    
    # Sample MirrorMaker 2.0 top-level configuration file
    # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
    
    # specify any number of cluster aliases
    clusters = src, dest
    
    # connection information for each cluster
    src.bootstrap.servers = <your source kafka cluster servers>
    dest.bootstrap.servers = <your destination kafka cluster servers>
    
    # enable and configure individual replication flows
    src->dest.enabled = true
    src->dest.topics = foo-.*
    groups=.*
    topics.blacklist="__.*"
    
    # customize as needed
    replication.factor=3
  2. 準備部署腳本kafka_mm2_deploy.sh并上傳到OSS存儲。
    #!/bin/bash
    SIGNAL=${SIGNAL:-TERM}
    PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}')
    if [ -n "$PIDS" ]; then
      echo "stop the exist mirror maker server."
      kill -s $SIGNAL $PIDS
    fi
    KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf
    TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1
    cd $KAFKA_CONF
    if [ -e "./mm2.properties" ]; then
      mv mm2.properties mm2.properties.bak
    fi
    ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBuket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret>
    su - kafka <<EOF
    exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties
    exit;
    EOF
    涉及替換參數如下。
    參數 描述
    KAFKA_CONF 檢查變量路徑是否正確,如果不正確,則需要修改為實際的地址。
    TAIHAO_EXECUTOR
    oss://<yourBucket>/mm2.properties 替換為mm2.properties的實際存儲路徑。
    <yourEndpoint> OSS服務的地址。
    <yourAccessKeyId> 阿里云賬號的AccessKey ID。
    <yourAccessKeySecret> 阿里云賬號的AccessKey Secret。
  3. 在EMR控制臺執行腳本, 具體操作請參見手動執行腳本
    說明 在創建執行腳本的過程中,您應正確選擇腳本的執行節點,通常選擇所有的Broker節點。
    執行完成后,即實現了Kafka集群間的數據遷移。