日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Routine Load是一種例行導(dǎo)入方式,StarRocks通過(guò)該方式支持從Kafka持續(xù)不斷的導(dǎo)入數(shù)據(jù),并且支持通過(guò)SQL控制導(dǎo)入任務(wù)的暫停、重啟和停止。本文為您介紹Routine Load導(dǎo)入的基本原理、導(dǎo)入示例以及常見問(wèn)題。

基本概念

  • RoutineLoadJob:提交的一個(gè)例行導(dǎo)入任務(wù)。
  • JobScheduler:例行導(dǎo)入任務(wù)調(diào)度器,用于調(diào)度和拆分一個(gè)RoutineLoadJob為多個(gè)Task。
  • Task:RoutineLoadJob被JobScheduler根據(jù)規(guī)則拆分的子任務(wù)。
  • TaskScheduler:任務(wù)調(diào)度器,用于調(diào)度Task的執(zhí)行。

基本原理

Routine Load的導(dǎo)入流程如下圖。Routine Load
導(dǎo)入流程如下:
  1. 用戶通過(guò)支持MySQL協(xié)議的客戶端向FE提交一個(gè)Kafka導(dǎo)入任務(wù)。
  2. FE將一個(gè)導(dǎo)入任務(wù)拆分成若干個(gè)Task,每個(gè)Task負(fù)責(zé)導(dǎo)入指定的一部分?jǐn)?shù)據(jù)。
  3. 每個(gè)Task被分配到指定的BE上執(zhí)行。在BE上,一個(gè)Task被視為一個(gè)普通的導(dǎo)入任務(wù),通過(guò)Stream Load的導(dǎo)入機(jī)制進(jìn)行導(dǎo)入。
  4. BE導(dǎo)入完成后,向FE匯報(bào)。
  5. FE根據(jù)匯報(bào)結(jié)果,繼續(xù)生成后續(xù)新的Task,或者對(duì)失敗的Task進(jìn)行重試。
  6. FE會(huì)不斷的產(chǎn)生新的Task,來(lái)完成數(shù)據(jù)不間斷的導(dǎo)入。
說(shuō)明 本文圖片和部分內(nèi)容來(lái)源于開源StarRocks的從Apache Kafka持續(xù)導(dǎo)入

導(dǎo)入流程

環(huán)境要求

  • 支持訪問(wèn)無(wú)認(rèn)證或使用SSL方式認(rèn)證的Kafka集群。

  • 支持的消息格式如下:

    • CSV文本格式,每一個(gè)message為一行,且行尾不包含換行符。

    • JSON文本格式。

  • 不支持Array類型。

  • 僅支持Kafka 0.10.0.0及以上版本。

