阿里云EMR Delta Lake提供了強大的數據處理能力,可以幫助您管理和操作數據,確保數據的質量和一致性。本文為您介紹EMR Delta Lake如何進行刪除、更新與合并數據等操作。
DELETE
該命令用于刪除數據。示例如下。
DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
import io.delta.tables.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.delete("date < '2019-11-11'")
import org.apache.spark.sql.functions.
import spark.implicits.
deltaTable.delete(col("date") < "2019-11-11")
使用DELETE
命令時,如果沒有條件限制,則會刪除所有數據。
暫不支持帶有子查詢的WHERE條件。但如果子查詢為標量子查詢且使用SQL,可以設置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
為true
后進行查詢,例如:DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
如果您需要根據另一張表對目標表的匹配行進行刪除(例如
DELETE FROM target WHERE target.col = ref.col ...
),請使用Merge語法。
UPDATE
該命令用于更新數據。示例如下。
UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.updateExpr( //使用SQL字符串。
"name = 'Robet'",
Map("name" -> "'Robert'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( //使用SQL函數和隱式轉換。
col("name") === "Robet"),
Map("name" -> lit("Robert"));
暫不支持帶有子查詢的WHERE條件。但如果子查詢為標量子查詢且使用SQL,可以設置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
為true
后進行查詢,例如:UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
如果要根據另一張表對目標表的匹配行進行更新(例如,
UPDATE target SET target.col = ref.col ...
或WHERE target.col = ref.col ...
),請使用Merge語法。
MERGE
該命令用于合并數據。示例如下。
MERGE INTO target AS t
USING source AS s
ON t.date = s.date
WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, id, name]
DeltaTable.forPath(spark, "/tmp/delta_table")
.as("target")
.merge(updatesDF.as("source"), "target.id = source.id")
.whenMatched("target.name = 'should_update'")
.updateExpr(Map("target.name" -> "source.name"))
.whenMatched("target.name = 'should_delete'")
.delete()
.whenNotMatched("source.name = 'shoulde_insert'")
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
UPDATE子句和INSERT子句支持
*
語法,如果設置為UPDATE SET *
或者INSERT *
,則會更新或插入所有字段。暫不支持帶有子查詢的ON條件,但如果子查詢為標量子查詢的形式且使用SQL,可以設置
spark.sql.uncorrelated.scalar.subquery.preexecution.enabled
為true
后進行查詢。
ALTER TABLE
該命令用于更改現有表的結構和屬性。支持對表進行以下操作:
ADD COLUMN:可以向表中添加新的列。
RENAME COLUMN(需要開啟Column Mapping):可以將表中的列更改為新的名稱。
DROP COLUMN(需要開啟Column Mapping):可以從表中刪除指定的列。
SET/UNSET TBLPROPERTIES:可以設置表級別的屬性,如表的描述、表的存儲格式等。
RENAME TO:將表的名稱更改為新的名稱。
對分區表執行ADD COLUMN操作,建議將新增字段追加到分區字段之前,以避免在查詢引擎如Hive中查詢Delta表時出現數據異常。
例如,在指定位置執行ADD COLUMN操作。
-- 假設delta_tbl表的Schema為(id IN, name STRING, pt STRING),其中pt為分區字段。
-- 新增new_col字段,并將其追加到name字段后,pt字段前。
ALTER TABLE dbName.tableName ADD COLUMN (new_col STRING AFTER name);
DESCRIBE HISTORY
該命令用于顯示Delta Lake的詳細操作歷史。
按照順序展示版本號、操作時間、用戶ID、用戶名、操作類型、操作參數、作業信息、Notebook信息、集群、操作基于的前置版本、隔離等級、是否直接追加和操作Metrics等信息。
通常大多數信息顯示為Null。
示例如下:
顯示所有的操作記錄。
DESC HISTORY dbName.tableName;
顯示最新一條的操作記錄。
DESC HISTORY dbName.tableName limit 1;
CONVERT
該命令用于將Parquet格式的表轉成Delta表。
CONVERT遍歷指定路徑下的Parquet數據文件,推測表的Schema,生成Delta表需要的元數據信息。如果Parquet表本身是分區表,則需要額外指定分區字段和類型。
示例如下:
轉換指定路徑下的Parquet數據文件。
CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
轉換指定路徑下的Parquet數據文件,并按照dt和hour進行分區。
CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);
使用CONVERT后,僅將表路徑構建為Delta表所需的格式,尚未將其注冊為表,需要繼續使用CREATE TABLE命令。此時無需指定建表字段和分區字段。以下是具體示例。
CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";
OPTIMIZE
該命令通過合并小文件或ZOrder排序優化Delta表的數據布局,提升查詢效率。OPTIMIZE命令支持如下操作:
針對分區表,可以通過指定分區來進行優化。
在進行正常Compact優化時,可以通過指定非分區字段進行ZOrder排序,以調整數據布局。
代碼示例如下。
set spark.databricks.delta.stats.skipping=true;
set spark.databricks.delta.stats.collect=true;
-- 對dbName.tableName表進行全局優化。
OPTIMIZE dbName.tableName;
-- 對2021-04-01之前的分區進行優化。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01';
-- 對2021-04-01之前的分區進行優化,并使用col2, col3列進行排序。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
對于Streaming入湖場景,通常每個batch較小,會導致小文件較多,可以定期執行Optimize命令合并小文件。
對于查詢模式相對固定的場景,例如,除分區字段外,僅指定幾個列作為查詢條件時,可以采用Zorder方式優化。
VACUUM
該命令可以刪除表路徑中不需要的,且超過指定時間的數據文件。
EMR的Delta Lake定義數據文件不需要包含以下兩部分:
當前最新版本關聯到的數據文件。
執行過Savepoint的特定版本關聯到的數據文件。
VACUUM命令可以通過兩種方式指定刪除多久前的數據文件:
通過參數
delta.deletedFileRetentionDuration
配置表屬性,默認值為1周。通過VACUUM命令指定,單位為小時。
語法
VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
示例
-- 刪除數據文件。 VACUUM dbName.tableName; -- 刪除24小時之前的數據文件。 VACUUM dbName.tableName RETAIN 24 HOURS; -- 顯示待刪除24小時之前的數據文件。 VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;
說明根據您創建表的實際情況,可以定期執行VACUUM命令,節省存儲空間。
實際執行VACUUM命令前,可以先通過
DRY RUN
命令,確認刪除內容。
SAVEPOINT
該命令可以永久保存Delta Lake的歷史版本。
Delta Lake會在每次執行CheckPoint(固定版本間隔,由參數delta.checkpointInterval
決定)時清理掉log元數據文件(默認保留30天內的log元數據,由參數delta.logRetentionDuration
決定)。通過VACUUM也會刪除歷史版本不再需要的數據文件。執行SAVEPOINT命令,可以永久避免log元數據和數據文件被刪除,同時配合time-travel的能力,可以讀取歷史版本數據。
示例如下:
保存ID為0的版本。
CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
說明/path/to/delta_tbl
為您實際的Delta表文件系統路徑保存指定時間之前最近的版本。
CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
刪除或查看SAVEPOINT操作記錄。示例如下:
刪除記錄
--刪除特定版本的數據。 DROP SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0; --刪除特定時間戳之前的數據。 DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
查看記錄
可以顯示SAVEPOINT的版本號、版本提交時間、SAVEPOINT時間及其他信息。
SHOW SAVEPOINT delta.`/path/to/delta_tbl`; SHOW SAVEPOINT dbName.tableName;
ROLLBACK
該命令可以恢復到Delta Lake某個歷史版本。
如果指定要恢復到的歷史版本不可重建(即缺失log元數據或者對應的數據文件),則拋出異常。示例如下:
回滾到ID為0的版本。
ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
回滾到指定時間之前最近的版本。
ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";