日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Elasticsearch數據源

Elasticsearch數據源為您提供讀取和寫入Elasticsearch雙向通道的功能,本文為您介紹DataWorks的Elasticsearch數據同步的能力支持情況。

背景信息

Elasticsearch在公共資源組上支持Elasticsearch 5.x版本,在Serverless資源組(推薦)和獨享數據集成資源組上支持Elasticsearch 5.x、6.x、7.x和8.x版本。

說明

Elasticsearch是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜索引擎。Elasticsearch是一個基于Lucene的搜索和數據分析工具,它提供分布式服務。Elasticsearch核心概念同數據庫核心概念的對應關系如下所示。

Relational DB(實例)-> Databases(數據庫)-> Tables(表)-> Rows(一行數據)-> Columns(一行數據的一列)
Elasticsearch        -> Index              -> Types       -> Documents       -> Fields

Elasticsearch中可以有多個索引或數據庫,每個索引可以包括多個類型或表,每個類型可以包括多個文檔或行,每個文檔可以包括多個字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把從Reader讀入的數據寫入Elasticsearch中。

支持的版本

DataWorks平臺目前僅支持配置阿里云Elasticsearch 5.x、6.x、7.x和8.x版本數據源,不支持配置自建Elasticsearch數據源。

使用限制

離線讀寫

  • Elasticsearch Reader會獲取Server端shard信息用于數據同步,需要確保在任務同步中Server端的shards處于存活狀態,否則會存在數據不一致風險。

  • 如果您使用的是6.x及以上版本,支持使用Serverless資源組(推薦)獨享數據集成資源組

  • 不支持同步scaled_float類型的字段。

  • 不支持同步字段中帶有關鍵字 $ref的索引。

支持的字段類型

類型

離線讀(Elasticsearch Reader)

離線寫(Elasticsearch Writer)

實時寫

binary

支持

支持

支持

boolean

支持

支持

支持

keyword

支持

支持

支持

constant_keyword

不支持

不支持

不支持

wildcard

不支持

不支持

不支持

long

支持

支持

支持

integer

支持

支持

支持

short

支持

支持

支持

byte

支持

支持

支持

double

支持

支持

支持

float

支持

支持

支持

half_float

不支持

不支持

不支持

scaled_float

不支持

不支持

不支持

unsigned_long

不支持

不支持

不支持

date

支持

支持

支持

date_nanos

不支持

不支持

不支持

alias

不支持

不支持

不支持

object

支持

支持

支持

flattened

不支持

不支持

不支持

nested

支持

支持

支持

join

不支持

不支持

不支持

integer_range

支持

支持

支持

float_range

支持

支持

支持

long_range

支持

支持

支持

double_range

支持

支持

支持

date_range

支持

支持

支持

ip_range

不支持

支持

支持

ip

支持

支持

支持

version

支持

支持

支持

murmur3

不支持

不支持

不支持

aggregate_metric_double

不支持

不支持

不支持

histogram

不支持

不支持

不支持

text

支持

支持

支持

annotated-text

不支持

不支持

不支持

completion

支持

不支持

不支持

search_as_you_type

不支持

不支持

不支持

token_count

支持

不支持

不支持

dense_vector

不支持

不支持

不支持

rank_feature

不支持

不支持

不支持

rank_features

不支持

不支持

不支持

geo_point

支持

支持

支持

geo_shape

支持

支持

支持

point

不支持

不支持

不支持

shape

不支持

不支持

不支持

percolator

不支持

不支持

不支持

string

支持

支持

支持

工作原理

Elasticsearch Reader的工作原理如下:

  • 通過Elasticsearch的_searchscrollslice(即游標分片)方式實現,slice結合數據集成任務的task多線程分片機制使用。

  • 根據Elasticsearch中的Mapping配置,轉換數據類型。

更多詳情請參見Elasticsearch官方文檔

說明

Elasticsearch Reader會獲取Server端shard信息用于數據同步,需要確保在任務同步中Server端的shards處于存活狀態,否則會存在數據不一致風險。

基本配置

重要

實際運行時,請刪除下述代碼中的注釋。

