日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過實時計算處理數(shù)據(jù)并同步到Elasticsearch

當(dāng)您需要構(gòu)建一個日志檢索系統(tǒng)時,可通過實時計算Flink對日志數(shù)據(jù)進(jìn)行計算后,輸出到Elasticsearch進(jìn)行搜索。本文以阿里云日志服務(wù)SLS(Log Service)為例,為您介紹具體的實現(xiàn)方法。

前提條件

您已完成以下操作:

背景信息

阿里云實時計算Flink是阿里云官方支持的Flink產(chǎn)品,支持包括Kafka、Elasticsearch等多種輸入輸出系統(tǒng)。實時計算Flink與Elasticsearch結(jié)合,能夠滿足典型的日志檢索場景。

Kafka或LOG等系統(tǒng)中的日志,經(jīng)過Flink進(jìn)行簡單或者復(fù)雜計算之后,輸出到Elasticsearch進(jìn)行搜索。結(jié)合Flink的強(qiáng)大計算能力與Elasticsearch的強(qiáng)大搜索能力,可為業(yè)務(wù)提供實時數(shù)據(jù)加工及查詢,助力業(yè)務(wù)實時化轉(zhuǎn)型。

實時計算Flink為您提供了非常簡單的方式來對接Elasticsearch。例如當(dāng)前業(yè)務(wù)中的日志或者數(shù)據(jù)被寫入了LOG中,并且需要對LOG中的數(shù)據(jù)進(jìn)行計算之后再寫到Elasticsearch中進(jìn)行搜索,可通過以下鏈路實現(xiàn)。Flink+ES數(shù)據(jù)鏈路

操作步驟

  1. 登錄實時計算控制臺

  2. 創(chuàng)建實時計算作業(yè)。

    具體操作,請參見阿里云實時計算Blink獨享模式文檔《Blink SQL開發(fā)指南》中的《作業(yè)開發(fā)》 > 《開發(fā)》章節(jié)。

  3. 編寫Flink SQL。

    1. 創(chuàng)建日志服務(wù)LOG源表。

      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );

      WITH參數(shù)說明如下表。

      變量

      說明

      endPoint

      阿里云日志服務(wù)的公網(wǎng)服務(wù)入口,即訪問對應(yīng)LOG項目及其內(nèi)部日志數(shù)據(jù)的URL。詳細(xì)信息,請參見服務(wù)入口

      例如杭州區(qū)域的日志服務(wù)入口為:http://cn-hangzhou.log.aliyuncs.com。需要在對應(yīng)的服務(wù)入口前加http://

      accessId

      您賬號的AccessKey ID。

      accessKey

      您賬號的AccessKey Secret。

      startTime

      消費日志開始的時間點。運行Flink作業(yè)時所選時間要大于此處設(shè)置的時間。

      project

      LogService的項目名稱。

      logStore

      LogService項目下具體的LogStore名稱。

      consumerGroup

      日志服務(wù)的消費組名稱。

    2. 創(chuàng)建Elasticsearch結(jié)果表。

      重要
      • 實時計算3.2.2及以上版本增加了Elasticsearch結(jié)果表功能。創(chuàng)建Flink作業(yè)時,請注意所選的版本。

      • Elasticsearch結(jié)果表的實現(xiàn)使用了REST API,可以兼容Elasticsearch的各個版本。

      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );

      WITH參數(shù)說明如下。

      參數(shù)

      說明

      endPoint

      阿里云Elasticsearch實例的公網(wǎng)地址,格式為http://<instanceid>.public.elasticsearch.aliyuncs.com:9200。可在實例的基本信息頁面獲取,詳細(xì)信息請參見查看實例的基本信息

      accessId

      訪問阿里云Elasticsearch實例的用戶名,默認(rèn)為elastic。

      accessKey

      對應(yīng)用戶的密碼。elastic用戶的密碼在創(chuàng)建實例時設(shè)定,如果忘記可進(jìn)行重置,重置密碼的注意事項和操作步驟,請參見重置實例訪問密碼

      index

      索引名稱。如果您還未創(chuàng)建過索引,需要先創(chuàng)建一個索引。具體操作,請參見步驟三:創(chuàng)建索引。您也可以開啟自動創(chuàng)建索引功能,自動創(chuàng)建對應(yīng)索引。具體操作,請參見配置YML參數(shù)

      typeName

      索引類型。7.0及以上版本的Elasticsearch實例必須為_doc

      說明
      • Elasticsearch支持根據(jù)PRIMARY KEY更新文檔,且PRIMARY KEY只能為1個字段。指定PRIMARY KEY后,文檔的ID為PRIMARY KEY字段的值。未指定PRIMARY KEY,文檔的ID由系統(tǒng)隨機(jī)生成。詳細(xì)信息,請參見Index API

      • Elasticsearch支持多種更新模式,對應(yīng)WITH中的參數(shù)為updateMode

        • 當(dāng)updateMode=full時,新增的文檔會完全覆蓋已存在的文檔。

        • 當(dāng)updateMode=inc時,Elasticsearch會根據(jù)輸入的字段值更新對應(yīng)的字段。

      • Elasticsearch所有的更新默認(rèn)為UPSERT語義,即INSERT或UPDATE。

    3. 處理業(yè)務(wù)邏輯并同步數(shù)據(jù)。

      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  4. 上線并啟動作業(yè)。

    上線并啟動作業(yè)后,即可將日志服務(wù)中的數(shù)據(jù)進(jìn)行簡單聚合后寫入阿里云Elasticsearch中。

更多信息

使用實時計算Flink+Elasticsearch,可幫助您快速創(chuàng)建實時搜索鏈路。如果您有更復(fù)雜的Elasticsearch寫入需求,可以使用實時計算Flink的自定義Sink功能來實現(xiàn)。