Delta Lake和Hudi是當前主流的數據湖產品,并且都支持了Spark的讀寫操作。本文為您介紹Spark如何處理Delta Lake和Hudi數據。
背景信息
Delta Lake和Hudi的更多信息,請參見Delta Lake文檔和Hudi文檔。
準備工作
環境
需要在項目中引入Delta Lake或Hudi相關的pom依賴。
參數
Delta Lake參數
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
如果您集群的Spark是Spark3,則額外還需以下參數。
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
Hudi參數
spark.serializer org.apache.spark.serializer.KryoSerializer spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
如果您集群的Spark是Spark3,則額外還需以下參數。
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
Spark讀寫Delta
Spark SQL語法
詳細示例如下。
-- 建表
create table delta_tbl (id int, name string) using delta;
-- 寫入數據
insert into delta_tbl values (1, "a1"), (2, "a2");
-- 更新數據
update delta_tbl set name = 'a1_new' where id = 1;
-- 刪除數據
delete from delta_tbl where id = 1;
-- 查詢數據
select * from delta_tbl;
Spark Dataset語法
詳細示例如下。
// 寫數據
val df = Seq((1, "a1"), (2, "a2")).toDF("id", name)
df.write.format("delta").save("/tmp/delta_tbl")
// 讀數據
spark.read.format("delta").load("/tmp/delta_tbl")
Spark讀寫Hudi
Spark SQL語法
詳細示例如下。
-- 建表
create table hudi_tbl (
id bigint,
name string,
price double,
ts long
) using hudi
tblproperties (
primaryKey="id",
preCombineField="ts"
);
-- 寫入數據
insert into hudi_tbl values (1, 'a1', 10.0, 1000), (2, 'a2', 11.0, 1000);
-- 更新數據
update hudi_tbl set name = 'a1_new' where id = 1;
-- 刪除數據
delete from hudi_tbl where id = 1;
-- 查詢數據
select * from hudi_tbl;
Spark Dataset語法
詳細示例如下。
// 寫數據
import org.apache.hudi.DataSourceWriteOptions._
val df = Seq((1, "a1", 10.0, 1000), (2, "a2", 11.0, 1000)).toDF("id", "name", "price", "ts")
df.write.format("hudi").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "").
option("hoodie.table.name", "hudi_tbl").
mode("append").
save("/tmp/hudi_tbl")
// 讀數據
spark.read.format("hudi").load("/tmp/hudi_tbl")
文檔內容是否對您有幫助?