本文介紹Delta Lake如何批式讀寫數據。
建表并寫入數據
Scala
// 非分區表 data.write.format("delta").save("/tmp/delta_table") // 分區表 data.write.format("delta").partitionedBy("date").save("/tmp/delta_table")
SQL
-- 非分區表 CREATE TABLE delta_table (id INT) USING delta LOCATION "/tmp/delta_table"; INSERT INTO delta_table VALUES 0,1,2,3,4; -- 分區表 CREATE TABLE delta_table ( id INT, date STRING) USING delta PARTITIONED BY (date) LOCATION "/tmp/delta_table"; INSERT INTO delta_table PARTITION (date='2019-11-11') VALUES 0,1,2,3,4; -- 或者使用動態分區寫入 INSERT INTO delta_table PARTITION (date) VALUES (0,'2019-11-01'),(1,'2019-11-02'),(2,'2019-11-05'),(3,'2019-11-08'),(4,'2019-11-11');
追加數據
Scala
// 非分區表 data.write.format("delta").mode("append").save("/tmp/delta_table") // 分區表 data.write.format("delta").mode("append").save("/tmp/delta_table")
SQL
-- 非分區表 INSERT INTO delta_table VALUES 0,1,2,3,4; -- 分區表 INSERT INTO delta_table PARTITION (date='2019-11-11') VALUES 0,1,2,3,4; -- 或者使用動態分區寫入 INSERT INTO delta_table PARTITION (date) VALUES (0,'2019-11-01'),(1,'2019-11-02'),(2,'2019-11-05'),(3,'2019-11-08'),(4,'2019-11-11');
覆蓋數據
Scala
// 非分區表 data.write.format("delta").mode("overwrite").save("/tmp/delta_table") // 分區表 data.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2019-11-01' AND date <= '2019-11-11'").save("/tmp/delta_table")
SQL
INSERT OVERWRITE TABLE delta_table VALUES 0,1,2,3,4; -- 分區表 INSERT OVERWRITE delta_table PARTITION (date='2019-11-11') VALUES 0,1,2,3,4; -- 或者使用動態分區寫入 INSERT OVERWRITE delta_table PARTITION (date) VALUES (0,'2019-11-01'),(1,'2019-11-02'),(2,'2019-11-05'),(3,'2019-11-08'),(4,'2019-11-11');
讀表
Scala
spark.read.format("delta").load("/tmp/delta_table")
SQL
SELECT * FROM delta_table;
歷史版本訪問
Scala
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta_table") df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta_table")
SQL
不支持。
文檔內容是否對您有幫助?