
This topic describes how to stream data in and out of Delta Lake, using a Delta table as both a streaming source and a streaming sink.

Delta Table as a Source

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)
  .load("/tmp/delta_table")

maxFilesPerTrigger specifies the maximum number of new files processed in a single micro-batch; the default value is 1000.
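The snippet above only defines the streaming read; nothing is processed until a sink is attached and the query is started. A minimal sketch of running it end to end, using the console sink purely for inspection (same table path as above):

val query = spark.readStream
  .format("delta")
  .load("/tmp/delta_table")
  .writeStream
  .format("console")   // print each micro-batch to stdout, for debugging only
  .start()

query.awaitTermination()

Production queries write to a durable sink such as a Delta table, as shown in the sink section below.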

With a typical source, data is consumed downstream as soon as it is produced and does not change afterwards. Delta, however, also plays the role of a data lake, so existing data may be deleted, updated, or merged. Delta currently provides two options to handle this situation (see the sketch after this list):

  • ignoreDeletes: when this option is set to true, deletions of entire partitions on the source table are ignored and have no effect on the stream.

  • ignoreChanges: when this option is set to true, delete, update, and merge operations are not treated specially, but the files rewritten by these operations are emitted downstream as new data. For example, if a file contains 10,000 rows and one of them is updated, the rewritten file holds 9,999 unchanged rows plus 1 updated row, and all 10,000 rows are sent downstream again.
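A minimal sketch of a streaming read that tolerates such changes on the source table (same table path as above; ignoreDeletes and ignoreChanges are standard Delta source options):

spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")   // tolerate partition-level deletes on the source table
  .option("ignoreChanges", "true")   // re-emit files rewritten by UPDATE/DELETE/MERGE as new data
  .load("/tmp/delta_table")

Because ignoreChanges re-emits unchanged rows alongside the updated ones, downstream consumers should be prepared to handle duplicates, for example by deduplicating on a key.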

Delta Table as a Sink

  • Append mode: this is the default output mode of Spark Structured Streaming; each batch appends new records to the target table.

    // df is a streaming DataFrame, for example one created with spark.readStream
    df.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/delta_table/_checkpoints")
      .start("/tmp/delta_table")
  • Complete mode: in this mode, every batch overwrites the entire target table with the full result. For example, given a table with the schema (id LONG, date DATE, name STRING, sales DOUBLE), you can compute the total sales per person and write the aggregate to the target table, refreshing it once per batch.

    • Spark Structured Streaming read and write

      import org.apache.spark.sql.functions.sum

      spark.readStream
        .format("delta")
        .load("/tmp/delta_table")
        .select("name", "sales")
        .groupBy("name")
        .agg(sum("sales"))
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
        .start("/tmp/delta_table_summary")
    • Streaming SQL read and write

      -- Target and source are both Delta tables.
      CREATE TABLE targetTableName
        (key BIGINT, value BIGINT)
        USING delta;

      CREATE TABLE sourceTableName
        (key BIGINT, value BIGINT)
        USING delta;

      INSERT INTO sourceTableName VALUES (1, 238), (238, 2388);

      -- Expose the source table as a streaming scan.
      CREATE SCAN stream_source_1 ON sourceTableName USING STREAM;

      -- Continuously upsert the deduplicated stream into the target table.
      CREATE STREAM test_delta_stream
        OPTIONS(
          checkpointLocation='/tmp/test_delta_stream/targetTableName'
        )
        MERGE INTO targetTableName AS target
        USING (
          -- Keep one row per key within each micro-batch: MERGE requires that
          -- each target row match at most one source row.
          SELECT key, value
          FROM (
            SELECT key, value, row_number() OVER (PARTITION BY key ORDER BY value) AS rn
            FROM stream_source_1
          ) WHERE rn = 1
        ) AS source
        ON target.key = source.key
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *;
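      To exercise the stream end to end, you can append more rows to the source table and then inspect the target with an ordinary batch query. A possible check, with values chosen purely for illustration:

      -- A key that already exists in the target: the stream's MERGE takes the MATCHED branch and updates the row.
      INSERT INTO sourceTableName VALUES (1, 999);

      -- After the micro-batch completes, inspect the upserted result.
      SELECT * FROM targetTableName;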