{
 "order":{
  "hops":[
   {
    "from":"Reader",
    "to":"Writer"
   }
  ]
 },
 "setting":{
  "errorLimit":{
   "record":"0" //錯誤記錄數。
  },
  "jvmOption":"",
  "speed":{
   "concurrent":3,//并發數
   "throttle":true,//
                     "mbps":"12",//限流,此處1mbps = 1MB/s。
  }
 },
 "steps":[
  {
   "category":"reader",
   "name":"Reader",
   "parameter":{
    "column":[ //讀取列。
     "id",
     "name"
    ],
    "endpoint":"", //服務地址。
    "index":"",  //索引。
    "password":"",  //密碼。
    "scroll":"",  //scroll標志。
    "search":"",  //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重命名為search。
    "type":"default",
    "username":""  //用戶名。
   },
   "stepType":"elasticsearch"
  },
  {
   "stepType": "elasticsearch",
            "parameter": {
                "column": [ //寫入列
                    {
                        "name": "id",
                        "type": "integer"
                    },
                    {
                        "name": "name",
                        "type": "text"
                    }
                ],
                "index": "test",   //寫入索引
                 "indexType": "",   //寫入索引類型,es7不填
                "actionType": "index",  //寫入方式
                "cleanup": false,         //是否重建索引
                "datasource": "test",   //數據源名稱
                "primaryKeyInfo": {     //主鍵取值方式
                    "fieldDelimiterOrigin": ",",
                    "column": [
                        "id"
                    ],
                    "type": "specific",
                    "fieldDelimiter": ","
                },
                "dynamic": false,  //動態映射
                "batchSize": 1024   //批量寫文檔數
            },
            "name": "Writer",
            "category": "writer"
  }
 ],
 "type":"job",
 "version":"2.0" //版本號。
}

高級功能

  • 支持全量拉取

    支持將Elasticsearch中一個文檔的所有內容拉取為一個字段。配置詳情請參見場景一:全量拉取

  • 支持提取半結構化到結構化數據

    分類

    描述

    相關文檔

    產生背景

    Elasticsearch中的數據特征為字段不固定,且有中文名、數據使用深層嵌套的形式。為更好地方便下游業務對數據的計算和存儲需求,特推出從半結構化到結構化的轉換解決方案。

    實現原理

    將Elasticsearch獲取到的JSON數據,利用JSON工具的路徑獲取特性,將嵌套數據扁平化為一維結構的數據。然后將數據映射至結構化數據表中,拆分Elasticsearch復合結構數據至多個結構化數據表。

    解決方案

    JSON有嵌套的情況,通過path路徑來解決。

    • 屬性

    • 屬性.子屬性

    • 屬性[0].子屬性

    場景二:嵌套或對象字段屬性同步

    附屬信息有一對多的情況,需要進行拆表拆行處理,進行遍歷。

    屬性[*].子屬性

    場景三:數組屬性拆分為多行

    數組歸并,一個字符串數組內容,歸并為一個屬性,并進行去重。

    屬性[]

    場景四:數組屬性去重歸并

    多屬性合一,將多個屬性合并為一個屬性。

    屬性1,屬性2

    場景五:多屬性合一同步

    多屬性選擇處理。

    屬性1|屬性2

    場景六:多屬性選擇同步

創建數據源

在進行數據同步任務開發時,您需要在DataWorks上創建一個對應的數據源,操作流程請參見創建并管理數據源詳細的配置參數解釋可在配置界面查看對應參數的文案提示

數據同步任務開發

數據同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線同步任務配置指導

單表實時寫同步任務配置指導

操作流程請參見DataStudio側實時同步任務配置

整庫離線寫、單表/整庫全增量實時寫同步任務配置指導

操作流程請參見數據集成側同步任務配置

附錄一:腳本Demo與參數說明

離線任務腳本配置方式

如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。

Reader腳本Demo

{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //錯誤記錄數。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //讀取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服務地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密碼。
                "multiThread":true,
                "scroll":"5m",  //scroll標志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查詢query參數,與Elasticsearch的query內容相同,使用_search api,重命名為search。
                "type":"doc",
                "username":"aliyun_di"  //用戶名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本號。
}

Reader腳本參數

參數

描述

是否必選

默認值

datasource

數據源名稱,腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。

