日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Stream Load

當(dāng)您需要將本地文件或數(shù)據(jù)流導(dǎo)入到云數(shù)據(jù)庫 SelectDB 版實例時,您可以使用Stream Load進(jìn)行數(shù)據(jù)同步導(dǎo)入,并通過即時的返回結(jié)果判斷本次導(dǎo)入是否成功。本文介紹如何通過Stream Load導(dǎo)入數(shù)據(jù)至云數(shù)據(jù)庫 SelectDB 版實例中。

背景信息

Stream Load屬于同步接口的導(dǎo)入方式,您可以通過發(fā)送HTTP請求將本地文件或數(shù)據(jù)流導(dǎo)入到云數(shù)據(jù)庫 SelectDB 版實例中。Stream Load執(zhí)行并返回導(dǎo)入結(jié)果,您可以通過請求的返回結(jié)果判斷本次導(dǎo)入是否成功。

Stream Load適用于導(dǎo)入本地文件或通過程序?qū)霐?shù)據(jù)流中的數(shù)據(jù),支持的數(shù)據(jù)格式包括:CSV(文本)、JSONPARQUETORC

創(chuàng)建導(dǎo)入

Stream Load通過HTTP協(xié)議提交和傳輸數(shù)據(jù)。以下通過curl命令提交導(dǎo)入,也可以通過其他HTTP Client進(jìn)行操作。

語法

# Header中支持的屬性,請參見下面的參數(shù)說明。
# 格式為: -H "key1:value1"。
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_name> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

參數(shù)說明

參數(shù)名稱

參數(shù)說明

--location-trusted

需要認(rèn)證時,會將usernamepassword傳遞給被重定向到的服務(wù)器。

-u

指定云數(shù)據(jù)庫 SelectDB 版實例的用戶名和密碼。

-H

指定本次Stream Load導(dǎo)入請求的請求頭(Header)內(nèi)容。

-T

指定需要導(dǎo)入數(shù)據(jù)的文件。

-XPUT

HTTP請求的Method,采用PUT請求方法,指定云數(shù)據(jù)庫 SelectDB 版的數(shù)據(jù)導(dǎo)入地址,其中包括參數(shù)如下。

  • host云數(shù)據(jù)庫 SelectDB 版實例的VPC地址或公網(wǎng)地址。

    說明

    申請公網(wǎng)的具體操作,請參見申請和釋放公網(wǎng)地址

  • port云數(shù)據(jù)庫 SelectDB 版實例的HTTP端口號,默認(rèn)為8080。

    說明

    您可以在云數(shù)據(jù)庫 SelectDB 版的實例詳情頁面查看云數(shù)據(jù)庫 SelectDB 版實例的連接地址和端口號。

  • db_name:數(shù)據(jù)庫名。

  • table_name:數(shù)據(jù)表名。

Stream Load使用HTTP協(xié)議,因此導(dǎo)入任務(wù)有關(guān)的參數(shù)主要設(shè)置在請求頭(Header)中。常用的導(dǎo)入?yún)?shù)如下。

參數(shù)名稱

參數(shù)說明

label

導(dǎo)入任務(wù)的唯一標(biāo)識。Label是在導(dǎo)入命令中自定義的名稱。通過這個Label,可以查看對應(yīng)導(dǎo)入任務(wù)的執(zhí)行情況。Label也可用于防止重復(fù)導(dǎo)入相同的數(shù)據(jù),當(dāng)Label對應(yīng)的導(dǎo)入作業(yè)狀態(tài)為CANCELLED時,該Label可以再次被使用。

說明

推薦同一批次數(shù)據(jù)使用相同的Label。這樣同一批次數(shù)據(jù)的重復(fù)請求只會被接受一次,保證了At-Most-Once

format

指定導(dǎo)入數(shù)據(jù)格式,默認(rèn)值為CSV。支持CSVJSONPARQUETORCcsv_with_names(CSV文件行首過濾)和csv_with_names_and_typesCSV文件前兩行過濾)。

