Apache Flume是一個分布式、可靠和高可用的系統,用于從大量不同的數據源有效地收集、聚合和移動大量日志數據,進行集中式的數據存儲。Flume的核心是Agent,Agent中包含Source、Channel和Sink。本文為您介紹如何使用HDFS Sink寫入數據至JindoFS。

前提條件

已創建集群,并選擇了Flume服務。創建集群詳情,請參見創建集群

背景信息

Apache Flume的HDFS Sink通過Flush接口保證事務性寫入。JindoFS Block模式從SmartData 3.2.x及后續版本開始默認支持Flush接口,您可以直接配置Sink。

OSS本身不支持Flush接口,SmartData 3.4.x及后續版本支持Flume可恢復性寫入JindoFS Cache模式或OSS。通過支持Flush接口,雖然不能讓數據立刻可見,但是可以確保數據暫存在云端,當寫入程序發生Crash時,您可以通過調用SDK或命令行,恢復程序在Crash之前已經寫入JindoFS Cache模式或OSS的數據。

使用Flume寫入JindoFS Block模式

您需要配置Sink,各參數含義請參見Apache Flume。配置Sink示例如下:
# 配置JFS Sink

xxx.sinks.jfs_sink.hdfs.path = jfs://${your_ns_name}/flume_dir/%H%M/%S
xxx.sinks.jfs_sink.hdfs.round =true
xxx.sinks.jfs_sink.hdfs.roundValue = 15
xxx.sinks.jfs_sink.hdfs.Unit = minute
xxx.sinks.jfs_sink.hdfs.filePrefix = your_topic

# Sink參數,batchSize需要設置大一些,建議每次Flush的量在32MB以上,否則會影響性能。
xxx.sinks.jfs_sink.hdfs.batchSize = 100000
...
xxx.sinks.jfs_sink.rollSize = 3600
xxx.sinks.jfs_sink.threadsPoolSize = 30
xxx.sinks.jfs_sink.fileType = DataStream
xxx.sinks.jfs_sink.writeFormat = Text
說明 本文示例中的your_ns_name為您NameSpace的名稱。

使用Flume寫入JindoFS Cache模式或OSS

  1. 開啟Flush和Recover功能。
    具體步驟,請參見開啟Flush和Recover功能
  2. 配置Sink。
    各參數含義請參見Apache Flume。配置Sink示例如下:
    # 配置OSS Sink
    
    xxx.sinks.oss_sink.hdfs.path = oss://${your_bucket}/flume_dir/%H%M/%S
    xxx.sinks.oss_sink.hdfs.round = true
    xxx.sinks.oss_sink.hdfs.roundValue = 15
    xxx.sinks.oss_sink.hdfs.Unit = minute
    xxx.sinks.oss_sink.hdfs.filePrefix = <your_topic>
    
    # Sink參數,batchSize需要設置大一些,建議每次Flush的量在32MB以上,否則會影響性能。
    xxx.sinks.oss_sink.hdfs.batchSize = 100000
    ...
    xxx.sinks.oss_sink.rollSize = 3600
    xxx.sinks.oss_sink.threadsPoolSize = 30
    說明 本文示例中的your_bucket為您OSS Bucket的名稱。
  3. 恢復文件Flush的數據。
    具體步驟,請參見恢復文件Flush的數據

開啟Flush和Recover功能

使用Flume寫入JindoFS Cache模式時,需要開啟Flush和Recover功能。

  1. 進入SmartData服務。
    1. 登錄阿里云E-MapReduce控制臺
    2. 在頂部菜單欄處,根據實際情況選擇地域和資源組
    3. 單擊上方的集群管理頁簽。
    4. 集群管理頁面,單擊相應集群所在行的詳情
    5. 在左側導航欄單擊集群服務 > SmartData
  2. 進入smartdata-site頁面。
    1. 單擊配置頁簽。
    2. 服務配置頁面,單擊storage
  3. 單擊右上角的自定義配置,添加如下參數。
    參數 描述
    fs.jfs.cache.oss.flush.enable 是否開啟Flush和Recover功能,開啟時需要設置為true。
    fs.jfs.cache.flush.staging.path Flush的數據和Manifest的暫存區,默認值為/tmp。

    例如:在使用默認值時,如果文件的寫入路徑是:oss://test-bucket/dir1/file1,則Staging的路徑為oss://test-bucket/tmp/dir1/file1

  4. 保存配置。
    1. 單擊右上角的保存
    2. 確認修改對話框中,輸入執行原因,開啟自動更新配置
    3. 單擊確定

恢復文件Flush的數據

您可以使用Recover命令恢復數據。
jindo jfs -recover [-R]
                   [-flushStagingPath {flushStagingPath}]
                   [-accessKeyId ${accessKeyId}]
                   [-accessKeySecret ${accessKeySecret}]
                   <path>
參數 描述
-R 是否遞歸Recover,恢復文件夾時需要添加該參數。
-flushStagingPath Flush的數據和Manifest的暫存區,默認值為/tmp。

例如:在使用默認值時,如果文件的寫入路徑是:oss://test-bucket/dir1/file1,則Staging的路徑為oss://test-bucket/tmp/dir1/file1

-accessKeyId 阿里云賬號的AccessKey ID。
-accessKeySecret 阿里云賬號的AccessKey Secret。
path Flush的數據的寫入路徑。

例如:oss://test-bucket/dir1/file1oss://test-bucket/dir1

您也可以通過JindoOssFileSystem的Recover接口恢復文件或目錄。本示例為恢復文件夾。
JindoOssFileSystem jindoFileSystem = (JindoOssFileSystem) fs;
boolean isFolder = true;
jindoFileSystem.recover(path, isFolder);
說明 代碼中的path表示待恢復文件的路徑,isFolder表示是否需要遞歸恢復。如果是恢復文件夾,需要設置為true。如果是恢復文件,需要設置為false。