本文介紹如何在E-MapReduce的Hadoop集群運行Spark Streaming作業,處理Kafka集群的數據。

背景信息

E-MapReduce上的Hadoop集群和Kafka集群都是基于純開源軟件,相關編程使用方法可參見官方相應文檔。

訪問Kerberos Kafka集群

E-MapReduce支持創建基于Kerberos認證的Kafka集群。當Hadoop集群作業需要訪問Kerberos Kafka集群時,有以下兩種使用方式:
  • 非Kerberos Hadoop集群:提供用于Kafka集群的Kerberos認證的kafka_client_jaas.confkrb5.conf文件。
  • Kerberos Hadoop集群:基于Kerberos集群跨域互信,提供用于Hadoop集群的Kerberos認證的kafka_client_jaas.confkrb5.conf文件。

    跨域互信詳細信息,請參見跨域互信

以上兩種方式都需要運行作業時提供kafka_client_jaas.confkrb5.conf文件,用于Kerberos認證。

  • kafka_client_jaas.conf文件格式如下。
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        serviceName="kafka"
        keyTab="/path/to/kafka.keytab"
        principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
    };
    說明 keytab文件的獲取,請參見兼容MIT Kerberos認證
  • krb5.conf文件,請從Kafka集群的/etc/目錄下獲取。

Spark Streaming訪問Kerberos Kafka集群

添加Kafka集群各個節點的長域名和IP信息至Hadoop集群各個節點的/etc/hosts中。長域名和IP信息,您可以在/etc/hosts中獲取,長域名形式為emr-xxx-x.cluster-xxx

當運行Spark Streaming作業訪問Kerberos kafka時,可以在spark-submit命令行參數中提供所需的kafka_client_jaas.confkafka.keytab文件。
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class  xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3
kafka_client_jaas.conf文件中,keytab文件路徑需要寫相對路徑,請嚴格按照如下keyTab配置項寫法。
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};

spark-sql訪問Kafka

示例代碼如下。
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
說明 /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*中包含Kafka DataSource類型。如果您EMR集群使用的是Spark2,則應修改上面命令中的spark3應該換成spark2
創建和查詢示例如下。
create table test_kafka
using loghub
  options(kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
          subscribe='test_topic',
          startingoffsets='earliest'
)

select * from test_kafka;

附錄