本文為您介紹如何使用阿里云實時計算Flink的VVP平臺同步MySQL數據到E-MapReduce的StarRocks。
前提條件
已開通阿里云實時計算Flink全托管,詳情請參見開通Flink全托管。
已創建StarRocks集群,詳情請參見創建StarRocks集群。
說明Core實例數量設置為3。
已創建RDS MySQL,詳情請參見創建RDS MySQL實例。
說明本文以MySQL 5.7版本為例介紹。
使用限制
RDS MySQL須為5.7及以上版本。
創建的VVP集群、StarRocks集群以及RDS MySQL實例需要在同一個VPC下,并且在同一個可用區下。
StarRocks集群須為EMR-3.42.0及以上版本。
Flink的引擎須為vvr-4.0.11-flink-1.13及以上版本。
注意事項
如果RDS的表有修改(ALTER TABLE
),則MySQL修改后的Schema變更需要在StarRocks手動同步。如果RDS的表有新建,則MySQL新加的表需要重新運行StarRocks Migrate Tool以進行數據同步。
操作流程
步驟一:準備測試數據
創建測試的數據庫和賬號,詳情請參見創建數據庫和賬號。
創建完數據庫和賬號后,需要授權測試賬號的讀寫權限。
說明本文創建的數據庫名稱為test_cdc,賬號為emr_test。
使用創建的測試賬號連接MySQL實例,詳情請參見通過DMS登錄RDS MySQL。
執行以下命令,創建數據表。
/* MySQL建表語句 */ CREATE TABLE test_cdc.`runoob_tbl` ( `runoob_id` int unsigned NOT NULL AUTO_INCREMENT, `runoob_title` varchar(100) NOT NULL, `runoob_author` varchar(40) NOT NULL, `submission_date` date DEFAULT NULL, `add_col` int DEFAULT NULL, PRIMARY KEY (`runoob_id`) ) ENGINE=InnoDB INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
在RDS控制臺的數據安全性頁面設置Flink網段的白名單,詳情請參見通過客戶端、命令行連接RDS MySQL實例中的步驟2。
您可以在實時計算管理控制臺,單擊目標工作空間操作列下的
查看Flink網段。使用SSH方式登錄StarRocks集群,詳情請參見登錄集群。
執行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執行以下命令,創建用戶、授權和建表。
/* StarRocks建表語句 */ CREATE USER 'test' IDENTIFIED by '123456'; CREATE DATABASE test_cdc; GRANT ALL on test_cdc to test; use test_cdc; CREATE TABLE `runoob_tbl1` ( `runoob_id` bigint(20) NOT NULL COMMENT "", `runoob_title` varchar(100) NOT NULL COMMENT "", `runoob_author` varchar(40) NOT NULL COMMENT "", `submission_date` date NULL COMMENT "", `add_col` int(11) NULL COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`runoob_id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`runoob_id`) BUCKETS 8;
步驟二:通過VVP創建自定義Connector
vvr-6.0.3-flink-1.15及以上版本可以直接跳過此步驟,使用內置的StarRocks-Connector。
登錄實時計算管理控制臺。
在實時計算控制臺,單擊目標工作空間操作列下的控制臺。
在左側導航欄,選擇 。
創建Connector。
在作業開發頁面,單擊Connectors頁簽。
選擇引擎版本。
重要引擎須為vvr-4.0.11-flink-1.13及以上版本。
單擊Connectors所在行的圖標。
在創建Connector對話框中,選擇flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar文件,單擊繼續。
在Formats下拉列表中選擇json和csv,單擊完成。
其余參數使用默認值即可。創建完成后自定義的Connector會出現在Connectors列表中。
步驟三:通過VVP創建MySQL的Catalog
在實時計算控制臺的作業開發頁面,單擊新建。
在新建文件對話框中,輸入文件名稱,文件類型使用默認的SQL類型,單擊確認。
在文本編輯區域,輸入配置MySQL Catalog的命令。
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '******', 'default-database' = 'test_cdc' );
參數
說明
type
類型,固定值為mysql。
hostname
RDS的內網地址。您可以在RDS的數據庫連接頁面,單擊內網地址進行復制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。
port
MySQL數據庫服務的端口號,默認值為3306。
username
MySQL數據庫服務的用戶名。
填寫步驟一:準備測試數據中賬號的用戶名。本示例為emr_test。
password
MySQL數據庫服務的密碼。
填寫步驟一:準備測試數據中賬號的密碼。
default-database
默認的MySQL數據庫名稱。
填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。
單擊驗證,進行語法檢查。
驗證通過后,單擊上方的執行。
執行完會提示Query has been executed。如果執行失敗,請仔細檢查各參數是否填寫正確。
在左側,單擊Schemas頁簽。
單擊圖標,刷新查看新建的MySQL Catalog。
步驟四:通過VVP創建StarRocks結果表
在實時計算控制臺的作業開發頁面,單擊新建。
在新建文件對話框中,輸入文件名稱,文件類型使用默認的SQL類型,單擊確認。
拷貝以下作業代碼到作業文本編輯區。
CREATE TEMPORARY TABLE sr_result ( runoob_id BIGINT, runoob_title VARCHAR, runoob_author VARCHAr, submission_date date, add_col int, PRIMARY KEY (runoob_id) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030', 'load-url' = '192.168.**.**:8030', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl1', 'username' = 'emr_test', 'password' = '******', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ); INSERT INTO sr_result SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col from mysql.test_cdc.`runoob_tbl`;
參數
說明
connector
固定值為starrocks。
jdbc-url
用于在StarRocks中執行查詢操作。
例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**為StarRocks集群的內網IP地址。
load-url
指定FE的IP地址和HTTP端口,格式為
StarRocks集群的內網IP地址:端口
。本文以8030端口為例,實際請根據您的集群版本選擇訪問的端口:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
說明訪問端口詳情,請參見UI和端口。
datdatabase-name
StarRocks中的數據庫名稱。
填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。
table-name
StarRocks中的表名稱。
填寫步驟一:準備測試數據中創建的表名。本示例為runoob_tbl1。
username
StarRocks的用戶名。
填寫步驟一:準備測試數據中創建的用戶名。本示例為test。
password
StarRocks的密碼。
填寫步驟一:準備測試數據中設置的密碼。本示例為123456。
sink.buffer-flush.interval-ms
Buffer刷新時間間隔,取值范圍為1000 ms~3600000 ms。
sink.properties.row_delimiter
自定義行分隔符。
sink.properties.column_separator
自定義列分隔符。
其中
with
選項的詳細信息,請參見StarRocks官網的使用flink-connector-starrocks導入至StarRocks。重要如果sink.semantic設置為exactly-once,則需要配合checkpoint使用,且checkpoint周期不宜過長(數據只在一個checkpoint周期結束后才可見,checkpoint期間數據會存儲在flink內存中)。
默認使用csv格式進行導入,您可以通過指定
'sink.properties.row_delimiter' = '\\x02'
(此參數自StarRocks-1.15.0 開始支持)與'sink.properties.column_separator' = '\\x01'
來自定義行分隔符與列分隔符。
單擊驗證,進行語法檢查。
驗證通過后,單擊上線。
步驟五:通過VVP啟動作業
在實時計算控制臺的左側導航欄中,單擊作業運維。
在作業運維頁面,單擊目標作業名稱操作列中的啟動。
在彈出的對話框中,單擊啟動。
直到狀態變為運行中,則代表作業運行正常,您可以導入數據。
步驟六:場景演示
查詢數據
使用SSH方式登錄StarRocks集群,詳情請參見登錄集群。
執行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
在StarRocks連接窗口執行以下命令,查看表數據。
use test_cdc; select * from runoob_tbl1;
返回信息如下,表示MySQL上的數據已同步至StarRocks。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
查詢插入后的數據
在RDS數據庫窗口執行以下命令,插入數據。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)
在StarRocks連接窗口執行以下命令,查看表數據。
select * from runoob_tbl1;
返回信息如下,表示數據已成功插入。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步數據更新
在RDS數據庫窗口執行以下命令,更新指定數據。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18
在StarRocks連接窗口執行以下命令,查看表數據。
select * from runoob_tbl1;
返回信息如下,表示數據已同步更新。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
同步數據刪除
在RDS數據庫窗口執行以下命令,刪除指定數據。
DELETE FROM runoob_tbl WHERE runoob_id = 1
在StarRocks連接窗口執行以下命令,查看表數據。
select * from runoob_tbl1;
返回信息如下,表示數據已同步刪除。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Flink與StarRocks數據類型映射關系
Flink數據類型 | StarRocks數據類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY\<T> | ARRAY\<T> |
MAP\<KT,VT> | JSON STRING |
ROW\<arg T...> | JSON STRING |
常見問題
Q:導入StarRocks的數據存在時區不一致問題該如何處理?
A:您可以在Insert into語句中以hint語法增加時區配置來解決該問題,示例如下。
INSERT INTO sr_result SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col from mysql.test_cdc.`runoob_tbl` /*+ OPTIONS('server-time-zone'='Asia/Shanghai') */;