本文介紹如何在E-MapReduce的Hadoop集群,運行Spark作業消費DataHub數據、統計數據個數并打印出來。
Spark Streaming消費DataHub
準備工作
使用DataHub的訂閱功能訂閱Topic,詳細信息請參見創建訂閱。
消費DataHub數據
運行Spark Streaming作業消費DataHub數據有兩種使用方式:
指定特定的ShardId,消費該ShardId的數據。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的項目名。 topic, // DataHub的topic名稱。 subId, // DataHub的訂閱ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 shardId, // DataHub Topic中的一個ShardId。 read, // 處理DataHub數據的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一個Field的數據。 def read(record: RecordEntry): String = { record.getString(0) }
消費所有Shard的數據。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的項目名。 topic, // DataHub的topic名稱。 subId, // DataHub的訂閱ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 read, // 處理DataHub數據的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一個Field的數據。 def read(record: RecordEntry): String = { record.getString(0) }
說明完整示例代碼,請參見SparkDatahubDemo.scala。
Spark Structured Streaming消費DataHub
Maven依賴
Spark2
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datahub_2.11</artifactId> <version>2.0.0</version> </dependency>
Spark3
請在集群
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/
目錄,下載emr-datasources_shaded_***.jar作為依賴。說明如果您的集群中沒有以上目錄,則使用
/usr/lib/emrsdk-current/
目錄。emr-datasources_shaded_***.jar,請根據您實際集群目錄下的JAR包來替換。
消費示例
val spark = SparkSession .builder() .appName("test datahub") .getOrCreate() //創建readstream。 val datahubRows = spark .readStream .format("datahub") .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com") .option("project", "project_test") .option("startingoffsets", "earliest") .option("topic", "topic_test") .load //DataFrame處理邏輯。 datahubRows.printSchema() // 當前實例中,schema有key和value兩個字段。 println("print schema" + datahubRows.schema.toString()) val df = datahubRows.groupBy("key").count() //創建writestream,輸出數據。 val query = df .writeStream .format("console") .outputMode("complete") .start() //結束流任務。 query.awaitTermination(100000) spark.close()
核心流程如下:
創建readstream讀取DataHub DataFrame數據。
自定義數據源DataFrame處理邏輯。
創建writestream輸出數據。
說明運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量。
DataHub核心配置參數
重要Structured Streaming消費DataHub不需要填寫參數subId。主要是因為DataHub的消費offset由Spark管理,不需要DataHub管理,所以不需要提供subId。
參數
描述
是否必選
access.key.id
創建DataHub的阿里云AccessKey ID。
是
access.key.secret
創建DataHub的阿里云AccessKey Secret。
是
endpoint
DataHub API Endpoint。您可以在DataHub頁面查看。
是
project
DataHub的項目名。
是
topic
DataHub的topic名稱。
是
decimal.precision
當topic字段中包含decimal字段時,需要指定該參數。
否
decimal.scale
當topic字段中包含decimal字段時,需要指定該參數。
否
startingoffsets
開始消費點位。取值如下:
latest:表示最晚。
earliest:表示最早。
json字符串:json結構如下所示。
{ "project名稱#topic名稱" : { "shardId" : "startingoffsets的值" } }
示例如下。
{ "project_test#topic_test" : { "0" : "100" } }
否
endingoffsets
結束消費點位。取值如下:
latest:表示最晚。
json字符串:json結構如下所示。
{ "project名稱#topic名稱" : { "shardId" : "endingoffsets的值" } }
否