當您需要將Azure Event Hubs事件中心中的數據同步到阿里云Elasticsearch中時,可使用阿里云Logstash的管道配置功能實現。本文介紹具體的實現方法。

操作流程

  1. 步驟一:準備環境與實例
  2. 步驟二:創建并配置Logstash管道
  3. 步驟三:驗證結果

步驟一:準備環境與實例

  1. 創建阿里云Elasticsearch實例,并開啟自動創建索引功能。本文使用7.10版本的實例。
  2. 創建阿里云Logstash實例并配置NAT公網數據傳輸。本文使用7.4版本的實例。
    具體操作請參見創建阿里云Logstash實例
    由于阿里云Logstash實例部署在專有網絡VPC下,但在數據同步過程中,Logstash需要連接公網才能與Azure Event Hubs事件中心互通,因此需要通過配置NAT網關實現與公網連通,詳情請參見配置NAT公網數據傳輸
    說明 對于自建的Logstash,需要購買與阿里云Elasticsearch在同一VPC下的ECS實例(已符合條件的ECS不需要重復購買,需要綁定彈性公網IP)。
  3. 準備Azure Event Hubs事件中心的自建環境。
    具體操作請參見Azure Event Hubs官方文檔

步驟二:創建并配置Logstash管道

  1. 登錄阿里云Elasticsearch控制臺
  2. 進入目標實例。
    1. 在頂部菜單欄處,選擇地域。
    2. 在左側導航欄,單擊Logstash實例,然后在Logstash實例中單擊目標實例ID。
  3. 在左側導航欄,單擊管道管理
  4. 單擊創建管道
  5. 創建管道任務頁面,輸入管道ID并配置管道。
    本文使用的管道配置如下。
    input {
      azure_event_hubs {
         event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
         initial_position => "beginning"
         threads => 2
         decorate_events => true
         consumer_group => "group-kl"
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
         storage_container => "lettie_container"
       }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        password => "xxxxxx"
        user => "elastic"
      }
    }
    表 1. input參數說明
    參數 說明
    event_hub_connections 標識要讀取的事件中心的連接字符串列表。連接字符串包括事件中心的EntityPath。更多詳細說明,請參見event_hub_connections
    說明 每一個事件中心都會定義一個event_hub_connections參數,其他參數在各事件中心之間共享。
    initial_position 從事件中心讀取數據的位置,可選值:beginning(默認)、end和look_back。更多詳細說明,請參見initial position
    threads 處理事件的線程總數。更多詳細說明,請參見threads
    decorate_events 是否同步事件中心的元數據,包括事件中心名稱、consumer_group、processor_host、分區、偏移量、序列、時間戳和event_size。更多詳細說明,請參見decorate events
    consumer_group 用于讀取事件中心數據的消費者組。您需要專門為Logstash創建一個消費者組,并確保Logstash的所有節點都使用該消費者組,以便它們可以正常協同工作。更多詳細說明,請參見consumer group
    storage_connection Blob賬戶存儲的連接字符串。Blob賬戶存儲會保留重啟之間的偏移量,并確保Logstash的多個節點處理不同的分區。設置此值后,重啟將在處理中斷的地方開始。如果未設置此值,重啟將從initial_position設置的值的地方開始。更多詳細說明,請參見storage connection
    storage_container 用于持久保存偏移量并允許多個Logstash節點一起工作的存儲容器的名稱。更多詳細說明,請參見storage container
    說明 為避免覆蓋偏移量,建議使用不同的storage_container名稱。如果同一份數據分別寫入到不同的服務中,此參數需設置為不同的名稱。
    表 2. output參數說明
    參數 說明
    hosts Elasticsearch服務的訪問地址,需要設置為http://<阿里云Elasticsearch實例ID>.elasticsearch.aliyuncs.com:9200
    index 遷移后的索引名。
    user 訪問Elasticsearch服務的用戶名,默認為elastic。
    password 對應用戶的密碼。對于阿里云Elasticsearch,elastic用戶的密碼在創建實例時設定,如果忘記可進行重置,重置密碼的注意事項和操作步驟請參見重置實例訪問密碼

    更多Config配置詳情請參見Logstash配置文件說明

  6. 單擊下一步,配置管道參數。
    管道參數配置
    參數 說明
    管道工作線程 并行執行管道的Filter和Output的工作線程數量。當事件出現積壓或CPU未飽和時,請考慮增大線程數,更好地使用CPU處理能力。默認值:實例的CPU核數。
    管道批大小 單個工作線程在嘗試執行Filter和Output前,可以從Input收集的最大事件數目。較大的管道批大小可能會帶來較大的內存開銷。您可以設置LS_HEAP_SIZE變量,來增大JVM堆大小,從而有效使用該值。默認值:125。
    管道批延遲 創建管道事件批時,將過小的批分派給管道工作線程之前,要等候每個事件的時長,單位為毫秒。默認值:50ms。
    隊列類型 用于事件緩沖的內部排隊模型。可選值:
    • MEMORY:默認值。基于內存的傳統隊列。
    • PERSISTED:基于磁盤的ACKed隊列(持久隊列)。
    隊列最大字節數 請確保該值小于您的磁盤總容量。默認值:1024 MB。
    隊列檢查點寫入數 啟用持久性隊列時,在強制執行檢查點之前已寫入事件的最大數目。設置為0,表示無限制。默認值:1024。
    警告 配置完成后,需要保存并部署才能生效。保存并部署操作會觸發實例重啟,請在不影響業務的前提下,繼續執行以下步驟。
  7. 單擊保存或者保存并部署
    • 保存:將管道信息保存在Logstash里并觸發實例變更,配置不會生效。保存后,系統會返回管道管理頁面。可在管道列表區域,單擊操作列下的立即部署,觸發實例重啟,使配置生效。
    • 保存并部署:保存并且部署后,會觸發實例重啟,使配置生效。

步驟三:驗證結果

  1. 登錄目標阿里云Elasticsearch實例的Kibana控制臺,根據頁面提示進入Kibana主頁。
    登錄Kibana控制臺的具體操作,請參見登錄Kibana控制臺
    說明 本文以阿里云Elasticsearch 7.10.0版本為例,其他版本操作可能略有差別,請以實際界面為準。
  2. 單擊右上角的Dev tools
  3. Console中,執行如下命令,查看同步后數據。
    GET test-log3/_search
    {
      "query":{
        "match":{
          "message":"L23"
         }
       }
    }
    預期結果如下。預期結果