表格存儲支持通過時序Writer將時序數據寫入時序表,時序Writer還支持多表寫入、寫入狀態統計、行級別回調和自定義配置功能。本文介紹使用時序Writer寫入時序數據的使用流程。
背景信息
使用時序模型時,用戶主要通過調用PutTimeseriesData接口來實現時序數據的寫入,但是此方式存在如下使用限制:
需要手動控制并發。
不支持行級別的回調功能。
不支持性能參數和限制參數配置。
不支持一次向多個時序表中寫入數據。
因此表格存儲Java SDK提供了時序Writer用于高性能寫入時序數據。時序Writer封裝了PutTimeseriesData接口,實現了內部控制并發寫入時序數據的功能,同時支持多表寫入、寫入狀態統計、行級別回調和自定義配置功能。
功能特性
時序Writer提供了多表寫入、批量寫入、并發寫入、寫入狀態統計、行級別回調和自定義配置功能。詳細說明請參見下表。
功能 | 描述 |
多表寫入 | 時序Writer可以實現向多個不同的數據表中寫入數據。用戶新增數據后先緩存至內存,再根據表名構造不同的請求。 |
批量寫入 | 單行寫入接口為 |
并發寫入 | 通過數據分桶與分桶間的并發控制,提升數據的寫入速率。 |
寫入狀態統計 | 支持總請求次數、總行數、總成功行數、總失敗行數、單行請求數等指標的查詢。 |
行級別回調 | 支持對每一行數據的寫入結果設置回調函數。 |
自定義配置 | 支持對并發數、重試策略、隊列大小等參數進行自定義配置。 |
注意事項
目前支持使用時序模型功能的地域有華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華北6(烏蘭察布)、華南1(深圳)、中國香港、德國(法蘭克福)、美國(弗吉尼亞)和新加坡。
如果使用過程中遇到問題,請通過釘釘加入用戶群44327024(物聯網存儲 IoTstore 開發者交流群
)聯系我們。
前提條件
已創建RAM用戶,并授予RAM用戶管理表格存儲服務的權限(AliyunOTSFullAccess)。具體操作,請參見通過RAM Policy為RAM用戶授權。
重要由于數據同步任務配置時需要填寫訪問密鑰AccessKey(AK)信息來執行授權,為避免阿里云賬號泄露AccessKey帶來的安全風險,建議您通過RAM用戶來完成授權和AccessKey的創建。
已獲取AccessKey(包括AccessKey ID和AccessKey Secret),用于進行簽名認證。具體操作,請參見獲取AccessKey。
快速上手
使用時序Writer快速寫入時序數據到時序表。
此處按照步驟詳細介紹時序Writer的使用方式,完整代碼請參見附錄:完整代碼。
步驟一:安裝時序Writer
如果已經安裝表格存儲Java SDK,可跳過此操作。
在Maven工程中使用時序Writer,只需在pom.xml中加入表格存儲 Java SDK依賴即可。以5.13.15版本為例,在<dependencies>內加入如下內容:
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>tablestore</artifactId> <version>5.13.15</version> </dependency>
步驟二:創建時序Writer實例
創建時序Writer實例用于將時序數據寫入時序表。
表格存儲提供了三種構造時序Writer函數的方式,此處以通過傳入credentials且使用默認配置為例介紹構造函數的方式。更多信息,請參見構造函數。
創建時序Writer實例時支持通過
TimeseriesWriterConfig()
工具類實現自定義參數。具體的自定義參數說明請參見配置TimeseriesWriterConfig。
endpoint = ""; accessKeyId = ""; accessKeySecret = ""; instanceName = ""; ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret); TimeseriesWriterConfig config = new TimeseriesWriterConfig(); DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter( endpoint, credentials, instanceName, config, null);
步驟三:新增數據
構造一條時序數據,新增的時序數據將被緩存到本地。
Map<String, String> tags = new HashMap<String, String>(); tags.put("region", "hangzhou"); tags.put("os", "Ubuntu16.04"); TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_0", tags); TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000); //一條時序數據。 String tableName = "rowTableName"; // 時序數據所在的時序表。 TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName); //封裝時序數據與時序表。 writer.addTimeseriesRowChange(rowInTable); // 新增的時序數據將被緩存到本地。
步驟四:寫入時序數據
新增數據到本地緩存后,您可以調用flush函數手動將本地緩存數據寫入時序表。
當某一分桶中的數據量達到maxBatchRowsCount(默認為200)設定值時,該分桶將會自動將本地緩存數據寫入到時序表。
writer.flush(); // 將本地緩存數據寫入時序表。
步驟五:關閉時序Writer
時序數據寫入后,關閉時序Writer。時序Writer關閉前會將本地緩存中未寫入的時序數據自動寫入時序表。
writer.close();
接口說明
DefaultTableStoreTimeseriesWriter類是實現時序Writer的主工具類。
接口定義
DefaultTableStoreTimeseriesWriter類中的函數定義如下:
public class DefaultTableStoreTimeseriesWriter implements TableStoreTimeseriesWriter { //構造函數 public DefaultTableStoreTimeseriesWriter(...) // 單時間線新增數據。 void addTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException; // 單時間線新增數據。 Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(TimeseriesTableRow timeseriesTableRow) throws ClientException; // 嘗試單時間線新增數據。 boolean tryAddTimeseriesRowChange(TimeseriesTableRow timeseriesTableRow) throws ClientException; // 多時間線新增數據。 void addTimeseriesRowChange(List<TimeseriesTableRow> timeseriesTableRow, List<TimeseriesRow> dirtySeriesRow) throws ClientException; // 多時間線新增數據。 Future<TimeseriesWriterResult> addTimeseriesRowChangeWithFuture(List<TimeseriesTableRow> timeseriesTableRows) throws ClientException; // 用戶自定義行級別callback。 void setResultCallback(TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> callback); // 獲取callback。 TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> getResultCallback(); // 獲取時序writer配置。 TimeseriesWriterConfig getTimeseriesWriterConfig(); // 設置時序writer。 TimeseriesWriterHandleStatistics getTimeseriesWriterStatistics(); // 等待writer內存中數據全部落盤并寫入數據。 void flush() throws ClientException; // 關閉時序表寫入writer。 void close(); }
構造函數
DefaultTableStoreTimeseriesWriter支持三種構造函數,請根據實際應用場景選擇構造函數的方式。
通過傳入credentials且使用默認配置構建函數
通過傳入credentials,時序Writer會在內部構建時序client,在釋放時序Writer后無需其他操作。推薦使用此方式構造函數。
/** * 推薦使用的時序writer。 * * @param endpoint 實例的服務地址。 * @param credentials 認證信息,包含AccessKey,也支持使用臨時訪問憑證token。 * @param instanceName 實例名稱。 * @param config 時序writer的配置。 * @param resultCallback 行級別回調。 */ public DefaultTableStoreTimeseriesWriter( String endpoint, ServiceCredentials credentials, String instanceName, TimeseriesWriterConfig config, TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
通過傳入credentials且使用自定義配置構建函數
通過傳入credentials,時序Writer會在內部構建時序client,同時可以進行自定義配置,在釋放時序Writer后無需其他操作。
/** * 推薦使用的時序writer。 * * @param endpoint 實例的服務地址。 * @param credentials 認證信息,含AccessKey,也支持臨時訪問憑證token。 * @param instanceName 實例名稱。 * @param config 時序writer的配置。 * @param cc 客戶端的配置。 * @param resultCallback 行級別回調。 */ public DefaultTableStoreTimeseriesWriter( String endpoint, ServiceCredentials credentials, String instanceName, TimeseriesWriterConfig config, ClientConfiguration cc, TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback)
直接傳入時序client構造函數
直接傳入時序client,時序client的配置更靈活,但是需要在釋放Writer后主動對otsclient進行shutdown。
重要在釋放Writer后請務必主動對otsclient進行shutdown。
/** * 時序writer。 * * @param ots 異步時序表客戶端實例。 * @param config 時序writer的配置。 * @param callback 行級別回調。 * @param executor 線程池。 */ public DefaultTableStoreTimeseriesWriter( AsyncTimeseriesClientInterface ots, TimeseriesWriterConfig config, TableStoreCallback<TimeseriesRow, TimeseriesRowResult> callback, Executor executor)
配置TimeseriesWriterConfig
時序Writer支持自定義配置參數,通過TimeseriesWriterConfig()
工具類實現自定義參數。
參數 | 類型 | 示例 | 說明 |
maxBatchRowsCount | Integer | 200 | 一次批量RPC請求導入的最大行數。默認值為200。取值范圍為1~200。 |
maxBatchSize | Integer | 4*1024*1024 | 一次批量RPC請求導入的最大數據量。默認值為 |
concurrency | Integer | 64 | 一個時序Writer的最大請求并發數。默認值為64。 |
bucketCount | Integer | 4 | 時序數據緩存分桶數。默認值為4。 由于總緩存隊列為 |
bufferSize | Integer | 1024 | 每個桶的隊列長度。默認值為1024。 |
flushInterval | Integer | 10000 | flush數據的時間間隔。默認值為10000。單位為毫秒。 |
logInterval | Integer | 10000 | 記錄日志的時間間隔。默認值為10000。單位為毫秒。 |
dispatchMode | String | HASH_PRIMARY_KEY | 多桶分發模式。取值范圍如下:
|
writeMode | String | PARALLEL | 寫入模式。取值范圍如下:
|
callbackThreadCount | Integer | 9 | 內部Callback運行的線程池的最大線程數。默認值為 |
callbackThreadPoolQueueSize | Integer | 1024 | 內部Callback運行的線程池的隊列大小。默認值為1024。只有調用Callback函數內部構建線程池時生效。 |
writerRetryStrategy | String | CERTAIN_ERROR_CODE_NOT_RETRY | 通過傳入credentials方式構造函數時內部構建client使用的重試策略。取值范圍如下:
|
clientMaxConnections | String | 300 | 通過傳入credentials方式構造函數時,內部構建client使用的最大連接數配置。默認值為300。 |
allowDuplicatedRowInBatchRequest | Boolean | true | 批量請求是否允許對同一時間線進行寫入。取值范圍為true和false。默認值為true。 如果批量請求時不允許對同一時間線進行寫入,請配置此參數為false。 |
附錄:完整代碼
完整代碼主要包括了創建時序表、構造時序Writer、使用時序Writer寫入10000條數據和關閉時序Writer四個操作。
使用時序Writer前,請確保已安裝5.13.15版本及以上的表格存儲Java SDK。
public class TimeseriesWriterSample { // 填寫實例的服務地址。 private static String endpoint = "https://[instanceName].cn-hangzhou.ots.aliyuncs.com"; // 填寫實例名稱。 private static String instanceName = "instanceName"; // 填寫阿里云賬號或者RAM用戶的AccessKey(包括AccessKey ID和Access Secret)。 private static String accessKeyId = "XXXXXXXXXX"; private static String accessKeySecret = "XXXXXXXXXXXXXXXXXXXXXX"; // 填寫時序表名稱。 private static String tableName = "tableName"; private static AtomicLong succeedRows = new AtomicLong(); private static AtomicLong failedRows = new AtomicLong(); public static void main(String[] args) { TimeseriesWriterSample sample = new TimeseriesWriterSample(); /** * 使用Writer前確保表已存在 * 經過測試,時序表創建30秒后向其中寫入數據不會出現異常情況 * writer會校驗表的存在性 * */ sample.tryCreateTable(); /** * 初始化建議使用 * DefaultTableStoreTimeseriesWriter( * String endpoint, // 實例的服務地址、 * ServiceCredentials credentials, // 認證信息,包含AccessKey,也支持使用臨時訪問憑證token。 * String instanceName, // 實例名稱。 * String tableName, // 時序表名稱:一個時序writer僅針對一個時序表。 * TimeseriesWriterConfig config, // 時序writer的配置。 * TableStoreCallback<TimeseriesRow, TimeseriesRowResult> resultCallback) // 行級別回調。 * */ DefaultTableStoreTimeseriesWriter writer = sample.createTableStoreTimeseriesWriter(); /** * Future使用:批量寫 * */ sample.writeTimeseriesRowWithFuture(writer); System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get()); System.out.println("Count by WriterStatics: " + writer.getTimeseriesWriterStatistics()); /** * 用戶需要主動關閉Writer,內部等候所有隊列數據寫入后,自動關閉client與內部的線程池。 * */ writer.close(); } private void tryCreateTable() { TimeseriesClient ots = new TimeseriesClient(endpoint, accessKeyId, accessKeySecret, instanceName); TimeseriesTableMeta timeseriesTableMeta = new TimeseriesTableMeta(tableName); int timeToLive = -1; timeseriesTableMeta.setTimeseriesTableOptions(new TimeseriesTableOptions(timeToLive)); CreateTimeseriesTableRequest request = new CreateTimeseriesTableRequest(timeseriesTableMeta); try { CreateTimeseriesTableResponse res = ots.createTimeseriesTable(request); System.out.println("waiting for creating time series table ......"); TimeUnit.SECONDS.sleep(30); } catch (Exception e) { throw new ClientException(e); } finally { ots.shutdown(); } } private DefaultTableStoreTimeseriesWriter createTableStoreTimeseriesWriter() { TimeseriesWriterConfig config = new TimeseriesWriterConfig(); config.setWriteMode(TSWriteMode.PARALLEL); // 并行寫(每個桶內并行寫)。 config.setDispatchMode(TSDispatchMode.HASH_PRIMARY_KEY); // 基于主鍵哈希值做分桶,保證相同主鍵落在同一個桶內有序寫入。 config.setBucketCount(4); // 分桶數,提升串行寫并發。未達到機器瓶頸時,寫入速率與分桶數正相關。 config.setCallbackThreadCount(16); // 設置Writer內部Callback運行的線程池線程。 /** * 用戶自定義的行級別callback * 該示例通過成功、失敗計數,簡單展示回調能力 * */ TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback = new TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult>() { @Override public void onCompleted(TimeseriesTableRow rowChange, TimeseriesRowResult cc) { succeedRows.incrementAndGet(); } @Override public void onFailed(TimeseriesTableRow rowChange, Exception ex) { failedRows.incrementAndGet(); } }; ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret); /** * 推薦使用內部構建的線程池與Client,方便用戶使用,隔離初始化、釋放的邏輯 * */ DefaultTableStoreTimeseriesWriter writer = new DefaultTableStoreTimeseriesWriter( endpoint, credentials, instanceName, config, resultCallback); return writer; } private void writeTimeseriesRowWithFuture(DefaultTableStoreTimeseriesWriter writer) { System.out.println("=========================================================[Start]"); System.out.print("Write Timeseries Row With Future, "); int rowsCount = 10000; int columnsCount = 10; AtomicLong rowIndex = new AtomicLong(-1); List<Future<TimeseriesWriterResult>> futures = new LinkedList<Future<TimeseriesWriterResult>>(); List<TimeseriesTableRow> rows = new ArrayList<TimeseriesTableRow>(); for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) { Map<String, String> tags = new HashMap<String, String>(); tags.put("region", "hangzhou"); tags.put("os", "Ubuntu16.04"); // 通過measurementName、dataSource和tags構建TimeseriesKey。 TimeseriesKey timeseriesKey = new TimeseriesKey("cpu", "host_" + index, tags); // 指定timeseriesKey和timeInUs創建timeseriesRow。 TimeseriesRow row = new TimeseriesRow(timeseriesKey, System.currentTimeMillis() * 1000 + index); // 增加數據值(field)。 for (int j = 0; j < columnsCount; j++) { row.addField("cpu_usage_" + j, ColumnValue.fromDouble(Math.random() * 100)); } row.addField("index", ColumnValue.fromLong(index)); TimeseriesTableRow rowInTable = new TimeseriesTableRow(row, tableName); rows.add(rowInTable); if (Math.random() > 0.9995 || index == rowsCount - 1) { Future<TimeseriesWriterResult> future = writer.addTimeseriesRowChangeWithFuture(rows); futures.add(future); rows.clear(); } } System.out.println("Write thread finished."); writer.flush(); printFutureResult(futures); System.out.println("=========================================================[Finish]"); } private void printFutureResult(List<Future<TimeseriesWriterResult>> futures) { int totalRow = 0; System.out.println("time series writer results as follow:"); for (int index = 0; index < futures.size(); index++) { try { TimeseriesWriterResult result = futures.get(index).get(); totalRow += result.getTotalCount(); System.out.println(String.format( "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d", index, result.getFailedRows().size(), result.getSucceedRows().size(), result.getTotalCount(), totalRow)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }