自動優化是Delta Engine一組可選特性,當開啟該組特性后,Delta Engine會自動合并對Delta表的多次寫入產生的小文件,以犧牲部分寫性能為代價,大幅提升查詢性能。自動優化在這些場景下尤其有用:1)能接受分鐘級時延的流式數據入湖;2)常使用Merge Into,Insert Into和Create table as select的場景。
詳細內容可參考Databricks官網文章:自動優化
自動優化的工作原理
自動優化包含兩個重要特性:
優化Delta表的寫入
在開源版Spark中,每個executor向partition中寫入文件時,都會創建一個文件進行寫入,最終會導致一個partition中包含大量的小文件,導致delta表的查詢性能惡化。在Delta Engine中,會有一個專門的executor負責partition的寫入,對partition的寫入進行合并,避免小文件產生;
小文件自動合并
在每次寫入之后,delta engine會檢查文件是否可以進一步壓縮,如果可以會自動執行一些OPTIMIZE 作業,對包含大量小文件的partition進行壓縮。這兩個特性:一個是對寫入進行合并,防止小文件的產生并提高寫數據的吞吐量,一個是對已經產生的小文件進行自動合并,優化查詢性能,減少需要維護的元數據量。
如何使用?
Auto Optimize需要在創建表時,顯式指定:
優化表的寫入過程:該特性由表屬性
delta.autoOptimize.optimizeWrite
控制自動執行小文件合并:該特性由表屬性
delta.autoOptimize.autoCompact
控制
創建表時指定:
%sql
CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES
(delta.autoOptimize.optimizeWrite = true,
delta.autoOptimize.autoCompact = true)
針對現存的表:
%sql
ALTER TABLE [table_name | delta.`<table-path>`]
SET TBLPROPERTIES
(delta.autoOptimize.optimizeWrite = true,
delta.autoOptimize.autoCompact = true)
使用案例
在該案例中,我們創建一個新表,并使用一個for循環,不斷的向該表中插入數據:
%sql
CREATE TABLE student (id INT, name STRING)
USING delta
LOCATION "oss://databricks-delta-demo/auto_optimize"
TBLPROPERTIES
(delta.autoOptimize.optimizeWrite = true,
delta.autoOptimize.autoCompact = true)
%pyspark
import random
import string
for i in range(101):
name = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(20))
sql_query = f'INSERT INTO student VALUES ({i}, "{name}")'
spark.sql(sql_query)
print(f"Inserted {i + 1} entries.")
在OSS browser里刷新,可以看到在向表中插入數據時產生了大量的小文件,在我們的代碼里執行單行插入,對每一行都會生成一個單獨的小文件,同時產生了兩個delta log文件。
當插入了50條數據時,delta engine會自動的將前50個小文件進行合并,成為一個新的文件。自動小文件合并的閾值為50,即當發現表中有50個小文件才會進行合并。該閾值可以使用spark conf:spark.databricks.delta.autoCompact.minNumFiles
進行控制。
何時啟用寫優化?
寫優化的目的是為了提升寫數據的吞吐量,它是通過減少被寫入的文件數量來實現的,而代價就是降低了并行度。
并且由于寫優化需要根據表的分區結構來對寫入的數據進行額外的shuffle,額外的shuffle勢必會引入額外的開銷,但寫入性能的提升一般可以抵消掉shuffle帶來的開銷,即使不能抵消,為了文件合并帶來的查詢性能提升,使用該特性也是值得的。
什么場景開啟寫優化?
能接受分鐘級別時延的流處理場景;
頻繁使用MERGE, UPDATE, DELETE, INSERT INTO, CREATE TABLE AS SELECT等SQL語句的場景。
什么場景關閉寫優化?
當寫入TB級及以上的數據。
何時啟用自動壓縮?
當對一張表成功寫入之后,Delta engine會檢查是否達到自動壓縮閾值,如果達到,會同步執行一次自動壓縮。自動壓縮的一些特性:
自動壓縮僅僅是做一些小文件合并,不會進行Z-Ordering優化;
手動執行Optimize命令時默認合并的文件大小為1GB,而自動壓縮默認產生的文件大小為128MB(最大),可以使用spark conf:
spark.databricks.delta.autoCompact.maxFileSize
進行控制;自動壓縮會使用貪心算法,選擇收益最大的一些partition來進行合并,具體的partition數量取決于集群配置,如果集群有更多的CPU,則更多的partition會被優化。
什么場景開啟自動壓縮?
能接受分鐘級別時延的流處理場景;
沒有周期性的對表進行優化。
什么場景關閉自動壓縮?
當并發的對表執行DELETE,MERGE,UPDATE和自動壓縮時,會導致這些作業的事務沖突,當自動壓縮遇到事務沖突時,Delta engine不會進行重試。