日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

基于DataFlow集群的Flink服務使用CTAS語句同步MySQL數據至StarRocks

本文為您介紹如何使用EMR DataFlow集群中的Flink服務,通過CTAS語句將MySQL數據同步至EMR Serverless StarRocks中。

背景信息

您可以通過CTAS或CDAS語句將MySQL數據同步至EMR Serverless StarRocks,CTAS可以實現單表的結構和數據同步,CDAS可以實現整庫同步或者同一庫中的多表結構和數據同步。本文使用CTAS語句,CDAS語句的使用方法與CTAS類似,具體請參見CDAS介紹

通過CTAS(CREATE TABLE AS)語句,您可以在StarRocks中自動創建和MySQL中表結構一致的表,并進行數據同步。同時還能實時同步上游表結構(Schema)的變更到下游表,提高您在目標存儲中創建表和維護源表結構變更的效率。

當執行CTAS語句時,Flink會按照以下流程執行:

  1. 檢查目標存儲中是否存在該目標表。

    • 如果不存在,則通過目標端Catalog在目標存儲中創建相應的目標表,該目標表具有和數據源相同的Schema。

    • 如果存在,則跳過建表。如果已存在的目標表與源表Schema不一致,則會報錯提示。

  2. 提交和啟動相應的數據同步作業。同步數據源的數據以及Schema的變更到目標表中。

表結構變更同步策略通過CTAS語句,在實時同步數據的同時,還能同步源表Schema的變更到目標表中。

Schema變更包括初始表的創建以及未來表的變更。

  • 當前支持同步的Schema變更:

    • 添加可空列:會自動在目標表Schema末尾添加對應的列,并自動同步新增列的數據。

    • 刪除可空列:不會直接在目標表中刪除該列,而是將該列的數據自動填充為NULL值。

    • 重命名列:直接在目標表中末尾添加重命名后的列,并將重命名前的列數據自動填充為NULL值。

      例如,如果col_a重命名為col_b,則會在目標表末尾添加col_b,并自動將col_a的數據填充為NULL值。

  • 暫不支持同步的Schema變更:

    • 數據類型的變更。

      例如,由VARCHAR變為BIGINT,由NOT NULL變為NULLABLE屬性。

    • 主鍵或索引等約束的變更。

    • 非空列的增加或刪除的變更。

說明
  • 如果遇到不支持的Schema變更,則需要您手動刪除下游目標表,重新啟動CTAS作業,即重新創建目標表并重新同步歷史數據。
  • CTAS不會識別具體的DDL類型,而是對比前后兩條數據的Schema差異。因此,如果您先刪除了某列后,又加回了該列,且這兩個DDL之間無數據變化,則CTAS會認為沒有發生結構變更。同理,如果您添加了一列,直到該表有數據變化,CTAS才會感知到結構變更,才會同步結構變更到目標表。
  • 通過CTAS建表支持的字段類型信息,請參見Flink與StarRocks的數據類型映射關系

前提條件

說明

本文以5.7版本的MySQL、EMR-3.42.0版本的DataFlow集群為例介紹。

使用限制

  • DataFlow集群、EMR Serverless StarRocks實例和RDS MySQL實例需要在同一個VPC下。

  • DataFlow集群和EMR Serverless StarRocks實例均須開啟公網訪問。

  • RDS MySQL須為5.7及以上版本。

  • DataFlow集群須為EMR-3.42.0及后續版本或EMR-5.8.0及后續版本。

步驟一:準備測試數據

  1. 創建測試的數據庫和賬號,詳情請參見創建數據庫和賬號

    創建完數據庫和賬號后,需要授權測試賬號的讀寫權限。

    說明

    本文創建的數據庫名稱為test_cdc,賬號為emr_test。

  2. 使用創建的測試賬號連接MySQL實例,詳情請參見通過DMS登錄RDS MySQL

  3. 執行以下命令,創建數據表。

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. 登錄并連接EMR Serverless StarRocks實例,詳情請參見通過客戶端方式連接StarRocks實例

  5. 執行以下命令,創建數據庫test_cdc、創建超級管理員用戶test(示例密碼為1qaz!QAZ)或者創建普通用戶test并給普通用戶授予該數據庫權限,詳情請參見管理用戶

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    GRANT ALL on test_cdc to test;

