本文介紹如何使用Databricks Delta進行Spark作業的優化。
前提條件
已創建集群,詳情請參見創建集群。
集群應滿足以下配置:
區域 | 詳情 |
---|---|
地域(Region) | 華北2(北京) |
集群規模 | 1個Master節點,5個Worker節點 |
ECS實例配置 | 配置如下:
說明 ECS實例會因庫存等原因和實際售賣頁有出入。此處參數僅供參考,具體請您根據實際情況選擇相應的實例規格進行測試。 |
OSS寬帶 | 10Gbps |
背景信息
Databricks數據洞察內置了Databricks商業版引擎,您可以利用Databricks數據洞察創建集群,實現在秒級響應時間內處理PB級別的數據。本文示例制造100億條數據,利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,對Spark作業進行改造,達到優化性能的目的。Databricks Delta詳情請參見Processing Petabytes of Data in Seconds with Databricks Delta。
配置Spark
使用阿里云賬號登錄Databricks數據洞察控制臺。
在Databricks數據洞察控制臺頁面,選擇所在的地域(Region)。
創建的集群將會在對應的地域內,一旦創建后不能修改。
在左側導航欄中,單擊集群。
單擊待配置集群所在行的詳情。
在集群詳情頁面,單擊上方的Spark配置。
配置以下參數。
修改以下配置。
參數
描述
spark.driver.cores
4
spark.driver.memory
8G
spark.executor.memory
23G
新增以下配置。
在配置區域,單擊spark-defaults頁簽。
單擊右側的自定義配置。
參數
描述
spark.executor.cores
3
spark.executor.instances
22
spark.yarn.executor.memoryOverhead
default
示例
準備數據。
準備測試數據和query腳本。
在集群中生成數據預計需要5小時,生成測試數據詳情請參見Processing Petabytes of Data in Seconds with Databricks Delta。
準備五張表:
conn_random:delta格式表
conn_random_parquet:parquet格式表
conn_optimize:經過OPTIMIZE的表,主要是Compaction
conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)
conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)
使用OPTIMIZE命令進行優化。
詳細代碼如下:
import spark.implicits._ val seed = 0 val numRecords = 10*1000*1000*1000L val numFiles = 1000*1000 val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/" val dbName = s"mdc_random_$numFiles" val connRandom = "conn_random" val connRandomParquet = "conn_random_parquet" // val connSorted = "conn_sorted" val connOptimize = "conn_optimize" val connZorderOnlyIp = "conn_zorder_only_ip" val connZorder = "conn_zorder" spark.conf.set("spark.sql.shuffle.partitions", numFiles) spark.conf.get("spark.sql.shuffle.partitions") sql(s"drop database if exists $dbName cascade") sql(s"create database if not exists $dbName") sql(s"use $dbName") sql(s"show tables").show(false) import scala.util.Random case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int) // 生成數據 def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") def randomPort(r: Random) = r.nextInt(65536) def randomConnRecord(r: Random) = ConnRecord( src_ip = randomIPv4(r), src_port = randomPort(r), dst_ip = randomIPv4(r), dst_port = randomPort(r)) val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it => val partitionID = it.toStream.head val r = new Random(seed = partitionID) Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r)) } // 生成數據表 df.write .mode("overwrite") .format("delta") .option("path", baseLocation + connRandom) .saveAsTable(connRandom) df.write .mode("overwrite") .format("parquet") .option("path", baseLocation + connRandomParquet) .saveAsTable(connRandomParquet) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connOptimize) .saveAsTable(connOptimize) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorderOnlyIp) .saveAsTable(connZorderOnlyIp) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorder) .saveAsTable(connZorder) spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false") // OPTIMIZE優化命令 sql(s"OPTIMIZE '${baseLocation + connOptimize}'") sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)") sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")
驗證Spark SQL。
select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';
測試結論
本示例各表情況如下。
表名稱 | 時間(s) |
---|---|
conn_random_parquet | 2504 |
conn_random | 2324 |
conn_optimize | 112 |
conn_zorder | 65 |
conn_zorder_only_ip | 46 |
通過以上示例,可以發現:
經過OPTIMIZE的表,文件大小會在1G左右,而且進行了delta元數據的優化,提高了data-skipping的效率,在性能上提升約20倍(2504/112=22X)。
Zorder使得data-skipping的優化效果進一步深化,性能提升約40倍(2504/65=38X)。
當Zorder列是查詢列時,優化效果會更加明顯,實驗顯示性能提升約50倍(2504/46=54X)。
問題反饋
您在使用阿里云Databricks數據洞察過程中有任何疑問,歡迎用釘釘掃描下面的二維碼加入釘釘群進行反饋。