業(yè)務(wù)數(shù)據(jù)隨著時(shí)間在不斷變化,如果您要對(duì)數(shù)據(jù)進(jìn)行分析,則需要考慮如何存儲(chǔ)和管理數(shù)據(jù)。其中數(shù)據(jù)中隨著時(shí)間變化的維度被稱為Slowly Changing Dimension(SCD)。E-MapReduce根據(jù)實(shí)際的數(shù)倉場(chǎng)景定義了基于固定粒度的緩慢變化維(G-SCD)。本文為您介紹G-SCD的具體解決方案及如何通過G-SCD處理維度的數(shù)據(jù)。

背景信息

SCD簡(jiǎn)介

Slowly Changing Dimension(SCD)即緩慢變化維,是隨著時(shí)間變化的維度。在數(shù)據(jù)倉庫中存儲(chǔ)和管理當(dāng)前和歷史的數(shù)據(jù),就需要考慮如何處理緩慢變化維,因此SCD被認(rèn)為是跟蹤維度變化的關(guān)鍵ETL任務(wù)之一。

根據(jù)處理維度新值的方式,SCD被分為以下三種類型。
類型描述
直接覆蓋(Type 1)直接覆蓋原值,不保留歷史記錄。該方式無法分析歷史變化的信息。
添加維度行(Type 2)保留所有歷史值。當(dāng)屬性值有變化時(shí),都會(huì)新增一條記錄,并且需要標(biāo)記當(dāng)前記錄有效,同時(shí)修改前一個(gè)有效記錄的有效性字段。通常可以通過起始時(shí)間、截止時(shí)間標(biāo)識(shí)記錄的有效性。
添加屬性列(Type 3)通過額外的字段僅保留前一個(gè)版本的值。針對(duì)需要分析歷史信息的屬性添加一列,記錄該屬性變化前的值,而本屬性字段則記錄最新的值。

G-SCD概念和解決方案

SCD處理維度新值的三種方式不能覆蓋業(yè)務(wù)的實(shí)際場(chǎng)景,所以E-MapReduce根據(jù)業(yè)務(wù)實(shí)際數(shù)倉場(chǎng)景提出了G-SCD(Based-Granularity Slowly Changing Dimension),即基于固定粒度(或者業(yè)務(wù)快照)的緩慢變化維。G-SCD按照固定的時(shí)間粒度生成一份業(yè)務(wù)快照數(shù)據(jù),其中時(shí)間粒度可以是天、小時(shí)或者分鐘等,同時(shí)支持按照時(shí)間粒度查詢對(duì)應(yīng)時(shí)間段的數(shù)據(jù)。

在傳統(tǒng)的數(shù)倉體系下,基于Hive表的實(shí)現(xiàn)有以下兩個(gè)解決方案可以考慮,但各有弊端。
解決方案存在的問題
流式構(gòu)建T+1時(shí)刻的增量數(shù)據(jù)表,和離線表的T時(shí)刻分區(qū)數(shù)據(jù)做合并,生成離線表T+1分區(qū)。存儲(chǔ)資源浪費(fèi)。
保存離線的基礎(chǔ)表,每個(gè)業(yè)務(wù)時(shí)刻的增量數(shù)據(jù)獨(dú)立保存,在查詢數(shù)據(jù)時(shí)合并基礎(chǔ)表和增量表。查詢性能差。
其中按T保留全量數(shù)據(jù)的解決方案如下圖所示。按T保留全量數(shù)據(jù)
為了解決上述兩個(gè)解決方案存在的問題,阿里云E-MapReduce團(tuán)隊(duì)基于Delta Lake提供了G-SCD的解決方案,即G-SCD on Delta Lake。G-SCD on Delta Lake方案與SCD的Type 2方案類似,兩者之間的相同點(diǎn)和不同點(diǎn)如下表所示。
方案相同點(diǎn)不同點(diǎn)
SCD的Type 2方案保留歷史所有信息。每次屬性值有變化,都會(huì)新增一條記錄。
G-SCD on Delta Lake方案GSCD on Delta Lake在具體實(shí)現(xiàn)上不是通過新增記錄的形式保留信息,而是借助Delta Lake本身的Versioning特性,通過Time-Travel的能力追溯具體的快照數(shù)據(jù)。
G-SCD on Delta Lake方案如下圖所示。G-SCD
G-SCD解決方案的優(yōu)勢(shì)如下:
  • 流批一體:不需要增量表和基礎(chǔ)表兩張表。
  • 存儲(chǔ)資源節(jié)省:不需要按時(shí)間粒度保留歷史全量數(shù)據(jù)。
  • 查詢性能高:借助Delta Lake的Optimize、Zorder和DataSkipping的能力,提升查詢性能。
  • SQL使用兼容性高:保留原來實(shí)現(xiàn)的SQL語句,和利用分區(qū)實(shí)現(xiàn)快照的方式一樣,可以使用類似的分區(qū)字段查詢對(duì)應(yīng)時(shí)間粒度內(nèi)的快照數(shù)據(jù)。

