為了更好地滿足各種不同的業務場景,StarRocks支持多種數據模型,StarRocks中存儲的數據需要按照特定的模型進行組織。本文為您介紹數據導入的基本概念、原理、系統配置、不同導入方式的適用場景,以及一些最佳實踐案例和常見問題。

背景信息

數據導入功能是將原始數據按照相應的模型進行清洗轉換并加載到StarRocks中,方便查詢使用。StarRocks提供了多種導入方式,您可以根據數據量大小或導入頻率等要求選擇最適合自己業務需求的導入方式。

StarRocks導入方式與各數據源關系圖如下。StarRocks schematic diagram
您可以根據不同的數據來源選擇不同的導入方式:
  • 離線數據導入:如果數據源是Hive或HDFS,推薦使用Broker Load。如果數據表很多導入比較麻煩可以使用Hive外表,性能會比Broker load導入效果差,但是可以避免數據搬遷。如果單表的數據量特別大,或者需要做為全局數據字典來精確去重可以考慮使用Spark Load
  • 實時數據導入:日志數據和業務數據庫的Binlog同步到Kafka后,優先推薦通過Routine Load導入StarRocks。如果導入過程中有復雜的多表關聯和ETL預處理可以使用Flink(Flink Connector)處理以后,再通過Stream Load寫入StarRocks。
  • 程序寫入StarRocks:推薦使用Stream Load,可以參見Stream Load中Java或Python的Demo。
  • 文本文件導入:推薦使用Stream Load
  • MySQL數據導入:推薦使用MySQL外表,通過insert into new_table select * from external_table的方式導入。
  • StarRocks內部導入:推薦使用Insert Into方式導入,跟外部調度器配合實現簡單的ETL處理。
說明 本文圖片和部分內容來源于開源StarRocks的導入總覽

注意事項

向StarRocks導入數據時,通常會采用程序對接的方式。以下是導入數據時的一些注意事項:
  • 選擇合適的導入方式:根據數據量大小、導入頻次或數據源所在位置選擇導入方式。

    例如,如果原始數據存放在HDFS上,則使用Broker load導入。

  • 確定導入方式的協議:如果選擇了Broker Load導入方式,則外部系統需要能使用MySQL協議定期提交和查看導入作業。
  • 確定導入方式的類型:導入方式分為同步或異步。如果是異步導入方式,外部系統在提交創建導入后,必須調用查看導入命令,根據查看導入命令的結果來判斷導入是否成功。
  • 制定Label生成策略:Label生成策略需滿足對每一批次數據唯一且固定的原則。
  • 保證Exactly-Once:外部系統需要保證數據導入的At-Least-Once,StarRocks的Label機制可以保證數據導入的At-Most-Once,即可整體上保證數據導入的Exactly-Once。

基本概念

名詞描述
導入作業讀取用戶提交的源數據并進行清洗轉換后,將數據導入到StarRocks系統中。導入完成后,數據即可被用戶查詢到。
Label用于標識一個導入作業,所有導入作業都有一個Label。

Label可由用戶指定或系統自動生成。Label在一個數據庫內是唯一的,一個Label僅可用于一個成功的導入作業。當一個Label對應的導入作業成功后,不可再重復使用該Label提交導入作業。如果某Label對應的導入作業失敗,則該Label可以被再使用。該機制可以保證Label對應的數據最多被導入一次,即At-Most-Once語義。

