并發(fā)導出數(shù)據(jù)
當使用場景中不關心整個結果集的順序時,您可以使用并發(fā)導出數(shù)據(jù)功能以更快的速度將命中的數(shù)據(jù)全部返回。
表格存儲Java SDK從5.6.0版本開始支持并發(fā)導出數(shù)據(jù)功能。使用并發(fā)導出數(shù)據(jù)功能時,請確保獲取了正確的Java SDK版本。關于Java SDK歷史迭代版本的更多信息,請參見Java SDK歷史迭代版本。
背景
多元索引中提供了Search接口,Search接口支持全功能集,包括所有的查詢功能以及排序、統(tǒng)計聚合等分析能力。使用Search接口查詢數(shù)據(jù)的結果會按照指定的順序返回。
但是在有些場景中,例如對接計算系統(tǒng)Spark、Presto等或者一些圈選場景,只需要使用完整的查詢能力將命中的數(shù)據(jù)以更快的速度全部返回,不關心整個結果集的順序。為了支持此類需求,多元索引中提供了ParallelScan接口。
ParallelScan接口相對于Search接口,保留了所有的查詢功能,但是舍棄了排序、統(tǒng)計聚合等分析功能,從而帶來了5倍以上的性能提升,因此使用ParallelScan接口可以實現(xiàn)1分鐘內(nèi)上億級別數(shù)據(jù)行的導出能力,導出能力可以水平擴展,不存在上限。
在ParallelScan接口的實現(xiàn)中,單次請求的limit限制更寬松。目前Search接口的limit最大允許100行,但是ParallelScan接口的limit最大允許2000行。同時支持多個線程一起并發(fā)請求,因此導出速度會極快。
場景
如果請求關心排序、統(tǒng)計聚合,或者是終端客戶的直接查詢請求,請使用Search接口實現(xiàn)。
如果請求不關心排序,只關心把所有符合條件的數(shù)據(jù)盡快返回,或者是計算系統(tǒng)(Spark、Presto等)拉取數(shù)據(jù),請使用ParallelScan接口實現(xiàn)。
特點
ParallelScan接口相對于Search接口的典型特征如下:
結果穩(wěn)定性
ParallelScan任務是有狀態(tài)的。在一個Session請求中獲取到的結果集是確定的,由發(fā)起第一次請求時的數(shù)據(jù)狀態(tài)決定。如果發(fā)起第一次請求后插入了新的數(shù)據(jù)或者修改了原有的數(shù)據(jù)不會對結果集造成影響。
新增會話(Session)概念
重要在某些不易獲取sessionId的場景中,ParallelScan接口也支持不攜帶sessionId發(fā)起請求,但是不使用sessionId可能會有極小的概率導致獲取到的結果集中有重復數(shù)據(jù)。
在ParallelScan系列接口中新增了Session概念。使用sessionId能夠保證獲取到的結果集是穩(wěn)定的,具體流程如下:
通過ComputeSplits接口獲取最大并發(fā)數(shù)和當前sessionId。
通過發(fā)起多個并發(fā)ParallelScan請求讀取數(shù)據(jù),請求中需要指定當前的sessionId和當前并發(fā)ID。
如果在ParallelScan任務執(zhí)行的過程中發(fā)生網(wǎng)絡異常、線程異常、動態(tài)Schema修改、索引切換等情況,則會導致ParallelScan不能繼續(xù)掃描數(shù)據(jù),服務端會返回“OTSSessionExpired”錯誤碼,此時需要重新開始一個新的ParallelScan任務,從最開始重新拉取數(shù)據(jù)。
同一個sessionId且ScanQuery相同的多個并發(fā)任務視為一個ParallelScan任務。一個ParallelScan任務的生命周期定義為開始時間是第一次發(fā)出ParallelScan請求,結束時間是翻頁掃描完所有數(shù)據(jù)或者請求的token失效。
最大并發(fā)數(shù)
ParallelScan支持的單請求的最大并發(fā)數(shù)由ComputeSplits的返回值確定。數(shù)據(jù)越多,支持的并發(fā)數(shù)越大。
單請求指同一個查詢語句,例如查詢
city="杭州"
的結果,如果使用Search接口查詢,則Search請求的返回值中會包括所有city="杭州"
的結果;如果使用ParallelScan接口查詢且并發(fā)數(shù)為2,則每個ParallelScan請求返回50%的結果,然后將兩個并發(fā)的結果集合并在一起才是完整的結果集。性能
ParallelScan接口單并發(fā)掃描數(shù)據(jù)的性能是Search接口的5倍。當增大并發(fā)數(shù)時,性能可以繼續(xù)線性提高,例如8并發(fā)時仍可以繼續(xù)提高4倍性能。
成本
由于ParallelScan請求對資源的消耗更少,價格會更便宜,所以對于大數(shù)據(jù)量的導出類需求,強烈建議使用ParallelScan接口。
注意事項
同時存在的ParallelScan任務數(shù)量有一定的限制,當前為10個,后續(xù)會根據(jù)客戶需求繼續(xù)調(diào)整。
只支持返回多元索引中已存在的列,但是不能返回多元索引中日期類型和嵌套類型的列。
目前已支持返回數(shù)組和地理位置的列,但是返回的字段值會被格式化,可能和寫入數(shù)據(jù)表的值不一致。例如對于數(shù)組類型,寫入數(shù)據(jù)表的值為"[1,2, 3, 4]",則通過ParallelScan接口導出的值為"[1,2,3,4]";對于地理位置類型,寫入數(shù)據(jù)表的值為
10,50
,則通過ParallelScan接口導出的值為10.0,50.0
。實際使用時,ReturnType可以配置為RETURN_ALL_INDEX或者RETURN_SPECIFIED,不能配置為RETURN_ALL。
每次最大支持返回2000行數(shù)據(jù)(即limit默認值為2000)。當超過2000后,limit的變化對性能基本無影響。
接口
多并發(fā)數(shù)據(jù)導出功能涉及如下兩個接口。
ComputeSplits:獲取當前ParallelScan單個請求的最大并發(fā)數(shù)。
ParallelScan:執(zhí)行具體的數(shù)據(jù)導出功能。
使用
您可以使用如下語言的SDK并發(fā)導出數(shù)據(jù)。
參數(shù)
參數(shù) | 說明 | |
tableName | 數(shù)據(jù)表名稱。 | |
indexName | 多元索引名稱。 | |
scanQuery | query | 多元索引的查詢語句。支持精確查詢、模糊查詢、范圍查詢、地理位置查詢、嵌套查詢等,功能和Search接口一致。 |
limit | 掃描數(shù)據(jù)時一次能返回的數(shù)據(jù)行數(shù)。 | |
maxParallel | 最大并發(fā)數(shù)。請求支持的最大并發(fā)數(shù)由用戶數(shù)據(jù)量決定。數(shù)據(jù)量越大,支持的并發(fā)數(shù)越多,每次任務前可以通過ComputeSplits API進行獲取。 | |
currentParallelId | 當前并發(fā)ID。取值范圍為[0, maxParallel)。 | |
token | 用于翻頁功能。ParallelScan請求結果中有下一次進行翻頁的token,使用該token可以接著上一次的結果繼續(xù)讀取數(shù)據(jù)。 | |
aliveTime | ParallelScan的當前任務有效時間,也是token的有效時間。默認值為60,建議使用默認值,單位為秒。如果在有效時間內(nèi)沒有發(fā)起下一次請求,則不能繼續(xù)讀取數(shù)據(jù)。持續(xù)發(fā)起請求會刷新token有效時間。 說明 動態(tài)修改schema中的切換索引、服務端單臺機器故障、服務端負載均衡等均會導致Session提前過期,此時需要重新創(chuàng)建Session。 | |
columnsToGet | 指定分組結果中需要返回的列名,可以通過將列名加入Columns來實現(xiàn)。 如果需要返回多元索引中的所有列,則可以使用更簡潔的ReturnAllFromIndex實現(xiàn)。 重要 此處不能使用ReturnAll。 | |
sessionId | 本次并發(fā)掃描數(shù)據(jù)任務的sessionId。創(chuàng)建Session可以通過ComputeSplits API來創(chuàng)建,同時獲得本次任務支持的最大并發(fā)數(shù)。 |
示例
請根據(jù)實際選擇單并發(fā)掃描數(shù)據(jù)和多線程并發(fā)掃描數(shù)據(jù)。
單并發(fā)掃描數(shù)據(jù)
相對于多并發(fā)掃描數(shù)據(jù),單并發(fā)掃描數(shù)據(jù)的代碼更簡單,單并發(fā)代碼無需關心currentParallelId和maxParallel參數(shù)。單并發(fā)使用方式的整體吞吐比Search接口方式高,但是比多線程多并發(fā)使用方式的吞吐低。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
public class Test {
public static List<Row> scanQuery(final SyncClient client) {
String tableName = "<TableName>";
String indexName = "<IndexName>";
//獲取sessionId和本次請求支持的最大并發(fā)數(shù)。
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
byte[] sessionId = computeSplitsResponse.getSessionId();
int splitsSize = computeSplitsResponse.getSplitsSize();
/*
* 創(chuàng)建并發(fā)掃描數(shù)據(jù)請求。
*/
ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
parallelScanRequest.setTableName(tableName);
parallelScanRequest.setIndexName(indexName);
ScanQuery scanQuery = new ScanQuery();
//該query決定了掃描出的數(shù)據(jù)范圍,可用于構建嵌套的復雜的query。
Query query = new MatchAllQuery();
scanQuery.setQuery(query);
//設置單次請求返回的數(shù)據(jù)行數(shù)。
scanQuery.setLimit(2000);
parallelScanRequest.setScanQuery(scanQuery);
ColumnsToGet columnsToGet = new ColumnsToGet();
columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
parallelScanRequest.setColumnsToGet(columnsToGet);
parallelScanRequest.setSessionId(sessionId);
/*
* 使用builder模式創(chuàng)建并發(fā)掃描數(shù)據(jù)請求,功能與前面一致。
*/
ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.matchAll())
.limit(2000)
.build())
.addColumnsToGet("col_1", "col_2")
.sessionId(sessionId)
.build();
List<Row> result = new ArrayList<>();
/*
* 使用原生的API掃描數(shù)據(jù)。
*/
{
ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
//下次請求的ScanQuery的token。
byte[] nextToken = parallelScanResponse.getNextToken();
//獲取數(shù)據(jù)。
List<Row> rows = parallelScanResponse.getRows();
result.addAll(rows);
while (nextToken != null) {
//設置token。
parallelScanRequest.getScanQuery().setToken(nextToken);
//繼續(xù)掃描數(shù)據(jù)。
parallelScanResponse = client.parallelScan(parallelScanRequest);
//獲取數(shù)據(jù)。
rows = parallelScanResponse.getRows();
result.addAll(rows);
nextToken = parallelScanResponse.getNextToken();
}
}
/*
* 推薦方式。
* 使用iterator方式掃描所有匹配數(shù)據(jù)。使用方式上更簡單,速度和前面方法一致。
*/
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
result.add(row);
//獲取具體的值。
String col_1 = row.getLatestColumn("col_1").getValue().asString();
long col_2 = row.getLatestColumn("col_2").getValue().asLong();
}
}
/*
* 關于失敗重試的問題,如果本函數(shù)的外部調(diào)用者有重試機制或者不需要考慮失敗重試問題,可以忽略此部分內(nèi)容。
* 為了保證可用性,遇到任何異常均推薦進行任務級別的重試,重新開始一個新的ParallelScan任務。
* 異常分為如下兩種:
* 1、服務端Session異常OTSSessionExpired。
* 2、調(diào)用者客戶端網(wǎng)絡等異常。
*/
try {
//正常處理邏輯。
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
//處理row,內(nèi)存足夠大時可直接放到list中。
result.add(row);
}
}
} catch (Exception ex) {
//重試。
{
result.clear();
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
//處理row,內(nèi)存足夠大時可直接放到list中。
result.add(row);
}
}
}
return result;
}
}
多線程并發(fā)掃描數(shù)據(jù)
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
public class Test {
public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
// 獲取機器的CPU數(shù)量。
final int cpuProcessors = Runtime.getRuntime().availableProcessors();
// 指定客戶端多線程的并發(fā)數(shù)量。建議和客戶端的CPU核數(shù)一致,避免客戶端壓力太大,影響查詢性能。
final Semaphore semaphore = new Semaphore(cpuProcessors);
// 獲取sessionId和本次請求支持的最大并發(fā)數(shù)。
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
final byte[] sessionId = computeSplitsResponse.getSessionId();
final int maxParallel = computeSplitsResponse.getSplitsSize();
// 業(yè)務統(tǒng)計行數(shù)使用。
AtomicLong rowCount = new AtomicLong(0);
/*
* 為了使用一個函數(shù)實現(xiàn)多線程功能,此處構建一個內(nèi)部類繼承Thread來使用多線程。
* 您也可以構建一個正常的外部類,使代碼更有條理。
*/
final class ThreadForScanQuery extends Thread {
private final int currentParallelId;
private ThreadForScanQuery(int currentParallelId) {
this.currentParallelId = currentParallelId;
this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // 設置線程名稱。
}
@Override
public void run() {
System.out.println("start thread:" + this.getName());
try {
// 正常處理邏輯。
{
ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此處的query決定了獲取什么數(shù)據(jù)。
.limit(2000)
.currentParallelId(currentParallelId)
.maxParallel(maxParallel)
.build())
.addColumnsToGet("col_long", "col_keyword", "col_bool") // 設置要返回的多元索引中的部分字段,或者使用下行注釋的內(nèi)容獲取多元索引中全部數(shù)據(jù)。
//.returnAllColumnsFromIndex(true)
.sessionId(sessionId)
.build();
// 使用Iterator形式獲取所有數(shù)據(jù)。
RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
long count = 0;
while (ltr.hasNext()) {
Row row = ltr.next();
// 增加自定義的處理邏輯,此處代碼以統(tǒng)計行數(shù)為例介紹。
count++;
}
rowCount.addAndGet(count);
System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
}
} catch (Exception ex) {
// 如果有異常,此處需要考慮重試正常處理邏輯。
} finally {
semaphore.release();
}
}
}
// 多個線程同時執(zhí)行,currentParallelId取值范圍為[0, maxParallel)。
List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
threadList.add(thread);
}
// 同時啟動。
for (ThreadForScanQuery thread : threadList) {
// 利用semaphore限制同時啟動的線程數(shù)量,避免客戶端瓶頸。
semaphore.acquire();
thread.start();
}
// 主線程阻塞等待所有線程完成任務。
for (ThreadForScanQuery thread : threadList) {
thread.join();
}
System.out.println("all thread finished! total rows:" + rowCount.get());
}
}