Delta Lake CDC構(gòu)建增量數(shù)倉
本文為您介紹Delta Lake CDC功能的相關(guān)參數(shù)、Schema和使用示例。
背景信息
CDC(Change Data Capture)定義了一種場景,即識別并捕獲數(shù)據(jù)庫表中數(shù)據(jù)的變更,并交付給下游進一步處理。Delta Lake CDC能夠?qū)elta Lake表作為Source,直接獲取變更的數(shù)據(jù)信息。
Delta Lake CDC是通過Change Data Feed(CDF)來實現(xiàn)的。CDF允許Delta Lake表能夠追溯行級的變更信息。開啟CDF后,Delta Lake將在必要的情況下持久化變更的數(shù)據(jù)信息,并寫入到特定的表下的目錄文件中。應(yīng)用CDC,可以方便的構(gòu)建增量數(shù)倉。
使用限制
僅EMR-3.41.0及后續(xù)版本(Delta Lake 0.6.1)和EMR-5.9.0及后續(xù)版本(Delta Lake 2.0)的集群,支持使用Delta Lake CDC功能。
相關(guān)參數(shù)
SparkConf參數(shù)
參數(shù) | 說明 |
spark.sql.externalTableValuedFunctions | EMR自定義Spark Config,用于拓展Spark 2.4.x的Table Valued Function。使用Spark SQL執(zhí)行CDF查詢時需要配置為table_changes。 |
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled | 取值如下:
說明 該參數(shù)僅在Delta 2.x版本生效。 |
CDC寫參數(shù)
參數(shù) | 說明 |
delta.enableChangeDataFeed | 是否開啟CDF,取值如下:
|
CDC讀參數(shù)
僅DataFram和Spark Streaming模式需要設(shè)置以下參數(shù)。
參數(shù) | 說明 |
readChangeFeed | 如果設(shè)置為true,則返回表的Change Data,且必須同時指定startingVersion或startingTimestamp任意一個參數(shù)搭配使用。 |
startingVersion | readChangeFeed為true時設(shè)置有效,表示從指定版本開始讀取表的Change Data。 |
endingVersion | readChangeFeed為true時設(shè)置有效,表示讀取表的Change Data的最后版本。 |
startingTimestamp | readChangeFeed為true時設(shè)置有效,表示從指定的時間戳開始讀取表的Change Data。 |
endingTimestamp | readChangeFeed為true時設(shè)置有效,表示讀取表的Change Data的最后時間戳。 |
Schema
Delta Lake CDF查詢返回的Schema是在原表的Schema基礎(chǔ)上追加以下三個額外字段:
_change_type:引起變更的操作,取值如下:
insert:標(biāo)識數(shù)據(jù)為新插入的。
delete:標(biāo)識數(shù)據(jù)為剛刪除的。
update_preimage和update_postimage:標(biāo)識數(shù)據(jù)為更新,分別記錄其變更前的記錄和變更后的記錄。
_commit_version:變更對應(yīng)的Delta表版本。
_commit_timestamp:變更對應(yīng)的Delta表版本提交的時間。
使用示例
Spark SQL示例
僅在EMR Spark2,Delta 0.6.1版本支持使用Spark SQL語法。
EMR Spark2上使用Spark SQL語法需要額外配置以下參數(shù),代碼如下所示。
spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes
SQL語法如下所示。
-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");
-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);
-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);
-- Update
UPDATE cdf_tbl set age = age + 1;
-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);
MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;
-- CDF Query
-- 查詢從版本0開始的所有的Change Data。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); --2023-02-03 15:33:34為commit0的提交時間戳。
-- 查詢版本4對應(yīng)的Change Data。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); --2023-02-03 15:34:06為commit4的提交時間戳。
兩次查詢返回信息如下所示。
DataFrame示例
// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
.option("delta.enableChangeDataFeed", "true") //首次寫入delta數(shù)據(jù)時開啟CDF,后續(xù)寫入無需設(shè)置。
.saveAsTable("cdf_table")
// CDF Query Using DataFrame
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) //endingVersion可選。
.table("cdf_table")
Spark Streaming示例
// Streaming CDF Query Using Dats
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) //endingVersion可選。
.table("cdf_table")