本文為您介紹如何創建E-MapReduce(簡稱EMR)Kafka集群、Kafka訪問的設置,使用Kafka Topic和Kafka Connect服務,幫您快速了解和上手使用EMR Kafka。
注意事項
創建EMR Kafka集群前,您需要根據業務的預估負載,選擇合適的ECS實例機型以及Broker實例個數。由于業務場景差異很大,所以無法給出通用的集群規劃,您需要根據您的實際環境創建集群。通常,建議您選擇機型時考慮以下配置:
Broker機型的CPU和內存配比為1:4。
選擇云盤作為數據存儲盤。
充分考慮云盤的I/O吞吐率以及網卡帶寬之間的關系。
在部署參數上,考慮以下因素:
由于EMR Kafka版本仍依賴于Zookeeper,且Zookeeper的可用性直接關系到Kafka服務的高可用,因此,建議您創建集群時,選擇高可用的部署方式。啟用高可用后,將創建3個節點的Zookeeper服務。
如果Master機器組只部署Zookeeper,則Master機器組只需要配置1塊數據盤即可。
更詳細的評估建議,請參見集群資源規格評估建議。
創建EMR Kafka集群
該部分內容為您簡單介紹如何創建Kafka集群,更詳細的創建操作,請參見創建集群。
進入創建集群頁面。
單擊上方的創建集群。
在軟件配置階段,您可以根據需要的Kafka版本,選擇對應的EMR版本。
打開服務高可用開關,創建3節點的ZooKeeper集群。
重要啟用高可用后,將在Master機器組上部署3個節點的Zookeeper服務。由于EMR Kafka版本的服務可用性仍依賴于Zookeeper,所以建議您創建集群時,選擇高可用的部署方式。
在硬件配置階段,選擇合適的ECS實例機型以及節點數量。
機型:Core節點組選擇CPU和內存配比為1 Core:4 GB的機型。
節點數量:Core節點組選擇比Kafka分區副本數多1的節點數量以保持足夠的冗余。例如,如果規劃副本數為3,則節點數選擇為4。
在基礎配置階段,根據需求填寫相關參數。
在確認訂單頁面,選中E-MapReduce服務條款復選框,單擊創建。
(可選)Kafka訪問設置
默認情況下,Kafka不啟用登錄鑒權等安全設置。您可以根據實際情況選擇是否執行以下操作啟用相關配置。
啟用默認SSL加密功能
您可以執行以下步驟使用默認證書配置SSL。更多SSL的配置,請參見使用SSL加密Kafka鏈接。
進入服務的配置頁面。
在頂部菜單欄處,根據實際情況選擇地域和資源組。
單擊目標集群操作列的集群服務。
在集群服務頁面,單擊Kafka服務區域的配置。
修改配置項。
在配置頁面,單擊server.properties頁簽。
修改kafka.ssl.config.type的參數值為DEFAULT。
保存配置。
單擊保存。
在彈出的對話框中,輸入執行原因,單擊保存。
重啟Kafka服務。
在Kafka服務的配置頁面,選擇
。在彈出的對話框中,輸入執行原因,單擊確定。
在確認對話框中,單擊確定。
配置SASL登錄認證功能
該部分內容為您介紹如何啟用SASL/SCRAM-SHA-512認證機制。
創建Kafka服務管理用戶。
使用SSH方式登錄Kafka集群,詳情請參見登錄集群。
執行以下命令,創建用戶kafka。
sudo su - kafka kafka-configs.sh --bootstrap-server core-1-1:9092 --alter --add-config 'SCRAM-SHA-256=[password=kafka-secret],SCRAM-SHA-512=[password=kafka-secret]' --entity-type users --entity-name kafka
修改配置項。
在EMR控制臺的集群服務頁面,單擊Kafka服務區域的配置。
在配置頁面,單擊server.properties頁簽。
在配置過濾中,輸入配置項kafka.sasl.config.type,單擊圖標。
修改kafka.sasl.config.type的參數值為CUSTOM,單擊保存。
在彈出的對話框中,輸入執行原因,單擊確認。
新增配置項。
在Kafka服務配置頁面的server.properties頁簽,添加配置項。
單擊自定義配置。
在新增配置項對話框中,添加以下配置。
參數
參數值
sasl.mechanism.inter.broker.protocol
SCRAM-SHA-512
sasl.enabled.mechanisms
SCRAM-SHA-512
listener.name.${listener}.sasl.enabled.mechanisms
SCRAM-SHA-512
listener.name.${listener}.scram-sha-512.sasl.jaas.config
org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret" ;
說明${listener}
需要替換成具體的listener的名稱,例如sasl_ssl。單擊確定。
在彈出的對話框中,輸入執行原因,單擊保存。
配置客戶端JAAS文件內容。
通過kafka_client_jaas.conf配置文件的kafka.client.jaas.content配置項,配置Kafka客戶端JAAS,該配置將會用于啟動Kafka Schema Registry以及Kafka Rest Proxy組件。
在Kafka服務配置頁面,修改以下配置項。
頁簽
參數
參數值
kafka_client_jaas.conf
kafka.client.jaas.content
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"; };
schema-registry.properties
schema_registry_opts
-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
kafka-rest.properties
kafkarest_opts
-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf
在服務配置區域,單擊保存。
在彈出的對話框中,輸入執行原因,單擊保存。
重啟Kafka服務。
在Kafka服務的配置頁面,選擇
。在彈出的對話框中,輸入執行原因,單擊確定。
在確認對話框中,單擊確定。
使用Kafka Topic
創建EMR Kafka集群并進行相關安全配置等設置之后,您可以開始使用Kafka Topic進行數據的生產消費。該部分內容為您介紹如何使用Kafka自帶命令操作Kafka Topic。實際業務場景,您也可以使用Kafka Manager或Cruise Control等軟件管理集群。
創建客戶端配置文件。
使用SSH方式登錄Kafka集群,詳情請參見登錄集群。
創建配置文件。
執行以下命令,創建配置文件client.properties。
vim client.properties
添加以下內容至配置文件client.properties中。
bootstrap.server=core-1-1:9092 security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret"; ssl.truststore.location=/var/taihao-security/ssl/ssl/truststore ssl.truststore.password=${password} ssl.keystore.location=/var/taihao-security/ssl/ssl/keystore ssl.keystore.password=${password} ssl.endpoint.identification.algorithm=
說明上面示例文件中的部分參數,需要您根據實際情況修改。
username
和password
:訪問Kafka服務的用戶名和密碼。ssl.truststore.password
和ssl.keystore.password
:如果Kafka集群使用默認SSL配置,則可以在EMR控制臺Kafka服務的server.properties配置頁面,查看ssl.truststore.password和ssl.key.password參數的參數值。
執行以下命令,創建Kafka Topic。
sudo su - kafka kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
執行以下命令,查看Kafka topic詳細信息。
kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --topic test --describe
執行以下命令,生產數據。
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config client.properties --topic test
執行以下命令,消費數據。
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config client.properties --topic test --from-beginning --group test-consumer-group
使用Kafka Connect服務
EMR-3.41.0之后版本、EMR-5.7.0之后版本支持Kafka Connect組件的部署。該部分內容為您介紹如何使用Kafka Connect服務。
進入節點管理頁面。
在頂部菜單欄處,根據實際情況選擇地域和資源組。
單擊目標集群操作列的節點管理。
創建Kafka Connect節點組。
查看KafkaConnect服務狀態,確保Kafka Connect集群已經啟動。
您可以在Kafka服務的狀態頁面的組件列表區域,查看KafkaConnect的組件狀態,確保組件在運行中。
檢查Kafka Connect Rest服務狀態。
使用SSH方式登錄Kafka集群,詳情請參見登錄集群。
執行以下命令,檢查Kafka Connect Rest服務狀態。
curl -X GET http://task-1-1:8083| jq .
您會看到返回以下類似信息。
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
使用Kafka Connect遷移數據。
您可以在Kafka集群中啟動MirrorMaker任務,進行數據復制與遷移。具體操作,請參見使用MirrorMaker 2(on Connect)跨集群同步數據。