本文為您介紹CEP中規則的JSON格式相關信息。

目標人群

  • 客戶風控平臺開發人員:對Flink CEP較熟悉的平臺研發人員應能快速學習本格式,并根據自身平臺需求判斷是否需要進一步封裝。
  • 客戶風控策略人員:只熟悉具體策略但缺乏Java經驗的同學,在熟悉CEP概念的基礎上,也可快速上手本格式的使用來編寫新規則,使其在上線的風控作業中應用。

JSON格式定義

對于一個事件序列(Event Sequence)中的模式(Pattern),我們可以將其看作一個圖(Graph),圖中節點(Node)為針對某些事件(Event)的模式,節點之間的邊(Edge)為事件選擇策略(Event Selection Strategy),即如何從一類模式的匹配轉移到另一類模式的匹配。每個圖也可以看作一個更大的圖的子節點,從而允許模式的嵌套。基于以上考慮,阿里云實時計算Flink定義了一套基于JSON的規范來描述CEP中的規則,進而方便規則的存儲與修改,該規范中各個字段的含義如下。
  • 節點(Node)定義
    一個節點(Node)即一個完整的模式(Pattern),它包含如下屬性。
    字段名描述類型是否必填備注
    namePattern名稱。string一個唯一的字符串。
    說明 不同節點的名稱不能重復。
    type該Node類型。enum(string)
    • 對于包含子Pattern的節點,該字段值為COMPOSITE。
    • 對于無子Pattern的節點,該字段值為ATOMIC。
    quantifier量詞,用于描述如何匹配該Pattern,例如只匹配一次。dict請參見本文量詞(Quantifier)定義。
    condition條件。dict請參見本文條件(Condition)定義。
  • 量詞(Quantifier)定義
    量詞的作用是描述對于滿足該Pattern的事件要如何匹配。例如模式 "A*" 對應的量詞properties為LOOPING,該Pattern內部的事件選擇策略為SKIP_TILL_ANY。
    字段名描述類型是否必填備注
    consumingStrategy事件選擇策略。enum(string)僅支持以下取值:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY

    取值及含義請參見本文連續性定義。

    times用于描述該Pattern需要匹配多少次。dict
    取值示例如下。
    "times": {
              "from": 3,
              "to": 3,
              "windowTime": {
              "unit": "MINUTES",
              "size": 12
              }
            },
    其中from和to的數據類型均為integer,windowTime的單位可以為DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。
    說明 windowTime可以設為null,即"windowTime": null
    properties描述該量詞所具有的屬性。array of enumString取值及含義請參見本文量詞屬性含義。
    untilCondition 停止條件。
    說明 僅可在LOOPING量詞修飾的Pattern后使用。
    dict 取值及含義請參見本文條件(Condition)定義。
  • 條件(Condition)定義
    條件用于篩選符合某些要求的事件。例如要篩選瀏覽時長超過5分鐘的客戶,瀏覽時長超過5分鐘即為一個條件。
    字段名描述類型是否必填備注
    type條件類型。enum(string)條件類型取值如下:
    • CLASS:對應用戶自定義的條件。
    • AVIATOR:對應基于AVIATOR表達式的條件。
    • GROOVY:對應基于GROOVY表達式的條件。
    ...其他可序列化的自定義字段。...
    目前我們支持以下幾種Condition:
    • Class類型Condition
      字段名描述類型是否必填備注
      type條件類型。enum(string)固定值為Class。
      className類名。string該class完整類名,例如com.alibaba.ververica.cep.demo.StartCondition
    • 包含自定義參數的Condition

      用戶在使用普通的Class類型Condition時,只能傳入類名(className),而無法動態地傳入參數。在動態CEP支持中,為了提供更豐富的Condition表達能力,我們設計并實現了包含自定義參數的Condition(即CustomArgsCondition),從而允許用戶在JSON中通過字符串數組來設置CustomArgsCondition所需參數, 進而動態構造CustomArgsCondition實例。這一特性允許用戶動態更新Condition的參數,而無需修改Java代碼。

      字段名描述類型是否必填備注
      type條件類型。enum(string)固定值為Class。
      className類名。string該class完整類名,例如com.alibaba.ververica.cep.demo.CustomMiddleCondition
      args自定義參數。array of string一個字符串數組。
    • 基于Aviator表達式的Condition
      Aviator是一個表達式求值引擎,可以動態地將表達式編譯成字節碼(詳情請參見aviatorscript)。因此我們可以在作業中使用基于Aviator表達式的Condition,使得條件的閾值也可以動態修改,而無需修改Java代碼重新編譯運行。
      字段名描述類型是否必填備注
      type類名。string固定值為AVIATOR。
      expression表達式字符串。string形如price > 10這樣的表達式字符串(price變量名來自于Java代碼中定義的字段)。

      您可以將該字符串在數據庫中的值進行修改。例如修改為price > 20,Flink CEP作業會動態加載price > 20構造新的AviatorCondition來處理之后的事件。

    • 基于Groovy表達式的Condition
      Groovy是一個基于JVM平臺的動態語言(Groovy語法可以參見syntax)。動態CEP支持使用Groovy表達式來定義條件(Condition),從而允許動態修改條件的閾值。
      字段名描述類型是否必填備注
      type類名。string固定值為GROOVY。
      expression表達式字符串。string形如price > 5.0 && name.contains("mid")這樣的表達式字符串(pricename等變量名來自于Java代碼中定義的字段)。您可以將該字符串在數據庫中的值進行修改。例如修改為price > 20 && name.contains("end"),Flink CEP作業會動態加載新的Groovy字符串并構造新的GroovyCondition來處理之后的事件。
  • 邊(Edge)定義
    字段名描述類型是否必填備注
    source源模式名稱。string無。
    target目標模式名稱。string無。
    type事件選擇策略。dict支持以下取值:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY
    • NOT_FOLLOW
    • NOT_NEXT

    取值及含義請參見本文連續性定義。

  • 圖(GraphNode extends Node)定義

    一個圖(GraphNode)代表一個完整的Pattern序列,它的節點(nodes)是各個獨立的Pattern,邊(edges)代表如何從一類Pattern的匹配轉移到另一類Pattern的匹配。

    為了支持Pattern的嵌套(即GroupPattern),我們將一個GraphNode看作是Node的子類,即一個GraphNode可以作為一個更大的GraphNode中的Node。GraphNode相比于基礎Node,額外多了以下2類字段:
    • 描述圖的結構的nodes字段與edges字段。
    • 描述圖內時間窗口策略的window字段與事件匹配后的跳出策略afterMatchSkipStrategy字段。
    GraphNode的字段詳情請參見下表。
    字段名描述類型是否必填備注
    name該復合Pattern名稱。String一個唯一的字符串。
    說明 不同Graph名稱不能重復。
    type該Node類型。enum(string)固定值為COMPOSITE。
    version該Graph使用的JSON格式的版本號。Int默認值為1。
    nodes該Pattern內嵌套的子Pattern。array of Node不可以為空的array。
    edges嵌套的子Pattern的連接關系。array of Edge可以為空的array。
    window
    • 當類型為FIRST_AND_LAST:代表該復合Pattern一次完整匹配之間的最大時間間隔。
    • 當類型為PREVIOUS_AND_CURRENT:代表該相鄰2個子Pattern匹配之間的最大時間間隔。
    dict取值示例如下。
    "window": {
       "type": "FIRST_AND_LAST",
       "time": {
       "unit": "DAYS",
       "size": 1
       }
    }

    單位可以為DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。數據類型為Long或Integer。

    afterMatchSkipStrategy該圖內所有事件匹配后的跳過策略。dict請參見本文事件匹配后的跳過策略(AfterMatchSkipStrategy)定義。
    quantifier量詞,用于描述如何匹配該Pattern,例如只匹配一次。dict請參見本文量詞(Quantifier)定義。
  • 事件匹配后的跳過策略(AfterMatchSkipStrategy)定義
    字段名描述類型是否必填備注
    type策略類型。enum(string)
    參數取值如下:
    • NO_SKIP(默認值):每個成功的匹配都會被輸出。
    • SKIP_TO_NEXT:丟棄以相同事件開始的所有部分匹配。
    • SKIP_PAST_LAST_EVENT:丟棄起始在這個匹配的開始和結束之間的所有部分匹配。
    • SKIP_TO_FIRST:丟棄起始在這個匹配的開始和第一個出現的名稱為PatternName事件之間的所有部分匹配。
    • SKIP_TO_LAST:丟棄起始在這個匹配的開始和最后一個出現的名稱為PatternName事件之間的所有部分匹配。

    詳情請參見匹配后跳過策略

    patternName策略針對的模式的名稱。string一個唯一的字符串。
  • 連續性定義
    物理值含義
    STRICT嚴格連續。 所有匹配的事件中間沒有任何不匹配的事件。
    SKIP_TILL_NEXT松散連續。允許匹配的事件之間出現不匹配的事件,不匹配的事件會被忽略。
    SKIP_TILL_ANY不確定松散連續。更進一步的松散連續,允許忽略掉一些匹配事件的附加匹配。
    NOT_NEXT緊接著的后續事件不能是某指定事件。
    NOT_FOLLOW某指定事件后續不出現。

    相關示例請參見事件處理(CEP)文檔。

  • 量詞屬性含義
    取值含義
    SINGLE代表該模式只出現一次。
    LOOPING代表該模式為循環模式,可能出現多次,類比正則表達式中的*與+。
    TIMES代表該模式會出現指定次數。
    GREEDY代表在匹配該模式時,會采用貪婪匹配策略,盡可能多地匹配。
    OPTIONAL代表該模式為可選模式。

