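By writing a custom Source, you can extend Flume with additional data sources, such as encrypted data streams, self-managed service ports, and proprietary data storage centers. This topic uses an example to describe how to create a custom Source.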

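Prerequisites

  • A cluster is created with the Flume service selected. For details, see Create a cluster.
  • A file transfer tool (SSH Secure File Transfer Client) is installed on your local machine.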

Procedure

  1. Create a custom Source.
    1. Add the pom dependency.
      <dependencies>
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
          </dependency>
      </dependencies>
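      Note: 1.9.0 is the Flume version. Replace it with the version of Flume that you use.
    2. Write the custom Source class.
      org.example.MySource implements a Source that emits log events in a specific format: each event body is the current time, formatted with the configured date pattern.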
      package org.example;
      
      import java.text.SimpleDateFormat;
      import java.util.Date;
      
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.PollableSource;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.SimpleEvent;
      import org.apache.flume.source.AbstractSource;
      
      public class MySource extends AbstractSource implements Configurable, PollableSource {
          private String myDateFormat;
          private int myIntervalMS;
      
          @Override
          public void configure(Context context) {
              String myFormat = context.getString("dateFormat", "HH:mm:ss.SSS");
              int myInterval = context.getInteger("intervalMS", 1000);
      
              // Validate or convert the configured values here if needed

              // Store the values for later retrieval by the process() method
              this.myDateFormat = myFormat;
              this.myIntervalMS = myInterval;
          }
      
          @Override
          public void start() {
              // Initialize the connection to the external client
          }
      
          @Override
          public void stop() {
              // Disconnect from external client and do any additional cleanup
              // (e.g. releasing resources or nulling-out field values) ..
          }
      
          @Override
          public Status process() throws EventDeliveryException {
              Status status = null;

              try {
                  // Build an event whose body is the current time in the configured format
                  Event event = new SimpleEvent();
                  SimpleDateFormat sdf = new SimpleDateFormat(myDateFormat);
                  event.setBody(sdf.format(new Date()).getBytes(java.nio.charset.StandardCharsets.UTF_8));

                  // Store the Event into this Source's associated Channel(s)
                  getChannelProcessor().processEvent(event);

                  status = Status.READY;
              } catch (Exception ex) {
                  // Log the exception and report BACKOFF; handle individual exceptions as needed
                  status = Status.BACKOFF;
                  ex.printStackTrace();
              }

              try {
                  // Wait for the configured interval before producing the next event
                  Thread.sleep(myIntervalMS);
              } catch (InterruptedException ex) {
                  // Restore the interrupt flag so that the calling thread can observe it
                  Thread.currentThread().interrupt();
              }

              return status;
          }
      
          @Override
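          public long getBackOffSleepIncrement() {
              // No additional back-off delay: process() already sleeps between events
              return 0;
          }

          @Override
          public long getMaxBackOffSleepInterval() {
              // No upper bound is needed because no back-off delay is requested
              return 0;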
          }
      }
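The configure() method above stores the values as they are read. If you want to reject bad settings early, the check could look roughly like the following sketch. MySourceSettings is a hypothetical helper introduced only for illustration; it is not part of Flume or of the example above, and how you surface the error inside configure() is left to you.

```java
import java.text.SimpleDateFormat;

/** Hypothetical helper (an assumption, not part of Flume) that validates the two settings. */
final class MySourceSettings {
    final String dateFormat;
    final int intervalMs;

    private MySourceSettings(String dateFormat, int intervalMs) {
        this.dateFormat = dateFormat;
        this.intervalMs = intervalMs;
    }

    /** Returns validated settings or throws IllegalArgumentException. */
    static MySourceSettings of(String dateFormat, int intervalMs) {
        if (dateFormat == null || dateFormat.isEmpty()) {
            throw new IllegalArgumentException("dateFormat must not be empty");
        }
        // The SimpleDateFormat constructor throws IllegalArgumentException
        // when the pattern contains an illegal pattern letter.
        new SimpleDateFormat(dateFormat);
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMS must be greater than 0");
        }
        return new MySourceSettings(dateFormat, intervalMs);
    }

    public static void main(String[] args) {
        MySourceSettings ok = MySourceSettings.of("HH:mm:ss.SSS", 2000);
        System.out.println(ok.dateFormat + " every " + ok.intervalMs + " ms");
        try {
            MySourceSettings.of("HH:mm:ss.SSS", 0);
        } catch (IllegalArgumentException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

In configure(), the values returned by context.getString and context.getInteger could be passed to MySourceSettings.of before they are stored in the fields.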
  2. Package the custom code into a JAR file.
    In the directory that contains pom.xml, run the following command to build the JAR package.
    mvn clean package -DskipTests
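  3. Use the file transfer tool to upload the generated JAR package to the /opt/apps/FLUME/flume-current/lib directory of Flume.
    Note: If you are not using an EMR cluster, upload the JAR package to your actual Flume installation directory.
  4. Add the configuration.
    1. Log on to the cluster by using SSH. For details, see Log on to a cluster.
    2. Run the following command to go to the conf directory.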
      cd /opt/apps/FLUME/flume-current/conf
    3. Run the following command to create a configuration file.
      vim mysource.conf
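      Note: This example uses mysource.conf as the configuration file name. You can use a different file name.
    4. Add the following content to the mysource.conf configuration file.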
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      a1.sources.r1.type = org.example.MySource
      a1.sources.r1.dateFormat = HH:mm:ss.SSS
      a1.sources.r1.intervalMS = 2000
      
      a1.sinks.k1.type = logger
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      Note: In the configuration, dateFormat specifies the date format and intervalMS specifies the interval between events, in milliseconds.
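The keys that MySource reads in configure() are the configuration keys with the a1.sources.r1. prefix removed, which is why the code asks for dateFormat and intervalMS. The following sketch mimics that mapping with java.util.Properties so that you can check a configuration file offline. SourcePropsDemo and its methods are hypothetical names used only for illustration; this is not how Flume itself parses the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical demo class (an assumption, not a Flume API). */
final class SourcePropsDemo {

    /** Parses properties-style text, such as the content of mysource.conf. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
        return props;
    }

    /** Returns the entries under the given prefix, with the prefix removed from each key. */
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "a1.sources.r1.type = org.example.MySource",
                "a1.sources.r1.dateFormat = HH:mm:ss.SSS",
                "a1.sources.r1.intervalMS = 2000",
                "a1.sinks.k1.type = logger");
        // Prints only the settings that belong to source r1
        System.out.println(subProperties(parse(conf), "a1.sources.r1."));
    }
}
```

The channels entry for r1 would appear in the result as well if it were included in the input, because it shares the same prefix.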
  5. Start Flume.
    1. Run the following command to go to the flume-current directory.
      cd /opt/apps/FLUME/flume-current
    2. Run the following command to start Flume.
      bin/flume-ng agent --name a1 -c conf -f conf/mysource.conf -Dflume.root.logger=INFO,console
      Output similar to the following is returned.
      2021-07-16 14:44:27,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:44:27,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 37 2E 37 30 35             14:44:27.705 }
      2021-07-16 14:44:29,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 39 2E 37 30 39             14:44:29.709 }
      2021-07-16 14:44:31,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 31 2E 37 30 39             14:44:31.709 }
      2021-07-16 14:44:33,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 33 2E 37 31 30             14:44:33.710 }
      2021-07-16 14:44:35,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 35 2E 37 31 30             14:44:35.710 }
      2021-07-16 14:44:37,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 37 2E 37 31 30             14:44:37.710 }
      2021-07-16 14:44:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 39 2E 37 31 30             14:44:39.710 }
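In the output above, the logger sink prints each event body as hexadecimal bytes followed by the readable text. To confirm that the bytes match the time string produced by MySource, you can decode them yourself. The following is a minimal sketch; LoggerBodyDecoder is a hypothetical helper, not part of Flume, and it assumes the body is UTF-8 text.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (an assumption, not a Flume API) for reading logger sink output. */
final class LoggerBodyDecoder {

    /** Converts space-separated hex bytes, such as "31 34 3A", into text. */
    static String decode(String hexDump) {
        String trimmed = hexDump.trim();
        if (trimmed.isEmpty()) {
            return "";
        }
        String[] parts = trimmed.split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Each token is one byte written as two hexadecimal digits
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first event in the sample output
        System.out.println(decode("31 34 3A 34 34 3A 32 37 2E 37 30 35")); // prints 14:44:27.705
    }
}
```

The decoded value matches the readable text at the end of the same log line, and consecutive events are about 2 seconds apart, as set by intervalMS = 2000.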