日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

使用阿里云實時計算Flink實現MySQL至StarRocks的數據同步

更新時間:

本文為您介紹如何使用阿里云實時計算Flink的VVP平臺同步MySQL數據到E-MapReduce的StarRocks。

前提條件

使用限制

  • RDS MySQL須為5.7及以上版本。

  • 創建的VVP集群、StarRocks集群以及RDS MySQL實例需要在同一個VPC下,并且在同一個可用區下。

  • StarRocks集群須為EMR-3.42.0及以上版本。

  • Flink的引擎須為vvr-4.0.11-flink-1.13及以上版本。

注意事項

如果RDS的表有修改(ALTER TABLE),則MySQL修改后的Schema變更需要在StarRocks手動同步。如果RDS的表有新建,則MySQL新加的表需要重新運行StarRocks Migrate Tool以進行數據同步。

操作流程

  1. 步驟一:準備測試數據

  2. 步驟二:通過VVP創建自定義Connector

  3. 步驟三:通過VVP創建MySQL的Catalog

  4. 步驟四:通過VVP創建StarRocks結果表

  5. 步驟五:通過VVP啟動作業

  6. 步驟六:場景演示

步驟一:準備測試數據

  1. 創建測試的數據庫和賬號,詳情請參見創建數據庫和賬號

    創建完數據庫和賬號后,需要授權測試賬號的讀寫權限。

    說明

    本文創建的數據庫名稱為test_cdc,賬號為emr_test。

  2. 使用創建的測試賬號連接MySQL實例,詳情請參見通過DMS登錄RDS MySQL

  3. 執行以下命令,創建數據表。

    /*
       MySQL建表語句
    */
    CREATE TABLE test_cdc.`runoob_tbl` (
      `runoob_id` int unsigned NOT NULL AUTO_INCREMENT,
      `runoob_title` varchar(100) NOT NULL,
      `runoob_author` varchar(40) NOT NULL,
      `submission_date` date DEFAULT NULL,
      `add_col` int DEFAULT NULL,
      PRIMARY KEY (`runoob_id`)
    ) ENGINE=InnoDB
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. 在RDS控制臺的數據安全性頁面設置Flink網段的白名單,詳情請參見通過客戶端、命令行連接RDS MySQL實例中的步驟2。

    您可以在實時計算管理控制臺,單擊目標工作空間操作列下的更多 > 工作空間詳情查看Flink網段。

  5. 使用SSH方式登錄StarRocks集群,詳情請參見登錄集群

  6. 執行以下,連接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  7. 執行以下命令,創建用戶、授權和建表。

    /*
       StarRocks建表語句
    */
    CREATE USER 'test' IDENTIFIED by '123456';
    
    CREATE DATABASE test_cdc;
    
    GRANT ALL on test_cdc to test;
    
    use test_cdc;
    
    CREATE TABLE `runoob_tbl1` (
      `runoob_id` bigint(20) NOT NULL COMMENT "",
      `runoob_title` varchar(100) NOT NULL COMMENT "",
      `runoob_author` varchar(40) NOT NULL COMMENT "",
      `submission_date` date NULL COMMENT "",
      `add_col` int(11) NULL COMMENT ""
    ) ENGINE=OLAP
    PRIMARY KEY(`runoob_id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`runoob_id`) BUCKETS 8;

步驟二:通過VVP創建自定義Connector

重要

vvr-6.0.3-flink-1.15及以上版本可以直接跳過此步驟,使用內置的StarRocks-Connector。

  1. 登錄實時計算管理控制臺

  2. 在實時計算控制臺,單擊目標工作空間操作列下的控制臺。

  3. 在左側導航欄,選擇應用 > 作業開發

  4. 創建Connector。

    1. 作業開發頁面,單擊Connectors頁簽。

    2. 選擇引擎版本。

      重要

      引擎須為vvr-4.0.11-flink-1.13及以上版本。

    3. 單擊Connectors所在行的add圖標。

    4. 創建Connector對話框中,選擇flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar文件,單擊繼續

    5. Formats下拉列表中選擇jsoncsv,單擊完成

      其余參數使用默認值即可。創建完成后自定義的Connector會出現在Connectors列表中。

步驟三:通過VVP創建MySQL的Catalog

  1. 在實時計算控制臺的作業開發頁面,單擊新建

  2. 在新建文件對話框中,輸入文件名稱文件類型使用默認的SQL類型,單擊確認

  3. 在文本編輯區域,輸入配置MySQL Catalog的命令。

    CREATE CATALOG mysql WITH (
      'type' = 'mysql',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = '******',
      'default-database' = 'test_cdc'
    );

    參數

    說明

    type

    類型,固定值為mysql。

    hostname

    RDS的內網地址。您可以在RDS的數據庫連接頁面,單擊內網地址進行復制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。

    port

    MySQL數據庫服務的端口號,默認值為3306。

    username

    MySQL數據庫服務的用戶名。

    填寫步驟一:準備測試數據中賬號的用戶名。本示例為emr_test。

    password

    MySQL數據庫服務的密碼。

    填寫步驟一:準備測試數據中賬號的密碼。

    default-database

    默認的MySQL數據庫名稱。

    填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。

  4. 單擊驗證,進行語法檢查。

  5. 驗證通過后,單擊上方的執行

    執行完會提示Query has been executed。如果執行失敗,請仔細檢查各參數是否填寫正確。

  6. 在左側,單擊Schemas頁簽。

  7. 單擊refresh圖標,刷新查看新建的MySQL Catalog。

步驟四:通過VVP創建StarRocks結果表

  1. 在實時計算控制臺的作業開發頁面,單擊新建

  2. 在新建文件對話框中,輸入文件名稱文件類型使用默認的SQL類型,單擊確認

  3. 拷貝以下作業代碼到作業文本編輯區。

    CREATE TEMPORARY TABLE sr_result (
        runoob_id BIGINT,
        runoob_title VARCHAR,
        runoob_author VARCHAr,
        submission_date date,
        add_col int,
        PRIMARY KEY (runoob_id) NOT ENFORCED
      )
    with (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
        'load-url' = '192.168.**.**:8030',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl1',
        'username' = 'emr_test',
        'password' = '******',
        'sink.buffer-flush.interval-ms' = '5000',
        'sink.properties.row_delimiter' = '\x02',
        'sink.properties.column_separator' = '\x01'
      );
    
    INSERT INTO sr_result
    SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col
    from
      mysql.test_cdc.`runoob_tbl`;

    參數

    說明

    connector

    固定值為starrocks。

    jdbc-url

    用于在StarRocks中執行查詢操作。

    例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**為StarRocks集群的內網IP地址。

    load-url

    指定FE的IP地址和HTTP端口,格式為StarRocks集群的內網IP地址:端口。本文以8030端口為例,實際請根據您的集群版本選擇訪問的端口:

    • 18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。

    • 8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。

    說明

    訪問端口詳情,請參見UI和端口

    datdatabase-name

    StarRocks中的數據庫名稱。

    填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。

    table-name

    StarRocks中的表名稱。

    填寫步驟一:準備測試數據中創建的表名。本示例為runoob_tbl1。

    username

    StarRocks的用戶名。

    填寫步驟一:準備測試數據中創建的用戶名。本示例為test。

    password

    StarRocks的密碼。

    填寫步驟一:準備測試數據中設置的密碼。本示例為123456。

    sink.buffer-flush.interval-ms

    Buffer刷新時間間隔,取值范圍為1000 ms~3600000 ms。

    sink.properties.row_delimiter

    自定義行分隔符。

    sink.properties.column_separator

    自定義列分隔符。

    其中with選項的詳細信息,請參見StarRocks官網的使用flink-connector-starrocks導入至StarRocks

    重要
    • 如果sink.semantic設置為exactly-once,則需要配合checkpoint使用,且checkpoint周期不宜過長(數據只在一個checkpoint周期結束后才可見,checkpoint期間數據會存儲在flink內存中)。

    • 默認使用csv格式進行導入,您可以通過指定'sink.properties.row_delimiter' = '\\x02'(此參數自StarRocks-1.15.0 開始支持)與'sink.properties.column_separator' = '\\x01'來自定義行分隔符與列分隔符。

  4. 單擊驗證,進行語法檢查。

  5. 驗證通過后,單擊上線

步驟五:通過VVP啟動作業

  1. 在實時計算控制臺的左側導航欄中,單擊作業運維

  2. 作業運維頁面,單擊目標作業名稱操作列中的啟動

  3. 在彈出的對話框中,單擊啟動

    直到狀態變為運行中,則代表作業運行正常,您可以導入數據。

步驟六:場景演示

查詢數據

  1. 使用SSH方式登錄StarRocks集群,詳情請參見登錄集群

  2. 執行以下,連接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. 在StarRocks連接窗口執行以下命令,查看表數據。

    use test_cdc;
    select * from runoob_tbl1;

    返回信息如下,表示MySQL上的數據已同步至StarRocks。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

查詢插入后的數據

  1. 在RDS數據庫窗口執行以下命令,插入數據。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示數據已成功插入。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步數據更新

  1. 在RDS數據庫窗口執行以下命令,更新指定數據。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示數據已同步更新。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步數據刪除

  1. 在RDS數據庫窗口執行以下命令,刪除指定數據。

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示數據已同步刪除。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Flink與StarRocks數據類型映射關系

Flink數據類型

StarRocks數據類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

BINARY

INT

CHAR

STRING

VARCHAR

STRING

STRING

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE(N)

DATETIME

TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)

DATETIME

ARRAY\<T>

ARRAY\<T>

MAP\<KT,VT>

JSON STRING

ROW\<arg T...>

JSON STRING

常見問題

  • Q:導入StarRocks的數據存在時區不一致問題該如何處理?

  • A:您可以在Insert into語句中以hint語法增加時區配置來解決該問題,示例如下。

    INSERT INTO sr_result
    SELECT runoob_id, runoob_title, runoob_author, submission_date, add_col
    from
     mysql.test_cdc.`runoob_tbl` /*+
    OPTIONS('server-time-zone'='Asia/Shanghai') */;