Routine Load
例行導(dǎo)入(Routine Load)功能,支持用戶提交一個(gè)常駐的導(dǎo)入任務(wù),通過不斷的從指定的數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)導(dǎo)入到Doris中。本文主要介紹Routine Load功能的實(shí)現(xiàn)原理、使用方式以及最佳實(shí)踐。
使用限制
當(dāng)前僅支持從Kafka進(jìn)行例行導(dǎo)入。
支持無認(rèn)證的Kafka訪問,以及通過SSL方式認(rèn)證的Kafka集群。
支持的消息格式為CSV或JSON文本格式。CSV每一個(gè)message為一行,且行尾不包含換行符。
默認(rèn)支持Kafka 0.10.0.0及以上版本。如果要使用Kafka 0.10.0.0以下版本 (0.9.0、0.8.2、0.8.1、0.8.0),需要修改BE配置,將 kafka_broker_version_fallback參數(shù)值設(shè)置為要兼容的舊版本,或者在創(chuàng)建Routine Load時(shí)直接設(shè)置property.broker.version.fallback參數(shù)值為要兼容的舊版本。
說明使用舊版本可能會導(dǎo)致Routine Load的部分新特性無法使用,例如根據(jù)時(shí)間設(shè)置Kafka分區(qū)的offset。
基本原理
Client向FE提交一個(gè)Routine Load作業(yè)的原理如下:
+---------+
| Client |
+----+----+
|
+-----------------------------+
| FE | |
| +-----------v------------+ |
| | | |
| | Routine Load Job | |
| | | |
| +---+--------+--------+--+ |
| | | | |
| +---v--+ +---v--+ +---v--+ |
| | task | | task | | task | |
| +--+---+ +---+--+ +---+--+ |
| | | | |
+-----------------------------+
| | |
v v v
+---+--+ +--+---+ ++-----+
| BE | | BE | | BE |
+------+ +------+ +------+
FE通過JobScheduler將一個(gè)導(dǎo)入作業(yè)拆分成若干個(gè)Task。每個(gè)Task負(fù)責(zé)導(dǎo)入指定的一部分?jǐn)?shù)據(jù)。Task被TaskScheduler分配到指定的BE 上執(zhí)行。
在BE上,一個(gè)Task被視為一個(gè)普通的導(dǎo)入任務(wù),通過Stream Load的導(dǎo)入機(jī)制進(jìn)行導(dǎo)入。導(dǎo)入完成后,向FE匯報(bào)。
FE中的JobScheduler根據(jù)匯報(bào)結(jié)果,繼續(xù)生成后續(xù)新的Task,或者對失敗的Task進(jìn)行重試。
整個(gè)Routine Load作業(yè)通過不斷地產(chǎn)生新的Task,來完成數(shù)據(jù)不間斷的導(dǎo)入。
Kafka例行導(dǎo)入
下面詳細(xì)介紹Kafka例行導(dǎo)入的使用方式和最佳實(shí)踐。
創(chuàng)建任務(wù)
創(chuàng)建例行導(dǎo)入任務(wù)的詳細(xì)語法可以查看CREATE ROUTINE LOAD命令手冊或執(zhí)行HELP ROUTINE LOAD;
查看語法幫助。下面以幾個(gè)示例說明如何創(chuàng)建Routine Load任務(wù)。
為example_db的example_tbl創(chuàng)建一個(gè)名為test1的Kafka例行導(dǎo)入任務(wù)。指定列分隔符和group.id和client.id,并且自動(dòng)默認(rèn)消費(fèi)所有分區(qū),且從有數(shù)據(jù)的位置(OFFSET_BEGINNING)開始訂閱。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.group.id" = "xxx", "property.client.id" = "xxx", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
以嚴(yán)格模式為example_db的example_tbl創(chuàng)建一個(gè)名為test1的Kafka例行導(dǎo)入任務(wù)。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100), WHERE k1 > 100 and k2 like "%doris%" PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,0,200" );
導(dǎo)入JSON格式數(shù)據(jù)使用示例。
Routine Load導(dǎo)入的JSON格式僅支持以下兩種:
第一種為只有一條記錄,且為JSON對象。
{"category":"a9jadhx","author":"test","price":895}
第二種為JSON數(shù)組,數(shù)組中可含多條記錄。
[ { "category":"11", "author":"4avc", "price":895, "timestamp":1589191587 }, { "category":"22", "author":"2avc", "price":895, "timestamp":1589191487 }, { "category":"33", "author":"3avc", "price":342, "timestamp":1589191387 } ]
創(chuàng)建待導(dǎo)入的Doris數(shù)據(jù)表,示例如下。
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20200509")), PARTITION p20200509 VALUES [("20200509"), ("20200510")), PARTITION p20200510 VALUES [("20200510"), ("20200511")), PARTITION p20200511 VALUES [("20200511"), ("20200512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4 PROPERTIES ( "replication_num" = "1" );
以簡單模式導(dǎo)入JSON數(shù)據(jù),示例如下。
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1 COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
精準(zhǔn)導(dǎo)入JSON格式數(shù)據(jù),示例如下。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
說明表中的分區(qū)字段dt在示例的數(shù)據(jù)里并沒有,而是在Routine load語句里通過
dt=from_unixtime(timestamp, '%Y%m%d')
轉(zhuǎn)換得來的。strict mode與source data的導(dǎo)入關(guān)系
列類型為TinyInt,且表中的列允許導(dǎo)入空值時(shí):
source data
source data example
string to int
strict_mode
result
空值
\N
N/A
true or false
NULL
not null
aaa or 2000
NULL
true
invalid data(filtered)
not null
aaa
NULL
true
NULL
not null
1
1
true or false
correct data
列類型為 Decimal(1,0),且表中的列允許導(dǎo)入空值時(shí):
source data
source data example
string to int
strict_mode
result
空值
\N
N/A
true or false
NULL
not null
aaa
NULL
true
invalid data(filtered)
not null
aaa
NULL
false
NULL
not null
1 or 10
1
true or false
correct data
說明10雖然是一個(gè)超過范圍的值,但是因?yàn)槠漕愋头蟙ecimal的要求,所以strict mode對其不產(chǎn)生影響。10最后會在其他ETL處理流程中被過濾,但不會被strict mode過濾。
訪問SSL認(rèn)證的Kafka集群
訪問SSL認(rèn)證的Kafka集群需要您提供用于認(rèn)證Kafka Broker公鑰的證書文件(ca.pem)。如果Kafka集群同時(shí)開啟了客戶端認(rèn)證,則還需提供客戶端的公鑰(client.pem)、密鑰文件(client.key)、以及密鑰密碼。文件需要先通過CREATE FILE命令上傳到Doris中,并且catalog名稱為kafka。CREATE FILE命令詳情可使用
HELP CREATE FILE;
查看。示例如下。上傳文件。
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
創(chuàng)建例行導(dǎo)入作業(yè)。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcd***" );
Doris通過Kafka的C++ API librdkafka來訪問Kafka集群,librdkafka所支持的參數(shù)詳情請參見librdkafka。
查看作業(yè)狀態(tài)
查看作業(yè)狀態(tài)的具體命令和示例可以通過HELP SHOW ROUTINE LOAD;
命令查看。
查看任務(wù)運(yùn)行狀態(tài)的具體命令和示例可以通過HELP SHOW ROUTINE LOAD TASK;
命令查看。
只能查看當(dāng)前正在運(yùn)行中的任務(wù),已結(jié)束和未開始的任務(wù)無法查看。
修改作業(yè)屬性
您可以修改已經(jīng)創(chuàng)建的作業(yè)。詳情請參見ALTER ROUTINE LOAD或通過HELP ALTER ROUTINE LOAD;
命令查看。
作業(yè)控制
您可以通過STOP、PAUSE或RESUME三個(gè)命令來控制作業(yè)的停止、暫停和重啟??梢酝ㄟ^HELP STOP ROUTINE LOAD;
、HELP PAUSE ROUTINE LOAD;
以及 HELP RESUME ROUTINE LOAD;
三個(gè)命令查看幫助和示例。
其他說明
例行導(dǎo)入作業(yè)和ALTER TABLE操作的關(guān)系
例行導(dǎo)入不會阻塞SCHEMA CHANGE和ROLLUP操作。
但需注意的是,如果SCHEMA CHANGE完成后,列映射關(guān)系無法匹配,則會導(dǎo)致作業(yè)的錯(cuò)誤數(shù)據(jù)激增,最終導(dǎo)致作業(yè)暫停。建議通過在例行導(dǎo)入作業(yè)中顯式指定列映射關(guān)系,以及通過增加Nullable列或帶Default值的列來減少這類問題。
刪除表的Partition可能會導(dǎo)致導(dǎo)入數(shù)據(jù)無法找到對應(yīng)的Partition,使得作業(yè)暫停。
例行導(dǎo)入作業(yè)和其他導(dǎo)入作業(yè)的關(guān)系(LOAD、DELETE、INSERT)
例行導(dǎo)入和其他LOAD作業(yè)以及INSERT操作沒有沖突。
當(dāng)執(zhí)行DELETE操作時(shí),對應(yīng)表分區(qū)不能有任何正在執(zhí)行的導(dǎo)入任務(wù)。所以在執(zhí)行DELETE操作前,需要先暫停例行導(dǎo)入作業(yè),并等待已下發(fā)的task全部完成后,才可以執(zhí)行DELETE。
例行導(dǎo)入作業(yè)和DROP DATABASE/TABLE操作的關(guān)系:當(dāng)例行導(dǎo)入對應(yīng)的DATABASE或TABLE被刪除后,作業(yè)會自動(dòng)CANCEL。
Kafka類型的例行導(dǎo)入作業(yè)和Kafka topic的關(guān)系
當(dāng)您在創(chuàng)建例行導(dǎo)入聲明的kafka_topic在Kafka集群中不存在時(shí):
如果您的Kafka集群的broker設(shè)置了auto.create.topics.enable = true,則kafka_topic會先被自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的partition個(gè)數(shù)是由您的Kafka集群中的broker配置num.partitions決定的。例行作業(yè)會正常的不斷讀取該topic的數(shù)據(jù)。
如果您的Kafka集群的broker設(shè)置了auto.create.topics.enable = false, 則kafka_topic不會被自動(dòng)創(chuàng)建,例行作業(yè)會在沒有讀取任何數(shù)據(jù)之前就被暫停,狀態(tài)為PAUSED。
因此,如果您希望當(dāng)Kafka topic不存在的時(shí)候,被例行作業(yè)可以自動(dòng)創(chuàng)建,只需要將您的kafka集群中的broker設(shè)置auto.create.topics.enable = true即可。
在網(wǎng)絡(luò)隔離的環(huán)境中可能出現(xiàn)的問題
在有些環(huán)境中存在網(wǎng)段和域名解析的隔離措施,所以需要注意:
創(chuàng)建Routine load任務(wù)中指定的Broker list必須能夠被Doris服務(wù)訪問。
Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必須能夠被Doris服務(wù)訪問。
關(guān)于指定消費(fèi)的Partition和Offset
Doris支持指定Partition和Offset開始消費(fèi),新版中還支持了指定時(shí)間點(diǎn)進(jìn)行消費(fèi)的功能。有三個(gè)相關(guān)參數(shù):
kafka_partitions:指定待消費(fèi)的partition列表,如:"0, 1, 2, 3"。
kafka_offsets:指定每個(gè)分區(qū)的起始o(jì)ffset,必須和kafka_partitions列表個(gè)數(shù)對應(yīng),如:"1000, 1000, 2000, 2000"。
property.kafka_default_offset:指定分區(qū)默認(rèn)的起始o(jì)ffset。
在創(chuàng)建導(dǎo)入作業(yè)時(shí),這三個(gè)參數(shù)可以有以下組合:
組合
kafka_partitions
kafka_offsets
property.kafka_default_offset
行為
1
No
No
No
系統(tǒng)會自動(dòng)查找topic對應(yīng)的所有分區(qū)并從 OFFSET_END開始消費(fèi)。
2
No
No
Yes
系統(tǒng)會自動(dòng)查找topic對應(yīng)的所有分區(qū)并從default offset指定的位置開始消費(fèi)。
3
Yes
No
No
系統(tǒng)會從指定分區(qū)的OFFSET_END開始消費(fèi)。
4
Yes
Yes
No
系統(tǒng)會從指定分區(qū)的指定offset處開始消費(fèi)。
5
Yes
No
Yes
系統(tǒng)會從指定分區(qū),default offset指定的位置開始消費(fèi)。
STOP和PAUSE的區(qū)別
FE會自動(dòng)定期清理STOP狀態(tài)的ROUTINE LOAD,而PAUSE狀態(tài)的則可以再次被恢復(fù)啟用。
相關(guān)參數(shù)
一些系統(tǒng)配置參數(shù)會影響例行導(dǎo)入的使用,具體說明如下。
配置項(xiàng) | FE/BE | 默認(rèn)值 | 說明 |
max_routine_load_task_concurrent_num | FE | 5 | 該參數(shù)限制了一個(gè)例行導(dǎo)入作業(yè)最大的子任務(wù)并發(fā)數(shù)??梢赃\(yùn)行時(shí)修改,建議維持默認(rèn)值。設(shè)置過大,可能導(dǎo)致同時(shí)并發(fā)的任務(wù)數(shù)過多,占用集群資源。 |
max_routine_load_task_num_per_be | FE | 5 | 該參數(shù)限制了每個(gè)BE節(jié)點(diǎn)最多并發(fā)執(zhí)行的子任務(wù)個(gè)數(shù)??梢赃\(yùn)行時(shí)修改,建議維持默認(rèn)值。如果設(shè)置過大,可能導(dǎo)致并發(fā)任務(wù)數(shù)過多,占用集群資源。 |
max_routine_load_job_num | FE | 100 | 該參數(shù)限制了例行導(dǎo)入作業(yè)的總數(shù),包括 NEED_SCHEDULED、RUNNING、PAUSE這些狀態(tài)??梢赃\(yùn)行時(shí)修改。超過后,不能再提交新的作業(yè)。 |
max_consumer_num_per_group | BE | 3 | 該參數(shù)表示一個(gè)子任務(wù)中最多生成幾個(gè)consumer進(jìn)行數(shù)據(jù)消費(fèi)。對于Kafka數(shù)據(jù)源,一個(gè)consumer可能消費(fèi)一個(gè)或多個(gè)Kafka partition。假設(shè)一個(gè)任務(wù)需要消費(fèi)6個(gè)Kafka partition,則會生成3個(gè) consumer,每個(gè)consumer消費(fèi)2個(gè)partition。如果只有2個(gè)partition,則只會生成2個(gè)consumer,每個(gè) consumer消費(fèi)1個(gè)partition。 |
push_write_mbytes_per_sec | BE | 10,即10MB/s | 該參數(shù)是導(dǎo)入通用參數(shù),不限于例行導(dǎo)入作業(yè)。該參數(shù)限制了導(dǎo)入數(shù)據(jù)寫入磁盤的速度。對于SSD等高性能存儲設(shè)備,可以適當(dāng)增加這個(gè)限速。 |
max_tolerable_backend_down_num | FE | 0 | 在滿足某些條件下,Doris可PAUSED的任務(wù)重新調(diào)度,即變成RUNNING。該參數(shù)為0代表只有所有BE節(jié)點(diǎn)是alive狀態(tài)才允許重新調(diào)度。 |
period_of_auto_resume_min | FE | 5分鐘 | Doris重新調(diào)度,只會在5分鐘這個(gè)周期內(nèi),最多嘗試3次。如果3次都失敗則鎖定當(dāng)前任務(wù),后續(xù)不再進(jìn)行調(diào)度。但可通過人為干預(yù),進(jìn)行手動(dòng)恢復(fù)。 |