創(chuàng)建導(dǎo)入任務(wù)

  • 語(yǔ)法

    CREATE ROUTINE LOAD <database>.<job_name> ON <table_name>
        [COLUMNS TERMINATED BY "column_separator" ,]
        [COLUMNS (col1, col2, ...) ,]
        [WHERE where_condition ,]
        [PARTITION (part1, part2, ...)]
        [PROPERTIES ("key" = "value", ...)]
        FROM [DATA_SOURCE]
        [(data_source_properties1 = 'value1',
        data_source_properties2 = 'value2',
        ...)]

    相關(guān)參數(shù)描述如下表所示。

    參數(shù)是否必填描述
    job_name導(dǎo)入任務(wù)的名稱,前綴可以攜帶導(dǎo)入數(shù)據(jù)庫(kù)名稱,常見命名方式為時(shí)間戳+表名。 一個(gè)DataBase內(nèi),任務(wù)名稱不可重復(fù)。
    table_name導(dǎo)入的目標(biāo)表的名稱。
    COLUMNS TERMINATED子句指定源數(shù)據(jù)文件中的列分隔符,分隔符默認(rèn)為\t。
    COLUMNS子句指定源數(shù)據(jù)中列和表中列的映射關(guān)系。
    • 映射列:例如,目標(biāo)表有三列col1、col2和col3,源數(shù)據(jù)有4列,其中第1、2、4列分別對(duì)應(yīng)col2、col1和col3,則書寫為COLUMNS (col2, col1, temp, col3),其中temp列為不存在的一列,用于跳過(guò)源數(shù)據(jù)中的第三列。
    • 衍生列:除了直接讀取源數(shù)據(jù)的列內(nèi)容之外,StarRocks還提供對(duì)數(shù)據(jù)列的加工操作。例如,目標(biāo)表后加入了第四列col4,其結(jié)果由col1 + col2產(chǎn)生,則可以書寫為COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)
    WHERE子句指定過(guò)濾條件,可以過(guò)濾掉不需要的行。過(guò)濾條件可以指定映射列或衍生列。

    例如,只導(dǎo)入k1大于100并且k2等于1000的行,則書寫為WHERE k1 > 100 and k2 = 1000

    PARTITION子句指定導(dǎo)入目標(biāo)表的Partition。如果不指定,則會(huì)自動(dòng)導(dǎo)入到對(duì)應(yīng)的Partition中。
    PROPERTIES子句指定導(dǎo)入任務(wù)的通用參數(shù)。
    desired_concurrent_number導(dǎo)入并發(fā)度,指定一個(gè)導(dǎo)入任務(wù)最多會(huì)被分成多少個(gè)子任務(wù)執(zhí)行。必須大于0,默認(rèn)值為3。
    max_batch_interval每個(gè)子任務(wù)的最大執(zhí)行時(shí)間。范圍為5~60,單位是秒。默認(rèn)值為10。

    1.15版本后,該參數(shù)表示子任務(wù)的調(diào)度時(shí)間,即任務(wù)多久執(zhí)行一次。任務(wù)的消費(fèi)數(shù)據(jù)時(shí)間為fe.conf中的routine_load_task_consume_second,默認(rèn)為3s。任務(wù)的執(zhí)行超時(shí)時(shí)間為fe.conf中的routine_load_task_timeout_second,默認(rèn)為15s。

    max_batch_rows每個(gè)子任務(wù)最多讀取的行數(shù)。必須大于等于200000。默認(rèn)值為200000。

    1.15版本后,該參數(shù)只用于定義錯(cuò)誤檢測(cè)窗口范圍,窗口的范圍是10 * max-batch-rows

    max_batch_size每個(gè)子任務(wù)最多讀取的字節(jié)數(shù)。單位為字節(jié),范圍是100 MB到1 GB。默認(rèn)值為100 MB。

    1.15版本后,廢棄該參數(shù),任務(wù)消費(fèi)數(shù)據(jù)的時(shí)間為fe.conf中的routine_load_task_consume_second,默認(rèn)為3s。

    max_error_number采樣窗口內(nèi),允許的最大錯(cuò)誤行數(shù)。必須大于等于0。默認(rèn)是0,即不允許有錯(cuò)誤行。
    重要 被WHERE條件過(guò)濾掉的行不算錯(cuò)誤行。
    strict_mode是否開啟嚴(yán)格模式,默認(rèn)為開啟。

    如果開啟后,非空原始數(shù)據(jù)的列類型變換為NULL,則會(huì)被過(guò)濾。關(guān)閉方式為設(shè)置該參數(shù)為false。

    timezone指定導(dǎo)入任務(wù)所使用的時(shí)區(qū)。

    默認(rèn)為使用Session的timezone參數(shù)。該參數(shù)會(huì)影響所有導(dǎo)入涉及的和時(shí)區(qū)有關(guān)的函數(shù)結(jié)果。

    DATA_SOURCE指定數(shù)據(jù)源,請(qǐng)使用KAFKA。
    data_source_properties指定數(shù)據(jù)源相關(guān)的信息。包括以下參數(shù):
    • kafka_broker_list:Kafka的Broker連接信息,格式為ip:host。多個(gè)Broker之間以逗號(hào)(,)分隔。
    • kafka_topic:指定待訂閱的Kafka的Topic。
      說(shuō)明 如果指定數(shù)據(jù)源相關(guān)的信息,則kafka_broker_listkafka_topic必填。
    • kafka_partitionskafka_offsets:指定需要訂閱的Kafka Partition,以及對(duì)應(yīng)的每個(gè)Partition的起始o(jì)ffset。
    • property:Kafka相關(guān)的屬性,功能等同于Kafka Shell中"--property"參數(shù)。創(chuàng)建導(dǎo)入任務(wù)更詳細(xì)的語(yǔ)法可以通過(guò)執(zhí)行HELP ROUTINE LOAD; 命令查看。
  • 示例:向StarRocks提交一個(gè)無(wú)認(rèn)證的Routine Load導(dǎo)入任務(wù)example_tbl2_ordertest,持續(xù)消費(fèi)Kafka集群中Topic ordertest2的消息,并導(dǎo)入至example_tbl2表中,導(dǎo)入任務(wù)會(huì)從此Topic所指定分區(qū)的最早位點(diǎn)開始消費(fèi)。

    CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl
    COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="5",
        "format" ="json",
        "jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
     )
    FROM KAFKA
    (
        "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
        "kafka_topic" = "ordertest2",
        "kafka_partitions" ="0,1,2,3,4",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
  • 示例:使用SSL安全協(xié)議訪問(wèn)Kafka,具體的配置示例如下。

    -- 指定安全協(xié)議為SSL。
    "property.security.protocol" = "ssl", 
    
     -- CA證書的位置。
    "property.ssl.ca.location" = "FILE:ca-cert",
    
    -- 如果Kafka Server端開啟了Client認(rèn)證,則還需設(shè)置以下三個(gè)參數(shù):
    -- Client的公鑰位置。
    "property.ssl.certificate.location" = "FILE:client.pem", 
    -- Client的私鑰位置。
    "property.ssl.key.location" = "FILE:client.key", 
    -- Client私鑰的密碼。
    "property.ssl.key.password" = "******"

    關(guān)于創(chuàng)建文件的詳細(xì)信息,請(qǐng)參見CREATE FILE

    說(shuō)明

    在使用CREATE FILE時(shí),請(qǐng)使用OSS的HTTP訪問(wèn)地址作為url。具體的使用方式,請(qǐng)參見OSS訪問(wèn)域名使用規(guī)則

查看任務(wù)狀態(tài)

  • 顯示load_test下,所有的例行導(dǎo)入任務(wù)(包括已停止或取消的任務(wù))。結(jié)果為一行或多行。

    USE load_test;
    SHOW ALL ROUTINE LOAD;
  • 顯示load_test下,名稱為example_tbl2_ordertest的當(dāng)前正在運(yùn)行的例行導(dǎo)入任務(wù)。

    SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
  • 在EMR StarRocks Manager控制臺(tái),單擊元數(shù)據(jù)管理,單擊待查看的數(shù)據(jù)庫(kù)名稱,單擊任務(wù),即可在Kafka導(dǎo)入頁(yè)簽查看任務(wù)的執(zhí)行狀態(tài)。State

重要

StarRocks只能查看當(dāng)前正在運(yùn)行中的任務(wù),已結(jié)束和未開始的任務(wù)無(wú)法查看。

執(zhí)行SHOW ALL ROUTINE LOAD命令,可以查看當(dāng)前正在運(yùn)行的所有Routine Load任務(wù),返回如下類似信息。

*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)