原子性StarRocks中所有導入方式都提供原子性保證,即同一個導入作業內的所有有效數據要么全部生效,要么全部不生效,不會出現僅導入部分數據的情況。此處的有效數據不包括由于類型轉換錯誤等數據質量問題而被過濾的數據,數據質量問題可以參見數據導入常見問題
MySQL和HTTP協議StarRocks提供MySQL協議和HTTP協議兩種訪問協議接口來提交作業。
Broker LoadBroker導入,即通過部署的Broker程序讀取外部數據源(例如HDFS)中的數據,并導入到StarRocks。Broker進程利用自身的計算資源對數據進行預處理導入。
Spark LoadSpark導入,即通過外部資源(例如Spark)對數據進行預處理生成中間文件,StarRocks讀取中間文件導入。Spark Load是一種異步的導入方式,您需要通過MySQL協議創建導入,并通過查看導入命令檢查導入結果。
FEFrontend,StarRocks系統的元數據和調度節點。在導入流程中主要負責導入執行計劃的生成和導入任務的調度工作。
BEBackend,StarRocks系統的計算和存儲節點。在導入流程中主要負責數據的ETL和存儲。
TabletStarRocks表的邏輯分片,一個表按照分區、分桶規則可以劃分為多個分片,詳情請參見數據分布

基本原理

導入執行流程如下圖所示。StarRocks flow chart
一個導入作業主要分為以下五個階段。
階段描述
PENDING非必須。該階段是指用戶提交導入作業后,等待FE調度執行。

Broker Load和Spark Load包括該步驟。

ETL非必須。該階段執行數據的預處理,包括清洗、分區、排序和聚合等。

Spark Load包括該步驟,他使用外部計算資源Spark完成ETL。

LOADING該階段先對數據進行清洗和轉換,然后將數據發送給BE處理。當數據全部導入后,進入等待生效過程,此時導入作業依舊是LOADING狀態。
FINISHED在導入作業涉及的所有數據均生效后,作業的狀態變成FINISHED,FINISHED后導入的數據均可查詢。FINISHED是導入作業的最終狀態。
CANCELLED在導入作業狀態變為FINISHED之前,作業隨時可能被取消并進入CANCELLED狀態,例如,您手動取消或導入出現錯誤等。CANCELLED也是導入作業的一種最終狀態。
數據導入格式如表。
類型描述
整型類TINYINT、SMALLINT、INT、BIGINT、LARGEINT。例如:1,1000,1234。
浮點類FLOAT、DOUBLE、DECIMAL。例如:1.1,0.23,0.356。
日期類DATE、DATETIME。例如:2017-10-03,2017-06-13 12:34:03。
字符串類CHAR、VARCHAR。例如:I am a student,a。

導入方式

為適配不同的數據導入需求,StarRocks系統提供了5種不同的導入方式,以支持不同的數據源(例如HDFS、Kafka和本地文件等),或者按不同的方式導入數據,StarRocks目前導入數據的方式分為同步導入和異步導入兩種。

所有導入方式都支持CSV數據格式。其中Broker Load還支持Parquet和ORC數據格式。

導入方式介紹

導入方式描述導入類型
Broker Load通過Broker進程訪問并讀取外部數據源,然后采用MySQL協議向StarRocks創建導入作業。提交的作業將異步執行,您可以通過SHOW LOAD命令查看導入結果。

Broker Load適用于源數據在Broker進程可訪問的存儲系統(例如HDFS)中,數據量為幾十GB到上百GB,詳細信息請參見Broker Load

異步導入
Spark Load通過外部的Spark資源實現對導入數據的預處理,提高StarRocks大數據量的導入性能并且節省StarRocks集群的計算資源。Spark Load是一種異步導入方式,需要通過MySQL協議創建導入作業,并通過SHOW LOAD查看導入結果。

Spark Load適用于初次遷移大數據量(可達到TB級別)到StarRocks的場景,且源數據在Spark可訪問的存儲系統(例如HDFS)中,詳細信息請參見Spark Load

異步導入
Stream Load是一種同步執行的導入方式。您可以通過HTTP協議發送請求將本地文件或數據流導入到StarRocks中,并等待系統返回導入的結果狀態,從而判斷導入是否成功。

Stream Load適用于導入本地文件,或通過程序導入數據流中的數據,詳細信息請參見Stream Load

