將表格存儲的增量數(shù)據(jù)轉換為全量數(shù)據(jù)格式
通過DataWorks控制臺,您可以在MaxCompute中使用merge_udf.jar包將表格存儲的增量數(shù)據(jù)轉換為全量數(shù)據(jù)格式。
前提條件
已導出表格存儲全量數(shù)據(jù)到MaxCompute,且已配置同步表格存儲增量數(shù)據(jù)到MaxCompute。具體操作,請分別參見全量導出和增量同步。
已下載merge_udf.jar包。具體下載路徑請參見merge_udf.jar。
注意事項
字段名稱大小寫敏感,請確保MaxComputer中的字段名稱與表格存儲中的字段名稱一致。
步驟一:新建JAR資源
通過新建JAR資源,將下載的merge_udf.jar包上傳到MaxCompute中。
進入數(shù)據(jù)開發(fā)頁面。
以項目管理員身份登錄DataWorks控制臺。
說明僅項目管理員角色可以新增數(shù)據(jù)源,其他角色的成員僅可查看數(shù)據(jù)源。
在左側導航欄,單擊工作空間列表后,選擇地域。
在工作空間列表頁面,在目標工作空間操作列選擇快速進入>數(shù)據(jù)開發(fā)。
新建JAR資源。
在數(shù)據(jù)開發(fā)頁面,將鼠標懸停在圖標,選擇新建資源>JAR。
重要只有在工作空間配置頁面添加MaxCompute引擎后,當前頁面才會顯示MaxCompute節(jié)點。具體操作,請參見創(chuàng)建并管理工作空間。
您也可以打開相應的業(yè)務流程,右鍵單擊MaxCompute,選擇新建資源>JAR。
在新建資源對話框,選擇路徑并填寫資源名稱。
說明資源名稱無需與上傳的文件名保持一致,JAR資源的后綴必須為
.jar
。如果該JAR包已經(jīng)在MaxCompute(ODPS)客戶端上傳過,則需要取消選擇上傳為ODPS資源,否則上傳會報錯。
單擊點擊上傳,選擇相應文件進行上傳。
單擊新建。
在資源編輯頁面,提交資源到調(diào)度開發(fā)服務器端。
單擊工具欄中的圖標。
在提交對話框,填寫變更描述。
單擊確認。
步驟二:新建并注冊函數(shù)
新建函數(shù)。
在數(shù)據(jù)開發(fā)頁面,將鼠標懸停在圖標,選擇新建函數(shù)>函數(shù)。
您也可以打開相應的業(yè)務流程,右鍵單擊MaxCompute,選擇新建函數(shù)。
在新建函數(shù)對話框,選擇路徑并填寫函數(shù)名稱。
單擊新建。
注冊函數(shù)。
在注冊函數(shù)頁面,選擇函數(shù)類型為其他函數(shù)。
填寫資源列表為步驟一:新建JAR資源中的資源名稱,并根據(jù)表類型和模式填寫對應的類名。
根據(jù)表類型不同可以選擇的模式不同,單版本表只能選擇單版本模式,多版本表可以選擇多版本模式V1或者多版本模式V2。關于模式選擇的更多信息,請參見附錄:模式選擇。
表類型
模式
類名
單版本表
單版本模式
com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
多版本表
多版本模式V1
com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1
多版本模式V2
com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
單擊圖標,保存配置。
在函數(shù)編輯頁面,提交函數(shù)到調(diào)度開發(fā)服務器端。
單擊工具欄中的圖標。
在提交對話框,填寫變更描述。
單擊確認。
步驟三:編寫ODPS SQL并運行
新建ODPS SQL節(jié)點。
在數(shù)據(jù)開發(fā)頁面,將鼠標懸停在圖標,選擇新建節(jié)點>ODPS SQL。
您也可以打開相應的業(yè)務流程,右鍵單擊MaxCompute,選擇新建節(jié)點>ODPS SQL。
在新建節(jié)點對話框,選擇路徑并填寫名稱。
單擊確認。
在節(jié)點編輯頁面,編寫ODPS SQL語句并執(zhí)行。
ODPS SQL語句的語法如下:
select function_name(para_list) as (custom_para_list) from( select * from stream_table_name distribute by primary_keys sort by primary_keys,SequenceID )t;
其中
function_name
為步驟二:新建并注冊函數(shù)中的函數(shù)名稱,para_list
為附錄:模式選擇中的參數(shù)列表,custom_para_list
為自定義參數(shù)列表,stream_table_name為增量表的名稱,primary_keys
為表格存儲數(shù)據(jù)表的主鍵列表,SequenceID
為增量表的sequenceid。
附錄:模式選擇
模式包括單版本模式、多版本模式V1和多版本模式V2,請根據(jù)表類型選擇合適的模式。
單版本模式
類名:com.aliyun.ots.stream.utils.mergecell.oneversion.MergeCell
參數(shù)列表
參數(shù)列表的格式為
pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo
。具體參數(shù)說明請參見下表,請根據(jù)實際填寫pknum、colnum、colnames和pknames,其他參數(shù)保持參數(shù)名稱即可。參數(shù)
類型
描述
INT pknum
常量
表格存儲數(shù)據(jù)表的主鍵個數(shù)。
INT colnum
常量
表格存儲數(shù)據(jù)表中的屬性列個數(shù)。
List<String> colnames
常量
要合并增量變更的屬性列名稱。
List<String> pknames
變量
表格存儲數(shù)據(jù)表的主鍵列表。
STRING colname
變量
屬性列。
BIGINT version
變量
版本號。
STRING colvalue
變量
增量值。
STRING optype
變量
增量操作類型。
STRING sequenceinfo
變量
自增保序sequenceid。
示例
當表格存儲數(shù)據(jù)表的主鍵為
pk1,pk2
且屬性列為col1,col2,col3
時,則在MaxCompute中創(chuàng)建的增量表Schema為pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,如果要合并增量變更的屬性列為col1,col2,col3
,設置參數(shù)列表為2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,ODPS SQL語句示例如下:select mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo) as (pk1,pk2,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted) from( select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceInfo )t;
執(zhí)行ODPS SQL語句后的輸出結果請參見下表。
pk1
pk2
col1
co1_is_deleted
col2
col2_is_deleted
col3
col3_is_deleted
test
0
\N
\N
20
\N
\N
true
輸出結果的說明如下:
當col列和col_is_deleted列均為
\N
時,表示col列無任何增量操作。當col列為具體的值且col_is_deleted列為
\N
時,表示col列的值被修改為對應值。當col列為
\N
且col_is_deleted列為true時,表示col列被刪除。
多版本模式V1
類名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV1
參數(shù)列表
參數(shù)列表的格式為
pknum,colnum,colnames,pknames,colname,version,colvalue,optype,sequenceinfo
。具體參數(shù)說明請參見下表,請根據(jù)實際填寫pknum、colnum、colnames和pknames,其他參數(shù)保持參數(shù)名稱即可。參數(shù)
類型
描述
INT pknum
常量
表格存儲數(shù)據(jù)表的主鍵個數(shù)。
INT colnum
常量
表格存儲數(shù)據(jù)表中的屬性列個數(shù)。
List<String> colnames
常量
要合并增量變更的屬性列名稱。
List<String> pknames
變量
表格存儲數(shù)據(jù)表的主鍵列表。
STRING colname
變量
屬性列。
BIGINT version
變量
版本號。
STRING colvalue
變量
增量值。
STRING optype
變量
增量操作類型。
STRING sequenceinfo
變量
自增保序sequenceid。
示例
當表格存儲數(shù)據(jù)表的主鍵為
pk1,pk2
且屬性列為col1,col2,col3
時,則在MaxCompute中創(chuàng)建的增量表Schema為pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,如果要合并增量變更的屬性列為col1,col2,col3
,設置參數(shù)列表為2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,ODPS SQL語句示例如下:select mergeCell(2,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,optype,sequenceinfo) as (pk1,pk2,version,col1,col1_is_deleted,col2,col2_is_deleted,col3,col3_is_deleted) from( select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceinfo )t;
執(zhí)行ODPS SQL語句后的輸出結果請參見下表。
pk1
pk2
version
col1
co1_is_deleted
col2
col2_is_deleted
col3
col3_is_deleted
test
0
123
\N
\N
20
\N
\N
true
輸出結果的說明如下:
當version列為具體的值,且col列和col_is_deleted列均為
\N
時,表示col列對應版本沒有任何增量操作。當version列和col列均為具體的值,且col_is_deleted列為
\N
時,表示col列對應版本的值被修改為具體的值。當version列為具體的值,col列為
\N
,且col_is_deleted列為true時,表示col列對應版本被刪除。當version列和col列均為
\N
,且col_is_deleted列為true時,表示存在刪除一列所有版本的操作。
多版本模式V2
類名:com.aliyun.ots.stream.utils.mergecell.multiversion.MergeCellV2
參數(shù)列表
參數(shù)列表的格式為
pknum,colnum,maxversion,colnames,pknames,colname,version,colvalue,optype,sequenceinfo
。具體參數(shù)說明請參見下表,請根據(jù)實際填寫pknum、colnum、maxversion、colnames和pknames,其他參數(shù)保持參數(shù)名稱即可。參數(shù)
類型
描述
BIGINT pknum
常量
表格存儲數(shù)據(jù)表的主鍵個數(shù)。
BIGINT colnum
常量
表格存儲數(shù)據(jù)表中的屬性列個數(shù)。
BIGINT maxversion
常量
最大版本數(shù)。
List<String> colnames
常量
要合并增量變更的屬性列名稱。
List<String> pknames
變量
表格存儲數(shù)據(jù)表的主鍵列表。
STRING colname
變量
屬性列。
BIGINT version
變量
版本號。
STRING colvalue
變量
增量值。
STRING optype
變量
增量操作類型。
STRING sequenceinfo
變量
自增保序sequenceid。
示例
當表格存儲數(shù)據(jù)表的主鍵為
pk1,pk2
,屬性列為col1,col2,col3
且最大版本數(shù)為3時,則在MaxCompute中創(chuàng)建的增量表Schema為pk1,pk2,colname,version,colvalue,optype,sequenceinfo
,如果要合并增量變更的屬性列為col1,col2,col3
,設置參數(shù)列表為2,3,3,"col1","col2","col3",pk1,pk2,colName,version,colValue,opType,sequenceInfo
,ODPS SQL語句示例如下:select mergeCell(2,3,3,"col1","col2","col3",pk1,pk2,colname,version,colvalue,opyype,sequenceinfo) as (pk1,pk2,col1,col2,col3) from( select * from stream_table_name distribute by pk1,pk2 sort by pk1,pk2,sequenceinfo )t;
執(zhí)行ODPS SQL語句后的輸出結果請參見下表。
pk1
pk2
col1
col2
col3
test
02
{"data":[{"version":1621330803390,"value":"value001"},{"version":1621330795198,"value":"value002"},{"version":1621330785936,"value":"value003"}],"needDeleteAllVersionFirst":true,"deleteVersions":[]}
\N
\N
輸出結果的說明如下:
data表示新寫入的數(shù)據(jù)列表,按照版本號降序排序。最多保留maxversion個版本的數(shù)據(jù)。
needDeleteAllVersionFirst表示該列是否需要刪除原有全部版本。當出現(xiàn)刪除一行DeleteRow或刪除一列的所有版本DeleteColumns時,該值為true,否則為false。
deleteVersions表示該列需要刪除的版本列表,按照版本號降序排序。最多保留maxversion個版本。
deleteVersions中的版本號不會與data中的版本號相同,當needDeleteAllVersionFirst為true時,deleteVersions為空列表。
常見問題
執(zhí)行SQL語句進行表格存儲數(shù)據(jù)格式轉換時出現(xiàn)類型轉換錯誤問題
問題現(xiàn)象
在DataWorks中通過數(shù)據(jù)開發(fā)執(zhí)行SQL語句進行表格存儲數(shù)據(jù)格式轉換時出現(xiàn)如下錯誤:
FAILED ODPS -0010000:System internal error -fuxi job failed, causer by: Failed in UDF/UDTF/UDAF com.aliyun.otsstream.utils.mergecell.oneversion.MergeCell class, at query location of line 1, column 8
可能原因
odps.sql.type.system.odps2
參數(shù)的配置不正確(即odps.sql.type.system.odps2=true
)。解決方案
在該函數(shù)的SQL語句前加
set odps.sql.type.system.odps2=false;
,并與SQL語句一起提交運行。
字段映射不到值且未查到值的字段均顯示被刪除
問題現(xiàn)象
執(zhí)行數(shù)據(jù)轉換SQL語句后,返回結果中存在字段映射不到值,且未查到值的字段均顯示被刪除。例如下圖中name列值均為
\N
且name_is_deleted列值均為True,表示name列被刪除,但是實際表格存儲中增量數(shù)據(jù)表中實際存在name列。可能原因
增量同步任務中的isExportSequenceInfo參數(shù)配置不正確(即
"isExportSequenceInfo"=false
),導致系統(tǒng)認為屬性值已被刪除。sequenceInfo中記錄了某一列的歷史版本信息,如果未導出sequenceInfo,則增量數(shù)據(jù)轉為全量數(shù)據(jù)格式時,對應版本的值無法設置到行中。
解決方案
在同步任務中配置導出時序信息,然后重新執(zhí)行同步數(shù)據(jù)并進行數(shù)據(jù)格式轉換。
請選擇合適方式進行導出時序信息配置。
通過DataWorks控制臺在增量同步任務腳本編輯頁面修改isExportSequenceInfo參數(shù)為true(即
"isExportSequenceInfo"=true
),并保存和提交同步任務。通過DataWorks控制臺在增量同步任務的配置任務步驟選中導出時序信息復選框,并保存和提交同步任務。