Apache Paimon是一種流批統一的湖存儲格式,支持高吞吐的寫入和低延遲的查詢,詳情請參見Apache Paimon。本文為您介紹如何在EMR Serverless Spark中實現Paimon表的讀取與寫入操作。
前提條件
已創建工作空間,詳情請參見創建工作空間。
操作流程
步驟一:創建SQL會話
進入會話管理頁面。
在左側導航欄,選擇
。在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導航欄中的會話管理。
在SQL會話頁面,單擊創建SQL會話。
在創建SQL會話頁面的Spark配置區域,配置以下信息,單擊創建。詳情請參見管理SQL會話。
Spark對Paimon表的讀寫基于Catalog,根據不同場景可以有以下兩種選擇:
使用Paimon Catalog(僅限于查詢和寫入Paimon表,同時支持自定義元數據類型,目前支持以下三類)。
DLF
元數據保存在DLF中。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf
Hive
元數據保存在指定的Hive MetaStore中。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore hive spark.sql.catalog.paimon.uri thrift://<yourHMSUri>:<port>
參數
說明
thrift://<yourHMSUri>:<port>
Hive MetaStore的URI。格式為
thrift://<Hive metastore的IP地址>:9083
。<Hive metastore的IP地址>
為HMS服務的內網IP地址。如果您需要指定外部Metastore服務,請參見EMR Serverless Spark連接外部Hive Metastore。FileSystem
元數據保存在文件系統中。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore filesystem spark.sql.catalog.paimon.warehouse oss://<yourBucketName>/warehouse
使用spark_catalog(可以查詢、寫入Paimon表或者非Paimon表,元數據僅支持DLF)。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.spark_catalog org.apache.paimon.spark.SparkGenericCatalog
步驟二:讀寫Paimon表
進入SQL開發頁面。
在EMR Serverless Spark頁面,單擊左側導航欄中的數據開發。
在開發目錄頁簽下,單擊新建。
在新建對話框中,輸入名稱(例如users_task),類型使用默認的SparkSQL,然后單擊確定。
拷貝如下代碼到新增的Spark SQL頁簽(users_task)中。
使用Paimon Catalog
此時,訪問Paimon表需通過paimon.db.tbl,訪問非Paimon表,需通過spark_catalog.db.tbl。
-- 創建數據庫 CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db; CREATE DATABASE IF NOT EXISTS spark_catalog.ss_parquet_db; -- 創建Paimon表和Parquet表 CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon; CREATE TABLE spark_catalog.ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c"; -- 寫入Paimon表 INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"); INSERT INTO paimon.ss_paimon_db.paimon_tbl SELECT * FROM spark_catalog.ss_parquet_db.parquet_tbl; -- 查詢寫入結果 SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id; -- 刪除數據庫 DROP DATABASE paimon.ss_paimon_db CASCADE; DROP DATABASE spark_catalog.ss_parquet_db CASCADE;
使用spark_catalog
在該情況下,無論是訪問Paimon表還是非Paimon表,都可以通過spark_catalog.db.tbl進行訪問(由于spark_catalog為默認Catalog,因此可以省略不寫)。
-- 創建數據庫 CREATE DATABASE IF NOT EXISTS ss_paimon_db; CREATE DATABASE IF NOT EXISTS ss_parquet_db; -- 創建Paimon表和Parquet表 CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon; CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c"; -- 寫入Paimon表 INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"); INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl; -- 查詢寫入結果 SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id; -- 刪除數據庫 DROP DATABASE ss_paimon_db CASCADE; DROP DATABASE ss_parquet_db CASCADE;
在數據庫下拉列表中選擇一個數據庫,在會話下拉列表中選擇剛剛創建的SQL會話。
單擊運行,執行任務。返回信息如下所示。
相關文檔
SQL任務和任務編排完整的開發流程示例,請參見SQL開發快速入門。
更多Paimon相關用法和配置,請參見Paimon官方文檔。
如果需要指定外部Metastore服務,請參見EMR Serverless Spark連接外部Hive Metastore。