日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

開發入門

本文介紹Spark Streaming如何消費Log Service中的日志數據和統計日志條數。

Spark接入Log Service

  • 方法一:Receiver Based DStream

    val logServiceProject = args(0)    // LogService中的project名。
        val logStoreName = args(1)     // LogService中的logstore名。
        val loghubConsumerGroupName = args(2)  // loghubGroupName相同的作業將共同消費logstore的數據。
        val loghubEndpoint = args(3)  // 阿里云日志服務數據類API Endpoint。
        val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")     // 訪問日志服務的AccessKey Id。
        val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 訪問日志服務的AccessKey Secret。
        val numReceivers = args(4).toInt  // 啟動多少個Receiver來讀取logstore中的數據。
        val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming中每次處理批次時間間隔。
        val conf = new SparkConf().setAppName("Test Loghub Streaming")
        val ssc = new StreamingContext(conf, batchInterval)
        val loghubStream = LoghubUtils.createStream(
          ssc,
          logServiceProject,
          logStoreName,
          loghubConsumerGroupName,
          loghubEndpoint,
          numReceivers,
          accessKeyId,
          accessKeySecret,
          StorageLevel.MEMORY_AND_DISK)
        loghubStream.foreachRDD(rdd => println(rdd.count()))
        ssc.start()
        ssc.awaitTermination()
    說明

    運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量

  • 方法二: Direct API Based DStream

    val logServiceProject = args(0)
        val logStoreName = args(1)
        val loghubConsumerGroupName = args(2)
        val loghubEndpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkConnect = args(7)
        val checkpointPath = args(8)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            logServiceProject,
            logStoreName,
            loghubConsumerGroupName,
            accessKeyId,
            accessKeySecret,
            loghubEndpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
          ssc.checkpoint(checkpointPath)
          val stream = loghubStream.checkpoint(batchInterval)
          stream.foreachRDD(rdd => {
            println(rdd.count())
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()

    從E-MapReduce SDK 1.4.0版本開始,提供基于Direct API的實現方式。此種方式可以避免將Loghub數據重復存儲到Write Ahead Log中,即無需開啟Spark Streaming的WAL特性即可實現數據的at least once。目前Direct API實現方式處于experimental狀態,需要注意以下幾點:

    • 在DStream的action中,必須做一次commit操作。

    • 一個Spark Streaming中,不支持對LogStore數據源做多個action操作。

    • Direct API方式需要Zookeeper服務的支持。

支持MetaService

上面的例子中是顯式地將AccessKey傳入到接口中,但是從E-MapReduce SDK 1.3.2版本開始,Spark Streaming可以基于MetaService實現免AccessKey處理LogService數據,具體可以參見E-MapReduce SDK中的LoghubUtils類說明。

LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
說明
  • E-MapReduce SDK支持Log Service的三種消費模式,即BEGIN_CURSOR、END_CURSOR和SPECIAL_TIMER_CURSOR,默認是 END_CURSOR。

    • BEGIN_CURSOR:從日志頭開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。

    • END_CURSOR:從日志尾開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。

    • SPECIAL_TIMER_CURSOR:從指定時間點開始消費,如果有checkpoint記錄,則從checkpoint處開始消費,單位為秒。

    以上三種消費模式都受到checkpoint記錄的影響,如果存在checkpoint記錄,則從checkpoint處開始消費,不管指定的是什么消費模式。E-MapReduce SDK基于“SPECIAL_TIMER_CURSOR”模式支持用戶強制在指定時間點開始消費,在LoghubUtils#createStream接口中,以下參數需要組合使用:

    • cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR

    • forceSpecial:true

  • E-MapReduce的服務器(除了Master節點)無法連接公網。配置LogService endpoint時,請注意使用Log Service提供的內網 endpoint,否則無法請求到Log Service。

附錄

完整示例代碼,請參見Spark接入LogService。