日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

特征平臺與特征生產

目前在特征平臺(FeatureStore)中支持的特征生產功能在推薦、廣告、風控以及機器學習等領域都有廣泛的應用。該功能旨在降低特征生產的復雜度,通過將特征生產中通用常見的功能固定下來,通過配置的方式即可實現特征生產。本文為您介紹特征生產的詳細過程。

前提條件

在開始執行操作前,請確認您已完成以下準備工作:

依賴產品

具體操作

人工智能平臺PAI

云原生大數據計算服務MaxCompute

大數據開發治理平臺DataWorks

一、準備工作

準備原始數據

本文需要用到的四張原始表分別為:

  • 用戶表(rec_sln_demo_user_table_preprocess_v1):包含一些基礎的用戶特征,例如性別、年齡、城市和關注數等。

  • 行為表(rec_sln_demo_behavior_table_preprocess_v1):包含一些行為特征,例如某時用戶點擊某個物品等。

  • 物品表(rec_sln_demo_item_table_preprocess_v1):包含一些基礎的物品特征,例如類別、作者、累計點擊數和累計點贊數等。

  • 行為寬表(rec_sln_demo_behavior_table_preprocess_wide_v1):該表由前三張表連接而成。

數據表存放在有公開讀取權限的pai_online_project中,其數據均為模擬數據生成。您需要在DataWorks中執行SQL命令,將上表數據從pai_online_project項目同步到您的MaxCompute項目中。具體操作步驟如下:

  1. 登錄DataWorks控制臺

  2. 在左側導航欄單擊數據建模與開發 > 數據開發

  3. 選擇已創建的DataWorks工作空間后,單擊進入數據開發

  4. 鼠標懸停至新建,選擇新建節點 > MaxCompute > ODPS SQL,在彈出的頁面中配置節點參數。

    參數

    取值建議

    引擎實例

    選擇已創建的MaxCompute引擎。

    節點類型

    ODPS SQL

    路徑

    業務流程/Workflow/MaxCompute

    名稱

    可自定義名稱。

  5. 單擊確認

  6. 在新建節點區域運行以下SQL命令,將用戶表、物品表、行為表以及行為寬表數據從pai_online_project項目同步到您的MaxCompute項目中。資源組選擇已創建的獨享資源組。

    • 同步用戶表:rec_sln_demo_user_table_preprocess_v1

      CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_v1
      like pai_online_project.rec_sln_demo_user_table_preprocess_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_user_table_preprocess_v1
      WHERE ds >= '20240530' and ds <='20240605';
    • 同步行為表:rec_sln_demo_behavior_table_preprocess_v1

      CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v1
      like pai_online_project.rec_sln_demo_behavior_table_preprocess_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v1
      WHERE ds >= '20240530' and ds <='20240605';
    • 同步物品表:rec_sln_demo_item_table_preprocess_v1

      CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_v1
      like pai_online_project.rec_sln_demo_item_table_preprocess_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_item_table_preprocess_v1
      WHERE ds >= '20240530' and ds <='20240605';
    • 同步行為寬表:rec_sln_demo_behavior_table_preprocess_wide_v1

      CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_wide_v1
      like pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_wide_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1
      WHERE ds >= '20240530' and ds <='20240605';

安裝FeatureStore Python SDK

以下代碼均建議在Jupyter Notebook環境下運行。

  • 安裝特征平臺Python SDK,要求在Python3環境下運行。

    %pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.8.0-py3-none-any.whl
  • 導入需要的功能模塊:

    import os
    from feature_store_py import FeatureStoreClient
    from feature_store_py.fs_datasource import MaxComputeDataSource
    from feature_store_py.feature_engineering import TableTransform, Condition, DayOf, ComboTransform, Feature, AggregationTransform, auto_count_feature_transform, WindowTransform, auto_window_feature_transform

二、表變換和特征變換操作流程

