Streaming Read and Write
This topic describes how to stream data into and out of Delta Lake, with a Delta table acting as either a data source or a data sink.
Delta Table as a Data Source
spark.readStream
.format("delta")
.option("maxFilesPerTrigger", 1000)
.load("/tmp/delta_table")
The maxFilesPerTrigger option specifies the maximum number of files processed in a single batch. The default value is 1000.
Typically, once data is produced by a source component it is consumed downstream and never changes. However, Delta also plays the role of a data lake, where data may be deleted, updated, or merged. Delta currently provides two options to handle this situation:
ignoreDeletes: When this option is set to true, deleting data at partition boundaries has no effect on the stream.
ignoreChanges: When this option is set to true, delete, update, and merge operations receive no special handling, but the new files produced by these operations are sent downstream as new data. For example, if a file contains 10000 rows and one of them is updated, the rewritten file contains 9999 old rows and 1 new row, and all 10000 rows (the 9999 old rows plus the 1 new row) are sent downstream.
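As a minimal sketch, the two options can be enabled on the streaming read shown earlier (reusing the same example path; in practice you would enable only the option that matches your workload):

```scala
// Stream from a Delta table while tolerating upstream changes.
// ignoreDeletes: deletes at partition boundaries do not fail the stream.
// ignoreChanges: files rewritten by UPDATE/DELETE/MERGE are re-emitted,
// so downstream consumers may see old rows again and must handle duplicates.
spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .option("ignoreChanges", "true")
  .load("/tmp/delta_table")
```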
Delta Table as a Sink
Append mode: This is the default mode of Spark Structured Streaming.
df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta_table/_checkpoints")
.start("/tmp/delta_table")
Complete mode: In this mode, every batch overwrites the entire target table. For example, given a table with schema (id LONG, date DATE, name STRING, sales DOUBLE), you can compute the total sales per person and write the aggregated result to the target table, refreshing it once per batch.
Read and Write with Spark Structured Streaming
spark.readStream
.format("delta")
.load("/tmp/delta_table")
.select("name", "sales")
.groupBy("name")
.agg(sum("sales"))
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
.start("/tmp/delta_table_summary")
Read and Write with Streaming SQL
CREATE TABLE targetTableName (key BIGINT, value BIGINT) USING delta;
CREATE TABLE sourceTableName (key BIGINT, value BIGINT) USING delta;
INSERT INTO sourceTableName VALUES (1, 238), (238, 2388);

CREATE SCAN stream_source_1 ON sourceTableName USING STREAM;

CREATE STREAM test_delta_stream
OPTIONS (
  checkpointLocation = '/tmp/test_delta_stream/targetTableName'
)
MERGE INTO targetTableName AS target
USING (
  SELECT key, value FROM (
    SELECT key, value,
      row_number() OVER (PARTITION BY key ORDER BY value) AS rn
    FROM stream_source_1
  ) WHERE rn = 1
) AS source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;