Spark Load通過外部的Spark資源實現對導入數據的預處理,提高StarRocks大數據量的導入性能并且節省StarRocks集群的計算資源。Spark Load主要用于初次遷移、大數據量導入StarRocks的場景(數據量可到TB級別)。本文為您介紹Spark Load導入的基本概念、基本原理、使用示例、最佳實踐以及常見問題。
背景信息
SHOW LOAD
命令查看導入結果。基本概念
- Spark ETL:在導入流程中主要負責數據的ETL工作,包括全局字典構建(BITMAP類型)、分區、排序和聚合等。
- Broker:是一個獨立的無狀態進程。封裝了文件系統接口,提供StarRocks讀取遠端存儲系統中文件的能力。
- 全局字典:保存了數據從原始值到編碼值映射的數據結構,原始值可以是任意數據類型,而編碼后的值為整型。全局字典主要應用于精確去重預計算的場景。
基本原理
用戶通過MySQL客戶端提交Spark類型導入任務,FE記錄元數據并返回用戶提交成功。
- 向FE提交Spark Load任務。
- FE調度提交ETL任務到Spark集群執行。
- Spark集群執行ETL完成對導入數據的預處理,包括全局字典構建(BITMAP類型)、分區、排序和聚合等。
- ETL任務完成后,FE獲取預處理過的每個分片的數據路徑,并調度相關的BE執行Push任務。
- BE通過Broker讀取數據,轉化為StarRocks存儲格式。
- FE調度生效版本,完成導入任務。
全局字典
適用場景
目前StarRocks中BITMAP列是使用類庫Roaringbitmap實現的,而Roaringbitmap的輸入數據類型只能是整型,因此如果要在導入流程中實現對于BITMAP列的預計算,則需要將輸入數據的類型轉換成整型。在StarRocks現有的導入流程中,全局字典的數據結構是基于Hive表實現的,保存了原始值到編碼值的映射。
構建流程
- 讀取上游數據源的數據,生成一張Hive臨時表,記為hive-table。
- 從hive-table中抽取待去重字段的去重值,生成一張新的Hive表,記為distinct-value-table。
- 新建一張全局字典表,記為dict-table。一列為原始值,一列為編碼后的值。
- 將distinct-value-table與dict-table進行LEFT JOIN,計算出新增的去重值集合,然后對這個集合使用窗口函數進行編碼,此時去重列原始值就多了一列編碼后的值,最后將這兩列的數據寫回dict-table。
- 將dict-table與hive-table進行JOIN,完成hive-table中原始值替換成整型編碼值的工作。
- hive-table會被下一步數據預處理的流程所讀取,經過計算后導入到StarRocks中。
數據預處理
- 從數據源讀取數據,上游數據源可以是HDFS文件,也可以是Hive表。
- 對讀取到的數據完成字段映射、表達式計算,并根據分區信息生成分桶字段bucket-id。
- 根據StarRocks表的Rollup元數據生成RollupTree。
- 遍歷RollupTree,進行分層的聚合操作,下一個層級的Rollup可以由上一個層的Rollup計算得來。
- 每次完成聚合計算后,會根據bucket-id對數據進行分桶然后寫入HDFS中。
- 后續Broker會拉取HDFS中的文件然后導入StarRocks BE節點中。
基本操作
配置ETL集群
Spark作為一種外部計算資源在StarRocks中用來完成ETL工作,未來可能還有其他的外部資源會加入到StarRocks中使用。例如,Spark或GPU用于查詢,HDFS或S3用于外部存儲,MapReduce用于ETL等,因此引入Resource Management來管理StarRocks使用的這些外部資源。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value
);
-- drop spark resource
DROP RESOURCE resource_name;
-- show resources
SHOW RESOURCES
SHOW PROC "/resources";
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
- 創建資源
resource-name為StarRocks中配置的Spark資源的名字。
PROPERTIES是Spark資源的相關參數,參數描述如下表所示,更多參數描述請參見Spark Configuration。參數 描述 type 資源類型。必填參數,目前僅支持Spark。 Spark相關參數 spark.master 必填參數,目前支持yarn。 spark.submit.deployMode Spark程序的部署模式。必填參數,支持cluster和client兩種。 spark.hadoop.fs.defaultFS Master為YARN時必填。 spark.hadoop.yarn.resourcemanager.address 單點Resource Manager地址。 spark.hadoop.yarn.resourcemanager.ha.enabled Resource Manager啟用HA。默認值為true。 spark.hadoop.yarn.resourcemanager.ha.rm-ids Resource Manager邏輯ID列表。 spark.hadoop.yarn.resourcemanager.hostname.rm-id 對于每個rm-id,指定Resource Manager對應的主機名。 說明 HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-id或spark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。spark.hadoop.yarn.resourcemanager.address.rm-id 對于每個rm-id,指定host:port以供客戶端提交作業。 說明 HA Resource Manager只需配置spark.hadoop.yarn.resourcemanager.hostname.rm-id或spark.hadoop.yarn.resourcemanager.address.rm-id中的任意一個。working_dir ETL使用的目錄。 說明 Spark作為ETL資源使用時必填。例如,hdfs://host:port/tmp/starrocks。broker Broker名字。 說明 Spark作為ETL資源使用時必填。需要使用ALTER SYSTEM ADD BROKER
命令提前完成配置。broker.property_key Broker讀取ETL生成中間文件時需要指定的認證信息等。 創建資源示例如下所示。-- yarn cluster模式 CREATE EXTERNAL RESOURCE "spark0" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.jars" = "xxx.jar,yyy.jar", "spark.files" = "/tmp/aaa,/tmp/bbb", "spark.executor.memory" = "1g", "spark.yarn.queue" = "queue0", "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032", "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000", "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks", "broker" = "broker0", "broker.username" = "user0", "broker.password" = "password0" ); -- yarn HA cluster模式 CREATE EXTERNAL RESOURCE "spark1" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true", "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2", "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1", "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2", "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000", "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks", "broker" = "broker1" ); -- HDFS HA cluster模式 CREATE EXTERNAL RESOURCE "spark2" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032", "spark.hadoop.fs.defaultFS" = "hdfs://myha", "spark.hadoop.dfs.nameservices" = "myha", "spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2", "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port", "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port", "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", "working_dir" = "hdfs://myha/tmp/starrocks", "broker" = "broker2", "broker.dfs.nameservices" = "myha", "broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2", "broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port", "broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port", "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );
- 查看資源
普通賬戶只能看到自己有USAGE-PRIV使用權限的資源。root和admin賬戶可以看到所有的資源。
- 資源權限資源權限通過GRANT REVOKE來管理,目前僅支持USAGE-PRIV使用權限。您可以將USAGE-PRIV權限賦予某個用戶或者某個角色,角色的使用與之前一致。示例如下。
-- 授予spark0資源的使用權限給用戶user0 GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%"; -- 授予spark0資源的使用權限給角色role0 GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0"; -- 授予所有資源的使用權限給用戶user0 GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%"; -- 授予所有資源的使用權限給角色role0 GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0"; -- 撤銷用戶user0的spark0資源使用權限 REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";
配置Spark客戶端
- 配置SPARK-HOME環境變量
將Spark客戶端放在FE同一臺機器上的目錄下,并在FE的配置文件中配置spark_home_default_dir指向此目錄,此配置項的值默認為FE根目錄下的lib/spark2x路徑,此配置項不可為空。
- 配置Spark依賴包
將Spark客戶端下的jars文件夾內所有JAR包歸檔打包成一個ZIP文件,并在FE的配置文件中配置spark_resource_path指向此ZIP文件。如果此配置項為空,則FE會嘗試尋找FE根目錄下的lib/spark2x/jars/spark-2x.zip文件,如果沒有找到則會報文件不存在的錯誤。
當提交Spark Load任務時,會將歸檔好的依賴文件上傳至遠端倉庫,默認倉庫路徑掛在working_dir/{cluster_id}目錄下,并以--spark-repository--{resource-name}
命名,表示集群內的一個Resource對應一個遠端倉庫,遠端倉庫目錄結構參考如下。---spark-repository--spark0/ |---archive-1.0.0/ | |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar | |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip |---archive-1.1.0/ | |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar | |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip |---archive-1.2.0/ | |-...
除了Spark依賴(默認以spark-2x.zip命名),FE還會上傳DPP的依賴包至遠端倉庫。如果此次Spark Load提交的所有依賴文件都已存在遠端倉庫,則不需要再上傳依賴,節省下了每次重復上傳大量文件的時間。
配置YARN客戶端
- 配置YARN可執行文件路徑
將下載好的YARN客戶端放在FE同一臺機器的目錄下,并在FE配置文件中配置yarn_client_path參數,指向YARN的二進制可執行文件,默認為FE根目錄下的lib/yarn-client/hadoop/bin/yarn路徑。
- 配置生成YARN所需的配置文件的路徑(可選)
當FE通過YARN客戶端獲取Application的狀態,或者終止Application時,默認會在FE根目錄下的lib/yarn-config路徑下生成執行yarn命令所需的配置文件,此路徑可以通過在FE配置文件配置yarn_config_dir參數修改,目前生成的配置文件包括core-site.xml和yarn-site.xml。
創建導入任務
- 創建語法
LOAD LABEL load_label (data_desc, ...) WITH RESOURCE resource_name [resource_properties] [PROPERTIES (key1=value1, ... )] * load_label: db_name.label_name * data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY separator ] [(col1, ...)] [COLUMNS FROM PATH AS (col2, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] DATA FROM TABLE hive_external_tbl [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] * resource_properties: (key2=value2, ...)
創建導入的詳細語法可以執行HELP SPARK LOAD
命令查看幫助。Spark Load的創建導入語法中參數意義如下:- Label
導入任務的標識。每個導入任務,都有一個在單DataBase內部唯一的Label。具體規則與Broker Load一致。
- 數據描述類參數
目前支持的數據源有CSV和Hive table。其他規則與Broker Load一致。
- 導入作業參數
導入作業參數主要指的是Spark Load創建導入語句中的屬于opt_properties部分的參數。導入作業參數是作用于整個導入作業的。規則與Broker Load一致。
- Spark資源參數Spark資源需要提前配置到StarRocks系統中并且賦予用戶USAGE-PRIV權限后才能使用Spark Load。當您有臨時性的需求,例如增加任務使用的資源而修改Spark configs時,可以設置以下參數,設置僅對本次任務生效,并不影響StarRocks集群中已有的配置。
WITH RESOURCE 'spark0' ( "spark.driver.memory" = "1g", "spark.executor.memory" = "3g" )
- 數據源為Hive表時的導入
如果期望在導入流程中將Hive表作為數據源,則需要先新建一張類型為Hive的外部表,然后提交導入命令時指定外部表的表名即可。
- 導入流程構建全局字典適用于StarRocks表聚合列的數據類型為BITMAP類型。在Load命令中指定需要構建全局字典的字段即可,格式為
StarRocks字段名稱=bitmap_dict(hive表字段名稱)
。重要 目前只有在上游數據源為Hive表時才支持全局字典的構建。
- Label
- 示例:
- 上游數據源為HDFS文件時創建導入任務的情況
LOAD LABEL load_label (data_desc, ...) WITH RESOURCE resource_name [resource_properties] [PROPERTIES (key1=value1, ... )] * load_label: db_name.label_name * data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY separator ] [(col1, ...)] [COLUMNS FROM PATH AS (col2, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] DATA FROM TABLE hive_external_tbl [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] * resource_properties: (key2=value2, ...)
- 上游數據源是Hive表時創建導入任務的情況
- 新建Hive資源。
CREATE EXTERNAL RESOURCE hive0 properties ( "type" = "hive", "hive.metastore.uris" = "thrift://emr-header-1.cluster-xxx:9083" );
- 新建Hive外部表。
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) ) ENGINE=hive properties ( "resource" = "hive0", "database" = "tmp", "table" = "t1" );
- 提交load命令,要求導入的StarRocks表中的列必須在Hive外部表中存在。
LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=bitmap_dict(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
- 新建Hive資源。
- 上游數據源為HDFS文件時創建導入任務的情況
查看導入任務
Spark Load和Broker Load都是異步導入方式。您必須將創建導入的Label記錄下來,并且在SHOW LOAD
命令中使用此Label來查看導入結果。查看導入的命令在所有導入方式中是通用的,具體語法可執行HELP SHOW LOAD
命令查看。
show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
參數 | 描述 |
---|---|
State | 導入任務當前所處的階段。 任務提交之后狀態為PENDING,提交Spark ETL之后狀態變為ETL,ETL完成之后FE調度BE執行push操作,狀態變為LOADING,push完成并且版本生效后狀態變為FINISHED。 導入任務的最終階段有CANCELLED和FINISHED兩個,當Load Job處于這兩個階段時導入完成。其中CANCELLED為導入失敗,FINISHED為導入成功。 |
Progress | 導入任務的進度描述。包括ETL和LOAD兩種進度,對應了導入流程的ETL和LOADING兩個階段。 LOAD的進度范圍為0~100%。
說明
|
Type | 導入任務的類型。Spark Load為SPARK。 |
CreateTime | 導入任務的創建時間。 |
EtlStartTime | ETL階段開始的時間。 |
EtlFinishTime | ETL階段完成的時間。 |
LoadStartTime | LOADING階段開始的時間。 |
LoadFinishTime | 整個導入任務完成的時間。 |
JobDetails | 顯示作業的詳細運行狀態,包括導入文件的個數、總大小(字節)、子任務個數、已處理的原始行數等。示例如下。
|
URL | 可以復制輸入到瀏覽器,跳轉至相應Application的Web頁面。 |
其余返回結果集中參數含義可以參見Broker Load,詳情請參見Broker Load。
查看Spark Launcher提交日志
Spark任務提交過程中產生的詳細日志,日志默認保存在FE根目錄下log/spark_launcher_log路徑下,并以spark-launcher-{load-job-id}-{label}.log格式命名,日志會在此目錄下保存一段時間,當FE元數據中的導入信息被清理時,相應的日志也會被清理,默認保存時間為3天。
取消導入任務
當Spark Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消導入任務的Label。取消導入命令語法可以執行HELP CANCEL LOAD
命令查看。
相關系統配置
參數 | 描述 |
---|---|
enable-spark-load | 開啟Spark Load和創建Resource功能。 默認值為false,表示關閉此功能。 |
spark-load-default-timeout-second | 任務默認超時時間。 默認值為259200秒(3天)。 |
spark-home-default-dir | Spark客戶端路徑。 默認值為fe/lib/spark2x。 |
spark-launcher-log-dir | 打包好的Spark依賴文件路徑。 默認值為空。 |
spark-launcher-log-dir | Spark客戶端的提交日志存放的目錄。 默認值為fe/log/spark-launcher-log。 |
yarn-client-path | YARN二進制可執行文件路徑。 默認值為fe/lib/yarn-client/hadoop/bin/yarn。 |
yarn-config-dir | YARN配置文件生成路徑。 默認值為fe/lib/yarn-config。 |
最佳實踐
使用Spark Load最適合的場景是原始數據在文件系統(HDFS)中,數據量在幾十GB到TB級別。小數據量還是建議使用Stream Load或者Broker Load。
完整Spark Load導入示例,請參見03_sparkLoad2StarRocks.md。