line_delimiter

指定導(dǎo)入文件中的換行符,默認(rèn)為\n。可以使用做多個字符的組合作為換行符。

column_separator

指定導(dǎo)入文件中的列分隔符,默認(rèn)為\t。可以使用多個字符的組合作為列分隔符。如果是不可見字符,則需要加\x作為前綴,使用十六進(jìn)制來表示分隔符。例如:Hive文件的分隔符\x01,需要指定為-H"column_separator:\x01"

compress_type

指定文件的壓縮格式。僅支持CSV文件的壓縮,支持gzlzobz2lz4lzopdeflate壓縮格式。

max_filter_ratio

指定導(dǎo)入任務(wù)的最大容忍率,默認(rèn)為0,即零容忍,取值范圍是0~1。當(dāng)導(dǎo)入的錯誤率超過該值,則導(dǎo)入失敗。如果希望忽略錯誤的行,可以通過設(shè)置這個參數(shù)大于0,來保證導(dǎo)入成功。

strict_mode

指定是否開啟嚴(yán)格過濾模式,默認(rèn)為false。開啟后,會對導(dǎo)入過程中的列類型轉(zhuǎn)換進(jìn)行嚴(yán)格過濾,錯誤的數(shù)據(jù)將被過濾。

cloud_cluster

指定導(dǎo)入使用的集群。默認(rèn)為該實例的默認(rèn)集群。如果該實例沒有設(shè)置默認(rèn)集群,則自動選擇一個有權(quán)限的集群。

load_to_single_tablet

指定是否只導(dǎo)入數(shù)據(jù)到對應(yīng)分區(qū)的一個tablet,默認(rèn)值為false。該參數(shù)只允許在對帶有random分區(qū)的Duplicate表導(dǎo)入數(shù)據(jù)的時候設(shè)置。

where

指定導(dǎo)入任務(wù)的過濾條件。支持對原始數(shù)據(jù)指定where語句進(jìn)行過濾,被過濾的數(shù)據(jù)將不會被導(dǎo)入,也不會參與filter ratio的計算,但會被計入num_rows_unselected

partitions

指定待導(dǎo)入數(shù)據(jù)的分區(qū)(Partition)信息。如果待導(dǎo)入數(shù)據(jù)不屬于指定的分區(qū)(Partition)則不會被導(dǎo)入。這些數(shù)據(jù)將計入dpp.abnorm.ALL。

columns

指定待導(dǎo)入數(shù)據(jù)的函數(shù)變換配置。支持的函數(shù)變換方法包含列的順序變化以及表達(dá)式變換,其中表達(dá)式變換的方法與查詢語句的一致。

merge_type

指定數(shù)據(jù)合并類型,默認(rèn)為APPEND。默認(rèn)值表示本次導(dǎo)入是普通的追加寫操作。MERGEDELETE類型僅適用于Unique Key表模型。其中MERGE類型需要配合delete參數(shù)使用,以標(biāo)注Delete Flag列。而DELETE類型則表示本次導(dǎo)入的所有數(shù)據(jù)皆為刪除數(shù)據(jù)。

delete

僅在指定merge_type類型為MERGE時才具有意義,表示數(shù)據(jù)的刪除條件。

function_column.sequence_col

只適用于UNIQUE_KEYS,相同Key列下,保證Value列按照source_sequence列進(jìn)行REPLACE, source_sequence可以是數(shù)據(jù)源中的列,也可以是表結(jié)構(gòu)中的一列。

exec_mem_limit

指定導(dǎo)入內(nèi)存限制。單位為字節(jié),默認(rèn)為2147483648,即2 GiB

timeout

指定導(dǎo)入的超時時間,單位:秒,默認(rèn)值為600。范圍為1~259200秒。

timezone

指定本次導(dǎo)入所使用的時區(qū),默認(rèn)為"Asia/Shanghai",即東八區(qū)。該參數(shù)會影響所有導(dǎo)入涉及的和時區(qū)有關(guān)的函數(shù)結(jié)果。

