日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過文件管理優化性能

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

為了提升查詢性能,Delta Engine對數據的存儲和布局進行了優化,目前支持兩種布局算法:bin-packing和Z-Ordering。在本文中,我們會介紹如何使用這兩種布局算法并給出使用案例。此外我們還介紹了Delta Engine的Data skipping功能,以及該功能如何自動提升您的查詢性能。最后,我們介紹如何使用Delta Engine的表文件自動調整功能,優化表文件的存儲和查詢效率。

說明

詳細內容可參考Databricks官網文章:通過文件管理優化性能

壓縮 (bin-packing)

在流處理場景下不斷向表中數據插入數據,或者merge,update等操作,會產生大量的小文件,過多的小文件會導致查詢變慢,并且會引起系統擴展性問題。bin-packing的設計就是為了解決這些問題的。

如何使用?

為了改善查詢性能,Delta Engine提供了OPTIMIZE命令來對表中的數據布局進行優化,將小文件進行合并:

%sql
OPTIMIZE [table_name | delta.`/table/path`]

該命令不但支持全表小文件的合并,還支持特定partition的合并,例如我們可以僅對date大于2017-01-01的分區中的小文件進行合并:

%sql
OPTIMIZE [table_name | delta.`/table/path`] WHERE date >= '2017-01-01'

除了手動執行OPTIMIZE外,你還可以使用Auto-optimize來對表中的數據布局進行優化。

說明

Bin-packing是冪等的,這意味著在同一數據集上運行1次Optimize和運行N次的效果是相同的。

Bin-packing的目標是表中的數據量生成大小均衡的數據文件。

使用案例

測試數據生成:創建10,000個小文件,每個文件中包含10,000行連接數據:(src_ip, src_port, dst_ip, dst_port),基于這些文件創建外部表:conn_rand

%spark
import spark.implicits._
import scala.util.Random

val numRecords = 100*1000*1000L
val numFiles = 10000

// 連接數據
case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)

// 生成隨機的ip和port
def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

// 生成一條隨機的連接數據
def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))

// 生成10000個partition,每個partition中包含10000條連接數據
val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
  val partitionID = it.toStream.head
  val r = new Random(seed = partitionID)
  Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
}

// 將數據保存到oss中,并基于數據建立table
(df.write
 .mode("overwrite")
 .format("delta")
 .option("path", "oss://databricks-delta-demo/ip_demo")
 .saveAsTable("conn_rand"))

查詢:157開頭的源IP和216開頭的目的IP的連接數量

SELECT COUNT(*) 
FROM conn_rand 
WHERE src_ip LIKE '157.%' AND dst_ip LIKE '216.%'

時間消耗為:56s,下面我們使用OPTIMIZE命令對表中的小文件進行合并:

OPTIMIZE conn_rand;

在進行合并之后,在OSS中生成兩個877MB的大文件(OPTIMIZE生成的文件最大為1GB)。

在執行OPTIMIZE之后,重新執行上述查詢,查詢時間為7s。可以看出,在優化之后查詢性能得到很大的提升。

重要

在Databricks Runtime 6.0及更高版本中可用。

Data skipping

當你向Delta表中寫入數據時,Delta Engine會自動收集表的前32列的統計信息(最小最大值,為空的行的數量)以提升查詢效率。該特性是自動開啟的,不需要進行任何配置。

收集長字符串列的統計信息開銷會很大,為了避免Delta Engine自動收集長字符串列的統計信息,可以配置表特性 dataSkippingNumIndexedCols避免,使得該特性小于長字符串所在列的索引,或者配置該值后,將長字符串列移動到該特性值之后的位置。該表特性的默認值為32,即默認收集前32列的統計信息。

Data skipping的原理:我們以一張Delta表的x列為例,假設給定的表文件x列的最小值為5,最大值為10,如果查詢條件為 where x < 3,則根據表文件的統計信息,我們可以得出結論:該表文件中一定不包含我們需要的數據,因此我們可以直接跳過該表文件,減少掃描的數據量,進而提升查詢性能。

Data skipping的實現原理和布隆過濾器類似,通過查詢條件判斷表文件中是否可能存在需要查詢的數據,從而減少需要掃描的數據量。如果不可能存在查詢的數據,則可以直接跳過,如果可能存在被查詢的數據,則需要掃描表文件,但被掃描的表文件中不一定包含查詢的數據,我們將這種判斷表文件中包含查詢數據,但實際并不存在的情況稱為假陽性。

