本文為您介紹E-MapReduce中DeltaLake的配置信息及其常用命令的示例。
DeltaLake配置信息
EMR中DeltaLake的默認配置信息如下:
Spark 2.X環境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
Spark 3.X環境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
常用命令示例
創建表
CREATE TABLE delta_table (id INT) USING delta;
插入數據
INSERT INTO delta_table VALUES 0,1,2,3,4;
覆蓋寫數據
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
查詢數據
SELECT * FROM delta_table;
更新數據
給偶數
id
加100。UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
刪除數據
刪除偶數
id
的記錄。DELETE FROM delta_table WHERE mod(id, 2) = 0;
Merge操作
創建Source表用于Merge操作。
CREATE TABLE newData(id INT) USING delta;
向表中插入數據。
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
使用newData作為Source Merge進delta_table表。如果匹配到相同
id
的記錄,則該id
加100;如果沒有匹配到,則直接插入新記錄。MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;
流式讀數據
創建流式目標表。
CREATE TABLE stream_debug_table (id INT) USING delta;
創建流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
說明本文示例中的delta_table為您已存在的delta表。
流式寫入目標表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
流式代碼示例
通過SSH方式連接集群,詳情請參見登錄集群。
執行以下命令,啟動streaming-sql。
streaming-sql
說明如果您已添加DeltaLake組件,則可以直接執行
streaming-sql
命令。如果集群內沒有默認配置,您可以通過以下配置來使用Delta Lake。streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
執行以下命令,創建流式目標表。
CREATE TABLE stream_debug_table (id INT) USING DELTA;
執行以下命令,創建流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
執行以下命令,以delta_table作為Source,流式寫入目標表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
您可以新開啟一個streaming-sql客戶端,向Source中插入新數據,并查詢目標表的數據。
執行以下命令,驗證Source存量寫入。
SELECT * FROM stream_debug_table;
執行以下命令,插入新數據。
INSERT INTO delta_table VALUES 801, 802;
執行以下命令,查詢插入的數據。
SELECT * FROM stream_debug_table;
執行以下命令,插入新數據。
INSERT INTO delta_table VALUES 901, 902;
執行以下命令,查詢插入的數據。
SELECT * FROM stream_debug_table;
文檔內容是否對您有幫助?