本文為您介紹如何將自建Flink集群上的Datastream JAR作業(yè),遷移至實(shí)時(shí)計(jì)算Flink全托管的JAR作業(yè)類型中。

背景信息

本文介紹的遷移場(chǎng)景如下圖所示。場(chǎng)景3

前提條件

  • 本地已安裝Maven 3.x。
  • 已在Maven資源中心下載了開(kāi)源JDBC Connector包,包括mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar
    重要 依賴文件的版本需要與Flink集群的引擎版本保持一致。
  • 已下載了代碼示例,下載的flink2vvp-main包中的文件說(shuō)明如下:
    • Table/SQL類型:TableJobKafka2Rds.java
    • Datastream類型:DataStreamJobKafka2Rds.java
  • 已搭建好了基礎(chǔ)環(huán)境,詳情請(qǐng)參見(jiàn)搭建基礎(chǔ)環(huán)境
  • 已構(gòu)建自建集群測(cè)試作業(yè)并跑通。詳情請(qǐng)參見(jiàn)構(gòu)建自建集群測(cè)試作業(yè)
    說(shuō)明 本文使用EMR-Flink作為用戶自建Flink集群來(lái)運(yùn)行遷移前的作業(yè),以此來(lái)對(duì)比驗(yàn)證自建Flink遷移后的作業(yè)運(yùn)行結(jié)果。如果您已經(jīng)有了自建Flink集群,可以忽略此步驟。

自建Flink遷移Flink全托管

  1. 在RDS控制臺(tái),創(chuàng)建自建集群的sink表rds_old_table1。
    1. 登錄RDS控制臺(tái)。
      登錄方式,請(qǐng)參見(jiàn)搭建基礎(chǔ)環(huán)境
    2. 單擊登錄數(shù)據(jù)庫(kù)
    3. 填寫(xiě)實(shí)例信息。
      實(shí)例信息
    4. 單擊登錄
    5. 單擊復(fù)制IP網(wǎng)段
    6. 將復(fù)制的IP網(wǎng)段添加到實(shí)例白名單中。
      詳情請(qǐng)參見(jiàn)設(shè)置IP白名單
    7. 將以下創(chuàng)建表的命令復(fù)制到SQL執(zhí)行窗口,創(chuàng)建新表rds_new_table3。
      CREATE TABLE `rds_new_table3` ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
        PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    8. 單擊執(zhí)行(F8)
  2. 在Maven中修改以下配置信息并打包。
    1. 在IntelliJ IDEA中,選擇File > Open,打開(kāi)下載并解壓縮完成的flink2vvp包。
    2. 雙擊打開(kāi)DataStreamJobKafka2Rds
    3. 修改Kafka和RDS的連接信息。
      配置信息
      類別參數(shù)說(shuō)明
      KafkaKAFKA_TOPICKafka Topic名稱。本示例Topic名稱為kafka-order。
      KAFKA_BOOT_SERVERSKafka Broker地址。

      格式為host:port,host:port,host:port,以英文逗號(hào)(,)分割。

      KAFKA_GROUP_IDKafka消費(fèi)組ID。本示例消費(fèi)組ID為demo-group_new3。
      說(shuō)明 為了避免消費(fèi)組ID沖突,您可以在Kafka控制臺(tái)上,創(chuàng)建新的消費(fèi)組,并在此處使用新的Kafka消費(fèi)組ID。
      RDSRDS_URLURL的格式為:jdbc:mysql://<內(nèi)網(wǎng)地址>/<databaseName>,其中<databaseName>為對(duì)應(yīng)的數(shù)據(jù)庫(kù)名稱。

      云數(shù)據(jù)庫(kù)RDS版專有網(wǎng)絡(luò)VPC地址,即內(nèi)網(wǎng)地址,詳情請(qǐng)?參見(jiàn)查看或修改內(nèi)外網(wǎng)地址和端口

      RDS_USER_NAME用戶名。
      RDS_PASSWORD密碼。
      RDS_TABLE表名稱。本示例填寫(xiě)為rds_new_table3。
    4. 使用mvn clean package命令構(gòu)建新JAR包。
      mvn命令
      構(gòu)建成功后可以在target目錄下找到相應(yīng)的JAR包。JAR包
  3. 在Flink全托管開(kāi)發(fā)控制臺(tái)上,新建Flink JAR流作業(yè)。
    1. 登錄實(shí)時(shí)計(jì)算Flink開(kāi)發(fā)控制臺(tái)。
      登錄方式,請(qǐng)參見(jiàn)搭建基礎(chǔ)環(huán)境
    2. 在左側(cè)導(dǎo)航欄,單擊作業(yè)開(kāi)發(fā)
    3. 單擊新建
    4. 新建文件對(duì)話框,填寫(xiě)作業(yè)配置信息。
      配置項(xiàng)說(shuō)明
      文件名稱示例為ds_kafka3rds。
      文件類型請(qǐng)選擇為流作業(yè)/JAR。
    5. 單擊確認(rèn)。
    6. 填寫(xiě)作業(yè)信息。
      配置項(xiàng)說(shuō)明
      JAR URI上傳剛編譯好的JAR包或者填寫(xiě)對(duì)應(yīng)的JAR信息。
      Entry Point Class指定主類名為com.alibaba.realtimecompute.DataStreamJobKafka2Rdsds
      并行度設(shè)置為2。
      附加依賴文件上傳已下載的mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar兩個(gè)JAR包。附加依賴文件
  4. 高級(jí)配置頁(yè)簽,配置JM和TM的資源量。
    JM和TM資源量
  5. 單擊保存
  6. 在頁(yè)面右上角,單擊上線,將作業(yè)提交至集群。
  7. 作業(yè)運(yùn)維頁(yè)面,單擊啟動(dòng)
    如果作業(yè)的狀態(tài)變?yōu)?span id="z68uejxpaoma" class="ph uicontrol" data-tag="uicontrol" id="uicontrol-m33-1el-aym">運(yùn)行中,則表示作業(yè)已正常運(yùn)行。
  8. 在RDS控制臺(tái)上,雙擊rds_new_table3表后,單擊執(zhí)行(F8),查詢對(duì)應(yīng)的計(jì)算結(jié)果。
    結(jié)果3
    說(shuō)明 如果上游Kafka有持續(xù)的流數(shù)據(jù),則5分鐘后即可到RDS控制臺(tái)上查詢到對(duì)應(yīng)的計(jì)算結(jié)果。