two_phase_commit

指定是否開啟兩階段事務(wù)提交模式,默認(rèn)為false。開啟兩階段事務(wù)提交模式后,數(shù)據(jù)寫入完成即會返回信息給用戶,此時數(shù)據(jù)不可見,事務(wù)狀態(tài)為PRECOMMITTED,用戶手動觸發(fā)commit操作之后,數(shù)據(jù)才可見。

Stream load是一種同步的導(dǎo)入方式,因此導(dǎo)入的結(jié)果會通過創(chuàng)建導(dǎo)入的返回值直接返回給用戶。返回結(jié)果示例如下。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

返回結(jié)果參數(shù)說明如下。

參數(shù)名稱

參數(shù)說明

TxnId

導(dǎo)入的事務(wù)ID。

Label

導(dǎo)入Label,由用戶指定或系統(tǒng)自動生成。

Status

導(dǎo)入狀態(tài),取值如下:

  • Success:導(dǎo)入成功。

  • Publish Timeout:導(dǎo)入已經(jīng)完成,只是數(shù)據(jù)可能會延遲可見,無需重試。

  • Label Already ExistsLabel重復(fù),需更換Label

  • Fail:導(dǎo)入失敗。

ExistingJobStatus

已存在的Label對應(yīng)的導(dǎo)入作業(yè)的狀態(tài)。

這個字段只有在當(dāng)Status為"Label Already Exists"時才會顯示。用戶可以通過這個狀態(tài),知曉已存在Label對應(yīng)的導(dǎo)入作業(yè)的狀態(tài)。"RUNNING"表示作業(yè)還在執(zhí)行,"FINISHED"表示作業(yè)成功。

Message

錯誤信息提示。

NumberTotalRows

導(dǎo)入總處理的行數(shù)。

NumberLoadedRows

成功導(dǎo)入的行數(shù)。

NumberFilteredRows

數(shù)據(jù)質(zhì)量不合格的行數(shù)。

NumberUnselectedRows

where條件過濾的行數(shù)。

LoadBytes

導(dǎo)入的字節(jié)數(shù)。

LoadTimeMs

導(dǎo)入完成時間,單位毫秒。

BeginTxnTimeMs

向Fe請求開始一個事務(wù)所花費的時間,單位:毫秒。

StreamLoadPutTimeMs

向Fe請求獲取導(dǎo)入數(shù)據(jù)執(zhí)行計劃所花費的時間,單位:毫秒。

ReadDataTimeMs

讀取數(shù)據(jù)所花費的時間,單位:毫秒。

WriteDataTimeMs

執(zhí)行寫入數(shù)據(jù)操作所花費的時間,單位:毫秒。

CommitAndPublishTimeMs

向Fe請求提交并且發(fā)布事務(wù)所花費的時間,單位:毫秒。

ErrorURL

如果有數(shù)據(jù)質(zhì)量問題,通過訪問這個URL查看具體錯誤行。

使用Stream Load導(dǎo)入數(shù)據(jù),示例如下。

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

取消導(dǎo)入

無法手動取消Stream Load,Stream Load在超時或者導(dǎo)入錯誤后會被系統(tǒng)自動取消。

查看Stream Load

您可以通過show stream load來查看已經(jīng)完成的Stream load任務(wù)。默認(rèn)BE(BackEnd)不保留Stream Load的啟用記錄,如果您要查看則需要在BE上啟用記錄,配置參數(shù)為:enable_stream_load_record=true,具體操作請參見BE配置項。?

使用示例

