本文介紹如何使用E-MapReduce(簡稱EMR)的Flume實時同步日志服務(LogHub)的數據至E-MapReduce集群的HDFS,并根據數據記錄的時間戳將數據存入HDFS相應的分區中。
背景信息
您可以借助日志服務的Logtail工具,將需要同步的數據實時采集并上傳到LogHub,再使用E-MapReduce的Flume將LogHub的數據同步至EMR集群的HDFS。
采集數據到日志服務的LogHub的詳細步驟參見數據采集概述。
前提條件
已創建DataLake集群,并且選擇了Flume服務,詳情請參見創建集群。
操作步驟
- 配置Flume。
- 進入Flume的配置頁面。
- 登錄EMR on ECS控制臺。
- 在頂部菜單欄處,根據實際情況選擇地域和資源組。
- 在集群管理頁面,單擊目標集群操作列的集群服務。
- 在集群服務頁面,單擊FLUME服務區域的配置。
- 單擊flume-conf.properties頁簽。本文示例采用的是全局配置方式,如果您想按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置。
- 在flume-conf.properties的參數值中,添加以下內容。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.loghub.LogHubSource default-agent.sources.source1.endpoint = <yourLogHubEndpoint> default-agent.sources.source1.project = canaltest default-agent.sources.source1.logstore = canal default-agent.sources.source1.accessKeyId = yHiu*******BG2s default-agent.sources.source1.accessKey = ABctuw0M***************iKKljZy default-agent.sources.source1.useRecordTime = true default-agent.sources.source1.consumerGroup = consumer_1 default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H default-agent.sinks.k1.hdfs.fileType = DataStream default-agent.sinks.k1.hdfs.rollInterval = 3600 default-agent.sinks.k1.hdfs.round = true default-agent.sinks.k1.hdfs.roundValue = 60 default-agent.sinks.k1.hdfs.roundUnit = minute default-agent.sinks.k1.hdfs.rollSize = 0 default-agent.sinks.k1.hdfs.rollCount = 0 # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 2000 default-agent.channels.c1.transactionCapacity = 2000 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
參數 說明 default-agent.sources.source1.type 固定值為org.apache.flume.source.loghub.LogHubSource。 default-agent.sources.source1.endpoint LogHub的Endpoint。 說明 如果使用VPC或經典網絡的Endpoint,需要保證與EMR集群在同一個地區;如果使用公網Endpoint,需要保證運行Flume agent的節點有公網IP。default-agent.sources.source1.project LogHub的項目名。 default-agent.sources.source1.logstore LogStore名稱。 default-agent.sources.source1.accessKeyId 阿里云的AccessKey ID。 default-agent.sources.source1.accessKey 阿里云的AccessKey Secret。 default-agent.sources.source1.useRecordTime 設置為true。 默認值為false。如果Header中沒有Timestamp屬性,接收Event的時間戳會被加入到Header中。但是在Flume Agent啟停或者同步滯后等情況下,會將數據放入錯誤的時間分區中。為避免這種情況,可以將該值設置為true,使用數據收集到LogHub的時間作為Timestamp。
default-agent.sources.source1.consumerGroup 消費組名稱,默認值為consumer_1。 default-agent.sources.source1.consumerPosition 消費組在第一次消費LogHub數據時的位置,默認值為end,即從最近的數據開始消費。 - begin:表示從最早的數據開始消費。
- special:表示從指定的時間點開始消費。
在配置為special時,需要配置startTime為開始消費的時間點,單位為秒。
default-agent.sources.source1.heartbeatInterval 消費組與服務端維持心跳的間隔,單位是毫秒,默認為30000毫秒。 default-agent.sources.source1.fetchInOrder 相同Key的數據是否按序消費,默認值為false。 default-agent.sources.source1.batchSize 通用的source batch配置,在一個批處理中寫入通道的最大消息數。 default-agent.sources.source1.batchDurationMillis 通用的source batch配置,在將批處理寫入通道之前的最大時間。 default-agent.sources.source1.backoffSleepIncrement 通用的source sleep配置,表示LogHub沒有數據時觸發Sleep的初始和增量等待時間。 default-agent.sources.source1.maxBackoffSleep 通用的source sleep配置,表示LogHub沒有數據時觸發Sleep的最大等待時間。 default-agent.sinks.k1.hdfs.path HDFS存儲路徑。例如,/tmp/flume-data/loghub/datetime=%y%m%d/hour=%H。 default-agent.sinks.k1.hdfs.fileType 保存到HDFS上的文件類型。固定為DataStream。 default-agent.sinks.k1.hdfs.rollInterval 設置多久生成一個新的文件,單位為秒。例如,3600。 default-agent.sinks.k1.hdfs.round 用于HDFS文件按照時間分區,時間戳向下取整。默認值為true。 default-agent.sinks.k1.hdfs.roundValue 當default-agent.sinks.k1.hdfs.round設置為true,配合default-agent.sinks.k1.hdfs.roundUnit時間單位一起使用。 例如,default-agent.sinks.k1.hdfs.roundUnit值為minute,該值設置為60,則表示60分鐘之內的數據寫到一個文件中,相當于每60分鐘生成一個文件。
default-agent.sinks.k1.hdfs.roundUnit 按時間分區使用的時間單位。默認值為minute。 default-agent.sinks.k1.hdfs.rollSize 當臨時文件達到該參數值時,滾動成目標文件,單位:byte。 該值設置為0,則表示文件不根據文件大小滾動生成。
default-agent.sinks.k1.hdfs.rollCount 當Event數據達到該數量時,將臨時文件滾動生成目標文件。 該值設置為0,則表示文件不根據Event數滾動生成。
default-agent.channels.c1.capacity 通道中存儲的最大事件數。例如,2000。 default-agent.channels.c1.transactionCapacity 每次Channel從Source獲取事件或推送給Sink的最大事件數。例如,2000。 配置項請遵循開源Flume內容,詳情請參見Avro Source、Taildir Source、Sink和Channel。
- 保存配置。
- 單擊下方的保存。
- 在彈出的對話框中,輸入執行原因,單擊確定。
- 進入Flume的配置頁面。
- 啟動服務。
- 在FLUME服務頁面,選擇更多操作 > 重啟。
- 在彈出的對話框中,輸入執行原因,單擊確定。
- 在確認對話框中,單擊確定。
- 啟動服務。
- 在FLUME服務頁面,選擇更多操作 > 重啟。
- 在彈出的對話框中,輸入執行原因,單擊確定。
- 在確認對話框中,單擊確定。
啟動成功后,您可以看到配置的HDFS路徑下按照Record Timestamp存儲的日志數據。