通過實時計算處理數(shù)據(jù)并同步到Elasticsearch
當(dāng)您需要構(gòu)建一個日志檢索系統(tǒng)時,可通過實時計算Flink對日志數(shù)據(jù)進(jìn)行計算后,輸出到Elasticsearch進(jìn)行搜索。本文以阿里云日志服務(wù)SLS(Log Service)為例,為您介紹具體的實現(xiàn)方法。
前提條件
您已完成以下操作:
開通阿里云實時計算服務(wù)并創(chuàng)建項目。
創(chuàng)建阿里云Elasticsearch實例。
具體操作,請參見創(chuàng)建阿里云Elasticsearch實例。
開通SLS服務(wù)、創(chuàng)建Project和Logstore。
背景信息
阿里云實時計算Flink是阿里云官方支持的Flink產(chǎn)品,支持包括Kafka、Elasticsearch等多種輸入輸出系統(tǒng)。實時計算Flink與Elasticsearch結(jié)合,能夠滿足典型的日志檢索場景。
Kafka或LOG等系統(tǒng)中的日志,經(jīng)過Flink進(jìn)行簡單或者復(fù)雜計算之后,輸出到Elasticsearch進(jìn)行搜索。結(jié)合Flink的強(qiáng)大計算能力與Elasticsearch的強(qiáng)大搜索能力,可為業(yè)務(wù)提供實時數(shù)據(jù)加工及查詢,助力業(yè)務(wù)實時化轉(zhuǎn)型。
實時計算Flink為您提供了非常簡單的方式來對接Elasticsearch。例如當(dāng)前業(yè)務(wù)中的日志或者數(shù)據(jù)被寫入了LOG中,并且需要對LOG中的數(shù)據(jù)進(jìn)行計算之后再寫到Elasticsearch中進(jìn)行搜索,可通過以下鏈路實現(xiàn)。
操作步驟
登錄實時計算控制臺。
創(chuàng)建實時計算作業(yè)。
具體操作,請參見阿里云實時計算Blink獨享模式文檔《Blink SQL開發(fā)指南》中的《作業(yè)開發(fā)》 > 《開發(fā)》章節(jié)。
編寫Flink SQL。
創(chuàng)建日志服務(wù)LOG源表。
create table sls_stream( a int, b int, c VARCHAR ) WITH ( type ='sls', endPoint ='<yourEndpoint>', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '<yourStartTime>', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );
WITH參數(shù)說明如下表。
變量
說明
endPoint
阿里云日志服務(wù)的公網(wǎng)服務(wù)入口,即訪問對應(yīng)LOG項目及其內(nèi)部日志數(shù)據(jù)的URL。詳細(xì)信息,請參見服務(wù)入口。
例如杭州區(qū)域的日志服務(wù)入口為:http://cn-hangzhou.log.aliyuncs.com。需要在對應(yīng)的服務(wù)入口前加http://。
accessId
您賬號的AccessKey ID。
accessKey
您賬號的AccessKey Secret。
startTime
消費日志開始的時間點。運行Flink作業(yè)時所選時間要大于此處設(shè)置的時間。
project
LogService的項目名稱。
logStore
LogService項目下具體的LogStore名稱。
consumerGroup
日志服務(wù)的消費組名稱。
創(chuàng)建Elasticsearch結(jié)果表。
重要實時計算3.2.2及以上版本增加了Elasticsearch結(jié)果表功能。創(chuàng)建Flink作業(yè)時,請注意所選的版本。
Elasticsearch結(jié)果表的實現(xiàn)使用了REST API,可以兼容Elasticsearch的各個版本。
CREATE TABLE es_stream_sink( a int, cnt BIGINT, PRIMARY KEY(a) ) WITH( type ='elasticsearch-7', endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>', accessId = '<yourAccessId>', accessKey = '<yourAccessSecret>', index = '<yourIndex>', typeName = '<yourTypeName>' );
WITH參數(shù)說明如下。
參數(shù)
說明
endPoint
阿里云Elasticsearch實例的公網(wǎng)地址,格式為http://<instanceid>.public.elasticsearch.aliyuncs.com:9200。可在實例的基本信息頁面獲取,詳細(xì)信息請參見查看實例的基本信息。
accessId
訪問阿里云Elasticsearch實例的用戶名,默認(rèn)為elastic。
accessKey
對應(yīng)用戶的密碼。elastic用戶的密碼在創(chuàng)建實例時設(shè)定,如果忘記可進(jìn)行重置,重置密碼的注意事項和操作步驟,請參見重置實例訪問密碼。
index
索引名稱。如果您還未創(chuàng)建過索引,需要先創(chuàng)建一個索引。具體操作,請參見步驟三:創(chuàng)建索引。您也可以開啟自動創(chuàng)建索引功能,自動創(chuàng)建對應(yīng)索引。具體操作,請參見配置YML參數(shù)。
typeName
索引類型。7.0及以上版本的Elasticsearch實例必須為_doc。
說明Elasticsearch支持根據(jù)PRIMARY KEY更新文檔,且
PRIMARY KEY
只能為1個字段。指定PRIMARY KEY
后,文檔的ID為PRIMARY KEY
字段的值。未指定PRIMARY KEY
,文檔的ID由系統(tǒng)隨機(jī)生成。詳細(xì)信息,請參見Index API。Elasticsearch支持多種更新模式,對應(yīng)WITH中的參數(shù)為updateMode:
當(dāng)
updateMode=full
時,新增的文檔會完全覆蓋已存在的文檔。當(dāng)
updateMode=inc
時,Elasticsearch會根據(jù)輸入的字段值更新對應(yīng)的字段。
Elasticsearch所有的更新默認(rèn)為UPSERT語義,即INSERT或UPDATE。
處理業(yè)務(wù)邏輯并同步數(shù)據(jù)。
INSERT INTO es_stream_sink SELECT a, count(*) as cnt FROM sls_stream GROUP BY a
上線并啟動作業(yè)。
上線并啟動作業(yè)后,即可將日志服務(wù)中的數(shù)據(jù)進(jìn)行簡單聚合后寫入阿里云Elasticsearch中。
更多信息
使用實時計算Flink+Elasticsearch,可幫助您快速創(chuàng)建實時搜索鏈路。如果您有更復(fù)雜的Elasticsearch寫入需求,可以使用實時計算Flink的自定義Sink功能來實現(xiàn)。