MongoDB數(shù)據(jù)源為您提供讀取和寫入MongoDB雙向通道的功能,本文為您介紹DataWorks的MongoDB數(shù)據(jù)同步的能力支持情況。
支持的版本
僅支持4.x、5.x版本的MongoDB。
使用限制
數(shù)據(jù)集成支持使用MongoDB數(shù)據(jù)庫對應賬號進行連接,如果您使用的是云數(shù)據(jù)庫MongoDB版,默認會有一個root賬號。出于安全策略的考慮,在添加使用MongoDB數(shù)據(jù)源時,請避免使用root作為訪問賬號。
如果MongoDB為分片集群,則在配置數(shù)據(jù)源時,需要配置mongos地址,避免配置mongod/shard節(jié)點地址。否則同步任務在抽取MongoDB中數(shù)據(jù)時,可能會導致只查詢到指定shard的數(shù)據(jù),而非預期的全集。關于mongos、mongod,詳情請參考mongos、mongod。
在并發(fā)大于1的情況下,同步任務配置的集合中所有
_id
字段類型必須一致(例如,_id
字段都為string類型或者ObjectId類型),否則會出現(xiàn)部分數(shù)據(jù)無法同步的問題。說明并發(fā)大于1時,任務拆分會使用
_id
字段進行劃分,因而在此場景下_id
字段不支持混合類型。如果_id
有多種字段類型,您可以使用單并發(fā)的形式進行數(shù)據(jù)同步,且不配置splitFactor或splitFactor配置為1。
數(shù)據(jù)集成本身不支持數(shù)組類型,但MongoDB支持數(shù)組類型,并且數(shù)組類型具有強大的索引功能。您可以通過參數(shù)的特殊配置,將字符串轉換為MongoDB中的數(shù)組。轉換類型后,即可并行寫入MongoDB。
自建MongoDB數(shù)據(jù)庫不支持公網訪問,僅支持阿里云內網訪問。
數(shù)據(jù)集成目前不支持在數(shù)據(jù)查詢(參數(shù)query)配置中讀取指定列的數(shù)據(jù)。
離線同步任務中,如果MongoDB無法獲取字段結構,將默認按照6個字段生成字段映射,字段名分別為
col1
,col2
,col3
,col4
,col5
,col6
。在同步任務運行時,默認優(yōu)先使用
splitVector
命令進行任務分片,在部分MongoDB版本中,不支持splitVector
命令,進而會導致報錯no such cmd splitVector
,您可以在同步任務配置中,單擊按鈕,進入腳本模式,在MongoDB的parameter配置中,增加以下參數(shù),避免使用splitVector
。"useSplitVector" : false
支持的字段類型
MongoDB Reader支持的MongoDB數(shù)據(jù)類型
數(shù)據(jù)集成支持大部分MongoDB類型,但也存在部分沒有支持的情況,請注意檢查您的數(shù)據(jù)類型。
對于支持讀取的數(shù)據(jù)類型,數(shù)據(jù)集成在讀取時:
基本類型的數(shù)據(jù),會根據(jù)同步任務配置的讀取字段(column,詳見下文的附錄:MongoDB腳本Demo與參數(shù)說明)中的name自動讀取對應path下的數(shù)據(jù),并根據(jù)數(shù)據(jù)類型做自動轉換,您無需指定column的type屬性。
類型
離線讀(MongoDB Reader)
說明
ObjectId
支持
對象ID類型。
Double
支持
64位浮點數(shù)類型。
32-bit integer
支持
32位整數(shù)。
64-bit integer
支持
64位整數(shù)。
Decimal128
支持
Decimal128類型。
說明如果配置為嵌套類型、Combine類型,JSON序列化時會被當做對象處理,需增加參數(shù)
decimal128OutputType
為bigDecimal
,才能輸出為decimal。String
支持
字符串類型。
Boolean
支持
布爾類型。
Timestamp
支持
時間戳類型。
說明BsonTimestamp存儲的是時間戳,無需考慮時區(qū)影響,詳情請參見MongoDB中的時區(qū)問題。
Date
支持
日期類型。
部分復雜類型的數(shù)據(jù),您可通過配置column的type屬性,進行自定義處理。
類型
離線讀(MongoDB Reader)
說明
Document
支持
嵌入文檔類型。
如果沒有配置type屬性,則直接將Document轉JSON序列化處理。
如果配置了type屬性為
document
,則屬于嵌套類型,MongoDB Reader會按path讀取Document屬性。詳細示例請參見下文的數(shù)據(jù)類型示例2:遞歸解析處理多層嵌套的Document。
Array
支持
數(shù)組類型。
如果type配置為
array.json
、arrays
,直接JSON序列化處理。如果type配置為
array
、document.array
,則拼接為字符串,分隔符(column中的splitter)默認為英文逗號。
重要數(shù)據(jù)集成本身不支持數(shù)組類型,但MongoDB支持數(shù)組類型,并且數(shù)組類型具有強大的索引功能。您可以通過參數(shù)的特殊配置,將字符串轉換為MongoDB中的數(shù)組。轉換類型后,即可并行寫入MongoDB。
數(shù)據(jù)集成特殊數(shù)據(jù)類型:combine
類型 | 離線讀(MongoDB Reader) | 說明 |
Combine | 支持 | 數(shù)據(jù)集成自定義類型。 如果type配置為 |
MongoDB Reader數(shù)據(jù)類型轉換
結合上文可見,MongoDB Reader針對MongoDB類型的轉換列表,如下表所示。
轉換后的類型分類 | MongoDB數(shù)據(jù)類型 |
LONG | INT、LONG、document.INT和document.LONG |
DOUBLE | DOUBLE和document.DOUBLE |
STRING | STRING、ARRAY、document.STRING、document.ARRAY和COMBINE |
DATE | DATE和document.DATE |
BOOLEAN | BOOL和document.BOOL |
BYTES | BYTES和document.BYTES |
MongoDB Writer數(shù)據(jù)類型轉換
類型分類 | MongoDB數(shù)據(jù)類型 |
整數(shù)類 | INT和LONG |
浮點類 | DOUBLE |
字符串類 | STRING和ARRAY |
日期時間類 | DATE |
布爾型 | BOOL |
二進制類 | BYTES |
數(shù)據(jù)類型示例1:Combine類型使用示例
MongoDB Reader插件的Combine數(shù)據(jù)類型支持將MongoDB document中的多個字段合并成一個JSON串。例如,導入MongoDB中的字段至MaxCompute,有字段如下(下文均省略了value使用key來代替整個字段)的三個document,其中a、b是所有document均有的公共字段,x_n是不固定字段。
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
配置文件中要明確指出需要一一對應的字段,需要合并的字段則需另取名稱(不可以與document中已存在字段同名),并指定類型為COMBINE,如下所示。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
最終導出的MaxCompute結果如下所示。
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
使用COMBINE類型合并MongoDB Document中的多個字段后,輸出結果映射至MaxCompute時會自動刪除公共字段,僅保留Document的特有字段。
例如,a、b為所有Document均有的公共字段,Document文件doc1: a b x_1 x_2
使用COMBINE類型合并字段后,輸出結果本應該為{a,b,x_1,x_2},該結果映射至MaxCompute后,會刪除公共字段a和b,最終輸出的結果為{x_1,x_2}。
數(shù)據(jù)類型示例2:遞歸解析處理多層嵌套的Document
當MongoDB中Document存在多層嵌套時,可通過配置document類型進行遞歸解析處理。示例如下:
MongoDB源端數(shù)據(jù)為:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }
MongoDB列可配置為:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
如上配置,可將源端嵌套字段a.b.c的值寫入目標端c字段中,同步任務運行后,目標端寫入數(shù)據(jù)為this is value
。
數(shù)據(jù)同步任務開發(fā)
MongoDB數(shù)據(jù)同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數(shù)解釋可在配置界面查看對應參數(shù)的文案提示。
創(chuàng)建數(shù)據(jù)源
在進行數(shù)據(jù)同步任務開發(fā)時,您需要在DataWorks上創(chuàng)建一個對應的數(shù)據(jù)源,操作流程請參見創(chuàng)建并管理數(shù)據(jù)源。
單表離線同步任務配置指導
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數(shù)和腳本Demo請參見下文的附錄:MongoDB腳本Demo與參數(shù)說明。
單表實時同步任務配置指導
操作流程請參見配置單表增量數(shù)據(jù)實時同步、DataStudio側實時同步任務配置。
整庫級別同步任務配置指導
整庫離線、整庫(實時)全增量、整庫(實時)分庫分表等整庫級別同步任務的配置操作,請參見數(shù)據(jù)集成側同步任務配置。
最佳實踐
常見問題
附錄:MongoDB腳本Demo與參數(shù)說明
附錄:離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要在任務腳本中按照腳本的統(tǒng)一格式要求編寫腳本中的reader參數(shù)和writer參數(shù),腳本模式的統(tǒng)一要求請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下的數(shù)據(jù)源的Reader參數(shù)和Writer參數(shù)的指導詳情。
MongoDB Reader腳本Demo
配置一個從MongoDB抽取數(shù)據(jù)到本地的作業(yè),詳情請參見下文的參數(shù)說明。
實際運行時,請刪除下述代碼中的注釋。
暫時不支持取出array中的指定元素。
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", //數(shù)據(jù)源名稱。
"collectionName": "tag_data", //集合名稱。
"query": "", // 數(shù)據(jù)查詢過濾。
"column": [
{
"name": "unique_id", //字段名稱。
"type": "string" //字段類型。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" //時區(qū)
}
},
"errorLimit":{
"record":"0"http://錯誤記錄數(shù)。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數(shù)不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1 //作業(yè)并發(fā)數(shù)。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
MongoDB Reader腳本參數(shù)
參數(shù) | 描述 |
datasource | 數(shù)據(jù)源名稱,腳本模式支持添加數(shù)據(jù)源,此配置項填寫的內容必須要與添加的數(shù)據(jù)源名稱保持一致。 |
collectionName | MonogoDB的集合名。 |
hint | MongoDB支持hint參數(shù),使查詢優(yōu)化器使用特定索引來完成查詢,在某些情況下,可以提高查詢性能。詳情請參見hint參數(shù)。示例如下:
|
column | MongoDB的文檔列名,配置為數(shù)組形式表示MongoDB的多個列。
|
batchSize | 批量獲取的記錄數(shù),該參數(shù)為選填參數(shù)。默認值為 |
cursorTimeoutInMs | 游標超時時間,該參數(shù)為選填參數(shù)。默認值為 說明
|
query | 您可以通過該配置型來限制返回MongoDB數(shù)據(jù)范圍,僅支持以下時間格式,不支持直接使用時間戳類型的格式。 說明
常用query示例如下:
說明 更多MongoDB的查詢語法請參見MongoDB官方文檔。 |
splitFactor | 如果存在比較嚴重的數(shù)據(jù)傾斜,可以考慮增加splitFactor,實現(xiàn)更小粒度的切分,無需增加并發(fā)數(shù)。 |
MongoDB Writer腳本Demo
配置寫入MongoDB的數(shù)據(jù)同步作業(yè),詳情請參見下文的參數(shù)說明。
{
"type": "job",
"version": "2.0",//版本號。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",//插件名。
"parameter": {
"datasource": "",//數(shù)據(jù)源名。
"column": [
{
"name": "_id",//列名。
"type": "ObjectId"http://數(shù)據(jù)類型。如果replacekey為_id,則此處的type必須配置為ObjectID。如果配置為string,會無法進行替換。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {//寫入模式。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"http://連接名稱。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {//錯誤記錄數(shù)。
"record": "0"
},
"speed": {
"throttle": true,//當throttle值為false時,mbps參數(shù)不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent": 1,//作業(yè)并發(fā)數(shù)。
"mbps": "1"http://限流的速度,此處1mbps = 1MB/s。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
MongoDB Writer腳本參數(shù)
參數(shù) | 描述 | 是否必選 | 默認值 |
datasource | 數(shù)據(jù)源名稱,腳本模式支持添加數(shù)據(jù)源,該配置項填寫的內容必須與添加的數(shù)據(jù)源名稱保持一致。 | 是 | 無 |
collectionName | MongoDB的集合名。 | 是 | 無 |
column | MongoDB的文檔列名,配置為數(shù)組形式表示MongoDB的多個列。
| 是 | 無 |
writeMode | 指定了傳輸數(shù)據(jù)時是否覆蓋的信息,包括isReplace和replaceKey:
說明 當isReplace設置為true,且將非
原因是寫入數(shù)據(jù)中,存在 | 否 | 無 |
preSql | 表示數(shù)據(jù)同步寫出MongoDB前的前置操作,例如清理歷史數(shù)據(jù)等。如果preSql為空,表示沒有配置前置操作。配置preSql時,需要確保preSql符合JSON語法要求。 | 否 | 無 |
執(zhí)行數(shù)據(jù)集成作業(yè)時,會首先執(zhí)行您已配置的preSql。完成preSql的執(zhí)行后,才可以進入實際的數(shù)據(jù)寫出階段。preSql本身不會影響寫出的數(shù)據(jù)內容。數(shù)據(jù)集成通過preSql參數(shù),可以具備冪等執(zhí)行特性。例如,您的preSql在每次任務執(zhí)行前都會清理歷史數(shù)據(jù)(根據(jù)您的業(yè)務規(guī)則進行清理)。此時,如果任務失敗,您只需要重新執(zhí)行數(shù)據(jù)集成作業(yè)即可。
preSql的格式要求如下:
需要配置type字段,表示前置操作類別,支持drop和remove,例如
"preSql":{"type":"remove"}
:drop:表示刪除集合和集合內的數(shù)據(jù),collectionName參數(shù)配置的集合即是待刪除的集合。
remove:表示根據(jù)條件刪除數(shù)據(jù)。
json:您可以通過JSON控制待刪除的數(shù)據(jù)條件,例如
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}
。此處的${last_day}
為DataWorks調度參數(shù),格式為$[yyyy-mm-dd]
。您可以根據(jù)需要具體使用其它MongoDB支持的條件操作符號($gt、$lt、$gte和$lte等)、邏輯操作符(and和or等)或函數(shù)(max、min、sum、avg和ISODate等)。數(shù)據(jù)集成通過如下MongoDB標準API執(zhí)行您的數(shù)據(jù),刪除query。
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);
說明如果您需要條件刪除數(shù)據(jù),建議優(yōu)先使用JSON配置形式。
item:您可以在item中配置數(shù)據(jù)過濾的列名(name)、條件(condition)和列值(value)。例如
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}
。數(shù)據(jù)集成會基于您配置的item條件項,構造查詢query條件,進而通過MongoDB標準API執(zhí)行刪除。例如
col.deleteMany(query);
。
不識別的preSql,無需進行任何前置刪除操作。