日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

例行導(dǎo)入(Routine Load)功能,支持用戶提交一個(gè)常駐的導(dǎo)入任務(wù),通過不斷的從指定的數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)導(dǎo)入到Doris中。本文主要介紹Routine Load功能的實(shí)現(xiàn)原理、使用方式以及最佳實(shí)踐。

使用限制

當(dāng)前僅支持從Kafka進(jìn)行例行導(dǎo)入。

  • 支持無認(rèn)證的Kafka訪問,以及通過SSL方式認(rèn)證的Kafka集群。

  • 支持的消息格式為CSV或JSON文本格式。CSV每一個(gè)message為一行,且行尾不包含換行符。

  • 默認(rèn)支持Kafka 0.10.0.0及以上版本。如果要使用Kafka 0.10.0.0以下版本 (0.9.0、0.8.2、0.8.1、0.8.0),需要修改BE配置,將 kafka_broker_version_fallback參數(shù)值設(shè)置為要兼容的舊版本,或者在創(chuàng)建Routine Load時(shí)直接設(shè)置property.broker.version.fallback參數(shù)值為要兼容的舊版本。

    說明

    使用舊版本可能會導(dǎo)致Routine Load的部分新特性無法使用,例如根據(jù)時(shí)間設(shè)置Kafka分區(qū)的offset。

基本原理

Client向FE提交一個(gè)Routine Load作業(yè)的原理如下:

+---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+
  1. FE通過JobScheduler將一個(gè)導(dǎo)入作業(yè)拆分成若干個(gè)Task。每個(gè)Task負(fù)責(zé)導(dǎo)入指定的一部分?jǐn)?shù)據(jù)。Task被TaskScheduler分配到指定的BE 上執(zhí)行。

  2. 在BE上,一個(gè)Task被視為一個(gè)普通的導(dǎo)入任務(wù),通過Stream Load的導(dǎo)入機(jī)制進(jìn)行導(dǎo)入。導(dǎo)入完成后,向FE匯報(bào)。

  3. FE中的JobScheduler根據(jù)匯報(bào)結(jié)果,繼續(xù)生成后續(xù)新的Task,或者對失敗的Task進(jìn)行重試。

  4. 整個(gè)Routine Load作業(yè)通過不斷地產(chǎn)生新的Task,來完成數(shù)據(jù)不間斷的導(dǎo)入。

Kafka例行導(dǎo)入

下面詳細(xì)介紹Kafka例行導(dǎo)入的使用方式和最佳實(shí)踐。

創(chuàng)建任務(wù)

