同步增量數(shù)據(jù)到MaxCompute
如果需要將表格存儲(chǔ)中新增和變化的數(shù)據(jù)定期同步到MaxCompute中備份或者使用,您可以通過在DataWorks數(shù)據(jù)集成控制臺(tái)新建和配置離線同步任務(wù)來實(shí)現(xiàn)周期性增量數(shù)據(jù)同步。
前提條件
已新增表格存儲(chǔ)數(shù)據(jù)源。具體操作,請(qǐng)參見步驟一:新增表格存儲(chǔ)數(shù)據(jù)源。
已新增MaxCompute數(shù)據(jù)源。具體操作,請(qǐng)參見步驟二:新增MaxCompute數(shù)據(jù)源。
步驟一:新建同步任務(wù)節(jié)點(diǎn)
進(jìn)入數(shù)據(jù)開發(fā)頁(yè)面。
以項(xiàng)目管理員身份登錄DataWorks控制臺(tái)。
選擇地域,在左側(cè)導(dǎo)航欄,單擊工作空間列表。
在工作空間列表頁(yè)面,在目標(biāo)工作空間操作列選擇快速進(jìn)入>數(shù)據(jù)開發(fā)。
在DataStudio控制臺(tái)的數(shù)據(jù)開發(fā)頁(yè)面,單擊業(yè)務(wù)流程節(jié)點(diǎn)下的目標(biāo)業(yè)務(wù)流程。
如果需要新建業(yè)務(wù)流程,請(qǐng)參見創(chuàng)建業(yè)務(wù)流程。
在數(shù)據(jù)集成節(jié)點(diǎn)上右鍵選擇新建節(jié)點(diǎn) > 離線同步。
在新建節(jié)點(diǎn)對(duì)話框,選擇路徑并填寫節(jié)點(diǎn)名稱。
單擊確認(rèn)。
在數(shù)據(jù)集成節(jié)點(diǎn)下會(huì)顯示新建的離線同步節(jié)點(diǎn)。
步驟二:配置離線同步任務(wù)并啟動(dòng)
配置表格存儲(chǔ)到MaxCompute的增量數(shù)據(jù)同步任務(wù),具體步驟如下:
在數(shù)據(jù)集成節(jié)點(diǎn)下,雙擊打開新建的離線同步任務(wù)節(jié)點(diǎn)。
配置同步網(wǎng)絡(luò)鏈接。
選擇離線同步任務(wù)的數(shù)據(jù)來源、數(shù)據(jù)去向以及用于執(zhí)行同步任務(wù)的資源組,并測(cè)試連通性。
重要數(shù)據(jù)同步任務(wù)的執(zhí)行必須經(jīng)過資源組來實(shí)現(xiàn),請(qǐng)選擇資源組并保證資源組與讀寫兩端的數(shù)據(jù)源能聯(lián)通訪問。
在網(wǎng)絡(luò)與資源配置步驟,選擇數(shù)據(jù)來源為Tablestore Stream,并選擇數(shù)據(jù)源名稱為表格存儲(chǔ)數(shù)據(jù)源。
選擇資源組。
選擇資源組后,系統(tǒng)會(huì)顯示資源組的地域、規(guī)格等信息以及自動(dòng)測(cè)試資源組與所選數(shù)據(jù)源之間連通性。
重要請(qǐng)與新增數(shù)據(jù)源時(shí)選擇的資源組保持一致。
選擇數(shù)據(jù)去向為MaxCompute(ODPS),并選擇數(shù)據(jù)源名稱為MaxCompute數(shù)據(jù)源。
系統(tǒng)會(huì)自動(dòng)測(cè)試資源組與所選數(shù)據(jù)源之間連通性。
測(cè)試可連通后,單擊下一步。
配置任務(wù)并保存。
您可以通過向?qū)J交蛘吣_本模式配置任務(wù),請(qǐng)根據(jù)實(shí)際需要選擇。
(推薦)向?qū)J?/h2>
在配置任務(wù)步驟的配置數(shù)據(jù)來源與去向區(qū)域,根據(jù)實(shí)際配置數(shù)據(jù)來源和數(shù)據(jù)去向。
數(shù)據(jù)來源配置
參數(shù)
說明
表
表格存儲(chǔ)中的數(shù)據(jù)表名稱。
開始時(shí)間
增量讀取數(shù)據(jù)的開始時(shí)間和結(jié)束時(shí)間,分別配置為變量形式
${startTime}
和${endTime}
,具體格式在后續(xù)調(diào)度屬性中配置。增量數(shù)據(jù)的時(shí)間范圍為左閉右開的區(qū)間。結(jié)束時(shí)間
狀態(tài)表
用于記錄狀態(tài)的表名稱,默認(rèn)值為TableStoreStreamReaderStatusTable。
最大重試次數(shù)
從TableStore中讀取增量數(shù)據(jù)時(shí),每次請(qǐng)求的最大重試次數(shù)。
導(dǎo)出時(shí)序信息
是否導(dǎo)出時(shí)序信息,時(shí)序信息包含了數(shù)據(jù)的寫入時(shí)間等。
數(shù)據(jù)去向配置
參數(shù)
說明
Tunnel資源組
即Tunnel Quota,默認(rèn)選擇“公共傳輸資源”,即MC的免費(fèi)quota。
MaxCompute的數(shù)據(jù)傳輸資源選擇,具體請(qǐng)購(gòu)買與使用獨(dú)享數(shù)據(jù)傳輸服務(wù)資源組。
說明如果獨(dú)享tunnel quota因欠費(fèi)或到期不可用,任務(wù)在運(yùn)行中將會(huì)自動(dòng)切換為“公共傳輸資源”。
表
MaxCompute中的表名稱。
重要請(qǐng)確保目標(biāo)表中字段與源表中字段的數(shù)量和類型相匹配。
如果未創(chuàng)建與源表匹配的目標(biāo)表,請(qǐng)執(zhí)行如下操作:
單擊一鍵生成目標(biāo)表結(jié)構(gòu)。
在新建表對(duì)話框,根據(jù)實(shí)際修改目標(biāo)表的字段類型和源表的字段類型相匹配。
單擊新建表。
分區(qū)信息
如果您每日增量數(shù)據(jù)限定在對(duì)應(yīng)日期的分區(qū)中,可以使用分區(qū)做每日增量,比如配置分區(qū)pt值為${bizdate}
寫入模式
數(shù)據(jù)寫入表中的模式。取值范圍如下:
寫入前保留已有數(shù)據(jù)(Insert Into):直接向表或靜態(tài)分區(qū)中插入數(shù)據(jù)。
寫入前清理已有數(shù)據(jù)(Insert Overwrite):先清空表中的原有數(shù)據(jù),再向表或靜態(tài)分區(qū)中插入數(shù)據(jù)。
空字符串轉(zhuǎn)為Null寫入
如果源頭數(shù)據(jù)為空字符串,在向目標(biāo)MaxCompute列寫入時(shí)是否轉(zhuǎn)為Null值寫入。
同步完成才可見
單擊高級(jí)配置后才會(huì)顯示該參數(shù)。
同步到MaxCompute中的數(shù)據(jù)是否在同步完成后才能被查詢到。
在字段映射區(qū)域,系統(tǒng)自動(dòng)進(jìn)行同名映射,保持默認(rèn)即可。
在通道控制區(qū)域,配置任務(wù)運(yùn)行參數(shù),例如任務(wù)期望最大并發(fā)數(shù)、同步速率、臟數(shù)據(jù)策略、分布式處理能力等。關(guān)于參數(shù)配置的更多信息,請(qǐng)參見配置通道。
單擊圖標(biāo),保存配置。
說明執(zhí)行后續(xù)操作時(shí),如果未保存腳本,則系統(tǒng)會(huì)出現(xiàn)保存確認(rèn)的提示,單擊確認(rèn)即可。
腳本模式
增量數(shù)據(jù)的同步需要使用到Tablestore Stream Reader和MaxCompute Writer插件。腳本配置規(guī)則請(qǐng)參見Tablestore Stream數(shù)據(jù)源和MaxCompute數(shù)據(jù)源。
重要任務(wù)轉(zhuǎn)為腳本模式后,將無法轉(zhuǎn)為向?qū)J健?/p>
在配置任務(wù)步驟,單擊圖標(biāo),然后在彈出的對(duì)話框中單擊確認(rèn)。
在腳本配置頁(yè)面,請(qǐng)根據(jù)如下示例完成配置。
重要為了便于理解,在配置示例中增加了注釋內(nèi)容,實(shí)際使用腳本時(shí)請(qǐng)刪除所有注釋內(nèi)容。
{ "type": "job", "steps": [ { "stepType": "otsstream", # Reader插件的名稱。 "parameter": { "mode": "single_version_and_update_only", # 配置導(dǎo)出模式,默認(rèn)不設(shè)置,為列模式。 "statusTable": "TableStoreStreamReaderStatusTable", # 存儲(chǔ)TableStore Stream狀態(tài)的表,一般無需修改。 "maxRetries": 30, # 從TableStore中讀取增量數(shù)據(jù)時(shí),每次請(qǐng)求的最大重試次數(shù),默認(rèn)為30。重試之間有間隔,重試30次的總時(shí)間約為5分鐘,一般無需修改。 "isExportSequenceInfo": false, # 是否導(dǎo)出時(shí)序信息,時(shí)序信息包含了數(shù)據(jù)的寫入時(shí)間等。默認(rèn)該值為false,即不導(dǎo)出。single_version_and_update_only模式下只能為false。 "datasource": "", # Tablestore的數(shù)據(jù)源名稱,如果有此項(xiàng)則無需配置endpoint、accessId、accessKey和instanceName。 "column": [ # 按照需求添加需要導(dǎo)出TableStore中的列,您可以自定義個(gè)數(shù)。 { "name": "h" # 列名示例,可以是主鍵或?qū)傩粤小? }, { "name": "n" }, { "name": "s" } ], "startTimeString": "${startTime}", # 增量數(shù)據(jù)的時(shí)間范圍(左閉右開)的左邊界,格式為yyyymmddhh24miss,單位毫秒。輸入$[yyyymmddhh24miss-10/24/60],表示調(diào)度時(shí)間減去10分鐘。 "table": "", # TableStore中的表名。 "endTimeString": "${endTime}" # 增量數(shù)據(jù)的時(shí)間范圍(左閉右開)的右邊界,格式為yyyymmddhh24miss,單位為毫秒。輸入$[yyyymmddhh24miss-5/24/60],表示調(diào)度時(shí)間減去5分鐘。注意,endTime需要比調(diào)度時(shí)間提前5分鐘及以上。 }, "name": "Reader", "category": "reader" }, { "stepType": "odps", # Writer插件的名稱。 "parameter": { "partition": "", # 需要寫入數(shù)據(jù)表的分區(qū)信息。 "truncate": false, # 清理規(guī)則,寫入前是否清理已有數(shù)據(jù)。 "compress": false, # 是否壓縮。 "datasource": "", # 數(shù)據(jù)源名。 "column": [ # 需要導(dǎo)入的字段列表。 "h", "n", "s" ], "emptyAsNull": false, # 空字符串是否作為null,默認(rèn)是。 "table": "" # 表名。 }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "0" # 允許出錯(cuò)的個(gè)數(shù),當(dāng)錯(cuò)誤超過這個(gè)數(shù)目的時(shí)候同步任務(wù)會(huì)失敗。 }, "speed": { "throttle": false, #同步速率不限流。 "concurrent": 2 # 每次同步任務(wù)的并發(fā)度。 } } }
單擊圖標(biāo),保存配置。
說明執(zhí)行后續(xù)操作時(shí),如果未保存腳本,則系統(tǒng)會(huì)出現(xiàn)保存確認(rèn)的提示,單擊確認(rèn)即可。
配置調(diào)度屬性。
通過調(diào)度配置,您可以配置同步任務(wù)的執(zhí)行時(shí)間、重跑屬性、調(diào)度依賴等。
單擊任務(wù)右側(cè)的調(diào)度配置。
在調(diào)度配置面板的調(diào)度參數(shù)區(qū)域,單擊新增參數(shù),根據(jù)下表說明新增參數(shù)。更多信息,請(qǐng)參見調(diào)度參數(shù)支持的格式。
參數(shù)
參數(shù)值
bizdate
該變量名無特殊含義,您可根據(jù)業(yè)務(wù)需求自定義代碼中的變量名。
startTime
$[yyyymmddhh24-2/24]$[miss-10/24/60]
endTime
$[yyyymmddhh24-1/24]$[miss-10/24/60]
配置示例如下圖所示。
假如任務(wù)運(yùn)行時(shí)的時(shí)間為2023年04月23日19:00:00點(diǎn),則startTime為20230423175000,endTime為20230423185000。任務(wù)將會(huì)同步17:50到18:50時(shí)段內(nèi)新增的數(shù)據(jù)。
在時(shí)間屬性部分,配置時(shí)間屬性。更多信息,請(qǐng)參見時(shí)間屬性配置說明。
此處以任務(wù)整點(diǎn)每小時(shí)自動(dòng)運(yùn)行為例介紹配置,如下圖所示。
在調(diào)度依賴部分,單擊使用工作空間根節(jié)點(diǎn),系統(tǒng)會(huì)自動(dòng)生成依賴的上游節(jié)點(diǎn)信息。
使用工作空間根節(jié)點(diǎn)表示該任務(wù)無上游的依賴任務(wù)。
配置完成后,關(guān)閉配置調(diào)度面板。
單擊圖標(biāo),保存配置。
說明執(zhí)行后續(xù)操作時(shí),如果未保存腳本,則系統(tǒng)會(huì)出現(xiàn)保存確認(rèn)的提示,單擊確認(rèn)即可。
(可選)根據(jù)需要調(diào)試腳本代碼。
通過調(diào)試腳本代碼,確保同步任務(wù)能成功同步表格存儲(chǔ)的增量數(shù)據(jù)到MaxCompute中。
重要調(diào)試腳本代碼時(shí)配置的時(shí)間范圍內(nèi)的數(shù)據(jù)可能會(huì)多次導(dǎo)入到MaxCompute中,相同數(shù)據(jù)行會(huì)覆蓋寫入到MaxCompute中。
單擊圖標(biāo)。
在參數(shù)對(duì)話框,選擇運(yùn)行資源組的名稱,并配置自定義參數(shù)。
自定義參數(shù)的格式為
yyyyMMddHHmmss
,例如20230423175000。單擊運(yùn)行。
提交同步任務(wù)。
提交同步任務(wù)后,同步任務(wù)會(huì)按照配置的調(diào)度屬性進(jìn)行運(yùn)行。
單擊圖標(biāo)。
在提交對(duì)話框,根據(jù)需要填寫變更描述。
單擊確認(rèn)。
步驟三:查看任務(wù)執(zhí)行結(jié)果
在DataWorks控制臺(tái)查看任務(wù)運(yùn)行狀態(tài)。
單擊同步任務(wù)工具欄右側(cè)的運(yùn)維。
在周期實(shí)例頁(yè)面的實(shí)例視角頁(yè)簽,查看實(shí)例的運(yùn)行狀態(tài)。
查看數(shù)據(jù)同步結(jié)果。
您可使用開發(fā)ODPS SQL任務(wù)或創(chuàng)建臨時(shí)查詢功能,通過表操作查詢MaxCompute表的數(shù)據(jù)。
后續(xù)操作
同步到MaxCompute中的增量數(shù)據(jù)為列數(shù)據(jù)的變化記錄,您可以根據(jù)需要將表格存儲(chǔ)的增量數(shù)據(jù)轉(zhuǎn)換為全量數(shù)據(jù)格式。具體操作,請(qǐng)參見將表格存儲(chǔ)的增量數(shù)據(jù)轉(zhuǎn)換為全量數(shù)據(jù)格式。