日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Pull模式創建數據訂閱通道

本文介紹通過Pull模式創建數據訂閱功能,創建后訂閱通道會實時拉取數據庫實例的增量數據,并將增量數據保存在訂閱通道中,您可以使用Lindorm提供的SDK從訂閱通道中訂閱增量數據并進行消費。同時,您可以在LTS頁面進行訂閱通道的創建、查看及刪除等操作。

前提條件

已將客戶端IP添加至白名單中,具體操作請參見設置白名單

已開通數據訂閱功能,具體操作,請參見開通數據訂閱

操作步驟

  1. 進入LTS(原BDS)頁面,在左側導航欄中,選擇數據訂閱 > Pull模式

    streamone
  2. 單擊創建數據訂閱通道,并配置以下參數。

    創建訂閱通道

    名稱

    描述

    源集群

    填寫Lindorm實例ID。

    Lindorm表名

    選擇需要創建數據訂閱通道的Lindorm實例表,一條通道只能選擇一張表格。

    主題名

    用于消費數據的主題名稱。

    數據過期時間(天)

    表示數據可以保存的天數,默認為7天。

    主題分區數

    表示Kafka客戶端為該主題設置多個分區,多分區可以并發消費數據,默認為4個分區。

  3. 單擊提交

  4. (可選)找到目標訂閱通道,單擊操作列的詳情,可以查看數據訂閱通道詳情、消費詳情和存儲詳情。

    詳細信息
  5. (可選)您可以通過以下代碼在Kafka客戶端對訂閱數據進行消費。

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // 創建訂閱通道時填寫的topic名稱
        String topic = "test-topic";
    
        // 鏈接endpoint的配置項
        Properties props = new Properties();
        // 指定鏈接endpoint地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // 指定Key序列化器,不可更改
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定Value序列化器,不可更改
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 指定消費組名稱,在消費時會自動創建
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // 創建消費者
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // 訂閱主題
        consumer.subscribe(Arrays.asList(topic));
    
        // 用消費者拉取數據
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // 查看數據內容
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // 提交當前消費位移
        consumer.commitSync();
        // 關閉消費者
        consumer.close();
      }
    }
    說明

    數據消費格式說明請參見數據消費格式