本文為您介紹如何使用表格存儲Tablestore(OTS)連接器。
背景信息
表格存儲Tablestore(又名OTS)面向海量結構化數據提供Serverless表存儲服務,同時針對物聯網場景深度優化提供一站式的IoTstore解決方案。適用于海量賬單、IM消息、物聯網、車聯網、風控和推薦等場景中的結構化數據存儲,提供海量數據低成本存儲、毫秒級的在線數據查詢和檢索以及靈活的數據分析能力。詳情請參見表格存儲Tablestore。
Tablestore連接器支持的信息如下。
類別 | 詳情 |
運行模式 | 流模式 |
API種類 | SQL |
支持類型 | 源表、維表和結果表 |
數據格式 | 暫不支持 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
是否支持更新或刪除結果表數據 | 是 |
前提條件
已購買Tablestore實例并創建表,詳情請參見使用流程。
使用限制
僅實時計算引擎VVR 3.0.0及以上版本支持表格存儲Tablestore連接器。
語法結構
結果表
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}', 'endPoint'='<yourEndpoint>', 'valueColumns'='birthday' );
說明Tablestore結果表必須定義有Primary Key,輸出數據以Update方式追加Tablestore表。
維表
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}' );
源表
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='${ak_id}', 'accessKey' ='${ak_secret}', 'ignoreDelete' = 'false' );
屬性列支持讀取待消費字段和Tunnel Service,以及返回數據中的
OtsRecordType
和OtsRecordTimestamp
兩個字段。字段說明請參見下表。字段名
Flink映射名
描述
OtsRecordType
type
數據操作類型。
OtsRecordTimestamp
timestamp
數據操作時間,單位為微秒。
說明全量讀取數據時,OtsRecordTimestamp取值為0。
當需要讀取
OtsRecordType
和OtsRecordTimestamp
字段時,Flink提供了METADATA關鍵字用于獲取源表中的屬性字段,具體DDL示例如下。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', record_timestamp BIGINT METADATA FROM 'timestamp' ) WITH ( ... );
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為
ots
。instanceName
實例名。
String
是
無
無。
endPoint
實例訪問地址。
String
是
無
請參見服務地址。
tableName
表名。
String
是
無
無。
accessId
阿里云賬號或者RAM用戶的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量和密鑰管理。
accessKey
阿里云賬號或者RAM用戶的AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量和密鑰管理。
connectTimeout
連接器連接Tablestore的超時時間。
Integer
否
30000
單位為毫秒。
socketTimeout
連接器連接Tablestore的Socket超時時間。
Integer
否
30000
單位為毫秒。
ioThreadCount
IO線程數量。
Integer
否
4
無。
callbackThreadPoolSize
回調線程池大小。
Integer
否
4
無。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
tunnelName
表格存儲數據表的數據通道名稱。
String
是
無
您需要提前在表格存儲產品側創建好通道名稱和對應的通道類型(增量、全量和全量加增量)。關于創建通道的具體操作,請參見創建數據通道。
ignoreDelete
是否忽略DELETE操作類型的實時數據。
Boolean
否
false
參數取值如下:
true:忽略。
false(默認值):不忽略。
skipInvalidData
是否忽略臟數據。如果不忽略臟數據,則處理臟數據時會進行報錯。
Boolean
否
false
參數取值如下:
true:忽略臟數據。
false(默認值):不忽略臟數據。
說明僅實時計算引擎VVR 8.0.4及以上版本支持該參數。
retryStrategy
重試策略。
Enum
否
TIME
參數取值如下:
TIME:在超時時間retryTimeoutMs內持續進行重試。
COUNT:在最大重試次數retryCount內持續進行重試。
retryCount
重試次數。
Integer
否
3
當retryStrategy設置為COUNT時,可以設置重試次數。
retryTimeoutMs
重試的超時時間。
Integer
否
180000
當retryStrategy設置為TIME時,可以設置重試的超時時間,單位為毫秒。
streamOriginColumnMapping
原始列名到真實列名的映射。
String
否
無
原始列名與真實列名之間,請使用半角冒號(:)隔開;多組映射之間,請使用半角逗號(,)隔開。例如
origin_col1:col1,origin_col2:col2
。outputSpecificRowType
是否透傳具體的RowType。
Boolean
否
false
參數取值如下:
false:不透傳,所有數據RowType均為INSERT。
true:透傳,將根據透傳的類型相應設置為INSERT、DELETE或UPDATE_AFTER。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
retryIntervalMs
重試間隔時間。
Integer
否
1000
單位為毫秒。
maxRetryTimes
最大重試次數。
Integer
否
10
無。
valueColumns
插入字段的列名。
String
是
無
多個字段以半角逗號(,)分割,例如ID或NAME。
bufferSize
流入多少條數據后開始輸出。
Integer
否
5000
無。
batchWriteTimeoutMs
寫入超時的時間。
Integer
否
5000
單位為毫秒。表示如果緩存中的數據在等待batchWriteTimeoutMs秒后,依然沒有達到輸出條件,系統會自動輸出緩存中的所有數據。
batchSize
一次批量寫入的條數。
Integer
否
100
最大值為200。
ignoreDelete
是否忽略DELETE操作。
Boolean
否
False
無。
autoIncrementKey
當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。
String
否
無
當結果表沒有主鍵自增列時,請不要設置此參數。
說明僅實時計算引擎VVR 8.0.4及以上版本支持該參數。
overwriteMode
數據覆蓋模式。
Enum
否
PUT
參數取值如下:
PUT:以PUT方式將數據寫入到Tablestore表。
UPDATE:以UPDATE方式寫入到Tablestore表。
說明動態列模式下只支持UPDATE模式。
defaultTimestampInMillisecond
設定寫入Tablestrore數據的默認時間戳。
Long
否
-1
如果不指定,則會使用系統當前的毫秒時間戳。
dynamicColumnSink
是否開啟動態列模式。
Boolean
否
false
動態列模式適用于在表定義中無需指定列名,根據作業運行情況動態插入數據列的場景。建表語句中主鍵需要定義為前若干列,最后兩列中前一列的值作為列名變量,且類型必須為String,后一列的值作為該列對應的值。
說明開啟動態列模式時,不支持主鍵自增列,且參數overwriteMode必須設置為UPDATE。
checkSinkTableMeta
是否檢查結果表元數據。
Boolean
否
true
若設置為true,會檢查Tablestore表的主鍵列和此處的建表語句中指定的主鍵是否一致。
enableRequestCompression
數據寫入過程中是否開啟數據壓縮。
Boolean
否
false
無。
維表獨有
參數
說明
數據類型
是否必填
默認值
備注
retryIntervalMs
重試間隔時間。
Integer
否
1000
單位為毫秒。
maxRetryTimes
最大重試次數。
Integer
否
10
無。
cache
緩存策略。
String
否
ALL
目前Tablestore維表支持以下三種緩存策略:
None:無緩存。
LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據,如果沒有找到,則去物理維表中查找。
需要配置相關參數:緩存大?。╟acheSize)和緩存更新時間間隔(cacheTTLMs)。
ALL(默認值):緩存維表里的所有數據。在Job運行前,系統會將維表中所有數據加載到Cache中,之后所有的維表查找數據都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在,并在Cache過期后重新加載一遍全量Cache。
適用于遠程表數據量小且MISS KEY(源表數據和維表JOIN時,ON條件無法關聯)特別多的場景。需要配置相關參數:緩存更新時間間隔cacheTTLMs,更新時間黑名單cacheReloadTimeBlackList。
說明因為系統會異步加載維表數據,所以在使用CACHE ALL時,需要增加維表JOIN節點的內存,增加的內存大小為遠程表數據量的兩倍。
cacheSize
緩存大小。
Integer
否
無
當緩存策略選擇LRU時,可以設置緩存大小。
說明單位為數據條數。
cacheTTLMs
緩存失效時間。
Integer
否
無
單位為毫秒。cacheTTLMs配置和cache有關:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩存不超時。
如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認不過期。
如果cache配置為ALL,則cacheTTLMs為緩存加載時間。默認不重新加載。
cacheEmpty
是否緩存空結果。
Boolean
否
無
true:緩存
false:不緩存
cacheReloadTimeBlackList
更新時間黑名單。在緩存策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11場景)。
String
否
無
格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情況如下所示:
用半角逗號(,)來分隔多個黑名單。
用箭頭(->)來分割黑名單的起始結束時間。
async
是否異步返回數據。
Boolean
否
false
true:表示異步返回數據。異步返回數據默認是無序的。
false(默認值):表示不進行異步返回數據。
類型映射
源表
Tablestore字段類型
Flink字段類型
INTEGER
BIGINT
STRING
STRING
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
BINARY
BINARY
結果表
Flink字段類型
Tablestore字段類型
BINARY
BINARY
VARBINARY
CHAR
STRING
VARCHAR
TINYINT
INTEGER
SMALLINT
INTEGER
BIGINT
FLOAT
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
使用示例
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;