本文為您介紹如何使用Flume同步EMR Kafka集群的數據至阿里云OSS-HDFS(JindoFS服務)。

背景信息

OSS-HDFS服務是一款云原生數據湖存儲產品,基于統一的元數據管理能力,在完全兼容HDFS文件系統接口的同時,提供充分的POSIX能力支持,能更好的滿足大數據和AI領域豐富多樣的數據湖計算場景,詳細信息請參見OSS-HDFS服務概述

前提條件

操作步驟

  1. 開啟OSS-HDFS。

    開通并授權訪問OSS-HDFS服務,具體操作請參見開通并授權訪問OSS-HDFS服務

  2. 配置Flume。
    1. 進入Flume的配置頁面。
      1. 登錄EMR on ECS控制臺
      2. 在頂部菜單欄處,根據實際情況選擇地域和資源組
      3. 集群管理頁面,單擊目標集群操作列的集群服務
      4. 集群服務頁面,單擊FLUME服務區域的配置
    2. 設置JVM最大可用內存(Xmx)。
      Flume向OSS寫入數據時,因為需要占用較大的JVM內存,所以可以增大Flume Agent的Xmx。
      1. 單擊flume-env.sh頁簽。

        本文示例采用的是全局配置方式,如果您想按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置

      2. 修改JAVA_OPTS的參數值。

        例如,設置為1g,則參數值修改為-Xmx1g。

    3. 單擊flume-conf.properties頁簽。
      本文示例采用的是全局配置方式,如果您想按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置
    4. flume-conf.properties的參數值中,添加以下內容。
      說明 代碼示例中的default-agent,請與FLUME服務配置頁面的agent_name參數的參數值保持一致。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
      default-agent.sinks.k1.hdfs.fileType=DataStream
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      參數 描述
      default-agent.sources.source1.kafka.bootstrap.servers Kafka集群Broker的Host和端口號。
      default-agent.sinks.k1.hdfs.path OSS-HDFS的路徑。填寫格式為oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>

      例如,oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result。

      說明 <examplebucket>為Bucket名稱,<exampleregion>為地域ID,<exampledir>為OSS上的目錄名。
      default-agent.channels.c1.capacity 通道中存儲的最大事件數。請根據實際環境修改該參數值。
      default-agent.channels.c1.transactionCapacity 每個事務通道將從源接收或提供給接收器的最大事件數。請根據實際環境修改該參數值。
  3. 測試數據同步情況。
    1. 通過SSH方式連接DataFlow集群,詳情請參見登錄集群
    2. 創建名稱為flume-test的Topic。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. 生成測試數據。
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      例如輸入abc并回車。

    4. oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result路徑下會以當前時間的時間戳(毫秒)為后綴生成文件FlumeData.xxxx