日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark流式寫入Iceberg

本文為您介紹如何通過Spark Structured Streaming流式寫入Iceberg表。

前提條件

  • 已在E-MapReduce控制臺上,創建DataLake集群或Custom集群,詳情請參見創建集群

  • 已在E-MapReduce控制臺上,創建選擇了Kafka服務的DataFlow集群,詳情請參見創建集群

使用限制

創建的DataLake集群或Custom集群需要與Kafka集群在同一VPC和交換機下,不支持跨VPC。

流式寫入方式

Spark Structured Streaming通過DataStreamWriter接口流式寫數據到Iceberg表,代碼如下。

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
說明

代碼中的tableIdentifier是元數據表名或者表路徑。流式寫入支持以下兩種方式:

  • append:追加每個批次的數據到Iceberg表,相當于insert into。

  • complete:使用最新批次的數據完全覆蓋Iceberg,相當于insert overwrite。

示例

本示例是從上游Kafka中讀取數據,寫入Iceberg表,打包放到EMR集群上通過spark-submit提交執行。

  1. 通過Kafka腳本創建測試使用的topic并準備測試數據。

    1. 使用SSH方式登錄到Kafka集群,詳情信息請參見登錄集群

    2. 執行以下命令,創建名為iceberg_test的topic。

      kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. 執行以下命令,生產測試數據。

      kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
  2. 通過Spark SQL創建測試使用的數據庫iceberg_db和表iceberg_table,詳細操作請參見基礎使用

  3. 新建Maven項目,引入Spark的依賴和檢查編譯Scala代碼的Maven插件,可以在pom.xml中添加如下配置。

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- the Maven Scala plugin will compile Scala source files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  4. 編寫Spark代碼。

    以Scala版代碼為例,代碼示例如下。

    重要

    示例中數據湖元數據的配置參數,根據集群版本不同,配置的參數不同,Catalog名稱也不同。本示例以EMR-5.3.0版本為列,其中dlf_catalog為Catalog名稱。具體版本對應的配置請參見數據湖元數據配置

    def main(args: Array[String]): Unit = {
    
      // 配置使用數據湖元數據。
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
        .builder()
        .config(sparkConf)
        .appName("StructuredSinkIceberg")
        .getOrCreate()
    
      val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "iceberg_test"
    
      // 從上游Kafka讀取數據
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
    
      val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // 假設key是以Base64編碼的字符串,先解碼為普通字符串
          "CAST(value AS STRING) AS data")
          .select(
            col("strKey").cast(LongType).alias("id"), // 現在可以安全地將解碼后的字符串轉換為Long
            col("data")
          )
    
      // 流式寫入Iceberg表
      val query = resDF.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "dlf_catalog.iceberg_db.iceberg_table")
        .option("checkpointLocation", checkpointPath)
        .start()
    
      query.awaitTermination()
    }

    請您根據集群的實際情況,修改如下參數。

    參數

    描述

    checkpointPath

    Spark流式寫數據的Checkpoint路徑。

    bootstrapServers

    Kafka集群中任一Kafka Broker組件的內網IP地址。

    topic

    Topic名稱。

  5. 打包程序并部署到EMR集群。

    1. 本地調試完成后,通過以下命令打包。

      mvn clean install
    2. 使用SSH方式登錄到集群,詳情信息請參見登錄集群

    3. 上傳JAR包至EMR集群。

      本示例是上傳到EMR集群的根目錄下。

  6. 提交運行Spark作業。

    1. 執行以下命令,通過spark-submit提交Spark作業。

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      說明
      • 應替換為具體的版本號,且版本號需與您的Spark和Kafka版本兼容。

      • iceberg-demos.jar為您打包好的JAR包。--class和JAR包請根據您實際信息修改。

    2. 通過Spark SQL查詢數據的變化,詳細操作請參見基礎使用