創(chuàng)建通道
如果需要實(shí)時(shí)消費(fèi)表中數(shù)據(jù),您需要調(diào)用CreateTunnel接口為數(shù)據(jù)表創(chuàng)建一個(gè)通道,一張數(shù)據(jù)表上可以創(chuàng)建多個(gè)通道。在創(chuàng)建通道時(shí)需要指定數(shù)據(jù)表名稱、通道名稱和通道類型。
前提條件
已初始化TunnelClient。
已創(chuàng)建數(shù)據(jù)表。具體操作,請(qǐng)參見創(chuàng)建數(shù)據(jù)表。
參數(shù)
請(qǐng)求參數(shù)
參數(shù) | 說明 |
TableName | 數(shù)據(jù)表名稱。 |
TunnelName | 通道的名稱。 |
TunnelType | 通道的類型。取值范圍如下:
創(chuàng)建增量或者全量加增量類型的通道時(shí),系統(tǒng)默認(rèn)創(chuàng)建通道后寫入的數(shù)據(jù)為增量數(shù)據(jù)。如果要消費(fèi)指定時(shí)間點(diǎn)后的增量數(shù)據(jù),請(qǐng)配置增量數(shù)據(jù)的起始時(shí)間戳(startTime)。
|
響應(yīng)參數(shù)
參數(shù) | 說明 |
TunnelId | 通道的ID。 |
ResponseInfo | 返回的一些其它字段。 |
RequestId | 當(dāng)次請(qǐng)求的Request ID。 |
示例
創(chuàng)建全量類型的通道
以下示例用于為數(shù)據(jù)表創(chuàng)建一個(gè)全量類型的通道。
//支持創(chuàng)建三種類型的通道TunnelType.BaseData(全量)、TunnelType.Stream(增量)和TunnelType.BaseAndStream(全量加增量)。
//本示例為創(chuàng)建全量類型的通道,如果需要?jiǎng)?chuàng)建其它類型的通道,則將CreateTunnelRequest中的TunnelType設(shè)置為相應(yīng)的類型。
private static void createTunnel(TunnelClient client, String tableName, String tunnelName) {
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseData);
CreateTunnelResponse resp = client.createTunnel(request);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
創(chuàng)建增量或者全量加增量類型的通道
以下示例用于為數(shù)據(jù)表創(chuàng)建增量或全量加增量類型的通道,并指定讀取的增量數(shù)據(jù)時(shí)間范圍。
//創(chuàng)建增量或者全量加增量類型的通道,指定起始時(shí)間戳或結(jié)束時(shí)間戳,表示讀取的增量數(shù)據(jù)時(shí)間范圍。對(duì)于全量類型的通道,StreamTunnelConfig的配置不生效。
private static void createStreamTunnelByOffset(TunnelClient client,String tableName,String tunnelName, long startTime, long endTime){
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.Stream);//創(chuàng)建增量類型通道。
//CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.BaseAndStream);//創(chuàng)建全量加增量類型通道。
StreamTunnelConfig streamTunnelConfig = new StreamTunnelConfig();
/*
指定增量數(shù)據(jù)的起始時(shí)間戳(startTime)和結(jié)束時(shí)間戳(endTime)。單位為毫秒,取值范圍為[CurrentSystemTime - StreamExpiration + 5 minute, CurrentSystemTime)。
其中CurrentSystemTime為當(dāng)前系統(tǒng)時(shí)間的毫秒單位時(shí)間戳;StreamExpiration為增量日志過期時(shí)間的毫秒單位時(shí)間戳,最大值為7天。您可以在為數(shù)據(jù)表開啟Stream功能時(shí)設(shè)置。
結(jié)束時(shí)間戳的取值必須大于起始時(shí)間戳。
*/
streamTunnelConfig.setStartOffset(startTime);
streamTunnelConfig.setEndOffset(endTime);
createTunnelRequest.setStreamTunnelConfig(streamTunnelConfig);
CreateTunnelResponse resp = client.createTunnel(createTunnelRequest);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
常見問題
相關(guān)文檔
關(guān)于API說明的更多信息,請(qǐng)參見CreateTunnel。
如果要快速使用通道服務(wù)消費(fèi)數(shù)據(jù),請(qǐng)參見快速使用通道服務(wù)文檔進(jìn)行操作。
如果要查看指定表的所有通道信息,您可以通過獲取表內(nèi)的通道信息實(shí)現(xiàn)。更多信息,請(qǐng)參見獲取表內(nèi)的通道信息。
如果要查看指定通道的詳細(xì)信息,您可以通過獲取通道的具體信息實(shí)現(xiàn)。更多信息,請(qǐng)參見獲取通道的具體信息。
如果不再使用某個(gè)通道,您可以刪除相應(yīng)通道。更多信息,請(qǐng)參見刪除通道。
使用通道服務(wù)可以實(shí)現(xiàn)數(shù)據(jù)遷移。更多信息,請(qǐng)參見將表格存儲(chǔ)數(shù)據(jù)表中數(shù)據(jù)同步到另一個(gè)數(shù)據(jù)表或將表格存儲(chǔ)時(shí)序表中數(shù)據(jù)同步到另一個(gè)時(shí)序表。
實(shí)時(shí)計(jì)算Flink能將通道服務(wù)的數(shù)據(jù)通道作為流式數(shù)據(jù)的輸入,可實(shí)現(xiàn)通過Flink計(jì)算與分析表格存儲(chǔ)數(shù)據(jù),更多信息,請(qǐng)參見使用教程(寬表模型)和使用教程(時(shí)序模型)。
當(dāng)前通道服務(wù)本身沒有額外的費(fèi)用開銷。在消費(fèi)通道服務(wù)數(shù)據(jù)時(shí),表格存儲(chǔ)會(huì)根據(jù)實(shí)際拉取的數(shù)據(jù)產(chǎn)生讀吞吐量計(jì)量計(jì)費(fèi)。更多信息,請(qǐng)參見計(jì)費(fèi)概述。