Stream Load是一種同步的導入方式,您可以通過HTTP協議發送請求將本地文件或數據流導入到Doris中。Stream Load同步執行導入并返回導入結果。您可以直接通過請求的返回體判斷本次導入是否成功。本文為您介紹Stream Load導入的基本原理、基本操作、系統配置以及最佳實踐。
適用場景
Stream Load主要適用于導入本地文件或通過程序導入數據流中的數據。
基本原理
下面為您展示了Stream Load的主要流程,省略了部分導入細節。
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
Stream Load中,Doris會選定一個節點作為Coordinator節點,該節點負責接收數據并分發數據到其他數據節點。您可以通過HTTP協議提交導入命令。如果提交到FE,則FE會通過HTTP redirect指令將請求轉發給某一個BE。您也可以直接提交導入命令給某一指定BE。導入的最終結果由Coordinator BE返回給您。
支持的數據格式
Stream Load支持CSV(文本)和JSON兩個數據格式。
基本操作
創建導入任務
Stream Load通過HTTP協議提交和傳輸數據。本示例通過curl
命令展示如何提交導入任務。您也可以通過其他HTTP Client進行操作。
curl
命令curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load # Header中支持屬性見下表。 # 格式為: -H "key1:value1"
創建導入任務的詳細語法可以通過
HELP STREAM LOAD
命令查看。Stream Load中所有與導入任務相關的參數均設置在Header中。相關參數描述如下表所示。參數
說明
簽名參數
user:passwd
Stream Load創建導入任務使用的是HTTP協議,已通過Basic access authentication進行簽名。Doris會根據簽名來驗證用戶身份和導入權限。
導入任務參數
label
導入任務的標識。
每個導入任務,都有一個在單database內部唯一的Label。Label是您在導入命令中自定義的名稱。通過該Label,您可以查看對應導入任務的執行情況。Label的另一個作用是防止您重復導入相同的數據。強烈推薦您同一批次數據使用相同Label,這樣同一批次數據的重復請求只會被接受一次,保證了At-Most-Once。當Label對應的導入作業狀態為CANCELLED時,該Label可以再次被使用。
column_separator
用于指定導入文件中的列分隔符,默認為\t。
如果是不可見字符,則需要加\x作為前綴,使用十六進制來表示分隔符。例如,Hive文件的分隔符\x01,需要指定為
-H "column_separator:\x01"
。可以使用多個字符的組合作為列分隔符。line_delimiter
用于指定導入文件中的換行符,默認為\n。可以使用多個字符的組合作為換行符。
max_filter_ratio
導入任務的最大容忍率,默認為0容忍,取值范圍是0~1。
當導入的錯誤率超過該值,則導入失敗。如果您希望忽略錯誤的行,可以通過設置該參數大于0來保證導入成功。計算公式為
(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio
,其中dpp.abnorm.ALL
表示數據質量不合格的行數,例如類型不匹配、列數不匹配、長度不匹配等;dpp.norm.ALL
表示導入過程中正確數據的條數,可以通過SHOW LOAD
命令查詢導入任務的正確數據量。原始文件的行數 = dpp.abnorm.ALL + dpp.norm.ALL
where
導入任務指定的過濾條件。
Stream Load支持對原始數據指定where語句進行過濾。被過濾的數據將不會被導入,也不會參與filter ratio的計算,但會被計入num_rows_unselected。
Partitions
待導入表的Partition信息,如果待導入數據不屬于指定的Partition,則不會被導入。未被導入的數據將計入 dpp.abnorm.ALL。
columns
待導入數據的函數變換配置,目前Stream Load支持的函數變換方法包含列的順序變化以及表達式變換,其中表達式變換的方法與查詢語句的一致。
exec_mem_limit
導入內存限制。默認為2 GB,單位為字節。
strict_mode
指定此次導入是否開啟strict mode模式,默認關閉。
Stream Load導入可以開啟strict mode模式,開啟方式為在HEADER中聲明
strict_mode=true
。strict mode模式的意思是對于導入過程中的列類型轉換進行嚴格過濾。嚴格過濾的策略如下:對于列類型轉換來說,如果strict mode為true,則錯誤數據將被filter。這里的錯誤數據是指原始數據并不為空值,在參與列類型轉換后結果為空值的這一類數據。
對于導入的某列由函數變換生成時,strict mode對其不產生影響。
對于導入的某列類型包含范圍限制的,如果原始數據能正常通過類型轉換,但無法通過范圍限制的,strict mode對其也不產生影響。例如,如果類型是decimal(1,0),原始數據為10,則屬于可以通過類型轉換但不在列聲明的范圍內,strict mode對其不產生影響。
merge_type
數據的合并類型,共支持APPEND、DELETE、MERGE三種類型。
APPEND(默認值):表示這批數據全部需要追加到現有數據中。
DELETE:表示刪除與這批數據key相同的所有行。
MERGE:需要與DELETE條件聯合使用,表示滿足DELETE條件的數據按照DELETE語義處理,其余的按照APPEND語義處理。
two_phase_commit
Stream Load導入可以開啟兩階段事務提交模式:在Stream load過程中,數據寫入完成即會返回信息,此時數據不可見,事務狀態為PRECOMMITTED,您手動觸發commit操作之后,數據才可見。默認的兩階段批量事務提交為關閉。
開啟方式是在be.conf中配置
disable_stream_load_2pc=false
,并且在HEADER中聲明two_phase_commit=true
。示例:
發起Stream Load預提交操作。
說明列順序變換例子:原始數據有三列src_c1、src_c2、rc_c3,目前Doris表也有三列dst_c1、dst_c2、dst_c3。
如果原始表的src_c1列對應目標表dst_c1列,原始表的src_c2列對應目標表dst_c2列,原始表的src_c3列對應目標表dst_c3列,則寫法為
columns: dst_c1, dst_c2, dst_c3
。如果原始表的src_c1列對應目標表dst_c2列,原始表的src_c2列對應目標表dst_c3列,原始表的src_c3列對應目標表dst_c1列,則寫法為
columns: dst_c2, dst_c3, dst_c1
。表達式變換例子:原始文件有兩列,目標表也有兩列(c1,c2),但是原始文件的兩列均需要經過函數變換才能對應目標表的兩列,則寫法如下為
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
,其中tmp_*
是一個占位符,代表的是原始文件中的兩個原始列。
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load { "TxnId": 18036, "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****", "TwoPhaseCommit": "true", "Status": "Success", "Message": "OK", "NumberTotalRows": 100, "NumberLoadedRows": 100, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 1031, "LoadTimeMs": 77, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 1, "ReadDataTimeMs": 0, "WriteDataTimeMs": 58, "CommitAndPublishTimeMs": 0 }
對事務觸發commit操作。
對事務觸發abort操作。
示例
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
返回結果
由于Stream Load是一種同步的導入方式,所以導入的結果會通過創建導入的返回值直接返回給用戶。示例如下。
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****" }
Stream load導入結果參數如下表。
參數
說明
TxnId
導入的事務ID。用戶可不感知。
Label
導入的Label。由用戶指定或系統自動生成。
Status
導入完成狀態。
Success:表示導入成功。
Publish Timeout:表示導入已經完成,只是數據可能會延遲可見,無需重試。
Label Already Exists:Label重復,需更換Label。
Fail:導入失敗。
ExistingJobStatus
已存在Label對應的導入作業的狀態。該字段只有當Status為Label Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的導入作業的狀態。
RUNNING:表示作業在執行中。
FINISHED:表示作業成功。
Message
導入錯誤信息。
NumberTotalRows
導入總處理的行數。
NumberLoadedRows
成功導入的行數。
NumberFilteredRows
數據質量不合格的行數。
NumberUnselectedRows
被where條件過濾的行數。
LoadBytes
導入的字節數。
LoadTimeMs
導入完成時間。單位毫秒。
BeginTxnTimeMs
向FE請求開始一個事務所花費的時間,單位毫秒。
StreamLoadPutTimeMs
向FE請求獲取導入數據執行計劃所花費的時間,單位毫秒。
ReadDataTimeMs
讀取數據所花費的時間,單位毫秒。
WriteDataTimeMs
執行寫入數據操作所花費的時間,單位毫秒。
CommitAndPublishTimeMs
向FE請求提交并且發布事務所花費的時間,單位毫秒。
ErrorURL
如果有數據質量問題,通過訪問該URL查看具體錯誤行。
重要由于Stream Load是同步的導入方式,所以并不會在Doris中記錄導入信息,您無法異步的通過查看導入命令看到Stream Load。使用時需監聽創建導入請求的返回值獲取導入結果。
取消導入
您無法手動取消Stream Load,Stream Load在超時或者導入錯誤后會被系統自動取消。
查看Stream Load
您可以通過show stream load
來查看已經完成的Stream Load任務。
默認BE是不記錄Stream Load的記錄,如果您要查看需要在BE上啟用記錄,配置參數enable_stream_load_record=true
,具體配置詳情請參見BE參數配置。
相關系統配置
FE配置
stream_load_default_timeout_second:導入任務的超時時間(以秒為單位),導入任務在設定的timeout時間內未完成則會被系統取消,變成CANCELLED。默認的timeout時間為600秒。如果導入的源文件無法在規定時間內完成導入,您可以在Stream Load 請求中設置單獨的超時時間,或者調整FE的stream_load_default_timeout_second參數來設置全局的默認超時時間。
BE配置
streaming_load_max_mbStream:Stream Load的最大導入大小,默認為10 GB,單位是MB。如果您的原始文件超過該值,則需要調整BE的streaming_load_max_mb參數。
最佳實踐
應用場景
使用Stream Load最合適的場景就是原始文件在內存中或者在磁盤中。其次,由于Stream Load是一種同步的導入方式,所以如果您希望用同步方式獲取導入結果,也可以使用這種導入。
數據量
由于Stream Load的原理是由BE發起的導入并分發數據,建議的導入數據量在1 GB到10 GB之間。由于默認的最大Stream Load導入數據量為 10 GB,所以導入超過10 GB的文件就需要修改BE的配置streaming_load_max_mb。
例如,如果待導入文件大小為15 GB,則需修改BE配置streaming_load_max_mb為16000即可。
Stream Load的默認超時為300秒,按照Doris目前最大的導入限速來看,約超過3 GB的文件就需要修改導入任務默認超時時間。
導入任務超時時間 = 導入數據量 / 10M/s (具體的平均導入速度需要您根據自己的集群情況計算)
例如,如果導入一個10 GB的文件,則timeout = 1000s ,為10G / 10M/s
。
完整示例
數據情況:數據在客戶端本地磁盤路徑/home/store-sales中,導入的數據量約為15 GB,希望導入到數據庫bj-sales的表store-sales中。
集群情況:Stream Load的并發數不受集群大小影響。
示例如下:
因為導入文件大小超過默認的最大導入大小10 GB,所以需要修改BE的配置文件BE.conf。
streaming_load_max_mb = 16000
計算大概的導入時間是否超過默認timeout值,導入時間為
15000 / 10 = 1500s
,如果超過了默認的timeout時間,則需要修改FE的配置FE.conf,修改參數stream_load_default_timeout_second,將導入時間調整為1500。創建導入任務。
curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load