本文通過示例為您介紹E-MapReduce中的Flume組件,如何配置攔截器(Interceptor)、Channel選擇器(Channel Selector)和Sink組邏輯處理器(Sink Processor)。
攔截器
攔截器的位置在Source和Channel之間,用于修改或丟棄Event。攔截圖示意圖如下。
攔截器的主要類型如下表。
類型 | 描述 |
---|---|
時間戳攔截器 | 在Event Header中添加Unix時間戳屬性。 |
Host攔截器 | 在Event Header中添加Host屬性。 |
靜態攔截器 | 在Event Header中添加一個固定鍵值對屬性。 |
Header攔截器 | 在Event Header中刪除一個或多個屬性。 |
UUID攔截器 | 在Event中設置一個UUID。如果應用層沒有UUID,則可以使用該攔截器來默認添加。 |
Morphline攔截器 | 通過Morphline配置文件過濾Event或修改插入Event Header。 |
查找攔截器 | 使用Java正則表達式查找Event Body。 |
替換攔截器 | 使用Java正則表達式替換Event Body。 |
正則過濾攔截器 | 過濾配置匹配或者沒有匹配上正則表達式的Event Body。 |
相關示例如下:
- 示例1:Event Body包含
1:2:3.4foobar5
,如果想配置正則過濾器,則配置如下。a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
修改后的Event Body不變,后續的Header中增加了
one=>1, two=>2, three=>3
。 - 示例2:Event Body包含
2012-10-18 18:47:57,614 some log line
,如果想配置時間過濾器,則配置如下。a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d) a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
修改后的Event Body不變,后續的Header中增加了
timestamp=>1350611220000
。
Channel選擇器
Channel選擇器用于在Source與Channel一對多場景下選擇Channel。Channel選擇器示意圖如下。
Flume內置復制選擇器(Replicating)和多路復用選擇器(Multiplexing)兩種選擇器,默認為復制選擇器。復制選擇器會把所有Event發送到每個Channel,而多路復用選擇器,則會按照一定的規則發送。多路復用選擇器的示例如下。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
上述示例中,r1會選擇性地將Event發送給c1、c2、c3和c4四個Channel。如果Header中的state屬性值為CZ,則發送給c1,如果屬性值為US,則發送給c2和c3,其他情況默認發送給c4。
Sink組邏輯處理器
Sink組邏輯處理器示意圖請參見Channel選擇器的示意圖。
Sink組邏輯處理器用于多個Sink一同消費Channel隊列中的數據,并把這些Sink配置為負載均衡或故障轉移的工作方式。默認Sink與Channel是一對一的。配置為負載均衡方式,則根據配置的負載均衡機制,將Event分發到Sink中。配置為故障轉移方式,則表示多個Sink是一主多備的工作方式,當工作的Sink中止后,Event會被轉移到備用的Sink上。
相關示例如下:
- 示例1:故障轉移方式
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
上述示例中有k1和k2兩個Sink,權重分別是5和10,最大故障轉移時間是10000毫秒。
- 示例2:負載均衡方式
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
上述示例中有k1和k2兩個Sink,通過隨機選擇(random)方法進行負載分配,您也可以使用輪詢(round_robin)方法。示例中的
a1.sinkgroups.g1.processor.backoff
參數表示是否以指數的形式退避失敗的Sinks,設置為true,則Sink Processor會屏蔽故障的Sink。