Spark Load通過外部的Spark資源實現對導入數據的預處理,提高StarRocks大數據量的導入性能并且節省StarRocks集群的計算資源。Spark Load主要用于初次遷移、大數據量導入StarRocks的場景(數據量可到TB級別)。本文為您介紹Spark Load導入的基本概念、基本原理、使用示例、最佳實踐以及常見問題。

背景信息

Spark Load是一種異步導入方式,您需要通過MySQL協議創建Spark類型導入任務,并可以通過SHOW LOAD命令查看導入結果。
說明 本文圖片和部分內容來源于開源StarRocks的使用Apache Spark批量導入

基本概念

  • Spark ETL:在導入流程中主要負責數據的ETL工作,包括全局字典構建(BITMAP類型)、分區、排序和聚合等。
  • Broker:是一個獨立的無狀態進程。封裝了文件系統接口,提供StarRocks讀取遠端存儲系統中文件的能力。
  • 全局字典:保存了數據從原始值到編碼值映射的數據結構,原始值可以是任意數據類型,而編碼后的值為整型。全局字典主要應用于精確去重預計算的場景。

基本原理

用戶通過MySQL客戶端提交Spark類型導入任務,FE記錄元數據并返回用戶提交成功。

Spark Load的主要流程如下圖所示。Spark Load
Spark Load任務的執行主要分為以下幾個階段:
  1. 向FE提交Spark Load任務。
  2. FE調度提交ETL任務到Spark集群執行。
  3. Spark集群執行ETL完成對導入數據的預處理,包括全局字典構建(BITMAP類型)、分區、排序和聚合等。
  4. ETL任務完成后,FE獲取預處理過的每個分片的數據路徑,并調度相關的BE執行Push任務。
  5. BE通過Broker讀取數據,轉化為StarRocks存儲格式。
  6. FE調度生效版本,完成導入任務。

全局字典

適用場景

目前StarRocks中BITMAP列是使用類庫Roaringbitmap實現的,而Roaringbitmap的輸入數據類型只能是整型,因此如果要在導入流程中實現對于BITMAP列的預計算,則需要將輸入數據的類型轉換成整型。在StarRocks現有的導入流程中,全局字典的數據結構是基于Hive表實現的,保存了原始值到編碼值的映射。

構建流程

  1. 讀取上游數據源的數據,生成一張Hive臨時表,記為hive-table。
  2. 從hive-table中抽取待去重字段的去重值,生成一張新的Hive表,記為distinct-value-table。
  3. 新建一張全局字典表,記為dict-table。一列為原始值,一列為編碼后的值。
  4. 將distinct-value-table與dict-table進行LEFT JOIN,計算出新增的去重值集合,然后對這個集合使用窗口函數進行編碼,此時去重列原始值就多了一列編碼后的值,最后將這兩列的數據寫回dict-table。
  5. 將dict-table與hive-table進行JOIN,完成hive-table中原始值替換成整型編碼值的工作。
  6. hive-table會被下一步數據預處理的流程所讀取,經過計算后導入到StarRocks中。

數據預處理

數據預處理的基本流程如下:
  1. 從數據源讀取數據,上游數據源可以是HDFS文件,也可以是Hive表。
  2. 對讀取到的數據完成字段映射、表達式計算,并根據分區信息生成分桶字段bucket-id。
  3. 根據StarRocks表的Rollup元數據生成RollupTree。
  4. 遍歷RollupTree,進行分層的聚合操作,下一個層級的Rollup可以由上一個層的Rollup計算得來。
  5. 每次完成聚合計算后,會根據bucket-id對數據進行分桶然后寫入HDFS中。
  6. 后續Broker會拉取HDFS中的文件然后導入StarRocks BE節點中。

基本操作

配置ETL集群

Spark作為一種外部計算資源在StarRocks中用來完成ETL工作,未來可能還有其他的外部資源會加入到StarRocks中使用。例如,Spark或GPU用于查詢,HDFS或S3用于外部存儲,MapReduce用于ETL等,因此引入Resource Management來管理StarRocks使用的這些外部資源。

提交Spark導入任務之前,需要配置執行ETL任務的Spark集群。操作語法如下所示。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
 type = spark,
 spark_conf_key = spark_conf_value,
 working_dir = path,
 broker = broker_name,
 broker.property_key = property_value
);

-- drop spark resource
DROP RESOURCE resource_name;

