日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用UDAF實現數據排序和聚合

本文提供了一個自定義聚合函數(UDAF),實現將多行數據合并為一行并按照指定列進行排序,并以居民用電戶電網終端數據為例,介紹如何在實時計算控制臺使用該函數進行數據聚合和排序。

示例數據

居民用電戶電網終端數據表electric_info,包括事件標識event_id,用戶標識user_id,事件時間event_time,用戶終端狀態status。需要將用戶的終端狀態按照事件時間升序排列。

  • electric_info

    event_id

    user_id

    event_time

    status

    1

    1222

    2023-06-30 11:14:00

    LD

    2

    1333

    2023-06-30 11:12:00

    LD

    3

    1222

    2023-06-30 11:11:00

    TD

    4

    1333

    2023-06-30 11:12:00

    LD

    5

    1222

    2023-06-30 11:15:00

    TD

    6

    1333

    2023-06-30 11:18:00

    LD

    7

    1222

    2023-06-30 11:19:00

    TD

    8

    1333

    2023-06-30 11:10:00

    TD

    9

    1555

    2023-06-30 11:16:00

    TD

    10

    1555

    2023-06-30 11:17:00

    LD

  • 預期結果

    user_id

    status

    1222

    TD,LD,TD,TD

    1333

    TD,LD,LD,LD

    1555

    TD,LD

步驟一:準備數據源

本文以云數據庫RDS為例。

  1. 快速創建RDS MySQL實例。

    說明

    RDS MySQL版實例需要與Flink工作空間處于同一VPC。不在同一VPC下時請參見網絡連通性。

  2. 創建數據庫和賬號。

    創建名稱為electric的數據庫,并創建高權限賬號或具有數據庫electric讀寫權限的普通賬號。

  3. 通過DMS登錄RDS MySQL,在electric數據庫中創建表electric_info和electric_info_SortListAgg,并插入數據。

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT '事件id',
      user_id bigint NOT NULL COMMENT '用戶標識', 
      event_time timestamp NOT NULL COMMENT '事件時間',
      status varchar(10) NOT NULL COMMENT '用戶終端狀態'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT '用戶標識', 
      status_sort varchar(50) NULL COMMENT '用戶終端狀態按事件時間升序'
    );
    
    -- 準備數據
    INSERT INTO electric_info VALUES 
    (1,1222,'2023-06-30 11:14','LD'),
    (2,1333,'2023-06-30 11:12','LD'),
    (3,1222,'2023-06-30 11:11','TD'),
    (4,1333,'2023-06-30 11:12','LD'),
    (5,1222,'2023-06-30 11:15','TD'),
    (6,1333,'2023-06-30 11:18','LD'),
    (7,1222,'2023-06-30 11:19','TD'),
    (8,1333,'2023-06-30 11:10','TD'),
    (9,1555,'2023-06-30 11:16','TD'),
    (10,1555,'2023-06-30 11:17','LD');