為了能盡可能多的跳過和查詢無關的表文件,我們需要盡可能縮小該表中min-max的差距,使得相近的數據盡可能在文件中聚集。舉一個簡單的例子,假設一張表包含10個表文件,對于表中的x列,它的取值為[1, 10],如果每個表文件的x列的分布均為[1, 10],則對于查詢條件:where x < 3,無法跳過任何一個表文件,因此,也無法實現性能提升,而如果每個表文件的min-max均為0,即在表文件1的x列分布為[1, 1],表文件2的x列分布為[2, 2]...,則對于查詢條件:where x < 3,可以跳過80%的表文件。受該思想的啟發,Delta Engine支持使用Z-Ordering來對數據進行聚集,縮小表文件的min-max差距,提升查詢性能。下面我們介紹Z-Ordering的使用。

Z-Ordering (多維聚類)

Z-Ordering將相關聯的信息存儲到同一組文件中,這種聚集會自動被Delta Engine的Data-Skipping算法使用,顯著減少需要掃描的數據數量。

如何使用?

想要使用ZOrder來優化你的數據布局,僅需要在OPTIMIZE時,增加ZORDER BY子句即可。

OPTIMIZE events
ZORDER BY (eventType)

Z-Order支持在多個維度(多列)優化數據布局,在Z-Ordering多列時,使用逗號分隔:

OPTIMIZE events
ZORDER BY (eventType, generateTime)

如果您經常在where語句中使用到某個列,且該列的基數很大(有很多取值,值域很寬),那么使用Z-Ordering可以顯著提升您的查詢性能。

重要

  • Z-Ordering只對已經收集了統計信息的列生效,在上一節我們介紹過,Delta Engine默認僅為前32列自動生成統計信息,意味著Z-Ordering也只能被用于前32列,如果您查詢的列索引大于32,可以將該列索引調到32以內。

  • Z-Ordering不是冪等的,而是一種增量操作。多次運行間不能保證Z-Ordering所需的時間減少。但是,如果沒有數據添加到剛被Z-Order過的數據,則再次執行Z-Ordering不會改變上次執行完Z-Ordering的數據布局,執行時間理論上會減少。

使用案例

在本案例中,我們使用和Bin-packing壓縮相同的數據集,創建10000個小文件,每個文件中包含10w條網絡連接數據,遵循 (src_ip, src_port, dst_ip, dst_port) 的格式,在生成的數據之上創建一張表:

%spark
import spark.implicits._
import scala.util.Random

val seed = 0
val numRecords = 1000*1000*1000L
val numFiles = 10000

case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)

def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
def randomPort(r: Random) = r.nextInt(65536)

def randomConnRecord(r: Random) = ConnRecord(
  src_ip = randomIPv4(r), src_port = randomPort(r),
  dst_ip = randomIPv4(r), dst_port = randomPort(r))

val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
  val partitionID = it.toStream.head
  val r = new Random(seed = partitionID)
  Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
}

(df.write
.mode("overwrite")
.format("delta")
.option("path", "oss://databricks-delta-demo/conn_record")
.saveAsTable("conn_record"))

執行如下SQL,我們可以得到每個表文件的統計信息:

SELECT row_number() OVER (ORDER BY file) AS file_id,
   count(*) as numRecords, min(src_ip), max(src_ip), min(src_port), 
   max(src_port), min(dst_ip), max(dst_ip), min(dst_port), max(dst_port)
FROM (
SELECT input_file_name() AS file, * FROM conn_record)
GROUP BY file
image

從表中統計信息來看,由于每一列的min-max的范圍都比較寬,特別是端口號,幾乎覆蓋了端口列的值域,對于這種情況,Delta Engine無法使用文件的統計信息來跳過查詢無關文件,因此無法實現有效優化。

我們執行如下查詢:

SELECT COUNT(*) FROM conn_record 
WHERE src_ip like '157.%' AND dst_ip like '216.%' 
AND src_port = 10000 AND dst_port = 10000;

當我們執行上述查詢時,Delta Engine掃描了全表,耗時為3.5min。

image

下面我們對 (src_ip, src_port, dst_ip, dst_port) 四列進行Z-Order優化:

