在Broker Load模式下,通過部署的Broker程序,StarRocks可讀取對應數據源(例如,Apache HDFS,阿里云OSS)上的數據,利用自身的計算資源對數據進行預處理和導入。本文為您介紹Broker Load導入的使用示例以及常見問題。
背景信息
Broker Load是一種異步的導入方式。您需要通過MySQL協議創建導入,并通過查看導入命令檢查導入結果。StarRocks支持從外部存儲系統導入數據,支持CSV、ORCFile和Parquet等文件格式,建議單次導入數據量在幾十GB到上百GB級別。
Broker Load導入
查看Broker實例
阿里云EMR StarRocks集群在創建時已經自動搭建并啟動Broker服務,Broker服務位于每個Core節點上。使用以下SQL命令可以查看Broker實例。
SHOW PROC "/brokers"\G
返回信息如下所示。
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 2. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 3. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 4. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
4 rows in set (0.00 sec)
創建導入任務
語法
StarRocks版本小于2.5.8
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_name broker_properties [PROPERTIES (key1=value1, ... )]
StarRocks版本大于等于2.5.8
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_properties [PROPERTIES (key1=value1, ... )]
參數描述
執行
HELP BROKER LOAD
命令,可以查看創建導入作業的詳細語法。Label
導入任務的標識。每個導入任務都有一個唯一的Label。Label是您在導入命令中自定義的或系統自動生成的名稱。通過該Label,您可以查看對應導入任務的執行情況,并且Label可以用來防止導入相同的數據。當導入任務狀態為FINISHED時,對應的Label就不能再次使用了。當Label對應的導入任務狀態為CANCELLED時,可以再次使用該Label提交導入作業。
數據描述類data_desc
數據描述類參數,主要指的是語句中data_desc部分的參數。每組data_desc表述了本次導入涉及到的數據源地址、ETL函數,目標表及分區等信息。
Broker Load支持一次導入任務涉及多張表,每個Broker Load導入任務可通過多個data_desc聲明多張表來實現多表導入。每個單獨的data_desc可以指定屬于該表的數據源地址,可以用多個file_path來指定導入同一個表的多個文件。Broker Load保證了單次導入的多張表之間原子性成功或失敗。data_desc常見參數如下所示。
data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY column_separator ] [FORMAT AS file_type] [(col1, ...)] [COLUMNS FROM PATH AS (colx, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate]
相關參數描述如下表所示。
參數
描述
file_path
文件路徑可以指定到文件,也可以用星號(*)通配符指定某個目錄下的所有文件。中間的目錄也可以使用通配符匹配。
可以使用的通配符有? * [] {} ^,使用規則請參見FileSystem。
例如, 通過hdfs://hdfs_host:hdfs_port/user/data/tablename// , 可以匹配tablename下所有分區內的所有文件。通過 hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/ , 可以匹配tablename下4月分區的所有文件。
negative
設置數據取反導入。
該功能適用的場景是當數據表中聚合列的類型均為SUM類型時,如果希望撤銷某一批導入的數據,可以通過negative參數導入同一批數據,StarRocks會自動為這批數據在聚合列上數據取反,以達到消除同一批數據的功能。
partition
指定待導入表的Partition信息。
如果待導入數據不屬于指定的Partition,則不會被導入。同時,不指定Partition中的數據會被認為是“錯誤數據”。對于不想導入,也不想記錄為“錯誤數據”的數據,可以使用where predicate來過濾。
column_separator
COLUMNS TERMINATED BY column_separator ,用于指定導入文件中的列分隔符,默認為\t。
如果是不可見字符,則需要加\x作為前綴,使用十六進制來表示分隔符。例如,Hive文件的分隔符為\x01,則列分隔符為\\x01。
file_type
FORMAT AS file_type,用于指定導入文件的類型。例如,parquet、orc、csv,默認值為csv。
parquet類型也可以通過文件后綴名.parquet或者.parq判斷。
COLUMNS FROM PATH AS
提取文件路徑中的分區字段。
例如,導入文件為/path/col_name=col_value/dt=20210101/file1,其中col_name/dt為表中的列,則將col_value、20210101分別導入到col_name和dt對應的列的代碼示例如下。
(col1, col2) COLUMNS FROM PATH AS (col_name, dt)
set column mapping
SET (k1=f1(xx), k2=f2(xx)),data_desc中的SET語句負責設置列函數變換。
如果原始數據的列和表中的列不一一對應,則需要使用該屬性。
where predicate
WHERE predicate,data_desc中的WHERE語句負責過濾已經完成transform的數據。
被過濾的數據不會進入容忍率的統計中。如果多個data_desc中聲明了關于同一張表的多個條件,則會以AND語義合并這些條件。
導入作業參數
導入作業參數是指Broker Load創建導入語句中屬于broker_properties部分的參數。導入作業參數是作用于整個導入作業的。
broker_properties: (key2=value2, ...)
部分參數描述如下表所示。
參數
描述
timeout
導入作業的超時時間(以秒為單位)。
您可以在opt_properties中自行設置每個導入的超時時間。導入任務在設定的時限內未完成則會被系統取消,變為CANCELLED。Broker Load的默認導入超時時間為4小時。
重要通常情況下,不需要您手動設置導入任務的超時時間。當在默認超時時間內無法完成導入時,可以手動設置任務的超時時間。
推薦超時時間的計算方式為:
超時時間 >((總文件大小 (MB)* 待導入的表及相關Roll up表的個數) / (30 * 導入并發數))
公式中的30為目前BE導入的平均速度,表示30 MB/s。例如,如果待導入數據文件為1 GB,待導入表包含2個Rollup表,當前的導入并發數為3,則timeout的最小值為 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。
由于每個StarRocks集群的機器環境不同且集群并發的查詢任務也不同,所以StarRocks集群的最慢導入速度需要您根據歷史的導入任務速度進行推測。
max_filter_ratio
導入任務的最大容忍率,默認為0容忍,取值范圍是0~1。當導入的錯誤率超過該值,則導入失敗。如果您希望忽略錯誤的行,可以設置該參數值大于0,來保證導入可以成功。
計算公式為:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )
其中,
dpp.abnorm.ALL
表示數據質量不合格的行數,例如類型不匹配、列數不匹配和長度不匹配等。dpp.abnorm.ALL
指的是導入過程中正確數據的條數,可以通過SHOW LOAD
命令查詢導入任務的正確數據量。原始文件的行數 = dpp.abnorm.ALL + dpp.norm.ALL
load_mem_limit
導入內存限制。默認值為0,表示不限制。
strict_mode
Broker Load導入可以開啟Strict Mode模式。開啟方式為
properties ("strict_mode" = "true")
。默認關閉。
Strict Mode模式是對于導入過程中的列類型轉換進行嚴格過濾。嚴格過濾的策略為,對于列類型轉換,如果Strict Mode為true,則錯誤的數據將被過濾掉。錯誤數據是指原始數據并不為空值,在參與列類型轉換后結果為空值的數據。但以下場景除外:
對于導入的某列由函數變換生成時,Strict Mode對其不產生影響。
對于導入的某列類型包含范圍限制的,如果原始數據能正常通過類型轉換,但無法通過范圍限制的,Strict Mode對其也不產生影響。例如,如果類型是decimal(1,0),原始數據為10,則屬于可以通過類型轉換但不在列聲明的范圍內,Strict Mode對其不產生影響。
創建阿里云OSS導入任務示例
重要在阿里云EMR StarRocks上使用broker作為Broker名稱即可。
如果您的StarRocks版本小于2.5.8,則可以按照以下代碼創建導入示例;如果您的StarRocks版本大于等于2.5.8,則不添加
WITH BROKER broker
部分內容。
StarRocks版本小于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com" );
StarRocks版本大于等于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
查看導入任務狀態
Broker Load導入是異步的,您可以在SHOW LOAD
命令中指定Label來查詢對應導入作業的執行狀態。具體語法可執行HELP SHOW LOAD
命令查看。
SHOW LOAD
命令只能查看異步導入方式的LOAD任務。同步方式的LOAD任務,例如Stream Load任務,目前無法使用SHOW LOAD
命令查看。
查看導入任務狀態示例如下。
show load where label = 'label1'\G
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
返回參數的描述如下表所示。
參數 | 描述 |
JobId | 導入任務的唯一ID,每個導入任務的JobId都不同,由系統自動生成。與Label不同的是,JobId永遠不會相同,而Label則可以在導入任務失敗后被復用。 |
Label | 導入任務的標識。 |
State | 導入任務當前所處的階段。
|
Progress | 導入任務的進度描述。分為ETL和LOAD兩種進度,分別對應導入流程的ETL和LOADING兩個階段。目前Broker Load只有LOADING階段,所以ETL固定顯示為N/A,而LOAD的進度范圍為0~100%。 LOAD的進度的計算公式為 如果所有導入表均完成導入,此時LOAD的進度為99%, 導入進入到最后生效階段,待整個導入任務完成后,LOAD的進度才會改為100%。 重要 導入進度并不是線性的,所以如果一段時間內進度沒有變化,并不代表導入任務沒有執行。 |
Type | 導入任務的類型。Broker Load的Type取值是BROKER。 |
EtlInfo | 主要顯示導入的數據量指標unselected.rows,dpp.norm.ALL和dpp.abnorm.ALL。 您可以根據unselected.rows的參數值判斷where條件過濾了多少行,根據dpp.norm.ALL和dpp.abnorm.ALL兩個指標可以驗證當前導入任務的錯誤率是否超過max-filter-ratio。三個指標之和就是原始數據量的總行數。 |
TaskInfo | 主要顯示當前導入任務參數,即創建Broker Load導入任務時您指定的參數,包括cluster,timeout和max-filter-ratio。 |
ErrorMsg | 如果導入任務狀態為CANCELLED,則顯示失敗的原因,包括type和msg兩部分。如果導入任務成功則顯示N/A。type的取值意義如下:
|
CreateTime | 分別表示導入創建的時間、ETL階段開始的時間、ETL階段完成的時間、LOADING階段開始的時間和整個導入任務完成的時間。
|
EtlStartTime | |
EtlFinishTime | |
LoadStartTime | |
LoadFinishTime | |
URL | 導入任務的錯誤數據樣例,訪問URL地址即可獲取本次導入的錯誤數據樣例。當本次導入不存在錯誤數據時,URL字段為N/A。 |
JobDetails | 顯示作業的詳細運行狀態。包括導入文件的個數、總大小(字節)、子任務個數、已處理的原始行數,運行子任務的BE節點ID,以及未完成的BE節點ID。
其中已處理的原始行數,每5秒更新一次。該行數僅用于展示當前的進度,不代表最終實際的處理行數。實際處理行數以EtlInfo中顯示的數據為準。 |
取消導入任務
當Broker Load作業狀態不為CANCELLED或FINISHED時,可以手動取消。取消時需要指定待取消導入任務的Label 。可執行HELP CANCEL LOAD
命令查看取消導入命令的語法。
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
HDFS導入
HDFS導入語法示例
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1, tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH BROKER 'broker1' ( "username" = "hdfs_username", "password" = "hdfs_password" ) PROPERTIES ( "timeout" = "3600" );
HDFS認證
社區版本的HDFS支持簡單認證和Kerberos認證兩種認證方式。
簡單認證(Simple):用戶的身份由與HDFS建立鏈接的客戶端操作系統決定。
涉及參數如下表。
參數
描述
hadoop.security.authentication
認證方式。默認值為simple。
username
HDFS的用戶名。
password
HDFS的密碼。
Kerberos認證:客戶端的身份由用戶自己的Kerberos證書決定。
涉及參數如下表。
參數
描述
hadoop.security.authentication
認證方式。默認值為kerberos。
kerberos_principal
指定Kerberos的Principal。
kerberos_keytab
指定Kerberos的keytab文件路徑。該文件必須為Broker進程所在服務器上的文件。
kerberos_keytab_content
指定Kerberos中keytab文件內容經過Base64編碼之后的內容。
重要該參數和kerberos_keytab參數只需配置一個。
HDFS HA配置
通過配置NameNode HA,可以在NameNode切換時,自動識別到新的NameNode。配置以下參數用于訪問以HA模式部署的HDFS集群。
參數
描述
dfs.nameservices
指定HDFS服務的名稱,您可以自定義。
例如,設置dfs.nameservices為my_ha。
dfs.ha.namenodes.xxx
自定義NameNode的名稱,多個名稱時以逗號(,)分隔。其中xxx為dfs.nameservices中自定義的名稱。
例如,設置dfs.ha.namenodes.my_ha為my_nn。
dfs.namenode.rpc-address.xxx.nn
指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名稱。
例如,設置dfs.namenode.rpc-address.my_ha.my_nn參數值的格式為host:port。
dfs.client.failover.proxy.provider
指定Client連接NameNode的Provider,默認值為org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。
示例如下。
( "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )
HA模式可以與簡單認證、Kerberos認證兩種認證方式組合,進行集群訪問。例如,通過簡單認證方式訪問HA HDFS。
( "username"="user", "password"="passwd", "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )
HDFS集群的配置可以寫入hdfs-site.xml文件中,您使用Broker進程讀取HDFS集群的信息時,只需要填寫集群的文件路徑名和認證信息即可。
導入示例
創建測試表,下面是tpch的lineitem。
CREATE TABLE lineitem ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber int, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate date, l_commitdate date, l_receiptdate date, l_shipinstruct string, l_shipmode string, l_comment string ) ENGINE=OLAP DUPLICATE KEY(l_orderkey) DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96 PROPERTIES( "replication_num" = "1" );
創建導入任務。
重要如果您的StarRocks版本小于2.5.8,則可以按照以下代碼創建導入示例;如果您的StarRocks版本大于等于2.5.8,則不添加
WITH BROKER broker
部分內容。StarRocks版本小于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx" );
StarRocks版本大于等于2.5.8
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
查看導入任務狀態。
show load where label = 'lineitem'\G; *************************** 1. row *************************** JobId: 1**** Label: lineitem State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215 TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0 ErrorMsg: NULL CreateTime: 2022-04-13 15:07:53 EtlStartTime: 2022-04-13 15:07:56 EtlFinishTime: 2022-04-13 15:07:56 LoadStartTime: 2022-04-13 15:07:56 LoadFinishTime: 2022-04-13 15:08:06 URL: NULL JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072} 2 rows in set (0.00 sec)
導入成功后進行查詢操作。
查詢表lineitem中的行數。
select count(*) from lineitem;
返回信息如下所示。
+----------+ | count(*) | +----------+ | 6001215 | +----------+ 1 row in set (0.03 sec)
查詢表lineitem中的前2行信息。
select * from lineitem limit 2;
返回信息如下所示。
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ | 69 | 115209 | 7721 | 1 | 48 | 58761.6 | 0.01 | 0.07 | A | F | 1994-08-17 | 1994-08-11 | 1994-09-08 | NONE | TRUCK | regular epitaphs. carefully even ideas hag | | 69 | 104180 | 9201 | 2 | 32 | 37893.76 | 0.08 | 0.06 | A | F | 1994-08-24 | 1994-08-17 | 1994-08-31 | NONE | REG AIR | s sleep carefully bold, | +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ 2 rows in set (0.01 sec)
導入任務并發度
一個作業可以拆成一個或者多個任務,任務之間并行執行。拆分由LOAD語句中的DataDescription來決定。例如:
多個DataDescription對應導入多個不同的表,每個會拆成一個任務。
多個DataDescription對應導入同一個表的不同分區,每個也會拆成一個任務。
每個任務還會拆分成一個或者多個實例,然后將這些實例平均分配到BE上并行執行。實例的拆分由以下FE配置決定:
min_bytes_per_broker_scanner:單個實例處理的最小數據量,默認值為64 MB。
max_broker_concurrency:單個任務最大并發實例數,默認值為100。
load_parallel_instance_num:單個BE上并發實例數,默認值為1個。
實例總數的計算公式為實例的總數 = min(導入文件總大小/單個實例處理的最小數據量,單個任務最大并發實例數,單個BE上并發實例數 * BE數)
。
通常情況下,一個作業只有一個DataDescription,只會拆分成一個任務。任務會拆成與BE數相等的實例,然后分配到所有BE上并行執行。