EMR Trino提供了獨立的Delta連接器,在E-MapReduce集群上支持了較為完整的數據湖特性并進行了特性擴展。
背景信息
Delta Lake是DataBricks公司推出的一種數據湖方案,以數據為中心,圍繞數據流走向推出了一系列功能特性,詳情請參見Delta Lake概述。
前提條件
已創建DataLake集群、Custom集群,并選擇了Trino服務,或者創建Hadoop集群,并選擇了Presto服務,詳情請參見創建集群。
使用限制
DataLake集群、Custom集群,和EMR-3.39.1及后續版本、EMR-5.5.0及后續版本的Hadoop集群,支持配置Delta連接器。
基礎使用
修改Delta連接器配置
修改Delta連接器配置,詳情請參見修改內置連接器。
連接器默認配置
進入EMR控制臺的Trino服務的配置頁面,在服務配置區域,單擊delta.properties頁簽,您可以修改或添加參數,參數值請根據您實際情況修改。
參數 | 描述 |
hive.metastore.uri | Hive Metastore使用Thrift協議連接的URI。參數值您可以根據實際情況修改,默認格式為thrift://master-1-1.cluster-24****:9083。 |
hive.config.resources | Hive Metastore使用的資源文件位置。 |
示例
Trino無法新建或修改Delta Lake表,可以使用Spark-sql來創建,詳情請參見基礎使用。
生成數據。
執行以下命令,進入Spark-sql命令行。
spark-sql
執行以下命令,創建Delta表。
CREATE TABLE delta_table (id INT) USING delta;
執行以下命令,寫入數據。
INSERT INTO delta_table VALUES 0,1,2,3,4;
查詢數據。
進入Trino命令行,詳情請參見通過命令方式訪問Trino。
執行以下命令,查詢表信息。
SELECT * FROM delta_table;
返回信息如下。
id ---- 0 1 2 3 4 (5 rows)
高階使用
僅EMR-3.39.1、EMR-5.5.0版本支持下列功能。
Time Travel
Time Travel允許查詢表的歷史數據。
EMR Trino支持Delta表的Time Travel特性,語法為for xxx as of
,其中xxx
的值可以為VERSION或TIMESTAMP,分別對應版本號和時間戳兩種Time travel模式。
Trino支持的Time Travel語法和Delta Lake在Spark SQL上的語法相比,多了一個FOR
關鍵字。
示例如下:
執行以下命令,進入Spark-sql命令行。
spark-sql
執行以下命令,覆蓋數據。
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
查詢數據。
進入Trino命令行,詳情請參見通過命令方式訪問Trino。
執行以下命令,查詢表信息。
SELECT * FROM delta_table;
返回信息如下。
id ---- 5 6 7 8 9 (5 rows)
使用Time Travel查詢歷史數據。
執行以下命令,按版本號查詢數據,直接填寫版本號即可。 版本號是一個單調遞增的整數。默認第一次INSERT之后版本號為1,之后每修改一次版本號加1。
SELECT * FROM delta_table FOR VERSION AS OF 1;
返回信息如下。
id ---- 2 1 3 4 0 (5 rows)
按時間戳查詢數據,共支持DATE、TIMESTAMP和TIMESTAMP WITH TIME ZONE三種類型的時間戳。
DATE類型:查詢日期所對應的UTC時間00:00:00的數據。
TIMESTAMP類型:查詢指定時間戳對應的UTC的數據。
例如,使用TIMESTAMP類型查詢北京時間(+08:00)2022年2月15日20點整的數據,則代碼如下。
SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';
說明其中,第一個TIMESTAMP說明使用的是時間戳進行Time Travel查詢(非版本號),第二個TIMESTAMP則說明時間戳是TIMESTAMP類型(非DATE類型)。
返回信息如下。
id ---- 2 0 3 4 1 (5 rows)
TIMESTAMP WITH TIME ZONE類型:無法直接讀取數據,需要進行格式轉換。
例如,查詢北京時間(+08:00)2022年2月15日20點的數據。代碼示例如下。
SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);
Z-Order
Trino基于Z-Order優化了Delta表查詢。目前支持Parquet自身的優化和Data Skipping的優化。執行優化后,Delta會按文件粒度統計各個字段的最大和最小值,該統計信息用于直接過濾數據文件。Trino的Delta連接器可以讀取到這些統計信息。
對于使用OPTIMIZE和ZORDER BY命令優化過的Delta表,在Z-Order列設置合適時,Trino的查詢速度最大能夠提升數十倍。具體優化方法請參見通過文件管理優化性能。
Trino支持Z-order的數據類型有Int、Long、Double、Float、Binary、Boolean、String和Array。
Trino支持Z-Order Data Skipping的謂詞有=
、<
、<=
、>
和>=
。
Trino暫不支持like和in等謂詞,但由于Z-order的局部排序能力,這些謂詞在Z-order優化后同樣可以提升查詢速度。
例如,表conn_zorder,共含有src_ip、src_port、dst_ip和dst_port四列。
先在Spark中執行優化,命令如下所示。
OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
括號中的順序即為Z-Order的順序。
OPTIMIZE操作會根據數據量大小耗費一定時間。優化完成后,執行符合條件的查詢均會提升性能。
查詢一部分Z-Order優化的列能提升性能,命令如下所示。
SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';
按Z-Order的優化順序執行查詢,速度提升非常大,命令如下所示。
SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;