創(chuàng)建例行導(dǎo)入任務(wù)的詳細(xì)語法可以查看CREATE ROUTINE LOAD命令手冊或執(zhí)行HELP ROUTINE LOAD; 查看語法幫助。下面以幾個(gè)示例說明如何創(chuàng)建Routine Load任務(wù)。

  • 為example_db的example_tbl創(chuàng)建一個(gè)名為test1的Kafka例行導(dǎo)入任務(wù)。指定列分隔符和group.id和client.id,并且自動(dòng)默認(rèn)消費(fèi)所有分區(qū),且從有數(shù)據(jù)的位置(OFFSET_BEGINNING)開始訂閱。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS TERMINATED BY ",",
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.group.id" = "xxx",
                "property.client.id" = "xxx",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
  • 嚴(yán)格模式為example_db的example_tbl創(chuàng)建一個(gè)名為test1的Kafka例行導(dǎo)入任務(wù)。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
            WHERE k1 > 100 and k2 like "%doris%"
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "true"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );
  • 導(dǎo)入JSON格式數(shù)據(jù)使用示例。

    Routine Load導(dǎo)入的JSON格式僅支持以下兩種:

    第一種為只有一條記錄,且為JSON對象。

    {"category":"a9jadhx","author":"test","price":895}

    第二種為JSON數(shù)組,數(shù)組中可含多條記錄。

    [
        {
            "category":"11",
            "author":"4avc",
            "price":895,
            "timestamp":1589191587
        },
        {
            "category":"22",
            "author":"2avc",
            "price":895,
            "timestamp":1589191487
        },
        {
            "category":"33",
            "author":"3avc",
            "price":342,
            "timestamp":1589191387
        }
    ]

    創(chuàng)建待導(dǎo)入的Doris數(shù)據(jù)表,示例如下。

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20200509")),
        PARTITION p20200509 VALUES [("20200509"), ("20200510")),
        PARTITION p20200510 VALUES [("20200510"), ("20200511")),
        PARTITION p20200511 VALUES [("20200511"), ("20200512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
    PROPERTIES (
        "replication_num" = "1"
    );

    以簡單模式導(dǎo)入JSON數(shù)據(jù),示例如下。

    CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    COLUMNS(category,price,author)
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
     );

    精準(zhǔn)導(dǎo)入JSON格式數(shù)據(jù),示例如下。

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json",
        "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
        "strip_outer_array" = "true"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
    );
    說明

    表中的分區(qū)字段dt在示例的數(shù)據(jù)里并沒有,而是在Routine load語句里通過dt=from_unixtime(timestamp, '%Y%m%d')轉(zhuǎn)換得來的。

    • strict mode與source data的導(dǎo)入關(guān)系

      • 列類型為TinyInt,且表中的列允許導(dǎo)入空值時(shí):

        source data

        source data example

        string to int

        strict_mode

        result

        空值

        \N

        N/A

        true or false

        NULL

        not null

        aaa or 2000

        NULL

        true

        invalid data(filtered)

        not null

        aaa

        NULL

        true

        NULL

        not null

        1

        1

        true or false

        correct data

      • 列類型為 Decimal(1,0),且表中的列允許導(dǎo)入空值時(shí):

        source data

        source data example

        string to int

        strict_mode

        result

        空值

        \N

        N/A

        true or false

        NULL

        not null

        aaa

        NULL

        true

        invalid data(filtered)

        not null

        aaa

        NULL

        false

        NULL

        not null

        1 or 10

        1

        true or false

        correct data

        說明

        10雖然是一個(gè)超過范圍的值,但是因?yàn)槠漕愋头蟙ecimal的要求,所以strict mode對其不產(chǎn)生影響。10最后會在其他ETL處理流程中被過濾,但不會被strict mode過濾。

    • 訪問SSL認(rèn)證的Kafka集群

      訪問SSL認(rèn)證的Kafka集群需要您提供用于認(rèn)證Kafka Broker公鑰的證書文件(ca.pem)。如果Kafka集群同時(shí)開啟了客戶端認(rèn)證,則還需提供客戶端的公鑰(client.pem)、密鑰文件(client.key)、以及密鑰密碼。文件需要先通過CREATE FILE命令上傳到Doris中,并且catalog名稱為kafka。CREATE FILE命令詳情可使用HELP CREATE FILE;查看。示例如下。

      1. 上傳文件。

        CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
        CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
        CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
      2. 創(chuàng)建例行導(dǎo)入作業(yè)。

        CREATE ROUTINE LOAD db1.job1 on tbl1
        PROPERTIES
        (
            "desired_concurrent_number"="1"
        )
        FROM KAFKA
        (
            "kafka_broker_list"= "broker1:9091,broker2:9091",
            "kafka_topic" = "my_topic",
            "property.security.protocol" = "ssl",
            "property.ssl.ca.location" = "FILE:ca.pem",
            "property.ssl.certificate.location" = "FILE:client.pem",
            "property.ssl.key.location" = "FILE:client.key",
            "property.ssl.key.password" = "abcd***"
        );

      Doris通過Kafka的C++ API librdkafka來訪問Kafka集群,librdkafka所支持的參數(shù)詳情請參見librdkafka。

查看作業(yè)狀態(tài)

查看作業(yè)狀態(tài)的具體命令和示例可以通過HELP SHOW ROUTINE LOAD;命令查看。

查看任務(wù)運(yùn)行狀態(tài)的具體命令和示例可以通過HELP SHOW ROUTINE LOAD TASK; 命令查看。

說明

只能查看當(dāng)前正在運(yùn)行中的任務(wù),已結(jié)束和未開始的任務(wù)無法查看。

修改作業(yè)屬性

您可以修改已經(jīng)創(chuàng)建的作業(yè)。詳情請參見ALTER ROUTINE LOAD或通過HELP ALTER ROUTINE LOAD; 命令查看。

