Flink訂閱Binlog
實(shí)時(shí)計(jì)算 Flink 版通過訂閱云原生數(shù)據(jù)倉庫 AnalyticDB MySQL 版,可以實(shí)時(shí)捕獲和處理數(shù)據(jù)庫變更數(shù)據(jù),實(shí)現(xiàn)高效的數(shù)據(jù)同步和流式計(jì)算。本文為您介紹如何使用Flink訂閱AnalyticDB for MySQL Binlog。
前提條件
AnalyticDB for MySQL產(chǎn)品系列為企業(yè)版、基礎(chǔ)版、湖倉版或數(shù)倉版彈性模式。
AnalyticDB for MySQL集群的內(nèi)核版本需為3.2.1.0及以上版本。
說明查看企業(yè)版、基礎(chǔ)版或湖倉版集群的內(nèi)核版本,請(qǐng)執(zhí)行
SELECT adb_version();
。如需升級(jí)內(nèi)核版本,請(qǐng)聯(lián)系技術(shù)支持。查看和升級(jí)數(shù)倉版集群的內(nèi)核版本,請(qǐng)參見查看和升級(jí)版本。
Flink實(shí)時(shí)計(jì)算引擎需為VVR 8.0.4及以上版本。
AnalyticDB for MySQL和Flink全托管工作空間需要位于同一VPC下,詳情請(qǐng)參見創(chuàng)建集群和開通實(shí)時(shí)計(jì)算Flink版。
已將Flink工作空間所屬的網(wǎng)段加入AnalyticDB for MySQL的白名單,詳情請(qǐng)參見Flink所屬網(wǎng)段查看方法和設(shè)置白名單。
使用限制
AnalyticDB for MySQL僅支持按表開啟Binlog功能。
Flink僅支持處理AnalyticDB for MySQL Binlog中的所有基礎(chǔ)數(shù)據(jù)類型和復(fù)雜數(shù)據(jù)類型JSON,詳情請(qǐng)參見數(shù)據(jù)類型。
Flink不會(huì)處理AnalyticDB for MySQL Binlog中的DDL操作記錄和分區(qū)表自動(dòng)分區(qū)刪除的操作記錄。
步驟一:開啟Binlog功能
開啟Binlog功能,本文以表名為source_table為例。
建表時(shí),開啟Binlog
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;
建表后,開啟Binlog
ALTER TABLE source_table BINLOG=true;
(可選)查看Binlog信息。
說明使用以下語句查看Binlog文件的信息時(shí),若僅開啟Binlog功能,日志信息顯示為0。只有Flink成功訂閱Binlog后,才會(huì)顯示日志信息。
查看當(dāng)前寫入的Binlog的位點(diǎn),SQL語句如下:
SHOW MASTER STATUS FOR source_table;
查看集群內(nèi)對(duì)應(yīng)表所有Binlog文件的信息,SQL語句如下:
SHOW BINARY LOGS FOR source_table;
(可選)修改Binlog保留時(shí)長(zhǎng)。
您可以通過修改
binlog_ttl
參數(shù)來調(diào)整Binlog的保留時(shí)長(zhǎng)。以下示例表示將表source_table的Binlog保留時(shí)長(zhǎng)設(shè)置為1天。ALTER TABLE source_table binlog_ttl='1d';
binlog_ttl
參數(shù)取值支持以下格式:純數(shù)字,表示毫秒。例如,60代表60毫秒。
數(shù)字+s,表示秒。例如,30s代表30秒。
數(shù)字+h,表示小時(shí)。例如,2h代表2小時(shí)。
數(shù)字+d,表示天。例如,1d代表1天。
步驟二:配置Flink連接器
在Flink全托管頁簽,單擊目標(biāo)工作空間操作列下的控制臺(tái)。
在左側(cè)導(dǎo)航欄,單擊數(shù)據(jù)連接。
在數(shù)據(jù)連接頁面,單擊創(chuàng)建自定義連接器
上傳自定義連接器JAR包。下載鏈接:AnalyticDB for MySQL Connector。
上傳完成后,單擊下一步
單擊完成。創(chuàng)建完成的自定義連接器會(huì)出現(xiàn)在連接器列表中。
步驟三:訂閱Binlog
登錄實(shí)時(shí)計(jì)算控制臺(tái),新建SQL作業(yè)。詳情請(qǐng)參見創(chuàng)建作業(yè)。
創(chuàng)建源表,連接到AnalyticDB for MySQL并讀取指定表(source_table)的Binlog數(shù)據(jù)。
說明Flink DDL中定義的主鍵必須和AnalyticDB for MySQL集群物理表中的主鍵保持一致,主鍵一致包括主鍵和主鍵名稱一致。如果不一致,會(huì)影響數(shù)據(jù)正確性。
Flink的數(shù)據(jù)類型需要和AnalyticDB for MySQL兼容。映射關(guān)系,請(qǐng)參見類型映射。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );
WITH參數(shù)說明:
參數(shù)
是否必填
默認(rèn)值
數(shù)據(jù)類型
說明
connector
是
無
STRING
使用的連接器。
這里填寫自定義連接器,固定填寫
adb-mysql-cdc
。hostname
是
無
STRING
AnalyticDB for MySQL的VPC地址。
username
是
無
STRING
AnalyticDB for MySQL數(shù)據(jù)庫賬號(hào)。
password
是
無
STRING
AnalyticDB for MySQL數(shù)據(jù)庫密碼。
database-name
是
無
STRING
AnalyticDB for MySQL數(shù)據(jù)庫名稱。
由于AnalyticDB for MySQL實(shí)現(xiàn)的是表級(jí)Binlog,此處僅支持設(shè)置一個(gè)數(shù)據(jù)庫。
table-name
是
無
STRING
AnalyticDB for MySQL數(shù)據(jù)庫的表名。
由于AnalyticDB for MySQL實(shí)現(xiàn)的是表級(jí)Binlog,此處僅支持設(shè)置一個(gè)表。
port
否
3306
INTEGER
端口號(hào)。
scan.incremental.snapshot.enabled
否
true
BOOLEAN
增量快照。
默認(rèn)開啟。增量快照是一種讀取表快照的新機(jī)制,與舊的快照機(jī)制相比,增量快照有許多優(yōu)點(diǎn),包括:
在讀取快照期間,Source支持并發(fā)讀取。
在讀取快照期間,Source支持進(jìn)行Chunk粒度的Checkpoint。
在讀取快照之前,Source不需要獲取數(shù)據(jù)庫鎖權(quán)限。
scan.incremental.snapshot.chunk.size
否
8096
INTEGER
表快照的Chunk大小(包含的行數(shù))。
當(dāng)開啟增量快照讀取時(shí),表會(huì)被切分成多個(gè)Chunk讀取。
scan.snapshot.fetch.size
否
1024
INTEGER
讀取表快照時(shí),每次讀取數(shù)據(jù)的最大行數(shù)。
scan.startup.mode
否
initial
STRING
消費(fèi)數(shù)據(jù)的啟動(dòng)模式。
取值如下:
initial(默認(rèn)):在第一次啟動(dòng)時(shí),會(huì)先掃描歷史全量數(shù)據(jù),然后讀取最新的Binlog數(shù)據(jù)。
earliest-offset:不掃描歷史全量數(shù)據(jù),直接從可讀取的最早Binlog開始讀取。
specific-offset:不掃描歷史全量數(shù)據(jù),從您指定的Binlog位點(diǎn)啟動(dòng),位點(diǎn)可通過同時(shí)配置
scan.startup.specific-offset.file
和scan.startup.specific-offset.pos
參數(shù)來指定從特定Binlog文件名和偏移量啟動(dòng)。
scan.startup.specific-offset.file
否
無
STRING
在specific-offset啟動(dòng)模式下,啟動(dòng)位點(diǎn)的Binlog文件名。
最新Binlog文件名可使用
SHOW MASTER STATUES for table_name
獲取。scan.startup.specific-offset.pos
否
無
LONG
在specific-offset啟動(dòng)模式下,啟動(dòng)位點(diǎn)的Binlog文件位置。
最新Binlog位置可使用
SHOW MASTER STATUES for table_name
獲取。scan.startup.specific-offset.skip-events
否
無
LONG
在指定的啟動(dòng)位點(diǎn)后需要跳過的事件數(shù)量。
scan.startup.specific-offset.skip-rows
否
無
LONG
在指定的啟動(dòng)位點(diǎn)后需要跳過的數(shù)據(jù)行數(shù)。
server-time-zone
否
無
STRING
數(shù)據(jù)庫服務(wù)器中的會(huì)話時(shí)區(qū)。
例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP類型如何轉(zhuǎn)成STRING類型。如果沒有設(shè)置,則使用
ZONELD.SYSTEMDEFAULT()
來確定服務(wù)器時(shí)區(qū)。debezium.min.row.count.to.stream.result
否
1000
INTEGER
當(dāng)表的行數(shù)大于該值時(shí),連接器會(huì)對(duì)結(jié)果進(jìn)行流式處理。
若將此參數(shù)設(shè)置為
0
,會(huì)跳過所有表大小檢查,始終在快照期間對(duì)所有結(jié)果進(jìn)行流式處理。connect.timeout
否
30s
DURATION
連接數(shù)據(jù)庫服務(wù)器超時(shí),重試連接之前等待超時(shí)的最長(zhǎng)時(shí)間。
默認(rèn)單位為秒(s)。
connect.max-retries
否
3
INTEGER
連接數(shù)據(jù)庫服務(wù)時(shí),連接失敗后重試的最大次數(shù)。
在目標(biāo)端創(chuàng)建表,用于存儲(chǔ)處理后的數(shù)據(jù)。本文以AnalyticDB for MySQL作為目標(biāo)端。Flink支持的連接器請(qǐng)參見支持的連接器。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )
創(chuàng)建結(jié)果表,連接步驟3創(chuàng)建的表,用于將處理后的數(shù)據(jù)寫入到AnalyticDB for MySQL指定的表。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );
結(jié)果表對(duì)應(yīng)的WITH參數(shù)和映射類型,詳情請(qǐng)見:云原生數(shù)據(jù)倉庫AnalyticDB MySQL版(ADB)3.0。
將捕獲到的源數(shù)據(jù)變化同步到結(jié)果表,并由結(jié)果表將數(shù)據(jù)同步到目標(biāo)端。
INSERT INTO adb_sink SELECT * FROM adb_source;
單擊保存。
單擊深度檢查。
深度檢查能夠檢查作業(yè)的SQL語義、網(wǎng)絡(luò)連通性以及作業(yè)使用的表的元數(shù)據(jù)信息。同時(shí),您可以單擊結(jié)果區(qū)域的SQL優(yōu)化,展開查看SQL風(fēng)險(xiǎn)問題提示以及對(duì)應(yīng)的SQL優(yōu)化建議。
(可選)單擊調(diào)試。
您可以使用作業(yè)調(diào)試功能模擬作業(yè)運(yùn)行、檢查輸出結(jié)果,驗(yàn)證SELECT或INSERT業(yè)務(wù)邏輯的正確性,提升開發(fā)效率,降低數(shù)據(jù)質(zhì)量風(fēng)險(xiǎn)。詳情請(qǐng)參見作業(yè)調(diào)試。
單擊部署,詳情請(qǐng)參見部署SQL作業(yè)。
完成作業(yè)開發(fā)和深度檢查后,即可部署作業(yè),將數(shù)據(jù)發(fā)布至生產(chǎn)環(huán)境。部署后,您可以在作業(yè)運(yùn)維頁面啟動(dòng)作業(yè)至運(yùn)行階段,詳情請(qǐng)參見作業(yè)啟動(dòng)。
類型映射
AnalyticDB for MySQL與Flink的數(shù)據(jù)類型映射關(guān)系如下:
AnalyticDB for MySQL字段類型 | Flink字段類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s)或NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |