當E-MapReduce(簡稱EMR)的DataFlow集群沒有安裝Ranger Kafka插件,或當前集群中的Ranger Kafka插件版本與實際使用的Ranger服務不兼容時,您需要進行手動集成。本文介紹如何手動安裝Ranger Kafka插件以及配置生效Ranger Kafka插件。
前提條件
已創建DataFlow集群。具體操作,請參見創建集群。
集群已啟用SASL登錄認證功能。開啟功能的具體操作,請參見使用SASL登錄認證Kafka服務。
重要配置SASL時,您需要確保Kafka服務內部通信使用的用戶具有所有Resource的權限,內部組件使用的Client用戶具有相應的權限(建議您內部組件Client與Kafka服務使用相同的用戶)。
已創建外部Ranger服務。
已在Ranger中創建了Kafka管理用戶,并且管理用戶擁有所有Kafka Resource的權限。
說明建議管理用戶的名稱為kafka。
使用限制
本文僅適用于EMR 3.45.0及以下版本(3.x系列)的EMR集群。
由于EMR 5.x系列的Kafka不支持Ranger鑒權,所以本文檔不適用于5.x系列的EMR集群。
操作步驟
本文示例中的Ranger代碼包以Ranger 2.1.0為例,實際場景請您根據對應的Ranger服務以及Kafka版本選取相應Ranger版本的代碼包。
建議Ranger Kafka插件選擇與Ranger服務相同的版本。
通過SSH方式連接集群,詳情請參見登錄集群。
執行以下命令,下載Ranger插件。
wget https://dlcdn.apache.org/ranger/2.1.0/apache-ranger-2.1.0.tar.gz
您也可以從Ranger官網下載所需版本的Ranger代碼包。
執行以下命令,構建Ranger Kafka插件安裝包。
tar xvf apache-ranger-2.1.0.tar.gz cd apache-ranger-2.1.0 mvn clean compile package assembly:assembly install -DskipTests -Drat.skip=true cd target ls -lrt ranger-2.1.0-kafka-plugin.tar.gz
上傳Ranger Kafka插件安裝包到所有Kafka Broker節點的固定安裝目錄,本文安裝根目錄以
/opt/apps/ranger-plugin
為例。準備腳本文件install.properties,并將該文件放置到安裝目錄
/opt/apps/ranger-plugin/ranger-2.1.0-kafka-plugin
下。請您根據實際場景來配置Ranger插件的安裝配置文件install.properties。
配置項說明
參數名稱
描述
是否必填
COMPONENT_INSTALL_DIR_NAME
Kafka安裝路徑。
EMR Kafka固定安裝在/opt/apps/KAFKA/kafka-current目錄。
是
POLICY_MGR_URL
Ranger策略庫URL。
根據Ranger服務的地址配置,格式可以參見install.properties配置文件。
是
REPOSITORY_NAME
使用的策略庫名稱。
根據Ranger服務的地址配置,格式可以參見install.properties配置文件。
是
XAAUDIT.SUMMARY.ENABLE
是否啟用審計。默認值為true。
是
XAAUDIT.SOLR.ENABLE
是否啟用Solr審計。默認值為true。
是
XAAUDIT.SOLR.URL
Solr地址。
根據實際情況填寫,格式可以參見install.properties配置文件。
是
XAAUDIT.SOLR.USER
Solr訪問用戶。
是
XAAUDIT.SOLR.PASSWORD
Solr訪問密碼。
是
XAAUDIT.SOLR.ZOOKEEPER
SolrCloud ZooKeeper訪問地址。
是
XAAUDIT.SOLR.FILE_SPOOL_DIR
審計日志存儲路徑。
根據實際情況填寫,格式可以參見install.properties配置文件。
是
代碼示例
以下為EMR 3.43.1版本的代碼示例。
# Location of component folder COMPONENT_INSTALL_DIR_NAME=/opt/apps/KAFKA/kafka-current # # Location of Policy Manager URL # # Example: # POLICY_MGR_URL=http://policymanager.xasecure.net:6080 # 按照實際場景修改 POLICY_MGR_URL=http://master-1-1.c-590b6062db9d****.cn-hangzhou.emr.aliyuncs.com:6080 # # This is the repository name created within policy manager # # Example: # REPOSITORY_NAME=kafkadev # 按照實際場景修改 REPOSITORY_NAME=kafkadev # AUDIT configuration with V3 properties #Should audit be summarized at source XAAUDIT.SUMMARY.ENABLE=true # Enable audit logs to Solr #Example #XAAUDIT.SOLR.ENABLE=true #XAAUDIT.SOLR.URL=http://localhost:6083/solr/ranger_audits #XAAUDIT.SOLR.ZOOKEEPER= #XAAUDIT.SOLR.FILE_SPOOL_DIR=/var/log/kafka/audit/solr/spool # 按照實際場景修改 XAAUDIT.SOLR.ENABLE=true XAAUDIT.SOLR.URL=http://master-1-1.c-590b6062db9d****.cn-hangzhou.emr.aliyuncs.com:6083/solr/ranger_audits XAAUDIT.SOLR.USER=NONE XAAUDIT.SOLR.PASSWORD=NONE XAAUDIT.SOLR.ZOOKEEPER=NONE XAAUDIT.SOLR.FILE_SPOOL_DIR=/var/log/taihao-apps/kafka/audit/spool # Enable audit logs to ElasticSearch #Example #XAAUDIT.ELASTICSEARCH.ENABLE=true #XAAUDIT.ELASTICSEARCH.URL=localhost #XAAUDIT.ELASTICSEARCH.INDEX=audit XAAUDIT.ELASTICSEARCH.ENABLE=false XAAUDIT.ELASTICSEARCH.URL=NONE XAAUDIT.ELASTICSEARCH.USER=NONE XAAUDIT.ELASTICSEARCH.PASSWORD=NONE XAAUDIT.ELASTICSEARCH.INDEX=NONE XAAUDIT.ELASTICSEARCH.PORT=NONE XAAUDIT.ELASTICSEARCH.PROTOCOL=NONE # Enable audit logs to HDFS #Example #XAAUDIT.HDFS.ENABLE=true #XAAUDIT.HDFS.HDFS_DIR=hdfs://node-1.example.com:8020/ranger/audit # If using Azure Blob Storage #XAAUDIT.HDFS.HDFS_DIR=wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path> #XAAUDIT.HDFS.HDFS_DIR=wasb://ranger_audit_cont****@my-azure-account.blob.core.windows.net/ranger/audit #XAAUDIT.HDFS.FILE_SPOOL_DIR=/var/log/kafka/audit/hdfs/spool XAAUDIT.HDFS.ENABLE=false XAAUDIT.HDFS.HDFS_DIR=hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit XAAUDIT.HDFS.FILE_SPOOL_DIR=/var/log/kafka/audit/hdfs/spool # Following additional propertis are needed When auditing to Azure Blob Storage via HDFS # Get these values from your /etc/hadoop/conf/core-site.xml #XAAUDIT.HDFS.HDFS_DIR=wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path> XAAUDIT.HDFS.AZURE_ACCOUNTNAME=__REPLACE_AZURE_ACCOUNT_NAME XAAUDIT.HDFS.AZURE_ACCOUNTKEY=__REPLACE_AZURE_ACCOUNT_KEY XAAUDIT.HDFS.AZURE_SHELL_KEY_PROVIDER=__REPLACE_AZURE_SHELL_KEY_PROVIDER XAAUDIT.HDFS.AZURE_ACCOUNTKEY_PROVIDER=__REPLACE_AZURE_ACCOUNT_KEY_PROVIDER #Log4j Audit Provider XAAUDIT.LOG4J.ENABLE=false XAAUDIT.LOG4J.IS_ASYNC=false XAAUDIT.LOG4J.ASYNC.MAX.QUEUE.SIZE=10240 XAAUDIT.LOG4J.ASYNC.MAX.FLUSH.INTERVAL.MS=30000 XAAUDIT.LOG4J.DESTINATION.LOG4J=true XAAUDIT.LOG4J.DESTINATION.LOG4J.LOGGER=xaaudit # End of V3 properties # # Audit to HDFS Configuration # # If XAAUDIT.HDFS.IS_ENABLED is set to true, please replace tokens # that start with __REPLACE__ with appropriate values # XAAUDIT.HDFS.IS_ENABLED=true # XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit/%app-type%/%time:yyyyMMdd% # XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit # XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit/archive # # Example: # XAAUDIT.HDFS.IS_ENABLED=true # XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://namenode.example.com:8020/ranger/audit/%app-type%/%time:yyyyMMdd% # XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=/var/log/kafka/audit # XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=/var/log/kafka/audit/archive # XAAUDIT.HDFS.IS_ENABLED=false XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit/%app-type%/%time:yyyyMMdd% XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=__REPLACE__LOG_DIR/kafka/audit/archive XAAUDIT.HDFS.DESTINTATION_FILE=%hostname%-audit.log XAAUDIT.HDFS.DESTINTATION_FLUSH_INTERVAL_SECONDS=900 XAAUDIT.HDFS.DESTINTATION_ROLLOVER_INTERVAL_SECONDS=86400 XAAUDIT.HDFS.DESTINTATION_OPEN_RETRY_INTERVAL_SECONDS=60 XAAUDIT.HDFS.LOCAL_BUFFER_FILE=%time:yyyyMMdd-HHmm.ss%.log XAAUDIT.HDFS.LOCAL_BUFFER_FLUSH_INTERVAL_SECONDS=60 XAAUDIT.HDFS.LOCAL_BUFFER_ROLLOVER_INTERVAL_SECONDS=600 XAAUDIT.HDFS.LOCAL_ARCHIVE_MAX_FILE_COUNT=10 #Solr Audit Provider XAAUDIT.SOLR.IS_ENABLED=false XAAUDIT.SOLR.MAX_QUEUE_SIZE=1 XAAUDIT.SOLR.MAX_FLUSH_INTERVAL_MS=1000 XAAUDIT.SOLR.SOLR_URL=http://localhost:6083/solr/ranger_audits # End of V2 properties # # SSL Client Certificate Information # # Example: # SSL_KEYSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-keystore.jks # SSL_KEYSTORE_PASSWORD=none # SSL_TRUSTSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-truststore.jks # SSL_TRUSTSTORE_PASSWORD=none # # You do not need use SSL between agent and security admin tool, please leave these sample value as it is. # SSL_KEYSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-keystore.jks SSL_KEYSTORE_PASSWORD=myKeyFilePassword SSL_TRUSTSTORE_FILE_PATH=/etc/hadoop/conf/ranger-plugin-truststore.jks SSL_TRUSTSTORE_PASSWORD=changeit # # Custom component user # CUSTOM_COMPONENT_USER=<custom-user> # keep blank if component user is default CUSTOM_USER=kafka # # Custom component group # CUSTOM_COMPONENT_GROUP=<custom-group> # keep blank if component group is default CUSTOM_GROUP=hadoop
執行以下命令,安裝Ranger Kafka插件。
sudo su - root cd /opt/apps/ranger-plugin/ranger-2.1.0-kafka-plugin ./enable-kafka-plugin.sh ./install.properties
在EMR控制臺修改Kafka的配置文件server.properties。
在EMR控制臺Kafka服務的配置頁簽。
單擊server.properties頁簽。
修改以下參數信息。
kafka_server_start_cmd_addition_args:在參數值中追加內容CLASSPATH=$CLASSPATH:/opt/apps/KAFKA/kafka-current/config。
說明如果EMR控制臺Kafka服務不支持kafka_server_start_cmd_addition_args配置,則無法使用該方式修改CLASSPATH變量,此時您可以在Kafka Broker實例執行以下命令達到添加Ranger Kafka插件配置文件的目的。
cd /opt/apps/KAFKA/kafka-current/libs sudo ln -s /opt/apps/KAFKA/kafka-current/config kafka-conf
authorizer.class.name:修改參數值為org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer。
修改完后,保存配置。
單擊保存。
在彈出的對話框中,輸入執行原因,單擊保存。
在EMR控制臺重啟Kafka Broker服務。
在EMR控制臺Kafka服務的狀態頁面,單擊KafkaBroker操作列的重啟。
在彈出的對話框中,輸入執行原因,單擊確定。
在彈出的確認對話框中,單擊確定。
相關文檔
除了使用本文的手動集成Ranger Kafka插件的方式,您還可以根據實際情況決定是否使用EMR的腳本操作功能來批量安裝部署Ranger Kafka插件。具體操作,請參見手動執行腳本。
如果您的集群是EMR 3.45.0及以上版本,則可以在EMR控制臺直接開啟Ranger權限控制,詳情請參見配置Kafka開啟Ranger權限控制。