作業(yè)控制

您可以通過STOP、PAUSE或RESUME三個(gè)命令來控制作業(yè)的停止、暫停和重啟??梢酝ㄟ^HELP STOP ROUTINE LOAD; HELP PAUSE ROUTINE LOAD;以及 HELP RESUME ROUTINE LOAD; 三個(gè)命令查看幫助和示例。

其他說明

  • 例行導(dǎo)入作業(yè)和ALTER TABLE操作的關(guān)系

    • 例行導(dǎo)入不會阻塞SCHEMA CHANGE和ROLLUP操作。

      但需注意的是,如果SCHEMA CHANGE完成后,列映射關(guān)系無法匹配,則會導(dǎo)致作業(yè)的錯(cuò)誤數(shù)據(jù)激增,最終導(dǎo)致作業(yè)暫停。建議通過在例行導(dǎo)入作業(yè)中顯式指定列映射關(guān)系,以及通過增加Nullable列或帶Default值的列來減少這類問題。

    • 刪除表的Partition可能會導(dǎo)致導(dǎo)入數(shù)據(jù)無法找到對應(yīng)的Partition,使得作業(yè)暫停。

  • 例行導(dǎo)入作業(yè)和其他導(dǎo)入作業(yè)的關(guān)系(LOAD、DELETE、INSERT)

    • 例行導(dǎo)入和其他LOAD作業(yè)以及INSERT操作沒有沖突。

    • 當(dāng)執(zhí)行DELETE操作時(shí),對應(yīng)表分區(qū)不能有任何正在執(zhí)行的導(dǎo)入任務(wù)。所以在執(zhí)行DELETE操作前,需要先暫停例行導(dǎo)入作業(yè),并等待已下發(fā)的task全部完成后,才可以執(zhí)行DELETE。

  • 例行導(dǎo)入作業(yè)和DROP DATABASE/TABLE操作的關(guān)系:當(dāng)例行導(dǎo)入對應(yīng)的DATABASE或TABLE被刪除后,作業(yè)會自動(dòng)CANCEL。

  • Kafka類型的例行導(dǎo)入作業(yè)和Kafka topic的關(guān)系

    當(dāng)您在創(chuàng)建例行導(dǎo)入聲明的kafka_topic在Kafka集群中不存在時(shí):

    • 如果您的Kafka集群的broker設(shè)置了auto.create.topics.enable = true,則kafka_topic會先被自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的partition個(gè)數(shù)是由您的Kafka集群中的broker配置num.partitions決定的。例行作業(yè)會正常的不斷讀取該topic的數(shù)據(jù)。

    • 如果您的Kafka集群的broker設(shè)置了auto.create.topics.enable = false, 則kafka_topic不會被自動(dòng)創(chuàng)建,例行作業(yè)會在沒有讀取任何數(shù)據(jù)之前就被暫停,狀態(tài)為PAUSED。

    因此,如果您希望當(dāng)Kafka topic不存在的時(shí)候,被例行作業(yè)可以自動(dòng)創(chuàng)建,只需要將您的kafka集群中的broker設(shè)置auto.create.topics.enable = true即可。

  • 在網(wǎng)絡(luò)隔離的環(huán)境中可能出現(xiàn)的問題

    在有些環(huán)境中存在網(wǎng)段和域名解析的隔離措施,所以需要注意:

    • 創(chuàng)建Routine load任務(wù)中指定的Broker list必須能夠被Doris服務(wù)訪問。

    • Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必須能夠被Doris服務(wù)訪問。

  • 關(guān)于指定消費(fèi)的Partition和Offset

    Doris支持指定Partition和Offset開始消費(fèi),新版中還支持了指定時(shí)間點(diǎn)進(jìn)行消費(fèi)的功能。有三個(gè)相關(guān)參數(shù):

    • kafka_partitions:指定待消費(fèi)的partition列表,如:"0, 1, 2, 3"。

    • kafka_offsets:指定每個(gè)分區(qū)的起始o(jì)ffset,必須和kafka_partitions列表個(gè)數(shù)對應(yīng),如:"1000, 1000, 2000, 2000"。

    • property.kafka_default_offset:指定分區(qū)默認(rèn)的起始o(jì)ffset。

    在創(chuàng)建導(dǎo)入作業(yè)時(shí),這三個(gè)參數(shù)可以有以下組合:

    組合

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    行為

    1

    No

    No

    No

    系統(tǒng)會自動(dòng)查找topic對應(yīng)的所有分區(qū)并從 OFFSET_END開始消費(fèi)。

    2

    No

    No

    Yes

    系統(tǒng)會自動(dòng)查找topic對應(yīng)的所有分區(qū)并從default offset指定的位置開始消費(fèi)。

    3

    Yes

    No

    No

    系統(tǒng)會從指定分區(qū)的OFFSET_END開始消費(fèi)。

    4

    Yes

    Yes

    No

    系統(tǒng)會從指定分區(qū)的指定offset處開始消費(fèi)。

    5

    Yes

    No

    Yes

    系統(tǒng)會從指定分區(qū),default offset指定的位置開始消費(fèi)。

  • STOP和PAUSE的區(qū)別

    FE會自動(dòng)定期清理STOP狀態(tài)的ROUTINE LOAD,而PAUSE狀態(tài)的則可以再次被恢復(fù)啟用。

