Broker Load是一個異步的導入方式,支持的數據源取決于Broker進程支持的數據源。本文為您介紹Broker Load導入的基本原理、基本操作、系統配置以及最佳實踐。
背景信息
因為Doris表里的數據是有序的,所以Broker Load在導入數據時需要利用Doris集群資源對數據進行排序,相對于Spark Load來完成海量歷史數據遷移,Broker Load對Doris集群資源占用較大。Broker Load方式是在沒有Spark計算資源的情況下使用,如果有Spark計算資源建議使用Spark Load。
適用場景
源數據在Broker可以訪問的存儲系統中,例如HDFS。
數據量在幾十到百GB級別。
基本原理
提交導入任務后,FE會生成對應的Plan并根據目前BE的個數和文件的大小,將Plan分給多個BE執行,每個BE執行一部分導入數據。BE在執行的過程中會從Broker拉取數據,在對數據transform之后將數據導入系統。所有BE均完成導入,由FE最終決定導入是否成功。
+
| 1. user create broker load
v
+----+----+
| |
| FE |
| |
+----+----+
|
| 2. BE etl and load the data
+--------------------------+
| | |
+---v---+ +--v----+ +---v---+
| | | | | |
| BE | | BE | | BE |
| | | | | |
+---+-^-+ +---+-^-+ +--+-^--+
| | | | | |
| | | | | | 3. pull data from broker
+---v-+-+ +---v-+-+ +--v-+--+
| | | | | |
|Broker | |Broker | |Broker |
| | | | | |
+---+-^-+ +---+-^-+ +---+-^-+
| | | | | |
+---v-+-----------v-+----------v-+-+
| HDFS/BOS/AFS cluster |
| |
+----------------------------------+
開始導入
Hive分區表的數據導入
創建Hive表。
##數據格式是:默認,分區字段是:day CREATE TABLE `ods_demo_detail`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` double, `actual_price` double ) PARTITIONED BY (day string) row format delimited fields terminated by ',' lines terminated by '\n'
使用Hive的Load命令將您的數據導入到Hive表中。
load data local inpath '/opt/custorm' into table ods_demo_detail;
創建Doris表。
CREATE TABLE `doris_ods_test_detail` ( `rq` date NULL, `id` varchar(32) NOT NULL, `store_id` varchar(32) NULL, `company_id` varchar(32) NULL, `tower_id` varchar(32) NULL, `commodity_id` varchar(32) NULL, `commodity_name` varchar(500) NULL, `commodity_price` decimal(10, 2) NULL, `member_price` decimal(10, 2) NULL, `cost_price` decimal(10, 2) NULL, `unit` varchar(50) NULL, `quantity` int(11) NULL, `actual_price` decimal(10, 2) NULL ) ENGINE=OLAP UNIQUE KEY(`rq`, `id`, `store_id`) PARTITION BY RANGE(`rq`) ( PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))) DISTRIBUTED BY HASH(`store_id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3", "dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "MONTH", "dynamic_partition.start" = "-2147483648", "dynamic_partition.end" = "2", "dynamic_partition.prefix" = "P_", "dynamic_partition.buckets" = "1", "in_memory" = "false", "storage_format" = "V2" );
開始導入數據。
LOAD LABEL broker_load_2022_03_23 ( DATA INFILE("hdfs://192.168.**.**:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*") INTO TABLE doris_ods_test_detail COLUMNS TERMINATED BY "," (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) COLUMNS FROM PATH AS (`day`) SET (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price) ) WITH BROKER "broker_name_1" ( "username" = "hdfs", "password" = "" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
Hive分區表導入(ORC格式)
創建ORC格式的Hive分區表。
#數據格式:ORC 分區:day CREATE TABLE `ods_demo_orc_detail`( `id` string, `store_id` string, `company_id` string, `tower_id` string, `commodity_id` string, `commodity_name` string, `commodity_price` double, `member_price` double, `cost_price` double, `unit` string, `quantity` double, `actual_price` double ) PARTITIONED BY (day string) row format delimited fields terminated by ',' lines terminated by '\n' STORED AS ORC
創建Doris表。
CREATE TABLE `doris_ods_test_detail` ( `rq` date NULL, `id` varchar(32) NOT NULL, `store_id` varchar(32) NULL, `company_id` varchar(32) NULL, `tower_id` varchar(32) NULL, `commodity_id` varchar(32) NULL, `commodity_name` varchar(500) NULL, `commodity_price` decimal(10, 2) NULL, `member_price` decimal(10, 2) NULL, `cost_price` decimal(10, 2) NULL, `unit` varchar(50) NULL, `quantity` int(11) NULL, `actual_price` decimal(10, 2) NULL ) ENGINE=OLAP UNIQUE KEY(`rq`, `id`, `store_id`) PARTITION BY RANGE(`rq`) ( PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))) DISTRIBUTED BY HASH(`store_id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3", "dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "MONTH", "dynamic_partition.start" = "-2147483648", "dynamic_partition.end" = "2", "dynamic_partition.prefix" = "P_", "dynamic_partition.buckets" = "1", "in_memory" = "false", "storage_format" = "V2" );
使用Broker Load導入數據。
LOAD LABEL dish_2022_03_23 ( DATA INFILE("hdfs://10.220.**.**:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*") INTO TABLE doris_ods_test_detail COLUMNS TERMINATED BY "," FORMAT AS "orc" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price) COLUMNS FROM PATH AS (`day`) SET (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price) ) WITH BROKER "broker_name_1" ( "username" = "hdfs", "password" = "" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
其中,涉及參數:
FORMAT AS "orc"
: 指定了要導入的數據格式。SET
: 定義了Hive表和Doris表之間的字段映射關系及字段轉換的一些操作。
HDFS文件系統數據導入
以上面創建好的Doris表為例,通過Broker Load從HDFS上導入數據的語句如下所示。
LOAD LABEL demo.label_20220402
(
DATA INFILE("hdfs://10.220.**.**:8020/tmp/test_hdfs.txt")
INTO TABLE `ods_dish_detail_test`
COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
)
with HDFS (
"fs.defaultFS"="hdfs://10.220.**.**:8020",
"hadoop.username"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
查看導入狀態
您可以通過下面的命令查看上面導入任務的狀態信息。
show load order by createtime desc limit 1\G;
返回信息如下所示。
*************************** 1. row ***************************
JobId: 4132****
Label: broker_load_2022_03_23
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2022-04-01 18:59:06
EtlStartTime: 2022-04-01 18:59:11
EtlFinishTime: 2022-04-01 18:59:11
LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
URL: NULL
JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029****":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029****":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
取消導入
當Broker Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消導入任務的Label 。
例如:撤銷數據庫demo上Label為broker_load_2022_03_23的導入作業。
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
相關系統配置
Broker參數
Broker Load需要借助Broker 程訪問遠端存儲,不同的Broker需要提供不同的參數 。
FE配置
以下配置屬于Broker Load的系統級別配置,即會作用于所有Broker Load導入任務的配置。主要通過修改fe.conf來調整配置值。
min_bytes_per_broker_scanner:限制了單個BE處理的數據量的最小值。
max_bytes_per_broker_scanner:限制了單個BE處理的數據量的最大值。
max_broker_concurrency:限制了一個作業的最大的導入并發數。
最小處理的數據量、最大并發數、源文件大小和當前集群BE的個數共同決定了本次導入的并發數。
本次導入并發數 = Math.min(源文件大小/最小處理量,最大并發數,當前BE節點個數)
本次導入單個BE的處理量 = 源文件大小/本次導入的并發數
通常一個導入作業支持的最大數據量為max_bytes_per_broker_scanner * BE節點數
。如果需要導入更大數據量,則需要適當調整max_bytes_per_broker_scanner參數的大小。
默認參數值如下:
min_bytes_per_broker_scanner:默認64 MB,單位bytes。
max_bytes_per_broker_scanner:默認3 GB,單位bytes。
max_broker_concurrency:默認10。
最佳實踐
應用場景
使用Broker Load最適合的場景就是原始數據在文件系統(HDFS、BOS、AFS)中的場景。其次,由于Broker Load是單次導入中唯一的一種異步導入的方式,所以您想使用異步方式導入大文件時,可以考慮使用Broker Load。
數據量
以下內容是針對單個BE的情況,如果您集群有多個BE,則下面標題中的數據量應該乘以BE個數來計算。例如,您的集群有3個BE,則3 GB以下(包含)則應該乘以3,也就是9 GB以下(包含)。
3 GB以下(包含):您可以直接提交Broker Load創建導入請求。
3 GB以上:由于單個導入BE最大的處理量為3 GB,超過3 GB的待導入文件就需要通過調整Broker Load的導入參數來實現大文件的導入。
根據當前BE的個數和原始文件的大小修改單個BE的最大掃描量和最大并發數。
修改fe.conf中配置 max_broker_concurrency = BE個數 當前導入任務單個BE處理的數據量 = 原始文件大小 / max_broker_concurrency max_bytes_per_broker_scanner >= 當前導入任務單個BE處理的數據量 例如,一個100 GB的文件,集群的BE個數為10個 max_broker_concurrency = 10 max_bytes_per_broker_scanner >= 10G = 100G / 10
修改后,所有的BE會并發的處理導入任務,每個BE處理原始文件的一部分。
說明上述兩個FE中的配置均為系統配置,其修改是作用于所有的Broker Load任務。
創建導入時自定義當前導入任務的timeout時間。
當前導入任務單個BE處理的數據量 / 用戶Doris集群最慢導入速度(MB/s) >= 當前導入任務的timeout時間 >= 當前導入任務單個BE處理的數據量 / 10M/s 例如,一個100 GB的文件,集群的BE個數為10個,則timeout時間如下所示。 timeout >= 1000s = 10G / 10M/s
當您發現第二步計算出的timeout時間超過系統默認的導入最大超時時間(4小時),此時不推薦您將導入最大超時時間直接改大來解決問題。單個導入時間如果超過默認的導入最大超時時間(4小時),最好通過切分待導入文件并且分多次導入來解決問題。主要原因是單次導入超過4小時的話,導入失敗后重試的時間成本很高。您可以通過如下公式計算出Doris集群期望最大導入文件數據量。
期望最大導入文件數據量 = 14400s * 10M/s * BE個數 例如,集群的BE個數為10個 期望最大導入文件數據量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
重要通常您的環境可能達不到10M/s的速度,所以建議超過500 GB的文件都進行文件切分,然后再導入。
作業調度
系統會限制一個集群內,正在運行的Broker Load作業數量,以防止同時運行過多的Load作業。
desired_max_waiting_jobs:FE的配置參數,會限制一個集群內未開始或正在運行(作業狀態為PENDING或LOADING)的Broker Load作業數量。默認為100。如果超過該閾值,新提交的作業將會被直接拒絕。
一個Broker Load作業會被分為pending task和loading task階段。其中pending task負責獲取導入文件的信息,而loading task會發送給BE執行具體的導入任務。
async_pending_load_task_pool_size:FE的配置參數,用于限制同時運行的pending task的任務數量。也相當于控制了實際正在運行的導入任務數量。該參數默認為10。例如,您提交了100個Load作業,同時只會有10個作業會進入LOADING狀態(開始執行),而其他作業處于PENDING狀態(等待)。
async_loading_load_task_pool_size:FE的配置參數,用于限制同時運行的loading task的任務數量。一個Broker Load作業會有1個pending task和多個loading task (等于LOAD語句中DATA INFILE子句的個數),所以async_loading_load_task_pool_size應該大于等于async_pending_load_task_pool_size。