本示例創(chuàng)建名為routine_load_wikipedia的導(dǎo)入任務(wù),相關(guān)參數(shù)描述如下表。

參數(shù)

描述

State

導(dǎo)入任務(wù)狀態(tài)。RUNNING表示該導(dǎo)入任務(wù)處于持續(xù)運(yùn)行中。

Statistic

進(jìn)度信息,記錄了從創(chuàng)建任務(wù)開始后的導(dǎo)入信息。

receivedBytes

接收到的數(shù)據(jù)大小,單位是Byte。

errorRows

導(dǎo)入錯(cuò)誤行數(shù)。

committedTaskNum

FE提交的Task數(shù)。

loadedRows

已導(dǎo)入的行數(shù)。

loadRowsRate

導(dǎo)入數(shù)據(jù)速率,單位是行每秒(row/s)。

abortedTaskNum

BE失敗的Task數(shù)。

totalRows

接收的總行數(shù)。

unselectedRows

被WHERE條件過(guò)濾的行數(shù)。

receivedBytesRate

接收數(shù)據(jù)速率,單位是Bytes/s。

taskExecuteTimeMs

導(dǎo)入耗時(shí),單位是ms。

ErrorLogUrls

錯(cuò)誤信息日志,可以通過(guò)URL看到導(dǎo)入過(guò)程中的錯(cuò)誤信息。

暫停導(dǎo)入任務(wù)

使用PAUSE語(yǔ)句后,此時(shí)導(dǎo)入任務(wù)進(jìn)入PAUSED狀態(tài),數(shù)據(jù)暫停導(dǎo)入,但任務(wù)未終止,可以通過(guò)RESUME語(yǔ)句重啟任務(wù)。

PAUSE ROUTINE LOAD FOR <job_name>;

暫停導(dǎo)入任務(wù)后,任務(wù)的State變更為PAUSEDStatisticProgress中的導(dǎo)入信息停止更新。此時(shí),任務(wù)并未終止,通過(guò)SHOW ROUTINE LOAD語(yǔ)句可以看到已經(jīng)暫停的導(dǎo)入任務(wù)。

恢復(fù)導(dǎo)入任務(wù)

