日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

兼容PolarDB PostgreSQL版(兼容Oracle)的Flink CDC

兼容PolarDB PostgreSQL版(兼容Oracle)的Flink CDC連接器(簡稱PolarDBO Flink CDC)可用于依次讀取PolarDB PostgreSQL版(兼容Oracle)數(shù)據(jù)庫全量快照數(shù)據(jù)和變更數(shù)據(jù),具體功能及用法請參考社區(qū)Postgres CDC

由于PolarDB PostgreSQL版(兼容Oracle)與社區(qū)PoatgreSQL僅在少量數(shù)據(jù)類型和內(nèi)置對象處理存在差異,本文為您介紹如何基于社區(qū)Postgres CDC,通過少量代碼適配打包出支持PolarDB PostgreSQL版(兼容Oracle)的PolarDBO Flink CDC連接器。

說明

PolarDB PostgreSQL版(兼容Oracle)的DATE類型是64位,而社區(qū)PostgreSQL的DATE類型為32位。因此,在PolarDBO Flink CDC中會對DATA類型數(shù)據(jù)的處理進(jìn)行適配。

打包PolarDBO Flink CDC連接器

重要

PolarDBO Flink CDC連接器基于社區(qū)Postgres CDC適配開發(fā),無論是您自行打包,還是使用本文中提供的JAR包,PolarDBO Flink CDC連接器都不提供SLA保障。

操作前提

  • 確定Flink-CDC版本

    如果您使用的是阿里云實(shí)時計(jì)算 Flink 版,需要確認(rèn)與對應(yīng)Ververica Runtime(簡稱VVR)版本兼容的社區(qū)Flink-CDC版本,具體可以參考CDC與VVR版本對應(yīng)關(guān)系

    說明

    Flink-CDC代碼倉庫請參考Flink-CDC

  • 確定Debezium版本

    在對應(yīng)版本的Flink-CDC的pom.xml中通過查找關(guān)鍵字debezium.version確定Debezium版本。

    說明

    Debezium代碼倉庫請參考Debezium

  • 確定PgJDBC版本

    在對應(yīng)版本的Postgres-CDC的pom.xml中通過查找關(guān)鍵字org.postgresql確定PgJDBC版本。

    說明
    • release-3.0以下版本文件路徑為:flink-connector-postgres-cdc/pom.xml

    • release-3.0及以上版本文件路徑為:flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml

    • PgJDBC代碼倉庫請參考PgJDBC

操作步驟

release-3.1打包

社區(qū)Flink-CDC release-3.1版本兼容阿里云實(shí)時計(jì)算 Flink 版的vvr-8.0.x-flink-1.17。

打包對應(yīng)版本的PolarDBO Flink CDC連接器步驟如下:

  1. 克隆對應(yīng)版本的Flink-CDC、Debezium和PgJDBC的代碼文件。

    git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. 復(fù)制Debezium和PgJDBC部分文件到Flink-CDC中。

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. 應(yīng)用適配PolarDB PostgreSQL版(兼容Oracle)的patch文件。

    git apply release-3.1_support_polardbo.patch
    說明

    以上使用的PolarDBO Flink CDC兼容patch文件:release-3.1_support_polardbo.patch

  4. 使用Maven打包PolarDBO Flink CDC連接器。

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # 打包完成后可以在flink-sql-connector-postgres-cdc的target目錄中獲取到j(luò)ar包

按照以上流程基于JDK8打包出PolarDBO Flink CDC連接器的JAR包:flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar

release-2.3打包

社區(qū)Flink-CDC release-2.3版本兼容阿里云實(shí)時計(jì)算 Flink 版的vvr-4.0.15-flink-1.13 ~ vvr-6.0.2-flink-1.15。

打包對應(yīng)版本的PolarDBO Flink CDC連接器步驟如下:

  1. 克隆對應(yīng)版本的Flink-CDC、Debezium和PgJDBC的代碼文件。

    git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
  2. 復(fù)制Debezium和PgJDBC部分文件到Flink-CDC中。

    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. 應(yīng)用適配PolarDB PostgreSQL版(兼容Oracle)的patch文件。

    git apply release-2.3_support_polardbo.patch
    說明

    以上使用的PolarDBO Flink CDC兼容patch文件:release-2.3_support_polardbo.patch

  4. 使用Maven打包PolarDBO Flink CDC連接器。

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # 打包完成后可以在flink-sql-connector-postgres-cdc的target目錄中獲取到j(luò)ar包

按照以上流程基于JDK8打包出PolarDBO Flink CDC連接器的JAR包:flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar

使用說明

PolarDBO Flink CDC連接器通過PolarDB PostgreSQL版(兼容Oracle)數(shù)據(jù)庫的邏輯復(fù)制讀取CDC變更流數(shù)據(jù),需要滿足以下條件:

  • wal_level參數(shù)的值需設(shè)置為logical,即在預(yù)寫式日志W(wǎng)AL(Write-ahead logging)中增加支持邏輯復(fù)制所需的信息。

    說明

    您可以通過控制臺設(shè)置wal_level參數(shù),詳細(xì)操作請參考設(shè)置集群參數(shù)。修改該參數(shù)后集群將會重啟,請?jiān)谛薷膮?shù)前做好業(yè)務(wù)安排,謹(jǐn)慎操作。

  • 執(zhí)行ALTER TABLE schema.table REPLICA IDENTITY FULL;命令設(shè)置訂閱表的REPLICA IDENTITYFULL(發(fā)出的插入和更新操作事件包含表中所有列的舊值),以保障該表數(shù)據(jù)同步的一致性。

    說明
    • REPLICA IDENTITY是PostgreSQL特有的表級設(shè)置,決定了邏輯解碼插件在發(fā)生(INSERT)和更新(UPDATE)事件時,是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情,請參見REPLICA IDENTITY

    • 設(shè)置訂閱表的REPLICA IDENTITYFULL時可能需要鎖表,可能影響業(yè)務(wù),請?jiān)谛薷膮?shù)前做好業(yè)務(wù)安排。您可以通過以下命令查看當(dāng)前配置是否為FULL

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • 需要確保max_wal_senders和max_replication_slots的參數(shù)值均大于當(dāng)前數(shù)據(jù)庫復(fù)制槽已使用數(shù)與Flink作業(yè)所需要的slot數(shù)量。

  • 確保使用的是高權(quán)限賬號或者同時擁有LOGIN和REPLICATION權(quán)限,并且具有訂閱表的SELECT權(quán)限用于全量數(shù)據(jù)查詢。

  • 只能連接PolarDB集群的主地址,集群地址不支持邏輯復(fù)制。

PolarDBO Flink CDC連接器與Postgres CDC區(qū)別

PolarDBO Flink CDC連接器基于Postgres CDC打包,具體語法和參數(shù)可以參考Postgres CDC。但存在以下主要區(qū)別:

  • WITH的connector參數(shù)需要設(shè)置為固定值:polardbo-cdc

  • PolarDBO Flink CDC同時兼容PolarDB PostgreSQL版各版本、PolarDB PostgreSQL版(兼容Oracle) 1.0和PolarDB PostgreSQL版(兼容Oracle) 2.0版本。

    說明

    如果您使用的是PolarDB PostgreSQL版,推薦您直接使用社區(qū)Postgres CDC

  • PolarDB PostgreSQL版(兼容Oracle) 1.0、PolarDB PostgreSQL版(兼容Oracle) 2.0中的DATE類型的列,F(xiàn)link SQL中的source和sink表對應(yīng)類型必須指定為timestamp

  • 建議將decoding.plugin.name參數(shù)設(shè)置為pgoutput,否則非UTF-8編碼的數(shù)據(jù)庫可能會發(fā)生增量解析亂碼,詳細(xì)介紹請參考社區(qū)文檔

類型映射

PolarDB PostgreSQL和Flink字段類型映射,除DATE類型外,其他字段類型和社區(qū)PostgreSQL完全相同,具體映射關(guān)系如下:

PolarDB PostgreSQL字段類型

Flink字段類型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

  • PolarDB PostgreSQL版(兼容Oracle) 1.0:TIMESTAMP

  • PolarDB PostgreSQL版(兼容Oracle) 2.0:TIMESTAMP

  • PolarDB PostgreSQL版:DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

示例

以下示例用于說明,如何通過PolarDBO Flink CDC,將PolarDB PostgreSQL版(兼容Oracle) 2.0集群中flink_source庫的shipments表,同步到flink_sink庫的shipments_sink表中。

說明

