Java UDF
本文為您介紹如何編寫和使用UDF。
背景信息
自2.2.0版本起,StarRocks支持使用Java語言編寫用戶定義函數(shù)(User Defined Function,簡稱UDF)。
自3.0版本起,StarRocks支持Global UDF,您只需要在相關(guān)的SQL語句(CREATE/SHOW/DROP)中加上GLOBAL
關(guān)鍵字,該語句即可全局生效,無需逐個為每個數(shù)據(jù)庫執(zhí)行此語句。您可以根據(jù)業(yè)務(wù)場景開發(fā)自定義函數(shù),擴展StarRocks的函數(shù)能力。
目前StarRocks支持的UDF包括:
用戶自定義標(biāo)量函數(shù)(Scalar UDF)
用戶自定義聚合函數(shù)(User Defined Aggregation Function,UDAF)
用戶自定義窗口函數(shù)(User Defined Window Function,UDWF)
用戶自定義表格函數(shù)(User Defined Table Function,UDTF)
前提條件?
使用StarRocks的Java UDF功能前,您需要:
安裝Apache Maven以創(chuàng)建并編寫相關(guān)Java項目。
在服務(wù)器上安裝JDK 1.8。
開啟UDF功能。在實例配置頁面,設(shè)置FE配置項
enable_udf
為TRUE
,并重啟實例使配置項生效。
類型映射關(guān)系
SQL TYPE | Java TYPE |
BOOLEAN | java.lang.Boolean |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
STRING/VARCHAR | java.lang.String |
開發(fā)并使用UDF
您需要創(chuàng)建Maven項目并使用Java語言編寫相應(yīng)功能。
步驟一:創(chuàng)建Maven項目
創(chuàng)建Maven項目,項目的基本目錄結(jié)構(gòu)如下。
project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target
步驟二:添加依賴?
在pom.xml中添加如下依賴。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
步驟三:開發(fā)UDF?
您需要使用Java語言開發(fā)相應(yīng)UDF。
開發(fā)Scalar UDF
Scalar UDF,即用戶自定義標(biāo)量函數(shù),可以對單行數(shù)據(jù)進(jìn)行操作,輸出單行結(jié)果。當(dāng)您在查詢時使用Scalar UDF,每行數(shù)據(jù)最終都會按行出現(xiàn)在結(jié)果集中。典型的標(biāo)量函數(shù)包括UPPER
、LOWER
、ROUND
、ABS
。
以下示例以提取JSON數(shù)據(jù)功能為例進(jìn)行說明。例如,業(yè)務(wù)場景中,JSON數(shù)據(jù)中某個字段的值可能是JSON字符串而不是JSON對象,因此在提取JSON字符串時,SQL語句需要嵌套調(diào)用GET_JSON_STRING
,即GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")
。
為簡化SQL語句,您可以開發(fā)一個UDF,直接提取JSON字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")
。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String jsonObj, String key) {
if (obj == null || key == null) return null;
try {
// JSONPath庫可以全部展開,即使某個字段的值是JSON格式的字符串
return JSONPath.read(jsonObj, key).toString();
} catch (Exception e) {
return null;
}
}
}
用戶自定義類必須實現(xiàn)如下方法。
方法中請求參數(shù)和返回參數(shù)的數(shù)據(jù)類型,需要和步驟六中的CREATE FUNCTION
語句中聲明的相同,且兩者的類型映射關(guān)系需要符合類型映射關(guān)系。
方法 | 含義 |
TYPE1 evaluate(TYPE2, ...) |
|
開發(fā)UDAF
UDAF,即用戶自定義的聚合函數(shù),對多行數(shù)據(jù)進(jìn)行操作,輸出單行結(jié)果。典型的聚合函數(shù)包括SUM
、COUNT
、MAX
、MIN
,這些函數(shù)對于每個GROUP BY分組中多行數(shù)據(jù)進(jìn)行聚合后,只輸出一行結(jié)果。
以下示例以MY_SUM_INT
函數(shù)為例進(jìn)行說明。與內(nèi)置函數(shù)SUM
(返回值為BIGINT類型)區(qū)別在于,MY_SUM_INT
函數(shù)支持傳入?yún)?shù)和返回參數(shù)的類型為INT。
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}
用戶自定義類必須實現(xiàn)如下方法。
方法中傳入?yún)?shù)和返回參數(shù)的數(shù)據(jù)類型,需要和步驟六中的CREATE FUNCTION
語句中聲明的相同,且兩者的類型映射關(guān)系需要符合類型映射關(guān)系。
方法 | 含義 |
State create() | 創(chuàng)建State。 |
void destroy(State) | 銷毀State。 |
void update(State, ...) | 更新State。其中第一個參數(shù)是State,其余的參數(shù)是函數(shù)聲明的輸入?yún)?shù),可以為1個或多個。 |
void serialize(State, ByteBuffer) | 序列化State。 |
void merge(State, ByteBuffer) | 合并State和反序列化State。 |
TYPE finalize(State) | 通過State獲取函數(shù)的最終結(jié)果。 |
并且,開發(fā)UDAF函數(shù)時,您需要使用緩沖區(qū)類java.nio.ByteBuffer
和局部變量serializeLength
,用于保存和表示中間結(jié)果,指定中間結(jié)果的序列化長度。
類和局部變量 | 說明 |
java.nio.ByteBuffer() | 緩沖區(qū)類,用于保存中間結(jié)果。由于中間結(jié)果在不同執(zhí)行節(jié)點間傳輸時,會進(jìn)行序列化和反序列化,因此還需要使用serializeLength指定中間結(jié)果序列化后的長度。 |
serializeLength() | 中間結(jié)果序列化后的長度,單位為Byte。serializeLength的數(shù)據(jù)類型固定為INT。例如,示例中 |
java.nio.ByteBuffer
序列化相關(guān)事項:
不支持依賴ByteBuffer的remaining()方法來反序列化State。
不支持對ByteBuffer調(diào)用clear()方法。
serializeLength
需要與實際寫入數(shù)據(jù)的長度保持一致,否則序列化和反序列化過程中會造成結(jié)果錯誤。
開發(fā)UDWF
UDWF,即用戶自定義窗口函數(shù)。跟普通聚合函數(shù)不同的是,窗口函數(shù)針對一組行(一個窗口)計算值,并為每行返回一個結(jié)果。一般情況下,窗口函數(shù)包含OVER
子句,將數(shù)據(jù)行拆分成多個分組,窗口函數(shù)基于每一行數(shù)據(jù)所在的組(一個窗口)進(jìn)行計算,并為每行返回一個結(jié)果。
以下示例以MY_WINDOW_SUM_INT
函數(shù)為例進(jìn)行說明。與內(nèi)置函數(shù)SUM
(返回類型為BIGINT)區(qū)別在于,MY_WINDOW_SUM_INT
函數(shù)支持傳入?yún)?shù)和返回參數(shù)的類型為INT。
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}
用戶自定義類必須實現(xiàn)UDAF所需要的方法(窗口函數(shù)是特殊聚合函數(shù))以及windowUpdate()方法。
方法中請求參數(shù)和返回參數(shù)的數(shù)據(jù)類型,需要和步驟六中的CREATE FUNCTION
語句中聲明的相同,且兩者的類型映射關(guān)系需要符合類型映射關(guān)系。
需要額外實現(xiàn)的方法
方法 | 含義 |
| 更新窗口數(shù)據(jù)。窗口函數(shù)的詳細(xì)說明,請參見窗口函數(shù)。輸入每一行數(shù)據(jù),都會獲取到對應(yīng)窗口信息來更新中間結(jié)果。
|
開發(fā)UDTF
UDTF,即用戶自定義表值函數(shù),讀入一行數(shù)據(jù),輸出多個值可視為一張表。表值函數(shù)常用于實現(xiàn)行轉(zhuǎn)列。
目前UDTF只支持返回多行單列。
以下示例以MY_UDF_SPLIT
函數(shù)為例進(jìn)行說明。MY_UDF_SPLIT
函數(shù)支持分隔符為空格,傳入?yún)?shù)和返回參數(shù)的類型為STRING。
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}
用戶自定義類必須實現(xiàn)如下方法。
方法中請求參數(shù)和返回參數(shù)的數(shù)據(jù)類型,需要和步驟六中的CREATE FUNCTION
語句中聲明的相同,且兩者的類型映射關(guān)系需要符合類型映射關(guān)系。
方法 | 含義 |
TYPE[] process() |
|
步驟四:打包Java項目
通過以下命令打包Java項目。
mvn package
target目錄下會生成兩個文件:udf-1.0-SNAPSHOT.jar
和udf-1.0-SNAPSHOT-jar-with-dependencies.jar
。
步驟五:上傳項目
將文件udf-1.0-SNAPSHOT-jar-with-dependencies.jar
上傳到OSS上,并開放JAR包的公共讀權(quán)限。詳情請參見簡單上傳、設(shè)置Bucket ACL。
步驟六中,F(xiàn)E會對UDF所在JAR包進(jìn)行校驗并計算校驗值,BE會下載UDF所在JAR包并執(zhí)行。
步驟六:在StarRocks中創(chuàng)建UDF
StarRocks內(nèi)提供了兩種Namespace的UDF:一種是Database級Namespace,一種是Global級Namespace。
如果您沒有特殊的UDF可見性隔離需求,您可以直接選擇創(chuàng)建Global UDF。在引用Global UDF時,直接調(diào)用Function Name即可,無需任何Catalog和Database作為前綴,訪問更加便捷。
如果您有特殊的UDF可見性隔離需求,或者需要在不同Database下創(chuàng)建同名UDF,那么你可以選擇在Database內(nèi)創(chuàng)建UDF。此時,如果您的會話在某個Database內(nèi),您可以直接調(diào)用Function Name即可;如果您的會話在其他Catalog和Database下,那么您需要帶上Catalog和Database前綴,例如:
catalog.database.function
。
創(chuàng)建Global UDF需要有System級的CREATE GLOBAL FUNCTION權(quán)限;創(chuàng)建數(shù)據(jù)庫級別的UDF需要有數(shù)據(jù)庫級的CREATE FUNCTION權(quán)限;使用UDF需要有對應(yīng)UDF的USAGE權(quán)限。關(guān)于如何賦權(quán),參見GRANT。
JAR包上傳完成后,您需要在StarRocks中,按需創(chuàng)建相應(yīng)的UDF。如果創(chuàng)建Global UDF,只需要在SQL語句中帶上GLOBAL
關(guān)鍵字即可。
語法
CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]
參數(shù)說明
參數(shù) | 必選 | 說明 |
GLOBAL | 否 | 如需創(chuàng)建全局UDF,需指定該關(guān)鍵字。從3.0版本開始支持。 |
AGGREGATE | 否 | 如要創(chuàng)建UDAF和UDWF,需指定該關(guān)鍵字。 |
TABLE | 否 | 如要創(chuàng)建UDTF,需指定該關(guān)鍵字。 |
function_name | 是 | 函數(shù)名,可以包含數(shù)據(jù)庫名稱,比如, |
arg_type | 是 | 函數(shù)的參數(shù)類型。具體支持的數(shù)據(jù)類型,請參見類型映射關(guān)系。 |
return_type | 是 | 函數(shù)的返回值類型。具體支持的數(shù)據(jù)類型,請參見類型映射關(guān)系。 |
properties | 是 | 函數(shù)相關(guān)屬性。創(chuàng)建不同類型的UDF需配置不同的屬性,詳情和示例請參考以下示例。 |
創(chuàng)建Scalar UDF
執(zhí)行如下命令,在StarRocks中創(chuàng)建之前示例中的Scalar UDF。
CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
參數(shù) | 描述 |
symbol | UDF所在項目的類名。格式為 |
type | 用于標(biāo)記所創(chuàng)建的UDF類型。取值為 |
file | UDF所在JAR包的HTTP路徑,配置成OSS包含對應(yīng)內(nèi)網(wǎng)Endpoint的HTTP URL。格式為 |
創(chuàng)建UDAF
執(zhí)行如下命令,在StarRocks中創(chuàng)建之前示例中的UDAF。
CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
PROPERTIES
里的參數(shù)說明與創(chuàng)建Scalar UDF相同。
創(chuàng)建UDWF
執(zhí)行如下命令,在StarRocks中創(chuàng)建先前示例中的UDWF。
CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
analytic
:所創(chuàng)建的函數(shù)是否為窗口函數(shù),固定取值為true
。其他參數(shù)說明與創(chuàng)建Scalar UDF相同。
創(chuàng)建UDTF
執(zhí)行如下命令,在StarRocks中創(chuàng)建先前示例中的UDTF。
CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
PROPERTIES
里的參數(shù)說明與創(chuàng)建Scalar UDF相同。
步驟七:使用UDF
創(chuàng)建完成后,您可以測試使用您開發(fā)的UDF。
使用Scalar UDF
執(zhí)行如下命令,使用步驟六創(chuàng)建的Scalar UDF函數(shù)。
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');
使用UDAF
執(zhí)行如下命令,使用步驟六創(chuàng)建的UDAF函數(shù)。
SELECT MY_SUM_INT(col1);
使用UDWF
執(zhí)行如下命令,使用步驟六創(chuàng)建的UDWF函數(shù)。
SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;
使用UDTF
執(zhí)行如下命令,使用先前示例中的UDTF。
-- 假設(shè)存在表 t1,其列 a、b、c1 信息如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."
-- 使用 MY_UDF_SPLIT() 函數(shù)。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."
第一個
MY_UDF_SPLIT
為調(diào)用MY_UDF_SPLIT
后生成的列別名。暫不支持使用
AS t2(f1)
的方式指定表格函數(shù)返回表的表別名和列別名。
查看UDF信息?
運行以下命令查看UDF信息。
SHOW [GLOBAL] FUNCTIONS;
刪除UDF
運行以下命令刪除指定的UDF。
DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);
FAQ
Q:開發(fā)UDF時是否可以使用靜態(tài)變量?不同UDF間的靜態(tài)變量間否會互相影響?
A:支持在開發(fā)UDF時使用靜態(tài)變量,且不同UDF間(即使類同名),靜態(tài)變量是互相隔離的,不會互相影響。