日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

流對象(Stream)

Stream是MaxCompute自動管理Delta Table增量查詢數據版本的流對象,記錄對增量表所做的數據操作語言(DML)更改,包括插入、更新和刪除,以及有關每次更改的元數據,以便您可以使用更改的數據采取操作。本文為您詳細介紹Stream操作相關命令。

創建Stream

語法

CREATE STREAM [IF NOT EXISTS] <stream_name> 
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc") 
[comment <stream_comment>];

名稱

功能說明

IF NOT EXISTS

可選。如果不指定IF NOT EXISTS選項而存在同名Stream,會報錯。如果指定IF NOT EXISTS,只要存在同名Stream,即使原Stream結構與要創建的目標Stream結構不一致,均返回成功。已存在的同名Stream的元數據信息不會被改動。

stream_name

必填。待創建的Stream名。

ON TABLE <delta_table_name>

代表Stream對象關聯的Delta Table源表,一個Stream對象只能關聯一張源表。

timestamp as of t

代表Stream對象創建時VersionOffset初始化數據時間戳為t ,查詢范圍為(t, 最新增量數據時間戳]

version as of v

代表Stream對象創建時VersionOffset初始化數據版本為v,查詢范圍為(v, 最新增量數據版本]

strmproperties

同表屬性一樣以key/value字符類型值對的形式出現,目前stream只支持了一個屬性read_mode,可選值有兩個:append模式用來消費Delta Table的表數據,CDC模式用來消費Delta Table的cdc數據。

stream_comment

可選。Stream注釋內容且為長度不超過1024字節的有效字符串,否則報錯。

系統列

對于"read_mode" = "cdc",會額外輸出三個系統列: __meta_timestamp代表數據寫入時間,__meta_op_type (包含INSERT | DELETE)和__meta_is_update (包含TRUE | FALSE),可組合成四種情況: INSERT + FALSE代表新記錄,INSERT + TRUE代表Update后的值,DELETE + TRUE代表Update前的值,DELETE+FALSE代表刪除。但當前輸出的系統列__meta_op_type__meta_is_update的值為tinyint類型,下面是對應的組合值和對應的含義:

+--------------------+--------------+-----------------+
|     表示的操作       |__meta_op_type|__meta_is_update |
+--------------------+--------------+-----------------+
|       INSERT       |    INSERT(1) |      FALSE(0)   |
|       DELETE       |    DELETE(0) |      FALSE(0)   |
|   UDPATE_BEFORE    |    DELETE(0) |      TRUE(1)    |
|   UPDATE_AFTER     |    INSERT(1) |      TRUE(1)    |
+--------------------+--------------+-----------------+

示例說明

創建Delta Table源表,再創建一個Stream關聯Delta Table表。

CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY, 
  val bigint
) tblproperties ("transactional"="true");

CREATE STREAM delta_table_stream 
ON TABLE delta_table_src version as of 1 
strmproperties('read_mode'='append') 
comment 'stream demo';

查看Stream信息

語法

DESC STREAM <stream_name>;

示例

創建Delta Table源表,再創建一個Stream關聯Delta Table表,查詢Stream對象delta_table_stream

CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key, 
val BIGINT) TBLPROPERTIES ("transactional"="true");

CREATE STREAM delta_table_stream ON TABLE delta_table_src 
version AS OF 1 strmproperties('read_mode'='append') 
comment 'stream demo';

DESC STREAM delta_table_stream;

輸出結果

Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-06 17:03:32                     Last Modified Time                      2024-09-06 17:03:32                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

名稱

說明

Name

當前Stream的名稱。

Project

當前Stream所在的項目名稱。

Create Time

當前Stream的創建時間。

Offset Version

當前Stream的初始化數據版本。

Reference Table Project

關聯的源表的項目名稱。

Reference Table Name

關聯的源表名稱。

Reference Table Id

關聯的源表的唯一標識ID。

Reference Table Version

關聯的源表的數據版本。

Parameters

當前Stream對象的屬性信息。

說明

