Logstash是開源的服務器端數據處理管道,能夠同時從多個數據源采集數據,然后對數據進行轉換,并將數據寫入指定的存儲中。云原生數據倉庫 AnalyticDB MySQL 版完全兼容MySQL,您可以將Logstash Input插件支持的任一數據源中的數據寫入AnalyticDB for MySQL。本文介紹如何使用Logstash將Kafka數據寫入AnalyticDB for MySQL數倉版。
Logstash組件介紹
輸入-采集各種樣式、大小和來源的數據
在實際業務中,數據往往以各種各樣的形式分散或集中地存儲在多個系統中,Logstash支持多種數據輸入方式,可以在同一時間從多種數據源采集數據。Logstash能夠以連續的流式傳輸方式輕松地從用戶的日志、指標、Web應用、數據存儲以及AWS服務采集數據。
過濾-實時解析和轉換數據
數據從源傳輸到目標存儲的過程中,Logstash過濾器能夠解析各個事件,識別已命名的字段來構建結構,并將它們轉換成通用格式,從而更輕松、快速地分析和實現商業價值。
使用Grok從非結構化數據中派生出結構化數據。
從IP地址破譯出地理坐標。
將PII數據匿名化,完全排除敏感字段。
簡化整體處理,不受數據源、格式或架構的影響
輸出-導出數據
除了AnalyticDB for MySQL以外,Logstash提供多種數據輸出方向,靈活解鎖眾多下游用例。
操作步驟
Kafka是一個高吞吐量的分布式發布、訂閱日志服務,具有高可用、高性能、分布式、高擴展、持久性等特點。目前Kafka已經被各大公司廣泛使用,同時logstash也可以快速接入業務中,免去重復建設的麻煩。
在Apache Kafka服務器根目錄,執行以下命令安裝和更新插件。
$ bin/plugin install $ bin/plugin update
Logstash從1.5版本開始集成Kafka,Logstash 1.5及以上版本中所有插件的目錄和命名都發生了改變,插件發布地址為Logstash-plugins。
配置插件。
Input配置示例
以下配置可以實現對Kafka讀取端(consumer)的基本使用。
input { kafka { zk_connect => "localhost:2181" group_id => "Logstash" topic_id => "test" codec => plain reset_beginning => false # boolean (optional), default: false consumer_threads => 5 # number (optional), default: 1 decorate_events => true # boolean (optional), default: false } }
參數說明:
group_id
:消費者分組,可以通過組ID來指定,不同組之間的消費互不影響,相互隔離。topic_id
:指定消費話題(Topic),也可以理解為先訂閱某個話題,然后消費。reset_beginning
:指定Logstash啟動后從哪個位置開始讀取數據,默認是結束位置,即Logstash進程會從上次讀取結束時的偏移量開始繼續讀取數據;如果之前沒有消費過,則從頭讀取數據。如果您要導入原數據,需將
reset_beginning
值改為true
, Logstash進程將從頭開始讀取數據,作用類似于cat ,但是Logstash讀到最后一行時不會終止,而是變成tail -F
,繼續監聽相應數據。decorate_events
:指定輸出消息時會輸出自身信息,包括消費消息的大小、Topic來源以及consumer的group信息。rebalance_max_retries
:當有新的consumer(Logstash)加入到同一個group時,將會Rebalance ,此后將會有Partitions的消費端遷移到新的consumer上。如果一個consumer獲得了某個Partition的消費權限,那么它將會向Zookeeper注冊Partition Owner registry節點信息,但是有可能此時舊的consumer尚沒有釋放此節點,此值用于控制注冊節點的重試次數。consumer_timeout_ms
:在指定時間內沒有消息到達將拋出異常,該參數一般無需修改。
更多Input參數配置請參見Input。
說明如果需要多個Logstash端協同消費同一個Topic,需要先把相應的Topic分多個Partitions(區),此時多個消費者消費將無法保證消息的消費順序性,然后把兩個或多個Logstash消費端配置成相同的
group_id
和topic_id
。Output配置示例
output { jdbc { driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD" statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ] } }
參數說明:
connection_string
:AnalyticDB for MySQL的連接地址。statement
:INSERT SQL的聲明數組。
更多Output參數配置請參見Output。
在Logstash安裝目錄中執行
bin/Logstash -f config/xxxx.conf
命令啟動任務,將Kafka數據寫入AnalyticDB for MySQL。