在使用Logstash傳輸數(shù)據(jù)時(shí),在某些業(yè)務(wù)使用場景中,您可能需要切分源端數(shù)據(jù)并提取到字段中再寫入目標(biāo)端Elasticsearch集群。例如,源端Logs日志中存在以豎線(|)分隔的數(shù)據(jù),此時(shí)您可以通過Logstash按照|切分?jǐn)?shù)據(jù)并提取到字段中,再輸出到目標(biāo)端Elasticsearch集群。本文介紹如何通過Logstash切分?jǐn)?shù)據(jù)并提取到字段中。

背景信息

logstash-filter-mutate插件(過濾器插件)支持對事件中的字段進(jìn)行切分、重命名、刪除、替換和修改等操作,詳細(xì)信息請參見Mutate filter plugin。所有的過濾器插件都支持以下常見的可選配置項(xiàng),詳細(xì)信息請參見Common Option
配置項(xiàng)輸入類型
add_fieldhash
add_tagarray
enable_metricboolean
idstring
periodic_flushboolean
remove_fieldarray
remove_tagarray

前提條件

您已完成以下操作:
  • 創(chuàng)建阿里云Elasticsearch實(shí)例。

    具體操作,請參見創(chuàng)建阿里云Elasticsearch實(shí)例,本文以7.10版本實(shí)例為例。

  • 開啟目標(biāo)Elasticsearch實(shí)例的自動(dòng)創(chuàng)建索引功能。
    具體操作請參見配置YML參數(shù)
    說明 自動(dòng)創(chuàng)建的索引可能不符合您的預(yù)期,不建議開啟,本文僅供測試。在實(shí)際業(yè)務(wù)中,建議您先在目標(biāo)Elasticsearch實(shí)例中創(chuàng)建索引,再通過Logstash傳輸數(shù)據(jù)。創(chuàng)建索引的具體操作請參見快速入門
  • 創(chuàng)建阿里云Logstash實(shí)例,需要與Elasticsearch實(shí)例在同一專有網(wǎng)絡(luò)下。

    具體操作,請參見創(chuàng)建阿里云Logstash實(shí)例

  • 準(zhǔn)備測試數(shù)據(jù)。
    本文以Beats采集的Logs中的某一條數(shù)據(jù)為例,關(guān)于Beats采集數(shù)據(jù)的詳細(xì)信息請參見采集ECS服務(wù)日志。如下測試數(shù)據(jù)中的LogMessage|分隔,并存在多個(gè)||||特殊符號。本文使用Logstash按照|切分?jǐn)?shù)據(jù),被切分的字段分別寫入到對應(yīng)的字段中:mobile、appName、type、timestamp、status、code、component、cid、serviceId、serviceName、serviceType、param,最后將字段輸出到目標(biāo)端Elasticsearch集群中。
    LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||

