數(shù)倉(cāng)場(chǎng)景:分鐘級(jí)準(zhǔn)實(shí)時(shí)分析
本文通過示例為您介紹如何基于StarRocks構(gòu)建分鐘級(jí)準(zhǔn)實(shí)時(shí)分析。
前提條件
已創(chuàng)建DataFlow集群或自定義集群,具體操作請(qǐng)參見創(chuàng)建集群。
已創(chuàng)建StarRocks集群,具體操作請(qǐng)參見創(chuàng)建StarRocks集群。
已創(chuàng)建RDS MySQL,具體操作請(qǐng)參見快速創(chuàng)建RDS MySQL實(shí)例。
已創(chuàng)建EMR Studio集群,并開通了8443、8000和8081端口,具體操作請(qǐng)參見創(chuàng)建EMR Studio集群和添加安全組規(guī)則。
本文示例中DataFlow集群為EMR-3.40.0版本、StarRocks集群為EMR-5.6.0版本,MySQL為5.7版本。
使用限制
DataFlow集群、StarRocks集群和RDS MySQL實(shí)例需要在同一個(gè)VPC下,并且在同一個(gè)可用區(qū)下。
DataFlow集群和StarRocks集群均須開啟公網(wǎng)訪問。
RDS MySQL為5.7及以上版本。
場(chǎng)景介紹
該場(chǎng)景與數(shù)倉(cāng)場(chǎng)景:即席查詢構(gòu)建數(shù)倉(cāng)的邏輯基本一致,都是直接在StarRocks中進(jìn)行數(shù)倉(cāng)分層建模,區(qū)別在于分鐘級(jí)準(zhǔn)實(shí)時(shí)場(chǎng)景將即席查詢場(chǎng)景中的視圖部分物化成了表,因此具有更高的計(jì)算效率,可以支撐更高的QPS查詢。
方案架構(gòu)
分鐘級(jí)準(zhǔn)實(shí)時(shí)場(chǎng)景的基本架構(gòu)如下圖所示。
整體數(shù)據(jù)流如下:
Flink清洗導(dǎo)入Kafka的日志或者通過Flink-CDC-StarRocks工具讀取MySQL Binlog導(dǎo)入StarRocks,根據(jù)需要選用明細(xì)、聚合、更新、主鍵各種模型,只物理落地ODS層。
利用第三方任務(wù)調(diào)度器(例如Airflow)將各層數(shù)據(jù)表按血緣關(guān)系進(jìn)行任務(wù)編排,再按具體的分鐘間隔作為一個(gè)微批粒度進(jìn)行任務(wù)調(diào)度,依次構(gòu)建ODS之上的各層數(shù)據(jù)表。
方案特點(diǎn)
該方案主要特點(diǎn)是:計(jì)算邏輯在StarRocks側(cè),適用于高頻查詢場(chǎng)景,各層數(shù)據(jù)表按具體的分鐘間隔時(shí)間作為微批粒度的數(shù)據(jù)同步。
將操作層(ODS層)的數(shù)據(jù)經(jīng)過簡(jiǎn)單的清理、關(guān)聯(lián),然后存儲(chǔ)到明細(xì)數(shù)據(jù),暫不做過多的二次加工匯總,明細(xì)數(shù)據(jù)直接寫入StarRocks。
DWD或DWS層為實(shí)際的物理表,可以通過DataWorks或Airflow等調(diào)度工具調(diào)度周期性寫入數(shù)據(jù)。
StarRocks通過表的形式直接對(duì)接上層應(yīng)用,實(shí)現(xiàn)應(yīng)用實(shí)時(shí)查詢。
前端實(shí)時(shí)請(qǐng)求實(shí)際的物理表,數(shù)據(jù)的實(shí)時(shí)性依賴DataWorks或Airflow調(diào)度周期配置,例如5分鐘調(diào)度、10分鐘調(diào)度等。
方案優(yōu)勢(shì)
查詢性能強(qiáng),上層應(yīng)用只查詢最后匯總的數(shù)據(jù),相比View,查詢的數(shù)據(jù)量更大,性能會(huì)更強(qiáng)。
數(shù)據(jù)重刷快,當(dāng)某一個(gè)環(huán)節(jié)或者數(shù)據(jù)有錯(cuò)誤時(shí),重新運(yùn)行DataWorks或Airflow調(diào)度任務(wù)即可。因?yàn)樗械倪壿嫸际枪袒玫?,無需復(fù)雜的訂正鏈路操作。
業(yè)務(wù)邏輯調(diào)整快,當(dāng)需要新增或者調(diào)整各層業(yè)務(wù),可以基于SQL所見即所得開發(fā)對(duì)應(yīng)的業(yè)務(wù)場(chǎng)景,業(yè)務(wù)上線周期縮短。
方案缺點(diǎn)
因?yàn)橐肓烁嗟募庸ず驼{(diào)度,所以時(shí)效性低于即席查詢場(chǎng)景。
適用場(chǎng)景
數(shù)據(jù)來源于數(shù)據(jù)庫(kù)和埋點(diǎn)系統(tǒng),對(duì)QPS和實(shí)時(shí)性均有要求,適合80%實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景使用,能滿足大部分業(yè)務(wù)場(chǎng)景需求。
操作流程
示例操作如下:
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表
創(chuàng)建測(cè)試的數(shù)據(jù)庫(kù)和賬號(hào),具體操作請(qǐng)參見創(chuàng)建數(shù)據(jù)庫(kù)和賬號(hào)。
創(chuàng)建完數(shù)據(jù)庫(kù)和賬號(hào)后,需要授權(quán)測(cè)試賬號(hào)的讀寫權(quán)限。
說明本文示例中創(chuàng)建的數(shù)據(jù)庫(kù)名稱為flink_cdc,賬號(hào)為emr_test。
使用創(chuàng)建的測(cè)試賬號(hào)連接MySQL實(shí)例,具體操作請(qǐng)參見通過DMS登錄RDS MySQL。
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders和customers。
創(chuàng)建orders表。
CREATE TABLE flink_cdc.orders ( order_id INT NOT NULL AUTO_INCREMENT, order_revenue FLOAT NOT NULL, order_region VARCHAR(40) NOT NULL, customer_id INT NOT NULL, PRIMARY KEY ( order_id ) );
創(chuàng)建customers表。
CREATE TABLE flink_cdc.customers ( customer_id INT NOT NULL, customer_age INT NOT NULL, customer_name VARCHAR(40) NOT NULL, PRIMARY KEY ( customer_id ) );
步驟二:創(chuàng)建StarRocks表
使用SSH方式登錄StarRocks集群,具體操作請(qǐng)參見登錄集群。
執(zhí)行以下命令,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)庫(kù)。
CREATE DATABASE IF NOT EXISTS `flink_cdc`;
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers和orders。
創(chuàng)建customers表
CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` ( `timestamp` DateTime NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`timestamp`, `customer_id`) COMMENT "" DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
創(chuàng)建orders表
CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` ( `timestamp` DateTime NOT NULL COMMENT "", `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`timestamp`, `order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,創(chuàng)建DWD表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`( `timestamp` DateTime NOT NULL COMMENT "", `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`timestamp`, `order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,創(chuàng)建DWS表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` ( `timestamp` DateTime NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `order_cnt` INT NOT NULL COMMENT "", `order_total_revenue` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`timestamp`, `order_region`) COMMENT "" DISTRIBUTED BY HASH(`order_region`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
步驟三:同步RDS中的源數(shù)據(jù)到StarRocks的ODS表
下載Flink CDC connector和Flink StarRocks Connector,并上傳至DataFlow集群的/opt/apps/FLINK/flink-current/lib目錄下。
拷貝DataFlow集群的/opt/apps/FLINK/flink-current/opt/connectors/kafka目錄下的JAR包至/opt/apps/FLINK/flink-current/lib目錄下。
使用SSH方式登錄DataFlow集群,具體操作請(qǐng)參見登錄集群。
執(zhí)行以下命令,啟動(dòng)集群。
重要本文示例僅供測(cè)試,如果是生產(chǎn)級(jí)別的Flink作業(yè)請(qǐng)使用YARN或Kubernetes方式提交,詳情請(qǐng)參見Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
編寫Flink SQL作業(yè),并保存為demo.sql。
執(zhí)行以下命令,編輯demo.sql文件。
vim demo.sql
文件內(nèi)容如下所示。
CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`; -- create source tables CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`( `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'orders' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` ( `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`customer_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'customers' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` ( `timestamp` TIMESTAMP NOT NULL, `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`timestamp`,`order_id`) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'database-name' = 'flink_cdc', 'table-name' = 'orders', 'username' = 'root', 'password' = '', 'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030', 'load-url' = '192.168.**.**:8030', 'sink.properties.format' = 'json', 'sink.properties.strip_outer_array' = 'true', 'sink.buffer-flush.interval-ms' = '15000' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` ( `timestamp` TIMESTAMP NOT NULL, `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`timestamp`,`customer_id`) NOT ENFORCED ) with ( 'connector' = 'starrocks', 'database-name' = 'flink_cdc', 'table-name' = 'customers', 'username' = 'root', 'password' = '', 'jdbc-url' = 'jdbc:mysql://192.168.**.**:9030', 'load-url' = '192.168.**.**:8030', 'sink.properties.format' = 'json', 'sink.properties.strip_outer_array' = 'true', 'sink.buffer-flush.interval-ms' = '15000' ); BEGIN STATEMENT SET; INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink` SELECT LOCALTIMESTAMP, order_id, order_revenue, order_region, customer_id FROM `default_catalog`.`flink_cdc`.`orders_src`; INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink` SELECT LOCALTIMESTAMP, customer_id, customer_age, customer_name FROM `default_catalog`.`flink_cdc`.`customers_src`; END;
涉及參數(shù)如下所示:
創(chuàng)建數(shù)據(jù)表orders_src和customers_src。
參數(shù)
描述
connector
固定值為mysql-cdc。
hostname
RDS的內(nèi)網(wǎng)地址。
您可以在RDS的數(shù)據(jù)庫(kù)連接頁(yè)面,單擊內(nèi)網(wǎng)地址進(jìn)行復(fù)制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。
port
固定值為3306。
username
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號(hào)名。本示例為emr_test。
password
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號(hào)的密碼。本示例為Yz12****。
database-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫(kù)名。本示例為flink_cdc。
table-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。
orders_src:本示例為orders。
customers_src:本示例為customers。
創(chuàng)建數(shù)據(jù)表orders_sink和customers_sink。
參數(shù)
描述
connector
固定值為starrocks。
database-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫(kù)名。本示例為flink_cdc。
table-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。
orders_sink:本示例為orders。
customers_sink:本示例為customers。
username
StarRocks連接用戶名。固定值為root。
password
不填寫。
jdbc-url
用于在StarRocks中執(zhí)行查詢操作。
例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**為StarRocks集群的內(nèi)網(wǎng)IP地址。
load-url
指定FE的IP地址和HTTP端口,格式為
StarRocks集群的內(nèi)網(wǎng)IP地址:端口
。本文以8030端口為例,實(shí)際請(qǐng)根據(jù)您的集群版本選擇訪問的端口:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
說明訪問端口詳情,請(qǐng)參見UI和端口。
執(zhí)行以下命令,啟動(dòng)Flink任務(wù)。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
步驟四:通過任務(wù)調(diào)度器,編排各數(shù)據(jù)層的微批同步任務(wù)
將以下兩個(gè)Job以10分鐘為一次間隔,編排成定時(shí)任務(wù)。
Job 1
-- ODS to DWD INSERT INTO dwd_order_customer_valid SELECT '{start_time}', o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name FROM customers c JOIN orders o ON c.customer_id=o.customer_id WHERE o.timestamp >= '{start_time}' AND o.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) AND c.timestamp >= '{start_time}' AND c.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE);
Job 2
-- DWD to DWS INSERT INTO dws_agg_by_region SELECT '{start_time}', order_region, count(*) AS order_cnt, sum(order_revenue) AS order_total_revenue FROM dwd_order_customer_valid WHERE timestamp >= '{start_time}' AND timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) GROUP BY timestamp, order_region;
本示例使用EMR Studio作為任務(wù)調(diào)度器,您也可以使用自己的任務(wù)編排方案。
為EMR Studio集群添加用戶,詳情請(qǐng)參見添加用戶。
為添加的用戶授權(quán)。
使用SSH方式登錄EMR Studio集群,具體操作請(qǐng)參見登錄集群。
執(zhí)行以下命令,授權(quán)添加的用戶為AirFlow的Admin Role。
source /usr/lib/airflow-current/bin/activate airflow users add-role -r Admin -u <user>
說明示例中的
<user>
為您上一步驟中添加的用戶名稱。
進(jìn)入數(shù)據(jù)開發(fā)控制臺(tái)。
在EMR Studio集群的訪問鏈接與端口頁(yè)面,單擊Studio Workspace UI所在行的鏈接。
輸入步驟1中添加的用戶名和密碼,即可正常訪問Web UI頁(yè)面。
創(chuàng)建Airflow的Connection。
在左側(cè)導(dǎo)航欄中,單擊Airflow。
在Airflow頁(yè)面,選擇上方的 。
單擊圖標(biāo)。
在Add Connection頁(yè)面,配置相關(guān)參數(shù)。
參數(shù)
描述
Connection Id
Connection名稱,您可以自定義。本示例為starrocks_conn。
Connection Type
選擇MySQL。
Host
StarRocks集群的內(nèi)網(wǎng)IP地址。
Login
固定值為root。
Port
固定值為9030。
創(chuàng)建Zeppelin的Note。
在左側(cè)導(dǎo)航欄中,單擊Zeppelin。
在Zeppelin頁(yè)面,單擊Create New Note。
在Create New Note對(duì)話框中,輸入Note Name,在Default Interpreter下拉框中,選擇airflow。
編寫Airflow DAG腳本。
配置starrocks_demo_dag腳本,示例代碼如下。
%airflow.push_dag from airflow import DAG from datetime import datetime, timedelta from airflow.operators.mysql_operator import MySqlOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.now(), 'retries': 0, 'retry_delay': timedelta(minutes=5), } with DAG('starrocks_demo_dag', schedule_interval='*/10 * * * *', default_args=default_args) as dag: execution_time = "{{ ts }}" ods_to_dwd_sql = """ INSERT INTO dwd_order_customer_valid SELECT '{start_time}', o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name FROM customers c JOIN orders o ON c.customer_id=o.customer_id WHERE o.timestamp >= '{start_time}' AND o.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) AND c.timestamp >= '{start_time}' AND c.timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) """.format(start_time=execution_time, interval_time=10) dwd_to_dws_sql = """ INSERT INTO dws_agg_by_region SELECT '{start_time}', order_region, count(*) AS order_cnt, sum(order_revenue) AS order_total_revenue FROM dwd_order_customer_valid WHERE timestamp >= '{start_time}' AND timestamp < DATE_ADD('{start_time}',INTERVAL '{interval_time}' MINUTE) GROUP BY timestamp, order_region; """.format(start_time=execution_time, interval_time=10) ods_to_dwd = MySqlOperator( task_id='ods_to_dwd', sql=ods_to_dwd_sql, mysql_conn_id='starrocks_conn', autocommit=True ) dwd_to_dws = MySqlOperator( task_id='dwd_to_dws', sql=dwd_to_dws_sql, mysql_conn_id='starrocks_conn', autocommit=True ) ods_to_dwd >> dwd_to_dws
單擊圖標(biāo),運(yùn)行腳本。
執(zhí)行成功后,Paragraph輸出以下提示信息。
查看DAG狀態(tài)。
在Airflow頁(yè)面即可看到starrocks_demo_dag的運(yùn)行情況。
步驟五:查看數(shù)據(jù)庫(kù)和表信息
使用SSH方式登錄StarRocks集群,具體操作請(qǐng)參見登錄集群。
執(zhí)行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,查詢數(shù)據(jù)庫(kù)信息。
執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。
use flink_cdc;
執(zhí)行以下命令,查看表信息。
show tables;
返回信息如下所示。
+--------------------------+ | Tables_in_flink_cdc | +--------------------------+ | customers | | dwd_order_customer_valid | | dws_agg_by_region | | orders | +--------------------------+ 4 rows in set (0.01 sec)
步驟六:場(chǎng)景演示,查詢插入后的數(shù)據(jù)
使用步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的測(cè)試賬號(hào)連接MySQL實(shí)例,具體操作請(qǐng)參見通過DMS登錄RDS MySQL。
在RDS數(shù)據(jù)庫(kù)窗口執(zhí)行以下命令,向表orders和customers中插入數(shù)據(jù)。
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1); INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1); INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
使用SSH方式登錄StarRocks集群,具體操作請(qǐng)參見登錄集群。
執(zhí)行以下命令,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,查詢ODS層數(shù)據(jù)。
執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。
use flink_cdc;
執(zhí)行以下命令,查看orders和customers表信息。
查看orders表
SELECT * FROM orders;
返回信息如下所示。
+----------------------------+----------+---------------+--------------+-------------+ | timestamp | order_id | order_revenue | order_region | customer_id | +----------------------------+----------+---------------+--------------+-------------+ | 2022-05-27 13:39:50.098000 | 1 | 10 | beijing | 1 | | 2022-05-27 13:39:50.596000 | 2 | 10 | beijing | 1 | +----------------------------+----------+---------------+--------------+-------------+ 2 rows in set (0.00 sec)
查看customers表
SELECT * FROM customers;
返回信息如下所示。
+----------------------------+-------------+--------------+---------------+ | timestamp | customer_id | customer_age | customer_name | +----------------------------+-------------+--------------+---------------+ | 2022-05-27 13:40:11.005000 | 1 | 22 | emr_test | +----------------------------+-------------+--------------+---------------+ 1 row in set (0.01 sec)
執(zhí)行以下命令,查詢DWD層數(shù)據(jù)。
執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。
use flink_cdc;
執(zhí)行以下命令,查看dwd_order_customer_valid表信息。
SELECT * FROM dwd_order_customer_valid;
返回信息如下所示。
+---------------------+----------+---------------+--------------+-------------+--------------+---------------+ | timestamp | order_id | order_revenue | order_region | customer_id | customer_age | customer_name | +---------------------+----------+---------------+--------------+-------------+--------------+---------------+ | 2022-05-27 13:35:00 | 1 | 10 | beijing | 1 | 22 | emr_test | | 2022-05-27 13:35:00 | 2 | 10 | beijing | 1 | 22 | emr_test | +---------------------+----------+---------------+--------------+-------------+--------------+---------------+ 2 rows in set (0.01 sec)
執(zhí)行以下命令,查詢DWS層數(shù)據(jù)。
執(zhí)行以下命令,使用數(shù)據(jù)庫(kù)。
use flink_cdc;
執(zhí)行以下命令,查看dws_agg_by_region表信息。
SELECT * FROM dws_agg_by_region;
返回信息如下所示。
+---------------------+--------------+-----------+---------------------+ | timestamp | order_region | order_cnt | order_total_revenue | +---------------------+--------------+-----------+---------------------+ | 2022-05-27 13:35:00 | beijing | 2 | 20 | +---------------------+--------------+-----------+---------------------+ 1 row in set (0.00 sec)
您也可以執(zhí)行以下命令,查詢部分字段信息。
SELECT order_region, sum(order_cnt),sum(order_total_revenue) FROM dws_agg_by_region GROUP BY order_region;
返回信息如下所示。
+--------------+------------------+----------------------------+ | order_region | sum(`order_cnt`) | sum(`order_total_revenue`) | +--------------+------------------+----------------------------+ | beijing | 2 | 20 | +--------------+------------------+----------------------------+ 1 row in set (0.05 sec)