日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark對接DataHub

本文介紹如何在E-MapReduce的Hadoop集群,運行Spark作業消費DataHub數據、統計數據個數并打印出來。

Spark Streaming消費DataHub

  • 準備工作

    使用DataHub的訂閱功能訂閱Topic,詳細信息請參見創建訂閱。

  • 消費DataHub數據

    運行Spark Streaming作業消費DataHub數據有兩種使用方式:

    • 指定特定的ShardId,消費該ShardId的數據。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub的項目名。
                topic, // DataHub的topic名稱。
                subId, // DataHub的訂閱ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint。
                shardId, // DataHub Topic中的一個ShardId。
                read, // 處理DataHub數據的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一個Field的數據。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • 消費所有Shard的數據。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub的項目名。
                topic, // DataHub的topic名稱。
                subId, // DataHub的訂閱ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint。
                read, // 處理DataHub數據的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一個Field的數據。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    說明

    完整示例代碼,請參見SparkDatahubDemo.scala。

Spark Structured Streaming消費DataHub

  • Maven依賴

    • Spark2

       <dependency>
              <groupId>com.aliyun.emr</groupId>
              <artifactId>emr-datahub_2.11</artifactId>
              <version>2.0.0</version>
       </dependency>
    • Spark3

      請在集群/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/目錄,下載emr-datasources_shaded_***.jar作為依賴。

      說明
      • 如果您的集群中沒有以上目錄,則使用/usr/lib/emrsdk-current/目錄。

      • emr-datasources_shaded_***.jar,請根據您實際集群目錄下的JAR包來替換。

  • 消費示例

    val spark =  SparkSession
      .builder()
      .appName("test datahub")
      .getOrCreate()
    
    
    //創建readstream。
    val datahubRows = spark
      .readStream
      .format("datahub")
      .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
      .option("project", "project_test")
      .option("startingoffsets", "earliest")
      .option("topic", "topic_test")
      .load
    
    //DataFrame處理邏輯。
    datahubRows.printSchema() // 當前實例中,schema有key和value兩個字段。
    println("print schema" + datahubRows.schema.toString())
    val df = datahubRows.groupBy("key").count()
    
    
    //創建writestream,輸出數據。
    val query = df
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
    
    //結束流任務。
    query.awaitTermination(100000)
    spark.close()

    核心流程如下:

    1. 創建readstream讀取DataHub DataFrame數據。

    2. 自定義數據源DataFrame處理邏輯。

    3. 創建writestream輸出數據。

    說明

    運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量。

  • DataHub核心配置參數

    重要

    Structured Streaming消費DataHub不需要填寫參數subId。主要是因為DataHub的消費offset由Spark管理,不需要DataHub管理,所以不需要提供subId。

    參數

    描述

    是否必選

    access.key.id

    創建DataHub的阿里云AccessKey ID。

    access.key.secret

    創建DataHub的阿里云AccessKey Secret。

    endpoint

    DataHub API Endpoint。您可以在DataHub頁面查看。

    project

    DataHub的項目名。

    topic

    DataHub的topic名稱。

    decimal.precision

    當topic字段中包含decimal字段時,需要指定該參數。

    decimal.scale

    當topic字段中包含decimal字段時,需要指定該參數。

    startingoffsets

    開始消費點位。取值如下:

    • latest:表示最晚。

    • earliest:表示最早。

    • json字符串:json結構如下所示。

      {
         "project名稱#topic名稱" : {
                 "shardId" : "startingoffsets的值"
            }
      }

      示例如下。

      {
          "project_test#topic_test" : {
                 "0" : "100"
         }
      }

    endingoffsets

    結束消費點位。取值如下:

    • latest:表示最晚。

    • json字符串:json結構如下所示。

      {
         "project名稱#topic名稱" : {
                 "shardId" : "endingoffsets的值"
            }
      }