以下示例僅用于簡單驗(yàn)證打包的PolarDBO Flink CDC能夠在PolarDB PostgreSQL版(兼容Oracle)上運(yùn)行。正式使用時,為滿足您的業(yè)務(wù)需求,請參考社區(qū)Postgres CDC配置參數(shù)。

  1. 前提準(zhǔn)備

    • PolarDB PostgreSQL版(兼容Oracle)準(zhǔn)備

      1. PolarDB集群購買頁面,購買PolarDB PostgreSQL版(兼容Oracle) 2.0集群。

      2. 創(chuàng)建高權(quán)限賬戶,詳細(xì)操作請參考創(chuàng)建賬號

      3. 獲取集群主地址,詳細(xì)操作請參考查看連接地址。如果PolarDB集群和實(shí)時計(jì)算 Flink 版在同一可用區(qū),可直接使用私網(wǎng)地址,否則需要申請公網(wǎng)地址。將Flink實(shí)例地址添加到PolarDB集群白名單中,請參考設(shè)置集群白名單

      4. 在控制臺創(chuàng)建源數(shù)據(jù)庫flink_source和目標(biāo)數(shù)據(jù)庫flink_sink,詳細(xì)步驟請參考創(chuàng)建數(shù)據(jù)庫

      5. 執(zhí)行如下語句,在源數(shù)據(jù)庫flink_source中創(chuàng)建shipments表,并寫入數(shù)據(jù)。

        CREATE TABLE public.shipments (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
      6. 執(zhí)行如下語句,在目標(biāo)數(shù)據(jù)庫flink_sink中創(chuàng)建shipments_sink表。

        CREATE TABLE public.shipments_sink (
           shipment_id INT,
           order_id INT,
           origin TEXT,
           destination TEXT,
           is_arrived BOOLEAN,
           order_time TIMESTAMP,
           PRIMARY KEY (shipment_id)
         );
    • 實(shí)時計(jì)算 Flink 版準(zhǔn)備

      1. 登錄實(shí)時計(jì)算控制臺,購買實(shí)時計(jì)算 Flink 版實(shí)例,詳細(xì)操作請參考開通實(shí)時計(jì)算Flink版

        說明

        建議實(shí)時計(jì)算 Flink 版地域專有網(wǎng)絡(luò)PolarDB集群保持一致,連接地址可以直接使用PolarDB集群主地址的私網(wǎng)地址。

      2. 創(chuàng)建自定義連接器,上傳打包好的PolarDBO Flink CDCFormats選擇debezium-json,詳細(xì)步驟請參考創(chuàng)建自定義連接器

        image

  2. 創(chuàng)建Flink作業(yè)

    1. 登錄實(shí)時計(jì)算控制臺,新建一個SQL作業(yè)草稿,請參考SQL作業(yè)開發(fā)。使用以下Flink SQL語句,修改PolarDB集群主地址,端口,賬號和密碼。

      說明

      PolarDB PostgreSQL版(兼容Oracle)的DATE類型是64位,而Flink SQL以及大部分?jǐn)?shù)據(jù)庫的DATE類型為32位。因此,源表中DATE類型的列,在Flink SQL的source和sink表中都必須要指定為TIMESTAMP類型。否則,作業(yè)會因?yàn)轭愋筒黄ヅ涠鴪?bào)錯中斷,例如:“java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471):1720891573000”

      CREATE TEMPORARY TABLE shipments (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
         'connector' = 'polardbo-cdc',
         'hostname' = '<yourHostname>',
         'port' = '<yourPort>',
         'username' = '<yourUserName>',
         'password' = '<yourPassWord>',
         'database-name' = 'flink_source',
         'schema-name' = 'public',
         'table-name' = 'shipments',
         'decoding.plugin.name' = 'pgoutput',
         'slot.name' = 'flink'
       );
      
      CREATE TEMPORARY TABLE shipments_sink (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink',
        'table-name' = 'shipments_sink',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>'
      );
      
      INSERT INTO shipments_sink SELECT * FROM shipments;
    2. 部署并啟動作業(yè)。

      image

      image

    3. 測試與驗(yàn)證。

      • 部署作業(yè)運(yùn)行成功后,即狀態(tài)為運(yùn)行中,shipments表中的數(shù)據(jù)已經(jīng)同步到目標(biāo)數(shù)據(jù)庫flink_sink的shipments_sink表。

        SELECT * FROM public.shipments_sink;

        返回結(jié)果如下:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | f          | 2024-09-18 05:45:08
        (1 row)
      • 在源數(shù)據(jù)庫flink_source的shipments表上執(zhí)行DML,新增修改也將實(shí)時同步。

        INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments WHERE shipment_id = 2;
        INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;

        shipments表中的數(shù)據(jù)已經(jīng)同步更新到目標(biāo)數(shù)據(jù)庫flink_sink的shipments_sink表。

        SELECT * FROM public.shipments_sink;

        返回結(jié)果如下:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | t          | 2024-09-18 05:45:08
                   3 |        3 | test3  | test3       | t          | 2024-09-18 07:33:23
        (2 rows)