日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

通過實時計算Flink集成向量數(shù)據(jù)

云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版支持通過flink-adbpg-connector集成向量化數(shù)據(jù)。本文以將Kafka數(shù)據(jù)導入至AnalyticDB PostgreSQL版為例,介紹如何將向量數(shù)據(jù)導入AnalyticDB PostgreSQL版

前提條件

  • 已創(chuàng)建AnalyticDB PostgreSQL版實例。具體操作,請參見創(chuàng)建實例

  • 已創(chuàng)建Flink全托管工作空間,且與AnalyticDB PostgreSQL版實例位于同一VPC下。具體操作,請參見開通Flink全托管

  • AnalyticDB PostgreSQL版數(shù)據(jù)庫已安裝向量檢索插件FastANN。

    您可以在psql客戶端通過\dx fastann命令查看是否安裝。

    • 如果返回FastANN插件的相關(guān)信息,表示已安裝。

    • 如果沒有返回任何信息,請提交工單聯(lián)系技術(shù)支持進行安裝。

  • 已購買并部署Kafka實例,且Kafka實例與AnalyticDB PostgreSQL版實例位于同一VPC下。具體操作,請參見購買和部署實例

  • 已將Flink工作空間和Kafka實例所屬的網(wǎng)段加入AnalyticDB PostgreSQL版的白名單。具體操作,請參見設(shè)置白名單

測試數(shù)據(jù)

為方便測試,AnalyticDB PostgreSQL版提供了測試數(shù)據(jù)。下載鏈接,請參見vector_sample_data.csv

測試數(shù)據(jù)的表結(jié)構(gòu)如下。

字段

類型

說明

id

bigint

編號。

market_time

timestamp

汽車上市時間。

color

varchar(10)

汽車的顏色。

price

int

汽車的價格。

feature

float4[]

汽車照片的特征向量。

操作流程

  1. 創(chuàng)建結(jié)構(gòu)化索引和向量化索引

  2. 將向量化測試數(shù)據(jù)寫入Kafka Topic

  3. 創(chuàng)建映射表并導入數(shù)據(jù)

創(chuàng)建結(jié)構(gòu)化索引和向量化索引

  1. 連接AnalyticDB PostgreSQL版數(shù)據(jù)庫。本文以通過psql客戶端連接數(shù)據(jù)庫為例,詳情請參見psql連接數(shù)據(jù)庫

  2. 執(zhí)行以下命令,創(chuàng)建測試庫并切換至測試庫。

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. 執(zhí)行以下命令,創(chuàng)建目標表。

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. 執(zhí)行以下命令,創(chuàng)建結(jié)構(gòu)化索引和向量化索引。

    -- 修改向量列的存儲格式為PLAIN。
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- 創(chuàng)建結(jié)構(gòu)化索引。
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- 創(chuàng)建向量索引。
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');

將向量化測試數(shù)據(jù)寫入Kafka Topic

  1. 執(zhí)行以下命令,創(chuàng)建Kafka Topic。

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ 
    --bootstrap-server <your_broker_list>
  2. 執(zhí)行以下命令,將向量測試數(shù)據(jù)寫入Kafka Topic。

    bin/kafka-console-producer.sh\
    --bootstrap-server <your_broker_list>\
    --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>:接入點信息。您可在云消息隊列Kafka版控制臺實例詳情頁面的接入點信息區(qū)域獲取。

創(chuàng)建映射表并導入數(shù)據(jù)

  1. 創(chuàng)建Flink作業(yè)。

    1. 登錄實時計算控制臺,在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

    2. 在左側(cè)導航欄,單擊SQL開發(fā),單擊新建,選擇空白的流作業(yè)草稿,單擊下一步

    3. 新建作業(yè)草稿對話框,填寫作業(yè)配置信息。

      作業(yè)參數(shù)

      說明

      示例

      文件名稱

      作業(yè)的名稱。

      說明

      作業(yè)名稱在當前項目中必須保持唯一。

      adbpg-test

      存儲位置

      指定該作業(yè)的代碼文件所屬的文件夾。

      您還可以在現(xiàn)有文件夾右側(cè),單擊新建文件夾圖標,新建子文件夾。

      作業(yè)草稿

      引擎版本

      當前作業(yè)使用的Flink的引擎版本。引擎版本號含義、版本對應關(guān)系和生命周期重要時間點詳情請參見引擎版本介紹

      vvr-6.0.6-flink-1.15

  2. 執(zhí)行以下命令,創(chuàng)建AnalyticDB PostgreSQL版映射表。

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    參數(shù)說明,請參見寫入數(shù)據(jù)到AnalyticDB PostgreSQL版

  3. 執(zhí)行以下命令,創(chuàng)建Kafka映射表。

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    參數(shù)說明如下。

    參數(shù)

    是否必填

    說明

    connector

    連接器名。固定值為Kafka。

    properties.bootstrap.servers

    接入點信息。您可在云消息隊列Kafka版控制臺的實例詳情頁面的接入點信息區(qū)域獲取。

    topic

    Kafka消息所在的Topic名稱。

    format

    寫入Kafka消息Value部分時使用的格式。支持的格式:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    csv.field-delimiter

    CSV字段分隔符。

    scan.startup.mode

    Kafka讀取數(shù)據(jù)的啟動位點。取值如下:

    • earliest-offset:從Kafka最早分區(qū)開始讀取。

    • latest-offset:從Kafka最新位點開始讀取。

  4. 執(zhí)行以下命令,創(chuàng)建導入任務。

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;