日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

快速開始使用EMR Kafka

更新時間:

本文為您介紹如何創建E-MapReduce(簡稱EMR)Kafka集群、Kafka訪問的設置,使用Kafka Topic和Kafka Connect服務,幫您快速了解和上手使用EMR Kafka。

注意事項

創建EMR Kafka集群前,您需要根據業務的預估負載,選擇合適的ECS實例機型以及Broker實例個數。由于業務場景差異很大,所以無法給出通用的集群規劃,您需要根據您的實際環境創建集群。通常,建議您選擇機型時考慮以下配置:

  • Broker機型的CPU和內存配比為1:4。

  • 選擇云盤作為數據存儲盤。

  • 充分考慮云盤的I/O吞吐率以及網卡帶寬之間的關系。

在部署參數上,考慮以下因素:

  • 由于EMR Kafka版本仍依賴于Zookeeper,且Zookeeper的可用性直接關系到Kafka服務的高可用,因此,建議您創建集群時,選擇高可用的部署方式。啟用高可用后,將創建3個節點的Zookeeper服務。

  • 如果Master機器組只部署Zookeeper,則Master機器組只需要配置1塊數據盤即可。

更詳細的評估建議,請參見集群資源規格評估建議

創建EMR Kafka集群

該部分內容為您簡單介紹如何創建Kafka集群,更詳細的創建操作,請參見創建集群

  1. 進入創建集群頁面。

    1. 登錄E-MapReduce控制臺

    2. 單擊上方的創建集群

  2. 軟件配置階段,您可以根據需要的Kafka版本,選擇對應的EMR版本。

    打開服務高可用開關,創建3節點的ZooKeeper集群。軟件配置

    重要

    啟用高可用后,將在Master機器組上部署3個節點的Zookeeper服務。由于EMR Kafka版本的服務可用性仍依賴于Zookeeper,所以建議您創建集群時,選擇高可用的部署方式。

  3. 硬件配置階段,選擇合適的ECS實例機型以及節點數量。

    • 機型:Core節點組選擇CPU和內存配比為1 Core:4 GB的機型。

    • 節點數量:Core節點組選擇比Kafka分區副本數多1的節點數量以保持足夠的冗余。例如,如果規劃副本數為3,則節點數選擇為4。

    硬件配置

  4. 基礎配置階段,根據需求填寫相關參數。

    基礎配置

  5. 確認訂單頁面,選中E-MapReduce服務條款復選框,單擊創建

(可選)Kafka訪問設置

默認情況下,Kafka不啟用登錄鑒權等安全設置。您可以根據實際情況選擇是否執行以下操作啟用相關配置。

啟用默認SSL加密功能

您可以執行以下步驟使用默認證書配置SSL。更多SSL的配置,請參見使用SSL加密Kafka鏈接

  1. 進入服務的配置頁面。

    1. 登錄E-MapReduce控制臺

    2. 在頂部菜單欄處,根據實際情況選擇地域和資源組

    3. 單擊目標集群操作列的集群服務

    4. 集群服務頁面,單擊Kafka服務區域的配置

  2. 修改配置項。

    1. 配置頁面,單擊server.properties頁簽。

    2. 修改kafka.ssl.config.type的參數值為DEFAULT

      修改參數

  3. 保存配置。

    1. 單擊保存

    2. 在彈出的對話框中,輸入執行原因,單擊保存

  4. 重啟Kafka服務。

    1. 在Kafka服務的配置頁面,選擇更多操作 > 重啟

    2. 在彈出的對話框中,輸入執行原因,單擊確定

    3. 確認對話框中,單擊確定

配置SASL登錄認證功能

該部分內容為您介紹如何啟用SASL/SCRAM-SHA-512認證機制。

  1. 創建Kafka服務管理用戶。

    1. 使用SSH方式登錄Kafka集群,詳情請參見登錄集群

    2. 執行以下命令,創建用戶kafka。

      sudo su - kafka
      kafka-configs.sh --bootstrap-server core-1-1:9092 --alter --add-config 'SCRAM-SHA-256=[password=kafka-secret],SCRAM-SHA-512=[password=kafka-secret]' --entity-type users --entity-name kafka
  2. 修改配置項。

    1. 在EMR控制臺的集群服務頁面,單擊Kafka服務區域的配置

    2. 配置頁面,單擊server.properties頁簽。

    3. 配置過濾中,輸入配置項kafka.sasl.config.type,單擊Search圖標。

    4. 修改kafka.sasl.config.type的參數值為CUSTOM,單擊保存

    5. 在彈出的對話框中,輸入執行原因,單擊確認

  3. 新增配置項。

    1. 在Kafka服務配置頁面的server.properties頁簽,添加配置項。

    2. 單擊自定義配置

    3. 新增配置項對話框中,添加以下配置。

      參數

      參數值

      sasl.mechanism.inter.broker.protocol

      SCRAM-SHA-512

      sasl.enabled.mechanisms

      SCRAM-SHA-512

      listener.name.${listener}.sasl.enabled.mechanisms

      SCRAM-SHA-512

      listener.name.${listener}.scram-sha-512.sasl.jaas.config

      org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret" ;

      說明

      ${listener}需要替換成具體的listener的名稱,例如sasl_ssl。

    4. 單擊確定

    5. 在彈出的對話框中,輸入執行原因,單擊保存

  4. 配置客戶端JAAS文件內容。

    通過kafka_client_jaas.conf配置文件的kafka.client.jaas.content配置項,配置Kafka客戶端JAAS,該配置將會用于啟動Kafka Schema Registry以及Kafka Rest Proxy組件。

    1. 在Kafka服務配置頁面,修改以下配置項。

      頁簽

      參數

      參數值

      kafka_client_jaas.conf

      kafka.client.jaas.content

      KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="admin-secret";
      };

      schema-registry.properties

      schema_registry_opts

      -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf

      kafka-rest.properties

      kafkarest_opts

      -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf

    2. 服務配置區域,單擊保存

    3. 在彈出的對話框中,輸入執行原因,單擊保存

  5. 重啟Kafka服務。

    1. 在Kafka服務的配置頁面,選擇更多操作 > 重啟

    2. 在彈出的對話框中,輸入執行原因,單擊確定

    3. 確認對話框中,單擊確定