OPTIMIZE conn_record 
ZORDER BY (src_ip, src_port, dst_ip, dst_port);

重新執行表文件信息統計的SQL:

SELECT row_number() OVER (ORDER BY file) AS file_id,
   count(*) as numRecords, min(src_ip), max(src_ip), min(src_port), 
   max(src_port), min(dst_ip), max(dst_ip), min(dst_port), max(dst_port)
FROM (
SELECT input_file_name() AS file, * FROM conn_record)
GROUP BY file
image

可以發現執行Optimize之后,文件數量減少為26個,合并了大量的小文件,另一方面,數據的min-max range變窄很多,可以更好的實現Data-Skipping,我們重跑上面的查詢SQL:

SELECT COUNT(*) FROM conn_record 
WHERE src_ip like '157.%' AND dst_ip like '216.%' 
AND src_port = 10000 AND dst_port = 10000;

在優化后,執行該查詢掃描的數據量僅有889.5MB,向比未優化少了30倍,并且查詢時間減少為5s,提升了42倍。

image

本次使用的示例數據量較少(使用delta格式壓縮存儲,26.6GB左右),性能提升效果還不是那么明顯,當數據量較大時,性能提升會更加顯著,甚至可以達到百倍的性能提升。想要了解更多Optimize的使用案例,可以參考使用Delta在秒級內處理PB級數據

表文件大小調優

設置目標文件大小

如果想要調整Delta表的文件大小,可以通過設置表屬性:delta.targetFileSize 來實現。一旦設置了該屬性,所有的數據布局優化操作(如:小文件合并,Z-Ordering和寫優化)都會盡可能產生給定大小的文件。

針對新創建的表:

CREATE TABLE student USING delta
LOCATION "oss://delta-demo/student"
TBLPROPERTIES ("delta.targetFileSize" = "100MB")

針對現存表:

ALTER TABLE student 
SET TBLPROPERTIES ("delta.targetFileSize" = "100MB")

基于負載自動調整文件大小

Delta Engine會根據對表執行的操作對delta表的文件大小進行自動調整。Delta engine會自動檢測出Delta表最近是否有頻繁的MERGE操作重寫文件,如果出現頻繁的文件重寫,則Delta Engine會減小重寫的文件大小以提升未來再次被重寫的性能。

例如,在執行MERGE操作時,如果之前的10個操作,有9次操作都是MERGE,則本次MERGE操作會生成相對較小的文件,從而提升未來的MERGE操作的性能。

自動調整會在幾次表文件重寫操作之后才會被激活,但如果你的使用場景本身就會頻繁的執行MERGE,UPDATE和DELETE操作,你想要立刻激活自動調整文件大小這一特性,則可以通過設置表屬性:delta.tuneFileSizesForRewrites實現,如果將該表屬性設置為true,則在該表上執行任何數據布局優化操作都會使用相對較小的文件大小。如果將該表屬性設置為false,則會關閉Delta Engine的自動檢測。

基于表大小調整表文件大小

Delta Engine會根據表的大小自動調整表文件的大小,對于比較小的表,Delta Engine會使用較小的文件,對于較大的表,Delta Engine會使用較大的文件,從而防止表中的文件數量變得非常多。

具體來看,當整張表的大小小于2.56TB時,會以256MB作為目標表文件大小,當表的大小介于2.56TB-10TB之間時,目標文件大小線性增長,當表的大小大于10TB后,以1GB作為目標表文件大小。

需要注意的是,如果設置了表屬性:delta.targetFileSize或者delta.tuneFileSizesForRewrites,則該Delta Engine的該特性會自動失效。

提高交互式查詢性能

Delta Engine提供了一些其他機制來提高查詢性能。

管理數據實效性

在每個查詢開始時,Delta表會自動更新到表的最新版本。當命令狀態報告: Updating the Delta table's state時,可以在筆記本中觀察到這個過程。但是,在表上運行歷史分析時,您可能不需要最新的數據,特別是在頻繁引入流式處理數據的表中。 在這些情況下,可以在 Delta 表的過時快照上運行查詢。 這會降低從查詢獲取結果的延遲時間。

