概述
實(shí)時(shí)計(jì)算Flink版支持在Flink SQL作業(yè)中使用Java自定義函數(shù),本文介紹Flink Java自定義函數(shù)的分類、參數(shù)傳遞及調(diào)用注意事項(xiàng)。
注意事項(xiàng)
為了避免JAR包依賴沖突,在開發(fā)自定義函數(shù)時(shí)您需要注意以下幾點(diǎn):
SQL開發(fā)頁面選擇的Flink版本,請和Pom依賴中的Flink版本保持一致。
Flink相關(guān)依賴,scope請使用provided,即在依賴中添加
<scope>provided</scope>
。其他第三方依賴請采用Shade方式打包,Shade打包詳情請參見Apache Maven Shade Plugin。
Flink依賴沖突問題,詳情請參見如何解決Flink依賴沖突問題?
為避免UDF在SQL作業(yè)文本里被頻繁調(diào)用導(dǎo)致超時(shí)的情況,推薦您將UDF的JAR包作為依賴文件上傳,并且通過
CRETATE TEMPORARY FUNCTION
語法在作業(yè)中聲明函數(shù),例如CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
自定義函數(shù)分類
Flink支持以下3類自定義函數(shù)。
分類 | 描述 |
UDSF(User Defined Scalar Function) | 用戶自定義標(biāo)量值函數(shù),將0個(gè)、1個(gè)或多個(gè)標(biāo)量值映射到一個(gè)新的標(biāo)量值。其輸入與輸出是一對一的關(guān)系,即讀入一行數(shù)據(jù),寫出一條輸出值。詳情請參見自定義標(biāo)量函數(shù)(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自定義聚合函數(shù),將多條記錄聚合成1條記錄。其輸入與輸出是多對一的關(guān)系,即將多條輸入記錄聚合成一條輸出值。詳情請參見自定義聚合函數(shù)(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自定義表值函數(shù),將0個(gè)、1個(gè)或多個(gè)標(biāo)量值作為輸入?yún)?shù)(可以是變長參數(shù))。與自定義的標(biāo)量函數(shù)類似,但與標(biāo)量函數(shù)不同。表值函數(shù)可以返回任意數(shù)量的行作為輸出,而不僅是1個(gè)值。返回的行可以由1個(gè)或多個(gè)列組成。調(diào)用一次函數(shù)輸出多行或多列數(shù)據(jù)。詳情請參見自定義表值函數(shù)(UDTF)。 |
自定義函數(shù)注冊
全局自定義函數(shù)注冊方法,請參見全局自定義函數(shù)。
作業(yè)級自定義函數(shù)注冊方法,請參見作業(yè)級自定義函數(shù)。
自定義函數(shù)參數(shù)傳遞
您可以在Flink開發(fā)控制臺(tái)配置自定義函數(shù)中的參數(shù)并在UDF代碼中使用。這樣,后續(xù)可以直接在控制臺(tái)上修改參數(shù)值,實(shí)現(xiàn)快速修改UDF參數(shù)值的目的。
自定義函數(shù)中提供了可選的open(FunctionContext context)方法,F(xiàn)unctionContext具備參數(shù)傳遞功能,自定義配置項(xiàng)通過此對象來傳遞。具體步驟如下:
在Flink開發(fā)控制臺(tái)作業(yè)運(yùn)維頁面部署詳情頁簽運(yùn)行參數(shù)配置的其他配置中,添加pipeline.global-job-parameters配置項(xiàng),代碼示例如下。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
FunctionContext#getJobParameter只能獲取pipeline.global-job-parameters這一配置項(xiàng)的值。因此需要將UDF用到的所有配置項(xiàng)全部寫入到pipeline.global-job-parameters中。pipeline.global-job-parameters配置項(xiàng)填寫的具體操作步驟如下。
步驟
動(dòng)作
具體操作
示例
步驟1
定義key-value。
將key和value之間通過冒號(hào)(:)分隔,并將每一對key-value用單引號(hào)(')包圍起來。
說明如果key或value中含有半角冒號(hào)(:),則需要用雙引號(hào)(")將key或value包圍起來。
如果key或value中含有半角冒號(hào)(:)和雙引號(hào)("),則需要通過連寫兩個(gè)雙引號(hào)("")進(jìn)行轉(zhuǎn)義。
當(dāng)
key = k1,value = {hi,hello}
,則定義為'k1:{hi,hello}'
。當(dāng)
key = k2,value = str:ing,str:ing
,則定義為'k2:"str:ing,str:ing"'
當(dāng)
key = k3,value = str"ing,str:ing
,則定義為'k3:"str""ing,str:ing"'
步驟2
按照YAML文件的格式,形成最終的pipeline.global-job-parameters。
將不同的key-value放在不同的行里,并將所有key-value用逗號(hào)(,)連接。
說明YAML文件的多行字符串以豎線(| )開始。
YAML文件的多行字符串,每一行需要有相同的縮進(jìn)。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
在自定義函數(shù)代碼中,通過FunctionContext#getJobParameter獲取map的各項(xiàng)內(nèi)容。
代碼示例如下。
context.getJobParameter("k1", null); // 獲得字符串 {hi,hello}。 context.getJobParameter("k2", null); // 獲得字符串 str:ing,str:ing。 context.getJobParameter("k3", null); // 獲得字符串 str"ing,str:ing。 context.getJobParameter("pipeline.global-job-parameters", null); // null,只能獲得pipeline.global-job-parameters里定義的內(nèi)容,而不能獲得任意的作業(yè)配置項(xiàng)。
命名參數(shù)
僅實(shí)時(shí)計(jì)算引擎VVR 8.0.7及以上版本支持使用命名參數(shù)來實(shí)現(xiàn)自定義函數(shù)。
在SQL中調(diào)用函數(shù)時(shí)必須按順序指定所有參數(shù)字段。當(dāng)參數(shù)較多時(shí),容易出現(xiàn)傳參個(gè)數(shù)、順序錯(cuò)誤,而且不能省略非必填參數(shù)。通過使用命名參數(shù),可以按需指定所需的參數(shù),減少出錯(cuò)概率,使用起來也更加方便。我們通過一個(gè)自定義標(biāo)量函數(shù)(ScalarFunction)的例子來介紹下命名參數(shù)的使用。
// 實(shí)現(xiàn)一個(gè)自定義標(biāo)量函數(shù),后兩個(gè)入?yún)榭蛇x參數(shù)(isOptional = true)
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}
在SQL中使用該自定義函數(shù)時(shí),您可以只指定第一個(gè)必選參數(shù),或選擇性指定可選參數(shù),代碼示例如下。
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- 僅指定第一個(gè)必選參數(shù)
MyNamedUdf(f1 => c) arg1_res,
-- 指定第一個(gè)必選參數(shù)及第二個(gè)可選參數(shù)
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- 指定第一個(gè)必選參數(shù)及第三個(gè)可選參數(shù)
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;
相關(guān)文檔
Java自定義函數(shù)的開發(fā)和使用demo,請參見自定義聚合函數(shù)(UDAF)、自定義標(biāo)量函數(shù)(UDSF)和自定義表值函數(shù)(UDTF)。
Python自定義函數(shù)的調(diào)試和調(diào)優(yōu)方法,請參見概述。
Python自定義函數(shù)的開發(fā)和使用demo,請參見自定義聚合函數(shù)(UDAF)、自定義標(biāo)量函數(shù)(UDSF)和自定義表值函數(shù)(UDTF)。