使用Kafka Topic

創建EMR Kafka集群并進行相關安全配置等設置之后,您可以開始使用Kafka Topic進行數據的生產消費。該部分內容為您介紹如何使用Kafka自帶命令操作Kafka Topic。實際業務場景,您也可以使用Kafka Manager或Cruise Control等軟件管理集群。

  1. 創建客戶端配置文件。

    1. 使用SSH方式登錄Kafka集群,詳情請參見登錄集群

    2. 創建配置文件。

      1. 執行以下命令,創建配置文件client.properties

        vim client.properties
      2. 添加以下內容至配置文件client.properties中。

        bootstrap.server=core-1-1:9092
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-512
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";
        ssl.truststore.location=/var/taihao-security/ssl/ssl/truststore
        ssl.truststore.password=${password}
        ssl.keystore.location=/var/taihao-security/ssl/ssl/keystore
        ssl.keystore.password=${password}
        ssl.endpoint.identification.algorithm=
        說明

        上面示例文件中的部分參數,需要您根據實際情況修改。

        • usernamepassword:訪問Kafka服務的用戶名和密碼。

        • ssl.truststore.passwordssl.keystore.password:如果Kafka集群使用默認SSL配置,則可以在EMR控制臺Kafka服務的server.properties配置頁面,查看ssl.truststore.passwordssl.key.password參數的參數值。

  2. 執行以下命令,創建Kafka Topic。

    sudo su - kafka
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
  3. 執行以下命令,查看Kafka topic詳細信息。

    kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --topic test --describe
  4. 執行以下命令,生產數據。

     kafka-console-producer.sh  --broker-list core-1-1:9092 --producer.config client.properties --topic test
  5. 執行以下命令,消費數據。

    kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config  client.properties --topic test --from-beginning --group test-consumer-group

使用Kafka Connect服務

EMR-3.41.0之后版本、EMR-5.7.0之后版本支持Kafka Connect組件的部署。該部分內容為您介紹如何使用Kafka Connect服務。

  1. 進入節點管理頁面。

    1. 登錄E-MapReduce控制臺

    2. 在頂部菜單欄處,根據實際情況選擇地域和資源組

    3. 單擊目標集群操作列的節點管理

  2. 創建Kafka Connect節點組。

    Connect安裝在EMR Task節點組。在EMR Kafka集群創建Task節點組后,EMR會自動在該節點組創建Kafka Connect集群。

    1. 新建EMR Task節點組

      可以在節點管理頁面,單擊新增節點組,新建Task節點組。具體操作,請參見新增節點組

    2. 擴容Task節點組

      根據您的實際需求,可以擴容Task機器組實例數量。具體操作,請參見擴容集群

  3. 查看KafkaConnect服務狀態,確保Kafka Connect集群已經啟動。

    您可以在Kafka服務的狀態頁面的組件列表區域,查看KafkaConnect的組件狀態,確保組件在運行中。KafkaConnect

  4. 檢查Kafka Connect Rest服務狀態。

    1. 使用SSH方式登錄Kafka集群,詳情請參見登錄集群

    2. 執行以下命令,檢查Kafka Connect Rest服務狀態。

      curl -X GET http://task-1-1:8083| jq .

      您會看到返回以下類似信息。

        % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                       Dload  Upload   Total   Spent    Left  Speed
      100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
      {
        "version": "2.4.1",
        "commit": "42ce056344c5625a",
        "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
      }
  5. 使用Kafka Connect遷移數據。

    您可以在Kafka集群中啟動MirrorMaker任務,進行數據復制與遷移。具體操作,請參見使用MirrorMaker 2(on Connect)跨集群同步數據