以下代碼均建議在Jupyter Notebook環境下運行。

  1. 定義表變換。

    1. 初始化Client。

      access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") # 填入您的Access Key ID
      access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") # 填入您的Access Key Secret
      project = 'project_name' # 填入您的項目名
      region = 'cn-hangzhou' # 填入您的項目所在區域,例如華東1(杭州)為cn-hangzhou
      fs_client = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region=region)
    2. 指定數據源。

      input_table_name = "rec_sln_demo_behavior_table_preprocess_v1"
      ds = MaxComputeDataSource(table=input_table_name, project=project)
    3. 指定變換后的輸出表名稱。

      output_table_name = "rec_sln_demo_v1_fs_test_v1"
    4. 定義表變換。

      trans_name = "drop_duplicates"  # 表變換名稱
      keys = ["user_id", "item_id"]  # 去重字段
      sort_keys = ["event_unix_time"]  # 排序字段
      sort_order = ["desc"]  # 順序定義
      tran_i = TableTransform(trans_name, keys, sort_keys, sort_order)
      
  2. 定義特征變換。

    feature1 = Feature(
        name="page_net_type",
        input=['page', 'net_type'],
        transform=ComboTransform(
            separator='_'
        )
    )
    feature2 = Feature(
        name="trim_playtime",
        type="double",
        transform="playtime/10"
    )
    
  3. 生成pipeline流程。

    pipeline = fs_client.create_pipeline(ds, output_table_name).add_table_transform(tran_i).add_feature_transform([feature1, feature2], keep_input_columns=True)
  4. 生成與執行變換。

    execute_date = '20240605'
    output_table = pipeline.execute(execute_date, drop_table=True)

    執行上述代碼共分以下兩個步驟:

    1. 生成變換配置。該配置除了指定SQL外,還指定了變換所需的各種信息,包括輸入輸出、參數和依賴等。

    2. 執行。根據第一個步驟生成的配置執行,將變換后的結果存放在輸出表中。

  5. 查看結果。

    1. 查看生成表的結果,該結果直接以pandas.DataFrame的形式呈現。

      pd_ret = output_table.to_pandas(execute_date, limit=20)
    2. 展示pd_ret的內容。

      pd_ret
    3. 查看生成的配置,該配置包含輸入表定義、變換SQL、依賴、參數以及輸出表定義等。該配置既適用于Debug調試,也適用于保存后進行后續的上線例行任務等。

      transform_info = output_table.transform_info
    4. 查看transform_info的內容。

      transform_info
    5. 查看第一階段的輸入配置。

      pipeline_config = pipeline.pipeline_config
    6. 查看pipeline_config的內容。

      pipeline_config

三、統計類型特征變換

統計特征普遍存在于特征生產場景中,是機器學習和數據分析中常見的一種數據預處理方法,通常用于生成更具代表性和更容易解釋的特征。這些變換通過對原始數據進行匯總、計算和抽取,使得模型能夠更好地理解數據的時間趨勢、周期性和異常情況。具體優勢如下:

  • 捕捉時間趨勢:在用戶行為數據中,最近一段時間的行為往往對當前狀態有更大的影響。

  • 降低噪音:原始數據中可能包含大量噪音。通過統計變換,您可以通過聚合操作來減少這些噪音的影響。

  • 豐富特征:統計變換可以生成新特征,增加模型的表達能力。

  • 提高模型性能:通過引入統計特征,您可以顯著提高模型的預測性能。

  • 增強解釋性:統計特征更加易于解釋和理解,使得問題的診斷和分析更為方便。

  • 數據壓縮:在某些情況下,統計特征可以有效減少數據維度。

雖然統計特征的具體實現過程較為復雜,但通過如下介紹的簡單的統計特征定義,就可以生產出大量的統計特征。

單個統計類型特征變化定義與運行

  1. 定義輸入和輸出表名。

    input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_agg_table_name = "rec_sln_demo_behavior_test_agg_v1"
  2. 定義一個統計類型特征。

    feature_agg1 = Feature(
                name="user_avg_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg", # 聚合函數名稱,可選的有 'avg', 'sum', 'min', 'max'
                    condition=Condition(field="event", value="expr", operator="<>"), # 設置條件判斷,表示當字段 "event" 的值不等于 "expr" 時成立。具體邏輯可結合生成的SQL語句來進一步理解。
                    group_by_keys="user_id", # group by 對應的 key
                    window_size=DayOf(1), # 窗口大小,這里是 1 天
                ),
            )
  3. 創建pipeline,運行統計類型特征變換。

    agg_pipeline = fs_client.create_pipeline(ds_agg, output_agg_table_name).add_feature_transform([feature_agg1])
  4. 生產與執行變換。

    execute_date = '20240605'
    print("transform_info = ", agg_pipeline.transform_info)
    output_agg_table = agg_pipeline.execute(execute_date, drop_table=True)
  5. 查看transform_info內容。

    transform_info_agg = output_agg_table.transform_info
    transform_info_agg
  6. 查看結果。

    pd_ret = output_agg_table.to_pandas(execute_date, limit=20)
    pd_ret

