日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

數(shù)倉(cāng)場(chǎng)景:分鐘級(jí)準(zhǔn)實(shí)時(shí)分析

本文通過示例為您介紹如何基于StarRocks構(gòu)建分鐘級(jí)準(zhǔn)實(shí)時(shí)分析。

前提條件

說明

本文示例中DataFlow集群為EMR-3.40.0版本、StarRocks集群為EMR-5.6.0版本,MySQL為5.7版本。

使用限制

  • DataFlow集群、StarRocks集群和RDS MySQL實(shí)例需要在同一個(gè)VPC下,并且在同一個(gè)可用區(qū)下。

  • DataFlow集群和StarRocks集群均須開啟公網(wǎng)訪問。

  • RDS MySQL為5.7及以上版本。

場(chǎng)景介紹

該場(chǎng)景與數(shù)倉(cāng)場(chǎng)景:即席查詢構(gòu)建數(shù)倉(cāng)的邏輯基本一致,都是直接在StarRocks中進(jìn)行數(shù)倉(cāng)分層建模,區(qū)別在于分鐘級(jí)準(zhǔn)實(shí)時(shí)場(chǎng)景將即席查詢場(chǎng)景中的視圖部分物化成了表,因此具有更高的計(jì)算效率,可以支撐更高的QPS查詢。

方案架構(gòu)

分鐘級(jí)準(zhǔn)實(shí)時(shí)場(chǎng)景的基本架構(gòu)如下圖所示。Kafka-StarRocks

整體數(shù)據(jù)流如下:

  1. Flink清洗導(dǎo)入Kafka的日志或者通過Flink-CDC-StarRocks工具讀取MySQL Binlog導(dǎo)入StarRocks,根據(jù)需要選用明細(xì)、聚合、更新、主鍵各種模型,只物理落地ODS層。

  2. 利用第三方任務(wù)調(diào)度器(例如Airflow)將各層數(shù)據(jù)表按血緣關(guān)系進(jìn)行任務(wù)編排,再按具體的分鐘間隔作為一個(gè)微批粒度進(jìn)行任務(wù)調(diào)度,依次構(gòu)建ODS之上的各層數(shù)據(jù)表。

方案特點(diǎn)

該方案主要特點(diǎn)是:計(jì)算邏輯在StarRocks側(cè),適用于高頻查詢場(chǎng)景,各層數(shù)據(jù)表按具體的分鐘間隔時(shí)間作為微批粒度的數(shù)據(jù)同步。

  • 將操作層(ODS層)的數(shù)據(jù)經(jīng)過簡(jiǎn)單的清理、關(guān)聯(lián),然后存儲(chǔ)到明細(xì)數(shù)據(jù),暫不做過多的二次加工匯總,明細(xì)數(shù)據(jù)直接寫入StarRocks。

  • DWD或DWS層為實(shí)際的物理表,可以通過DataWorks或Airflow等調(diào)度工具調(diào)度周期性寫入數(shù)據(jù)。

  • StarRocks通過表的形式直接對(duì)接上層應(yīng)用,實(shí)現(xiàn)應(yīng)用實(shí)時(shí)查詢。

  • 前端實(shí)時(shí)請(qǐng)求實(shí)際的物理表,數(shù)據(jù)的實(shí)時(shí)性依賴DataWorks或Airflow調(diào)度周期配置,例如5分鐘調(diào)度、10分鐘調(diào)度等。

方案優(yōu)勢(shì)

  • 查詢性能強(qiáng),上層應(yīng)用只查詢最后匯總的數(shù)據(jù),相比View,查詢的數(shù)據(jù)量更大,性能會(huì)更強(qiáng)。

  • 數(shù)據(jù)重刷快,當(dāng)某一個(gè)環(huán)節(jié)或者數(shù)據(jù)有錯(cuò)誤時(shí),重新運(yùn)行DataWorks或Airflow調(diào)度任務(wù)即可。因?yàn)樗械倪壿嫸际枪袒玫?,無需復(fù)雜的訂正鏈路操作。

  • 業(yè)務(wù)邏輯調(diào)整快,當(dāng)需要新增或者調(diào)整各層業(yè)務(wù),可以基于SQL所見即所得開發(fā)對(duì)應(yīng)的業(yè)務(wù)場(chǎng)景,業(yè)務(wù)上線周期縮短。

方案缺點(diǎn)