步驟二:上傳自定義Connector

上傳自定義的Connector用于Flink、StarRocks和RDS MySQL連接。

  1. 使用SSH方式登錄DataFlow集群,詳情請參見登錄集群

  2. 下載flink-connector-starrocks-1.2.2_flink-1.13_2.11.jarververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar,并上傳到DataFlow集群的/opt/apps/FLINK/flink-current/lib目錄下。

步驟三:執行CTAS操作

  1. 通過Session模式提交作業。

    1. 使用SSH方式登錄DataFlow集群,詳情請參見登錄集群

    2. 執行以下命令,進入/opt/apps/FLINK/flink-current目錄。

      cd /opt/apps/FLINK/flink-current
    3. 執行以下命令,啟動YARN Session。

      ./bin/yarn-session.sh --detached

      執行成功后,返回信息中的application_XXXX_YY,即為登錄SQL客戶端需要用到的sessionId。sessionid

    4. 執行以下命令,打開SQL客戶端。

      ./bin/sql-client.sh -s <application_XXXX_YY>
      說明

      請修改<application_XXXX_YY>為您前一步獲取到的sessionId。

  2. 創建MySQL和StarRocks的Catalog。

    CREATE CATALOG sr WITH (
      'type' = 'starrocks',
      'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'username' = 'test',
      'password' = '1qaz!QAZ',
      'dbname' = 'test_cdc'
    );
    
    CREATE CATALOG mysql WITH (
      'type' = 'mysql',
      'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = '123456',
      'default-database' = 'test_cdc'
    );

    請根據實際信息修改各參數值,各參數描述如下表所示。

    表 1. StarRocks Catalog參數

    參數

    描述

    type

    類型,固定值為starrocks。

    endpoint

    指定FE節點的內網地址和查詢端口,格式為EMR Serverless StarRocks實例FE節點的內網地址:9030。例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。

    說明

    關于如何獲取EMR Serverless StarRocks實例FE節點的內網地址,請參見查看實例列表與詳情

    username

    StarRocks的用戶名。

    填寫步驟一:準備測試數據中創建的用戶名。本示例為test。

    password

    StarRocks數據庫服務的密碼。

    填寫步驟一:準備測試數據中賬號設置的密碼。本示例為1qaz!QAZ。

    dbname

    StarRocks數據庫名稱。

    填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。

    表 2. MySQL Catalog參數

    參數

    描述

    type

    類型,固定值為mysql。

    hostname

    RDS的內網地址。

    您可以在RDS的數據庫連接頁面,單擊內網地址進行復制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    MySQL數據庫服務的端口號,默認值為3306。

    username

    MySQL數據庫服務的用戶名。

    填寫步驟一:準備測試數據中賬號的用戶名。本示例為emr_test。

    password

    MySQL數據庫服務的密碼。

    填寫步驟一:準備測試數據中賬號的密碼。本示例為123456。

    default-database

    默認的MySQL數據庫名稱。

    填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。

  3. 在StarRocks的Catalog下,發送CTAS語句。

    您可以使用以下三種示例發送CTAS語句。

    • At Least Once語義:通過sink.buffer-flush.interval-ms配置項,配置每次寫入StarRocks的時間間隔,優點是寫入間隔時間短,占用內存較少。

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
    • Exactly once語義:需要定義checkpoint間隔,優點是在各種異常情況下保障數據不丟失不重復,缺點是數據可見時間取決于checkpoint間隔。更多信息,請參見Checkpointing

      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS ( 'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Simple模式:優點是創建表時不需要關注原表有哪些字段,會按照MySQL的表格式照搬過來,開發者使用比較方便。缺點是不能創建分區,對于需要分區的表,仍需要通過normal模式創建。

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr_test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

      表 3. WITH參數

      參數

      是否必選

      描述

      starrocks.create.table.properties

      StarRocks建表語句中除了字段定義以外的其他后綴定義,例如示例中的engine、key和buckets等。

      database-name

      StarRocks數據庫名稱。

      本示例為test_cdc。

      jdbc-url

      用于在StarRocks中執行查詢操作。

      例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com為EMR Serverless StarRocks實例FE節點的內網地址。

      說明

      關于如何獲取EMR Serverless StarRocks實例FE節點的內網地址,請參見查看實例列表與詳情

      load-url

      指定FE節點的內網地址和查詢端口,格式為EMR Serverless StarRocks實例FE節點的內網地址:8030

      例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

      說明

      關于如何獲取EMR Serverless StarRocks實例FE節點的內網地址,請參見查看實例列表與詳情

      sink.semantic

      填寫exactly-once可以保障數據一致性語義,默認為at-least-once。

      starrocks.create.table.mode

      支持以下參數值:

      • normal模式(默認值):必須像示例一樣在starrocks.create.table.properties配置中填寫engine、key和buckets等完整的配置。

      • simple模式:默認選擇engine為olap,選擇key類型為primary key,且主鍵與MySQL的主鍵保持完全一致,默認distributed by hash(所有的主鍵),默認無分區。需要在starrocks.create.table.properties配置中填寫的必填內容為buckets ,選填內容為properties等配置。

      sink.properties.row_delimiter

      自定義行分隔符。

      sink.properties.column_separator

      自定義列分隔符。

      說明
      • 因為vvr-6.0.5-flink-1.15及以上版本移除了sink.use.new-api,所以使用vvr-6.0.5-flink-1.15之前的版本時,請在with參數中添加'sink.use.new-api'='false',

      • 其他配置請參見從Apache Flink持續導入

      表 4. OPTIONS參數

      參數

      描述

      connector

      類型,固定值為mysql-cdc。

      hostname

      RDS的內網地址。

      您可以在RDS的數據庫連接頁面,單擊內網地址進行復制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

      port

      MySQL數據庫服務的端口號,默認值為3306。

      username

      MySQL數據庫服務的用戶名。

      填寫步驟一:準備測試數據中賬號的用戶名。本示例為emr_test。

      password

      MySQL數據庫服務的密碼。

      填寫步驟一:準備測試數據中賬號的密碼。

      table-name

      StarRocks中的表名稱。

      填寫步驟一:準備測試數據中創建的表名。本示例為runoob_tbl。

      database-name

      默認的MySQL數據庫名稱。

      填寫步驟一:準備測試數據中創建的數據庫名。本示例為test_cdc。

步驟四:驗證數據同步結果

說明

如果開啟了checkpoint,則最長等待時間大約是checkpoint的時間間隔。

查詢數據

  1. 登錄并連接EMR Serverless StarRocks實例,詳情請參見通過客戶端方式連接StarRocks實例

  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    use test_cdc;
    select * from runoob_tbl1;

    返回信息如下,表示MySQL上的數據已同步至StarRocks。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

查詢插入后的數據

  1. 在RDS數據庫窗口執行以下命令,插入數據。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1);
  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示數據已成功插入。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步數據更新

  1. 在RDS數據庫窗口執行以下命令,更新指定數據。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18;
  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示數據已同步更新。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

同步數據刪除

  1. 在RDS數據庫窗口執行以下命令,刪除指定數據。

    DELETE FROM runoob_tbl WHERE runoob_id = 1;
  2. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示數據已同步刪除。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

增加可空列

  1. 在RDS數據庫窗口執行以下命令,增加可空列。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 執行以下命令 ,插入數據。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. 在StarRocks連接窗口執行以下命令,查看表數據。

    select * from runoob_tbl1;

    返回信息如下,表示Schema已經成功變更。

    +-----------+--------------+---------------+-----------------+---------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 |
    +-----------+--------------+---------------+-----------------+---------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |       2 |
    |        18 | new          | tom           | 2022-06-22      |       3 |    NULL |
    +-----------+--------------+---------------+-----------------+---------+---------+

CDAS介紹

CDAS是CTAS的一個語法糖。通過CDAS語句,可以實現MySQL中的整庫同步,即生成一個Flink Job,源表是MySQL中的Database,目標表是StarRocks中對應的多張表,同時可以使用including table語法,只選擇一個Database中的部分表進行CDAS操作。

與CTAS的執行相同,需要在創建MySQL和StarRocks相應的Catalog后,執行CDAS語句。創建語法示例如下。

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASEmysql.test_cdc including table
 'tabl1','tbl2','tbl3'   /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc' )*/;