Tablestore Stream配置同步任務(wù)
Tablestore Stream插件主要用于導(dǎo)出Tablestore增量數(shù)據(jù),本文將為您介紹如何通過(guò)Tablestore Stream配置同步任務(wù)。
背景信息
Tablestore Stream插件與全量導(dǎo)出插件不同,增量導(dǎo)出插件僅支持多版本模式,且不支持指定列。增量數(shù)據(jù)可以看作操作日志,除數(shù)據(jù)本身外還附有操作信息。詳情請(qǐng)參見(jiàn)Tablestore Stream數(shù)據(jù)源。
Tablestore Stream配置同步任務(wù)時(shí),請(qǐng)注意以下問(wèn)題:
如果配置任務(wù)為日調(diào)度,您可以讀取當(dāng)前時(shí)間24小時(shí)以內(nèi)的數(shù)據(jù),但會(huì)丟失當(dāng)前時(shí)間前5分鐘的數(shù)據(jù)。建議您配置任務(wù)為小時(shí)調(diào)度。
設(shè)置的結(jié)束時(shí)間不能超過(guò)系統(tǒng)顯示的時(shí)間,即您設(shè)置的結(jié)束時(shí)間要比運(yùn)行時(shí)間早5分鐘。
配置日調(diào)度會(huì)出現(xiàn)數(shù)據(jù)丟失的情況。
不可以配置周期調(diào)度和月調(diào)度。
開(kāi)始時(shí)間和結(jié)束時(shí)間需要包含操作Table Store表的時(shí)間。例如,20171019162000您向Table Store插入2條數(shù)據(jù),則開(kāi)始時(shí)間設(shè)置為20171019161000,結(jié)束時(shí)間設(shè)置為20171019162600。
新增數(shù)據(jù)源
進(jìn)入數(shù)據(jù)集成頁(yè)面。
登錄DataWorks控制臺(tái),切換至目標(biāo)地域后,單擊左側(cè)導(dǎo)航欄的數(shù)據(jù)集成,在下拉框中選擇對(duì)應(yīng)工作空間后單擊進(jìn)入數(shù)據(jù)集成。
單擊左側(cè)導(dǎo)航欄中的數(shù)據(jù)源,進(jìn)入數(shù)據(jù)源列表。
單擊新增數(shù)據(jù)源。
在新增數(shù)據(jù)源對(duì)話框中,選擇數(shù)據(jù)源類(lèi)型為Tablestore 。
填寫(xiě)Tablestore數(shù)據(jù)源的各配置項(xiàng)。
參數(shù)
描述
數(shù)據(jù)源名稱(chēng)
數(shù)據(jù)源名稱(chēng)必須以字母、數(shù)字、下劃線組合,且不能以數(shù)字和下劃線開(kāi)頭。
數(shù)據(jù)源描述
對(duì)數(shù)據(jù)源進(jìn)行簡(jiǎn)單描述,不得超過(guò)80個(gè)字符。
Endpoint
Table Store服務(wù)對(duì)應(yīng)的Endpoint。
Table Store實(shí)例名稱(chēng)
Table Store服務(wù)對(duì)應(yīng)的實(shí)例名稱(chēng)。
AccessKey ID
訪問(wèn)密鑰中的AccessKey ID,您可以進(jìn)入用戶信息管理頁(yè)面進(jìn)行復(fù)制。
AccessKey Secret
訪問(wèn)密鑰中的AccessKey Secret,相當(dāng)于登錄密碼。
單擊測(cè)試連通性。
測(cè)試連通性通過(guò)后,單擊完成。
通過(guò)向?qū)J脚渲猛饺蝿?wù)
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
登錄DataWorks控制臺(tái),切換至目標(biāo)地域后,單擊左側(cè)導(dǎo)航欄的 ,在下拉框中選擇對(duì)應(yīng)工作空間后單擊進(jìn)入數(shù)據(jù)開(kāi)發(fā)。
在目標(biāo)業(yè)務(wù)流程中,右鍵單擊數(shù)據(jù)集成,選擇 。
在新建節(jié)點(diǎn)對(duì)話框中,輸入名稱(chēng)并選擇路徑,單擊確認(rèn)。
選擇離線同步任務(wù)的數(shù)據(jù)來(lái)源Table Stream和數(shù)據(jù)去向MaxCompute(ODPS),以及用于執(zhí)行同步任務(wù)的資源組,并測(cè)試連通性。
配置數(shù)據(jù)來(lái)源與去向。
類(lèi)別
參數(shù)
描述
數(shù)據(jù)來(lái)源
表
導(dǎo)出增量數(shù)據(jù)的表的名稱(chēng)。該表需要開(kāi)啟Stream,您可以在建表時(shí)開(kāi)啟。
開(kāi)始時(shí)間
增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的左邊界,格式為yyyymmddhh24miss,單位為毫秒。
結(jié)束時(shí)間
增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的右邊界,格式為yyyymmddhh24miss,單位為毫秒。
狀態(tài)表
用于記錄狀態(tài)的表的名稱(chēng)。
最大重試次數(shù)
從TableStore中讀增量數(shù)據(jù)時(shí),每次請(qǐng)求的最大重試次數(shù),默認(rèn)是30。
導(dǎo)出時(shí)序信息
是否導(dǎo)出時(shí)序信息,包括數(shù)據(jù)的寫(xiě)入時(shí)間等信息。
數(shù)據(jù)去向
表
選擇需要寫(xiě)入的表。
分區(qū)信息
此處需同步的表是非分區(qū)表,所以無(wú)分區(qū)信息。
清理規(guī)則
寫(xiě)入前清理已有數(shù)據(jù):導(dǎo)數(shù)據(jù)之前,清空表或者分區(qū)的所有數(shù)據(jù),相當(dāng)于
insert overwrite
。寫(xiě)入前保留已有數(shù)據(jù):導(dǎo)數(shù)據(jù)之前,不清理任何數(shù)據(jù),每次運(yùn)行數(shù)據(jù)都是追加進(jìn)去的,相當(dāng)于
insert into
。
空字符串作為null
默認(rèn)值為否。
配置字段映射關(guān)系。
左側(cè)的源頭表字段和右側(cè)的目標(biāo)表字段為一一對(duì)應(yīng)的關(guān)系。單擊添加一行可以增加單個(gè)字段,鼠標(biāo)放至需要?jiǎng)h除的字段上,即可單擊刪除圖標(biāo)進(jìn)行刪除。
配置通道。
單擊工具欄中的保存圖標(biāo)。
單擊工具欄中的運(yùn)行圖標(biāo),運(yùn)行之前需要配置自定義參數(shù)。
通過(guò)腳本模式配置同步任務(wù)
如果您需要通過(guò)腳本模式配置此任務(wù),單擊工具欄中的轉(zhuǎn)換腳本,選擇確認(rèn)即可進(jìn)入腳本模式。
您可以根據(jù)自身進(jìn)行配置,示例腳本如下。
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "Tablestore",
"parameter": {
"datasource": "Tablestore",//數(shù)據(jù)源名,需要與您添加的數(shù)據(jù)源名稱(chēng)保持一致。
"dataTable": "person",//導(dǎo)出增量數(shù)據(jù)的表的名稱(chēng)。該表需要開(kāi)啟Stream,可以在建表時(shí)開(kāi)啟。
"startTimeString": "${startTime}",//增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的左邊界,格式為yyyymmddhh24miss,單位毫秒。
"endTimeString": "${endTime}",//運(yùn)行時(shí)間。
"statusTable": "TableStoreStreamReaderStatusTable",//用于記錄狀態(tài)的表的名稱(chēng)。
"maxRetries": 30,//請(qǐng)求的最大重試次數(shù)。
"isExportSequenceInfo": false,
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "odps_source",//數(shù)據(jù)源名。
"table": "person",//目標(biāo)表名。
"truncate": true,
"partition": "pt=${bdp.system.bizdate}",//分區(qū)信息。
"column": [//目標(biāo)列名。
"id",
"colname",
"version",
"colvalue",
"optype",
"sequenceinfo"
]
}
},
"setting": {
"speed": {
"mbps": 7,//作業(yè)速率上限,此處1mbps = 1MB/s。
"concurrent": 7//并發(fā)數(shù)。
}
}
}
}
關(guān)于運(yùn)行時(shí)間參數(shù)和結(jié)束時(shí)間參數(shù),有以下兩種表現(xiàn)形式(配置任務(wù)時(shí)選擇其中一種):
"startTimeString": "${startTime}"
:增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的左邊界,格式為yyyymmddhh24miss,單位為毫秒。"endTimeString": "${endTime}"
:增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的右邊界,格式為yyyymmddhh24miss,單位為毫秒。"startTimestampMillis":""
:增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的左邊界,單位為毫秒。Reader插件會(huì)從statusTable中找對(duì)應(yīng)startTimestampMillis的位點(diǎn),從該點(diǎn)開(kāi)始讀取開(kāi)始導(dǎo)出數(shù)據(jù)。
如果statusTable中找不到對(duì)應(yīng)的位點(diǎn),則從系統(tǒng)保留的增量數(shù)據(jù)的第1條開(kāi)始讀取,并跳過(guò)寫(xiě)入時(shí)間小于startTimestampMillis的數(shù)據(jù)。
"endTimestampMillis":" "
:增量數(shù)據(jù)的時(shí)間范圍(左閉右開(kāi))的右邊界,單位為毫秒。Reader插件startTimestampMillis位置開(kāi)始導(dǎo)出數(shù)據(jù)后,當(dāng)遇到第1條時(shí)間戳大于等于endTimestampMillis的數(shù)據(jù)時(shí),結(jié)束導(dǎo)出數(shù)據(jù),導(dǎo)出完成。
當(dāng)讀取完當(dāng)前全部的增量數(shù)據(jù)時(shí),結(jié)束讀取,即使未達(dá)endTimestampMillis。
如果配置isExportSequenceInfo項(xiàng)為true,如“isExportSequenceInfo”: true
,則會(huì)導(dǎo)出時(shí)序信息,目標(biāo)會(huì)多出1行,目標(biāo)字段列則多1列。時(shí)序信息包含了數(shù)據(jù)的寫(xiě)入時(shí)間等,默認(rèn)該值為false,即不導(dǎo)出。