日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Databricks Delta優化Spark作業性能

本文介紹如何使用Databricks Delta進行Spark作業的優化。

前提條件

已創建集群,詳情請參見創建集群

集群應滿足以下配置:

區域

詳情

地域(Region)

華北2(北京)

集群規模

1個Master節點,5個Worker節點

ECS實例配置

配置如下:

  • CPU:32核

  • 內存:128GiB

  • ECS規格:ecs.g6.8xlarge

  • 數據盤配置:ESSD云盤300GB X 4塊

  • 系統盤配置:ESSD云盤120GB X 1塊

說明

ECS實例會因庫存等原因和實際售賣頁有出入。此處參數僅供參考,具體請您根據實際情況選擇相應的實例規格進行測試。

OSS寬帶

10Gbps

背景信息

Databricks數據洞察內置了Databricks商業版引擎,您可以利用Databricks數據洞察創建集群,實現在秒級響應時間內處理PB級別的數據。本文示例制造100億條數據,利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,對Spark作業進行改造,達到優化性能的目的。Databricks Delta詳情請參見Processing Petabytes of Data in Seconds with Databricks Delta

配置Spark

  1. 使用阿里云賬號登錄Databricks數據洞察控制臺

  2. 在Databricks數據洞察控制臺頁面,選擇所在的地域(Region)。

    創建的集群將會在對應的地域內,一旦創建后不能修改。

  3. 在左側導航欄中,單擊集群

  4. 單擊待配置集群所在行的詳情。

  5. 集群詳情頁面,單擊上方的Spark配置。

  6. 配置以下參數。

    1. 修改以下配置。

      參數

      描述

      spark.driver.cores

      4

      spark.driver.memory

      8G

      spark.executor.memory

      23G

    2. 新增以下配置。

      1. 在配置區域,單擊spark-defaults頁簽。

      2. 單擊右側的自定義配置。

      參數

      描述

      spark.executor.cores

      3

      spark.executor.instances

      22

      spark.yarn.executor.memoryOverhead

      default

示例

  1. 準備數據。

    • 準備測試數據和query腳本。

      在集群中生成數據預計需要5小時,生成測試數據詳情請參見Processing Petabytes of Data in Seconds with Databricks Delta

    • 準備五張表:

      • conn_random:delta格式表

      • conn_random_parquet:parquet格式表

      • conn_optimize:經過OPTIMIZE的表,主要是Compaction

      • conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)

      • conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)

  2. 使用OPTIMIZE命令進行優化。

    詳細代碼如下:

    import spark.implicits._
    
    val seed = 0
    val numRecords = 10*1000*1000*1000L
    val numFiles = 1000*1000
    
    val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/"
    
    val dbName = s"mdc_random_$numFiles"
    val connRandom = "conn_random"
    val connRandomParquet = "conn_random_parquet"
    // val connSorted = "conn_sorted"
    val connOptimize = "conn_optimize"
    val connZorderOnlyIp = "conn_zorder_only_ip"
    val connZorder = "conn_zorder"
    
    spark.conf.set("spark.sql.shuffle.partitions", numFiles)
    spark.conf.get("spark.sql.shuffle.partitions")
    
    sql(s"drop database if exists $dbName cascade")
    sql(s"create database if not exists $dbName")
    sql(s"use $dbName")
    sql(s"show tables").show(false)
    
    import scala.util.Random
    
    case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
    
    // 生成數據
    def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
    def randomPort(r: Random) = r.nextInt(65536)
    
    def randomConnRecord(r: Random) = ConnRecord(
      src_ip = randomIPv4(r), src_port = randomPort(r),
      dst_ip = randomIPv4(r), dst_port = randomPort(r))
    
    val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
      val partitionID = it.toStream.head
      val r = new Random(seed = partitionID)
      Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
    }
    
    // 生成數據表
    df.write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connRandom)
    .saveAsTable(connRandom)
    
    df.write
    .mode("overwrite")
    .format("parquet")
    .option("path", baseLocation + connRandomParquet)
    .saveAsTable(connRandomParquet)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connOptimize)
    .saveAsTable(connOptimize)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connZorderOnlyIp)
    .saveAsTable(connZorderOnlyIp)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connZorder)
    .saveAsTable(connZorder)
    
    spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false")
    
    // OPTIMIZE優化命令
    sql(s"OPTIMIZE '${baseLocation + connOptimize}'")
    sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)")
    sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")

  3. 驗證Spark SQL。

    select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';

測試結論

本示例各表情況如下。

表名稱

時間(s)

conn_random_parquet

2504

conn_random

2324

conn_optimize

112

conn_zorder

65

conn_zorder_only_ip

46

說明

通過以上示例,可以發現:

  • 經過OPTIMIZE的表,文件大小會在1G左右,而且進行了delta元數據的優化,提高了data-skipping的效率,在性能上提升約20倍(2504/112=22X)。

  • Zorder使得data-skipping的優化效果進一步深化,性能提升約40倍(2504/65=38X)。

  • 當Zorder列是查詢列時,優化效果會更加明顯,實驗顯示性能提升約50倍(2504/46=54X)。

問題反饋

您在使用阿里云Databricks數據洞察過程中有任何疑問,歡迎用釘釘掃描下面的二維碼加入釘釘群進行反饋。

產品釘釘群