日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

概述

更新時間:

實時計算Flink版支持在Flink SQL作業中使用Python自定義函數,本文為您介紹Flink Python自定義函數的分類、Python依賴使用方法和調優方式。

自定義函數分類

分類

描述

UDSF(User Defined Scalar Function)

用戶自定義標量值函數,將0個、1個或多個標量值映射到一個新的標量值。其輸入與輸出是一對一的關系,即讀入一行數據,寫出一條輸出值。詳情請參見自定義標量函數(UDSF)

UDAF(User Defined Aggregation Function)

自定義聚合函數,將多條記錄聚合成1條記錄。其輸入與輸出是多對一的關系,即將多條輸入記錄聚合成一條輸出值。詳情請參見自定義聚合函數(UDAF)

UDTF(User Defined Table-valued Function)

自定義表值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自定義的標量函數類似,但與標量函數不同。表值函數可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列數據。詳情請參見自定義表值函數(UDTF)

使用Python依賴

實時計算Flink版集群已預裝了Pandas、NumPy和PyArrow等常用的Python包,您可以在Python作業開發頁面,了解實時計算Flink版中已安裝的第三方Python包列表。預裝的Python包使用時需要在Python函數內部導入。示例如下。

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

此外,您也可以在Python自定義函數中使用其他類型的第三方Python包。需要注意的是,如果使用了非預裝的第三方Python包,在注冊Python UDF時,需要將其作為依賴文件上傳,詳情請參見管理自定義函數(UDF)使用Python依賴

代碼調試

您可以在Python自定義函數的代碼實現中,通過Logging的方式,輸出日志信息,方便問題定位,示例如下。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

日志輸出后,您可以在TaskManager的日志文件中查看日志,詳情請參見查看運行日志

性能調優

預先加載資源

預先加載資源可以在UDF初始化時提前加載資源,無需在每一次執行計算(即eval)時重新加載資源。例如,您可能只想加載一次大型深度學習模型,然后對模型多次運行批量預測。代碼示例如下。

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
說明

關于如何上傳Python數據文件,可以參考文檔使用Python依賴

使用Pandas庫

除了普通Python自定義函數之外,實時計算Flink版也支持您使用Pandas自定義函數。對于Pandas自定義函數,輸入數據的類型是Pandas中定義的數據結構,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定義函數中使用Pandas和NumPy等高性能的Python庫,開發出高性能的Python自定義函數,詳情請參見Vectorized User-defined Functions

配置參數

Python自定義函數的性能在很大程度取決于Python自定義函數自身的實現,如果遇到性能問題,您需要盡可能優化Python自定義函數的實現。除此之外,Python自定義函數的性能也受以下參數取值的影響。

參數

說明

python.fn-execution.bundle.size

Python UDF的計算是異步的,在執行過程中,Java算子將數據異步發送給Python進程進行處理。Java算子在將數據發送給Python進程之前,會先將數據緩存起來,到達一定閾值之后,再發送給Python進程。python.fn-execution.bundle.size參數可用來控制可緩存的數據最大條數。

默認值為100000,單位是條數。

python.fn-execution.bundle.time

用來控制數據的最大緩存時間。當緩存的數據條數到達python.fn-execution.bundle.size定義的閾值或緩存時間到達python.fn-execution.bundle.time定義的閾值時,會觸發緩存數據的計算。

默認值為1000,單位是毫秒。

python.fn-execution.arrow.batch.size

使用Pandas UDF時,一個arrow batch可容納的數據最大條數,默認值為10000。

說明

python.fn-execution.arrow.batch.size參數值不能大于python.fn-execution.bundle.size參數值。

說明

以上3個參數并不是配置的越大越好,當這些參數取值配置過大時,可能會導致Checkpoint時,需要處理過多的數據,從而導致Checkpoint時間過長,甚至會導致Checkpoint失敗。以上參數的更多詳情,請參見Configuration

相關文檔