操作步驟

  1. 進(jìn)入阿里云Elasticsearch控制臺的Logstash頁面
  2. 進(jìn)入目標(biāo)實(shí)例。
    1. 在頂部菜單欄處,選擇地域。
    2. Logstash實(shí)例中單擊目標(biāo)實(shí)例ID。
  3. 在左側(cè)導(dǎo)航欄,單擊管道管理
  4. 單擊創(chuàng)建管道
  5. 創(chuàng)建管道任務(wù)頁面,輸入管道ID并配置管道。
    本文使用的管道配置如下。
    input {
        beats {
            port => 8001
        }
    }
    filter {
        mutate {
            gsub => ["message","\|","| "]
            split => ["message","|"]
            add_field => {
                "mobile" => "%{[message][1]}"
                "appName" => "%{[message][2]}"
                "type" => "%{[message][3]}"
                "timestamp" => "%{[message][4]}"
                "status" => "%{[message][5]}"
                "code" => "%{[message][6]}"
                "component" => "%{[message][7]}"
                "cid" => "%{[message][8]}"
                "serviceId" => "%{[message][9]}"
                "serviceName" => "%{[message][10]}"
                "serviceType" => "%{[message][11]}"
                "param" => "%{[message][12]}"
            }
        }
        mutate {
            strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"]
        }
    }
    output {
        elasticsearch {
            index => "<yourIndexName>"
            hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "<yourPassword>"
        }
    }                
    重要
    • input.beat.port為Beats采集日志輸入到當(dāng)前Logstash管道的端口,需要使用8000~9000范圍內(nèi)的端口。
    • 以上管道配置中的gsub => ["message","\|","| "],第二個(gè)|后有一個(gè)空格。
    • 以上管道配置中的indexhostspassword參數(shù)值需要替換為您實(shí)際業(yè)務(wù)的索引名稱、Elasticsearch集群的訪問地址和集群的elastic賬號對應(yīng)的密碼。
    以上管道配置的原理說明如下:
    1. 通過Logstash的filter.mutate.gsub參數(shù),使用正則表達(dá)式\|去匹配LogMessage中的|,將|替換為| (即|+空格)。替換后的效果如下。
      LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
    2. 通過filter.mutate.split參數(shù)將LogMessage按照|進(jìn)行切分。
    3. 通過filter.mutate.add_field參數(shù)添加字段,即將切分后的LogMessage一一添加到對應(yīng)的字段中。添加后的效果如下。
      "mobile":" 1390000****",
      "appName":" jop",
      "type":" byORP",
      "timestamp":" 2022-04-18T14:18:16.633",
      "status":" /log/cms/sen",
      "code":" 200",
      "component":" pluginNums=0,pluginStatus=0",
      "cid":" ",
      "serviceId":" ",
      "serviceName":" ",
      "serviceType":" ",
      "param":" "
    4. 通過filter.mutate.strip參數(shù)去除字段空格。由于添加后的每個(gè)字段前面都有一個(gè)空格,因此需要去除這些空格。

    更多管道配置說明,請參見通過配置文件管理管道Logstash配置文件說明

    警告 配置完成后,需要保存并部署才能生效。保存并部署操作會(huì)觸發(fā)實(shí)例重啟,請?jiān)诓挥绊憳I(yè)務(wù)的前提下,繼續(xù)執(zhí)行以下步驟。
  6. 單擊保存或者保存并部署
    • 保存:將管道信息保存在Logstash里并觸發(fā)實(shí)例變更,配置不會(huì)生效。保存后,系統(tǒng)會(huì)返回管道管理頁面。可在管道列表區(qū)域,單擊操作列下的立即部署,觸發(fā)實(shí)例重啟,使配置生效。
    • 保存并部署:保存并且部署后,會(huì)觸發(fā)實(shí)例重啟,使配置生效。

驗(yàn)證結(jié)果

  1. 登錄目標(biāo)阿里云Elasticsearch實(shí)例的Kibana控制臺,根據(jù)頁面提示進(jìn)入Kibana主頁。
    登錄Kibana控制臺的具體操作,請參見登錄Kibana控制臺
    說明 本文以阿里云Elasticsearch 7.10.0版本為例,其他版本操作可能略有差別,請以實(shí)際界面為準(zhǔn)。
  2. 單擊右上角的Dev tools
  3. Console中,執(zhí)行以下腳本,查詢目標(biāo)索引中的信息。
    GET <yourIndexName>/_search
    {
      "query": {
        "match_all": {}
      }
    }
    說明 <yourIndexName>需要與管道配置中的index參數(shù)值保持一致。

    預(yù)期結(jié)果如下。

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "<yourIndexName>",
            "_type" : "_doc",
            "_id" : "Lb1UWoAB-6Zo6en4luDi",
            "_score" : 1.0,
            "_source" : {
              "mobile" : "1390000****",
              "appName" : "jop",
              "type" : "byORP",
              "timestamp" : "2022-04-18T14:18:16.633",
              "status" : "/log/cms/sen",
              "code" : "200",
              "component" : "pluginNums=0,pluginStatus=0",
              "cid" : "",
              "serviceId" : "",
              "serviceName" : "",
              "serviceType" : "",
              "param" : ""
            }
          }
        ]
      }
    }