前提條件

已創(chuàng)建集群,詳情請(qǐng)參見創(chuàng)建集群

使用限制

  • 需要保證Kafka內(nèi)同一個(gè)Partition內(nèi)的數(shù)據(jù)嚴(yán)格有序。
  • 數(shù)據(jù)按Key分區(qū),保證同一Key必須落到同一個(gè)Kafka的Partition。

操作流程

  1. 步驟一:創(chuàng)建G-SCD表
    創(chuàng)建G-SCD表,按照要求配置需要的參數(shù)。
  2. 步驟二:處理數(shù)據(jù)
    您可以根據(jù)業(yè)務(wù)數(shù)據(jù)的情況,選擇使用流式寫入或者批量寫入的方式進(jìn)行數(shù)據(jù)的處理。示例中通過兩次批量寫入代替流式寫入的方式模擬G-SCD on Delta Lake的數(shù)據(jù)處理。
  3. 步驟三:驗(yàn)證數(shù)據(jù)寫入結(jié)果
    通過查詢語句,驗(yàn)證數(shù)據(jù)是否寫入成功。

步驟一:創(chuàng)建G-SCD表

創(chuàng)建G-SCD表的示例如下,該表會(huì)在步驟二:處理數(shù)據(jù)使用。
CREATE TABLE target (id Int, body String, dt string)
USING delta
TBLPROPERTIES (
  "delta.gscdTypeTable" = "true", 
  "delta.gscdGranularity" = "1 day",
  "delta.gscdColumnFormat" = "yyyy-MM-dd",
  "delta.gscdColumn" = "dt"
);
參數(shù)說明如下表所示。
參數(shù)說明
delta.gscdTypeTable定義當(dāng)前表是否為G-SCD Delta Lake表,本文示例需要設(shè)置為true。當(dāng)該值設(shè)置為false時(shí)則表示該表為普通表,無法使用G-SCD的相關(guān)功能。
delta.gscdGranularity業(yè)務(wù)快照粒度,例如:1 day、1 hour、30 minutes等。
delta.gscdColumnFormat業(yè)務(wù)快照粒度的格式,支持格式如下:
  • yyyyMM
  • yyyyMMdd
  • yyyyMMddHH
  • yyyyMMddHHmm
  • yyyy-MM
  • yyyy-MM-dd
  • yyyy-MM-dd HH
  • yyyy-MM-dd HH:mm
delta.gscdColumn定義查詢時(shí),表示業(yè)務(wù)快照版本的字段。當(dāng)前字段也需要在Schema內(nèi)定義,并且必須為String類型。

步驟二:處理數(shù)據(jù)

