本文簡單介紹如何使用Spark DataFrame API開發一個流式作業消費LogService數據。
Spark Structured Streaming Scala訪問LogHub
代碼示例
## StructuredLoghubSample.Scala
object StructuredLoghubSample {
def main(args: Array[String]) {
if (args.length < 7) {
System.err.println("Usage: StructuredLoghubSample <logService-project> " +
"<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
"<starting-offsets> <max-offsets-per-trigger>[outputPath] [<checkpoint-location>]")
System.exit(1)
}
val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
val checkpointLocation =
if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString
val spark = SparkSession
.builder
.appName("StructuredLoghubSample")
.getOrCreate()
import spark.implicits._
// Create DataSet representing the stream of input lines from loghub
val lines = spark
.readStream
.format("loghub")
.option("sls.project", project)
.option("sls.store", logStore)
.option("access.key.id", accessKeyId)
.option("access.key.secret", accessKeySecret)
.option("endpoint", endpoint)
.option("startingoffsets", startingOffsets)
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
.selectExpr("CAST(__value__ AS STRING)")
.as[String]
val query = lines.writeStream
.format("parquet")
.option("checkpointLocation", checkpointLocation)
.option("path", outputPath)
.outputMode("append")
.trigger(Trigger.ProcessingTime(30000))
.start()
query.awaitTermination()
}
}
Maven pom文件可以參見aliyun-emapreduce-demo。
編譯運行
## 編譯命令
mvn clean package -DskipTests
## 編譯完后,作業JAR包位于target目錄下。
## 提交執行
spark-submit --master --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
--num-executors 2 --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --class x.x.x.StructuredLoghubSample xxx.jar \
<logService-project> <logService-store> <access-key-id> <access-key-secret> <endpoint> \
<starting-offsets><max-offsets-per-trigger> <output-path> <checkpoint-location>
作業資源需要根據實際數據規模和實際集群規模調整,如果集群規模太小,直接運行以上命令可能無法執行。
以下信息,請根據您實際環境替換:
x.x.x.StructuredLoghubSample
:其中的x.x.x
為您實際環境中StructuredLoghubSample類的包名。xxx.jar
:打包項目工程后的JAR包。<starting-offsets>
:開啟offset位置,取值為earliest和latest。<max-offsets-per-trigger>
:每個觸發周期內處理的消息或偏移量的最大數量。<output-path>
:輸出數據的目錄。例如,/loghub/data/。<checkpoint-location>
:checkpoint目錄。例如,/loghub/checkpoint。--jars
:必須加上該參數,參數值為LogHub的Spark DataSource的JAR。如果不加上該參數,則會報Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource
。針對Spark2,對應內容如下。
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
針對Spark3,對應內容如下。
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar
說明如果您的集群中沒有以上目錄,則使用/usr/lib/emrsdk-current/目錄。
PySpark Structured Streaming訪問LogHub
代碼示例
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("xx") \
.getOrCreate()
# 讀取LogHub數據源。
lines = spark \
.readStream \
.format("loghub") \
.option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
.option("access.key.id", "LTAI----") \
.option("access.key.secret", "DTi----") \
.option("sls.project", "emr-test-hz-1") \
.option("sls.store", "test1") \
.option("startingoffsets", "earliest") \
.load()
# 處理transform邏輯。
wordCounts = lines.groupBy("__logStore__").count()
# 處理Sink邏輯。
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
配置參數說明
參數 | 描述 |
endpoint | LogHub的endpoint。例如,cn-hangzhou-intranet.log.aliyuncs.com。 |
access.key.id | 您阿里云賬號的AccessKey ID。 |
access.key.secret | 您阿里云賬號的AccessKey Secret。 |
sls.project | LogStore名。 |
sls.store | LogService項目名。 |
startingoffsets | 開啟offset位置,取值為earliest和latest。 |
執行Python腳本
針對Spark2,對應內容如下。
spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --master local loghub.py
針對Spark3,對應內容如下。
spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar --master local loghub.py
如果您的集群中沒有以上目錄,則使用/usr/lib/emrsdk-current/目錄。