本文介紹如何使用E-MapReduce(簡稱EMR)的Flume實時同步日志服務(LogHub)的數據至E-MapReduce集群的HDFS,并根據數據記錄的時間戳將數據存入HDFS相應的分區中。

背景信息

您可以借助日志服務的Logtail工具,將需要同步的數據實時采集并上傳到LogHub,再使用E-MapReduce的Flume將LogHub的數據同步至EMR集群的HDFS。

采集數據到日志服務的LogHub的詳細步驟參見數據采集概述

前提條件

已創建DataLake集群,并且選擇了Flume服務,詳情請參見創建集群

操作步驟

  1. 配置Flume。
    1. 進入Flume的配置頁面。
      1. 登錄EMR on ECS控制臺
      2. 在頂部菜單欄處,根據實際情況選擇地域和資源組
      3. 集群管理頁面,單擊目標集群操作列的集群服務
      4. 集群服務頁面,單擊FLUME服務區域的配置
    2. 單擊flume-conf.properties頁簽。
      本文示例采用的是全局配置方式,如果您想按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置
    3. flume-conf.properties的參數值中,添加以下內容。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource
      default-agent.sources.source1.endpoint = <yourLogHubEndpoint>
      default-agent.sources.source1.project = canaltest
      default-agent.sources.source1.logstore = canal
      default-agent.sources.source1.accessKeyId = yHiu*******BG2s
      default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy
      default-agent.sources.source1.useRecordTime = true
      default-agent.sources.source1.consumerGroup = consumer_1
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H
      default-agent.sinks.k1.hdfs.fileType = DataStream
      default-agent.sinks.k1.hdfs.rollInterval = 3600
      default-agent.sinks.k1.hdfs.round = true
      default-agent.sinks.k1.hdfs.roundValue = 60
      default-agent.sinks.k1.hdfs.roundUnit = minute
      default-agent.sinks.k1.hdfs.rollSize = 0
      default-agent.sinks.k1.hdfs.rollCount = 0
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 2000
      default-agent.channels.c1.transactionCapacity = 2000
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      參數 說明
      default-agent.sources.source1.type 固定值為org.apache.flume.source.loghub.LogHubSource。
      default-agent.sources.source1.endpoint LogHub的Endpoint。
      說明 如果使用VPC或經典網絡的Endpoint,需要保證與EMR集群在同一個地區;如果使用公網Endpoint,需要保證運行Flume agent的節點有公網IP。
      default-agent.sources.source1.project LogHub的項目名。
      default-agent.sources.source1.logstore LogStore名稱。
      default-agent.sources.source1.accessKeyId 阿里云的AccessKey ID。
      default-agent.sources.source1.accessKey 阿里云的AccessKey Secret。
      default-agent.sources.source1.useRecordTime 設置為true。

      默認值為false。如果Header中沒有Timestamp屬性,接收Event的時間戳會被加入到Header中。但是在Flume Agent啟停或者同步滯后等情況下,會將數據放入錯誤的時間分區中。為避免這種情況,可以將該值設置為true,使用數據收集到LogHub的時間作為Timestamp。

      default-agent.sources.source1.consumerGroup 消費組名稱,默認值為consumer_1。
      default-agent.sources.source1.consumerPosition 消費組在第一次消費LogHub數據時的位置,默認值為end,即從最近的數據開始消費。
      • begin:表示從最早的數據開始消費。
      • special:表示從指定的時間點開始消費。

        在配置為special時,需要配置startTime為開始消費的時間點,單位為秒。

      首次運行后LogHub服務端會記錄消費組的消費點,此時如果想更改 consumerPosition,可以清除LogHub的消費組狀態,或者更改配置consumerGroup為新的消費組。
      default-agent.sources.source1.heartbeatInterval 消費組與服務端維持心跳的間隔,單位是毫秒,默認為30000毫秒。
      default-agent.sources.source1.fetchInOrder 相同Key的數據是否按序消費,默認值為false。
      default-agent.sources.source1.batchSize 通用的source batch配置,在一個批處理中寫入通道的最大消息數。
      default-agent.sources.source1.batchDurationMillis 通用的source batch配置,在將批處理寫入通道之前的最大時間。
      default-agent.sources.source1.backoffSleepIncrement 通用的source sleep配置,表示LogHub沒有數據時觸發Sleep的初始和增量等待時間。
      default-agent.sources.source1.maxBackoffSleep 通用的source sleep配置,表示LogHub沒有數據時觸發Sleep的最大等待時間。
      default-agent.sinks.k1.hdfs.path HDFS存儲路徑。例如,/tmp/flume-data/loghub/datetime=%y%m%d/hour=%H。
      default-agent.sinks.k1.hdfs.fileType 保存到HDFS上的文件類型。固定為DataStream。
      default-agent.sinks.k1.hdfs.rollInterval 設置多久生成一個新的文件,單位為秒。例如,3600。
      default-agent.sinks.k1.hdfs.round 用于HDFS文件按照時間分區,時間戳向下取整。默認值為true。
      default-agent.sinks.k1.hdfs.roundValue default-agent.sinks.k1.hdfs.round設置為true,配合default-agent.sinks.k1.hdfs.roundUnit時間單位一起使用。

      例如,default-agent.sinks.k1.hdfs.roundUnit值為minute,該值設置為60,則表示60分鐘之內的數據寫到一個文件中,相當于每60分鐘生成一個文件。

      default-agent.sinks.k1.hdfs.roundUnit 按時間分區使用的時間單位。默認值為minute。
      default-agent.sinks.k1.hdfs.rollSize 當臨時文件達到該參數值時,滾動成目標文件,單位:byte。

      該值設置為0,則表示文件不根據文件大小滾動生成。

      default-agent.sinks.k1.hdfs.rollCount 當Event數據達到該數量時,將臨時文件滾動生成目標文件。

      該值設置為0,則表示文件不根據Event數滾動生成。

      default-agent.channels.c1.capacity 通道中存儲的最大事件數。例如,2000。
      default-agent.channels.c1.transactionCapacity 每次Channel從Source獲取事件或推送給Sink的最大事件數。例如,2000。

      配置項請遵循開源Flume內容,詳情請參見Avro SourceTaildir SourceSinkChannel

    4. 保存配置。
      1. 單擊下方的保存
      2. 在彈出的對話框中,輸入執行原因,單擊確定
  2. 啟動服務。
    1. 在FLUME服務頁面,選擇更多操作 > 重啟
    2. 在彈出的對話框中,輸入執行原因,單擊確定
    3. 確認對話框中,單擊確定
  3. 啟動服務。
    1. 在FLUME服務頁面,選擇更多操作 > 重啟
    2. 在彈出的對話框中,輸入執行原因,單擊確定
    3. 確認對話框中,單擊確定
    啟動成功后,您可以看到配置的HDFS路徑下按照Record Timestamp存儲的日志數據。