如果需要將表格存儲中新增和變化的數據定期同步到OSS中備份或者使用,您可以通過在DataWorks數據集成控制臺新建和配置離線同步任務來實現周期性增量數據同步。
注意事項
準備工作
已開通OSS服務并創建存儲空間Bucket。具體操作,請參見開通OSS服務和通過控制臺創建存儲空間。
已確認和記錄表格存儲中要同步到OSS的實例、數據表或者時序表信息。
已開通DataWorks服務并創建工作空間。具體操作,請參見開通DataWorks服務和創建工作空間。
已創建RAM用戶并為RAM用戶授予OSS完全管理權限(AliyunOSSFullAccess)和管理表格存儲權限(AliyunOTSFullAccess)。具體操作,請參見創建RAM用戶和為RAM用戶授權。
重要由于配置時需要填寫訪問密鑰AccessKey(AK)信息來執行授權,為避免阿里云賬號泄露AccessKey帶來的安全風險,建議您通過RAM用戶來完成授權和AccessKey的創建。
已為RAM用戶創建AccessKey。具體操作,請參見創建AccessKey。
已新增表格存儲數據源和OSS數據源。具體操作,請參見步驟一:新增表格存儲數據源和步驟二:新增OSS數據源。
步驟一:新建同步任務節點
進入數據開發頁面。
以項目管理員身份登錄DataWorks控制臺。
選擇地域,在左側導航欄,單擊工作空間列表。
在工作空間列表頁面,在目標工作空間操作列選擇快速進入>數據開發。
在DataStudio控制臺的數據開發頁面,單擊業務流程節點下的目標業務流程。
如果需要新建業務流程,請參見創建業務流程。
在數據集成節點上右鍵選擇新建節點 > 離線同步。
在新建節點對話框,選擇路徑并填寫節點名稱。
單擊確認。
在數據集成節點下會顯示新建的離線同步節點。
步驟二:配置離線同步任務并啟動
配置表格存儲到OSS的增量數據同步任務,請根據所用數據存儲模型選擇相應任務配置方式。
如果所用的數據存儲模型是寬表模型(即使用數據表存儲數據),則需要同步數據表中的數據,請按照同步數據表數據的任務配置進行配置。
如果所用的數據存儲模型是時序模型(即使用時序表存儲數據),則需要同步時序表中的數據,請按照同步時序表數據的任務配置進行配置。
同步數據表數據的任務配置
在數據集成節點下,雙擊打開新建的離線同步任務節點。
配置同步網絡鏈接。
選擇離線同步任務的數據來源、數據去向以及用于執行同步任務的資源組,并測試連通性。
重要數據同步任務的執行必須經過資源組來實現,請選擇資源組并保證資源組與讀寫兩端的數據源能聯通訪問。
在網絡與資源配置步驟,選擇數據來源為Tablestore Stream,并選擇數據源名稱為表格存儲數據源。
選擇資源組。
選擇資源組后,系統會顯示資源組的地域、規格等信息以及自動測試資源組與所選數據源之間連通性。
重要請與新增數據源時選擇的資源組保持一致。
選擇數據去向為OSS,并選擇數據源名稱為OSS數據源。
系統會自動測試資源組與所選數據源之間連通性。
測試可連通后,單擊下一步。
配置任務并保存。
使用向導模式時只能按照行的列值增量變化形式導出數據。使用腳本模式時支持按照行的列值增量變化形式導出數據或者按照行模式導出變化后的數據。如果要按照行模式導出變化后的數據請使用腳本模式。
(推薦)向導模式
在配置任務步驟的配置數據來源與去向區域,根據實際配置數據來源和數據去向。
數據來源配置
參數
說明
表
表格存儲中的數據表名稱。
開始時間
增量讀取數據的開始時間和結束時間,分別配置為變量形式
${startTime}
和${endTime}
,具體格式在后續調度屬性中配置。增量數據的時間范圍為左閉右開的區間。結束時間
狀態表
用于記錄狀態的表名稱,默認值為TableStoreStreamReaderStatusTable。
最大重試次數
從TableStore中讀取增量數據時,每次請求的最大重試次數。
導出時序信息
是否導出時序信息,時序信息包含了數據的寫入時間等。
數據去向配置
參數
說明
文本類型
寫入OSS的文件類型,例如csv、txt。
說明不同文件類型支持的配置有差異,請以實際界面為準。
文件名(含路徑)
當設置文本類型為text、csv或orc時才能配置該參數。
OSS中的文件名稱,支持帶有路徑,例如
tablestore/20231130/myotsdata.csv
。文件路徑
當設置文本類型為parquet時才能配置該參數。
文件在OSS上的路徑,例如
tablestore/20231130/
。文件名
當設置文本類型為parquet時才能配置該參數。
OSS中的文件名稱。
列分隔符
當設置文本類型為text或csv時才能配置該參數。
寫入OSS文件時,列之間使用的分隔符。
行分隔符
當設置文本類型為text時才能配置才參數。
自定義的行分隔符,用來分隔不同數據行。例如配置為
\u0001
。您需要使用數據中不存在的分隔符作為行分隔符。如果要使用Linux或Windows平臺的默認行分隔符(
\n
、\r\n
)建議置空此配置,平臺能自適應讀取。編碼
當設置文本類型為text或csv時才能配置該參數。
寫入文件的編碼配置。
null值
當設置文本類型為text、csv或orc時才能配置該參數。
源數據源中可以表示為null的字符串,例如配置為null,如果源數據是null,則系統會視作null字段。
時間格式
當設置文本類型為text或csv時才能配置該參數。
日期類型的數據寫入到OSS文件時的時間格式,例如
yyyy-MM-dd
。前綴沖突
當設置的文件名與OSS中已有文件名沖突時的處理方法。取值范圍如下:
替換:刪除原始文件,重建一個同名文件。
保留:保留原始文件,重建一個新文件,名稱為原文件名加隨機后綴。
報錯:同步任務停止執行。
切分文件
當設置文本類型為text或csv時才能配置該參數。
寫入OSS文件時,單個Object文件的最大大小。單位為MB。最大值為100 GB。當文件大小超過指定的切分文件大小時,系統會生成新文件繼續寫入數據,直到完成所有數據寫入。
寫為一個文件
當設置文本類型為text或csv時才能配置該參數。
寫入數據到OSS時,是否寫單個文件。默認寫多個文件,當讀不到任何數據時,如果配置了文件頭,則會輸出只包含文件頭的空文件,否則只輸出空文件。
如果需要寫入到單個文件,請選中寫為一個文件復選框。此時當讀不到任何數據時, 不會產生空文件。
首行輸出表頭
當設置文本類型為text或csv時才能配置該參數。
寫入文件時第一行是否輸出表頭。默認不輸出表頭。如果需要在第一行輸出表頭,請選中首行輸出表頭復選框。
在字段映射區域,系統自動進行字段映射,保持默認配置即可。
來源字段中包括了表主鍵和增量變更信息,目標字段不支持配置。
在通道控制區域,配置任務運行參數,例如同步速率、臟數據同步策略等。關于參數配置的更多信息,請參見配置通道。
單擊圖標,保存配置。
說明執行后續操作時,如果未保存配置,則系統會出現保存確認的提示,單擊確認即可。
腳本模式
增量數據的同步需要使用到OTSStream Reader和OSS Writer插件。腳本配置規則請參見Tablestore Stream數據源和OSS數據源。
重要任務轉為腳本模式后,將無法轉為向導模式,請謹慎操作。
在配置任務步驟,單擊圖標,然后在彈出的對話框中單擊確認。
在腳本配置頁面,請根據如下示例完成配置。
重要為了便于理解,在配置示例中增加了注釋內容,實際使用腳本時請刪除所有注釋內容。
按照行模式導出增量變化后的數據
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", //插件名,不能修改。 "parameter": { "statusTable": "TableStoreStreamReaderStatusTable", //存儲Tablestore Stream狀態的表,一般無需修改。 "maxRetries": 30, //最大重試次數。 "isExportSequenceInfo": false, //是否導出時序信息,時序信息包含了數據的寫入時間等。 "mode": "single_version_and_update_only", //Tablestore Stream導出數據的格式,目前需要設置為single_version_and_update_only。如果配置模板中無此項,則需要增加。 "datasource": "otssource", //數據源名稱,請根據實際填寫。 "envType": 1, "column": [ { "name": "pk1" }, { "name": "pk2" }, { "name": "col1" } ], "startTimeString": "${startTime}", //開始導出的時間點,由于是增量導出,需要循環啟動此任務,則此處每次啟動時的時間都不同,因此需要設置一個變量,例如${startTime}。 "table": "mytable", //Tablestore中的數據表名稱。 "endTimeString": "${endTime}" //結束導出的時間點。此處也需要設置一個變量,例如${endTime}。 }, "name": "Reader", "category": "reader" }, { "stepType": "oss", //Writer插件的名稱,不能修改。 "parameter": { //此處參數配置只適用于導出csv和text格式的文件。如果要導出為parquet和orc格式,請參照OSS數據源文檔修改OSSWriter配置。 "fieldDelimiterOrigin": ",", "nullFormat": "null", //定義null值的字符串標識符方式,可以是空字符串。 "dateFormat": "yyyy-MM-dd HH:mm:ss", //時間格式。 "datasource": "osssource", //OSS數據源名稱,請根據實際填寫。 "envType": 1, "writeSingleObject": true, //是否寫單個文件。設置此參數為true,表示寫單個文件,當讀不到任何數據時不會產生空文件;設置此參數為false,表示寫多個文件,當讀不到任何數據時,如果配置文件頭會輸出空文件只包含文件頭,否則只輸出空文件。 "writeMode": "truncate", //當同名文件存在時系統進行的操作,可選值包括truncate、append和nonConflict。truncate表示會清理已存在的同名文件,append表示會增加到已存在的同名文件內容后面,nonConflict表示當同名文件存在時會報錯。 "encoding": "UTF-8", //編碼類型。 "fieldDelimiter": ",", //每一列的分隔符。 "fileFormat": "csv", //文件類型,可選值包括csv、text、parquet和orc格式。 "object": "" //備份到OSS的文件名前綴,建議使用"Tablestore實例名/表名/date",例如"instance/table/{date}"。 }, "name": "Writer", "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "category": "processor" } ], "setting": { "errorLimit": { "record": "0" //允許出錯的個數。當錯誤超過這個數目的時候同步任務會失敗。 }, "locale": "zh", "speed": { "throttle": false, //當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,需要配置mbps參數,表示限流。 "concurrent": 3 //作業并發數。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
按照行的列值增量變化形式導出數據
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", //插件名,不能修改。 "parameter": { "statusTable": "TableStoreStreamReaderStatusTable", //存儲Tablestore Stream狀態的表,一般無需修改。 "maxRetries": 30, //最大重試次數。 "isExportSequenceInfo": false, //是否導出時序信息,時序信息包含了數據的寫入時間等。 "datasource": "otssource", "envType": 1, "column": [//設置數據表中需要導出到OSS中的列,如果配置模板中無此項則需要增加,保持默認配置即可。 "pk1", //主鍵列名稱,如果有多個主鍵列則需要全部配置。 "pk2", //主鍵列名稱,如果有多個主鍵列則需要全部配置。 "colName", //表示有增量變化的屬性列名稱,無需修改。 "version", //表示增量變化后列的數據版本號,無需修改。格式為64位時間戳。單位為毫秒。 "colValue", //表示增量變化后屬性列值,無需修改。 "opType", //表示增量操作類型,無需修改。 "sequenceInfo" //表示自增保序sequenceid,無需修改。 ], "startTimeString": "${startTime}", //開始導出的時間點,由于是增量導出,需要循環啟動此任務,則此處每次啟動時的時間都不同,因此需要設置一個變量,例如${startTime}。 "table": "mytable", //Tablestore中的數據表名稱。 "endTimeString": "${endTime}" //結束導出的時間點。此處也需要設置一個變量,例如${endTime}。 }, "name": "Reader", "category": "reader" }, { "stepType": "oss", //Writer插件的名稱,不能修改。 "parameter": { //此處參數配置只適用于導出csv和text格式的文件。如果要導出為parquet和orc格式,請參照OSS數據源文檔修改OSSWriter配置。 "fieldDelimiterOrigin": ",", "nullFormat": "null", //定義null值的字符串標識符方式,可以是空字符串。 "dateFormat": "yyyy-MM-dd HH:mm:ss", //時間格式。 "datasource": "osssource", //OSS數據源名稱,請根據實際填寫。 "envType": 1, "writeSingleObject": true, //是否寫單個文件。設置此參數為true,表示寫單個文件,當讀不到任何數據時不會產生空文件;設置此參數為false,表示寫多個文件,當讀不到任何數據時,如果配置文件頭會輸出空文件只包含文件頭,否則只輸出空文件。 "column": [ //導出到OSS的列,只需要用序號表示即可,無需修改。 "0", "1", "2", "3", "4", "5", "6" ], "writeMode": "truncate", //當同名文件存在時系統進行的操作,可選值包括truncate、append和nonConflict。truncate表示會清理已存在的同名文件,append表示會增加到已存在的同名文件內容后面,nonConflict表示當同名文件存在時會報錯。 "encoding": "UTF-8", //編碼類型。 "fieldDelimiter": ",", //每一列的分隔符。 "fileFormat": "csv", //文件類型,可選值包括csv和text格式。 "object": "" //備份到OSS的文件名前綴,建議使用"Tablestore實例名/表名/date",例如"instance/table/{date}"。 }, "name": "Writer", "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "category": "processor" } ], "setting": { "errorLimit": { "record": "0" //允許出錯的個數。當錯誤超過這個數目的時候同步任務會失敗。 }, "locale": "zh", "speed": { "throttle": false, //當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,需要配置mbps參數,表示限流。 "concurrent": 3 //作業并發數。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
單擊圖標,保存配置。
說明執行后續操作時,如果未保存腳本,則系統會出現保存確認的提示,單擊確認即可。
同步時序表數據的任務配置
在數據集成節點下,雙擊打開新建的離線同步任務節點。
配置同步網絡鏈接。
選擇離線同步任務的數據來源、數據去向以及用于執行同步任務的資源組,并測試連通性。
重要數據同步任務的執行必須經過資源組來實現,請選擇資源組并保證資源組與讀寫兩端的數據源能聯通訪問。
在網絡與資源配置步驟,選擇數據來源為Tablestore Stream,并選擇數據源名稱為表格存儲數據源。
選擇資源組。
選擇資源組后,系統會顯示資源組的地域、規格等信息以及自動測試資源組與所選數據源之間連通性。
重要請與新增數據源時選擇的資源組保持一致。
選擇數據去向為OSS,并選擇數據源名稱為OSS數據源。
系統會自動測試資源組與所選數據源之間連通性。
測試可連通后,單擊下一步。
配置任務。
同步時序表數據時只支持使用腳本模式進行任務配置。增量數據的同步需要使用到OTSStream Reader和OSS Writer插件。腳本配置規則請參見Tablestore Stream數據源和OSS數據源。
重要任務轉為腳本模式后,將無法轉為向導模式,請謹慎操作。
在配置任務步驟,單擊圖標,然后在彈出的對話框中單擊確認。
在腳本配置頁面,請根據如下示例完成配置。
時序表只支持按照行模式導出增量變化后的數據。
重要為了便于理解,在配置示例中增加了注釋內容,實際使用腳本時請刪除所有注釋內容。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", //插件名,不能修改。 "parameter": { "statusTable": "TableStoreStreamReaderStatusTable", //存儲Tablestore Stream狀態的表,一般無需修改。 "maxRetries": 30, //最大重試次數。 "isExportSequenceInfo": false, //是否導出時序信息,時序信息包含了數據的寫入時間等。 "mode": "single_version_and_update_only", //Tablestore Stream導出數據的格式,目前需要設置為single_version_and_update_only。如果配置模板中無此項,則需要增加。 "isTimeseriesTable":"true", //是否為時序表。當要導出時序表數據到OSS時,您需要設置此參數為true。 "datasource": "otssource", //數據源名稱,請根據實際填寫。 "envType": 1, "column": [ //設置時序表中需要導出到OSS中的列,如果配置模板中無此項則需要增加。 { "name": "_m_name" //度量名稱,無需修改。如果不需要導出,請刪除該配置。 }, { "name": "_data_source", //數據源,無需修改。如果不需要導出,請刪除該配置。 }, { "name": "_tags", //時間線標簽,無需修改。如果不需要導出,請刪除該配置。 }, { "name": "colname", //時間線數據中的列,請根據實際填寫。如果需要導出多列,請添加相應列。 } ], "startTimeString": "${startTime}", //開始導出的時間點,由于是增量導出,需要循環啟動此任務,則此處每次啟動時的時間都不同,因此需要設置一個變量,例如${startTime}。 "table": "timeseriestable", //Tablestore中的時序表名稱。 "endTimeString": "${endTime}" //結束導出的時間點。此處也需要設置一個變量,例如${endTime}。 }, "name": "Reader", "category": "reader" }, { "stepType": "oss", //Writer插件的名稱,不能修改。 "parameter": { //此處參數配置只適用于導出csv和text格式的文件。如果要導出為parquet和orc格式,請參照OSS數據源文檔修改OSSWriter配置。 "fieldDelimiterOrigin": ",", "nullFormat": "null", //定義null值的字符串標識符方式,可以是空字符串。 "dateFormat": "yyyy-MM-dd HH:mm:ss", //時間格式。 "datasource": "osssource", //OSS數據源名稱,請根據實際填寫。 "envType": 1, "writeSingleObject": false, //是否寫單個文件。設置此參數為true,表示寫單個文件,當讀不到任何數據時不會產生空文件;設置此參數為false,表示寫多個文件,當讀不到任何數據時,如果配置文件頭會輸出空文件只包含文件頭,否則只輸出空文件。 "writeMode": "truncate", //當同名文件存在時系統進行的操作,可選值包括truncate、append和nonConflict。truncate表示會清理已存在的同名文件,append表示會增加到已存在的同名文件內容后面,nonConflict表示當同名文件存在時會報錯。 "encoding": "UTF-8", //編碼類型。 "fieldDelimiter": ",", //每一列的分隔符。 "fileFormat": "csv", //文件類型,可選值包括csv和text格式。 "object": "" //備份到OSS的文件名前綴,建議使用"Tablestore實例名/表名/date",例如"instance/table/{date}"。 }, "name": "Writer", "category": "writer" }, { "name": "Processor", "stepType": null, "category": "processor", "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" } } ], "setting": { "executeMode": null, "errorLimit": { "record": "0" //允許出錯的個數。當錯誤超過這個數目的時候同步任務會失敗。 }, "speed": { "concurrent": 2, //作業并發數。 "throttle": false //當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,需要配置mbps參數,表示限流。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
單擊圖標,保存配置。
說明執行后續操作時,如果未保存腳本,則系統會出現保存確認的提示,單擊確認即可。
步驟三:配置調度屬性
通過調度配置,您可以配置同步任務的執行時間、重跑屬性、調度依賴等。
單擊任務右側的調度配置。
在調度配置面板的參數部分,單擊新增參數,根據下表說明新增參數。更多信息,請參見調度參數支持的格式。
參數
參數值
startTime
$[yyyymmddhh24-2/24]$[miss-10/24/60]
endTime
$[yyyymmddhh24-1/24]$[miss-10/24/60]
配置示例如下圖所示。
假如任務運行時的時間為2023年04月23日19:00:00點,則startTime為20230423175000,endTime為20230423185000。任務將會同步17:50到18:50時段內新增的數據。
在時間屬性部分,配置時間屬性。更多信息,請參見時間屬性配置說明。
此處以任務整點每小時自動運行為例介紹配置,如下圖所示。
在調度依賴部分,選中使用工作空間根節點復選框,系統會自動生成依賴的上游節點信息。
使用工作空間根節點表示該任務無上游的依賴任務。
配置完成后,關閉配置調度面板。
步驟四:調試代碼并提交任務
(可選)根據需要調試腳本代碼。
通過調試腳本代碼,確保同步任務能成功同步表格存儲的增量數據到OSS中。
重要調試腳本代碼時配置的時間范圍內的數據可能會多次導入到OSS中,相同數據行會覆蓋寫入到OSS中。
單擊圖標。
在參數對話框,選擇運行資源組的名稱,并配置自定義參數。
自定義參數的格式為
yyyyMMddHHmmss
,例如20230423175000。單擊運行。
提交同步任務。
提交同步任務后,同步任務會按照配置的調度屬性進行運行。
單擊圖標。
在提交對話框,根據需要填寫變更描述。
單擊確認。
步驟五:查看任務執行結果
在DataWorks控制臺查看任務運行狀態。
單擊同步任務工具欄右側的運維。
在周期實例頁面的實例視角頁簽,查看實例的運行狀態。
在OSS管理控制臺查看數據同步結果。
登錄OSS管理控制臺。
在Bucket列表頁面,找到目標Bucket后,單擊Bucket名稱。
在文件列表頁簽,選擇相應文件,下載后可查看內容是否符合預期。