示例一:普通Pattern示例

例如在電商大促的實時營銷場景中,要找到在大促前10分鐘時間窗口內滿足指定條件的客戶,來使用Flink 動態CEP規則針對性地調整營銷策略。這些客戶需要滿足的條件如下:
  • 領取了某會場的優惠券。
  • 在購物車中添加了超過3次的商品。
  • 但最后沒有結賬付款。
為此,我們將領取某會場的優惠券定義為StartCondition,添加商品到購物車定義為MiddleCondition,結賬定義為EndCondition。抽象出的模式為在大促前10分鐘的時間窗口內,滿足StartCondition的事件可以發生也可以不發生,滿足MiddleCondition的事件發生了大于等于3次,但最后沒有1個滿足EndCondition的事件。它對應的Pattern用Java代碼描述如下。
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));
其按本文檔描述的JSON格式表達如下。
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

示例二:在Pattern中使用包含自定義參數的Condition

例如在實時營銷場景中,假設我們給用戶打上了一個人群標簽,之后會根據用戶所屬的標簽采取不同的營銷策略,例如對于A類用戶我們發送營銷短信,對于B類用戶我們發送優惠券等,而對于其他用戶,我們不采取營銷措施。針對上述需求,我們可以定義一個普通的Class類型Condition來解決,但當我們想調整策略,針對C類用戶也發送優惠券時,如果使用的是普通的Class類型Condition,那么我們必須改寫代碼,重新編譯并運行作業。這種情況下,我們可以使用包含自定義參數的Condition,在代碼中定義好如何根據傳入的參數進行策略的調整之后,我們只需要在數據庫中修改傳入的參數(即包含自定義參數的Condition的args字段的值),例如由["A", "B"] 改為["A", "B", "C"],即可實現營銷策略的動態更新。

即假設初始Pattern中定義的Condition如下:
"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
我們可將其修改為:
"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
關于該類Condition在具體業務場景的使用示例,詳情請參見Demo
說明 本文中aviatorscriptDemo屬于第三方搭建的網站,訪問時可能會存在無法打開或訪問延遲的問題。

相關文檔

Flink動態CEP快速入門