日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用時序Writer寫入時序數據

表格存儲支持通過時序Writer將時序數據寫入時序表,時序Writer還支持多表寫入、寫入狀態統計、行級別回調和自定義配置功能。本文介紹使用時序Writer寫入時序數據的使用流程。

背景信息

使用時序模型時,用戶主要通過調用PutTimeseriesData接口來實現時序數據的寫入,但是此方式存在如下使用限制:

  • 需要手動控制并發。

  • 不支持行級別的回調功能。

  • 不支持性能參數和限制參數配置。

  • 不支持一次向多個時序表中寫入數據。

因此表格存儲Java SDK提供了時序Writer用于高性能寫入時序數據。時序Writer封裝了PutTimeseriesData接口,實現了內部控制并發寫入時序數據的功能,同時支持多表寫入、寫入狀態統計、行級別回調和自定義配置功能。

功能特性

時序Writer提供了多表寫入、批量寫入、并發寫入、寫入狀態統計、行級別回調和自定義配置功能。詳細說明請參見下表。

功能

描述

多表寫入

時序Writer可以實現向多個不同的數據表中寫入數據。用戶新增數據后先緩存至內存,再根據表名構造不同的請求。

批量寫入

單行寫入接口為addTimeseriesRowChange(TimeseriesRow),支持使用批量寫入接口addTimeseriesRowChange(List<TimeseriesRow>)寫入多行數據。

并發寫入

通過數據分桶與分桶間的并發控制,提升數據的寫入速率。

寫入狀態統計

支持總請求次數、總行數、總成功行數、總失敗行數、單行請求數等指標的查詢。

行級別回調

支持對每一行數據的寫入結果設置回調函數。

自定義配置

支持對并發數、重試策略、隊列大小等參數進行自定義配置。

注意事項

目前支持使用時序模型功能的地域有華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華北6(烏蘭察布)、華南1(深圳)、中國香港、德國(法蘭克福)、美國(弗吉尼亞)和新加坡。

如果使用過程中遇到問題,請通過釘釘加入用戶群44327024(物聯網存儲 IoTstore 開發者交流群)聯系我們。

前提條件

  • 已創建時序模型實例以及時序表,用于存放時序數據。具體操作,請參見創建時序模型實例創建時序表。

  • 已創建RAM用戶,并授予RAM用戶管理表格存儲服務的權限(AliyunOTSFullAccess)。具體操作,請參見通過RAM Policy為RAM用戶授權

    重要

    由于數據同步任務配置時需要填寫訪問密鑰AccessKey(AK)信息來執行授權,為避免阿里云賬號泄露AccessKey帶來的安全風險,建議您通過RAM用戶來完成授權和AccessKey的創建。

  • 已獲取AccessKey(包括AccessKey ID和AccessKey Secret),用于進行簽名認證。具體操作,請參見獲取AccessKey。

快速上手

使用時序Writer快速寫入時序數據到時序表。

說明

此處按照步驟詳細介紹時序Writer的使用方式,完整代碼請參見附錄:完整代碼。

步驟一:安裝時序Writer

如果已經安裝表格存儲Java SDK,可跳過此操作。

在Maven工程中使用時序Writer,只需在pom.xml中加入表格存儲 Java SDK依賴即可。以5.13.15版本為例,在<dependencies>內加入如下內容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>5.13.15</version>
</dependency>  

步驟二:創建時序Writer實例

創建時序Writer實例用于將時序數據寫入時序表。

說明
  • 表格存儲提供了三種構造時序Writer函數的方式,此處以通過傳入credentials且使用默認配置為例介紹構造函數的方式。更多信息,請參見構造函數。

  • 創建時序Writer實例時支持通過TimeseriesWriterConfig()工具類實現自定義參數。具體的自定義參數說明請參見配置TimeseriesWriterConfig。

endpoint = "";
accessKeyId = "";
accessKeySecret = "";
instanceName = "";

ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
TimeseriesWriterConfig config = new TimeseriesWriterConfig();

DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter(
       endpoint, credentials, instanceName, config, null);

步驟三:新增數據

構造一條時序數據,新增的時序數據將被緩存到本地。

Map<String, String> tags = new HashMap<String, String>();
tags.put("region", "hangzhou");
tags.put("os", "Ubuntu16.04");
TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_0", tags);
TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000); //一條時序數據。
String tableName = "rowTableName"; // 時序數據所在的時序表。
TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName); //封裝時序數據與時序表。
writer.addTimeseriesRowChange(rowInTable); // 新增的時序數據將被緩存到本地。

