日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Logstash導入數倉版

Logstash是開源的服務器端數據處理管道,能夠同時從多個數據源采集數據,然后對數據進行轉換,并將數據寫入指定的存儲中。云原生數據倉庫 AnalyticDB MySQL 版完全兼容MySQL,您可以將Logstash Input插件支持的任一數據源中的數據寫入AnalyticDB for MySQL。本文介紹如何使用Logstash將Kafka數據寫入AnalyticDB for MySQL數倉版

Logstash組件介紹

  • 輸入-采集各種樣式、大小和來源的數據

    在實際業務中,數據往往以各種各樣的形式分散或集中地存儲在多個系統中,Logstash支持多種數據輸入方式,可以在同一時間從多種數據源采集數據。Logstash能夠以連續的流式傳輸方式輕松地從用戶的日志、指標、Web應用、數據存儲以及AWS服務采集數據。

  • 過濾-實時解析和轉換數據

    數據從源傳輸到目標存儲的過程中,Logstash過濾器能夠解析各個事件,識別已命名的字段來構建結構,并將它們轉換成通用格式,從而更輕松、快速地分析和實現商業價值。

    • 使用Grok從非結構化數據中派生出結構化數據。

    • 從IP地址破譯出地理坐標。

    • 將PII數據匿名化,完全排除敏感字段。

    • 簡化整體處理,不受數據源、格式或架構的影響

  • 輸出-導出數據

    除了AnalyticDB for MySQL以外,Logstash提供多種數據輸出方向,靈活解鎖眾多下游用例。

操作步驟

Kafka是一個高吞吐量的分布式發布、訂閱日志服務,具有高可用、高性能、分布式、高擴展、持久性等特點。目前Kafka已經被各大公司廣泛使用,同時logstash也可以快速接入業務中,免去重復建設的麻煩。

  1. 在Apache Kafka服務器根目錄,執行以下命令安裝和更新插件。

    $ bin/plugin install 
    $ bin/plugin update

    Logstash從1.5版本開始集成Kafka,Logstash 1.5及以上版本中所有插件的目錄和命名都發生了改變,插件發布地址為Logstash-plugins

  2. 配置插件。

    • Input配置示例

      以下配置可以實現對Kafka讀取端(consumer)的基本使用。

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "Logstash"
              topic_id => "test"
              codec => plain
              reset_beginning => false # boolean (optional), default: false
              consumer_threads => 5  # number (optional), default: 1
              decorate_events => true # boolean (optional), default: false
          }
      }          

      參數說明:

      • group_id:消費者分組,可以通過組ID來指定,不同組之間的消費互不影響,相互隔離。

      • topic_id:指定消費話題(Topic),也可以理解為先訂閱某個話題,然后消費。

      • reset_beginning:指定Logstash啟動后從哪個位置開始讀取數據,默認是結束位置,即Logstash進程會從上次讀取結束時的偏移量開始繼續讀取數據;如果之前沒有消費過,則從頭讀取數據。

        如果您要導入原數據,需將reset_beginning值改為true, Logstash進程將從頭開始讀取數據,作用類似于cat ,但是Logstash讀到最后一行時不會終止,而是變成tail -F,繼續監聽相應數據。

      • decorate_events:指定輸出消息時會輸出自身信息,包括消費消息的大小、Topic來源以及consumer的group信息。

      • rebalance_max_retries:當有新的consumer(Logstash)加入到同一個group時,將會Rebalance ,此后將會有Partitions的消費端遷移到新的consumer上。如果一個consumer獲得了某個Partition的消費權限,那么它將會向Zookeeper注冊Partition Owner registry節點信息,但是有可能此時舊的consumer尚沒有釋放此節點,此值用于控制注冊節點的重試次數。

      • consumer_timeout_ms:在指定時間內沒有消息到達將拋出異常,該參數一般無需修改。

      更多Input參數配置請參見Input

      說明

      如果需要多個Logstash端協同消費同一個Topic,需要先把相應的Topic分多個Partitions(區),此時多個消費者消費將無法保證消息的消費順序性,然后把兩個或多個Logstash消費端配置成相同的group_idtopic_id

    • Output配置示例

      output {
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"
              connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
          }
      }         

      參數說明:

      • connection_stringAnalyticDB for MySQL的連接地址。

      • statement:INSERT SQL的聲明數組。

      更多Output參數配置請參見Output

  3. 在Logstash安裝目錄中執行bin/Logstash -f config/xxxx.conf命令啟動任務,將Kafka數據寫入AnalyticDB for MySQL