業(yè)務(wù)數(shù)據(jù)隨著時(shí)間在不斷變化,如果您要對(duì)數(shù)據(jù)進(jìn)行分析,則需要考慮如何存儲(chǔ)和管理數(shù)據(jù)。其中數(shù)據(jù)中隨著時(shí)間變化的維度被稱為Slowly Changing Dimension(SCD)。E-MapReduce根據(jù)實(shí)際的數(shù)倉場(chǎng)景定義了基于固定粒度的緩慢變化維(G-SCD)。本文為您介紹G-SCD的具體解決方案及如何通過G-SCD處理維度的數(shù)據(jù)。
背景信息
SCD簡(jiǎn)介
Slowly Changing Dimension(SCD)即緩慢變化維,是隨著時(shí)間變化的維度。在數(shù)據(jù)倉庫中存儲(chǔ)和管理當(dāng)前和歷史的數(shù)據(jù),就需要考慮如何處理緩慢變化維,因此SCD被認(rèn)為是跟蹤維度變化的關(guān)鍵ETL任務(wù)之一。
類型 | 描述 |
---|---|
直接覆蓋(Type 1) | 直接覆蓋原值,不保留歷史記錄。該方式無法分析歷史變化的信息。 |
添加維度行(Type 2) | 保留所有歷史值。當(dāng)屬性值有變化時(shí),都會(huì)新增一條記錄,并且需要標(biāo)記當(dāng)前記錄有效,同時(shí)修改前一個(gè)有效記錄的有效性字段。通常可以通過起始時(shí)間、截止時(shí)間標(biāo)識(shí)記錄的有效性。 |
添加屬性列(Type 3) | 通過額外的字段僅保留前一個(gè)版本的值。針對(duì)需要分析歷史信息的屬性添加一列,記錄該屬性變化前的值,而本屬性字段則記錄最新的值。 |
G-SCD概念和解決方案
SCD處理維度新值的三種方式不能覆蓋業(yè)務(wù)的實(shí)際場(chǎng)景,所以E-MapReduce根據(jù)業(yè)務(wù)實(shí)際數(shù)倉場(chǎng)景提出了G-SCD(Based-Granularity Slowly Changing Dimension),即基于固定粒度(或者業(yè)務(wù)快照)的緩慢變化維。G-SCD按照固定的時(shí)間粒度生成一份業(yè)務(wù)快照數(shù)據(jù),其中時(shí)間粒度可以是天、小時(shí)或者分鐘等,同時(shí)支持按照時(shí)間粒度查詢對(duì)應(yīng)時(shí)間段的數(shù)據(jù)。
解決方案 | 存在的問題 |
---|---|
流式構(gòu)建T+1時(shí)刻的增量數(shù)據(jù)表,和離線表的T時(shí)刻分區(qū)數(shù)據(jù)做合并,生成離線表T+1分區(qū)。 | 存儲(chǔ)資源浪費(fèi)。 |
保存離線的基礎(chǔ)表,每個(gè)業(yè)務(wù)時(shí)刻的增量數(shù)據(jù)獨(dú)立保存,在查詢數(shù)據(jù)時(shí)合并基礎(chǔ)表和增量表。 | 查詢性能差。 |
方案 | 相同點(diǎn) | 不同點(diǎn) |
---|---|---|
SCD的Type 2方案 | 保留歷史所有信息。 | 每次屬性值有變化,都會(huì)新增一條記錄。 |
G-SCD on Delta Lake方案 | GSCD on Delta Lake在具體實(shí)現(xiàn)上不是通過新增記錄的形式保留信息,而是借助Delta Lake本身的Versioning特性,通過Time-Travel的能力追溯具體的快照數(shù)據(jù)。 |
- 流批一體:不需要增量表和基礎(chǔ)表兩張表。
- 存儲(chǔ)資源節(jié)省:不需要按時(shí)間粒度保留歷史全量數(shù)據(jù)。
- 查詢性能高:借助Delta Lake的Optimize、Zorder和DataSkipping的能力,提升查詢性能。
- SQL使用兼容性高:保留原來實(shí)現(xiàn)的SQL語句,和利用分區(qū)實(shí)現(xiàn)快照的方式一樣,可以使用類似的分區(qū)字段查詢對(duì)應(yīng)時(shí)間粒度內(nèi)的快照數(shù)據(jù)。
前提條件
已創(chuàng)建集群,詳情請(qǐng)參見創(chuàng)建集群。
使用限制
- 需要保證Kafka內(nèi)同一個(gè)Partition內(nèi)的數(shù)據(jù)嚴(yán)格有序。
- 數(shù)據(jù)按Key分區(qū),保證同一Key必須落到同一個(gè)Kafka的Partition。
操作流程
- 步驟一:創(chuàng)建G-SCD表創(chuàng)建G-SCD表,按照要求配置需要的參數(shù)。
- 步驟二:處理數(shù)據(jù)您可以根據(jù)業(yè)務(wù)數(shù)據(jù)的情況,選擇使用流式寫入或者批量寫入的方式進(jìn)行數(shù)據(jù)的處理。示例中通過兩次批量寫入代替流式寫入的方式模擬G-SCD on Delta Lake的數(shù)據(jù)處理。
- 步驟三:驗(yàn)證數(shù)據(jù)寫入結(jié)果通過查詢語句,驗(yàn)證數(shù)據(jù)是否寫入成功。
步驟一:創(chuàng)建G-SCD表
CREATE TABLE target (id Int, body String, dt string)
USING delta
TBLPROPERTIES (
"delta.gscdTypeTable" = "true",
"delta.gscdGranularity" = "1 day",
"delta.gscdColumnFormat" = "yyyy-MM-dd",
"delta.gscdColumn" = "dt"
);
參數(shù) | 說明 |
---|---|
delta.gscdTypeTable | 定義當(dāng)前表是否為G-SCD Delta Lake表,本文示例需要設(shè)置為true。當(dāng)該值設(shè)置為false時(shí)則表示該表為普通表,無法使用G-SCD的相關(guān)功能。 |
delta.gscdGranularity | 業(yè)務(wù)快照粒度,例如:1 day、1 hour、30 minutes等。 |
delta.gscdColumnFormat | 業(yè)務(wù)快照粒度的格式,支持格式如下:
|
delta.gscdColumn | 定義查詢時(shí),表示業(yè)務(wù)快照版本的字段。當(dāng)前字段也需要在Schema內(nèi)定義,并且必須為String類型。 |
步驟二:處理數(shù)據(jù)
您可以根據(jù)業(yè)務(wù)數(shù)據(jù)的情況,選擇使用流式寫入或者批量寫入的方式進(jìn)行數(shù)據(jù)的處理。
- 流式寫入
CREATE TABLE IF NOT EXISTS gscd_kafka_table USING kafka OPTIONS( kafka.bootstrap.servers = 'localhost:9092', subscribe = 'xxxxxx' ); CREATE SCAN gscd_stream ON gscd_kafka_table USING STREAM OPTIONS ( `watermark.time` = 'floor(ts/1000)' --- 定義源頭watermark時(shí)間表達(dá)式,單位為秒。 ); CREATE STREAM delta_job OPTIONS ( triggerType = 'ProcessingTime', checkpointLocation = '/path/to/checkpoint' ) MERGE INTO gscd_target_table AS target USING ( SELECT *, from_unixtime(ts/1000, 'yyyy-MM-dd') AS dt FROM gscd_stream ) AS source ON source.id = target.id AND target.dt = source.dt WHEN MATCHED THEN update set * WHEN NOT MATCHED THEN insert *;
重要 在上述SQL語句中,watermark的使用原理及注意事項(xiàng)如下:- 為了在流作業(yè)中自動(dòng)觸發(fā)Savepoint,需要在
CREATE SCAN
語句中指定watermark時(shí)間表達(dá)式。 - watermark表示流作業(yè)源頭的時(shí)間值,單位為秒。
- watermark時(shí)間會(huì)生成作為delta.gscdColumn字段的值,當(dāng)watermark時(shí)間達(dá)到delta.gscdGranularity邊界時(shí)(示例中定義的為1 day),會(huì)自動(dòng)觸發(fā)Savepoint。
- watermark時(shí)間要求在同一個(gè)Partition內(nèi)遞增有序。
- 為了在流作業(yè)中自動(dòng)觸發(fā)Savepoint,需要在
- 批量寫入
一般場(chǎng)景下,通過流式寫入已經(jīng)可以滿足。但當(dāng)數(shù)據(jù)異常時(shí),G-SCD on Delta Lake的方案同時(shí)提供了回滾Rollback的能力,并可以使用批量離線寫入修復(fù)數(shù)據(jù)。修復(fù)完成后,執(zhí)行Savepoint,永久保留當(dāng)前Version。
MERGE INTO GSCD("2021-01-01") gscd_target_table USING gscd_source_table ON source.id = target.id WHEN MATCHED THEN UPDATE SET body = source.body WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);
在上述SQL語句中,
GSCD ("2021-01-01")
的語法表示要寫入的數(shù)據(jù)所屬的業(yè)務(wù)粒度值。重要 批量寫入不支持同一個(gè)作業(yè)寫入多個(gè)業(yè)務(wù)粒度數(shù)據(jù)。如果存在這種情況,需要提前進(jìn)行拆分。
為了幫助您快速使用G-SCD處理維度數(shù)據(jù),本文給出詳細(xì)的示例,具體操作步驟如下。
- 模擬源數(shù)據(jù)。
CREATE TABLE s1 (id Int, body String) USING delta; CREATE TABLE s2 (id Int, body String) USING delta; INSERT INTO s1 VALUES (1, "addr_1_v1"), (2, "addr_2_v1"), (3, "addr_3_v1"); INSERT INTO s2 VALUES (2, "addr_2_v2"), (4, "addr_1_v1");
- 通過使用兩次批量寫入,代替流式寫入的方式模擬G-SCD on Delta Lake的數(shù)據(jù)處理。
步驟三:驗(yàn)證數(shù)據(jù)寫入結(jié)果
通過查詢語句,驗(yàn)證數(shù)據(jù)是否寫入成功。查詢?cè)?a title="" class="xref" href="#section-2ps-5pl-nqw">步驟二:處理數(shù)據(jù)示例中兩次批量寫入的數(shù)據(jù),具體操作如下。
- 執(zhí)行以下命令,查詢第一次批量寫入的數(shù)據(jù)。
select id, body from target where dt = '2021-01-01';
重要- 查詢數(shù)據(jù)時(shí),可以使用正常的SQL語法。
- 查詢數(shù)據(jù)時(shí),必須指定gscdColumn字段作為查詢條件,并且必須為
=
表達(dá)式,例如dt = '2021-01-01'
。
- 執(zhí)行以下命令,查詢第二次批量寫入的數(shù)據(jù)。
select id, body from target where dt = '2021-01-02';
如果能夠查詢到寫入的數(shù)據(jù),則表明數(shù)據(jù)寫入成功。執(zhí)行上述查詢命令后,返回結(jié)果如下圖所示。