本文介紹Spark Streaming如何消費Log Service中的日志數據和統計日志條數。
Spark接入Log Service
方法一:Receiver Based DStream
val logServiceProject = args(0) // LogService中的project名。 val logStoreName = args(1) // LogService中的logstore名。 val loghubConsumerGroupName = args(2) // loghubGroupName相同的作業將共同消費logstore的數據。 val loghubEndpoint = args(3) // 阿里云日志服務數據類API Endpoint。 val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") // 訪問日志服務的AccessKey Id。 val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 訪問日志服務的AccessKey Secret。 val numReceivers = args(4).toInt // 啟動多少個Receiver來讀取logstore中的數據。 val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming中每次處理批次時間間隔。 val conf = new SparkConf().setAppName("Test Loghub Streaming") val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, logServiceProject, logStoreName, loghubConsumerGroupName, loghubEndpoint, numReceivers, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.foreachRDD(rdd => println(rdd.count())) ssc.start() ssc.awaitTermination()
說明運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量。
方法二: Direct API Based DStream
val logServiceProject = args(0) val logStoreName = args(1) val loghubConsumerGroupName = args(2) val loghubEndpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val zkConnect = args(7) val checkpointPath = args(8) def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Direct Loghub Streaming") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false") val loghubStream = LoghubUtils.createDirectStream( ssc, logServiceProject, logStoreName, loghubConsumerGroupName, accessKeyId, accessKeySecret, loghubEndpoint, zkParas, LogHubCursorPosition.END_CURSOR) ssc.checkpoint(checkpointPath) val stream = loghubStream.checkpoint(batchInterval) stream.foreachRDD(rdd => { println(rdd.count()) loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc } val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _) ssc.start() ssc.awaitTermination()
從E-MapReduce SDK 1.4.0版本開始,提供基于Direct API的實現方式。此種方式可以避免將Loghub數據重復存儲到Write Ahead Log中,即無需開啟Spark Streaming的WAL特性即可實現數據的at least once。目前Direct API實現方式處于experimental狀態,需要注意以下幾點:
在DStream的action中,必須做一次commit操作。
一個Spark Streaming中,不支持對LogStore數據源做多個action操作。
Direct API方式需要Zookeeper服務的支持。
支持MetaService
上面的例子中是顯式地將AccessKey傳入到接口中,但是從E-MapReduce SDK 1.3.2版本開始,Spark Streaming可以基于MetaService實現免AccessKey處理LogService數據,具體可以參見E-MapReduce SDK中的LoghubUtils類說明。
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
E-MapReduce SDK支持Log Service的三種消費模式,即BEGIN_CURSOR、END_CURSOR和SPECIAL_TIMER_CURSOR,默認是 END_CURSOR。
BEGIN_CURSOR:從日志頭開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。
END_CURSOR:從日志尾開始消費,如果有checkpoint記錄,則從checkpoint處開始消費。
SPECIAL_TIMER_CURSOR:從指定時間點開始消費,如果有checkpoint記錄,則從checkpoint處開始消費,單位為秒。
以上三種消費模式都受到checkpoint記錄的影響,如果存在checkpoint記錄,則從checkpoint處開始消費,不管指定的是什么消費模式。E-MapReduce SDK基于“SPECIAL_TIMER_CURSOR”模式支持用戶強制在指定時間點開始消費,在LoghubUtils#createStream接口中,以下參數需要組合使用:
cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR
forceSpecial:true
E-MapReduce的服務器(除了Master節點)無法連接公網。配置LogService endpoint時,請注意使用Log Service提供的內網 endpoint,否則無法請求到Log Service。
附錄
完整示例代碼,請參見Spark接入LogService。