-- show resources
SHOW RESOURCES
SHOW PROC "/resources";

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
  • 創建資源

    resource-name為StarRocks中配置的Spark資源的名字。

    PROPERTIES是Spark資源的相關參數,參數描述如下表所示,更多參數描述請參見Spark Configuration
    參數描述
    type資源類型。必填參數,目前僅支持Spark。
    Spark相關參數spark.master 必填參數,目前支持yarn。
    spark.submit.deployModeSpark程序的部署模式。必填參數,支持cluster和client兩種。
    spark.hadoop.fs.defaultFSMaster為YARN時必填。
    spark.hadoop.yarn.resourcemanager.address單點Resource Manager地址。
    spark.hadoop.yarn.resourcemanager.ha.enabledResource Manager啟用HA。默認值為true。
    spark.hadoop.yarn.resourcemanager.ha.rm-idsResource Manager邏輯ID列表。
    spark.hadoop.yarn.resourcemanager.hostname.rm-id對于每個rm-id,指定Resource Manager對應的主機名。
    說明 HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-idspark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。
    spark.hadoop.yarn.resourcemanager.address.rm-id對于每個rm-id,指定host:port以供客戶端提交作業。
    說明 HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-idspark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。
    working_dir ETL使用的目錄。
    說明 Spark作為ETL資源使用時必填。例如,hdfs://host:port/tmp/starrocks
    brokerBroker名字。
    說明 Spark作為ETL資源使用時必填。需要使用ALTER SYSTEM ADD BROKER命令提前完成配置。
    broker.property_keyBroker讀取ETL生成中間文件時需要指定的認證信息等。
    創建資源示例如下所示。
    -- yarn cluster模式
    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.jars" = "xxx.jar,yyy.jar",
        "spark.files" = "/tmp/aaa,/tmp/bbb",
        "spark.executor.memory" = "1g",
        "spark.yarn.queue" = "queue0",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker0",
        "broker.username" = "user0",
        "broker.password" = "password0"
    );
    
    -- yarn HA cluster模式
    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
        "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
        "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
        "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker1"
    );
    
    -- HDFS HA cluster模式
    CREATE EXTERNAL RESOURCE "spark2"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://myha",
        "spark.hadoop.dfs.nameservices" = "myha",
        "spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
        "working_dir" = "hdfs://myha/tmp/starrocks",
        "broker" = "broker2",
        "broker.dfs.nameservices" = "myha",
        "broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
  • 查看資源

    普通賬戶只能看到自己有USAGE-PRIV使用權限的資源。root和admin賬戶可以看到所有的資源。

  • 資源權限
    資源權限通過GRANT REVOKE來管理,目前僅支持USAGE-PRIV使用權限。您可以將USAGE-PRIV權限賦予某個用戶或者某個角色,角色的使用與之前一致。示例如下。
    -- 授予spark0資源的使用權限給用戶user0
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
    
    -- 授予spark0資源的使用權限給角色role0
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
    
    -- 授予所有資源的使用權限給用戶user0
    GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
    
    -- 授予所有資源的使用權限給角色role0
    GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
    
    -- 撤銷用戶user0的spark0資源使用權限
    REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

配置Spark客戶端

FE底層通過執行spark-submit命令提交Spark任務,因此需要為FE配置Spark客戶端,建議使用官方2.4.5或以上版本的Spark 2.x,Spark下載地址下載完成后,請按照以下步驟完成配置:
  1. 配置SPARK-HOME環境變量

    將Spark客戶端放在FE同一臺機器上的目錄下,并在FE的配置文件中配置spark_home_default_dir指向此目錄,此配置項的值默認為FE根目錄下的lib/spark2x路徑,此配置項不可為空。

  2. 配置Spark依賴包

    將Spark客戶端下的jars文件夾內所有JAR包歸檔打包成一個ZIP文件,并在FE的配置文件中配置spark_resource_path指向此ZIP文件。如果此配置項為空,則FE會嘗試尋找FE根目錄下的lib/spark2x/jars/spark-2x.zip文件,如果沒有找到則會報文件不存在的錯誤。

    當提交Spark Load任務時,會將歸檔好的依賴文件上傳至遠端倉庫,默認倉庫路徑掛在working_dir/{cluster_id}目錄下,并以--spark-repository--{resource-name}命名,表示集群內的一個Resource對應一個遠端倉庫,遠端倉庫目錄結構參考如下。
    ---spark-repository--spark0/
       |---archive-1.0.0/
       |   |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar
       |   |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip
       |---archive-1.1.0/
       |   |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar
       |   |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip
       |---archive-1.2.0/
       |   |-...