步驟二:注冊UDF

  1. 下載ASI_UDX-1.0-SNAPSHOT.jar

    pom.xml文件已配置了Flink 1.17.1版該自定義函數需要的最小化依賴信息。關于使用自定義函數的更多信息,詳情請參見自定義函數。

  2. 本示例中ASI_UDAF實現了多行數據合并一行并按照指定列進行排序,詳情如下。后續您可以根據實際業務情況進行修改。

    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF{
    	/**Accumulator class*/
    	public static class AcList {
    		public  List<String> list;
    	}
    
    	/**Aggregate function class*/
    	public static class SortListAgg extends AggregateFunction<String,AcList> {
    		public String getValue(AcList asc) {
    			/**Sort the data in the list according to a specific rule*/
    			asc.list.sort(new Comparator<String>() {
    				@Override
    				public int compare(String o1, String o2) {
    					return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]);
    				}
    			});
    			/**Traverse the sorted list, extract the required fields, and join them into a string*/
    			List<String> ret = new ArrayList<String>();
    			Iterator<String> strlist = asc.list.iterator();
    			while (strlist.hasNext()) {
    				ret.add(strlist.next().split("#")[0]);
    			}
    			String str = StringUtils.join(ret, ',');
    			return str;
    		}
    
    		/**Method to create an accumulator*/
    		public AcList createAccumulator() {
    			AcList ac = new AcList();
    			List<String> list = new ArrayList<String>();
    			ac.list = list;
    			return ac;
    		}
    
    		/**Accumulation method: add the input data to the accumulator*/
    		public void accumulate(AcList acc, String tuple1) {
    			acc.list.add(tuple1);
    		}
    
    		/**Retraction method*/
    		public void retract(AcList acc, String num) {
    		}
    	}
    }
  3. 進入注冊UDF頁面。

    注冊UDF方式的優點是便于后續開發進行代碼復用。對于Java類型的UDF,您也可以通過依賴文件項進行上傳,詳情請參見自定義聚合函數(UDAF)。

    1. 登錄實時計算控制臺

    2. 單擊目標工作空間操作列下的控制臺。

    3. 單擊數據開發 > ETL。

    4. 單擊左側的函數頁簽,單擊注冊UDF

      image.png

  4. 選擇文件位置上傳步驟1中的JAR文件,單擊確定。

    注冊UDF

    說明
    • 當您開通Flink工作空間時綁定了OSS Bucket,則您的UDF JAR文件會被上傳到該OSS Bucket的sql-artifacts目錄下;當您開通Flink工作空間時選擇了全托管存儲時,則您的UDF JAR文件會被存儲在資源管理頁面的資源文件中。

    • 此外,Flink開發控制臺會解析您UDF JAR文件中是否使用了Flink UDF、UDAF和UDTF接口的類,并自動提取類名,填充到Function Name字段中。

  5. 管理函數對話框,單擊創建函數。

    在SQL編輯器頁面左側函數列表,您可以看到已注冊成功的UDF。

步驟三:創建Flink作業

  1. 數據開發 > ETL頁面,單擊新建。

    image.png

  2. 單擊空白的流作業草稿。

  3. 單擊下一步

  4. 新建作業草稿對話框,填寫作業配置信息。

    作業參數

    說明

    文件名稱

    作業的名稱。

    說明

    作業名稱在當前項目中必須保持唯一。

    存儲位置

    指定該作業的存儲位置。

    您還可以在現有文件夾右側,單擊新建文件夾圖標,新建子文件夾。

    引擎版本

    當前作業使用的Flink的引擎版本。需要與pom中的version一致。

    引擎版本號含義、版本對應關系和生命周期重要時間點詳情請參見引擎版本介紹

  5. 編寫DDL和DML代碼。

    --創建臨時表electric_info
    CREATE TEMPORARY TABLE electric_info (
      event_id bigint not null,
      `user_id` bigint not null, 
      event_time timestamp(6) not null,
      status string not null,
      primary key(event_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info'
    );
    
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id` bigint not null, 
      status_sort varchar(50) not null,
      primary key(user_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info_sortlistagg'
    );
    
    --將electric_info表中的數據聚合并插入到electric_info_sortlistagg表中
    --將status和event_time拼接成的字符串作為參數傳遞給已注冊的自定義函數ASI_UDAF$SortListAgg
    INSERT INTO electric_info_sortlistagg 
    SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING)))
    FROM electric_info GROUP BY user_id;

    參數說明如下,您可以根據實際情況進行修改。MySQL連接器更多參數詳情請參見MySQL。

    參數

    說明

    備注

    connector

    連接器類型。

    本示例固定值為mysql

    hostname

    MySQL數據庫的IP地址或者Hostname。

    本文填寫為RDS MySQL實例的內網地址。

    username

    MySQL數據庫服務的用戶名。

    無。

    password

    MySQL數據庫服務的密碼。

    本示例通過使用名為mysql_pw密鑰的方式填寫密碼值,避免信息泄露,詳情請參見變量管理。

    database-name

    MySQL數據庫名稱。

    本示例填寫為步驟一:準備數據源中創建的數據庫electric。

    table-name

    MySQL表名。

    本示例填寫為electric或electric_info_sortlistagg。

    port

    MySQL數據庫服務的端口號。

    無。

  6. (可選)單擊右上方的深度檢查調試,功能詳情請參見SQL作業開發

  7. 單擊部署,單擊確定。

  8. 運維中心 > 作業運維頁面,單擊目標作業名稱操作列下的啟動,選擇無狀態啟動。

步驟四:查詢結果

在RDS中使用如下語句查看用戶的終端狀態按照事件時間升序排列結果。

SELECT * FROM `electric_info_sortlistagg`;

結果如下:

image.png

相關文檔