多個不同窗口統計類型特征變換自動JOIN

  1. 定義輸出表名。

    output_agg_table_name_2 = "rec_sln_demo_behavior_test_agg_v2"
  2. 定義多個不同窗口的統計類型特征,定義的窗口大小分為別1、3、7、15和30。

    feature_agg1 = Feature(
                name="user_avg_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg2 = Feature(
                name="user_avg_praise_count_3d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(3),
                ),
            )
    feature_agg3 = Feature(
                name="user_avg_praise_count_7d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(7),
                ),
            )
    feature_agg4 = Feature(
                name="user_avg_praise_count_15d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(15),
                ),
            )
    feature_agg5 = Feature(
                name="user_avg_praise_count_30d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(30),
                ),
            )
  3. 創建pipeline。

    agg_pipeline_2 = fs_client.create_pipeline(ds_agg, output_agg_table_name_2).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5])
  4. 執行pipeline生產過程。

    execute_date = '20240605'
    output_agg_table_2 = agg_pipeline_2.execute(execute_date, drop_table=True)
  5. 查看變換結果。

    transform_info_agg_2 = output_agg_table_2.transform_info
    transform_info_agg_2
  6. 查看表運行結果。

    pd_ret_2 = output_agg_table_2.to_pandas(execute_date, limit=20)
    pd_ret_2

多個統計特征變換過程支持自動歸并和類型自動推導

為了優化計算過程,多個特征同窗口大小時會自動歸并,在同一個group窗口塊中計算完成。 整個計算過程會涉及類型的改變,例如avg會將bigint類型變換為double類型,而且輸入特征的類型眾多比較難記住。因此在整個統計特征的變換過程中,會支持類型自動推導,即不需要預先指定類型,特征定義過程中會自動推導出結果特征的類型。

  1. 定義輸出表名。

    output_agg_table_name_3 = "rec_sln_demo_behavior_test_agg_v3"
  2. 定義更多不同類型的特征。

    feature_agg6 = Feature(
                name="user_expr_cnt_1d",
                transform=AggregationTransform(
                    agg_func="count",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                )
            )
    feature_agg7 = Feature(
                name="user_expr_item_id_dcnt_1d",
                input=['item_id'],
                transform=AggregationTransform(
                    agg_func="count",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg8 = Feature(
                name="user_sum_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg9 = Feature(
                name="user_sum_praise_count_3d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(3),
                ),
            )
  3. 創建pipeline。

    agg_pipeline_3 = fs_client.create_pipeline(ds_agg, output_agg_table_name_3).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5, feature_agg6, feature_agg7, feature_agg8, feature_agg9])
  4. 執行pipeline生產過程。

    execute_date = '20240605'
    output_agg_table_3 = agg_pipeline_3.execute(execute_date, drop_table=True)
  5. 查看變換結果。

    transform_info_agg_3 = output_agg_table_3.transform_info
    transform_info_agg_3
  6. 查看表運行結果。

    pd_ret_3 = output_agg_table_3.to_pandas(execute_date, limit=20)
    pd_ret_3

內置自動擴展函數,支持對統計特征變換自動擴展

因統計特征較多,包括不同的窗口大小和眾多聚合函數計算組合,手動實現每個特征比較復雜。系統內置了自動擴展函數,僅需指定要統計的輸入特征,即可自動生成并完成數百種統計特征的定義。

  1. 指定要統計的輸入特征。

    name_prefix = "user_"
    input_list = ["playtime", "duration", "click_count", "praise_count"]
    event_name = 'event'
    event_type = 'expr'
    group_by_key = "user_id"
    count_feature_list = auto_count_feature_transform(name_prefix, input_list, event_name, event_type, group_by_key)
    print("len_count_feature_list = ", len(count_feature_list))
    print("count_feature_list = ", count_feature_list)
  2. 定義輸出表名,并創建pipeline。

    output_agg_table_name_4 = "rec_sln_demo_behavior_test_agg_v4"
    agg_pipeline_4 =fs_client.create_pipeline(ds_agg, output_agg_table_name_4).add_feature_transform(count_feature_list)
  3. 執行pipeline生產過程。

    execute_date = '20240605'
    output_agg_table_4 = agg_pipeline_4.execute(execute_date, drop_table=True)
  4. 查看變換結果。

    transform_info_agg_4 = output_agg_table_4.transform_info
    transform_info_agg_4
  5. 查看表運行結果。

    pd_ret_4 = output_agg_table_4.to_pandas(execute_date, limit=20)
    pd_ret_4