步驟四:寫入時序數據

新增數據到本地緩存后,您可以調用flush函數手動將本地緩存數據寫入時序表。

重要

當某一分桶中的數據量達到maxBatchRowsCount(默認為200)設定值時,該分桶將會自動將本地緩存數據寫入到時序表。

writer.flush(); // 將本地緩存數據寫入時序表。

步驟五:關閉時序Writer

時序數據寫入后,關閉時序Writer。時序Writer關閉前會將本地緩存中未寫入的時序數據自動寫入時序表。

writer.close();

接口說明

DefaultTableStoreTimeseriesWriter類是實現時序Writer的主工具類。

接口定義

DefaultTableStoreTimeseriesWriter類中的函數定義如下:


public class DefaultTableStoreTimeseriesWriter implements TableStoreTimeseriesWriter {
    //構造函數
    public DefaultTableStoreTimeseriesWriter(...)
      
    // 單時間線新增數據。
    void addTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException;
  
    // 單時間線新增數據。
    Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(TimeseriesTableRow timeseriesTableRow) throws ClientException;
  
    // 嘗試單時間線新增數據。
    boolean tryAddTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException;
  
    // 多時間線新增數據。
    void addTimeseriesRowChange(List<TimeseriesTableRow> timeseriesTableRow, List<TimeseriesRow> dirtySeriesRow) throws ClientException;
  
    // 多時間線新增數據。
    Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(List<TimeseriesTableRow> timeseriesTableRows) throws ClientException;
  
    // 用戶自定義行級別callback。
    void setResultCallback(TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> callback);
  
    // 獲取callback。
    TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> getResultCallback();
  
    // 獲取時序writer配置。
    TimeseriesWriterConfig getTimeseriesWriterConfig();
  
    // 設置時序writer。
    TimeseriesWriterHandleStatistics getTimeseriesWriterStatistics();
  
    // 等待writer內存中數據全部落盤并寫入數據。
    void flush() throws ClientException;
  
    // 關閉時序表寫入writer。
    void close();
}

構造函數