index

Elasticsearch中的index名。

type

Elasticsearch中indextype名。

index名

search

Elasticsearch的query參數。

pageSize

每次讀取數據的條數。

100

scroll

Elasticsearch的分頁參數,設置游標存放時間。

  • 設置的過小時,如果獲取兩頁數據間隔時間超出scroll,會導致游標過期,進而丟失數據。

  • 設置的過大時,如果同一時刻發起的查詢過多,超出服務端max_open_scroll_context配置時,會導致數據查詢報錯。

strictMode

以嚴格模式讀取Elasticsearch中的數據,當出現Elasticsearch的shard.failed時會停止讀取,避免讀取少數據。

true

sort

返回結果的排序字段。

retryCount

失敗后重試的次數。

300

connTimeOut

客戶端連接超時時間。

600,000

readTimeOut

客戶端讀取超時時間。

600,000

multiThread

http請求,是否有多線程。

true

preemptiveAuth

http是否使用搶先模式請求

false

retrySleepTime

失敗后重試的時間間隔。

1000

discovery

是否開啟節點發現。

  • true:與集群中隨機一個節點進行連接。啟用節點發現將輪詢并定期更新客戶機中的服務器列表,并對發現的節點發起查詢請求。

  • false:對配置的endpoint發起查詢請求。

false

compression

是否使用GZIP壓縮請求正文,使用時需要在es節點上啟用http.compression設置。

false

dateFormat

待同步字段存在date類型,且該字段mapping沒有format配置時,需要配置dateFormat參數。配置形式如下: "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss",該配置需要包含同步date類型字段的所有格式。

full

是否將全文檔內容作為一個字段同步至目標端,將Elasticsearch的查詢數據作為一個字段,配置詳情請參見場景一:全量拉取

multi

該配置是一個高級功能具有五種用法,兩個子屬性分別為multi.keymulti.mult,配置詳情請參見高級功能中表格內容。

Writer腳本Demo

{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1, //作業并發數。
            "mbps":"12"http://限流,此處1mbps = 1MB/s。
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource":"xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo":{
                    "type":"pk",    
                     "fieldDelimiter":",",
                     "column":[]
                    },
                "batchSize": 1000,
                "dynamic":false,
                "esPartitionColumn":[
                    {
                        "name":"col1",  
                        "comment":"xx", 
                        "type":"STRING" 
                        }
                     ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true,
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params":
                            {
                                "doc_values": false
                            },
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}
說明

VPC環境的Elasticsearch運行在默認資源組會存在網絡不通的情況。您需要使用Serverless資源組(推薦)和獨享數據集成資源組,才能連通VPC進行數據同步。添加資源的詳情請參見Serverless資源組

Writer腳本參數

參數

描述

是否必選

默認值

datasource

選擇需要同步的Elasticsearch數據源,若還未在DataWorks創建該數據源,請先創建,詳情請參見配置Elasticsearch數據源

index

Elasticsearch中的index名。

indexType

Elasticsearch中index的type名。

Elasticsearch

cleanup

定義當前任務在索引index已存在的情況是否要刪除數據。

  • 是(true):導入數據前刪除原來的索引并重建同名索引,此操作會刪除該索引下的數據。

  • 否(false):導入數據前保留索引中已存在的數據。

false

batchSize

定義同步任務一次性插入ElasticSearch的Document條數。

1,000

trySize

定義往ElasticSearch寫入數據失敗后的重試次數。

30

timeout

客戶端超時時間。

600,000

discovery

任務是否啟動節點發現功能。

  • true:與集群中隨機一個節點進行連接。啟用節點發現將輪詢并定期更新客戶機中的服務器列表。

  • false:與Elasticsearch集群進行連接。

false

compression

HTTP請求,開啟壓縮。

true

multiThread

HTTP請求,是否有多線程。

true

ignoreWriteError

忽略寫入錯誤,不重試,繼續寫入。

false

ignoreParseError

忽略解析數據格式錯誤,繼續寫入。

true

alias

Elasticsearch的別名類似于數據庫的視圖機制,為索引my_index創建一個別名my_index_alias,對my_index_alias的操作與my_index的操作一致。