使用RESUME語(yǔ)句后,任務(wù)會(huì)短暫的進(jìn)入NEED_SCHEDULE狀態(tài),表示任務(wù)正在重新調(diào)度,一段時(shí)間后會(huì)重新恢復(fù)至RUNNING狀態(tài),繼續(xù)導(dǎo)入數(shù)據(jù)。

RESUME ROUTINE LOAD FOR <job_name>;

停止導(dǎo)入任務(wù)

使用STOP語(yǔ)句讓導(dǎo)入任務(wù)進(jìn)入STOP狀態(tài),數(shù)據(jù)停止導(dǎo)入,任務(wù)終止,無(wú)法恢復(fù)數(shù)據(jù)導(dǎo)入。

STOP ROUTINE LOAD FOR <job_name>;

停止導(dǎo)入任務(wù)后,任務(wù)的State變更為STOPPEDStatisticProgress中的導(dǎo)入信息再也不會(huì)更新。此時(shí),通過(guò)SHOW ROUTINE LOAD語(yǔ)句無(wú)法看到已經(jīng)停止的導(dǎo)入任務(wù)。stop

最佳實(shí)踐案例

本案例是創(chuàng)建了一個(gè)Routine Load導(dǎo)入任務(wù),持續(xù)不斷地消費(fèi)Kafka集群的CSV格式的數(shù)據(jù),然后導(dǎo)入至StarRocks中。

  1. 在Kafka集群中執(zhí)行以下操作。

    1. 創(chuàng)建測(cè)試的topic。

      kafka-topics.sh --create  --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"
    2. 執(zhí)行如下命令,生產(chǎn)數(shù)據(jù)。

      kafka-console-producer.sh  --broker-list core-1-1:9092 --topic order_sr_topic
    3. 輸入測(cè)試數(shù)據(jù)。

      2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
      2020050802,2020-05-08,Julien Sorel,France,male,893
      2020050803,2020-05-08,Dorian Grey,UK,male,1262
      2020051001,2020-05-10,Tess Durbeyfield,US,female,986
      2020051101,2020-05-11,Edogawa Conan,japan,male,8924
  2. 在StarRocks集群中執(zhí)行以下操作。

    1. 執(zhí)行以下命令,創(chuàng)建目標(biāo)數(shù)據(jù)庫(kù)和數(shù)據(jù)表。

      根據(jù)CSV數(shù)據(jù)中需要導(dǎo)入的幾列(例如除第五列性別外的其余五列需要導(dǎo)入至StarRocks), 在StarRocks集群的目標(biāo)數(shù)據(jù)庫(kù)load_test 中創(chuàng)建表routine_load_tbl_csv。

      CREATE TABLE load_test.routine_load_tbl_csv (
          `order_id` bigint NOT NULL COMMENT "訂單編號(hào)",
          `pay_dt` date NOT NULL COMMENT "支付日期",
          `customer_name` varchar(26) NULL COMMENT "顧客姓名",
          `nationality` varchar(26) NULL COMMENT "國(guó)籍",
          `price`double NULL COMMENT "支付金額"
      )
      ENGINE=OLAP
      PRIMARY KEY (order_id,pay_dt)
      DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
    2. 執(zhí)行以下命令,創(chuàng)建導(dǎo)入任務(wù)。

      CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
      COLUMNS TERMINATED BY ",",
      COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
      PROPERTIES
      (
          "desired_concurrent_number" = "5"
      )
      FROM KAFKA
      (
          "kafka_broker_list" ="192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
          "kafka_topic" = "order_sr_topic",
          "kafka_partitions" ="0,1,2,3,4",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      )
    3. 執(zhí)行以下命令,查看名稱為routine_load_tbl_ordertest_csv的導(dǎo)入任務(wù)的信息。

      SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;

      如果狀態(tài)為RUNNING,則說(shuō)明作業(yè)運(yùn)行正常。running

    4. 執(zhí)行以下命令,查詢目標(biāo)表中的數(shù)據(jù),您會(huì)發(fā)現(xiàn)數(shù)據(jù)已經(jīng)同步完成。

      SELECT * FROM routine_load_tbl_csv;

      您還可以任務(wù)進(jìn)行以下操作:

      • 暫停導(dǎo)入任務(wù)

        PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
      • 恢復(fù)導(dǎo)入任務(wù)

        RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
      • 修改導(dǎo)入任務(wù)

        說(shuō)明

        僅支持修改狀態(tài)為PAUSED的任務(wù)。

        例如:修改desired_concurrent_number為6。

        ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv
        PROPERTIES
        (
            "desired_concurrent_number" = "6"
        )
      • 停止導(dǎo)入任務(wù)

        STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;