相關(guān)參數(shù)

一些系統(tǒng)配置參數(shù)會影響例行導(dǎo)入的使用,具體說明如下。

配置項(xiàng)

FE/BE

默認(rèn)值

說明

max_routine_load_task_concurrent_num

FE

5

該參數(shù)限制了一個(gè)例行導(dǎo)入作業(yè)最大的子任務(wù)并發(fā)數(shù)??梢赃\(yùn)行時(shí)修改,建議維持默認(rèn)值。設(shè)置過大,可能導(dǎo)致同時(shí)并發(fā)的任務(wù)數(shù)過多,占用集群資源。

max_routine_load_task_num_per_be

FE

5

該參數(shù)限制了每個(gè)BE節(jié)點(diǎn)最多并發(fā)執(zhí)行的子任務(wù)個(gè)數(shù)??梢赃\(yùn)行時(shí)修改,建議維持默認(rèn)值。如果設(shè)置過大,可能導(dǎo)致并發(fā)任務(wù)數(shù)過多,占用集群資源。

max_routine_load_job_num

FE

100

該參數(shù)限制了例行導(dǎo)入作業(yè)的總數(shù),包括 NEED_SCHEDULED、RUNNING、PAUSE這些狀態(tài)??梢赃\(yùn)行時(shí)修改。超過后,不能再提交新的作業(yè)。

max_consumer_num_per_group

BE

3

該參數(shù)表示一個(gè)子任務(wù)中最多生成幾個(gè)consumer進(jìn)行數(shù)據(jù)消費(fèi)。對于Kafka數(shù)據(jù)源,一個(gè)consumer可能消費(fèi)一個(gè)或多個(gè)Kafka partition。假設(shè)一個(gè)任務(wù)需要消費(fèi)6個(gè)Kafka partition,則會生成3個(gè) consumer,每個(gè)consumer消費(fèi)2個(gè)partition。如果只有2個(gè)partition,則只會生成2個(gè)consumer,每個(gè) consumer消費(fèi)1個(gè)partition。

push_write_mbytes_per_sec

BE

10,即10MB/s

該參數(shù)是導(dǎo)入通用參數(shù),不限于例行導(dǎo)入作業(yè)。該參數(shù)限制了導(dǎo)入數(shù)據(jù)寫入磁盤的速度。對于SSD等高性能存儲設(shè)備,可以適當(dāng)增加這個(gè)限速。

max_tolerable_backend_down_num

FE

0

在滿足某些條件下,Doris可PAUSED的任務(wù)重新調(diào)度,即變成RUNNING。該參數(shù)為0代表只有所有BE節(jié)點(diǎn)是alive狀態(tài)才允許重新調(diào)度。

period_of_auto_resume_min

FE

5分鐘

Doris重新調(diào)度,只會在5分鐘這個(gè)周期內(nèi),最多嘗試3次。如果3次都失敗則鎖定當(dāng)前任務(wù),后續(xù)不再進(jìn)行調(diào)度。但可通過人為干預(yù),進(jìn)行手動(dòng)恢復(fù)。