因?yàn)橐肓烁嗟募庸ず驼{(diào)度,所以時(shí)效性低于即席查詢場(chǎng)景。

適用場(chǎng)景

數(shù)據(jù)來源于數(shù)據(jù)庫(kù)和埋點(diǎn)系統(tǒng),對(duì)QPS和實(shí)時(shí)性均有要求,適合80%實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景使用,能滿足大部分業(yè)務(wù)場(chǎng)景需求。

操作流程

示例操作如下:

  1. 步驟一:創(chuàng)建MySQL源數(shù)據(jù)表

  2. 步驟二:創(chuàng)建StarRocks表

  3. 步驟三:同步RDS中的源數(shù)據(jù)到StarRocks的ODS表

  4. 步驟四:通過任務(wù)調(diào)度器,編排各數(shù)據(jù)層的微批同步任務(wù)

  5. 步驟五:查看數(shù)據(jù)庫(kù)和表信息

  6. 步驟六:場(chǎng)景演示,查詢插入后的數(shù)據(jù)

步驟一:創(chuàng)建MySQL源數(shù)據(jù)表

  1. 創(chuàng)建測(cè)試的數(shù)據(jù)庫(kù)和賬號(hào),具體操作請(qǐng)參見創(chuàng)建數(shù)據(jù)庫(kù)和賬號(hào)。

    創(chuàng)建完數(shù)據(jù)庫(kù)和賬號(hào)后,需要授權(quán)測(cè)試賬號(hào)的讀寫權(quán)限。

    說明

    本文示例中創(chuàng)建的數(shù)據(jù)庫(kù)名稱為flink_cdc,賬號(hào)為emr_test。

  2. 使用創(chuàng)建的測(cè)試賬號(hào)連接MySQL實(shí)例,具體操作請(qǐng)參見通過DMS登錄RDS MySQL

  3. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders和customers。

    • 創(chuàng)建orders表。

      CREATE TABLE flink_cdc.orders (
         order_id INT NOT NULL AUTO_INCREMENT,
         order_revenue FLOAT NOT NULL,
         order_region VARCHAR(40) NOT NULL,
         customer_id INT NOT NULL,
         PRIMARY KEY ( order_id )
      );
    • 創(chuàng)建customers表。

      CREATE TABLE flink_cdc.customers (
         customer_id INT NOT NULL,
         customer_age INT NOT NULL,
         customer_name VARCHAR(40) NOT NULL,
         PRIMARY KEY ( customer_id )
      );

步驟二:創(chuàng)建StarRocks表

  1. 使用SSH方式登錄StarRocks集群,具體操作請(qǐng)參見登錄集群

  2. 執(zhí)行以下命令,連接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)庫(kù)。

    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  4. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers和orders。

    • 創(chuàng)建customers表

      CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
        `timestamp` DateTime NOT NULL COMMENT "",
        `customer_id` INT NOT NULL  COMMENT "",
        `customer_age` FLOAT NOT NULL  COMMENT "",
        `customer_name` STRING NOT NULL  COMMENT ""
      ) ENGINE=olap
      PRIMARY KEY(`timestamp`, `customer_id`)
      COMMENT ""
      DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
      PROPERTIES (
        "replication_num" = "1"
      );
    • 創(chuàng)建orders表

      CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
        `timestamp` DateTime NOT NULL COMMENT "",
        `order_id` INT NOT NULL  COMMENT "",
        `order_revenue` FLOAT NOT NULL  COMMENT "",
        `order_region` STRING NOT NULL  COMMENT "",
        `customer_id` INT NOT NULL  COMMENT ""
      ) ENGINE=olap
      PRIMARY KEY(`timestamp`, `order_id`)
      COMMENT ""
      DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
      PROPERTIES (
        "replication_num" = "1"
      );
  5. 執(zhí)行以下命令,創(chuàng)建DWD表。

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`(
      `timestamp` DateTime NOT NULL COMMENT "",
      `order_id` INT NOT NULL  COMMENT "",
      `order_revenue` FLOAT NOT NULL  COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `customer_id` INT NOT NULL  COMMENT "",
      `customer_age` FLOAT NOT NULL  COMMENT "",
      `customer_name` STRING NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`timestamp`, `order_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  6. 執(zhí)行以下命令,創(chuàng)建DWS表。

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` (
      `timestamp` DateTime NOT NULL COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `order_cnt` INT NOT NULL  COMMENT "",
      `order_total_revenue` INT NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`timestamp`, `order_region`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_region`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );

步驟三:同步RDS中的源數(shù)據(jù)到StarRocks的ODS表

  1. 下載Flink CDC connectorFlink StarRocks Connector,并上傳至DataFlow集群的/opt/apps/FLINK/flink-current/lib目錄下。

  2. 拷貝DataFlow集群的/opt/apps/FLINK/flink-current/opt/connectors/kafka目錄下的JAR包至/opt/apps/FLINK/flink-current/lib目錄下。

  3. 使用SSH方式登錄DataFlow集群,具體操作請(qǐng)參見登錄集群。

  4. 執(zhí)行以下命令,啟動(dòng)集群。

    重要

    本文示例僅供測(cè)試,如果是生產(chǎn)級(jí)別的Flink作業(yè)請(qǐng)使用YARN或Kubernetes方式提交,詳情請(qǐng)參見Apache Hadoop YARNNative Kubernetes。

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. 編寫Flink SQL作業(yè),并保存為demo.sql

    執(zhí)行以下命令,編輯demo.sql文件。

    vim demo.sql

    文件內(nèi)容如下所示。

    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    -- create source tables
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`(
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`) NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'orders'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`) NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` (
      `timestamp` TIMESTAMP NOT NULL,
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`timestamp`,`order_id`)
     NOT ENFORCED
    ) with (
      'connector' = 'starrocks',
      'database-name' = 'flink_cdc',
      'table-name' = 'orders',
      'username' = 'root',
      'password' = '',
      'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
      'load-url' = '192.168.**.**:8030',
      'sink.properties.format' = 'json',
      'sink.properties.strip_outer_array' = 'true',
      'sink.buffer-flush.interval-ms' = '15000'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` (
      `timestamp` TIMESTAMP NOT NULL,
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`timestamp`,`customer_id`)
     NOT ENFORCED
    ) with (
      'connector' = 'starrocks',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers',
      'username' = 'root',
      'password' = '',
      'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030',
      'load-url' = '192.168.**.**:8030',
      'sink.properties.format' = 'json',
      'sink.properties.strip_outer_array' = 'true',
      'sink.buffer-flush.interval-ms' = '15000'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink`
    SELECT
      LOCALTIMESTAMP,
      order_id,
      order_revenue,
      order_region,
      customer_id
    FROM `default_catalog`.`flink_cdc`.`orders_src`;
    
    INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink`
    SELECT
      LOCALTIMESTAMP,
      customer_id,
      customer_age,
      customer_name
    FROM `default_catalog`.`flink_cdc`.`customers_src`;
    
    END;

    涉及參數(shù)如下所示:

    • 創(chuàng)建數(shù)據(jù)表orders_src和customers_src。

      參數(shù)

      描述

      connector

      固定值為mysql-cdc。

      hostname

      RDS的內(nèi)網(wǎng)地址。

      您可以在RDS的數(shù)據(jù)庫(kù)連接頁(yè)面,單擊內(nèi)網(wǎng)地址進(jìn)行復(fù)制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。

      port

      固定值為3306。

      username

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號(hào)名。本示例為emr_test。

      password

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號(hào)的密碼。本示例為Yz12****。

      database-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫(kù)名。本示例為flink_cdc。

      table-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。

      • orders_src:本示例為orders。

      • customers_src:本示例為customers。

    • 創(chuàng)建數(shù)據(jù)表orders_sink和customers_sink。

      參數(shù)

      描述

      connector

      固定值為starrocks。

      database-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫(kù)名。本示例為flink_cdc。

      table-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。

      • orders_sink:本示例為orders。

      • customers_sink:本示例為customers。

      username

      StarRocks連接用戶名。固定值為root。

      password

      不填寫。

      jdbc-url

      用于在StarRocks中執(zhí)行查詢操作。

      例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**為StarRocks集群的內(nèi)網(wǎng)IP地址。

      load-url

      指定FE的IP地址和HTTP端口,格式為StarRocks集群的內(nèi)網(wǎng)IP地址:端口。本文以8030端口為例,實(shí)際請(qǐng)根據(jù)您的集群版本選擇訪問的端口:

      • 18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。

      • 8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。

      說明

      訪問端口詳情,請(qǐng)參見UI和端口。

  6. 執(zhí)行以下命令,啟動(dòng)Flink任務(wù)。

     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql

步驟四:通過任務(wù)調(diào)度器,編排各數(shù)據(jù)層的微批同步任務(wù)

將以下兩個(gè)Job以10分鐘為一次間隔,編排成定時(shí)任務(wù)。

  • Job 1

    -- ODS to DWD
    INSERT INTO dwd_order_customer_valid
    SELECT
    '{start_time}',
    o.order_id,
    o.order_revenue,
    o.order_region,
    c.customer_id,
    c.customer_age,
    c.customer_name
    FROM customers c JOIN orders o ON c.customer_id=o.customer_id
    WHERE o.timestamp >= '{start_time}' AND o.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) AND
    c.timestamp >= '{start_time}' AND c.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE);
  • Job 2

    -- DWD to DWS
    INSERT INTO dws_agg_by_region
    SELECT
    '{start_time}',
    order_region,
    count(*) AS order_cnt,
    sum(order_revenue) AS order_total_revenue
    FROM dwd_order_customer_valid
    WHERE timestamp >= '{start_time}' AND timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE)
    GROUP BY timestamp, order_region;

本示例使用EMR Studio作為任務(wù)調(diào)度器,您也可以使用自己的任務(wù)編排方案。

  1. 為EMR Studio集群添加用戶,詳情請(qǐng)參見添加用戶

  2. 為添加的用戶授權(quán)。

    1. 使用SSH方式登錄EMR Studio集群,具體操作請(qǐng)參見登錄集群。

    2. 執(zhí)行以下命令,授權(quán)添加的用戶為AirFlow的Admin Role。

      source /usr/lib/airflow-current/bin/activate
      airflow users add-role -r Admin -u <user>
      說明

      示例中的<user>為您上一步驟中添加的用戶名稱。

  3. 進(jìn)入數(shù)據(jù)開發(fā)控制臺(tái)。

    在EMR Studio集群的訪問鏈接與端口頁(yè)面,單擊Studio Workspace UI所在行的鏈接。

    輸入步驟1中添加的用戶名和密碼,即可正常訪問Web UI頁(yè)面。

  4. 創(chuàng)建Airflow的Connection。

    1. 在左側(cè)導(dǎo)航欄中,單擊Airflow

    2. Airflow頁(yè)面,選擇上方的Admin > Connections。

    3. 單擊add Connections圖標(biāo)。

    4. 在Add Connection頁(yè)面,配置相關(guān)參數(shù)。

      Connection

      參數(shù)

      描述

      Connection Id

      Connection名稱,您可以自定義。本示例為starrocks_conn。

      Connection Type

      選擇MySQL。

      Host

      StarRocks集群的內(nèi)網(wǎng)IP地址。

      Login

      固定值為root。

      Port

      固定值為9030。

  5. 創(chuàng)建Zeppelin的Note。

    1. 在左側(cè)導(dǎo)航欄中,單擊Zeppelin

    2. Zeppelin頁(yè)面,單擊Create New Note。

    3. Create New Note對(duì)話框中,輸入Note Name,在Default Interpreter下拉框中,選擇airflow

      Note

    4. 編寫Airflow DAG腳本。

      配置starrocks_demo_dag腳本,示例代碼如下。

      %airflow.push_dag
      from airflow import DAG
      from datetime import datetime, timedelta
      from airflow.operators.mysql_operator import MySqlOperator
      
      default_args = {
          'owner': 'airflow',
          'depends_on_past': False,
          'start_date': datetime.now(),
          'retries': 0,
          'retry_delay': timedelta(minutes=5),
      }
      
      with DAG('starrocks_demo_dag',
               schedule_interval='*/10 * * * *',
               default_args=default_args) as dag:
      
          execution_time = "{{ ts }}"
      
          ods_to_dwd_sql = """
          INSERT INTO dwd_order_customer_valid
          SELECT
          '{start_time}',
          o.order_id,
          o.order_revenue,
          o.order_region,
          c.customer_id,
          c.customer_age,
          c.customer_name
          FROM customers c JOIN orders o ON c.customer_id=o.customer_id
          WHERE o.timestamp >= '{start_time}' AND o.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) AND
          c.timestamp >= '{start_time}' AND c.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE)
          """.format(start_time=execution_time, interval_time=10)
      
          dwd_to_dws_sql = """
          INSERT INTO dws_agg_by_region
          SELECT
          '{start_time}',
          order_region,
          count(*) AS order_cnt,
          sum(order_revenue) AS order_total_revenue
          FROM dwd_order_customer_valid
          WHERE timestamp >= '{start_time}' AND timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE)
          GROUP BY timestamp, order_region;
          """.format(start_time=execution_time, interval_time=10)
      
          ods_to_dwd = MySqlOperator(
              task_id='ods_to_dwd',
              sql=ods_to_dwd_sql,
              mysql_conn_id='starrocks_conn',
              autocommit=True
          )
      
          dwd_to_dws = MySqlOperator(
              task_id='dwd_to_dws',
              sql=dwd_to_dws_sql,
              mysql_conn_id='starrocks_conn',
              autocommit=True
          )
      
          ods_to_dwd >> dwd_to_dws
    5. 單擊run圖標(biāo),運(yùn)行腳本。

      run dag

      執(zhí)行成功后,Paragraph輸出以下提示信息。Success

  6. 查看DAG狀態(tài)。

    在Airflow頁(yè)面即可看到starrocks_demo_dag的運(yùn)行情況。dag

步驟五:查看數(shù)據(jù)庫(kù)和表信息

  1. 使用SSH方式登錄StarRocks集群,具體操作請(qǐng)參見登錄集群。

  2. 執(zhí)行以下,連接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. 執(zhí)行以下命令,查詢數(shù)據(jù)庫(kù)信息。

    1. 執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。

      use flink_cdc;
    2. 執(zhí)行以下命令,查看表信息。

      show tables;

      返回信息如下所示。

      +--------------------------+
      | Tables_in_flink_cdc      |
      +--------------------------+
      | customers                |
      | dwd_order_customer_valid |
      | dws_agg_by_region        |
      | orders                   |
      +--------------------------+
      4 rows in set (0.01 sec)

步驟六:場(chǎng)景演示,查詢插入后的數(shù)據(jù)

  1. 使用步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的測(cè)試賬號(hào)連接MySQL實(shí)例,具體操作請(qǐng)參見通過DMS登錄RDS MySQL。

  2. 在RDS數(shù)據(jù)庫(kù)窗口執(zhí)行以下命令,向表orders和customers中插入數(shù)據(jù)。

    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. 使用SSH方式登錄StarRocks集群,具體操作請(qǐng)參見登錄集群。

  4. 執(zhí)行以下命令,連接StarRocks集群。

    mysql -h127.0.0.1 -P 9030 -uroot
  5. 執(zhí)行以下命令,查詢ODS層數(shù)據(jù)。

    1. 執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。

      use flink_cdc;
    2. 執(zhí)行以下命令,查看orders和customers表信息。

      • 查看orders表

        SELECT * FROM orders;

        返回信息如下所示。

        +----------------------------+----------+---------------+--------------+-------------+
        | timestamp                  | order_id | order_revenue | order_region | customer_id |
        +----------------------------+----------+---------------+--------------+-------------+
        | 2022-05-27 13:39:50.098000 |        1 |            10 | beijing      |           1 |
        | 2022-05-27 13:39:50.596000 |        2 |            10 | beijing      |           1 |
        +----------------------------+----------+---------------+--------------+-------------+
        2 rows in set (0.00 sec)
      • 查看customers表

        SELECT * FROM customers;

        返回信息如下所示。

        +----------------------------+-------------+--------------+---------------+
        | timestamp                  | customer_id | customer_age | customer_name |
        +----------------------------+-------------+--------------+---------------+
        | 2022-05-27 13:40:11.005000 |           1 |           22 | emr_test      |
        +----------------------------+-------------+--------------+---------------+
        1 row in set (0.01 sec)
  6. 執(zhí)行以下命令,查詢DWD層數(shù)據(jù)。

    1. 執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。

      use flink_cdc;
    2. 執(zhí)行以下命令,查看dwd_order_customer_valid表信息。

      SELECT * FROM dwd_order_customer_valid;

      返回信息如下所示。

      +---------------------+----------+---------------+--------------+-------------+--------------+---------------+
      | timestamp           | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
      +---------------------+----------+---------------+--------------+-------------+--------------+---------------+
      | 2022-05-27 13:35:00 |        1 |            10 | beijing      |           1 |           22 | emr_test      |
      | 2022-05-27 13:35:00 |        2 |            10 | beijing      |           1 |           22 | emr_test      |
      +---------------------+----------+---------------+--------------+-------------+--------------+---------------+
      2 rows in set (0.01 sec)
  7. 執(zhí)行以下命令,查詢DWS層數(shù)據(jù)。

    1. 執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。

      use flink_cdc;
    2. 執(zhí)行以下命令,查看dws_agg_by_region表信息。

      SELECT * FROM dws_agg_by_region;

      返回信息如下所示。

      +---------------------+--------------+-----------+---------------------+
      | timestamp           | order_region | order_cnt | order_total_revenue |
      +---------------------+--------------+-----------+---------------------+
      | 2022-05-27 13:35:00 | beijing      |         2 |                  20 |
      +---------------------+--------------+-----------+---------------------+
      1 row in set (0.00 sec)

      您也可以執(zhí)行以下命令,查詢部分字段信息。

      SELECT order_region, sum(order_cnt),sum(order_total_revenue)  FROM dws_agg_by_region GROUP BY order_region;

      返回信息如下所示。

      +--------------+------------------+----------------------------+
      | order_region | sum(`order_cnt`) | sum(`order_total_revenue`) |
      +--------------+------------------+----------------------------+
      | beijing      |                2 |                         20 |
      +--------------+------------------+----------------------------+
      1 row in set (0.05 sec)