日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

通過SDK使用通道服務

更新時間:

使用SDK快速體驗通道服務功能。使用前,您需要了解使用通道服務的注意事項、接口等信息。

注意事項

  • TunnelWorkerConfig中默認會啟動讀數據和處理數據的線程池。如果使用的是單臺機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。

  • 由于Tunnel的增量日志最多會保留7天(具體的值和數據表的Stream的日志過期時間一致),因此在使用全量加增量類型或者增量類型的Tunnel消費數據時,會出現如下情況:

    • 當Tunnel處于全量階段時,如果全量數據在7天內沒有消費完成,則此Tunnel會出現OTSTunnelExpired錯誤,導致后續數據無法繼續消費。如果您預估全量數據無法在7天內消費完成,請及時聯系表格存儲技術支持或者加入釘釘群23307953(表格存儲技術交流群-2)進行咨詢。

    • 當Tunnel處于增量階段時,如果增量數據超過7天沒有消費,Tunnel會從最近可以消費的數據開始消費,因此可能會出現漏消費數據風險。

      說明

      如果增量數據超過7天(具體值和數據表的Stream的日志過期時間一致)沒有消費,則數據會出現過期的情況,當Tunnel過期超過一段時間(默認7天)后,表格存儲會禁用掉該Tunnel,即該Tunnel不能再用于消費數據。

  • TunnelWorker的初始化需要預熱時間,該值受TunnelWorkerConfig中的heartbeatIntervalInSec參數影響,可以通過TunnelWorkerConfig中的setHeartbeatIntervalInSec方法配置,默認為30s,最小值為5s。

  • 當Tunnel從全量切換至增量階段時,全量的Channel會結束,增量的Channel會啟動,此階段會有初始化時間,該值也受TunnelWorkerConfig中的heartbeatIntervalInSec參數影響。

  • 當客戶端(TunnelWorker)沒有被正常shutdown時(例如異常退出或者手動結束),TunnelWorker會自動進行資源的回收,包括釋放線程池,自動調用用戶在Channel上注冊的shutdown方法,關閉Tunnel連接等。

接口

接口

說明

CreateTunnel

創建一個通道。

ListTunnel

列舉某個數據表內通道的具體信息。

DescribeTunnel

描述某個通道里的具體Channel信息。

DeleteTunnel

刪除一個通道。

使用

您可以使用如下語言的SDK實現通道服務。

前提條件

  • 已配置訪問密鑰并將密鑰配置到環境變量中,并確定要使用的Endpoint。具體操作,請參見初始化OTSClient

    表格存儲使用OTS_AK_ENV環境變量名表示阿里云賬號或者RAM用戶的AccessKey ID,使用OTS_SK_ENV環境變量名表示對應AccessKey Secret,請根據實際配置。

  • 已創建數據表。具體操作,請參見通過控制臺創建數據表通過SDK創建數據表

體驗通道服務

使用Java SDK最小化的體驗通道服務。

  1. 初始化Tunnel Client。

    //endPoint為表格存儲實例的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。
    //accessKeyId和accessKeySecret分別為訪問表格存儲服務的AccessKey的Id和Secret。
    //instanceName為實例名稱。
    final String endPoint = "";
    final String accessKeyId = System.getenv("OTS_AK_ENV");
    final String accessKeySecret = System.getenv("OTS_SK_ENV");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. 創建新通道。

    請提前創建一張測試表或者使用已有的一張數據表。如果需要新建測試表,可以使用SyncClient中的createTable方法或者使用官網控制臺等方式創建。

    重要

    創建增量或者全量加增量類型的通道時,時間戳的配置規則如下:

    • 如果不指定增量數據的起始時間戳,則起始時間戳為創建通道的時間。

    • 如果指定增量數據的起始時間戳(startTime)和結束時間戳(endTime),其取值范圍為[當前系統時間-Stream過期時間+5分鐘 , 當前系統時間],單位為毫秒。

      • Stream過期時間為增量日志過期時長的毫秒單位時間戳,最大值為7天。您可以在為數據表開啟Stream功能時設置,過期時長一經設置不能修改。

      • 結束時間戳的取值必須大于起始時間戳。

    //支持創建TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三種類型的Tunnel。
    //如下示例為創建全量加增量類型的Tunnel,如果需創建其它類型的Tunnel,則將CreateTunnelRequest中的TunnelType設置為相應的類型。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    //tunnelId用于后續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel獲取。
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. 用戶自定義數據消費Callback,開始自動化的數據消費。

    //用戶自定義數據消費Callback,即實現IChannelProcessor接口(process和shutdown)。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            //ProcessRecordsInput中包含有拉取到的數據。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用于Tunnel Client的翻頁。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //模擬消費處理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    //TunnelWorkerConfig默認會啟動讀數據和處理數據的線程池。
    //如果使用的是單臺機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。TunnelWorkerConfig中包括更多的高級參數。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    //配置TunnelWorker,并啟動自動化的數據處理任務。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

配置TunnelWorkerConfig

TunnelWorkerConfig提供Tunnel Client的自定義配置,可根據實際需要配置參數,參數說明請參見下表。

配置

參數

說明

Heartbeat的間隔和超時時間

heartbeatTimeoutInSec

Heartbeat的超時間隔。

默認值為300s。

當Heartbeat發生超時,Tunnel服務端會認為當前TunnelClient不可用(失活),客戶端需要重新進行ConnectTunnel。

heartbeatIntervalInSec

進行Heartbeat的間隔。

Heartbeat用于活躍Channel的探測、Channel狀態的更新、(自動化)數據拉取任務的初始化等。

默認值為30s,最小支持配置到5s,單位為s。

記錄消費位點的時間間隔

checkpointIntervalInMillis

用戶消費完數據后,向Tunnel服務端進行記錄消費位點操作(checkpoint)的時間間隔。

默認值為5000ms,單位為ms。

說明
  • 因為讀取任務所在機器不同,進程可能會遇到各種類型的錯誤。例如因為環境因素重啟,需要定期對處理完的數據做記錄(checkpoint)。當任務重啟后,會接著上次的checkpoint繼續往后做。在極端情況下,通道服務不保證傳給您的記錄只有一次,只保證數據至少傳一次,且記錄的順序不變。如果出現局部數據重復發送的情況,需要您注意業務的處理邏輯。

  • 如果希望減少在出錯情況下數據的重復處理,可以增加做checkpoint的頻率。但是過于頻繁的checkpoint會降低系統的吞吐量,請根據自身業務特點決定checkpoint的操作頻率。

客戶端的自定義標識

clientTag

客戶端的自定義標識,可以生成Tunnel Client ID,用于區分TunnelWorker。

數據處理的自定義Callback

channelProcessor

用戶注冊的處理數據的Callback,包括process和shutdown方法。

數據讀取和數據處理的線程池資源配置

readRecordsExecutor

用于數據讀取的線程池資源。無特殊需求,建議使用默認的配置。

processRecordsExecutor

用于處理數據的線程池資源。無特殊需求,建議使用默認的配置。

說明
  • 自定義上述線程池時,線程池中的線程數要和Tunnel中的Channel數盡可能一致,此時可以保障每個Channel都能很快的分配到計算資源(CPU)。

  • 在默認線程池配置中,為了保證吞吐量,表格存儲進行了如下操作:

    • 默認預先分配32個核心線程,以保障數據較小時(Channel數較少時)的實時吞吐量。

    • 工作隊列的大小適當調小,當在用戶數據量比較大(Channel數較多)時,可以更快觸發線程池新建線程的策略,及時彈起更多的計算資源。

    • 設置了默認的線程保活時間(默認為60s),當數據量降下后,可以及時回收線程資源。

內存控制

maxChannelParallel

讀取和處理數據的最大Channel并行度,可用于內存控制。

默認值為-1,表示不限制最大并行度。

說明

僅Java SDK 5.10.0及以上版本支持此功能。

最大退避時間配置

maxRetryIntervalInMillis

Tunnel的最大退避時間基準值,最大退避時間在此基準值附近隨機變化,具體范圍為0.75*maxRetryIntervalInMillis~1.25*maxRetryIntervalInMillis。

默認值為2000ms,最小值為200ms。

說明
  • 僅Java SDK 5.4.0及以上版本支持此功能。

  • Tunnel對于數據量較小的情況(單次拉取小于900 KB或500條)會進行一定時間的指數退避,直至達到最大退避時間。

CLOSING分區狀態檢測

enableClosingChannelDetect

是否開啟CLOSING分區實時檢測。類型為Boolean,默認值為false,表示不開啟CLOSING分區實時檢測。

說明
  • 僅5.13.13及以上版本支持此功能。

  • 未開啟此功能時,在某些極端場景(包括但不限于通道分區數較多但客戶端資源較低等)下,會出現分區卡住不消費的情況。

  • CLOSING分區指調度中的分區,表示該分區正在切換Tunnel Client,會調度到其他Tunnel Client。

附錄:完整代碼

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用于Tunnel Client的翻頁。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //模擬消費處理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1.初始化Tunnel Client。
        final String endPoint = "";
        final String accessKeyId = System.getenv("OTS_AK_ENV");
        final String accessKeySecret = System.getenv("OTS_SK_ENV");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2.創建新通道(此步驟需要提前創建一張測試表,可以使用SyncClient的createTable或者使用官網控制臺等方式創建)。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        //tunnelId用于后續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel獲取。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3.用戶自定義數據消費Callback,開始自動化的數據消費。
        //TunnelWorkerConfig中有更多的高級參數。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}