日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過實(shí)時(shí)計(jì)算Flink版寫入數(shù)據(jù)到云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版

本文介紹如何通過阿里云實(shí)時(shí)計(jì)算Flink版寫入數(shù)據(jù)到AnalyticDB PostgreSQL版

使用限制

  • 該功能暫不支持AnalyticDB PostgreSQL版Serverless模式

  • 僅Flink實(shí)時(shí)計(jì)算引擎VVR 6.0.0及以上版本支持云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版連接器。

  • 僅Flink實(shí)時(shí)計(jì)算引擎VVR 8.0.1及以上版本支持云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版7.0版本。

    說明

    如果您使用了自定義連接器,具體操作請參見管理自定義連接器

前提條件

  • 已創(chuàng)建Flink全托管工作空間。具體操作,請參見開通Flink全托管

  • 已創(chuàng)建AnalyticDB PostgreSQL版實(shí)例。具體操作,請參見創(chuàng)建實(shí)例

  • AnalyticDB PostgreSQL版實(shí)例和Flink全托管工作空間需要位于同一VPC下。

配置AnalyticDB PostgreSQL版實(shí)例

  1. 登錄云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版控制臺(tái)
  2. 將Flink工作空間所屬的網(wǎng)段加入AnalyticDB PostgreSQL版的白名單。如何添加白名單,請參見設(shè)置白名單

  3. 單擊登錄數(shù)據(jù)庫,連接數(shù)據(jù)庫的更多方式,請參見客戶端連接

  4. AnalyticDB PostgreSQL版實(shí)例上創(chuàng)建一張表。

    建表SQL示例如下:

    CREATE TABLE test_adbpg_table(
    b1 int,
    b2 int,
    b3 text,
    PRIMARY KEY(b1)
    );

配置實(shí)時(shí)計(jì)算Flink

  1. 登錄實(shí)時(shí)計(jì)算控制臺(tái)

  2. Flink全托管頁簽,單擊目標(biāo)工作空間操作列下的控制臺(tái)

  3. 在左側(cè)導(dǎo)航欄,單擊數(shù)據(jù)連接

  4. 數(shù)據(jù)連接頁面,單擊創(chuàng)建自定義連接器

  5. 上傳自定義連接器JAR文件。

    說明
    • 獲取AnalyticDB PostgreSQL版自定義Flink Connector的JAR包,請參見AnalyticDB PostgreSQL Connector

    • JAR包的版本需要與實(shí)時(shí)計(jì)算平臺(tái)的Flink引擎版本一致。

  6. 上傳完成后,單擊下一步

    系統(tǒng)會(huì)對您上傳的自定義連接器內(nèi)容進(jìn)行解析。如果解析成功,您可以繼續(xù)下一步。如果解析失敗,請確認(rèn)您上傳的自定義連接器代碼是否符合Flink社區(qū)標(biāo)準(zhǔn)。

  7. 單擊完成

    創(chuàng)建完成的自定義連接器會(huì)出現(xiàn)在連接器列表中。

創(chuàng)建Flink作業(yè)

  1. 登錄實(shí)時(shí)計(jì)算控制臺(tái),在Flink全托管頁簽,單擊目標(biāo)工作空間操作列下的控制臺(tái)

  2. 在左側(cè)導(dǎo)航欄,單擊SQL開發(fā),單擊新建,選擇空白的流作業(yè)草稿,單擊下一步

  3. 新建文件草稿對話框,填寫作業(yè)配置信息。

    作業(yè)參數(shù)

    說明

    示例

    文件名稱

    作業(yè)的名稱。

    說明

    作業(yè)名稱在當(dāng)前項(xiàng)目中必須保持唯一。

    adbpg-test

    存儲(chǔ)位置

    指定該作業(yè)的代碼文件所屬的文件夾。

    您還可以在現(xiàn)有文件夾右側(cè),單擊新建文件夾圖標(biāo),新建子文件夾。

    作業(yè)草稿

    引擎版本

    當(dāng)前作業(yè)使用的Flink的引擎版本。引擎版本號(hào)含義、版本對應(yīng)關(guān)系和生命周期重要時(shí)間點(diǎn)詳情請參見引擎版本介紹

    vvr-6.0.7-flink-1.15

  4. 單擊創(chuàng)建

