本文以Spark 3.x操作Iceberg表為例,介紹如何通過Spark DataFrame API以批處理的方式讀寫Iceberg表。
前提條件
操作步驟
新建Maven項目,引入Pom依賴。
引入Spark及Iceberg的依賴,以下代碼示例指定了Spark 3.1.1與Iceberg 0.12.0版本,使用provided引包編譯,運行時使用集群上的軟件包。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-core</artifactId> <version>0.12.0</version> <scope>provided</scope> </dependency>
說明由于EMR集群的Iceberg軟件包與開源依賴包存在一定差異,例如EMR Iceberg默認集成了DLF Catalog,所以建議您在本地使用provided方式引入開源Iceberg依賴進行代碼編譯,打包放到集群上運行時使用集群環境中的依賴。
配置Catalog。
使用Spark API操作Iceberg表,首先需要配置Catalog,在SparkConf中加入必要配置項即可。
以下是在Spark SQL中使用數據湖元數據的配置,集群版本不同默認的Catalog名稱不同,需要配置的參數也不同,具體請參見數據湖元數據配置。
EMR-3.40及后續版本和EMR-5.6.0及后續版本
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
EMR-3.39.x和EMR-5.5.x版本
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")
EMR-3.38.x版本和EMR-5.3.x~EMR-5.4.x版本(包含)
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
說明運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量。
寫表。
Spark 3.x支持DataFrameWriterV2 API寫入數據到Iceberg表。目前v1 DataFrame API已不推薦使用,以下代碼以V2 API寫Iceberg表sample為例。
以下示例中的
<yourCatalogName>
為Catalog的名稱,請根據實際情況修改Catalog名稱。創建數據表
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").create()
說明創建表支持create、replace以及createOrReplace語義,另外支持通過tableProperty和partitionedBy配置表的屬性與分區字段。
您可以通過以下命令追加或覆蓋數據:
追加數據
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").append()
覆蓋數據
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
讀表。
請根據您Spark的版本,選擇讀表的方式:
Spark 3.x(推薦)
val df = spark.table("<yourCatalogName>.iceberg_db.sample")
Spark 2.4
val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")
示例
本示例是使用Spark DataFrame API批式讀寫Iceberg表。
示例中數據湖元數據的配置參數,根據集群版本不同,配置的參數不同,默認的Catalog名稱也不同。本示例以EMR-5.3.0版本為列,其中dlf_catalog
為Catalog名稱。具體版本對應的配置請參見數據湖元數據配置。
通過Spark SQL創建測試使用的數據庫iceberg_db,詳細信息請參見基礎使用。
編寫Spark代碼。
以Scala版代碼為例,代碼示例如下。
def main(args: Array[String]): Unit = { // 配置使用數據湖元數據。 val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("IcebergReadWriteTest") .getOrCreate() // 從DataFrame中創建或替換Iceberg表 val firstDF = spark.createDataFrame(Seq( (1, "a"), (2, "b"), (3, "c") )).toDF("id", "data") firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace() // 將DataFrame寫入Iceberg表 val secondDF = spark.createDataFrame(Seq( (4, "d"), (5, "e"), (6, "f") )).toDF("id", "data") secondDF.writeTo("dlf_catalog.iceberg_db.sample").append() // 讀Iceberg表 val icebergTable = spark.table("dlf_catalog.iceberg_db.sample") icebergTable.show() }
打包程序并部署到EMR集群。
檢查編譯Scala代碼的Maven插件,可以在pom.xml中配置如下插件。
<build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
您可以在本地完成代碼調試后,通過如下命令打包。
mvn clean install
使用SSH方式登錄到集群,詳情信息請參見登錄集群。
上傳JAR包至EMR集群。
本示例是上傳到EMR集群的根目錄下。
執行以下命令,通過spark-submit運行Spark作業。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 1 \ --executor-memory 1g \ --num-executors 1 \ --class com.aliyun.iceberg.IcebergTest \ iceberg-demos.jar
說明iceberg-demos.jar為您打包好的JAR包。--class和JAR包請根據您實際信息修改。
運行結果如下。
+---+----+ | id|data| +---+----+ | 4| d| | 1| a| | 5| e| | 6| f| | 2| b| | 3| c| +---+----+