將分區(qū)數(shù)據(jù)批量同步到以分區(qū)名為特征的MySQL表中
本文利用DataWorks賦值節(jié)點(diǎn)與for-each節(jié)點(diǎn)的特性,實(shí)現(xiàn)將MaxCompute中以年月日和地域劃分的二級(jí)分區(qū)數(shù)據(jù),批量同步到以二級(jí)分區(qū)內(nèi)容為特征的MySQL表中,本文以「年月日_地域」為后綴特征介紹。
場(chǎng)景介紹
在生產(chǎn)過(guò)程中,MaxCompute數(shù)據(jù)倉(cāng)庫(kù)每天會(huì)產(chǎn)生大量數(shù)據(jù),這些數(shù)據(jù)通常按照年月日和地域進(jìn)行分區(qū)拆分,并同步至帶有年月日及地域后綴的MySQL表中,以便進(jìn)行業(yè)務(wù)處理。由于DataWorks的標(biāo)準(zhǔn)離線同步任務(wù)無(wú)法直接實(shí)現(xiàn)這一需求,我們可以通過(guò)創(chuàng)建賦值節(jié)點(diǎn)來(lái)獲取每日的地域二級(jí)分區(qū)信息,并利用for-each節(jié)點(diǎn)的循環(huán)特性,將這些二級(jí)分區(qū)數(shù)據(jù)作為參數(shù)傳遞給離線同步腳本。離線同步腳本會(huì)根據(jù)獲取的分區(qū)參數(shù)信息,將數(shù)據(jù)同步到相應(yīng)命名的MySQL表中。
前提條件
已在DataWorks工作空間的數(shù)據(jù)源管理頁(yè)面,新增MaxCompute來(lái)源數(shù)據(jù)源。
說(shuō)明本文使用的測(cè)試數(shù)據(jù),請(qǐng)參見MaxCompute示例數(shù)據(jù)。
數(shù)據(jù)源支持的數(shù)據(jù)同步能力,請(qǐng)參見MaxCompute數(shù)據(jù)源。
已在DataWorks工作空間的數(shù)據(jù)源管理頁(yè)面,新增MySQL去向數(shù)據(jù)源。
說(shuō)明數(shù)據(jù)源支持的數(shù)據(jù)同步能力,請(qǐng)參見MySQL數(shù)據(jù)源。
已在MySQL數(shù)據(jù)源中執(zhí)行以
_年月日_地域
為后綴的建表語(yǔ)句,請(qǐng)參見MySQL建表語(yǔ)句。已購(gòu)買Serverless資源組,并為資源組綁定工作空間、完成網(wǎng)絡(luò)連通配置。
說(shuō)明本文僅支持Serverless資源組。
購(gòu)買與配置操作,請(qǐng)參見新增和使用Serverless資源組。
實(shí)現(xiàn)原理
業(yè)務(wù)實(shí)現(xiàn)原理,如下圖所示:
使用賦值節(jié)點(diǎn)來(lái)獲取MaxCompute表中當(dāng)前日期的所有二級(jí)分區(qū)信息。
利用for-each節(jié)點(diǎn)的循環(huán)遍歷特性,將從賦值節(jié)點(diǎn)獲取的分區(qū)信息作為參數(shù),循環(huán)傳遞給循環(huán)體內(nèi)的離線同步節(jié)點(diǎn)。
通過(guò)離線同步節(jié)點(diǎn)腳本內(nèi)的動(dòng)態(tài)參數(shù)配置,實(shí)現(xiàn)從MaxCompute二級(jí)分區(qū)到MySQL特定表的數(shù)據(jù)遷移。
操作步驟
步驟一:創(chuàng)建賦值節(jié)點(diǎn)
進(jìn)入數(shù)據(jù)開發(fā)頁(yè)面,在下拉框中選擇對(duì)應(yīng)工作空間后單擊進(jìn)入數(shù)據(jù)開發(fā)。
新建賦值節(jié)點(diǎn)。
右鍵單擊目標(biāo)業(yè)務(wù)流程,選擇
。在新建節(jié)點(diǎn)對(duì)話框中,輸入名稱,并選擇節(jié)點(diǎn)類型及路徑。單擊確認(rèn),進(jìn)入賦值節(jié)點(diǎn)編輯頁(yè)面。
配置調(diào)度信息。
單擊賦值節(jié)點(diǎn)編輯頁(yè)面右側(cè)的調(diào)度配置,進(jìn)入調(diào)度配置頁(yè)面。
在調(diào)度參數(shù)的參數(shù)名中輸入
dt_time
,在參數(shù)值內(nèi)點(diǎn)擊下拉選擇今天$[yyyymmdd]
,將當(dāng)前時(shí)間年月日賦值給參數(shù)dt_time
。在調(diào)度依賴的依賴的上游節(jié)點(diǎn)對(duì)話框內(nèi)點(diǎn)擊勾選使用工作空間根節(jié)點(diǎn)為節(jié)點(diǎn)配置上游依賴。
開發(fā)賦值節(jié)點(diǎn)任務(wù)。
賦值節(jié)點(diǎn)支持ODPS SQL、SHELL和Python三種語(yǔ)言編寫開發(fā)任務(wù),本文采用ODPS SQL語(yǔ)言進(jìn)行示例說(shuō)明。
SELECT region FROM sales_data WHERE dt = ${dt_time} GROUP BY region;
說(shuō)明使用動(dòng)態(tài)參數(shù)
dt_time
獲取每日年月日信息,可完成對(duì)每日全部地域分區(qū)的獲取。該節(jié)點(diǎn)輸出參數(shù)默認(rèn)為右側(cè)調(diào)度配置中的本節(jié)點(diǎn)輸出參數(shù)
outputs
,節(jié)點(diǎn)輸出值會(huì)自動(dòng)被DataWorks捕獲,傳遞給下游for-each節(jié)點(diǎn)。如果您配置了多個(gè)MaxCompute數(shù)據(jù)源,則需在節(jié)點(diǎn)上方的MaxCompute引擎實(shí)例對(duì)話框中選擇所需執(zhí)行的數(shù)據(jù)源實(shí)例。
保存提交節(jié)點(diǎn)任務(wù)。
單擊工具欄中的圖標(biāo),保存編寫的SQL語(yǔ)句。
單擊工具欄中的圖標(biāo),提交節(jié)點(diǎn)任務(wù)。
說(shuō)明如您創(chuàng)建的工作空間為標(biāo)準(zhǔn)模式,則需單擊節(jié)點(diǎn)上方發(fā)布按鈕進(jìn)行任務(wù)發(fā)布。
步驟二:創(chuàng)建for-each節(jié)點(diǎn)
新建for-each節(jié)點(diǎn)。
右鍵單擊目標(biāo)業(yè)務(wù)流程,選擇
。在新建節(jié)點(diǎn)對(duì)話框中,輸入名稱,并選擇節(jié)點(diǎn)類型及路徑。單擊確認(rèn),進(jìn)入for-each節(jié)點(diǎn)編輯頁(yè)面。
配置調(diào)度信息。
單擊賦值節(jié)點(diǎn)編輯頁(yè)面右側(cè)的調(diào)度配置,進(jìn)入調(diào)度配置頁(yè)面。
在調(diào)度依賴的依賴的上游節(jié)點(diǎn)下拉選擇節(jié)點(diǎn)名稱,在輸入框中輸入步驟一中創(chuàng)建的賦值節(jié)點(diǎn)名稱,選擇后綴名為
_out
的賦值節(jié)點(diǎn),單擊添加按鈕。單擊打開節(jié)點(diǎn)上下文參數(shù)內(nèi)容,在本節(jié)點(diǎn)輸入?yún)?shù)中找到參數(shù)名為
loopDataArray
的內(nèi)容,單擊內(nèi)容右側(cè)編輯按鈕。單擊選擇取值來(lái)源信息并進(jìn)行保存。
系統(tǒng)支持的最大循環(huán)次數(shù)為128次。如果您實(shí)際業(yè)務(wù)情況循環(huán)次數(shù)大于128次,您可在頁(yè)面提示框進(jìn)行配置修改。
步驟三:創(chuàng)建離線同步節(jié)點(diǎn)
新建離線同步節(jié)點(diǎn)。
單擊for-each節(jié)點(diǎn)編輯頁(yè)面左側(cè)的
節(jié)點(diǎn)。在新建節(jié)點(diǎn)對(duì)話框中,輸入節(jié)點(diǎn)名稱,單擊新建。
網(wǎng)絡(luò)與資源配置。
選擇離線同步節(jié)點(diǎn),右鍵單擊打開節(jié)點(diǎn),進(jìn)入網(wǎng)絡(luò)與資源配置頁(yè)面。
配置來(lái)源數(shù)據(jù)源,數(shù)據(jù)來(lái)源選擇
MaxCompute(ODPS)
,在數(shù)據(jù)源名稱下拉選擇您所需同步的MaxCompute數(shù)據(jù)源。在新建獨(dú)享數(shù)據(jù)集成資源組模塊選擇您所創(chuàng)建的資源組。
配置去向數(shù)據(jù)源,數(shù)據(jù)去向選擇
MySQL
,在數(shù)據(jù)源名稱下拉選擇您所需接收的MySQL數(shù)據(jù)源。
配置數(shù)據(jù)來(lái)源與去向。
點(diǎn)擊下一步進(jìn)入配置數(shù)據(jù)來(lái)源與去向頁(yè)面。
在數(shù)據(jù)來(lái)源中配置schema和表名信息,在schema下拉框選擇您所創(chuàng)建的schema,在表名下拉框選擇示例表名
sales_data
。在數(shù)據(jù)去向中配置表名信息,在表名下拉框選擇示例表名
prefix_20240913_beijing
。
配置動(dòng)態(tài)分區(qū)參數(shù)。
找到配置數(shù)據(jù)來(lái)源與去向頁(yè)面上方的轉(zhuǎn)換腳本按鈕,單擊提示框的確定按鈕。
找到參數(shù)
partition
,將里面的參數(shù)值dt=${bizdate},region=
信息更換為動(dòng)態(tài)分區(qū)dt=${offline_time},region=${dag.foreach.current}
。說(shuō)明在for-each節(jié)點(diǎn)內(nèi),可以通過(guò)參數(shù)
${dag.foreach.current}
獲取當(dāng)前遍歷值。找到MySQL表名,將里面的表名
prefix_20240912_shanghai
信息更換為動(dòng)態(tài)表名prefix_${offline_time}_${dag.foreach.current}
。在右側(cè)的調(diào)度配置對(duì)話框中,設(shè)置調(diào)度參數(shù)
offline_time
,在其參數(shù)值內(nèi)通過(guò)下拉菜單選擇今天$[yyyymmdd]
,從而將當(dāng)前日期的年月日賦值給參數(shù)offline_time
。在調(diào)度依賴的依賴的上游節(jié)點(diǎn)的節(jié)點(diǎn)輸出對(duì)話框中輸入
out
,選擇節(jié)點(diǎn)名稱:start
的節(jié)點(diǎn)進(jìn)行添加。
單擊工具欄中的圖標(biāo),保存離線同步任務(wù)。
步驟四:保存提交for-each循環(huán)流程
進(jìn)入步驟二中創(chuàng)建的for-each節(jié)點(diǎn)頁(yè)面。
如下圖所示,配置連接for-each節(jié)點(diǎn)與離線同步節(jié)點(diǎn)。
單擊工具欄中的圖標(biāo),保存循環(huán)流程。
單擊工具欄中的圖標(biāo),在提交頁(yè)面勾選全部節(jié)點(diǎn)信息并點(diǎn)擊確定按鈕進(jìn)行提交。
如您創(chuàng)建的工作空間為標(biāo)準(zhǔn)模式,則需單擊節(jié)點(diǎn)上方發(fā)布按鈕進(jìn)行任務(wù)發(fā)布。
步驟五:執(zhí)行節(jié)點(diǎn)任務(wù)
單擊for-each節(jié)點(diǎn)頁(yè)面左上角的運(yùn)維,進(jìn)入運(yùn)維中心,并在左側(cè)任務(wù)運(yùn)維導(dǎo)航欄中選擇
。找到您所創(chuàng)建的賦值節(jié)點(diǎn),單擊
進(jìn)入補(bǔ)數(shù)據(jù)頁(yè)面。勾選選擇下游任務(wù)的任務(wù)信息,點(diǎn)擊提交并跳轉(zhuǎn)按鈕執(zhí)行該任務(wù)。說(shuō)明您可以在跳轉(zhuǎn)頁(yè)面查看對(duì)應(yīng)任務(wù)的運(yùn)行狀況。
結(jié)果驗(yàn)證
任務(wù)執(zhí)行成功后,您可以在MySQL數(shù)據(jù)庫(kù)中檢查prefix_20240913_beijing
、prefix_20240913_shanghai
、prefix_20240913_hangzhou
這三個(gè)表的數(shù)據(jù),確認(rèn)它們與MaxCompute表當(dāng)前日對(duì)應(yīng)的二級(jí)分區(qū)數(shù)據(jù)是否一致。
MaxCompute表數(shù)據(jù):
您可根據(jù)以下示例SQL語(yǔ)句查詢出相應(yīng)結(jié)果。
SELECT * FROM sales_data where dt = '20240913';
MySQL表數(shù)據(jù):
您可根據(jù)以下示例SQL語(yǔ)句查詢出相應(yīng)結(jié)果。
SELECT * FROM `prefix_20240913_beijing`;
SELECT * FROM `prefix_20240913_shanghai`;
SELECT * FROM `prefix_20240913_hangzhou`;
示例數(shù)據(jù)
MaxCompute示例數(shù)據(jù)
-- 創(chuàng)建分區(qū)表
-- dt為一級(jí)年月日分區(qū),region為二級(jí)地域分區(qū)。
CREATE TABLE IF NOT EXISTS sales_data (
id BIGINT,
product_name STRING,
quantity INT,
price DECIMAL(10,2)
)
PARTITIONED BY (dt STRING, region STRING);
-- 插入測(cè)試數(shù)據(jù)
INSERT INTO TABLE sales_data PARTITION (dt='20240913', region='beijing')
VALUES (1, 'phone', 10, 99.99);
INSERT INTO TABLE sales_data PARTITION (dt='20240913', region='shanghai')
VALUES (2, 'book', 5, 70.00);
INSERT INTO TABLE sales_data PARTITION (dt='20240913', region='hangzhou')
VALUES (3, 'book', 5, 70.00);
MySQL建表語(yǔ)句
-- MySql 建表
-- 表名格式prefix_年月日_地域
CREATE TABLE IF NOT EXISTS prefix_20240913_beijing (
id BIGINT,
product_name varchar(100),
quantity INT,
price DECIMAL(10,2)
);
CREATE TABLE IF NOT EXISTS prefix_20240913_shanghai (
id BIGINT,
product_name varchar(100),
quantity INT,
price DECIMAL(10,2)
);
CREATE TABLE IF NOT EXISTS prefix_20240913_hangzhou (
id BIGINT,
product_name varchar(100),
quantity INT,
price DECIMAL(10,2)
);
在創(chuàng)建MySQL數(shù)據(jù)表時(shí),需確保其按照prefix_年月日_地域
的命名規(guī)則,并且數(shù)量與MaxCompute表當(dāng)前日的二級(jí)分區(qū)數(shù)量一致,否則同步過(guò)程會(huì)報(bào)錯(cuò)。