本文通過示例為您介紹E-MapReduce中的Flume組件,如何配置攔截器(Interceptor)、Channel選擇器(Channel Selector)和Sink組邏輯處理器(Sink Processor)。

攔截器

攔截器的位置在Source和Channel之間,用于修改或丟棄Event。攔截圖示意圖如下。interceptor
攔截器的主要類型如下表。
類型 描述
時間戳攔截器 在Event Header中添加Unix時間戳屬性。
Host攔截器 在Event Header中添加Host屬性。
靜態攔截器 在Event Header中添加一個固定鍵值對屬性。
Header攔截器 在Event Header中刪除一個或多個屬性。
UUID攔截器 在Event中設置一個UUID。如果應用層沒有UUID,則可以使用該攔截器來默認添加。
Morphline攔截器 通過Morphline配置文件過濾Event或修改插入Event Header。
查找攔截器 使用Java正則表達式查找Event Body。
替換攔截器 使用Java正則表達式替換Event Body。
正則過濾攔截器 過濾配置匹配或者沒有匹配上正則表達式的Event Body。
相關示例如下:
  • 示例1:Event Body包含1:2:3.4foobar5,如果想配置正則過濾器,則配置如下。
    a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
    a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
    a1.sources.r1.interceptors.i1.serializers.s1.name = one
    a1.sources.r1.interceptors.i1.serializers.s2.name = two
    a1.sources.r1.interceptors.i1.serializers.s3.name = three

    修改后的Event Body不變,后續的Header中增加了one=>1, two=>2, three=>3

  • 示例2:Event Body包含2012-10-18 18:47:57,614 some log line,如果想配置時間過濾器,則配置如下。
    a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
    a1.sources.r1.interceptors.i1.serializers = s1
    a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
    a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

    修改后的Event Body不變,后續的Header中增加了timestamp=>1350611220000

Channel選擇器

Channel選擇器用于在Source與Channel一對多場景下選擇Channel。Channel選擇器示意圖如下。Selector
Flume內置復制選擇器(Replicating)和多路復用選擇器(Multiplexing)兩種選擇器,默認為復制選擇器。復制選擇器會把所有Event發送到每個Channel,而多路復用選擇器,則會按照一定的規則發送。多路復用選擇器的示例如下。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

上述示例中,r1會選擇性地將Event發送給c1、c2、c3和c4四個Channel。如果Header中的state屬性值為CZ,則發送給c1,如果屬性值為US,則發送給c2和c3,其他情況默認發送給c4。

Sink組邏輯處理器

Sink組邏輯處理器示意圖請參見Channel選擇器的示意圖。

Sink組邏輯處理器用于多個Sink一同消費Channel隊列中的數據,并把這些Sink配置為負載均衡或故障轉移的工作方式。默認Sink與Channel是一對一的。配置為負載均衡方式,則根據配置的負載均衡機制,將Event分發到Sink中。配置為故障轉移方式,則表示多個Sink是一主多備的工作方式,當工作的Sink中止后,Event會被轉移到備用的Sink上。Selector
相關示例如下:
  • 示例1:故障轉移方式
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000

    上述示例中有k1和k2兩個Sink,權重分別是5和10,最大故障轉移時間是10000毫秒。

  • 示例2:負載均衡方式
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random

    上述示例中有k1和k2兩個Sink,通過隨機選擇(random)方法進行負載分配,您也可以使用輪詢(round_robin)方法。示例中的a1.sinkgroups.g1.processor.backoff參數表示是否以指數的形式退避失敗的Sinks,設置為true,則Sink Processor會屏蔽故障的Sink。