CDC
CDC(Change Data Capture)定義了識(shí)別并捕獲數(shù)據(jù)庫表中數(shù)據(jù)的變更場景,用于記錄Delta Table增量表行級(jí)別的插入、更新和刪除等操作,從而可以有效捕捉該表的數(shù)據(jù)變化事件流,后續(xù)可以通過事件驅(qū)動(dòng)輔助增量計(jì)算、數(shù)據(jù)同步、數(shù)倉分層等業(yè)務(wù)需求。
功能介紹
增量計(jì)算:可增量讀取CDC變更記錄進(jìn)行增量計(jì)算,如增量物化視圖更新等。
流式計(jì)算:可流式讀取CDC變更記錄進(jìn)行流式計(jì)算,如Flink計(jì)算任務(wù)消費(fèi)。
多引擎間的數(shù)據(jù)同步:不同引擎間的增量數(shù)據(jù)同步和計(jì)算。
日志審計(jì): 詳細(xì)記錄所有的數(shù)據(jù)操作記錄,用于操作日志審計(jì)。
基本操作
建表DDL
建表DDL分為兩種:同步CDC和異步CDC。
同步CDC:只支持SQL DML操作生成增量表,不支持實(shí)時(shí)數(shù)據(jù)寫入。SQL操作完成,CDC數(shù)據(jù)即生成。
異步CDC:支持SQL DML操作生成增量表,支持實(shí)時(shí)數(shù)據(jù)寫入。但CDC數(shù)據(jù)生成屬于異步行為,非立即生效。
同步CDC
創(chuàng)建Delta Table時(shí)添加"acid.cdc.mode.enable"="true"
屬性, 可以選擇性添加"cdc.insert.into.passthrough.enable"="true"
和"cdc.data.retain.hours"="24"
屬性。
SQL示例如下:
CREATE TABLE acid_with_cdc_tbl (pk bigint NOT NULL PRIMARY KEY, val bigint)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");
acid.cdc.mode.enable
:開啟Delta Table CDC功能,默認(rèn)為同步方式,支持在SQL DML時(shí)同步生成CDC數(shù)據(jù),不支持Tunnel實(shí)時(shí)寫入。cdc.insert.into.passthrough.enable
:對(duì)于開啟CDC功能的表默認(rèn)不支持INSERT INTO,當(dāng)添加這個(gè)屬性,可支持執(zhí)行INSERT INTO語句,寫入的數(shù)據(jù)在CDC中僅代表INSERT類型,如果存在相同PK的數(shù)據(jù)行,后面查詢會(huì)因?yàn)镻K沖突導(dǎo)致查詢失敗,需要用戶數(shù)據(jù)保障PK數(shù)據(jù)唯一。cdc.data.retain.hours
:CDC數(shù)據(jù)的保留時(shí)間(1-168小時(shí)),默認(rèn)24小時(shí)。
異步CDC
創(chuàng)建Delta Table時(shí)添加"acid.cdc.mode.enable"="true"
、"acid.cdc.build.async"="true"
、"acid.cdc.build.interval"="300"
屬性。
SQL示例如下:
CREATE TABLE acid_with_cdc_build_tbl (pk bigint NOT NULL PRIMARY KEY, val bigint)
tblproperties ("transactional" = "true",
"acid.cdc.mode.enable"="true",
"acid.cdc.build.async"="true",
"acid.cdc.build.interval"="300");
acid.cdc.mode.enable
:開啟Delta Table CDC功能,默認(rèn)為同步方式,支持在SQL DML時(shí)同步生成CDC數(shù)據(jù),不支持Tunnel 實(shí)時(shí)寫入。
acid.cdc.build.async
:開啟異步構(gòu)建CDC的功能,支持Tunnel實(shí)時(shí)寫入該表,同時(shí)對(duì)于SQL的DML也會(huì)異步生成。acid.cdc.build.interval
:異步構(gòu)建的周期配置[60-3540],單位秒,結(jié)合業(yè)務(wù)或增量場景配置該參數(shù)。其他可選參數(shù)(Project級(jí) / Session級(jí)):
"odps.storage.orc.enable.memcmp.sort.key="true"
,建議Project級(jí)別開啟,對(duì)于CDC的異步構(gòu)建以及查詢都會(huì)有性能幫助。
CDC查詢
CDC查詢分兩種:table_changes和Stream。
table_changes
語法:
select * from table_changes('table_name', version_start, [version_end]);
參數(shù)說明:
table_name
:必填,指定查詢的Delta Table。version_start
:必填,是對(duì)該表CDC數(shù)據(jù)查詢的起始version,table version可以通過show history for table table_name
查詢,詳見SHOW。version_end
:選填,指定對(duì)該表CDC數(shù)據(jù)查詢的截止version,不填的話,會(huì)查詢到最新version。結(jié)果說明:
除數(shù)據(jù)列外,會(huì)額外輸出三個(gè)系統(tǒng)列:
__meta_timestamp
:代表數(shù)據(jù)寫入系統(tǒng)時(shí)間。__meta_op_type
:包含INSERT和DELETE。__meta_is_update
:包含true和false。__meta_op_type
和__meta_is_update
可組合成如下四種情況:__meta_op_type
__meta_is_update
說明
INSERT
FALSE
代表Insert的新記錄。
INSERT
TRUE
代表Update后的值。
DELETE
TRUE
代表Update前的值。
DELETE
FALSE
代表刪除。
示例:
select * from table_changes('acid_with_cdc_build_tbl', 1);
// NOTE:__operation(storage operation type) vs table_changes
// +--------------------+--------------+
// |__meta_op_type|__meta_is_update |
// +--------------------+--------------+
// | INSERT(1) | FALSE(0) |
// | DELETE(0) | FALSE(0) |
// | DELETE(0) | TRUE(1) |
// | INSERT(1) | TRUE(1) |
// +--------------------+--------------+
--如查詢結(jié)果
+------------+------------+------------+------------+------------------+----------------+------------------+
| id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update |
+------------+------------+------------+------------+------------------+----------------+------------------+
| 1 | 1006 | 1006 | 1006 | 2024-09-07 16:48:17 | 1 | 0 |
| 1 | 1008 | 1008 | 1008 | 2024-09-07 16:48:17 | 1 | 0 |
| 1 | 1032 | 1032 | 1032 | 2024-09-07 16:48:17 | 1 | 0 |
...
Stream
指定對(duì)Delta Table CDC數(shù)據(jù)配合Stream使用,詳見流對(duì)象(Stream),這里只給出基本使用方式和示例。
create stream stream_cdc_tbl on table acid_with_cdc_tbl stmproperties ("read_mode"="cdc");
"read_mode"
指定對(duì)Delta Table的消費(fèi)模式,"cdc"
該Stream會(huì)根據(jù)查詢的范圍查詢CDC數(shù)據(jù)。Offset Version(
DESC STREAM
可查)查詢時(shí)采用左開右閉區(qū)間。返回結(jié)果同table_changes。