腳本示例

  1. 創(chuàng)建待導(dǎo)入的SelectDB數(shù)據(jù)表,示例如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 16
    PROPERTIES("replication_num" = "1");
  2. 創(chuàng)建待導(dǎo)入文件test.data,示例如下。

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. 使用不同參數(shù)配置導(dǎo)入數(shù)據(jù),示例如下。

    • 將本地文件test.data中的數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫test_db中的test_table表,使用Label用于去重,指定超時時間為100秒。

       curl --location-trusted -u root -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 將本地文件test.data中的數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫test_db中的test_table表,使用Label用于去重,指定文件的列名,并且只導(dǎo)入address等于hangzhou的數(shù)據(jù)。

      curl --location-trusted -u root -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 將本地文件test.data中的數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫test_db中的test_table表,允許20%的錯誤率。

      curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 導(dǎo)入數(shù)據(jù)進(jìn)行嚴(yán)格模式過濾,并設(shè)置時區(qū)為Africa/Abidjan

      curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 刪除與這批導(dǎo)入Key相同的數(shù)據(jù)。

      curl --location-trusted -u root -H "merge_type: DELETE" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 將這批數(shù)據(jù)中address列為hangzhou的數(shù)據(jù)的行刪除,其他行正常追加。

      curl --location-trusted -u root: -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.data http://host:port/api/testDb/testTbl/_stream_load

Java代碼示例

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. 參數(shù)配置
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. 構(gòu)建httpclient,特別注意需要開啟重定向(isRedirectable)
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // 開啟重定向
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. 構(gòu)建httpPut請求對象
        HttpPut httpPut = new HttpPut(loadUrl);

        // 設(shè)置httpHeader...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. 設(shè)置要發(fā)送的數(shù)據(jù),這里寫入csv
        // 假設(shè)有一張表,字段如下:
        // field1,field2,field3,field4
        // 這里模擬了三條csv記錄,doris 中csv的行分隔符默認(rèn)是\n,列分隔符默認(rèn)是\t
        String data =
                "1\t2\t3\t4\n" +
                "11\t22\t33\t44\n" +
                "111\t222\t333\t444";

        httpPut.setEntity(new StringEntity(data));

        // 5. 發(fā)送請求,處理結(jié)果
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // 選擇適合的JSON序列化組件,對返回結(jié)果進(jìn)行序列化
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // 獲取SelectDB返回的狀態(tài)碼...
            String dorisStatus = respAsMap.get("Status");
            // SelectDB返回以下狀態(tài),都表示數(shù)據(jù)寫入成功
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

相關(guān)系統(tǒng)配置

FE配置

stream_load_default_timeout_second:導(dǎo)入任務(wù)的超時時間,單位:秒。默認(rèn)值為600。導(dǎo)入任務(wù)在設(shè)定的timeout時間內(nèi)未完成則會被系統(tǒng)取消,變成CANCELLED。如果導(dǎo)入的源文件無法在規(guī)定時間內(nèi)完成導(dǎo)入,您可以在Stream load請求中設(shè)置單獨的超時時間或者調(diào)整FE的參數(shù)stream_load_default_timeout_second來設(shè)置全局的默認(rèn)超時時間。

BE配置

streaming_load_max_mb:Stream load的最大導(dǎo)入大小,單位:MB,默認(rèn)值為10240。如果您的原始文件超過該值,則需要調(diào)整BE參數(shù)streaming_load_max_mb

Http Stream模式

在Stream Load中,依托Table Value Function(TVF)功能,可以通過使用SQL表達(dá)式來表達(dá)導(dǎo)入的參數(shù)。這個Stream Load依托TVF功能后名為http_stream。更多Table Value Function(TVF)的使用方式,詳情請參見TVF

使用http_stream進(jìn)行Stream Load導(dǎo)入時的Rest API URL不同于Stream Load普通導(dǎo)入的 URL。

  • 普通Stream Load的URL為:http://host:http_port/api/{db}/{table}/_stream_load

  • 使用TVF http_stream的URL 為:http://host:http_port/api/_http_stream

語法

Stream Load的Http Stream模式。

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

Http Stream參數(shù)說明請參見參數(shù)說明

使用示例

在Http Header中添加一個SQL的參數(shù)load_sql,去替代之前參數(shù)中的column_separatorline_delimiterwherecolumns等參數(shù),SQL參數(shù)load_sql示例如下。

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

完整示例:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream