日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Stream Load

Stream Load是一種同步的導入方式,您可以通過HTTP協議發送請求將本地文件或數據流導入到Doris中。Stream Load同步執行導入并返回導入結果。您可以直接通過請求的返回體判斷本次導入是否成功。本文為您介紹Stream Load導入的基本原理、基本操作、系統配置以及最佳實踐。

適用場景

Stream Load主要適用于導入本地文件或通過程序導入數據流中的數據。

基本原理

下面為您展示了Stream Load的主要流程,省略了部分導入細節。

^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

Stream Load中,Doris會選定一個節點作為Coordinator節點,該節點負責接收數據并分發數據到其他數據節點。您可以通過HTTP協議提交導入命令。如果提交到FE,則FE會通過HTTP redirect指令將請求轉發給某一個BE。您也可以直接提交導入命令給某一指定BE。導入的最終結果由Coordinator BE返回給您。

支持的數據格式

Stream Load支持CSV(文本)和JSON兩個數據格式。

基本操作

創建導入任務

Stream Load通過HTTP協議提交和傳輸數據。本示例通過curl命令展示如何提交導入任務。您也可以通過其他HTTP Client進行操作。

  • curl命令

    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
    
    # Header中支持屬性見下表。
    # 格式為: -H "key1:value1"

    創建導入任務的詳細語法可以通過HELP STREAM LOAD命令查看。Stream Load中所有與導入任務相關的參數均設置在Header中。相關參數描述如下表所示。

    參數

    說明

    簽名參數

    user:passwd

    Stream Load創建導入任務使用的是HTTP協議,已通過Basic access authentication進行簽名。Doris會根據簽名來驗證用戶身份和導入權限。

    導入任務參數

    label

    導入任務的標識。

    每個導入任務,都有一個在單database內部唯一的Label。Label是您在導入命令中自定義的名稱。通過該Label,您可以查看對應導入任務的執行情況。Label的另一個作用是防止您重復導入相同的數據。強烈推薦您同一批次數據使用相同Label,這樣同一批次數據的重復請求只會被接受一次,保證了At-Most-Once。當Label對應的導入作業狀態為CANCELLED時,該Label可以再次被使用。

    column_separator

    用于指定導入文件中的列分隔符,默認為\t。

    如果是不可見字符,則需要加\x作為前綴,使用十六進制來表示分隔符。例如,Hive文件的分隔符\x01,需要指定為-H "column_separator:\x01"。可以使用多個字符的組合作為列分隔符。

    line_delimiter

    用于指定導入文件中的換行符,默認為\n。可以使用多個字符的組合作為換行符。

    max_filter_ratio

    導入任務的最大容忍率,默認為0容忍,取值范圍是0~1。

    當導入的錯誤率超過該值,則導入失敗。如果您希望忽略錯誤的行,可以通過設置該參數大于0來保證導入成功。計算公式為(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio,其中dpp.abnorm.ALL表示數據質量不合格的行數,例如類型不匹配、列數不匹配、長度不匹配等;dpp.norm.ALL表示導入過程中正確數據的條數,可以通過SHOW LOAD命令查詢導入任務的正確數據量。

    原始文件的行數 = dpp.abnorm.ALL + dpp.norm.ALL

    where

    導入任務指定的過濾條件。

    Stream Load支持對原始數據指定where語句進行過濾。被過濾的數據將不會被導入,也不會參與filter ratio的計算,但會被計入num_rows_unselected。

    Partitions

    待導入表的Partition信息,如果待導入數據不屬于指定的Partition,則不會被導入。未被導入的數據將計入 dpp.abnorm.ALL。

    columns

    待導入數據的函數變換配置,目前Stream Load支持的函數變換方法包含列的順序變化以及表達式變換,其中表達式變換的方法與查詢語句的一致。

    exec_mem_limit

    導入內存限制。默認為2 GB,單位為字節。

    strict_mode

    指定此次導入是否開啟strict mode模式,默認關閉。

    Stream Load導入可以開啟strict mode模式,開啟方式為在HEADER中聲明strict_mode=true。strict mode模式的意思是對于導入過程中的列類型轉換進行嚴格過濾。嚴格過濾的策略如下:

    • 對于列類型轉換來說,如果strict mode為true,則錯誤數據將被filter。這里的錯誤數據是指原始數據并不為空值,在參與列類型轉換后結果為空值的這一類數據。

    • 對于導入的某列由函數變換生成時,strict mode對其不產生影響。

    • 對于導入的某列類型包含范圍限制的,如果原始數據能正常通過類型轉換,但無法通過范圍限制的,strict mode對其也不產生影響。例如,如果類型是decimal(1,0),原始數據為10,則屬于可以通過類型轉換但不在列聲明的范圍內,strict mode對其不產生影響。

    merge_type

    數據的合并類型,共支持APPEND、DELETE、MERGE三種類型。

    • APPEND(默認值):表示這批數據全部需要追加到現有數據中。

    • DELETE:表示刪除與這批數據key相同的所有行。

    • MERGE:需要與DELETE條件聯合使用,表示滿足DELETE條件的數據按照DELETE語義處理,其余的按照APPEND語義處理。

    two_phase_commit

    Stream Load導入可以開啟兩階段事務提交模式:在Stream load過程中,數據寫入完成即會返回信息,此時數據不可見,事務狀態為PRECOMMITTED,您手動觸發commit操作之后,數據才可見。默認的兩階段批量事務提交為關閉。

    開啟方式是在be.conf中配置disable_stream_load_2pc=false,并且在HEADER中聲明two_phase_commit=true

    示例:

    1. 發起Stream Load預提交操作。

      說明

      列順序變換例子:原始數據有三列src_c1、src_c2、rc_c3,目前Doris表也有三列dst_c1、dst_c2、dst_c3。

      • 如果原始表的src_c1列對應目標表dst_c1列,原始表的src_c2列對應目標表dst_c2列,原始表的src_c3列對應目標表dst_c3列,則寫法為columns: dst_c1, dst_c2, dst_c3

      • 如果原始表的src_c1列對應目標表dst_c2列,原始表的src_c2列對應目標表dst_c3列,原始表的src_c3列對應目標表dst_c1列,則寫法為columns: dst_c2, dst_c3, dst_c1

      • 表達式變換例子:原始文件有兩列,目標表也有兩列(c1,c2),但是原始文件的兩列均需要經過函數變換才能對應目標表的兩列,則寫法如下為columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2),其中tmp_*是一個占位符,代表的是原始文件中的兩個原始列。

      curl  --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
      {
          "TxnId": 18036,
          "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****",
          "TwoPhaseCommit": "true",
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 100,
          "NumberLoadedRows": 100,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 1031,
          "LoadTimeMs": 77,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 1,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 58,
          "CommitAndPublishTimeMs": 0
      }
    2. 對事務觸發commit操作。

    3. 對事務觸發abort操作。

  • 示例

    curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
  • 返回結果

    由于Stream Load是一種同步的導入方式,所以導入的結果會通過創建導入的返回值直接返回給用戶。示例如下。

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "BeginTxnTimeMs": 1,
        "StreamLoadPutTimeMs": 2,
        "ReadDataTimeMs": 325,
        "WriteDataTimeMs": 1933,
        "CommitAndPublishTimeMs": 106,
        "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****"
    }

    Stream load導入結果參數如下表。

    參數

    說明

    TxnId

    導入的事務ID。用戶可不感知。

    Label

    導入的Label。由用戶指定或系統自動生成。

    Status

    導入完成狀態。

    • Success:表示導入成功。

    • Publish Timeout:表示導入已經完成,只是數據可能會延遲可見,無需重試。

    • Label Already Exists:Label重復,需更換Label。

    • Fail:導入失敗。

    ExistingJobStatus

    已存在Label對應的導入作業的狀態。該字段只有當Status為Label Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的導入作業的狀態。

    • RUNNING:表示作業在執行中。

    • FINISHED:表示作業成功。

    Message

    導入錯誤信息。

    NumberTotalRows

    導入總處理的行數。

    NumberLoadedRows

    成功導入的行數。

    NumberFilteredRows

    數據質量不合格的行數。

    NumberUnselectedRows

    被where條件過濾的行數。

    LoadBytes

    導入的字節數。

    LoadTimeMs

    導入完成時間。單位毫秒。

    BeginTxnTimeMs

    向FE請求開始一個事務所花費的時間,單位毫秒。

    StreamLoadPutTimeMs

    向FE請求獲取導入數據執行計劃所花費的時間,單位毫秒。

    ReadDataTimeMs

    讀取數據所花費的時間,單位毫秒。

    WriteDataTimeMs

    執行寫入數據操作所花費的時間,單位毫秒。

    CommitAndPublishTimeMs

    向FE請求提交并且發布事務所花費的時間,單位毫秒。

    ErrorURL

    如果有數據質量問題,通過訪問該URL查看具體錯誤行。

    重要

    由于Stream Load是同步的導入方式,所以并不會在Doris中記錄導入信息,您無法異步的通過查看導入命令看到Stream Load。使用時需監聽創建導入請求的返回值獲取導入結果。

取消導入

您無法手動取消Stream Load,Stream Load在超時或者導入錯誤后會被系統自動取消。

查看Stream Load

您可以通過show stream load來查看已經完成的Stream Load任務。

默認BE是不記錄Stream Load的記錄,如果您要查看需要在BE上啟用記錄,配置參數enable_stream_load_record=true ,具體配置詳情請參見BE參數配置

相關系統配置

FE配置

stream_load_default_timeout_second:導入任務的超時時間(以秒為單位),導入任務在設定的timeout時間內未完成則會被系統取消,變成CANCELLED。默認的timeout時間為600秒。如果導入的源文件無法在規定時間內完成導入,您可以在Stream Load 請求中設置單獨的超時時間,或者調整FE的stream_load_default_timeout_second參數來設置全局的默認超時時間。

BE配置

streaming_load_max_mbStream:Stream Load的最大導入大小,默認為10 GB,單位是MB。如果您的原始文件超過該值,則需要調整BE的streaming_load_max_mb參數。

最佳實踐

應用場景

使用Stream Load最合適的場景就是原始文件在內存中或者在磁盤中。其次,由于Stream Load是一種同步的導入方式,所以如果您希望用同步方式獲取導入結果,也可以使用這種導入。

數據量

由于Stream Load的原理是由BE發起的導入并分發數據,建議的導入數據量在1 GB到10 GB之間。由于默認的最大Stream Load導入數據量為 10 GB,所以導入超過10 GB的文件就需要修改BE的配置streaming_load_max_mb

例如,如果待導入文件大小為15 GB,則需修改BE配置streaming_load_max_mb為16000即可。

Stream Load的默認超時為300秒,按照Doris目前最大的導入限速來看,約超過3 GB的文件就需要修改導入任務默認超時時間。

導入任務超時時間 = 導入數據量 / 10M/s (具體的平均導入速度需要您根據自己的集群情況計算)

例如,如果導入一個10 GB的文件,則timeout = 1000s ,為10G / 10M/s

完整示例

數據情況:數據在客戶端本地磁盤路徑/home/store-sales中,導入的數據量約為15 GB,希望導入到數據庫bj-sales的表store-sales中。

集群情況:Stream Load的并發數不受集群大小影響。

示例如下:

  1. 因為導入文件大小超過默認的最大導入大小10 GB,所以需要修改BE的配置文件BE.conf

    streaming_load_max_mb = 16000
  2. 計算大概的導入時間是否超過默認timeout值,導入時間為15000 / 10 = 1500s,如果超過了默認的timeout時間,則需要修改FE的配置FE.conf,修改參數stream_load_default_timeout_second,將導入時間調整為1500。

  3. 創建導入任務。

    curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load