本文介紹如何通過Spark Streaming消費消息隊列RocketMQ(簡稱MQ)中的數據并計算每個Batch中的單詞。
通過Spark訪問MQ
代碼示例如下。
val Array(cId, topic, subExpression, parallelism, interval) = args
val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
val numStreams = parallelism.toInt
val batchInterval = Milliseconds(interval.toInt)
val conf = new SparkConf().setAppName("Test ONS Streaming")
val ssc = new StreamingContext(conf, batchInterval)
def func: Message => Array[Byte] = msg => msg.getBody
val onsStreams = (0 until numStreams).map { i =>
println(s"starting stream $i")
OnsUtils.createStream(
ssc,
cId,
topic,
subExpression,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK_2,
func)
}
val unionStreams = ssc.union(onsStreams)
unionStreams.foreachRDD(rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
說明
運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量。
附錄
完整示例代碼,請參見通過Spark訪問RocketMQ。
文檔內容是否對您有幫助?