日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

實(shí)時(shí)計(jì)算 Flink 版通過訂閱云原生數(shù)據(jù)倉庫 AnalyticDB MySQL 版,可以實(shí)時(shí)捕獲和處理數(shù)據(jù)庫變更數(shù)據(jù),實(shí)現(xiàn)高效的數(shù)據(jù)同步和流式計(jì)算。本文為您介紹如何使用Flink訂閱AnalyticDB for MySQL Binlog。

前提條件

  • AnalyticDB for MySQL產(chǎn)品系列為企業(yè)版基礎(chǔ)版、湖倉版數(shù)倉版彈性模式。

  • AnalyticDB for MySQL集群的內(nèi)核版本需為3.2.1.0及以上版本。

    說明
    • 查看企業(yè)版基礎(chǔ)版湖倉版集群的內(nèi)核版本,請(qǐng)執(zhí)行SELECT adb_version();。如需升級(jí)內(nèi)核版本,請(qǐng)聯(lián)系技術(shù)支持。

    • 查看和升級(jí)數(shù)倉版集群的內(nèi)核版本,請(qǐng)參見查看和升級(jí)版本

  • Flink實(shí)時(shí)計(jì)算引擎需為VVR 8.0.4及以上版本。

  • AnalyticDB for MySQL和Flink全托管工作空間需要位于同一VPC下,詳情請(qǐng)參見創(chuàng)建集群開通實(shí)時(shí)計(jì)算Flink版。

  • 已將Flink工作空間所屬的網(wǎng)段加入AnalyticDB for MySQL的白名單,詳情請(qǐng)參見Flink所屬網(wǎng)段查看方法設(shè)置白名單。

使用限制

  • AnalyticDB for MySQL僅支持按表開啟Binlog功能。

  • Flink僅支持處理AnalyticDB for MySQL Binlog中的所有基礎(chǔ)數(shù)據(jù)類型和復(fù)雜數(shù)據(jù)類型JSON,詳情請(qǐng)參見數(shù)據(jù)類型。

  • Flink不會(huì)處理AnalyticDB for MySQL Binlog中的DDL操作記錄和分區(qū)表自動(dòng)分區(qū)刪除的操作記錄。

步驟一:開啟Binlog功能

  1. 開啟Binlog功能,本文以表名為source_table為例。

    建表時(shí),開啟Binlog

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    建表后,開啟Binlog

    ALTER TABLE source_table BINLOG=true;
  2. (可選)查看Binlog信息。

    說明

    使用以下語句查看Binlog文件的信息時(shí),若僅開啟Binlog功能,日志信息顯示為0。只有Flink成功訂閱Binlog后,才會(huì)顯示日志信息。

    • 查看當(dāng)前寫入的Binlog的位點(diǎn),SQL語句如下:

      SHOW MASTER STATUS FOR source_table;
    • 查看集群內(nèi)對(duì)應(yīng)表所有Binlog文件的信息,SQL語句如下:

      SHOW BINARY LOGS FOR source_table;
  3. (可選)修改Binlog保留時(shí)長(zhǎng)。

    您可以通過修改binlog_ttl參數(shù)來調(diào)整Binlog的保留時(shí)長(zhǎng)。以下示例表示將表source_table的Binlog保留時(shí)長(zhǎng)設(shè)置為1天。

    ALTER TABLE source_table binlog_ttl='1d';

    binlog_ttl參數(shù)取值支持以下格式:

    • 純數(shù)字,表示毫秒。例如,60代表60毫秒。

    • 數(shù)字+s,表示秒。例如,30s代表30秒。

    • 數(shù)字+h,表示小時(shí)。例如,2h代表2小時(shí)。

    • 數(shù)字+d,表示天。例如,1d代表1天。

步驟二:配置Flink連接器

  1. 登錄實(shí)時(shí)計(jì)算控制臺(tái)。

  2. Flink全托管頁簽,單擊目標(biāo)工作空間操作列下的控制臺(tái)。

  3. 在左側(cè)導(dǎo)航欄,單擊數(shù)據(jù)連接

  4. 數(shù)據(jù)連接頁面,單擊創(chuàng)建自定義連接器

  5. 上傳自定義連接器JAR包。下載鏈接:AnalyticDB for MySQL Connector

  6. 上傳完成后,單擊下一步

  7. 單擊完成。創(chuàng)建完成的自定義連接器會(huì)出現(xiàn)在連接器列表中。

步驟三:訂閱Binlog

  1. 登錄實(shí)時(shí)計(jì)算控制臺(tái),新建SQL作業(yè)。詳情請(qǐng)參見創(chuàng)建作業(yè)。

  2. 創(chuàng)建源表,連接到AnalyticDB for MySQL并讀取指定表(source_table)的Binlog數(shù)據(jù)。

    說明
    • Flink DDL中定義的主鍵必須和AnalyticDB for MySQL集群物理表中的主鍵保持一致,主鍵一致包括主鍵和主鍵名稱一致。如果不一致,會(huì)影響數(shù)據(jù)正確性。

    • Flink的數(shù)據(jù)類型需要和AnalyticDB for MySQL兼容。映射關(guān)系,請(qǐng)參見類型映射

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    WITH參數(shù)說明:

    參數(shù)

    是否必填

    默認(rèn)值

    數(shù)據(jù)類型

    說明

    connector

    STRING

    使用的連接器。

    這里填寫自定義連接器,固定填寫adb-mysql-cdc。

    hostname

    STRING

    AnalyticDB for MySQL的VPC地址。

    username

    STRING

    AnalyticDB for MySQL數(shù)據(jù)庫賬號(hào)。

    password

    STRING

    AnalyticDB for MySQL數(shù)據(jù)庫密碼。

    database-name

    STRING

    AnalyticDB for MySQL數(shù)據(jù)庫名稱。

    由于AnalyticDB for MySQL實(shí)現(xiàn)的是表級(jí)Binlog,此處僅支持設(shè)置一個(gè)數(shù)據(jù)庫。

    table-name

    STRING

    AnalyticDB for MySQL數(shù)據(jù)庫的表名。

    由于AnalyticDB for MySQL實(shí)現(xiàn)的是表級(jí)Binlog,此處僅支持設(shè)置一個(gè)表。

    port

    3306

    INTEGER

    端口號(hào)。

    scan.incremental.snapshot.enabled

    true

    BOOLEAN

    增量快照。

    默認(rèn)開啟。增量快照是一種讀取表快照的新機(jī)制,與舊的快照機(jī)制相比,增量快照有許多優(yōu)點(diǎn),包括:

    • 在讀取快照期間,Source支持并發(fā)讀取。

    • 在讀取快照期間,Source支持進(jìn)行Chunk粒度的Checkpoint。

    • 在讀取快照之前,Source不需要獲取數(shù)據(jù)庫鎖權(quán)限。

    scan.incremental.snapshot.chunk.size

    8096

    INTEGER

    表快照的Chunk大小(包含的行數(shù))。

    當(dāng)開啟增量快照讀取時(shí),表會(huì)被切分成多個(gè)Chunk讀取。

    scan.snapshot.fetch.size

    1024

    INTEGER

    讀取表快照時(shí),每次讀取數(shù)據(jù)的最大行數(shù)。

    scan.startup.mode

    initial

    STRING

    消費(fèi)數(shù)據(jù)的啟動(dòng)模式。

    取值如下:

    • initial(默認(rèn)):在第一次啟動(dòng)時(shí),會(huì)先掃描歷史全量數(shù)據(jù),然后讀取最新的Binlog數(shù)據(jù)。

    • earliest-offset:不掃描歷史全量數(shù)據(jù),直接從可讀取的最早Binlog開始讀取。

    • specific-offset:不掃描歷史全量數(shù)據(jù),從您指定的Binlog位點(diǎn)啟動(dòng),位點(diǎn)可通過同時(shí)配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數(shù)來指定從特定Binlog文件名和偏移量啟動(dòng)。

    scan.startup.specific-offset.file

    STRING

    在specific-offset啟動(dòng)模式下,啟動(dòng)位點(diǎn)的Binlog文件名。

    最新Binlog文件名可使用SHOW MASTER STATUES for table_name獲取。

    scan.startup.specific-offset.pos

    LONG

    在specific-offset啟動(dòng)模式下,啟動(dòng)位點(diǎn)的Binlog文件位置。

    最新Binlog位置可使用SHOW MASTER STATUES for table_name獲取。

    scan.startup.specific-offset.skip-events

    LONG

    在指定的啟動(dòng)位點(diǎn)后需要跳過的事件數(shù)量。

    scan.startup.specific-offset.skip-rows

    LONG

    在指定的啟動(dòng)位點(diǎn)后需要跳過的數(shù)據(jù)行數(shù)。

    server-time-zone

    STRING

    數(shù)據(jù)庫服務(wù)器中的會(huì)話時(shí)區(qū)。

    例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP類型如何轉(zhuǎn)成STRING類型。如果沒有設(shè)置,則使用ZONELD.SYSTEMDEFAULT()來確定服務(wù)器時(shí)區(qū)。

    debezium.min.row.count.to.stream.result

    1000

    INTEGER

    當(dāng)表的行數(shù)大于該值時(shí),連接器會(huì)對(duì)結(jié)果進(jìn)行流式處理。

    若將此參數(shù)設(shè)置為0,會(huì)跳過所有表大小檢查,始終在快照期間對(duì)所有結(jié)果進(jìn)行流式處理。

    connect.timeout

    30s

    DURATION

    連接數(shù)據(jù)庫服務(wù)器超時(shí),重試連接之前等待超時(shí)的最長(zhǎng)時(shí)間。

    默認(rèn)單位為秒(s)。

    connect.max-retries

    3

    INTEGER

    連接數(shù)據(jù)庫服務(wù)時(shí),連接失敗后重試的最大次數(shù)。

  3. 在目標(biāo)端創(chuàng)建表,用于存儲(chǔ)處理后的數(shù)據(jù)。本文以AnalyticDB for MySQL作為目標(biāo)端。Flink支持的連接器請(qǐng)參見支持的連接器

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. 創(chuàng)建結(jié)果表,連接步驟3創(chuàng)建的表,用于將處理后的數(shù)據(jù)寫入到AnalyticDB for MySQL指定的表。

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    結(jié)果表對(duì)應(yīng)的WITH參數(shù)和映射類型,詳情請(qǐng)見:云原生數(shù)據(jù)倉庫AnalyticDB MySQL版(ADB)3.0。

  5. 將捕獲到的源數(shù)據(jù)變化同步到結(jié)果表,并由結(jié)果表將數(shù)據(jù)同步到目標(biāo)端。

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. 單擊保存

  7. 單擊深度檢查。

    深度檢查能夠檢查作業(yè)的SQL語義、網(wǎng)絡(luò)連通性以及作業(yè)使用的表的元數(shù)據(jù)信息。同時(shí),您可以單擊結(jié)果區(qū)域的SQL優(yōu)化,展開查看SQL風(fēng)險(xiǎn)問題提示以及對(duì)應(yīng)的SQL優(yōu)化建議。

  8. (可選)單擊調(diào)試。

    您可以使用作業(yè)調(diào)試功能模擬作業(yè)運(yùn)行、檢查輸出結(jié)果,驗(yàn)證SELECT或INSERT業(yè)務(wù)邏輯的正確性,提升開發(fā)效率,降低數(shù)據(jù)質(zhì)量風(fēng)險(xiǎn)。詳情請(qǐng)參見作業(yè)調(diào)試。

  9. 單擊部署詳情請(qǐng)參見部署SQL作業(yè)。

    完成作業(yè)開發(fā)和深度檢查后,即可部署作業(yè),將數(shù)據(jù)發(fā)布至生產(chǎn)環(huán)境。部署后,您可以在作業(yè)運(yùn)維頁面啟動(dòng)作業(yè)至運(yùn)行階段,詳情請(qǐng)參見作業(yè)啟動(dòng)。

類型映射

AnalyticDB for MySQLFlink的數(shù)據(jù)類型映射關(guān)系如下:

AnalyticDB for MySQL字段類型

Flink字段類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p,s)或NUMERIC(p,s)

DECIMAL(p,s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

JSON

STRING