支持不同的group key同時變換

以上內容闡述了當所有group key均相同時的處理方法,此外,系統還支持對不同group key進行轉換操作。具體示例如下:

  1. 定義輸出表名。

    input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_agg_table_name_5 = "rec_sln_demo_behavior_test_agg_v5"
  2. 定義不同group key的特征。

    feature_agg1 = Feature(
                name="item__sum_follow_cnt_15d",
                input=['follow_cnt'],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="item_id",
                    window_size=DayOf(1),
                )
            )
    feature_agg2 = Feature(
                name="author__max_follow_cnt_15d",
                input=['follow_cnt'],
                transform=AggregationTransform(
                    agg_func="max",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="author",
                    window_size=DayOf(15),
                ),
            )
  3. 創建pipeline。

    agg_pipeline_5 = fs_client.create_pipeline(ds_agg, output_agg_table_name_5).add_feature_transform([feature_agg1, feature_agg2])
  4. 執行pipeline生產過程。

    execute_date = '20240605'
    output_agg_table_5 = agg_pipeline_5.execute(execute_date, drop_table=True)
  5. 查看變換結果。

    transform_info_agg_5 = output_agg_table_5.transform_info
    transform_info_agg_5

四、WindowTransform窗口變換特征

上述統計類型特征變換基本能滿足常規特征生產場景。但在某些大規模推薦等場景中,還會面臨更高層次要求。FeatureStore支持WindowTransform窗口變換特征,您可以方便地得到KV特征,還可以利用天級中間表的形式來優化計算過程,進而達到降低特征計算時間、節省計算成本的目的。具體優勢如下:

  • 捕捉復雜非線性交互:簡單的特征(如用戶的年齡、性別等)難以表達用戶的復雜偏好。特征交叉可以幫助捕捉用戶和物品之間更復雜的非線性交互關系。

  • 提升預測準確性:交叉特征可以顯著提升推薦系統和廣告系統的性能。

  • 減少存儲空間:對于大規模的用戶和物品集合,直接存儲每一對用戶和物品的交互特征是不現實的。特征抽取和特征轉換能有效減少需要存儲的特征數量。

  • 提升推理效率:通過預先計算并存儲交叉特征,在實時推理時可以快速查找并利用這些特征,從而提升系統響應速度。

按照以下幾個維度為您介紹WindowTransform窗口變換特征的實現過程:

簡單聚合函數計算過程

簡單聚合函數包括count、sum、max和min 。這些聚合函數的計算過程比較直接,即通過天級別匯總后,再進一步進行多天的匯總即可得到最終的結果。此外,本文還將介紹天級中間表,以及引入了UDF(MaxCompute UDF概述)以得到最終的計算結果。在實際的計算過程中,用戶執行特征生產的過程還是和上述常規特征變換一樣,Feature Store Python SDK會自動處理天級中間表的創建、UDF的生成、資源上傳以及函數自動注冊等,用戶無需感知這些細節即可實現特征生產過程。

  1. 定義輸入和輸出表名。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_window_table_name_1 = "rec_sln_demo_behavior_test_window_v1"
  2. 定義WindowTransform特征。

    win_feature1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_cnt_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum", # 聚合函數,可選的有 'sum', 'avg', 'max', 'min'
            agg_field="click_count", # 對該特征進行聚合函數的計算
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
  3. 創建pipeline,運行WindowTransform類型特征變換。

    window_pipeline_1 = fs_client.create_pipeline(ds_window_1, output_window_table_name_1).add_feature_transform([win_feature1, win_feature2], keep_input_columns=True)
  4. 生產與執行變換。

    該生產過程有天級臨時中間表生成,此時的drop table只會刪掉最終的結果,不會刪除中間的臨時表。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_1.transform_info)
    output_window_table_1 = window_pipeline_1.execute(execute_date, drop_table=True)

    另外,因涉及多天的統計,例如上述示例是統計7天的數據,中間的臨時表一般只會計算最新分區的數據。為此系統在執行特征生產過程中提供了一個參數 backfill_partitions,當第一次執行時可以將其設置為True,系統會自動補全依賴的天數。例如本次統計是涉及7天的數據,系統會自動將7天的數據補全,這樣在后面例行運行時,還可以再設置為False,只需要補全最新一天分區的數據即可。

    execute_date = '20240506'
    output_window_table_1 = window_pipeline_1.execute(execute_date, backfill_partitions=True)
    • 當設置backfill_partitions參數為True時,系統會自動補全臨時中間表的依賴天數的數據,建議在第一次例行運行時運行。

    • 當統計天數較多時,運行上述代碼的時間較長。

  5. 查看結果表數據。

    window_ret_1 = output_window_table_1.to_pandas(execute_date, limit=50)
    window_ret_1
  6. 查看實際的計算過程。

    window_pipeline_1.transform_info

    通過計算過程可以看出,系統會生成天級別的中間臨時表 rec_sln_demo_behavior_table_preprocess_wide_v1_tmp_daily ,該表會將天級的結果匯總存儲在固定的分區中,這樣會避免重復計算。

    另外在計算最終結果時,還會使用到一個UDF:count_kv,該UDF會自動將統計結果分類匯總到結果map中,以String的形式存在,方便后續進行離線和在線結果推理。

上述內容介紹了簡單聚合函數計算過程,以count、sum為例執行了整個生產過程,該生產過程雖然涉及天級的中間臨時表、UDF等概念,但核心流程和常規的數據變換操作一樣,未增加操作復雜度。其它簡單聚合函數max和min同理。

avg聚合函數計算過程

因為在一天中計算avg結果后,再通過多天來匯總結果,會導致計算結果不準確,因此單獨介紹avg聚合函數計算過程。正確的計算方法是先計算多天的 sum_v 累加值和 count_v 計數結果,然后再通過sum_v/count_v得出多天的avg值。

盡管該聚合函數被單獨介紹,其復雜的計算細節已被封裝在transform_info中。因此,用戶無須深入了解這些細節,而只需像處理常規特征那樣進行操作,即可順利生成最終結果。

  1. 定義輸入和輸出表名。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_2 = "rec_sln_demo_behavior_test_window_v2"
  2. 定義WindowTransform特征。

    win_feature1 = Feature(
        name="item__kv_gender_click_avg_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="avg",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_avg_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="avg",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(15),
        ),
    )
  3. 創建pipeline,運行WindowTransform類型特征變換。

    window_pipeline_2 = fs_client.create_pipeline(ds_window_1, output_window_table_name_2).add_feature_transform([win_feature1, win_feature2])
  4. 生產與執行變換。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_2.transform_info)
    output_window_table_2 = window_pipeline_2.execute(execute_date, drop_table=True)
  5. 查看結果表數據。

    window_ret_2 = output_window_table_2.to_pandas(execute_date, limit=50)
    window_ret_2

多種group key的函數計算過程

同樣,WindowTransform支持多種group key來同時計算,結果最終left join在輸入表中。具體示例如下所示:

  1. 定義輸入和輸出表名。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_3 = "rec_sln_demo_behavior_test_window_v3"
  2. 定義WindowTransform特征。

    win_feature1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_cnt_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature3 = Feature(
        name="author__kv_gender_click_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="author",
            window_size=DayOf(7),
        ),
    )
    win_feature4 = Feature(
        name="author__kv_gender_click_cnt_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="author",
            window_size=DayOf(7),
        ),
    )
  3. 創建pipeline,運行WindowTransform類型特征變換。

    window_pipeline_3 = fs_client.create_pipeline(ds_window_1, output_window_table_name_3).add_feature_transform([win_feature1, win_feature2, win_feature3, win_feature4])
  4. 生產與執行變換。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_3.transform_info)
    output_window_table_3 = window_pipeline_3.execute(execute_date, drop_table=True)
  5. 查看結果表數據。

    window_ret_3 = output_window_table_3.to_pandas(execute_date, limit=50)
    window_ret_3

內置自動擴展函數,支持對WindowTransform特征自動擴展

和統計類型特征變換類似,WindowTransform統計特征較多,包括不同的窗口大小和眾多聚合函數計算組合,手動實現每個特征比較復雜。系統內置了自動擴展函數,僅需指定要統計的輸入特征,即可自動生成并完成數百種統計特征的定義。

  1. 指定要統計的輸入特征。

    name_prefix = "item"
    input_list = ['gender']
    agg_field = ["click_count"]
    event_name = 'event'
    event_type = 'click'
    group_by_key = "item_id"
    window_size = [7, 15, 30, 45]
    window_transform_feature_list = auto_window_feature_transform(name_prefix, input_list, agg_field, event_name, event_type, group_by_key, window_size)
    print("len_window_transform_feature_list = ", len(window_transform_feature_list))
    print("window_transform_feature_list = ", window_transform_feature_list)
  2. 定義輸出表名,創建pipeline。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_4 = "rec_sln_demo_behavior_test_window_v4"
    window_pipeline_4 =fs_client.create_pipeline(ds_window_1, output_window_table_name_4).add_feature_transform(window_transform_feature_list)
  3. 生產與執行變換。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_4.transform_info)
    output_window_table_4 = window_pipeline_4.execute(execute_date, drop_table=True)

JoinTransform聯接轉換

上述特征生產過程,尤其是AggregationTransform和WindowTransform的輸入均為行為表,輸出結果也是存放于一個行為表中。而在大部分情況下,最終上線需要的不是一個行為表,而是要和別的表進行連接,生成特征表,例如user表或item表。

因此引入joinTransform,支持將AggregationTransform和WindowTransform的特征和已有的user表或item 表連接起來。

JoinTransform和WindowTransform進行關聯
  1. 定義WindowTransform輸入表。

    window_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1'
    ds_window_1 = MaxComputeDataSource(table=window_table_name, project=project)
  2. 定義WindowTransform特征。

    win_fea1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
      )
    )
  3. 創建pipeline。

    說明

    因后續需要join其他表,此處未指定輸出表。

    win_pipeline_1 = fs_client.create_pipeline(ds_window_1).add_feature_transform([win_fea1])
  4. 定義JoinTransform輸入和輸出表。

    item_table_name = 'rec_sln_demo_item_table_preprocess_v1'
    ds_join_1 = MaxComputeDataSource(table=item_table_name, project=project)
    output_table_name = 'rec_sln_demo_item_table_v1_fs_window_debug_v1'
  5. 創建JoinTransform pipeline,和Window Transform pipeline連接在一起。

    join_pipeline_1 = fs_client.create_pipeline(ds_join_1, output_table_name).merge(win_pipeline_1)
  6. 生產和執行變換。

    execute_date = '20240605'
    output_join_table_1 = join_pipeline_1.execute(execute_date, drop_table=True)
  7. 查看結果表數據。

    join_ret_1 = output_join_table_1.to_pandas(execute_date, limit = 50)
    join_ret_1
  8. 查看實際的計算過程。

    output_join_table_1.transform_info
JoinTransform和AggregationTransform進行關聯
  1. 定義AggregationTransform輸入表。

    agg_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1'
    ds_agg_1 = MaxComputeDataSource(table=agg_table_name, project=project)
  2. 定義AggregationTransform特征。

    agg_fea1 = Feature(
        name="user_avg_praise_count_1d",
        input=["praise_count"],
        transform=AggregationTransform(
            agg_func="avg",
            condition=Condition(field="event", value="expr", operator="<>"),
            group_by_keys="user_id",
            window_size=DayOf(1),
        ),
    )
  3. 創建pipeline。

    說明

    因后續需要join其他表,此處未指定輸出表。

    agg_pipeline_1 = fs_client.create_pipeline(ds_agg_1).add_feature_transform([agg_fea1])
  4. 定義JoinTransform輸入和輸出表。

    user_table_name = 'rec_sln_demo_user_table_preprocess_v1'
    ds_join_2 = MaxComputeDataSource(table=user_table_name, project=project)
    output_table_name_2 = 'rec_sln_demo_user_table_v1_fs_window_debug_v1'
  5. 創建JoinTransform pipeline,和AggregationTransform連接在一起。

    join_pipeline_2 = fs_client.create_pipeline(ds_join_2, output_table_name_2).merge(agg_pipeline_1, keep_input_columns=False)
  6. 生產和執行變換。

    execute_date = '20240605'
    output_join_table_2 = join_pipeline_2.execute(execute_date, drop_table=True)
  7. 查看結果表數據。

    join_ret_2 = output_join_table_2.to_pandas(execute_date, limit = 50)
    join_ret_2
  8. 查看實際的計算過程。

    output_join_table_2.transform_info

相關文檔

  • 參考特征生產最佳實踐,了解實際應用場景。

  • 阿里云特征平臺(FeatureStore)基本適用于所有需要特征的場景,比如推薦場景、金融風控場景、用戶增長場景。同時,FeatureStore與阿里云常用數據源引擎、推薦服務引擎完成對接,可為您提供端到端高效便捷的一站式從特征注冊管理到模型開發應用的全流程操作平臺。更多關于FeatureStore的信息,請參見FeatureStore概述

  • 如果您在配置或使用過程中有任何問題,可以搜索釘釘群號:34415007523,進入答疑群聯系技術人員進行咨詢。