DefaultTableStoreTimeseriesWriter支持三種構造函數,請根據實際應用場景選擇構造函數的方式。

  • 通過傳入credentials且使用默認配置構建函數

    通過傳入credentials,時序Writer會在內部構建時序client,在釋放時序Writer后無需其他操作。推薦使用此方式構造函數。

    /**
     * 推薦使用的時序writer。
     *
     * @param endpoint 實例的服務地址。
     * @param credentials 認證信息,包含AccessKey,也支持使用臨時訪問憑證token。
     * @param instanceName 實例名稱。
     * @param config 時序writer的配置。
     * @param resultCallback 行級別回調。
     */
    public DefaultTableStoreTimeseriesWriter(
            String endpoint,
            ServiceCredentials credentials,
            String instanceName,
            TimeseriesWriterConfig config,
            TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
  • 通過傳入credentials且使用自定義配置構建函數

    通過傳入credentials,時序Writer會在內部構建時序client,同時可以進行自定義配置,在釋放時序Writer后無需其他操作。

    /**
     * 推薦使用的時序writer。
     *
     * @param endpoint 實例的服務地址。
     * @param credentials 認證信息,含AccessKey,也支持臨時訪問憑證token。
     * @param instanceName 實例名稱。
     * @param config 時序writer的配置。
     * @param cc 客戶端的配置。
     * @param resultCallback 行級別回調。
     */
    public DefaultTableStoreTimeseriesWriter(
            String endpoint,
            ServiceCredentials credentials,
            String instanceName,
            TimeseriesWriterConfig config,
            ClientConfiguration cc,
            TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
  • 直接傳入時序client構造函數

    直接傳入時序client,時序client的配置更靈活,但是需要在釋放Writer后主動對otsclient進行shutdown。

    重要

    在釋放Writer后請務必主動對otsclient進行shutdown。

    /**
     * 時序writer。
     *
     * @param ots 異步時序表客戶端實例。
     * @param config 時序writer的配置。
     * @param callback 行級別回調。
     * @param executor 線程池。
     */
    public DefaultTableStoreTimeseriesWriter(
            AsyncTimeseriesClientInterface ots,
            TimeseriesWriterConfig config,
            TableStoreCallback<TimeseriesRow, TimeseriesRowResult> callback,
            Executor executor)

配置TimeseriesWriterConfig

時序Writer支持自定義配置參數,通過TimeseriesWriterConfig()工具類實現自定義參數。

參數

類型

示例

說明

maxBatchRowsCount

Integer

200

一次批量RPC請求導入的最大行數。默認值為200。取值范圍為1~200。

maxBatchSize

Integer

4*1024*1024

一次批量RPC請求導入的最大數據量。默認值為4*1024*1024(即4 MB)。單位為字節。

concurrency

Integer

64

一個時序Writer的最大請求并發數。默認值為64。

bucketCount

Integer

4

時序數據緩存分桶數。默認值為4。

由于總緩存隊列為bucketCount*bufferSize,如果分桶數量太少會導致并發能力下降,如果分桶數量太大會因資源搶占而寫入速率下降,因此分桶數量一般建議取值4或8。

bufferSize

Integer

1024

每個桶的隊列長度。默認值為1024。

flushInterval

Integer

10000

flush數據的時間間隔。默認值為10000。單位為毫秒。

logInterval

Integer

10000

記錄日志的時間間隔。默認值為10000。單位為毫秒。

dispatchMode

String

HASH_PRIMARY_KEY

多桶分發模式。取值范圍如下:

  • HASH_PRIMARY_KEY(默認):哈希完整主鍵分桶派發,可保證相同主鍵(度量名稱、數據源、tags)的時序數據被寫入同一個桶內,實現一個桶只對部分分區進行寫入。一個桶內只包含部分分區的數據。

  • ROUND_ROBIN:循環遍歷分桶派發。一個桶內可能包含所有分區的數據。

writeMode

String

PARALLEL

寫入模式。取值范圍如下:

  • SEQUENTIAL:串行寫。不同桶間并發,同一個桶內串行請求。

  • PARALLEL(默認):并發寫。不同桶間并發,同一個桶內也會并行請求。

callbackThreadCount

Integer

9

內部Callback運行的線程池的最大線程數。默認值為CPU核數+1。只有調用Callback函數內部構建線程池時生效。

callbackThreadPoolQueueSize

Integer

1024

內部Callback運行的線程池的隊列大小。默認值為1024。只有調用Callback函數內部構建線程池時生效。

writerRetryStrategy

String

CERTAIN_ERROR_CODE_NOT_RETRY

通過傳入credentials方式構造函數時內部構建client使用的重試策略。取值范圍如下:

  • CERTAIN_ERROR_CODE_RETRY:指定需要重試的錯誤碼集合。如果產生的不是指定錯誤碼,則均不做重試。

    當產生OTSInternalServerError、OTSRequestTimeout、OTSPartitionUnavailable、OTSTableNotReady、OTSRowOperationConflict、OTSTimeout、OTSServerUnavailable和OTSServerBusy錯誤碼時,系統才會重試。

  • CERTAIN_ERROR_CODE_NOT_RETRY(默認):指定不需要重試的錯誤碼集合。如果產生的不是指定的錯誤碼,則均做重試。

    當產生OTSParameterInvalid、OTSConditionCheckFail、OTSRequestBodyTooLarge,OTSInvalidPK、OTSOutOfColumnCountLimit和OTSOutOfRowSizeLimit錯誤碼時,系統均不會重試。

clientMaxConnections

String

300

通過傳入credentials方式構造函數時,內部構建client使用的最大連接數配置。默認值為300。

allowDuplicatedRowInBatchRequest

Boolean

true

批量請求是否允許對同一時間線進行寫入。取值范圍為true和false。默認值為true。

如果批量請求時不允許對同一時間線進行寫入,請配置此參數為false。

附錄:完整代碼

完整代碼主要包括了創建時序表、構造時序Writer、使用時序Writer寫入10000條數據和關閉時序Writer四個操作。

重要

使用時序Writer前,請確保已安裝5.13.15版本及以上的表格存儲Java SDK。

public class TimeseriesWriterSample {
    // 填寫實例的服務地址。
    private static String endpoint = "https://[instanceName].cn-hangzhou.ots.aliyuncs.com";
    // 填寫實例名稱。
    private static String instanceName = "instanceName";
    // 填寫阿里云賬號或者RAM用戶的AccessKey(包括AccessKey ID和Access Secret)。
    private static String accessKeyId = "XXXXXXXXXX";
    private static String accessKeySecret = "XXXXXXXXXXXXXXXXXXXXXX";
    // 填寫時序表名稱。
    private static String tableName = "tableName";
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) {
        TimeseriesWriterSample sample = new TimeseriesWriterSample();

        /**
         * 使用Writer前確保表已存在
         * 經過測試,時序表創建30秒后向其中寫入數據不會出現異常情況
         * writer會校驗表的存在性
         * */
        sample.tryCreateTable();

        /**
         * 初始化建議使用
         * DefaultTableStoreTimeseriesWriter(
         *             String endpoint,                                                             // 實例的服務地址、
         *             ServiceCredentials credentials,                                              // 認證信息,包含AccessKey,也支持使用臨時訪問憑證token。
         *             String instanceName,                                                         // 實例名稱。
         *             String tableName,                                                            // 時序表名稱:一個時序writer僅針對一個時序表。
         *             TimeseriesWriterConfig config,                                               // 時序writer的配置。
         *             TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)       // 行級別回調。
         * */
        DefaultTableStoreTimeseriesWriter writer = sample.createTableStoreTimeseriesWriter();

        /**
         * Future使用:批量寫
         * */
        sample.writeTimeseriesRowWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getTimeseriesWriterStatistics());

        /**
         * 用戶需要主動關閉Writer,內部等候所有隊列數據寫入后,自動關閉client與內部的線程池。
         * */
        writer.close();
    }

    private void tryCreateTable() {
        TimeseriesClient ots = new TimeseriesClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        TimeseriesTableMeta timeseriesTableMeta = new TimeseriesTableMeta(tableName);
        int timeToLive = -1;
        timeseriesTableMeta.setTimeseriesTableOptions(new TimeseriesTableOptions(timeToLive));
        CreateTimeseriesTableRequest request = new CreateTimeseriesTableRequest(timeseriesTableMeta);

        try {
            CreateTimeseriesTableResponse res = ots.createTimeseriesTable(request);
            System.out.println("waiting for creating time series table ......");
            TimeUnit.SECONDS.sleep(30);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    private DefaultTableStoreTimeseriesWriter createTableStoreTimeseriesWriter() {

        TimeseriesWriterConfig config = new TimeseriesWriterConfig();
        config.setWriteMode(TSWriteMode.PARALLEL);                         // 并行寫(每個桶內并行寫)。
        config.setDispatchMode(TSDispatchMode.HASH_PRIMARY_KEY);           // 基于主鍵哈希值做分桶,保證相同主鍵落在同一個桶內有序寫入。
        config.setBucketCount(4);                                          // 分桶數,提升串行寫并發。未達到機器瓶頸時,寫入速率與分桶數正相關。
        config.setCallbackThreadCount(16);                                 // 設置Writer內部Callback運行的線程池線程。
        /**
         * 用戶自定義的行級別callback
         * 該示例通過成功、失敗計數,簡單展示回調能力
         * */
        TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback = new TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult>() {
            @Override
            public void onCompleted(TimeseriesTableRow rowChange, TimeseriesRowResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(TimeseriesTableRow rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

        /**
         * 推薦使用內部構建的線程池與Client,方便用戶使用,隔離初始化、釋放的邏輯
         * */
        DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter(
                endpoint, credentials, instanceName, config, resultCallback);

        return writer;
    }

    private void writeTimeseriesRowWithFuture(DefaultTableStoreTimeseriesWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.print("Write Timeseries Row With Future, ");

        int rowsCount = 10000;
        int columnsCount = 10;
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<TimeseriesWriterResult>> futures = new LinkedList<Future<TimeseriesWriterResult>>();
        List<TimeseriesTableRow> rows = new ArrayList<TimeseriesTableRow>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {
            Map<String, String> tags = new HashMap<String, String>();
            tags.put("region", "hangzhou");
            tags.put("os", "Ubuntu16.04");
            // 通過measurementName、dataSource和tags構建TimeseriesKey。
            TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_" + index, tags);
            // 指定timeseriesKey和timeInUs創建timeseriesRow。
            TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000 + index);
            // 增加數據值(field)。
            for (int j = 0; j < columnsCount; j++) {
                row.addField("cpu_usage_" + j, ColumnValue.fromDouble(Math.random() * 100));
            }
            row.addField("index", ColumnValue.fromLong(index));
            TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName);
            rows.add(rowInTable);
            if (Math.random() > 0.9995 || index == rowsCount - 1) {
                Future<TimeseriesWriterResult> future = writer.addTimeseriesRowChangeWithFuture(rows);
                futures.add(future);
                rows.clear();
            }
        }
        System.out.println("Write thread finished.");
        writer.flush();
        printFutureResult(futures);
        System.out.println("=========================================================[Finish]");

    }

    private void printFutureResult(List<Future<TimeseriesWriterResult>> futures) {
        int totalRow = 0;
        System.out.println("time series writer results as follow:");
        for (int index = 0; index < futures.size(); index++) {
            try {
                TimeseriesWriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

    }

}