日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數據管理

阿里云EMR Delta Lake提供了強大的數據處理能力,可以幫助您管理和操作數據,確保數據的質量和一致性。本文為您介紹EMR Delta Lake如何進行刪除、更新與合并數據等操作。

DELETE

該命令用于刪除數據。示例如下。

DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
import io.delta.tables.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.delete("date < '2019-11-11'")
import org.apache.spark.sql.functions.
import spark.implicits.
deltaTable.delete(col("date") < "2019-11-11")
說明

使用DELETE命令時,如果沒有條件限制,則會刪除所有數據。

  • 暫不支持帶有子查詢的WHERE條件。但如果子查詢為標量子查詢且使用SQL,可以設置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后進行查詢,例如:

    DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
  • 如果您需要根據另一張表對目標表的匹配行進行刪除(例如DELETE FROM target WHERE target.col = ref.col ...),請使用Merge語法。

UPDATE

該命令用于更新數據。示例如下。

UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

deltaTable.updateExpr(            //使用SQL字符串。
  "name = 'Robet'",
  Map("name" -> "'Robert'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                //使用SQL函數和隱式轉換。
  col("name") === "Robet"),
  Map("name" -> lit("Robert"));
  • 暫不支持帶有子查詢的WHERE條件。但如果子查詢為標量子查詢且使用SQL,可以設置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后進行查詢,例如:

    UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
  • 如果要根據另一張表對目標表的匹配行進行更新(例如,UPDATE target SET target.col = ref.col ... WHERE target.col = ref.col ...),請使用Merge語法。

MERGE

該命令用于合并數據。示例如下。

MERGE INTO target AS t
USING source AS s
ON t.date = s.date
WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, id, name]

DeltaTable.forPath(spark, "/tmp/delta_table")
  .as("target")
  .merge(updatesDF.as("source"), "target.id = source.id")
  .whenMatched("target.name = 'should_update'")
  .updateExpr(Map("target.name" -> "source.name"))
  .whenMatched("target.name = 'should_delete'")
  .delete()
  .whenNotMatched("source.name = 'shoulde_insert'")
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
  • UPDATE子句和INSERT子句支持*語法,如果設置為UPDATE SET *或者INSERT *,則會更新或插入所有字段。

  • 暫不支持帶有子查詢的ON條件,但如果子查詢為標量子查詢的形式且使用SQL,可以設置spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue后進行查詢。

ALTER TABLE

該命令用于更改現有表的結構和屬性。支持對表進行以下操作:

  • ADD COLUMN:可以向表中添加新的列。

  • RENAME COLUMN(需要開啟Column Mapping):可以將表中的列更改為新的名稱。

  • DROP COLUMN(需要開啟Column Mapping):可以從表中刪除指定的列。

  • SET/UNSET TBLPROPERTIES:可以設置表級別的屬性,如表的描述、表的存儲格式等。

  • RENAME TO:將表的名稱更改為新的名稱。

重要

對分區表執行ADD COLUMN操作,建議將新增字段追加到分區字段之前,以避免在查詢引擎如Hive中查詢Delta表時出現數據異常。

例如,在指定位置執行ADD COLUMN操作。

-- 假設delta_tbl表的Schema為(id IN, name STRING, pt STRING),其中pt為分區字段。
-- 新增new_col字段,并將其追加到name字段后,pt字段前。
ALTER TABLE dbName.tableName ADD COLUMN (new_col STRING AFTER name);

DESCRIBE HISTORY

該命令用于顯示Delta Lake的詳細操作歷史。

按照順序展示版本號、操作時間、用戶ID、用戶名、操作類型、操作參數、作業信息、Notebook信息、集群、操作基于的前置版本、隔離等級、是否直接追加和操作Metrics等信息。

說明

通常大多數信息顯示為Null。

示例如下:

  • 顯示所有的操作記錄。

    DESC HISTORY dbName.tableName;
  • 顯示最新一條的操作記錄。

    DESC HISTORY dbName.tableName limit 1;

CONVERT

該命令用于將Parquet格式的表轉成Delta表。

