日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

表流讀寫

更新時(shí)間:
重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對(duì)您的業(yè)務(wù)造成影響,請(qǐng)務(wù)必仔細(xì)閱讀。

說(shuō)明

詳細(xì)內(nèi)容請(qǐng)參考Databricks官網(wǎng)文章:表流讀寫

有關(guān)演示這些功能的Databricks筆記本,請(qǐng)參閱入門筆記本二

Delta Lake通過(guò)readStream和writeStream與Spark結(jié)構(gòu)化流式處理深度集成。Delta Lake克服了許多流式處理系統(tǒng)和文件相關(guān)的常見(jiàn)限制,例如:

  • 合并低延遲引入產(chǎn)生的小文件

  • 保持多個(gè)流(或并發(fā)批處理作業(yè))執(zhí)行“僅一次”處理

  • 使用文件作為流源時(shí),可以有效地發(fā)現(xiàn)哪些文件是新文件

Delta表作為流源

當(dāng)您將Delta表加載為流源并在流式查詢中使用它時(shí),該查詢將處理表中存在的所有數(shù)據(jù)以及流啟動(dòng)后到達(dá)的所有新數(shù)據(jù)。

您可以將路徑和表都作為流加載。

Scala

%spark
spark.readStream.format("delta").load("/mnt/delta/events")

Scala

%spark
spark.readStream.format("delta").table("events")

你也可以執(zhí)行以下操作:

  • 通過(guò)設(shè)置maxFilesPerTrigger選項(xiàng),控制Delta Lake提供給流的任何微批處理的最大大小。這指定了每個(gè)觸發(fā)器中要考慮的新文件的最大數(shù)量。默認(rèn)值為1000。

  • 通過(guò)設(shè)置maxBytesPerTrigger選項(xiàng)來(lái)限制每個(gè)微批處理的數(shù)據(jù)量的速率。這將設(shè)置一個(gè)“軟最大值”,這意味著批處理大約此數(shù)量的數(shù)據(jù),并可能處理超過(guò)該限制的數(shù)據(jù)量。如果你使用Trigger。如果Trigger.Once用于流式傳輸,則忽略此選項(xiàng)。如果將此選項(xiàng)與maxFilesPerTrigger結(jié)合使用,則微批處理將處理數(shù)據(jù),直到達(dá)到maxFilesPerTrigger或maxBytesPerTrigger限制。

忽略更新和刪除

結(jié)構(gòu)化流式處理不處理非追加的輸入,如果在用作源的表上進(jìn)行了任何修改,則引發(fā)異常。有兩種主要策略可以處理無(wú)法自動(dòng)向下游傳播的更改:

  • 您可以刪除輸出和檢查點(diǎn),并從頭開(kāi)始重新啟動(dòng)流。

  • 您可以設(shè)置以下兩個(gè)選項(xiàng)之一:

    • ignoreDeletes:忽略在分區(qū)邊界刪除數(shù)據(jù)的事務(wù)。

    • ignoreChanges:如果由于更新、合并、刪除(在分區(qū)內(nèi))或覆蓋等數(shù)據(jù)更改操作而必須重寫源表中的文件,則重新處理更新。未更改的行可能仍會(huì)發(fā)出,因此您的下游使用者應(yīng)該能夠處理重復(fù)項(xiàng)。刪除不會(huì)傳播到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則源表的刪除或更新不會(huì)中斷流。

示例

例如,假設(shè)您有一個(gè)表user_events,其中包含date、user_email和action列,這些列是按日期分區(qū)的。由于GDPR的原因,您需要從user_events表中刪除數(shù)據(jù)。

當(dāng)您在分區(qū)邊界處執(zhí)行刪除(即,WHERE在分區(qū)列上)操作時(shí),文件已經(jīng)按值分段,因此刪除操作只是從元數(shù)據(jù)中刪除這些文件。因此,如果只想從某些分區(qū)刪除數(shù)據(jù),可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

但是,如果您必須基于user_email刪除數(shù)據(jù),則需要使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

