通過logstash-input-datahub插件,您可以讀取DataHub中的數(shù)據(jù)到其他數(shù)據(jù)源中。本文介紹如何使用logstash-input-datahub插件。

前提條件

您已完成以下操作:

  • 安裝logstash-input-datahub插件。

    具體操作,請(qǐng)參見安裝或卸載插件

  • 開通DataHub產(chǎn)品,并完成創(chuàng)建項(xiàng)目、創(chuàng)建Topic和導(dǎo)入數(shù)據(jù)。

    具體操作,請(qǐng)參見快速入門

使用logstash-input-datahub插件

參見通過配置文件管理管道,在創(chuàng)建管道任務(wù)時(shí),按照以下說明配置Pipeline參數(shù),保存并部署后,即可觸發(fā)阿里云Logstash讀取DataHub的數(shù)據(jù)到目標(biāo)數(shù)據(jù)庫(kù)中。

Logstash的Pipeline配置如下,相關(guān)參數(shù)說明請(qǐng)參見參數(shù)說明

input {
    datahub {
        access_id => "Your accessId"
        access_key => "Your accessKey"
        endpoint => "Endpoint"
        project_name => "test_project"
        topic_name => "test_topic"
        interval => 5
        #cursor => {
        #    "0"=>"20000000000000000000000003110091"
        #    "2"=>"20000000000000000000000003110091"
        #    "1"=>"20000000000000000000000003110091"
        #    "4"=>"20000000000000000000000003110091"
        #    "3"=>"20000000000000000000000003110000"
        #}
        shard_ids => []
        pos_file => "/ssd/1/<Logstash實(shí)例ID>/logstash/data/文件名"
    }
}
output {
    elasticsearch {
      hosts => ["http://es-cn-mp91cbxsm000c****.elasticsearch.aliyuncs.com:9200"]
      user => "elastic"
      password => "your_password"
      index => "datahubtest"
      document_type => "_doc"
  }
}
注意 目前阿里云Logstash只支持同一專有網(wǎng)絡(luò)下的數(shù)據(jù)傳輸,如果源端數(shù)據(jù)在公網(wǎng)環(huán)境下,請(qǐng)參見配置NAT公網(wǎng)數(shù)據(jù)傳輸,通過公網(wǎng)訪問Logstash。

參數(shù)說明

logstash-input-datahub插件支持的參數(shù)如下。
參數(shù) 類型 是否必選 說明
endpoint string DataHub對(duì)外服務(wù)的訪問域名,詳細(xì)信息請(qǐng)參見域名列表
access_id string 阿里云賬號(hào)的AccessKey ID。
access_key string 阿里云賬號(hào)的Access Key Secret。
project_name string DataHub的項(xiàng)目名稱。
topic_name string DataHub的Topic名稱。
retry_times number 重試次數(shù)。-1表示無限重試(默認(rèn))、0表示不重試、大于0表示按照設(shè)置的次數(shù)重試。
retry_interval number 重試的間隔,單位為秒。
shard_ids array 需要消費(fèi)的shard列表。默認(rèn)為空,表示全部消費(fèi)。
cursor string 消費(fèi)起點(diǎn)。默認(rèn)為空,表示從頭開始消費(fèi)。
pos_file string Checkpoint記錄文件,必須配置,優(yōu)先使用checkpoint恢復(fù)消費(fèi)。
enable_pb boolean 是否使用pb傳輸,默認(rèn)為true。如果不支持pb傳輸,請(qǐng)將該參數(shù)設(shè)置為false。
compress_method string 網(wǎng)絡(luò)傳輸?shù)膲嚎s算法,默認(rèn)不壓縮。可選項(xiàng):lz4deflate
print_debug_info boolean 是否打印DataHub的Debug信息,默認(rèn)為false。設(shè)置為true時(shí),會(huì)打印大量信息,這些信息僅用來進(jìn)行腳本調(diào)試。