同步導入
Routine LoadRoutine Load(例行導入)提供了一種自動從指定數據源進行數據導入的功能。您可以通過MySQL協議提交例行導入作業,生成一個常駐線程,不間斷的從數據源(例如Kafka)中讀取數據并導入到StarRocks中,詳細信息請參見Routine Load異步導入
Insert Into類似MySQL中的Insert語句,StarRocks提供INSERT INTO tbl SELECT ...;的方式從StarRocks的表中讀取數據并導入到另一張表,或者通過INSERT INTO tbl VALUES(...);插入單條數據,詳細信息請參見Insert Into同步導入

導入類型

重要 如果是外部程序接入StarRocks的導入功能,需要先判斷使用導入方式是哪類,然后再確定接入邏輯。
  • 同步導入

    同步導入方式即用戶創建導入任務,StarRocks同步執行,執行完成后返回導入結果。用戶可以通過該結果判斷導入是否成功。

    操作步驟:
    1. 用戶(外部系統)創建導入任務。
    2. StarRocks返回導入結果。
    3. 用戶(外部系統)判斷導入結果。如果導入結果為失敗,則可以再次創建導入任務。
  • 異步導入

    異步導入方式即用戶創建導入任務后,StarRocks直接返回創建成功。創建成功不代表數據已經導入成功。導入任務會被異步執行,用戶在創建成功后,需要通過輪詢的方式發送查看命令查看導入作業的狀態。如果創建失敗,則可以根據失敗信息,判斷是否需要再次創建。

    操作步驟:
    1. 用戶(外部系統)創建導入任務。
    2. StarRocks返回創建任務的結果。
    3. 用戶(外部系統)判斷創建任務的結果,如果成功則進入步驟4;如果失敗則可以回到步驟1,重新嘗試創建導入任務。
    4. 用戶(外部系統)輪詢查看任務狀態,直至狀態變為FINISHED或CANCELLED。

適用場景

場景描述
HDFS導入如果HDFS導入源數據存儲在HDFS中,當數據量為幾十GB到上百GB時,則可以采用Broker Load方法向StarRocks導入數據。此時要求部署的Broker進程可以訪問HDFS數據源。導入數據的作業異步執行,您可以通過SHOW LOAD命令查看導入結果。

如果源數據存儲在HDSF中,當數據量達到TB級別時,則可以采用Spark Load方法向StarRocks導入數據。此時要求部署的Spark進程可以訪問HDFS數據源。導入數據的作業異步執行,您可以通過SHOW LOAD命令查看導入結果。

對于其他外部數據源,只要Broker或Spark進程能讀取對應數據源,也可以采用Broker Load或Spark Load方法導入數據。

本地文件導入數據存儲在本地文件中,數據量小于10 GB,可以采用Stream Load方法將數據快速導入StarRocks系統。采用HTTP協議創建導入作業,作業同步執行,您可以通過HTTP請求的返回值判斷導入是否成功。
Kafka導入數據來自于Kafka等流式數據源,需要向StarRocks系統導入實時數據時,可以采用Routine Load方法。您通過MySQL協議創建例行導入作業,StarRocks持續不斷地從Kafka中讀取并導入數據。
Insert Into導入手工測試及臨時數據處理時可以使用Insert Into方法向StarRocks表中寫入數據。

其中,INSERT INTO tbl SELECT ...;語句是從StarRocks的表中讀取數據并導入到另一張表,INSERT INTO tbl VALUES(...);語句是向指定表里插入單條數據。

內存限制

您可以通過設置參數來限制單個導入作業的內存使用,以防止導入占用過多的內存而導致系統OOM。不同導入方式限制內存的方式略有不同,詳情可以參見各個導入方式的文檔。

一個導入作業通常會分布在多個BE上執行,內存參數限制的是一個導入作業在單個BE上的內存使用,而不是在整個集群的內存使用。同時,每個BE會設置可用于導入作業的內存總上限,詳情請參見通用系統配置。配置限制了所有在該BE上運行的導入任務的總體內存使用上限。