配置alias表示在數據導入完成后,為指定的索引創建別名。

aliasMode

數據導入完成后增加別名的模式,包括append(增加模式)和exclusive(只留這一個):

  • aliasModeappend時,表示追加當前索引至別名alias映射中(一個別名對應多個索引)。

  • aliasModeexclusive時,表示首先刪除別名alias,再添加當前索引至別名alias映射中(一個別名對應一個索引)。

后續會轉換別名為實際的索引名稱,別名可以用來進行索引遷移和多個索引的查詢統一,并可以用來實現視圖的功能。

append

settings

創建index時的settings,與Elasticsearch官方一致。

column

column用來配置文檔的多個字段Filed信息,具體每個字段項可以配置name(名稱)、type(類型)等基礎配置,以及AnalyzerFormatArray等擴展配置。

Elasticsearch所支持的字段類型如下所示。

- id  //type id對應Elasticsearch中的_id,可以理解為唯一主鍵。寫入時,相同id的數據會被覆蓋,且不會被索引。
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested

列類型的說明如下:

  • 列類型為text類型時,可以配置analyzer(分詞器)、normsindex_options等參數,示例如下。

    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
        }
  • 列類型為Date類型時,您可配置如下兩種方式解析源端數據,配置方式請保持一致。

    • 方式一:根據reader端讀取字段的內容直接寫入es data字段:

      • 配置origin:true必填,讓讀取字段的內容直接寫入es data

      • 配置"format",表示在通過es writer創建mapping時,該字段需要設置format屬性。示例如下:

          {
             "parameter":{
               "column":[{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "origin": true
                }]
           }
        }
    • 方式二:(時區轉換)如果需要數據集成幫助您進行時區轉換,可添加Timezone參數。示例如下:

      配置的"format"表示數據集成在做時區轉換時,解析的時間格式如下:

        {
           "parameter" :{
             "column": [{
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
               "Timezone": "UTC"
             }]
         }
      }
  • 列類型為地理形狀geo_shape時,可以配置tree(geohash或quadtree)、precision(精度)屬性,示例如下。

    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
        }

如果需要在column中配置除了type以外的屬性值,您可以使用other_params參數,該參數配置在column中,在update mappings時,用于描述column中除了type以外的Elasticsearch屬性信息。

 {
   "name": "guid",
   "other_params":
    {
       "doc_values": false
      },
    "type": "text"
  }

如果您希望源端數據寫入為Elasticsearch時按照數組類型寫入,您可按照JSON格式或指定分隔符的方式來解析源端數據。配置詳情請參見附錄二:ElasticSearch寫入的格式期望是數組類型

dynamic

定義當在文檔中發現未存在的字段時,同步任務是否通過Elasticsearch動態映射機制為字段添加映射。

  • true:保留Elasticsearch的自動mappings映射。

  • false:默認值,不填寫默認為false,根據同步任務配置的column生成并更新Elasticsearch的mappings映射。

Elasticsearch 7.x版本的默認type_doc。使用Elasticsearch的自動mappings時,請配置_docesVersion為7。

您需要轉換為腳本模式,添加一個版本參數:"esVersion": "7"

false

actionType

表示Elasticsearch在數據寫出時的action類型,目前數據集成支持indexupdate兩種actionType,默認值為index

  • index:底層使用了Elasticsearch SDK的Index.Builder構造批量請求。Elasticsearch index插入時,需要首先判斷插入的文檔數據中是否指定ID:

    • 如果沒有指定ID,Elasticsearch會默認生成一個唯一ID。該情況下會直接添加文檔至Elasticsearch中。

    • 如果已指定ID,會進行更新(替換整個文檔),且不支持針對特定Field進行修改。

      說明

      此處的更新并非Elasticsearch中的更新(替換部分指定列替換)。

  • update:根據用戶指定的ID進行文檔更新,如果ID值在索引中不存在則插入文檔,存在則更新指定的column字段內容(其他文檔字段內容不變)。每次update完成都會獲取整個文檔信息,從而實現針對特定字段進行修改。這里update不支持條件篩選,僅根據指定ID值進行更新操作。由于每次更新都需要獲取一遍原始文檔,因此對性能同步能會有較大影響。

    說明

    設置action類型為update時,您需要設置主鍵primaryKeyInfo