您可以根據(jù)業(yè)務(wù)數(shù)據(jù)的情況,選擇使用流式寫入或者批量寫入的方式進(jìn)行數(shù)據(jù)的處理。

  • 流式寫入
    CREATE TABLE IF NOT EXISTS gscd_kafka_table
    USING kafka
    OPTIONS(
      kafka.bootstrap.servers = 'localhost:9092',
      subscribe = 'xxxxxx'
    );
    
    CREATE SCAN gscd_stream ON gscd_kafka_table USING STREAM
    OPTIONS (
      `watermark.time` = 'floor(ts/1000)'  --- 定義源頭watermark時(shí)間表達(dá)式,單位為秒。
    );
    
    CREATE STREAM delta_job
    OPTIONS (
      triggerType = 'ProcessingTime',
      checkpointLocation = '/path/to/checkpoint'
    )
    MERGE INTO gscd_target_table AS target
    USING (
      SELECT *, from_unixtime(ts/1000, 'yyyy-MM-dd') AS dt FROM gscd_stream
    ) AS source
    ON source.id = target.id AND target.dt = source.dt
    WHEN MATCHED THEN update set *
    WHEN NOT MATCHED THEN insert *;
    重要 在上述SQL語句中,watermark的使用原理及注意事項(xiàng)如下:
    • 為了在流作業(yè)中自動(dòng)觸發(fā)Savepoint,需要在CREATE SCAN語句中指定watermark時(shí)間表達(dá)式。
    • watermark表示流作業(yè)源頭的時(shí)間值,單位為秒。
    • watermark時(shí)間會(huì)生成作為delta.gscdColumn字段的值,當(dāng)watermark時(shí)間達(dá)到delta.gscdGranularity邊界時(shí)(示例中定義的為1 day),會(huì)自動(dòng)觸發(fā)Savepoint。
    • watermark時(shí)間要求在同一個(gè)Partition內(nèi)遞增有序。
  • 批量寫入

    一般場(chǎng)景下,通過流式寫入已經(jīng)可以滿足。但當(dāng)數(shù)據(jù)異常時(shí),G-SCD on Delta Lake的方案同時(shí)提供了回滾Rollback的能力,并可以使用批量離線寫入修復(fù)數(shù)據(jù)。修復(fù)完成后,執(zhí)行Savepoint,永久保留當(dāng)前Version。

    MERGE INTO GSCD("2021-01-01") gscd_target_table
    USING gscd_source_table
    ON source.id = target.id  
    WHEN MATCHED THEN UPDATE SET body = source.body
    WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);

    在上述SQL語句中,GSCD ("2021-01-01")的語法表示要寫入的數(shù)據(jù)所屬的業(yè)務(wù)粒度值。

    重要 批量寫入不支持同一個(gè)作業(yè)寫入多個(gè)業(yè)務(wù)粒度數(shù)據(jù)。如果存在這種情況,需要提前進(jìn)行拆分。

為了幫助您快速使用G-SCD處理維度數(shù)據(jù),本文給出詳細(xì)的示例,具體操作步驟如下。

  1. 模擬源數(shù)據(jù)。
    CREATE TABLE s1 (id Int, body String) USING delta;
    CREATE TABLE s2 (id Int, body String) USING delta;
    INSERT INTO s1 VALUES (1, "addr_1_v1"), (2, "addr_2_v1"), (3, "addr_3_v1");
    INSERT INTO s2 VALUES (2, "addr_2_v2"), (4, "addr_1_v1");
  2. 通過使用兩次批量寫入,代替流式寫入的方式模擬G-SCD on Delta Lake的數(shù)據(jù)處理。
    1. 第一次批量寫入,然后創(chuàng)建對(duì)應(yīng)時(shí)間粒度的Savepoint。
      -- 第一次批量寫入。
      MERGE INTO GSCD ("2021-01-01") target as target
      USING s1 as source
      ON source.id = target.id  
      WHEN MATCHED THEN UPDATE SET body = source.body
      WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);
      
      -- 創(chuàng)建2021-01-01的Savepoint。
      CREATE SAVEPOINT target GSCD('2021-01-01');
    2. 第二次批量寫入,然后創(chuàng)建對(duì)應(yīng)時(shí)間粒度的Savepoint。
      -- 第二次批量寫入。
      MERGE INTO GSCD ("2021-01-02") target as target
      USING s2 as source
      ON source.id = target.id  
      WHEN MATCHED THEN UPDATE SET body = source.body
      WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);
      
      -- 創(chuàng)建2021-01-02的Savepoint。
      CREATE SAVEPOINT target GSCD('2021-01-02');

步驟三:驗(yàn)證數(shù)據(jù)寫入結(jié)果

通過查詢語句,驗(yàn)證數(shù)據(jù)是否寫入成功。查詢?cè)?a title="" class="xref" href="#section-2ps-5pl-nqw">步驟二:處理數(shù)據(jù)示例中兩次批量寫入的數(shù)據(jù),具體操作如下。

  1. 執(zhí)行以下命令,查詢第一次批量寫入的數(shù)據(jù)。
    select id, body from target where dt = '2021-01-01';
    重要
    • 查詢數(shù)據(jù)時(shí),可以使用正常的SQL語法。
    • 查詢數(shù)據(jù)時(shí),必須指定gscdColumn字段作為查詢條件,并且必須為=表達(dá)式,例如dt = '2021-01-01'
  2. 執(zhí)行以下命令,查詢第二次批量寫入的數(shù)據(jù)。
    select id, body from target where dt = '2021-01-02';
    如果能夠查詢到寫入的數(shù)據(jù),則表明數(shù)據(jù)寫入成功。執(zhí)行上述查詢命令后,返回結(jié)果如下圖所示。返回結(jié)果