本文為您介紹如何使用Flink會話窗口函數。

定義

會話窗口(SESSION)通過SESSION活動來對元素進行分組。會話窗口與滾動窗口和滑動窗口相比,沒有窗口重疊,沒有固定窗口大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該窗口就會關閉。

會話窗口通過一個間隔時間(Gap)來配置,這個間隔定義了非活躍周期的長度。例如,一個表示鼠標單擊活動的數據流可能具有長時間的空閑時間,并在兩段空閑之間散布著高濃度的單擊。如果數據在指定的間隔(Gap)之后到達,則會開始一個新的窗口。

函數語法

SESSION函數用于在GROUP BY子句中定義會話窗口。
SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit
說明 <time-attr> 參數必須是數據流中的一個合法的時間屬性字段,指定為Processing Time或Event Time,詳情請參見概述,了解如何定義時間屬性

標識函數

使用標識函數選出窗口的起始時間或者結束時間,窗口的時間屬性用于下級Window的聚合。

窗口標識函數 返回類型 描述
SESSION_START(<time-attr>, <gap-interval>) Timestamp 返回窗口的起始時間(包含邊界)。例如[00:10,00:15]的窗口,返回00:10,即為此會話窗口內第一條記錄的時間。
SESSION_END(<time-attr>, <gap-interval>) Timestamp 返回窗口的結束時間(包含邊界)。例如[00:00,00:15]的窗口,返回 00:15,即為此會話窗口內最后一條記錄的時間+<gap-interval>
SESSION_ROWTIME(<time-attr>, <gap-interval>) Timestamp(rowtime-attr) 返回窗口的結束時間(不包含邊界)。例如(00:00,00:15)的窗口,返回00:14:59.999 。返回值是一個rowtime attribute,也就是可以基于該字段進行時間類型的操作,例如級聯窗口。該參數只能用于基于Event Time的Window。
SESSION_PROCTIME(<time-attr>, <gap-interval>) Timestamp(rowtime-attr) 返回窗口的結束時間(不包含邊界)。例如(00:00,00:15)的窗口,返回 00:14:59.999 。返回值是一個Proctime Attribute,也就是可以基于該字段進行時間類型的操作,例如級聯窗口。該參數只能用于基于Processing Time的Window。

示例

統計每個用戶在每個活躍會話期間的單擊次數,會話超時時長為30秒。
  • 測試數據
    username (VARCHAR) click_url (VARCHAR) ts (TIMESTAMP)
    Jark http://taobao.com/xxx 2017-10-10 10:00:00.0
    Jark http://taobao.com/xxx 2017-10-10 10:00:10.0
    Jark http://taobao.com/xxx 2017-10-10 10:00:49.0
    Jark http://taobao.com/xxx 2017-10-10 10:01:05.0
    Jark http://taobao.com/xxx 2017-10-10 10:01:58.0
    Timo http://taobao.com/xxx 2017-10-10 10:02:10.0
  • 測試語句
    CREATE TEMPORARY TABLE user_clicks(
      username varchar,
      click_url varchar,
      eventtime varchar,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --為Rowtime定義Watermark。
    ) WITH (
      'connector'='sls',
      ...
    );
    
    CREATE TEMPORARY TABLE session_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='datahub'           --目前SLS只支持輸出VARCHAR類型的DDL,所以使用DataHub存儲。
      ...
    );
    
    INSERT INTO session_output
    SELECT
    SESSION_START(ts, INTERVAL '30' SECOND),
    SESSION_END(ts, INTERVAL '30' SECOND),
    username,
    COUNT(click_url)
    FROM user_clicks
    GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
  • 測試結果
    window_start (TIMESTAMP) window_end (TIMESTAMP) username (VARCHAR) clicks (BIGINT)
    2017-10-10 10:00:00.0 2017-10-10 10:00:40.0 Jark 2
    2017-10-10 10:00:49.0 2017-10-10 10:01:35.0 Jark 2
    2017-10-10 10:01:58.0 2017-10-10 10:02:28.0 Jark 1
    2017-10-10 10:02:10.0 2017-10-10 10:02:40.0 Timo 1