index

primaryKeyInfo

定義當前寫入ElasticSearch的主鍵取值方式。

  • 業務主鍵(pk):_id 的值指定為某一個字段。

    "parameter":{
    "primaryKeyInfo":{
    "type":"pk",
    "column":["id"]}
    }
  • 聯合主鍵(specific):_id 的值指定為某幾個字段的值拼接,分隔符為您設置的主鍵分隔符fieldDelimiter

    說明

    其中字段名為eswriter的待寫入字段,向導模式拉取主鍵列配置時只包含Elasticsearch索引中已存在的字段。

    "parameter":{
    "primaryKeyInfo":{
    "type":"specific",
    "fieldDelimiter":",",
    "column":["col1","col2"]}
    }
  • 無主鍵(nopk):_id在寫入ElasticSearch時系統自動生成。

    "primaryKeyInfo":{
    "type":"nopk"
    }

specific

esPartitionColumn

定義寫入ElasticSearch時是否開啟分區,用于修改ElasticSearch中的routing的參數。

  • 開啟分區:把指定列的value通過分隔符空串連接指定為routing的值,在寫入時,插入或更新指定shard中的doc,開啟分區的情況下您需要指定分區列。

    {    "esPartitionColumn": [
            {
                "name":"col1",
                "comment":"xx",
                "type":"STRING"
                }
            ],
        }
  • 不開啟分區:不填寫該參數,默認使用_id作為routing起到將文檔均勻分布到多個分片上防止數據傾斜的作用。

false

enableWriteNull

該參數用于是否支持將來源端的空值字段同步至Elasticsearch。取值如下:

  • true:支持。同步后,Elasticsearch中對應字段的value為空。

  • false:不支持。來源端的空值字段無法同步至Elasticsearch,即在Elasticsearch中不顯示該字段。

true

附錄二:ElasticSearch寫入的格式期望是數組類型

支持以下兩種方式將源端數據按照數組類型寫入ElasticSearch。

  • 按JSON格式解析源端數據

    例如:源端數據為"[1,2,3,4,5]",配置json_array=true對其進行解析,同步將以數組格式寫入ElasticSearch。

    "parameter" : {
      {
        "name":"docs_1",
        "type":"keyword",
        "json_array":true
      }
    }
  • 按分隔符解析源端數據

    例如:源端數據為"1,2,3,4,5", 配置分隔符splitter=","對其進行解析,同步將以數組格式寫入ElasticSearch。

    說明

    一個任務僅支持配置一種分隔符,splitter全局唯一,不支持多array字段配置為不同的分隔符。例如源端字段列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter無法針對每列單獨配置使用。

    "parameter" : {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter":","http://注意:splitter配置與column配置同級。
    }

附錄三:場景示例

場景一:全量拉取

  • 背景說明:將Elasticsearch中文檔查詢的結果拉取為一個字段。

  • 配置示例:

    
    ## 讀端:Elasticsearch中的原始數據
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "IXgdO4MB4GR_1DmrjTXP",
            "_score": 1.0,
            "_source": {
                "feature1": "value1",
                "feature2": "value2",
                "feature3": "value3"
            }
        }]
    
    ##數據集成Elasticsearch Reader插件配置
    "parameter": {
      "column": [
          "content"
      ],
      "full":true
    }
    
    ##寫端結果:同步至目標端1行1列
    {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}