可以通過將 Spark 會話配置 spark.databricks.delta.stalenessLimit 設置為時間字符串值(例如 1h、15m、1d 分別為 1 小時、15 分鐘和 1 天)來配置表數據的過時程度。此配置是特定session,因此不會影響其他用戶從其他筆記本、作業或BI工具訪問此表。另外,此設置不會更新表。它只會阻止查詢等待表更新。該更新仍在后臺進行,并將在整個集群之間公平地共享資源。如果超過過期限制,則查詢將在表狀態更新上阻止。

用于低延遲查詢的增強檢查點

Delta Lake 寫入檢查點作為 Delta 表的聚合狀態,每 10 次提交寫入一次。這些檢查點用作計算表的最新狀態的起點。如果沒有檢查點 ,Delta Lake將不得不讀取大量的JSON文件(“Delta”文件),表示提交到事務日志以計算表的狀態。此外,此外,列級統計信息 Delta Lake 用于執行存儲在檢查點中的數據跳過操作。

警告

Delta Lake 檢查點與結構化流checkpoints不同。

在Databricks Runtime 7.2及更低版本中,列級別的統計信息作為JSON列存儲在Delta Lake 檢查點中。在Databricks Runtime 7.3LTS及更高版本中,列級別的統計信息存儲作為結構。結構格式使Delta Lake讀取速度更快,因為:

  • Delta Lake不會執行昂貴的JSON解析來獲取列級統計信息。

  • Parquet 列修剪功能可以顯著減少讀取列的統計信息所需的 I/O

結構格式可啟用一系列優化,這些優化可將Delta Lake讀取操作的開銷從幾秒降低到數十毫秒,從而顯著減少短查詢的延遲。

管理檢查點中的列級統計信息您可以使用表屬性delta.checkpoint.writestatsassjson以及delta.checkpoint.writeStatsassTrust.管理如何在檢查點中寫入統計信息。如果兩個表屬性都為false,則Delta-Lake無法執行跳過數據。

在Databricks Runtime 7.3 LTS及更高版本中:

  • 批量寫入JSON和結構格式編寫寫入統計信息。delta.checkpoint.writeStatsAsJson默認值為true。

  • 流式處理只以JSON格式寫入統計信息(以最大程度地減少檢查點對寫入延遲的影響)。若要同時編寫結構格式,請參見為結構化流式查詢啟用增強的檢查點。

  • 在這兩種情況下,都默認未定義 delta.checkpoint.writeStatsAsStruct。

  • 讀取器在可用時使用結構列,否則退回到使用JSON列。

在Databricks運行時7.2及以下版本中,讀者只使用JSON列。因此,如果delta.checkpoint.writestatsassjson為false,此類讀取器無法執行跳過數據。

警告

增強的檢查點不會破壞與開源Delta-Lake 讀取器的兼容性。但是,設置delta.checkpoint.writestatsassjson為false可能會影響到Delta Lake的專有讀取器。請與您的供應商聯系以了解有關性能影響的更多信息。

檢查點中統計信息的權衡

由于在檢查點中編寫統計信息會產生成本(即使對于大型表,通常也要不到一分鐘),因此需要權衡在編寫檢查點所花費的時間與Databricks Runtime 7.2及更低版本的兼容性。如果您能夠將所有工作負載升級到Databricks Runtime 7.3 LTS或更高版本,則可以通過禁用舊版JSON統計信息來降低編寫checkpoints的成本。下表總結了這一折衷方案。

如果跳過數據不適用于你的應用程序,則可以將兩個屬性都設置為false,并且不收集或寫入任何統計信息。我們不建議這種配置。

writeStatsAsStruct

false

true

writeStatsAsJson

false

  • 沒有數據skipping

  • 在Databricks Runtime 7.3及更高版本上的查詢更快

  • checkpoints稍慢

  • 在Databricks Runtime 7.2及更低版本的閱讀器中,沒有數據skipping

true

  • Databricks運行時7.2及以下

  • 較慢的查詢

  • 在Databricks Runtime 7.3及更高版本上的查詢更快

  • 保持與Databricks Runtime 7.2及更低版本上的閱讀器的兼容性

  • checkpoints的延遲最高(秒級)

為結構化流查詢啟用增強的checkpoints

如果您的結構化流工作負載沒有低延遲要求(要求延遲在一分鐘以內),則可以通過運行以下SQL命令來啟用增強檢查點:

SQL

%sql
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

如果您不使用Databricks Runtime 7.2或更低版本來查詢數據,則還可以通過設置以下表屬性來改善檢查點寫入延遲:

SQL

%sql
ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

禁止從沒有統計結構的檢查點的集群中寫入

Databricks Runtime 7.2及更低版本中的編寫器會寫入無統計結構檢查點,從而妨礙了對Databricks Runtime 7.3 LTS閱讀器的優化。要阻止運行Databricks Runtime 7.2及更低版本的集群寫入Delta表,可以使用以下upgradeTableProtocol方法升級Delta表:

Python

%pyspark
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Scala

%spark
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
警告

應用該upgradeTableProtocol方法可防止運行Databricks Runtime 7.2及更低版本的集群寫入表,并且此更改是不可逆的。我們建議僅在您采用新格式后才升級表。您可以通過使用Databricks Runtime 7.3為表創建淺表克隆來嘗試這些優化。

升級表寫入程序版本后,寫入程序必須遵守'delta.checkpoint.writeStatsassTrust'和'delta.checkpoint.writestatsassjson'.的設置

下表總結了如何在不同版本的Databricks Runtime、表協議版本和編寫器類型中利用增強的檢查點。

Without Protocol Upgrade

With Protocol Upgrade

Databricks Runtime 7.2及以下編寫器

Databricks Runtime 7.3及更高版本的批處理編寫器

Databricks Runtime 7.3及更高版本的流編寫器

Databricks Runtime 7.2及以下編寫器

Databricks Runtime 7.3及更高版本的批處理編寫器

Databricks Runtime 7.3及更高版本的流編寫器

Databricks Runtime 7.2及以下讀取器性能

沒有得到改進

沒有得到改進

沒有得到改進

不能使用編寫器

沒有得到改進

沒有得到改進

Databricks Runtime 7.3及更高版本的讀取器性能

沒有得到改進

默認情況下改進

通過表格屬性選擇Opt-in(1)

不能使用編寫器

默認情況下改進

通過表格屬性選擇Opt-in(1)

(1)設置表屬性'delta.checkpoint.writeStatsAsStruct' = 'true'

禁用使用舊檢查點格式的從集群中寫入Databricks Runtime 7.2及更低版本的編寫器可以編寫舊格式的檢查點,這將妨礙對Databricks Runtime 7.3編寫器的優化。要阻止運行Databricks Runtime 7.2及更低版本的集群寫入Delta表,可以使用upgradeTableProtocol方法升級Delta表:

Python

%pyspark
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Scala

%spark
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
重要

應用該upgradeTableProtocol方法可防止運行Databricks Runtime 7.2及更低版本的集群寫入表。這種更改是不可逆的。因此,我們建議僅在提交新格式后才升級表。您可以通過使用Databricks Runtime 7.3創建表的淺表克隆來嘗試這些優化:

常見問題(FAQ)

為什么OPTIMIZE不是自動的?

OPTIMIZE操作啟動了多個Spark作業,以便通過壓縮來優化文件大小(并可以選擇執行 Z-Ordering)。由于OPTIMIZE執行的內容大部分是壓縮小文件,因此您必須先累積許多小文件,然后此操作才能生效。因此,該OPTIMIZE操作不會自動運行。

此外,運行 OPTIMIZE(特別是 ZORDER)是時間和資源成本高昂的操作。如果Databricks自動運行OPTIMIZE或等待分批寫出數據,則將無法運行(以Delta表是源)低延遲的Delta-Lake流。許多客戶的Delta表從未進行過優化,因為他們只從這些表流式傳輸數據,從而避免了優化所帶來的查詢好處。

最后,Delta Lake會自動收集有關寫入表的文件(無論是否通過OPTIMIZE操作)的統計信息。這意味著從Delta表的讀取將利用此信息,無論該表或分區是否運行了OPTIMIZE操作

我應該多久跑步一次OPTIMIZE?

當您選擇運行OPTIMIZE的頻率時,性能和成本之間就需要權衡取舍。 如果希望獲得更好的最終用戶查詢性能,則應更頻繁地運行 OPTIMIZE(根據資源使用量,可能需要較高的成本)。如果要優化成本,應該減少運行頻率。

運行 OPTIMIZE(二進制打包和 Z 排序)的最佳實例類型是什么?

這兩個操作都是執行大量 Parquet 解碼和編碼的 CPU 密集型操作。

對于這些工作負載,建議采用 F 或 Fsv2 系列。