特別要注意Offset VersionReference Table Version的信息,Offset Version的值為1表示當前Stream已經消費的關聯Delta Table數據的版本,Reference Table Version的值為1表示當前關聯的Delta Table的最新的數據版本。由于關聯的Delta Table是空表,所以兩者的值都是1。創建Stream對象后如果其關聯的Delta Table執行了DML操作,Reference Table Version的值會隨之更新改變。讀取Stream時會轉換成對關聯表的增量查詢,讀取的數據版本范圍區間為左開右閉區間,為(Offset Version, Reference Table Version],從而確保Offset Version和Reference Table Version之間的增量數據被讀到。如果(Offset Version, Reference Table Version]版本的增量數據被DML操作消費,消費后Offset Version會等于 Reference Table Version,即都為關聯Delta Table的最新數據版本,表示沒有新的增量數據。

修改Stream

修改Stream屬性

ALTER STREAM <stream_name> SET stmproperties ("key"="value");
說明
  • tream_name:必填。待描述的Stream名。

  • strmproperties:Stream的屬性,同表屬性一樣以key/value字符類型值對的形式出現,目前Stream只支持屬性read_mode,并且當前不支持修改。

修改Stream的初始化數據版本

ALTER STREAM <stream_name> ON TABLE <delta_table_name> 
<timestamp as of t  |  version as of v > ;
說明
  • stream_name:必填。待修改的Stream名。

  • ON TABLE <delta_table_name>: 代表Stream對象關聯的Delta Table源表,源表為修改前的源表,目前還不支持修改源表。

  • timestamp as of t: 代表修改Stream對象VersionOffset初始化數據版本為t,查詢范圍(t, 最新增量數據版本]

  • version as of v: 代表修改Stream對象VersionOffset初始化數據版本為v,查詢范圍(v, 最新增量數據版本]

示例

-- 1. 創建Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key, 
val bigint) tblproperties ("transactional"="true");

-- 2. 創建一個Stream關聯Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src 
version as of 1 strmproperties('read_mode'='append') 
comment 'stream demo';

-- 3. 查看新建的stream信息,當前Offset Version和Reference Table Version都為1。
DESC STREAM delta_table_stream;
-- 輸出結果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:26:56                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

-- 4. stream關聯的Delet Table插入一條數據,目的是為了使得Delta Table的lsn遞增,
-- 之后我們將引用Delta Table的version修改為遞增后的version.
INSERT INTO delta_table_src VALUES ('1', '1');

-- 5. 查看當前Delta Table的數據版本信息
SHOW history FOR TABLE delta_table_src;

ObjectType	ObjectId                        	ObjectName          	VERSION(LSN)    	Time               	Operation       
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000001	2024-09-07 10:25:59	CREATE          
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000002	2024-09-07 10:28:19	APPEND      

-- 6. 修改stream關聯的Delta Table的version為2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;

-- 7. 查看修改后的stream信息,stream以及關聯的Delta Table的version都變成了2。
DESC STREAM delta_table_stream;

-- 輸出結果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:29:12                     
Offset Version                          2                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 2                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

列出項目下的所有Stream

語法

SHOW STREAMS;

示例

-- 列出當前項目下的所有stream對象
SHOW STREAMS;
-- 輸出結果。
delta_table_stream

刪除Stream

語法

DROP STREAM [IF EXISTS] <stream_name>;

示例

-- 1. 查看當前項目中存在的所有stream對象
SHOW STREAMS;
-- 輸出顯示。
delta_table_stream

-- 2. 刪除delta_table_stream stream對象
DROP STREAM IF EXISTS delta_table_stream;

-- 3. 再次查看當前項目中存在的所有stream對象;結果為空
SHOW STREAMS;

查詢Stream

語法

