在使用Logstash傳輸數(shù)據(jù)時(shí),在某些業(yè)務(wù)使用場景中,您可能需要切分源端數(shù)據(jù)并提取到字段中再寫入目標(biāo)端Elasticsearch集群。例如,源端Logs日志中存在以豎線(|
)分隔的數(shù)據(jù),此時(shí)您可以通過Logstash按照|
切分?jǐn)?shù)據(jù)并提取到字段中,再輸出到目標(biāo)端Elasticsearch集群。本文介紹如何通過Logstash切分?jǐn)?shù)據(jù)并提取到字段中。
背景信息
logstash-filter-mutate插件(過濾器插件)支持對事件中的字段進(jìn)行切分、重命名、刪除、替換和修改等操作,詳細(xì)信息請參見Mutate filter plugin。所有的過濾器插件都支持以下常見的可選配置項(xiàng),詳細(xì)信息請參見Common Option。
配置項(xiàng) | 輸入類型 |
---|---|
add_field | hash |
add_tag | array |
enable_metric | boolean |
id | string |
periodic_flush | boolean |
remove_field | array |
remove_tag | array |
前提條件
您已完成以下操作:
- 創(chuàng)建阿里云Elasticsearch實(shí)例。
具體操作,請參見創(chuàng)建阿里云Elasticsearch實(shí)例,本文以7.10版本實(shí)例為例。
- 開啟目標(biāo)Elasticsearch實(shí)例的自動(dòng)創(chuàng)建索引功能。具體操作請參見配置YML參數(shù)。說明 自動(dòng)創(chuàng)建的索引可能不符合您的預(yù)期,不建議開啟,本文僅供測試。在實(shí)際業(yè)務(wù)中,建議您先在目標(biāo)Elasticsearch實(shí)例中創(chuàng)建索引,再通過Logstash傳輸數(shù)據(jù)。創(chuàng)建索引的具體操作請參見快速入門。
- 創(chuàng)建阿里云Logstash實(shí)例,需要與Elasticsearch實(shí)例在同一專有網(wǎng)絡(luò)下。
具體操作,請參見創(chuàng)建阿里云Logstash實(shí)例。
- 準(zhǔn)備測試數(shù)據(jù)。本文以Beats采集的Logs中的某一條數(shù)據(jù)為例,關(guān)于Beats采集數(shù)據(jù)的詳細(xì)信息請參見采集ECS服務(wù)日志。如下測試數(shù)據(jù)中的LogMessage以
|
分隔,并存在多個(gè)||||
特殊符號。本文使用Logstash按照|
切分?jǐn)?shù)據(jù),被切分的字段分別寫入到對應(yīng)的字段中:mobile、appName、type、timestamp、status、code、component、cid、serviceId、serviceName、serviceType、param,最后將字段輸出到目標(biāo)端Elasticsearch集群中。LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||
操作步驟
- 進(jìn)入阿里云Elasticsearch控制臺的Logstash頁面。
- 進(jìn)入目標(biāo)實(shí)例。
- 在頂部菜單欄處,選擇地域。
- 在Logstash實(shí)例中單擊目標(biāo)實(shí)例ID。
- 在左側(cè)導(dǎo)航欄,單擊管道管理。
- 單擊創(chuàng)建管道。
- 在創(chuàng)建管道任務(wù)頁面,輸入管道ID并配置管道。本文使用的管道配置如下。
input { beats { port => 8001 } } filter { mutate { gsub => ["message","\|","| "] split => ["message","|"] add_field => { "mobile" => "%{[message][1]}" "appName" => "%{[message][2]}" "type" => "%{[message][3]}" "timestamp" => "%{[message][4]}" "status" => "%{[message][5]}" "code" => "%{[message][6]}" "component" => "%{[message][7]}" "cid" => "%{[message][8]}" "serviceId" => "%{[message][9]}" "serviceName" => "%{[message][10]}" "serviceType" => "%{[message][11]}" "param" => "%{[message][12]}" } } mutate { strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"] } } output { elasticsearch { index => "<yourIndexName>" hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"] user => "elastic" password => "<yourPassword>" } }
重要- input.beat.port為Beats采集日志輸入到當(dāng)前Logstash管道的端口,需要使用8000~9000范圍內(nèi)的端口。
- 以上管道配置中的
gsub => ["message","\|","| "]
,第二個(gè)|
后有一個(gè)空格。 - 以上管道配置中的index、hosts和password參數(shù)值需要替換為您實(shí)際業(yè)務(wù)的索引名稱、Elasticsearch集群的訪問地址和集群的elastic賬號對應(yīng)的密碼。
以上管道配置的原理說明如下:- 通過Logstash的filter.mutate.gsub參數(shù),使用正則表達(dá)式
\|
去匹配LogMessage中的|
,將|
替換為|
(即|
+空格)。替換后的效果如下。LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
- 通過filter.mutate.split參數(shù)將LogMessage按照
|
進(jìn)行切分。 - 通過filter.mutate.add_field參數(shù)添加字段,即將切分后的LogMessage一一添加到對應(yīng)的字段中。添加后的效果如下。
"mobile":" 1390000****", "appName":" jop", "type":" byORP", "timestamp":" 2022-04-18T14:18:16.633", "status":" /log/cms/sen", "code":" 200", "component":" pluginNums=0,pluginStatus=0", "cid":" ", "serviceId":" ", "serviceName":" ", "serviceType":" ", "param":" "
- 通過filter.mutate.strip參數(shù)去除字段空格。由于添加后的每個(gè)字段前面都有一個(gè)空格,因此需要去除這些空格。
更多管道配置說明,請參見通過配置文件管理管道和Logstash配置文件說明。
警告 配置完成后,需要保存并部署才能生效。保存并部署操作會(huì)觸發(fā)實(shí)例重啟,請?jiān)诓挥绊憳I(yè)務(wù)的前提下,繼續(xù)執(zhí)行以下步驟。 - 單擊保存或者保存并部署。
- 保存:將管道信息保存在Logstash里并觸發(fā)實(shí)例變更,配置不會(huì)生效。保存后,系統(tǒng)會(huì)返回管道管理頁面。可在管道列表區(qū)域,單擊操作列下的立即部署,觸發(fā)實(shí)例重啟,使配置生效。
- 保存并部署:保存并且部署后,會(huì)觸發(fā)實(shí)例重啟,使配置生效。
驗(yàn)證結(jié)果
- 登錄目標(biāo)阿里云Elasticsearch實(shí)例的Kibana控制臺,根據(jù)頁面提示進(jìn)入Kibana主頁。登錄Kibana控制臺的具體操作,請參見登錄Kibana控制臺。說明 本文以阿里云Elasticsearch 7.10.0版本為例,其他版本操作可能略有差別,請以實(shí)際界面為準(zhǔn)。
- 單擊右上角的Dev tools。
- 在Console中,執(zhí)行以下腳本,查詢目標(biāo)索引中的信息。
GET <yourIndexName>/_search { "query": { "match_all": {} } }
說明 <yourIndexName>需要與管道配置中的index參數(shù)值保持一致。預(yù)期結(jié)果如下。
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "<yourIndexName>", "_type" : "_doc", "_id" : "Lb1UWoAB-6Zo6en4luDi", "_score" : 1.0, "_source" : { "mobile" : "1390000****", "appName" : "jop", "type" : "byORP", "timestamp" : "2022-04-18T14:18:16.633", "status" : "/log/cms/sen", "code" : "200", "component" : "pluginNums=0,pluginStatus=0", "cid" : "", "serviceId" : "", "serviceName" : "", "serviceType" : "", "param" : "" } } ] } }