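With a custom Source, you can extend Flume to ingest data from additional sources, such as encrypted data streams, self-hosted service ports, and proprietary data stores. This topic uses an example to show how to create a custom Source.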
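Prerequisites
- A cluster has been created with the Flume service selected. For more information, see Create a cluster.
- A file transfer tool (such as SSH Secure File Transfer Client) is installed on your local machine.
Procedure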
- Create a custom Source.
- Add the pom dependency.
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
```
Note: 1.9.0 is the Flume version. Replace it with the version of Flume that you use.
- Write the custom Source class.
The org.example.MySource class implements a Source that, at a fixed interval, emits an event whose body is the current time formatted with a configurable pattern. The version below fixes a syntax error in stop() (a stray `..`), calls super.start() and super.stop() so the lifecycle state of AbstractSource is maintained, and restores the interrupt flag when the sleep is interrupted.

```java
package org.example;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String myDateFormat;
    private int myIntervalMS;

    @Override
    public void configure(Context context) {
        // Read the custom properties from the agent configuration, falling back to defaults
        String myFormat = context.getString("dateFormat", "HH:mm:ss.SSS");
        int myInterval = context.getInteger("intervalMS", 1000);
        // Validate or convert the values here if needed, then store them for process()
        this.myDateFormat = myFormat;
        this.myIntervalMS = myInterval;
    }

    @Override
    public void start() {
        // Initialize the connection to the external client here
        super.start();
    }

    @Override
    public void stop() {
        // Disconnect from the external client and release resources here
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        try {
            // Build an event whose body is the current time in the configured format
            Event e = new SimpleEvent();
            SimpleDateFormat sdf = new SimpleDateFormat(myDateFormat);
            e.setBody(sdf.format(new Date()).getBytes(StandardCharsets.UTF_8));
            // Store the event into the channel(s) associated with this Source
            getChannelProcessor().processEvent(e);
            status = Status.READY;
        } catch (Exception e) {
            // Log the exception and ask the runner to back off
            status = Status.BACKOFF;
            e.printStackTrace();
        }
        try {
            // Wait for the configured interval before the next event
            Thread.sleep(myIntervalMS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the runner can shut down cleanly
            Thread.currentThread().interrupt();
        }
        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
```
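MySource returns 0 from getBackOffSleepIncrement() and getMaxBackOffSleepInterval(). The sketch below illustrates what that means, assuming the runner that polls a PollableSource sleeps for min(consecutive BACKOFF results × increment, maximum interval) after each BACKOFF. That formula is our reading of Flume's PollableSourceRunner, not a quote of its code, and BackoffSketch is a hypothetical helper. With both values at 0, the runner retries immediately, so the only pacing comes from the Thread.sleep(myIntervalMS) call inside process().

```java
public class BackoffSketch {
    // Hypothetical model of the runner's sleep after the k-th consecutive BACKOFF:
    // grows linearly with k and is capped at maxInterval
    static long backoffSleepMs(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // MySource's values: no extra sleep, no matter how many BACKOFFs occur
        System.out.println(backoffSleepMs(3, 0, 0));

        // Example values: 1000 ms increment capped at 5000 ms
        StringBuilder sb = new StringBuilder();
        for (long k = 1; k <= 7; k++) {
            sb.append(backoffSleepMs(k, 1000, 5000)).append(' ');
        }
        System.out.println(sb.toString().trim());
    }
}
```

If your Source talks to an external system that can be temporarily unavailable, returning non-zero values lets the runner slow down retries instead of spinning.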
- Package the custom code into a JAR. In the directory that contains pom.xml, run the following command to build the JAR:

```shell
mvn clean package -DskipTests
```
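Before uploading, you may want to confirm that the built JAR actually contains the Source class. A minimal sketch using java.util.jar.JarFile; JarCheck is a hypothetical helper, and the JAR path you pass in is an assumption, since the file name depends on the artifactId and version in your pom.xml.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarCheck {
    // Returns true if the JAR contains the compiled class for the given fully qualified name
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage (hypothetical path): java JarCheck target/your-artifact-1.0.jar
        String jarPath = args[0];
        boolean found = containsClass(jarPath, "org.example.MySource");
        System.out.println(found ? "org.example.MySource found" : "org.example.MySource NOT found");
    }
}
```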
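- Use the file transfer tool to upload the generated JAR to the /opt/apps/FLUME/flume-current/lib directory of Flume. Note: If you are not using an EMR cluster, upload the JAR to the corresponding directory of your own Flume installation.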
- Add the configuration.
- Log on to the cluster over SSH. For more information, see Log on to a cluster.
- Run the following command to go to the conf directory:

```shell
cd /opt/apps/FLUME/flume-current/conf
```
- Run the following command to create a configuration file:

```shell
vim mysource.conf
```
Note: The configuration file in this example is named mysource.conf. You can use any file name.
- Add the following content to mysource.conf:
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.example.MySource
a1.sources.r1.dateFormat = HH:mm:ss.SSS
a1.sources.r1.intervalMS = 2000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
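Note: In this configuration, dateFormat specifies the date format of the event body, and intervalMS specifies the interval between events, in milliseconds.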
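The a1.sources.r1.dateFormat and a1.sources.r1.intervalMS lines reach MySource.configure() as the keys dateFormat and intervalMS. The sketch below models that mapping with the JDK only, assuming that Flume reads the file as java.util.Properties, strips the component prefix (a1.sources.r1.), and that Context.getString and Context.getInteger return the configured value or the given default. SourceContextSketch and its methods are hypothetical stand-ins, not Flume APIs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SourceContextSketch {
    // Parse agent configuration text with java.util.Properties
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Keep only keys under the component prefix and strip it, e.g. "a1.sources.r1.intervalMS" -> "intervalMS"
    static Map<String, String> componentProperties(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    // Stand-in for Context.getString(key, defaultValue)
    static String getString(Map<String, String> ctx, String key, String defaultValue) {
        String value = ctx.get(key);
        return value == null ? defaultValue : value;
    }

    // Stand-in for Context.getInteger(key, defaultValue)
    static int getInteger(Map<String, String> ctx, String key, int defaultValue) {
        String value = ctx.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> ctx = componentProperties(parse(
                "a1.sources.r1.type = org.example.MySource\n"
                + "a1.sources.r1.dateFormat = HH:mm:ss.SSS\n"
                + "a1.sources.r1.intervalMS = 2000\n"
                + "a1.sinks.k1.type = logger\n"), "a1.sources.r1.");
        System.out.println(getString(ctx, "dateFormat", "HH:mm:ss.SSS"));
        System.out.println(getInteger(ctx, "intervalMS", 1000));
    }
}
```

Removing the intervalMS line from mysource.conf would make MySource fall back to its default of 1000 ms.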
- Start Flume.
- Run the following command to go to the flume-current directory:

```shell
cd /opt/apps/FLUME/flume-current
```
- Run the following command to start Flume:

```shell
bin/flume-ng agent --name a1 -c conf -f conf/mysource.conf -Dflume.root.logger=INFO,console
```
Output similar to the following is returned. With intervalMS = 2000, one event is logged about every two seconds:

```
2021-07-16 14:44:27,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-07-16 14:44:27,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2021-07-16 14:44:27,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2021-07-16 14:44:27,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 37 2E 37 30 35 14:44:27.705 }
2021-07-16 14:44:29,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 32 39 2E 37 30 39 14:44:29.709 }
2021-07-16 14:44:31,709 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 31 2E 37 30 39 14:44:31.709 }
2021-07-16 14:44:33,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 33 2E 37 31 30 14:44:33.710 }
2021-07-16 14:44:35,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 35 2E 37 31 30 14:44:35.710 }
2021-07-16 14:44:37,710 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 37 2E 37 31 30 14:44:37.710 }
2021-07-16 14:44:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 34 3A 34 34 3A 33 39 2E 37 31 30 14:44:39.710 }
```
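In each Event line, the hex pairs after body: are the raw bytes of the event body, followed by the same bytes shown as text. The sketch below reproduces the hex pairs for the first event, assuming the body is the UTF-8 encoding of the formatted time string (as in MySource). EventBodySketch is a hypothetical helper; it does not reproduce LoggerSink's own formatting code.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

public class EventBodySketch {
    // Render bytes as space-separated, uppercase two-digit hex values
    static String toHex(byte[] bytes) {
        StringJoiner joiner = new StringJoiner(" ");
        for (byte b : bytes) {
            joiner.add(String.format("%02X", b & 0xFF));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Body that MySource would produce at 14:44:27.705 with dateFormat = HH:mm:ss.SSS
        byte[] body = "14:44:27.705".getBytes(StandardCharsets.UTF_8);
        System.out.println(toHex(body));
        // Decoding the same bytes gives back the text shown at the end of the log line
        System.out.println(new String(body, StandardCharsets.UTF_8));
    }
}
```

The first printed line is 31 34 3A 34 34 3A 32 37 2E 37 30 35, matching the first Event line in the output above.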