場景二:嵌套或對象字段屬性同步

  • 背景說明:Object對象或nested嵌套字段的屬性時,通過path路徑來解決。

  • 配置形式:

    • 屬性

    • 屬性.子屬性

    • 屬性[0].子屬性

  • 腳本配置:

    "multi":{
        "multi":true
    }
    說明

    向導模式暫不支持配置。

  • 配置示例:

    ## 讀端:Elasticsearch中的原始數據
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "7XAOOoMB4GR_1Dmrrust",
            "_score": 1.0,
            "_source": {
                "level1": {
                    "level2": [
                        {
                            "level3": "testlevel3_1"
                        },
                        {
                            "level3": "testlevel3_2"
                        }
                    ]
                }
            }
        }
    ]
    ##數據集成Elasticsearch reader插件配置
    "parameter": {
      "column": [
          "level1",
          "level1.level2",
          "level1.level2[0]",
          "level1.level2.level3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行數據4列
    column1(level1):            {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
    column2(level1.level2):     [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
    column3(level1.level2[0]):  {"level3":"testlevel3_1"}
    column4(level1.level2.level3):  null
    說明
    • 獲取的節點上層有數組時結果為null,如上樣例獲取level1.level2.level3不會報錯,同步結果為null,需要配置為level1.level2[0].level3或level1.level2[1].level3,當前不支持level1.level2[*].level3。

    • 不支持key出現"."的數據, 如"level1.level2":{"level3":"testlevel3_1"}, 此時該條數據獲取結果為null。

場景三:數組屬性拆分為多行

  • 背景說明:附屬信息有一對多的情況,需要將數組列拆成多行。

  • 配置形式:屬性[*].子屬性

  • 效果示意:源端數據{ "splitKey" :[1,2,3,4,5]},拆完后寫到目標端為5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}

  • 腳本配置:

    "multi":{   
           "multi":true,    
            "key": "headers"
    }
    說明
    • 向導模式下配置拆多行數組列名,會自動生成腳本配置,具有相同效果。

    • value必須為List,否則會報錯。

  • 配置示例:

    ## 讀端:Elasticsearch中的原始數據
    [
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "nhxmIYMBKDL4VkVLyXRN",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.1"
                    },
                    {
                        "remoteip": "192.0.2.2"
                    }
                ]
            }
        },
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "wRxsIYMBKDL4VkVLcXqf",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.3"
                    },
                    {
                        "remoteip": "192.0.2.4"
                    }
                ]
            }
        }
    ]
    ##數據集成Elasticsearch reader插件配置
    {
       "column":[
          "headers[*].remoteip"
      ]
      "multi":{
          "multi":true,
          "key": "headers"
      }
    }
    
    ##寫端結果:4行
    192.0.2.1
    192.0.2.2
    192.0.2.3
    192.0.2.4

場景四:數組屬性去重歸并

  • 背景說明:數組去重歸并,將一個數組屬性去重歸并后寫入為字符串屬性,數組屬性可以為子屬性如name1.name2,去重采用tostring結果作為標準。

  • 配置形式:屬性[]。

    column里面帶有 [] 關鍵字就會認為對該屬性做去重歸并。

  • 腳本配置:

    "multi":{
        "multi":true
    }
    說明

    向導模式暫不支持配置。

  • 配置示例:

    ## 讀端:Elasticsearch中的原始數據
    "hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
    ]
    ##數據集成Elasticsearch reader插件配置
    "parameter": {
      "column":[
            "feature1[]"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行1列數據
    "value1,value2,value3"

場景五:多屬性合一同步

  • 背景說明:多屬性選擇處理,返回第一個有值的屬性,都不存在時將寫入null。

  • 配置形式:屬性1|屬性2|...

    column里面帶有 "|"關鍵字就會對該項做多屬性選擇。

  • 腳本配置:

    "multi":{    
        "multi":true
    }
    說明

    向導模式暫不支持該配置。

  • 配置示例:

    ##讀端:Elasticsearch中的原始數據
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    
    ##數據集成Elasticsearch reade插件配置
    "parameter": {
      "column":[
            "feature1|feature2|feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行1列數據
    "feature1"

場景六:多屬性選擇同步

  • 背景說明:多屬性選擇處理 ,返回第一個有值的屬性,都不存在時寫入null。

  • 配置形式:屬性1|屬性2|...

    column里面帶有 "|"關鍵字就會對該項做多屬性選擇

  • 腳本配置:

    "multi":{
        "multi":true
    }
    說明

    向導模式暫不支持該配置。

  • 配置示例:

    ##讀端:Elasticsearch中的原始數據
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    ##數據集成Elasticsearch reader插件配置
    "parameter": {
      "column":[
            "feature1,feature2,feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##寫端結果:1行1列數據
    "feature1,[1,2,3],{"child":"feature3"}"

相關文檔

數據集成支持其他更多數據源接入,更多信息,請參見支持的數據源及同步方案