本文為您介紹Hudi與Spark SQl集成后,支持的建表語句。
背景信息
Spark SQL創建Hudi表時,可以通過options設置表配置信息,options參數如下表所示。
0.10版本之后options被替換為tblproperties。
參數 | 描述 | 是否必選 |
primaryKey | 指定主鍵列,多個主鍵時使用逗號(,)隔開。 | 必選 |
type | 表類型,支持以下兩種類型:
| 可選 |
preCombineField | 版本字段。 對應Hudi的DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY字段。 | 建議設置,否則upsert場景無法支持 |
payloadClass | 默認值為DefaultHoodieRecordPayload。 對應Hudi的DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY字段。 | 可選 |
前提條件
已創建包含Spark和Hudi服務的集群,詳情請參見創建集群。
使用限制
EMR-3.36.0及后續版本和EMR-5.2.0及后續版本,支持Spark SQL對Hudi進行讀寫操作。
啟動方式
- Spark2和Spark3 hudi0.11以下版本
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
- Spark3 hudi0.11及以上版本
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
創建非分區表
options通過primaryKey指定主鍵列,多個字段時使用逗號(,)隔開。創建非分區表的示例如下所示:
創建表類型為cow,主鍵為id的非分區表。
create table if not exists h0( id bigint, name string, price double ) using hudi options ( type = 'cow', primaryKey = 'id' );
創建表類型為mor,主鍵為id和name的非分區表。
create table if not exists h0( id bigint, name string, price double ) using hudi options ( type = 'mor', primaryKey = 'id,name' );
創建表類型為cow的非分區表。
create table if not exists h0( id bigint, name string, price double ) using hudi options ( type = 'cow' );
創建分區表
創建分區表的示例如下所示。
create table if not exists h_p0 (
id bigint,
name string,
dt string,
hh string
) using hudi
location 'oss://xxx/h_p0'
options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'id'
)
partitioned by (dt, hh);
本文代碼示例中的location為表所在的路徑,可以是OSS路徑,也可以是HDFS路徑。主鍵為id,分區字段為dt和hh,版本字段為id。
創建外表
支持在已經存在的Hudi表之上創建外表。創建外表示例如下所示。
create table h0
using hudi
location '/xx/xx/h0';
CTAS語法
通過以下示例為您介紹如何使用CTAS語法。
示例1:
create table if not exists h1 using hudi as select 1 as id, 'a1' as name, 10 as price;
示例2:
create table if not exists h2 using hudi partitioned by (dt) location '/xx/xx/h2' options ( type = 'mor', primaryKey = 'id,name' ) as select 1 as id, 'a1' as name, 20 as price, '2021-01-03' as dt;