SELECT * FROM <stream_name>;
說明
  • 查詢Stream時,單純執行DQL,并不會改變Stream的狀態,即Stream的增量起始的Offset Version不會改變,但其關聯的Delta Table的Reference Table Version會隨著Delta Table狀態的改變而改變,保持為關聯Delta Table最新的Version數據版本。單純執行DQL,表示這份增量數據沒有被真正消費,只是進行了數據的探查。

  • 查詢Stream并且執行了DML操作,表示真正消費了Stream所表示的增量數據,會修改Stream的狀態,將關聯的數據版本遷移到這次DML操作查詢的最新增量數據版本,即Stream的Offset Version會等于關聯Delta Table的Reference Table Version,此時表示沒有新的增量數據,在當前狀態下如果Stream再被讀取,讀取的數據為空。

CDC模式查詢輸出示例

Delta Table CDC模式的使用,詳情請參見CDC

  1. 創建Delta Table源表。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties (
      "transactional"="true", 
      'acid.cdc.mode.enable'='true', 
      'cdc.insert.into.passthrough.enable'='true'
    );
  2. 創建目標表。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. 創建CDC模式的Stream。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties('read_mode'='cdc') 
    comment 'stream cdc mode';
  4. 插入兩條數據至Delta Table源表。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. 查詢delta_table_stream輸出CDC格式數據。但執行單純的DQL,并不會改變delta_table_stream狀態,如下語句執行多次,返回結果一樣。

    SELECT * FROM  delta_table_stream;
    
    -- 輸出結果
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 2          | 2          | 2024-09-07 11:03:53 | 1             | 0                |
    | 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
    +------------+------------+------------------+----------------+------------------+
  6. 讀取delta_table_stream表的增量數據,并插入到目標表delta_table_dest。同時將delta_table_stream的Offset Version修改為關聯表delta_table_src最新的數據版本。此操作真正消費了delta_table_stream表的增量數據。

    INSERT INTO  delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 查詢目標表。表中保存的是步驟6操作消費的Stream表的增量數據。

    SELECT * FROM delta_table_dest; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. 再次查詢delta_table_stream表,輸出為空。由于delta_table_stream表示的增量數據已經被消費,沒有新的增量數據。

    SELECT * FROM delta_table_stream;
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. 執行Update操作將源表pk為1記錄的val設為10。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. 由于源表由Update操作產生的新的增量數據,查詢delta_table_stream,可輸出Update操作的CDC數據。

    SELECT * FROM delta_table_stream;  
    
    --  輸出結果
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
    | 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
    +------------+------------+------------------+----------------+------------------+

由上述示例可見,CDC輸出模式會跟蹤Delta Table源表的記錄變化,輸出所有變化狀態的記錄,可有效用于增量計算的邏輯。

Append模式查詢輸出示例

  1. 創建Delta Table源表。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  2. 創建目標表。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. 創建Append模式的Stream,并關聯Delta Table。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties ('read_mode'='append') 
    comment 'stream append mode';
  4. 插入兩條數據到Delta Table源表。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. 查詢delta_table_stream

    SELECT * FROM delta_table_stream; 
    
    -- 輸出結果,不包含系統字段。
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  6. 讀取delta_table_stream表的增量數據,并插入到目標表,同時將delta_table_stream的Offset Version修改為關聯表delta_table_src最新的數據版本,此操作真正消費delta_table_stream表的增量數據。

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 查詢目標表。表中保存的是步驟6 操作消費的delta_table_stream表的增量數據。

    SELECT * FROM delta_table_dest; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. 再次查詢delta_table_stream表,輸出為空。由于delta_table_stream表示的增量數據已經被消費,沒有新的增量數據。

    SELECT * FROM delta_table_stream; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. 執行Update操作將源表pk為1的記錄val設為10。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. 執行刪除操作,將源表中pk為2的記錄刪除。

    DELETE FROM delta_table_src WHERE pk = 2;
  11. 查詢delta_table_stream,只輸出了Update操作的結果記錄 (1, 10),Delete操作的記錄不可見,也不會輸出。

    SELECT * FROM delta_table_stream; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 10         |
    +------------+------------+

由上述示例可見,Append輸出模式并不會顯示數據的操作狀態,只會輸出一條記錄的最終狀態,Delete記錄也不會輸出。因此使用的場景有限,通常可用于一些典型的ETL場景,不斷對增量插入的數據進行清洗。