構(gòu)建自建集群測(cè)試作業(yè)

說(shuō)明 本文使用EMR-Flink作為用戶自建Flink集群來(lái)運(yùn)行遷移前的作業(yè),以此來(lái)對(duì)比驗(yàn)證自建Flink遷移后的作業(yè)運(yùn)行結(jié)果。如果您已經(jīng)有了自建Flink集群,可以忽略此步驟。
  1. 登錄RDS數(shù)據(jù)庫(kù)創(chuàng)建自建集群的sink表rds_old_table3。
    1. 登錄RDS數(shù)據(jù)庫(kù)。
      登錄方式,請(qǐng)參見(jiàn)搭建基礎(chǔ)環(huán)境
    2. 執(zhí)行以下創(chuàng)建rds_old_table3表的命令。
      CREATE TABLE `rds_old_table3` ( 
          `window_start` timestamp NOT NULL, 
          `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
          `order_type` varchar(8) NOT NULL,
          `order_number` bigint NULL,
          `order_value_sum` double NULL,
      PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
      table3
  2. 在Maven中修改以下配置信息并打包。
    1. 在IntelliJ IDEA中,選擇File > Open,打開(kāi)下載并解壓縮完成的flink2vvp-main包。
    2. 雙擊打開(kāi)DataStreamJobKafka2Rds
    3. 修改Kafka和RDS的連接信息。
      消費(fèi)組
      類別參數(shù)說(shuō)明
      KafkaKAFKA_TOPICKafka Topic名稱。本示例Topic名稱為kafka-order。
      KAFKA_BOOT_SERVERSKafka Broker地址。

      格式為host:port,host:port,host:port,以英文逗號(hào)(,)分割。

      KAFKA_GROUP_IDKafka消費(fèi)組ID。
      說(shuō)明 為了避免消費(fèi)組ID沖突,您可以在Kafka控制臺(tái)上,創(chuàng)建新的消費(fèi)組,并在此處使用新的Kafka消費(fèi)組ID。
      RDSRDS_URLURL的格式為:jdbc:mysql://<內(nèi)網(wǎng)地址>/<databaseName>,其中<databaseName>為對(duì)應(yīng)的數(shù)據(jù)庫(kù)名稱。

      云數(shù)據(jù)庫(kù)RDS版專有網(wǎng)絡(luò)VPC地址,即內(nèi)網(wǎng)地址,詳情請(qǐng)?參見(jiàn)查看或修改內(nèi)外網(wǎng)地址和端口

      RDS_USER_NAME用戶名。
      RDS_PASSWORD密碼。
      RDS_TABLE表名稱。本示例填寫(xiě)為rds_old_table3。
    4. 使用mvn clean package命令構(gòu)建新JAR包。
      mvn命令
      構(gòu)建成功后可以在target目錄下找到相應(yīng)的JAR包。JAR包
  3. 拷貝新構(gòu)建的JAR包到自建EMR-Flink集群。
    1. 鼠標(biāo)左鍵雙擊EMR-Flink圖標(biāo)。
    2. 查看publp信息,即自建flink master ip。
      emr IP
    3. 使用以下命令拷貝構(gòu)建的新JAR包到EMR-Flink集群。
      scp {jar包路徑} root@{自建flink master ip}:/
      拷貝包到集群

      上個(gè)步驟獲取的publp即為自建flink master ip。

  4. 連接EMR-Flink集群后,執(zhí)行以下命令運(yùn)行Flink作業(yè)。
    連接方式詳情請(qǐng)參見(jiàn) 連接方式概述 ECS遠(yuǎn)程連接操作指南
    cd /
    flink  run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar
    運(yùn)行Flink作業(yè)
  5. 在DMS控制臺(tái),查看寫(xiě)入RDS的結(jié)果。
    oldtable3

    由上圖可見(jiàn),自建Flink作業(yè)正常消費(fèi)Kafka數(shù)據(jù)并成功寫(xiě)入RDS結(jié)果表。