Stream是MaxCompute自動管理Delta Table增量查詢數據版本的流對象,記錄對增量表所做的數據操作語言(DML)更改,包括插入、更新和刪除,以及有關每次更改的元數據,以便您可以使用更改的數據采取操作。本文為您詳細介紹Stream操作相關命令。
創建Stream
語法
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];
名稱 | 功能說明 |
IF NOT EXISTS | 可選。如果不指定 |
stream_name | 必填。待創建的Stream名。 |
ON TABLE <delta_table_name> | 代表Stream對象關聯的Delta Table源表,一個Stream對象只能關聯一張源表。 |
timestamp as of t | 代表Stream對象創建時VersionOffset初始化數據時間戳為t ,查詢范圍為 |
version as of v | 代表Stream對象創建時VersionOffset初始化數據版本為v,查詢范圍為 |
strmproperties | 同表屬性一樣以key/value字符類型值對的形式出現,目前stream只支持了一個屬性read_mode,可選值有兩個: |
stream_comment | 可選。Stream注釋內容且為長度不超過1024字節的有效字符串,否則報錯。 |
系統列 | 對于
|
示例說明
創建Delta Table源表,再創建一個Stream關聯Delta Table表。
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties('read_mode'='append')
comment 'stream demo';
查看Stream信息
語法
DESC STREAM <stream_name>;
示例
創建Delta Table源表,再創建一個Stream關聯Delta Table表,查詢Stream對象delta_table_stream
。
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key,
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src
version AS OF 1 strmproperties('read_mode'='append')
comment 'stream demo';
DESC STREAM delta_table_stream;
輸出結果
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-06 17:03:32 Last Modified Time 2024-09-06 17:03:32
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
名稱 | 說明 |
Name | 當前Stream的名稱。 |
Project | 當前Stream所在的項目名稱。 |
Create Time | 當前Stream的創建時間。 |
Offset Version | 當前Stream的初始化數據版本。 |
Reference Table Project | 關聯的源表的項目名稱。 |
Reference Table Name | 關聯的源表名稱。 |
Reference Table Id | 關聯的源表的唯一標識ID。 |
Reference Table Version | 關聯的源表的數據版本。 |
Parameters | 當前Stream對象的屬性信息。 |
特別要注意Offset Version
和Reference Table Version
的信息,Offset Version
的值為1表示當前Stream已經消費的關聯Delta Table數據的版本,Reference Table Version
的值為1表示當前關聯的Delta Table的最新的數據版本。由于關聯的Delta Table是空表,所以兩者的值都是1。創建Stream對象后如果其關聯的Delta Table執行了DML操作,Reference Table Version的值會隨之更新改變。讀取Stream時會轉換成對關聯表的增量查詢,讀取的數據版本范圍區間為左開右閉區間,為(Offset Version, Reference Table Version]
,從而確保Offset Version和Reference Table Version之間的增量數據被讀到。如果(Offset Version, Reference Table Version]
版本的增量數據被DML操作消費,消費后Offset Version會等于 Reference Table Version,即都為關聯Delta Table的最新數據版本,表示沒有新的增量數據。
修改Stream
修改Stream屬性
ALTER STREAM <stream_name> SET stmproperties ("key"="value");
tream_name:必填。待描述的Stream名。
strmproperties:Stream的屬性,同表屬性一樣以key/value字符類型值對的形式出現,目前Stream只支持屬性
read_mode
,并且當前不支持修改。
修改Stream的初始化數據版本
ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<timestamp as of t | version as of v > ;
stream_name:必填。待修改的Stream名。
ON TABLE <delta_table_name>: 代表Stream對象關聯的Delta Table源表,源表為修改前的源表,目前還不支持修改源表。
timestamp as of t: 代表修改Stream對象VersionOffset初始化數據版本為t,查詢范圍
(t, 最新增量數據版本]
。version as of v: 代表修改Stream對象VersionOffset初始化數據版本為v,查詢范圍
(v, 最新增量數據版本]
。
示例
-- 1. 創建Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key,
val bigint) tblproperties ("transactional"="true");
-- 2. 創建一個Stream關聯Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src
version as of 1 strmproperties('read_mode'='append')
comment 'stream demo';
-- 3. 查看新建的stream信息,當前Offset Version和Reference Table Version都為1。
DESC STREAM delta_table_stream;
-- 輸出結果。
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:26:56
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
-- 4. stream關聯的Delet Table插入一條數據,目的是為了使得Delta Table的lsn遞增,
-- 之后我們將引用Delta Table的version修改為遞增后的version.
INSERT INTO delta_table_src VALUES ('1', '1');
-- 5. 查看當前Delta Table的數據版本信息
SHOW history FOR TABLE delta_table_src;
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000001 2024-09-07 10:25:59 CREATE
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000002 2024-09-07 10:28:19 APPEND
-- 6. 修改stream關聯的Delta Table的version為2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;
-- 7. 查看修改后的stream信息,stream以及關聯的Delta Table的version都變成了2。
DESC STREAM delta_table_stream;
-- 輸出結果。
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:29:12
Offset Version 2
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 2
Parameters {
"comment": "stream demo",
"read_mode": "append"}
列出項目下的所有Stream
語法
SHOW STREAMS;
示例
-- 列出當前項目下的所有stream對象
SHOW STREAMS;
-- 輸出結果。
delta_table_stream
刪除Stream
語法
DROP STREAM [IF EXISTS] <stream_name>;
示例
-- 1. 查看當前項目中存在的所有stream對象
SHOW STREAMS;
-- 輸出顯示。
delta_table_stream
-- 2. 刪除delta_table_stream stream對象
DROP STREAM IF EXISTS delta_table_stream;
-- 3. 再次查看當前項目中存在的所有stream對象;結果為空
SHOW STREAMS;
查詢Stream
語法
SELECT * FROM <stream_name>;
查詢Stream時,單純執行DQL,并不會改變Stream的狀態,即Stream的增量起始的Offset Version不會改變,但其關聯的Delta Table的Reference Table Version會隨著Delta Table狀態的改變而改變,保持為關聯Delta Table最新的Version數據版本。單純執行DQL,表示這份增量數據沒有被真正消費,只是進行了數據的探查。
查詢Stream并且執行了DML操作,表示真正消費了Stream所表示的增量數據,會修改Stream的狀態,將關聯的數據版本遷移到這次DML操作查詢的最新增量數據版本,即Stream的Offset Version會等于關聯Delta Table的Reference Table Version,此時表示沒有新的增量數據,在當前狀態下如果Stream再被讀取,讀取的數據為空。
CDC模式查詢輸出示例
Delta Table CDC模式的使用,詳情請參見CDC。
創建Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ( "transactional"="true", 'acid.cdc.mode.enable'='true', 'cdc.insert.into.passthrough.enable'='true' );
創建目標表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");
創建CDC模式的Stream。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties('read_mode'='cdc') comment 'stream cdc mode';
插入兩條數據至Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
查詢
delta_table_stream
輸出CDC格式數據。但執行單純的DQL,并不會改變delta_table_stream
狀態,如下語句執行多次,返回結果一樣。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 2 | 2 | 2024-09-07 11:03:53 | 1 | 0 | | 1 | 1 | 2024-09-07 11:03:53 | 1 | 0 | +------------+------------+------------------+----------------+------------------+
讀取
delta_table_stream
表的增量數據,并插入到目標表delta_table_dest
。同時將delta_table_stream
的Offset Version修改為關聯表delta_table_src
最新的數據版本。此操作真正消費了delta_table_stream
表的增量數據。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
查詢目標表。表中保存的是步驟6操作消費的Stream表的增量數據。
SELECT * FROM delta_table_dest; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
再次查詢
delta_table_stream
表,輸出為空。由于delta_table_stream
表示的增量數據已經被消費,沒有新的增量數據。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+
執行Update操作將源表pk為1記錄的val設為10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;
由于源表由Update操作產生的新的增量數據,查詢
delta_table_stream
,可輸出Update操作的CDC數據。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 1 | 1 | 2024-09-07 11:10:21 | 0 | 1 | | 1 | 10 | 2024-09-07 11:10:21 | 1 | 1 | +------------+------------+------------------+----------------+------------------+
由上述示例可見,CDC輸出模式會跟蹤Delta Table源表的記錄變化,輸出所有變化狀態的記錄,可有效用于增量計算的邏輯。
Append模式查詢輸出示例
創建Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");
創建目標表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");
創建Append模式的Stream,并關聯Delta Table。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties ('read_mode'='append') comment 'stream append mode';
插入兩條數據到Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
查詢
delta_table_stream
。SELECT * FROM delta_table_stream; -- 輸出結果,不包含系統字段。 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
讀取
delta_table_stream
表的增量數據,并插入到目標表,同時將delta_table_stream
的Offset Version修改為關聯表delta_table_src
最新的數據版本,此操作真正消費delta_table_stream
表的增量數據。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
查詢目標表。表中保存的是步驟6 操作消費的
delta_table_stream
表的增量數據。SELECT * FROM delta_table_dest; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
再次查詢
delta_table_stream
表,輸出為空。由于delta_table_stream
表示的增量數據已經被消費,沒有新的增量數據。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+
執行Update操作將源表pk為1的記錄val設為10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;
執行刪除操作,將源表中pk為2的記錄刪除。
DELETE FROM delta_table_src WHERE pk = 2;
查詢
delta_table_stream
,只輸出了Update操作的結果記錄 (1, 10),Delete操作的記錄不可見,也不會輸出。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 10 | +------------+------------+
由上述示例可見,Append輸出模式并不會顯示數據的操作狀態,只會輸出一條記錄的最終狀態,Delete記錄也不會輸出。因此使用的場景有限,通常可用于一些典型的ETL場景,不斷對增量插入的數據進行清洗。