日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Delta Lake CDC構(gòu)建增量數(shù)倉

本文為您介紹Delta Lake CDC功能的相關(guān)參數(shù)、Schema和使用示例。

背景信息

CDC(Change Data Capture)定義了一種場景,即識別并捕獲數(shù)據(jù)庫表中數(shù)據(jù)的變更,并交付給下游進一步處理。Delta Lake CDC能夠?qū)elta Lake表作為Source,直接獲取變更的數(shù)據(jù)信息。

Delta Lake CDC是通過Change Data Feed(CDF)來實現(xiàn)的。CDF允許Delta Lake表能夠追溯行級的變更信息。開啟CDF后,Delta Lake將在必要的情況下持久化變更的數(shù)據(jù)信息,并寫入到特定的表下的目錄文件中。應(yīng)用CDC,可以方便的構(gòu)建增量數(shù)倉。

使用限制

僅EMR-3.41.0及后續(xù)版本(Delta Lake 0.6.1)和EMR-5.9.0及后續(xù)版本(Delta Lake 2.0)的集群,支持使用Delta Lake CDC功能。

相關(guān)參數(shù)

SparkConf參數(shù)

參數(shù)

說明

spark.sql.externalTableValuedFunctions

EMR自定義Spark Config,用于拓展Spark 2.4.x的Table Valued Function。使用Spark SQL執(zhí)行CDF查詢時需要配置為table_changes。

spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled

取值如下:

  • false(默認值):CDF查詢會強制要求指定的startingTimestamp或endingTimestamp是有效的,否則報錯。

  • true:如果startingTimestamp無效,則直接返回空數(shù)據(jù);如果endingTimestamp無效,則直接返回當(dāng)前快照的數(shù)據(jù)。

說明

該參數(shù)僅在Delta 2.x版本生效。

CDC寫參數(shù)

參數(shù)

說明

delta.enableChangeDataFeed

是否開啟CDF,取值如下:

  • false(默認值):不開啟CDF。

  • true:開啟CDF。

CDC讀參數(shù)

僅DataFram和Spark Streaming模式需要設(shè)置以下參數(shù)。

參數(shù)

說明

readChangeFeed

如果設(shè)置為true,則返回表的Change Data,且必須同時指定startingVersion或startingTimestamp任意一個參數(shù)搭配使用。

startingVersion

readChangeFeed為true時設(shè)置有效,表示從指定版本開始讀取表的Change Data。

endingVersion

readChangeFeed為true時設(shè)置有效,表示讀取表的Change Data的最后版本。

startingTimestamp

readChangeFeed為true時設(shè)置有效,表示從指定的時間戳開始讀取表的Change Data。

endingTimestamp

readChangeFeed為true時設(shè)置有效,表示讀取表的Change Data的最后時間戳。

Schema

Delta Lake CDF查詢返回的Schema是在原表的Schema基礎(chǔ)上追加以下三個額外字段:

  • _change_type:引起變更的操作,取值如下:

    • insert:標(biāo)識數(shù)據(jù)為新插入的。

    • delete:標(biāo)識數(shù)據(jù)為剛刪除的。

    • update_preimage和update_postimage:標(biāo)識數(shù)據(jù)為更新,分別記錄其變更前的記錄和變更后的記錄。

  • _commit_version:變更對應(yīng)的Delta表版本。

  • _commit_timestamp:變更對應(yīng)的Delta表版本提交的時間。

使用示例

Spark SQL示例

重要

僅在EMR Spark2,Delta 0.6.1版本支持使用Spark SQL語法。

EMR Spark2上使用Spark SQL語法需要額外配置以下參數(shù),代碼如下所示。

spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes

SQL語法如下所示。

-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);

-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);

-- Update
UPDATE cdf_tbl set age = age + 1;

-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);

MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;

-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;

-- CDF Query
-- 查詢從版本0開始的所有的Change Data。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); --2023-02-03 15:33:34為commit0的提交時間戳。


-- 查詢版本4對應(yīng)的Change Data。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); --2023-02-03 15:34:06為commit4的提交時間戳。

兩次查詢返回信息如下所示。

圖 1. 查詢1結(jié)果fig1

圖 2. 查詢2結(jié)果fig2

DataFrame示例

// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
  .option("delta.enableChangeDataFeed", "true") //首次寫入delta數(shù)據(jù)時開啟CDF,后續(xù)寫入無需設(shè)置。
  .saveAsTable("cdf_table")

// CDF Query Using DataFrame
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) //endingVersion可選。
  .table("cdf_table")

Spark Streaming示例

// Streaming CDF Query Using Dats
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) //endingVersion可選。
  .table("cdf_table")