CONVERT遍歷指定路徑下的Parquet數據文件,推測表的Schema,生成Delta表需要的元數據信息。如果Parquet表本身是分區表,則需要額外指定分區字段和類型。

示例如下:

  • 轉換指定路徑下的Parquet數據文件。

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  • 轉換指定路徑下的Parquet數據文件,并按照dt和hour進行分區。

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);

使用CONVERT后,僅將表路徑構建為Delta表所需的格式,尚未將其注冊為表,需要繼續使用CREATE TABLE命令。此時無需指定建表字段和分區字段。以下是具體示例。

CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";

OPTIMIZE

該命令通過合并小文件或ZOrder排序優化Delta表的數據布局,提升查詢效率。OPTIMIZE命令支持如下操作:

  • 針對分區表,可以通過指定分區來進行優化。

  • 在進行正常Compact優化時,可以通過指定非分區字段進行ZOrder排序,以調整數據布局。

代碼示例如下。

set spark.databricks.delta.stats.skipping=true;
set spark.databricks.delta.stats.collect=true;

-- 對dbName.tableName表進行全局優化。
OPTIMIZE dbName.tableName;

-- 對2021-04-01之前的分區進行優化。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01';

-- 對2021-04-01之前的分區進行優化,并使用col2, col3列進行排序。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
說明
  • 對于Streaming入湖場景,通常每個batch較小,會導致小文件較多,可以定期執行Optimize命令合并小文件。

  • 對于查詢模式相對固定的場景,例如,除分區字段外,僅指定幾個列作為查詢條件時,可以采用Zorder方式優化。

VACUUM

該命令可以刪除表路徑中不需要的,且超過指定時間的數據文件。

EMR的Delta Lake定義數據文件不需要包含以下兩部分:

  • 當前最新版本關聯到的數據文件。

  • 執行過Savepoint的特定版本關聯到的數據文件。

VACUUM命令可以通過兩種方式指定刪除多久前的數據文件:

  • 通過參數delta.deletedFileRetentionDuration配置表屬性,默認值為1周。

  • 通過VACUUM命令指定,單位為小時。

  • 語法

    VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  • 示例

    -- 刪除數據文件。
    VACUUM dbName.tableName; 
    -- 刪除24小時之前的數據文件。
    VACUUM dbName.tableName RETAIN 24 HOURS; 
    -- 顯示待刪除24小時之前的數據文件。
    VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN; 
    說明
    • 根據您創建表的實際情況,可以定期執行VACUUM命令,節省存儲空間。

    • 實際執行VACUUM命令前,可以先通過DRY RUN命令,確認刪除內容。

SAVEPOINT

該命令可以永久保存Delta Lake的歷史版本。

Delta Lake會在每次執行CheckPoint(固定版本間隔,由參數delta.checkpointInterval決定)時清理掉log元數據文件(默認保留30天內的log元數據,由參數delta.logRetentionDuration決定)。通過VACUUM也會刪除歷史版本不再需要的數據文件。執行SAVEPOINT命令,可以永久避免log元數據和數據文件被刪除,同時配合time-travel的能力,可以讀取歷史版本數據。

示例如下:

  • 保存ID為0的版本。

    CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    說明

    /path/to/delta_tbl 為您實際的Delta表文件系統路徑

  • 保存指定時間之前最近的版本。

    CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";

刪除或查看SAVEPOINT操作記錄。示例如下:

  • 刪除記錄

    --刪除特定版本的數據。
    DROP SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    --刪除特定時間戳之前的數據。
    DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  • 查看記錄

    可以顯示SAVEPOINT的版本號、版本提交時間、SAVEPOINT時間及其他信息。

    SHOW SAVEPOINT delta.`/path/to/delta_tbl`;
    SHOW SAVEPOINT dbName.tableName;

ROLLBACK

該命令可以恢復到Delta Lake某個歷史版本。

如果指定要恢復到的歷史版本不可重建(即缺失log元數據或者對應的數據文件),則拋出異常。示例如下:

  • 回滾到ID為0的版本。

    ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • 回滾到指定時間之前最近的版本。

    ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";