除了Spark依賴(默認以spark-2x.zip命名),FE還會上傳DPP的依賴包至遠端倉庫。如果此次Spark Load提交的所有依賴文件都已存在遠端倉庫,則不需要再上傳依賴,節省下了每次重復上傳大量文件的時間。

配置YARN客戶端

FE底層通過YARN命令獲取正在運行的Application的狀態,以及終止Application,因此需要為FE配置YARN客戶端,建議使用官方2.5.2或以上版本的Hadoop 2.x。Hadoop下載地址,下載完成后,請按照以下步驟完成配置:
  1. 配置YARN可執行文件路徑

    將下載好的YARN客戶端放在FE同一臺機器的目錄下,并在FE配置文件中配置yarn_client_path參數,指向YARN的二進制可執行文件,默認為FE根目錄下的lib/yarn-client/hadoop/bin/yarn路徑。

  2. 配置生成YARN所需的配置文件的路徑(可選)

    當FE通過YARN客戶端獲取Application的狀態,或者終止Application時,默認會在FE根目錄下的lib/yarn-config路徑下生成執行yarn命令所需的配置文件,此路徑可以通過在FE配置文件配置yarn_config_dir參數修改,目前生成的配置文件包括core-site.xmlyarn-site.xml

創建導入任務

  • 創建語法
    LOAD LABEL load_label
        (data_desc, ...)
    WITH RESOURCE resource_name
    [resource_properties]
    [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
     (key2=value2, ...)
    創建導入的詳細語法可以執行HELP SPARK LOAD命令查看幫助。Spark Load的創建導入語法中參數意義如下:
    • Label

      導入任務的標識。每個導入任務,都有一個在單DataBase內部唯一的Label。具體規則與Broker Load一致。

    • 數據描述類參數

      目前支持的數據源有CSV和Hive table。其他規則與Broker Load一致。

    • 導入作業參數

      導入作業參數主要指的是Spark Load創建導入語句中的屬于opt_properties部分的參數。導入作業參數是作用于整個導入作業的。規則與Broker Load一致。

    • Spark資源參數
      Spark資源需要提前配置到StarRocks系統中并且賦予用戶USAGE-PRIV權限后才能使用Spark Load。當您有臨時性的需求,例如增加任務使用的資源而修改Spark configs時,可以設置以下參數,設置僅對本次任務生效,并不影響StarRocks集群中已有的配置。
      WITH RESOURCE 'spark0'
      (
         "spark.driver.memory" = "1g",
         "spark.executor.memory" = "3g"
      )
    • 數據源為Hive表時的導入

      如果期望在導入流程中將Hive表作為數據源,則需要先新建一張類型為Hive的外部表,然后提交導入命令時指定外部表的表名即可。

    • 導入流程構建全局字典
      適用于StarRocks表聚合列的數據類型為BITMAP類型。在Load命令中指定需要構建全局字典的字段即可,格式為StarRocks字段名稱=bitmap_dict(hive表字段名稱)
      重要 目前只有在上游數據源為Hive表時才支持全局字典的構建。
  • 示例:
    • 上游數據源為HDFS文件時創建導入任務的情況
      LOAD LABEL load_label
          (data_desc, ...)
      WITH RESOURCE resource_name
      [resource_properties]
      [PROPERTIES (key1=value1, ... )]
      
      * load_label:
          db_name.label_name
      
      * data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY separator ]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (col2, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]
      
          DATA FROM TABLE hive_external_tbl
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]
      
      * resource_properties:
       (key2=value2, ...)
    • 上游數據源是Hive表時創建導入任務的情況
      1. 新建Hive資源。
        CREATE EXTERNAL RESOURCE hive0
        properties
        (
            "type" = "hive",
            "hive.metastore.uris" = "thrift://emr-header-1.cluster-xxx:9083"
        );
      2. 新建Hive外部表。
        CREATE EXTERNAL TABLE hive_t1
        (
            k1 INT,
            K2 SMALLINT,
            k3 varchar(50),
            uuid varchar(100)
        )
        ENGINE=hive
        properties
        (
            "resource" = "hive0",
            "database" = "tmp",
            "table" = "t1"
        );
      3. 提交load命令,要求導入的StarRocks表中的列必須在Hive外部表中存在。
        LOAD LABEL db1.label1
        (
            DATA FROM TABLE hive_t1
            INTO TABLE tbl1
            SET
            (
                uuid=bitmap_dict(uuid)
            )
        )
        WITH RESOURCE 'spark0'
        (
            "spark.executor.memory" = "2g",
            "spark.shuffle.compress" = "true"
        )
        PROPERTIES
        (
            "timeout" = "3600"
        );

