日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark對接SMQ

本文介紹如何通過Spark Streaming消費輕量消息隊列(原 MNS) SMQ(Simple Message Queue (formerly MNS))中的數據,并統計每個Batch內的單詞個數。

Spark接入SMQ

示例代碼如下。

val conf = new SparkConf().setAppName("Test MNS Streaming")
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(conf, batchInterval)
    val queuename = "queuename"
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Endpoint以華東1(杭州)為例,其它Region請按實際情況填寫。
    val endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
      StorageLevel.MEMORY_ONLY)
    mnsStream.foreachRDD( rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()
說明

運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量

支持MetaService

上面的示例是顯式地將AccessKey傳入到輕量消息隊列(原 MNS)中。從E-MapReduce SDK 1.3.2版本開始,Spark Streaming可以基于MetaService實現免AccessKey處理輕量消息隊列(原 MNS)數據。具體可以參見E-MapReduce SDK中的MnsUtils類說明。

MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)

附錄

完整示例代碼,請參見SparkMNSDemo