如果使用update語(yǔ)句更新user_email,則會(huì)重寫包含user_email相關(guān)的文件。使用ignoreChanges時(shí),新記錄將與位于同一文件中的所有其他未更改記錄一起傳播到下游。邏輯應(yīng)該能夠處理這些傳入的重復(fù)記錄。

指定初始位置

說(shuō)明

該功能在Databricks Runtime 7.3 LTS及更高版本上可用。

您可以使用以下選項(xiàng)來(lái)指定Delta Lake流式處理源的起點(diǎn),而無(wú)需處理整個(gè)表。

  • startingVersion:Delta Lake版本開(kāi)始。從該版本(包括該版本)開(kāi)始的所有表更改都將由流式處理源讀取。您可以從命令“ DESCRIBE HISTORY events”輸出的version列中獲取提交版本。

    • 要僅返回最新更改,請(qǐng)?jiān)贒atabricks Runtime 7.4及更高版本中指定latest。

  • startingTimestamp:開(kāi)始的時(shí)間戳。流式處理源將讀取在時(shí)間戳(包括時(shí)間戳)或之后提交的所有表更改。可以是以下任何一項(xiàng):

    • '2018-10-18T22:15:12.013Z',即可以轉(zhuǎn)換為時(shí)間戳的字符串

    • cast('2018-10-18 13:36:32 CEST' as timestamp)

    • '2018-10-18',即日期字符串

    • 本身就是時(shí)間戳或可強(qiáng)制轉(zhuǎn)換為時(shí)間的任何其他表達(dá)式,如:current_timestamp() - interval 12 hour sdate_sub(current_date(), 1)

您不能同時(shí)設(shè)置兩個(gè)選項(xiàng)。您只需要使用其中之一個(gè)選項(xiàng)即可。它們僅在啟動(dòng)新的流查詢時(shí)才生效。如果流式處理查詢已啟動(dòng)并且進(jìn)度已記錄在其檢查點(diǎn)中,則將忽略這些選項(xiàng)。

警告

盡管可以從指定的版本或時(shí)間戳啟動(dòng)流式處理源,但是流式處理源的架構(gòu)始終是Delta表的最新架構(gòu)。您必須確保在指定的版本或時(shí)間戳記之后,不對(duì)Delta表進(jìn)行不兼容的架構(gòu)更改。否則,當(dāng)使用不正確的架構(gòu)讀取數(shù)據(jù)時(shí),流式傳輸源可能會(huì)返回不正確的結(jié)果。

示例

例如,假設(shè)您有一個(gè)表格User_events。如果要閱讀版本5之后的更改,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

如果您想閱讀自2018年10月18日以來(lái)的更改,可以使用:

Scala

%spark
events.readStream
  .format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

用作接收器的Delta表

您也可以使用結(jié)構(gòu)化流將數(shù)據(jù)寫入Delta表。事務(wù)日志使Delta Lake能夠保證"僅一次"處理,即使針對(duì)該表同時(shí)運(yùn)行其他流或批查詢。

追加模式

默認(rèn)情況下,流以追加模式運(yùn)行,這會(huì)將新記錄添加到表中。

您可以使用路徑方法:

Python

%pyspark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

Scala

%spark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

或表格方法:

Python

%pyspark
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")

Scala

%spark
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

完整模式

您還可以使用結(jié)構(gòu)化流式處理技術(shù)將整個(gè)批替換為每個(gè)批。一個(gè)示例用例是使用聚合來(lái)計(jì)算摘要:

Scala

%spark
spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")
從結(jié)構(gòu)化的輸入流中讀取數(shù)據(jù),經(jīng)過(guò)處理后結(jié)構(gòu)化流輸出到delta文件

前面的示例不斷更新包含客戶的事件總數(shù)的表。

對(duì)于延遲要求更寬松的應(yīng)用程序,您可以使用一次性觸發(fā)器來(lái)節(jié)省計(jì)算資源。使用這些更新按給定的時(shí)間表更新匯總聚合表,僅處理自上次更新以來(lái)已到達(dá)的新數(shù)據(jù)。