寫入數(shù)據(jù)到AnalyticDB PostgreSQL版

  1. 編寫作業(yè)代碼。

    創(chuàng)建隨機(jī)源表datagen_source和對應(yīng)AnalyticDB PostgreSQL版的表test_adbpg_table,將以下作業(yè)代碼拷貝到作業(yè)文本編輯區(qū)。

    CREATE TABLE datagen_source (
     f_sequence INT,
     f_random INT,
     f_random_str STRING
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_sequence.kind'='sequence',
     'fields.f_sequence.start'='1',
     'fields.f_sequence.end'='1000',
     'fields.f_random.min'='1',
     'fields.f_random.max'='1000',
     'fields.f_random_str.length'='10'
    );
    
    CREATE TABLE test_adbpg_table (
        `B1` bigint   ,
        `B2` bigint  ,
        `B3` VARCHAR ,
        `B4` VARCHAR,
         PRIMARY KEY(B1) not ENFORCED
    ) with (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'xxx',
       'tablename' = 'test_adbpg_table',
       'username' = 'xxxx',
       'url' = 'jdbc:postgresql://url:5432/schema',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    其中datagen_source表的參數(shù)無需修改,test_adbpg_table表的參數(shù)需要根據(jù)您實(shí)際情況進(jìn)行修改,參數(shù)說明如下:

    參數(shù)

    是否必填

    說明

    connector

    connector名稱,固定為adbpg-nightly-版本號(hào),例如adbpg-nightly-1.13

    url

    AnalyticDB PostgreSQL版的JDBC連接地址。格式為jdbc:postgresql://<內(nèi)網(wǎng)地址>:<端口>/<連接的數(shù)據(jù)庫名稱>,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres

    tablename

    AnalyticDB PostgreSQL版的表名。

    username

    AnalyticDB PostgreSQL版的數(shù)據(jù)庫賬號(hào)。

    password

    AnalyticDB PostgreSQL版的數(shù)據(jù)庫賬號(hào)密碼。

    maxretrytimes

    SQL執(zhí)行失敗后重試次數(shù),默認(rèn)為3次。

    batchsize

    一次批量寫入的最大條數(shù),默認(rèn)為50000條。

    exceptionmode

    數(shù)據(jù)寫入過程中出現(xiàn)異常時(shí)的處理策略。支持以下兩種處理策略:

    • ignore(默認(rèn)值):忽略出現(xiàn)異常時(shí)寫入的數(shù)據(jù)。

    • strict:數(shù)據(jù)寫入異常時(shí),故障轉(zhuǎn)移(Failover)并報(bào)錯(cuò)。

    conflictmode

    當(dāng)出現(xiàn)主鍵沖突或者唯一索引沖突時(shí)的處理策略。支持以下四種處理策略:

    • ignore :忽略主鍵沖突,保留之前的數(shù)據(jù)。

    • strict:主鍵沖突時(shí),故障轉(zhuǎn)移(Failover)并報(bào)錯(cuò)。

    • update:主鍵沖突時(shí),更新新到的數(shù)據(jù)。

    • upsert(默認(rèn)值):主鍵沖突時(shí),采用UPSERT方式寫入數(shù)據(jù)。

      AnalyticDB PostgreSQL版通過INSERT ON CONFLICTCOPY ON CONFLICT實(shí)現(xiàn)UPSERT寫入數(shù)據(jù),如果目標(biāo)表為分區(qū)表,則需要內(nèi)核小版本為V6.3.6.1及以上,如何升級(jí)內(nèi)核小版本,請參見版本升級(jí)

    targetschema

    AnalyticDB PostgreSQL版的Schema,默認(rèn)為public。

    writemode

    寫入方式。取值說明:

    • 0 :采用BATCH INSERT方式寫入數(shù)據(jù)。

    • 1(默認(rèn)值):采用COPY API寫入數(shù)據(jù)。

    • 2:采用BATCH UPSERT方式寫入數(shù)據(jù)。

    verbose

    是否輸出connector運(yùn)行日志。取值說明:

    • 0(默認(rèn)):不輸出運(yùn)行日志。

    • 1:輸出運(yùn)行日志。

    retrywaittime

    出現(xiàn)異常重試時(shí)間隔的時(shí)間。單位為ms,默認(rèn)值為100。

    batchwritetimeoutms

    攢批寫入數(shù)據(jù)時(shí)最長攢批時(shí)間,超過此事件會(huì)觸發(fā)這一批的寫入。單位為毫秒(ms),默認(rèn)值為50000。

    connectionmaxactive

    連接池參數(shù),單個(gè)Task manager中連接池同一時(shí)刻最大連接數(shù)。默認(rèn)值為5。

    casesensitive

    列名和表名是否區(qū)分大小寫,取值說明:

    • 0(默認(rèn)):不區(qū)分大小寫。

    • 1:區(qū)分大小寫。

    說明

    支持參數(shù)和類型映射,詳情請參見連接器云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版

  2. 啟動(dòng)作業(yè)。

    1. 在作業(yè)開發(fā)頁面頂部,單擊部署,在彈出的對話框中,單擊確定

      說明

      Session集群適用于非生產(chǎn)環(huán)境的開發(fā)測試環(huán)境,您可以使用Session集群模式調(diào)試作業(yè),提高作業(yè)JM(Job Manager)資源利用率和作業(yè)啟動(dòng)速度。但不推薦您將作業(yè)提交至Session集群中,因?yàn)闀?huì)存在業(yè)務(wù)穩(wěn)定性問題,詳情請參見作業(yè)調(diào)試

    2. 作業(yè)運(yùn)維頁面,單擊目標(biāo)作業(yè)操作列的啟動(dòng)

    3. 單擊啟動(dòng)

觀察同步結(jié)果

  1. 連接AnalyticDB PostgreSQL版數(shù)據(jù)庫。具體操作,請參見客戶端連接

  2. 執(zhí)行以下語句查詢test_adbpg_table表。

    SELECT * FROM test_adbpg_table;

    數(shù)據(jù)正常寫入到AnalyticDB PostgreSQL版中,返回示例如下。

    adbpg2.png

相關(guān)文檔