查看導入任務

Spark Load和Broker Load都是異步導入方式。您必須將創建導入的Label記錄下來,并且在SHOW LOAD命令中使用此Label來查看導入結果。查看導入的命令在所有導入方式中是通用的,具體語法可執行HELP SHOW LOAD命令查看。

執行以下命令,查看導入任務。
show load order by createtime desc limit 1\G
返回信息如下。
 *************************** 1. row ***************************
  JobId: 76391
  Label: label1
  State: FINISHED
 Progress: ETL:100%; LOAD:100%
  Type: SPARK
 EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
 TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
 ErrorMsg: N/A
 CreateTime: 2019-07-27 11:46:42
 EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
  URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
 JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
返回結果中涉及到的參數如下表所示。
參數描述
State導入任務當前所處的階段。

任務提交之后狀態為PENDING,提交Spark ETL之后狀態變為ETL,ETL完成之后FE調度BE執行push操作,狀態變為LOADING,push完成并且版本生效后狀態變為FINISHED。

導入任務的最終階段有CANCELLED和FINISHED兩個,當Load Job處于這兩個階段時導入完成。其中CANCELLED為導入失敗,FINISHED為導入成功。

Progress導入任務的進度描述。包括ETL和LOAD兩種進度,對應了導入流程的ETL和LOADING兩個階段。
LOAD的進度范圍為0~100%。
LOAD進度 = 當前已完成所有replica導入的tablet個數 / 本次導入任務的總tablet個數* 100%
說明
  • 如果所有導入表均完成導入,此時LOAD的進度為99%,導入進入到最后生效階段,整個導入完成后,LOAD的進度才會變為100%。
  • 因為導入進度并不是線性的,所以如果一段時間內進度沒有變化,并不代表導入沒有在執行。
Type導入任務的類型。Spark Load為SPARK。
CreateTime導入任務的創建時間。
EtlStartTimeETL階段開始的時間。
EtlFinishTimeETL階段完成的時間。
LoadStartTimeLOADING階段開始的時間。
LoadFinishTime整個導入任務完成的時間。
JobDetails顯示作業的詳細運行狀態,包括導入文件的個數、總大小(字節)、子任務個數、已處理的原始行數等。示例如下。
 {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}
URL可以復制輸入到瀏覽器,跳轉至相應Application的Web頁面。

其余返回結果集中參數含義可以參見Broker Load,詳情請參見Broker Load

查看Spark Launcher提交日志

Spark任務提交過程中產生的詳細日志,日志默認保存在FE根目錄下log/spark_launcher_log路徑下,并以spark-launcher-{load-job-id}-{label}.log格式命名,日志會在此目錄下保存一段時間,當FE元數據中的導入信息被清理時,相應的日志也會被清理,默認保存時間為3天。

取消導入任務

當Spark Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消導入任務的Label。取消導入命令語法可以執行HELP CANCEL LOAD命令查看。

相關系統配置

以下配置屬于Spark Load的系統級別配置,也就是作用于所有Spark Load導入任務的配置,主要通過修改fe.conf來調整配置值。
參數描述
enable-spark-load開啟Spark Load和創建Resource功能。

默認值為false,表示關閉此功能。

spark-load-default-timeout-second任務默認超時時間。

默認值為259200秒(3天)。

spark-home-default-dirSpark客戶端路徑。

默認值為fe/lib/spark2x

spark-launcher-log-dir打包好的Spark依賴文件路徑。

默認值為空。

spark-launcher-log-dirSpark客戶端的提交日志存放的目錄。

默認值為fe/log/spark-launcher-log

yarn-client-pathYARN二進制可執行文件路徑。

默認值為fe/lib/yarn-client/hadoop/bin/yarn

yarn-config-dirYARN配置文件生成路徑。

默認值為fe/lib/yarn-config

最佳實踐

使用Spark Load最適合的場景是原始數據在文件系統(HDFS)中,數據量在幾十GB到TB級別。小數據量還是建議使用Stream Load或者Broker Load。

完整Spark Load導入示例,請參見03_sparkLoad2StarRocks.md