基于實(shí)時(shí)計(jì)算Flink使用CTAS&CDAS功能同步MySQL數(shù)據(jù)至StarRocks
CTAS可以實(shí)現(xiàn)單表的結(jié)構(gòu)和數(shù)據(jù)同步,CDAS可以實(shí)現(xiàn)整庫同步或者同一庫中的多表結(jié)構(gòu)和數(shù)據(jù)同步。本文為您介紹如何使用實(shí)時(shí)計(jì)算Flink平臺(tái)和E-MapReduce StarRocks通過CTAS&CDAS功能實(shí)現(xiàn)實(shí)時(shí)數(shù)倉中TP(Transaction Processing)和AP(Analytical Processing)數(shù)據(jù)同步的場景。
背景信息
通過CTAS(CREATE TABLE AS)語句可以在StarRocks中自動(dòng)創(chuàng)建和MySQL中表結(jié)構(gòu)一致的表,并進(jìn)行數(shù)據(jù)同步,還能實(shí)時(shí)同步上游表結(jié)構(gòu)(Schema)的變更到下游表,提高您在目標(biāo)存儲(chǔ)中創(chuàng)建表和維護(hù)源表結(jié)構(gòu)變更的效率。
當(dāng)執(zhí)行CTAS語句時(shí),F(xiàn)link會(huì)按照以下流程執(zhí)行:
檢查目標(biāo)存儲(chǔ)中是否存在該目標(biāo)表。
如果不存在,則通過目標(biāo)端Catalog在目標(biāo)存儲(chǔ)中創(chuàng)建相應(yīng)的目標(biāo)表,該目標(biāo)表具有和數(shù)據(jù)源相同的Schema。
如果存在,則跳過建表。如果已存在的目標(biāo)表與源表Schema不一致,則會(huì)報(bào)錯(cuò)提示。
提交和啟動(dòng)相應(yīng)的數(shù)據(jù)同步作業(yè)。同步數(shù)據(jù)源的數(shù)據(jù)以及Schema的變更到目標(biāo)表中。
表結(jié)構(gòu)變更同步策略通過CTAS語句,在實(shí)時(shí)同步數(shù)據(jù)的同時(shí),還能同步源表Schema的變更到目標(biāo)表中。
Schema變更包括初始表的創(chuàng)建以及未來表的變更。
當(dāng)前支持同步的Schema變更:
添加可空列:會(huì)自動(dòng)在目標(biāo)表Schema末尾添加對應(yīng)的列,并自動(dòng)同步新增列的數(shù)據(jù)。
刪除可空列:不會(huì)直接在目標(biāo)表中刪除該列,而是將該列的數(shù)據(jù)自動(dòng)填充為NULL值。
重命名列:被視為添加列和刪除列。直接在目標(biāo)表中末尾添加重命名后的列,并將重命名前的列數(shù)據(jù)自動(dòng)填充為NULL值。
例如,如果col_a重命名為col_b,則會(huì)在目標(biāo)表末尾添加col_b,并自動(dòng)將col_a的數(shù)據(jù)填充為NULL值。
暫不支持同步的Schema變更:
數(shù)據(jù)類型的變更。
例如,由VARCHAR變?yōu)锽IGINT,由NOT NULL變?yōu)镹ULLABLE屬性。
主鍵或索引等約束的變更。
非空列的增加或刪除的變更。
DDL中字段長度的調(diào)整。
如果遇到不支持的Schema變更,則需要您手動(dòng)刪除下游目標(biāo)表,重新啟動(dòng)CTAS作業(yè),即重新創(chuàng)建目標(biāo)表并重新同步歷史數(shù)據(jù)。
CTAS不會(huì)識(shí)別具體的DDL類型,而是對比前后兩條數(shù)據(jù)的Schema差異。因此,如果您先刪除了某列后,又加回了該列,且這兩個(gè)DDL之間無數(shù)據(jù)變化,則CTAS會(huì)認(rèn)為沒有發(fā)生結(jié)構(gòu)變更。同理,如果您添加了一列,直到該表有數(shù)據(jù)變化,CTAS才會(huì)感知到結(jié)構(gòu)變更,才會(huì)同步結(jié)構(gòu)變更到目標(biāo)表。
通過CTAS建表支持的字段類型信息,請參見Flink與StarRocks的數(shù)據(jù)類型映射關(guān)系。
在使用CTAS語句合并MySQL多張表時(shí),默認(rèn)情況下,系統(tǒng)會(huì)自動(dòng)在生成的新表結(jié)構(gòu)最前面添加
_db_name
和_table_name
兩列,用來追蹤源數(shù)據(jù)表信息。由于這一自動(dòng)添加行為不可更改,您在定義新表的列順序時(shí),請直接從第三列開始定義您期望的列順序,以確保新表結(jié)構(gòu)符合預(yù)期。
前提條件
已開通阿里云實(shí)時(shí)計(jì)算Flink全托管并創(chuàng)建了Flink集群,詳情請參見開通Flink全托管和Flink SQL作業(yè)快速入門。
已創(chuàng)建StarRocks集群,詳情請參見創(chuàng)建StarRocks集群。
已創(chuàng)建RDS MySQL,詳情請參見創(chuàng)建RDS MySQL實(shí)例。
本文以5.7版本的MySQL、EMR-3.39.1版本的StarRocks集群和1.15-6.0.3版本的Flink為例介紹。
使用限制
創(chuàng)建的Flink集群、StarRocks集群以及RDS MySQL實(shí)例需要在同一個(gè)VPC下。
RDS MySQL須為5.7及以上版本。
StarRocks須開啟公網(wǎng)訪問。
Flink集群中的Flink須為1.15-vvr-6.0.3及以上版本。
步驟一:準(zhǔn)備測試數(shù)據(jù)
創(chuàng)建測試的數(shù)據(jù)庫和賬號(hào),詳情請參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)。
創(chuàng)建完數(shù)據(jù)庫和賬號(hào)后,需要授權(quán)測試賬號(hào)的讀寫權(quán)限。
說明本文創(chuàng)建的數(shù)據(jù)庫名稱為test_cdc,賬號(hào)為test。
使用創(chuàng)建的測試賬號(hào)連接MySQL實(shí)例,詳情請參見通過DMS登錄RDS MySQL。
在MySQL中執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表。
use test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
使用SSH方式登錄StarRocks集群,詳情請參見登錄集群。
執(zhí)行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,創(chuàng)建用戶和授權(quán)。
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '123456'; GRANT CREATE TABLE ON DATABASE test_cdc TO test;
步驟二:在實(shí)時(shí)計(jì)算Flink控制臺(tái)通過SQL客戶端創(chuàng)建Catalog
在阿里云實(shí)時(shí)計(jì)算Flink控制臺(tái)的作業(yè)開發(fā)頁面中,創(chuàng)建MySQL和StarRocks的Catalog。詳情請參見Flink SQL作業(yè)快速入門。
參數(shù)僅供參考格式,具體內(nèi)容請根據(jù)實(shí)際情況配置。
MySQL Catalog
代碼示例
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'default-database' = 'test_cdc' );
參數(shù)配置
參數(shù)
說明
type
類型,固定值為mysql。
hostname
RDS的內(nèi)網(wǎng)地址。您可以在RDS的數(shù)據(jù)庫連接頁面,單擊內(nèi)網(wǎng)地址進(jìn)行復(fù)制。例如,rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。
port
MySQL數(shù)據(jù)庫服務(wù)的端口號(hào),默認(rèn)值為3306。
username
MySQL數(shù)據(jù)庫服務(wù)的用戶名。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中賬號(hào)的用戶名。本示例為test。
password
MySQL數(shù)據(jù)庫服務(wù)的密碼。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中賬號(hào)的密碼。
default-database
默認(rèn)的MySQL數(shù)據(jù)庫名稱。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中創(chuàng)建的數(shù)據(jù)庫名。本示例為test_cdc。
StarRocks Catalog
代碼示例
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = '172.16.**.**:9030', 'username' = 'test', 'password' = '123456', 'dbname' = 'test_cdc' );
參數(shù)配置
參數(shù)
說明
type
類型,固定值為starrocks。
endpoint
StarRocks FE的IP地址和端口。
username
StarRocks的用戶名。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中賬號(hào)的用戶名。本示例為test。
password
StarRocks數(shù)據(jù)庫服務(wù)的密碼。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中賬號(hào)的密碼。
dbname
StarRocks數(shù)據(jù)庫名稱。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中創(chuàng)建的數(shù)據(jù)庫名。本示例為test_cdc。
步驟三:創(chuàng)建并上線作業(yè)
在阿里云實(shí)時(shí)計(jì)算Flink控制臺(tái)的作業(yè)開發(fā)頁面,編寫CTAS語句。
您可以使用以下三種示例發(fā)送CTAS語句。
AtLeast once語義:通過sink.buffer-flush.interval-ms配置項(xiàng),配置每次寫入StarRocks的時(shí)間間隔,優(yōu)點(diǎn)是寫入間隔時(shí)間短,占用內(nèi)存較少。
/* AtLeast once 語義 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl_sr with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '123456', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
Exactly once語義:需要定義checkpoint間隔,優(yōu)點(diǎn)是在各種異常情況下保障數(shù)據(jù)不丟失不重復(fù),缺點(diǎn)是數(shù)據(jù)可見時(shí)間取決于checkpoint間隔。更多信息,請參見Checkpointing。
/* Exactly once 語義。 */ set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '123456', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01 ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
Simple模式:優(yōu)點(diǎn)是創(chuàng)建表時(shí)不需要關(guān)注原表有哪些字段,會(huì)按照MySQL的表格式照搬過來,開發(fā)者使用比較方便。缺點(diǎn)是不能創(chuàng)建分區(qū),對于需要分區(qū)的表,仍需要通過normal模式創(chuàng)建。
/* 上面兩個(gè)為normal模式,本示例演示simple模式 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'='buckets 8', 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://172.16.**.**:9030', 'load-url'='172.16.**.**:18030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '123456', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;
表 1. WITH參數(shù)
參數(shù)
是否必選
描述
starrocks.create.table.properties
是
StarRocks建表語句中除了字段定義以外的其他后綴定義,例如示例中的engine、key和buckets等。
database-name
是
StarRocks數(shù)據(jù)庫名稱。
本示例為test_cdc。
jdbc-url
是
用于在StarRocks中執(zhí)行查詢操作。
例如,jdbc:mysql://172.16.**.**:9030。其中,
172.16.**.**
為StarRocks集群的內(nèi)網(wǎng)IP地址。load-url
是
指定FE的IP地址和HTTP端口,格式為
StarRocks集群的內(nèi)網(wǎng)IP地址:端口
。本文以8030端口為例,實(shí)際請根據(jù)您的集群版本選擇訪問的端口:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
說明訪問端口詳情,請參見UI和端口。
sink.semantic
否
填寫exactly-once可以保障數(shù)據(jù)一致性語義,默認(rèn)為at-least-once。
starrocks.create.table.mode
否
支持以下參數(shù)值:
normal模式(默認(rèn)值):必須像示例一樣在starrocks.create.table.properties配置中填寫engine、key和buckets等完整的配置。
simple模式:默認(rèn)選擇engine為olap,選擇key類型為primary key,且主鍵與MySQL的主鍵保持完全一致,默認(rèn)distributed by hash(所有的主鍵),默認(rèn)無分區(qū)。需要在starrocks.create.table.properties配置中填寫的必填內(nèi)容為buckets ,選填內(nèi)容為properties等配置。
說明因?yàn)関vr-6.0.5-flink-1.15及以上版本移除了
sink.use.new-api
,所以使用vvr-6.0.5-flink-1.15之前的版本時(shí),請?jiān)趙ith參數(shù)中添加'sink.use.new-api'='false',
。其他配置請參見從Apache Flink持續(xù)導(dǎo)入。
表 2. OPTIONS參數(shù)
參數(shù)
描述
connector
類型,固定值為mysql-cdc。
hostname
RDS的內(nèi)網(wǎng)地址。
您可以在RDS的數(shù)據(jù)庫連接頁面,單擊內(nèi)網(wǎng)地址進(jìn)行復(fù)制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port
MySQL數(shù)據(jù)庫服務(wù)的端口號(hào),默認(rèn)值為3306。
username
MySQL數(shù)據(jù)庫服務(wù)的用戶名。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中賬號(hào)的用戶名。本示例為test。
password
MySQL數(shù)據(jù)庫服務(wù)的密碼。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中賬號(hào)的密碼。
table-name
StarRocks中的表名稱。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中創(chuàng)建的表名。本示例為runoob_tbl。
database-name
默認(rèn)的MySQL數(shù)據(jù)庫名稱。
填寫步驟一:準(zhǔn)備測試數(shù)據(jù)中創(chuàng)建的數(shù)據(jù)庫名。本示例為test_cdc。
在作業(yè)開發(fā)頁面的高級配置中,選擇vvr-6.0.3及以上的版本。
單擊上線。
在作業(yè)運(yùn)維頁面,單擊目標(biāo)作業(yè)操作列的啟動(dòng)。
步驟四:場景演示
查詢數(shù)據(jù)
使用SSH方式登錄StarRocks集群,詳情請參見登錄集群。
執(zhí)行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
在StarRocks連接窗口執(zhí)行以下命令,查看表數(shù)據(jù)。
use test_cdc; select * from runoob_tbl1;
返回信息如下,表示MySQL上的數(shù)據(jù)已同步至StarRocks。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
查詢插入后的數(shù)據(jù)
在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,插入數(shù)據(jù)。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)
在StarRocks連接窗口執(zhí)行以下命令,查看表數(shù)據(jù)。
select * from runoob_tbl1;
返回信息如下,表示數(shù)據(jù)已成功插入。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步數(shù)據(jù)更新
在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,更新指定數(shù)據(jù)。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18
在StarRocks連接窗口執(zhí)行以下命令,查看表數(shù)據(jù)。
select * from runoob_tbl1;
返回信息如下,表示數(shù)據(jù)已同步更新。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步數(shù)據(jù)刪除
在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,刪除指定數(shù)據(jù)。
DELETE FROM runoob_tbl WHERE runoob_id = 1
在StarRocks連接窗口執(zhí)行以下命令,查看表數(shù)據(jù)。
select * from runoob_tbl1;
返回信息如下,表示數(shù)據(jù)已同步刪除。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
增加可空列
在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,增加可空列。
alter table `runoob_tbl` add COLUMN `add_col2` INT;
執(zhí)行以下命令 ,插入數(shù)據(jù)。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)
在StarRocks連接窗口執(zhí)行以下命令,查看表數(shù)據(jù)。
select * from runoob_tbl1;
返回信息如下,表示Schema已經(jīng)成功變更。
+-----------+--------------+---------------+-----------------+---------+----------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | +-----------+--------------+---------------+-----------------+---------+----------+ | 18 | new | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+ | 1 | second | tom2 | 2022-06-23 | 1 | 2 | | 18 | first | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+
CDAS介紹
CDAS是CTAS的一個(gè)語法糖。通過CDAS語句,可以實(shí)現(xiàn)MySQL中的整庫同步,即生成一個(gè)Flink Job。Source是MySQL中的database,目標(biāo)表是StarRocks中對應(yīng)的多張表,同時(shí)可以使用including table語法,只選擇一個(gè)database中的部分表進(jìn)行CDAS操作。
與CTAS的執(zhí)行相同,需要在創(chuàng)建MySQL和StarRocks相應(yīng)的Catalog后,執(zhí)行CDAS語句。創(chuàng)建語法示例如下。
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:18030',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000' ,
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table 'tabl1','tbl2','tbl3'
/*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;