日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

實時Spark Streaming消費示例

本文簡單介紹如何使用Spark DataFrame API開發一個流式作業消費LogService數據。

Spark Structured Streaming Scala訪問LogHub

代碼示例

## StructuredLoghubSample.Scala

object StructuredLoghubSample {
  def main(args: Array[String]) {
    if (args.length < 7) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger>[outputPath] [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from loghub
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(__value__ AS STRING)")
      .as[String]

    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    query.awaitTermination()

  }
}
說明

Maven pom文件可以參見aliyun-emapreduce-demo

編譯運行

## 編譯命令
mvn clean package -DskipTests

## 編譯完后,作業JAR包位于target目錄下。

## 提交執行
spark-submit --master --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
--num-executors 2 --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --class x.x.x.StructuredLoghubSample xxx.jar \
<logService-project> <logService-store> <access-key-id> <access-key-secret> <endpoint> \
<starting-offsets><max-offsets-per-trigger> <output-path> <checkpoint-location>
說明

作業資源需要根據實際數據規模和實際集群規模調整,如果集群規模太小,直接運行以上命令可能無法執行。

以下信息,請根據您實際環境替換:

  • x.x.x.StructuredLoghubSample:其中的x.x.x為您實際環境中StructuredLoghubSample類的包名。

  • xxx.jar:打包項目工程后的JAR包。

  • <starting-offsets>:開啟offset位置,取值為earliest和latest。

  • <max-offsets-per-trigger>:每個觸發周期內處理的消息或偏移量的最大數量。

  • <output-path>:輸出數據的目錄。例如,/loghub/data/

  • <checkpoint-location>:checkpoint目錄。例如,/loghub/checkpoint

  • --jars:必須加上該參數,參數值為LogHub的Spark DataSource的JAR。如果不加上該參數,則會報Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource

    • 針對Spark2,對應內容如下。

      --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
    • 針對Spark3,對應內容如下。

      --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar
    說明

    如果您的集群中沒有以上目錄,則使用/usr/lib/emrsdk-current/目錄。

PySpark Structured Streaming訪問LogHub

代碼示例

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("xx") \
    .getOrCreate()

# 讀取LogHub數據源。
lines = spark \
    .readStream \
    .format("loghub") \
    .option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
    .option("access.key.id", "LTAI----") \
    .option("access.key.secret", "DTi----") \
    .option("sls.project", "emr-test-hz-1") \
    .option("sls.store", "test1") \
    .option("startingoffsets", "earliest") \
    .load()


# 處理transform邏輯。
wordCounts = lines.groupBy("__logStore__").count()

# 處理Sink邏輯。
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

配置參數說明

參數

描述

endpoint

LogHub的endpoint。例如,cn-hangzhou-intranet.log.aliyuncs.com。

access.key.id

您阿里云賬號的AccessKey ID。

access.key.secret

您阿里云賬號的AccessKey Secret。

sls.project

LogStore名。

sls.store

LogService項目名。

startingoffsets

開啟offset位置,取值為earliest和latest。

執行Python腳本

  • 針對Spark2,對應內容如下。

    spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --master local loghub.py
  • 針對Spark3,對應內容如下。

    spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar --master local loghub.py
說明

如果您的集群中沒有以上目錄,則使用/usr/lib/emrsdk-current/目錄。