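A custom Sink lets you extend Flume to support additional data storage components, or trim and optimize the functionality of an existing Sink to fit your needs. This topic uses an example to show how to create a custom Sink.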

Prerequisites

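  • A cluster has been created with the Flume service selected. For more information, see Create a cluster.
  • A file transfer tool (SSH Secure File Transfer Client) is installed on your local machine.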

Procedure

  1. Create a custom Sink.
    1. Add the pom dependency.
      <dependencies>
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
          </dependency>
      </dependencies>
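      Note: 1.9.0 is the Flume version. Replace it with the Flume version of the cluster you created.
    2. Write the custom Sink class.
      org.example.MySink is modeled on LoggerSink. It logs each event but uses a larger default buffer: by default it dumps up to 32 bytes of each event body (DEFAULT_MAX_BYTE_DUMP).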
      package org.example;
      
      import org.apache.flume.Channel;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.Transaction;
      import org.apache.flume.conf.Configurable;
      import org.apache.flume.event.EventHelper;
      import org.apache.flume.sink.AbstractSink;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class MySink extends AbstractSink implements Configurable {
          private static final Logger logger = LoggerFactory.getLogger(MySink.class);
      
          // Default Max bytes to dump
          public static final int DEFAULT_MAX_BYTE_DUMP = 32;
      
          // Max number of bytes to be dumped
          private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
      
          public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";
      
          private String myProp;
      
          @Override
          public void configure(Context context) {
              this.maxBytesToLog = context.getInteger(MAX_BYTES_DUMP_KEY, DEFAULT_MAX_BYTE_DUMP);
          }
      
          @Override
          public void start() {
              // Initialize the connection to the external repository (e.g. HDFS) that
              // this Sink will forward Events to ..
              // Then let AbstractSink update the lifecycle state.
              super.start();
          }
      
          @Override
          public void stop() {
              // Disconnect from the external repository and do any
              // additional cleanup (e.g. releasing resources or nulling-out
              // field values) ..
              super.stop();
          }
      
          @Override
          public Status process() throws EventDeliveryException {
              Status status = Status.READY;
      
              // Start transaction
              Channel ch = getChannel();
              Transaction txn = ch.getTransaction();
              Event event = null;
              try {
                  txn.begin();
                  // This try clause includes whatever Channel operations you want to do
                  event = ch.take();
                  // A real Sink would send the Event to the external repository here,
                  // e.g. with a hypothetical storeSomeData(event); this sample only logs it.
      
                  if (event != null) {
                      if (logger.isInfoEnabled()) {
                          logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
                      }
                  } else {
                      // No event found, request back-off semantics from the sink runner
                      status = Status.BACKOFF;
                  }
                  txn.commit();
              } catch (Exception e) {
                  txn.rollback();
                  throw new EventDeliveryException("Failed to log event: " + event, e);
              } finally {
                  txn.close();
              }
              return status;
          }
      }
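      Every custom Sink must get the transaction handling in process() right. It takes an event inside a transaction and commits only after the event has been handled. On failure it rolls back, so the Channel keeps the event for redelivery. When the Channel is empty it returns BACKOFF, so the SinkRunner waits before polling again. The following is a minimal plain-Java sketch of that flow that runs without Flume on the classpath. SinkContractDemo, MiniChannel, and Store are hypothetical names, and IllegalStateException stands in for Flume's EventDeliveryException.

      ```java
      import java.util.ArrayDeque;
      import java.util.Deque;

      // Plain-Java model of the take/commit/rollback flow in MySink.process().
      // MiniChannel and Store are hypothetical stand-ins, not Flume classes.
      class SinkContractDemo {
          enum Status { READY, BACKOFF }

          // Hypothetical channel: take() moves an event into the open transaction,
          // rollback() puts it back at the head so it can be redelivered.
          static class MiniChannel {
              final Deque<String> queue = new ArrayDeque<>();
              private String inFlight;

              String take() {
                  inFlight = queue.pollFirst(); // null when the channel is empty
                  return inFlight;
              }

              void commit() {
                  inFlight = null; // the event is now owned by the sink's target
              }

              void rollback() {
                  if (inFlight != null) {
                      queue.addFirst(inFlight);
                      inFlight = null;
                  }
              }
          }

          // Hypothetical external repository (HDFS, a database, ...).
          interface Store {
              void write(String event) throws Exception;
          }

          // Mirrors MySink.process(): BACKOFF when nothing was taken,
          // rollback plus an exception when delivery fails.
          static Status process(MiniChannel ch, Store store) {
              try {
                  String event = ch.take();
                  Status status = Status.READY;
                  if (event != null) {
                      store.write(event);
                  } else {
                      status = Status.BACKOFF;
                  }
                  ch.commit();
                  return status;
              } catch (Exception e) {
                  ch.rollback();
                  throw new IllegalStateException("Failed to deliver event", e);
              }
          }
      }
      ```

      In a real Sink, Store.write corresponds to the call into your external system. That is why it sits between take() and commit(): an event is removed from the Channel only once the write has succeeded.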
  2. Package the custom code into a JAR.
    In the directory that contains pom.xml, run the following command to build the JAR.
    mvn clean package -DskipTests
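    Before uploading, you may want to confirm that the JAR actually contains the Sink class that the configuration will reference. By default, Maven writes the JAR to the target directory. This plain package contains only your own classes. flume-ng-core does not need to be bundled, because Flume's lib directory already provides it. The sketch below uses the standard java.util.jar API. JarInspector and containsClass are hypothetical helper names.

    ```java
    import java.io.IOException;
    import java.nio.file.Path;
    import java.util.jar.JarFile;

    // Sketch: check that a built JAR contains a given class before uploading it.
    // JarInspector and containsClass are hypothetical helper names.
    class JarInspector {
        // Converts "org.example.MySink" to the entry name "org/example/MySink.class"
        // and checks whether that entry exists in the JAR.
        static boolean containsClass(Path jar, String className) throws IOException {
            String entryName = className.replace('.', '/') + ".class";
            try (JarFile jarFile = new JarFile(jar.toFile())) {
                return jarFile.getEntry(entryName) != null;
            }
        }
    }
    ```

    For example, containsClass(Paths.get("target/<your-artifact>.jar"), "org.example.MySink") should return true. The actual JAR file name depends on the artifactId and version in your pom.xml.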
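  3. Use the file transfer tool to upload the generated JAR to Flume's /opt/apps/FLUME/flume-current/lib directory.
    Note: If you are not using an EMR cluster, upload the JAR to the lib directory of your actual Flume installation.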
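  4. Add the configuration.
    1. Log on to the cluster over SSH. For more information, see Log on to a cluster.
    2. Run the following command to go to the conf directory.
      cd /opt/apps/FLUME/flume-current/conf
    3. Run the following command to create a configuration file.
      vim custom_sink.conf
      Note: The configuration file in this example is named custom_sink.conf. You can choose any file name.
    4. Add the following content to custom_sink.conf.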
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      a1.sources.r1.type = org.apache.flume.source.StressSource
      a1.sources.r1.maxEventsPerSecond = 1
      a1.sources.r1.batchSize = 1
      a1.sources.r1.maxTotalEvents = 100
      
      a1.sinks.k1.type = org.example.MySink
      a1.sinks.k1.maxBytesToLog = 64
      
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
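      Note: maxBytesToLog sets the buffer size, that is, the maximum number of bytes of each event body that MySink logs. Here it overrides the code default of 32 with 64.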
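      Flume passes each component the properties under its own prefix. So the a1.sinks.k1.maxBytesToLog line reaches MySink.configure() as the key maxBytesToLog, and context.getInteger falls back to DEFAULT_MAX_BYTE_DUMP when the line is omitted. The sketch below approximates that lookup with java.util.Properties. It is an illustration, not Flume's actual configuration code, and SinkContextDemo and its methods are hypothetical.

      ```java
      import java.io.IOException;
      import java.io.StringReader;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.Properties;

      // Approximation (not Flume's code) of how agent properties become the
      // per-sink settings that MySink.configure() reads via Context.getInteger().
      class SinkContextDemo {
          static final int DEFAULT_MAX_BYTE_DUMP = 32; // same default as MySink

          static Properties load(String text) throws IOException {
              Properties props = new Properties();
              props.load(new StringReader(text));
              return props;
          }

          // Collects the keys under "<agent>.sinks.<sink>." with that prefix removed.
          static Map<String, String> sinkParams(Properties props, String agent, String sink) {
              String prefix = agent + ".sinks." + sink + ".";
              Map<String, String> params = new HashMap<>();
              for (String key : props.stringPropertyNames()) {
                  if (key.startsWith(prefix)) {
                      params.put(key.substring(prefix.length()), props.getProperty(key));
                  }
              }
              return params;
          }

          // Like context.getInteger(key, default): use the default when the key is absent.
          static int getInteger(Map<String, String> params, String key, int defaultValue) {
              String value = params.get(key);
              return value == null ? defaultValue : Integer.parseInt(value.trim());
          }
      }
      ```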
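  5. Start Flume.
    1. Run the following command to go to the flume-current directory.
      cd /opt/apps/FLUME/flume-current
    2. Run the following command to start Flume.
      bin/flume-ng agent --name a1 -c conf -f conf/custom_sink.conf -Dflume.root.logger=INFO,console
      Output similar to the following is returned.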
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
      2021-07-16 14:49:29,024 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
      2021-07-16 14:49:29,118 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
      2021-07-16 14:49:29,525 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
      2021-07-16 14:49:29,526 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.StressSource.doStart(StressSource.java:169)] Stress source doStart finished
      2021-07-16 14:49:29,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:30,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:31,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:32,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
      2021-07-16 14:49:33,006 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.example.MySink.process(MySink.java:66)] Event: { headers:{} body: 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000010 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000020 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................
      00000030 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F 7F ................ }
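      Each logged body above shows 64 bytes, printed as four rows of 16 bytes, because maxBytesToLog = 64 in custom_sink.conf. With MySink's default of 32 you would see two rows. The following simplified formatter reproduces that truncation and row layout, so you can predict the output for other settings. It approximates EventHelper.dumpEvent rather than reproducing it; for example, Flume prints the first row on the same line as "body:" without an offset. HexDumpDemo is a hypothetical name.

      ```java
      import java.util.ArrayList;
      import java.util.List;

      // Simplified hex dump approximating the layout in the log above:
      // at most maxBytes bytes, 16 bytes per row, 8-digit hex offset, ASCII column.
      class HexDumpDemo {
          static List<String> dump(byte[] body, int maxBytes) {
              int len = Math.min(body.length, maxBytes); // truncate like maxBytesToLog
              List<String> rows = new ArrayList<>();
              for (int start = 0; start < len; start += 16) {
                  int end = Math.min(start + 16, len);
                  StringBuilder hex = new StringBuilder();
                  StringBuilder ascii = new StringBuilder();
                  for (int i = start; i < end; i++) {
                      int b = body[i] & 0xFF;
                      hex.append(String.format("%02X ", b));
                      // Printable ASCII (0x20-0x7E) is shown as-is, anything else as '.'
                      ascii.append(b >= 0x20 && b < 0x7F ? (char) b : '.');
                  }
                  rows.add(String.format("%08X %s%s", start, hex, ascii));
              }
              return rows;
          }
      }
      ```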