較小的內存限制可能會影響導入效率,因為導入流程可能會因為內存達到上限而頻繁的將內存中的數據寫回磁盤。而過大的內存限制可能導致當導入并發較高時系統OOM。所以需要根據需求合理地設置內存參數。

通用系統配置

FE配置

以下配置屬于FE的系統配置,可以通過FE的配置文件fe.conf來修改。

參數描述
max_load_timeout_second導入超時時間的最大、最小取值范圍,均以秒為單位。默認的最大超時時間為3天,最小超時時間為1秒。您自定義的導入超時時間不可超過該范圍。該參數通用于所有類型的導入任務。
min_load_timeout_second
desired_max_waiting_jobs等待隊列可以容納的最多導入任務數目,默認值為100。

例如,FE中處于PENDING狀態(即等待執行)的導入任務數目達到該值,則新的導入請求會被拒絕。此配置僅對異步執行的導入有效,如果處于等待狀態的異步導入任務數達到限額,則后續創建導入的請求會被拒絕。

max_running_txn_num_per_db每個數據庫中正在運行的導入任務的最大個數(不區分導入類型、統一計數),默認值為100。

當數據庫中正在運行的導入任務超過最大值時,后續的導入任務不會被執行。如果是同步作業,則作業會被拒絕;如果是異步作業,則作業會在隊列中等待。

label_keep_max_second導入任務記錄的保留時間。

已經完成的(FINISHED或CANCELLED)導入任務記錄會在StarRocks系統中保留一段時間,時間長短則由此參數決定。參數默認值為3天。該參數通用于所有類型的導入任務。

BE配置

以下配置屬于BE的系統配置,可以通過BE的配置文件be.conf來修改。

參數描述
push_write_mbytes_per_secBE上單個Tablet的寫入速度限制。默認值是10,即10MB/s。

根據Schema以及系統的不同,通常BE對單個Tablet的最大寫入速度大約在10~30MB/s之間。您可以適當調整該參數來控制導入速度。

write_buffer_size導入數據在BE上會先寫入到一個內存塊,當該內存塊達到閾值后才會寫回磁盤。默認值為100 MB。

過小的閾值可能導致BE上存在大量的小文件。您可以適當提高該閾值減少文件數量。但過大的閾值可能導致RPC超時,詳細請參見參數tablet_writer_rpc_timeout_sec

tablet_writer_rpc_timeout_sec導入過程中,發送一個Batch(1024行)的RPC超時時間。默認為600秒。

因為該RPC可能涉及多個分片內存塊的寫盤操作,所以可能會因為寫盤導致RPC超時,可以適當調整超時時間來減少超時錯誤(例如send batch fail)。同時,如果調大參數write_buffer_size,則tablet_writer_rpc_timeout_sec參數也需要適當調大。

streaming_load_rpc_max_alive_time_sec在導入過程中,StarRocks會為每個Tablet開啟一個Writer,用于接收數據并寫入。該參數指定了Writer的等待超時時間。默認為600秒。

如果在參數指定時間內Writer沒有收到任何數據,則Writer會被自動銷毀。當系統處理速度較慢時,Writer可能長時間接收不到下一批數據,導致導入報錯TabletWriter add batch with unknown id。此時可適當調大該參數。

load_process_max_memory_limit_percent分別為最大內存和最大內存百分比,限制了單個BE上可用于導入任務的內存上限。系統會在兩個參數中取較小者,作為最終的BE導入任務內存使用上限。
  • load_process_max_memory_limit_percent:表示對BE總內存限制的百分比。默認為80。總內存限制mem_limit默認為80%,表示對物理內存的百分比。即假設物理內存為M,則默認導入內存限制為M * 80% * 80%。
  • load_process_max_memory_limit_bytes:默認為100 GB。
load_process_max_memory_limit_bytes