本文通過示例為您介紹如何基于StarRocks的視圖能力構建數(shù)倉場景-即席查詢解決方案。
前提條件
已創(chuàng)建DataFlow或自定義集群,具體操作請參見創(chuàng)建集群。
已創(chuàng)建StarRocks集群,具體操作請參見創(chuàng)建StarRocks集群。
已創(chuàng)建RDS MySQL,具體操作請參見快速創(chuàng)建RDS MySQL實例。
說明本文示例中DataFlow集群為EMR-3.42.0版本、StarRocks集群為EMR-5.8.0版本,MySQL為5.7版本。
使用限制
DataFlow集群、StarRocks集群和RDS MySQL實例需要在同一個VPC下,并且在同一個可用區(qū)下。
DataFlow集群和StarRocks集群均須開啟公網訪問。
RDS MySQL為5.7及以上版本。
注意事項
本文檔僅供測試使用,生產級別的Flink作業(yè)請使用阿里云實時計算Flink版產品進行配置,或者使用YARN或者Kubernetes提交作業(yè)。
場景介紹
隨著向量化、CBO(Cost Based Optimizer,基于代價的優(yōu)化器)、單機多核調度等技術的應用,StarRocks的計算能力逐步提升。很多時候您在使用StarRocks進行數(shù)倉分層建模時,大部分將數(shù)據(jù)建模到DWD層(基礎整合層)或DWS層(維度寬度)。在實際業(yè)務中,運用StarRocks的計算能力,可以直接查詢DWD或DWS層數(shù)據(jù),還可以靈活地交互式即席查詢。
方案架構
使用StarRocks實現(xiàn)數(shù)倉場景即席查詢的基本架構如下圖所示。
整體數(shù)據(jù)流如下:
Flink清洗導入Kafka的日志或者通過Flink-CDC-StarRocks工具讀取MySQL Binlog導入StarRocks。根據(jù)需要選用明細、聚合、更新或主鍵各種模型,只物理落地ODS層(格式整理層)。
向上采用StarRocks View視圖能力,利用StarRocks向量化極速查詢和CBO優(yōu)化器滿足多表關聯(lián)、嵌套子查詢等復雜SQL,查詢時現(xiàn)場計算指標結果,保證指標上卷和下鉆高度同源一致。
方案特點
該方案主要特點是,計算邏輯在StarRocks側(現(xiàn)場查詢),適用于業(yè)務庫高頻數(shù)據(jù)更新的場景,實體數(shù)據(jù)只在ODS或DWD層存儲。
方案優(yōu)勢
靈活性強,可隨時根據(jù)業(yè)務邏輯調整View。
指標修改簡單,上層都是View邏輯封裝,只需要更新底表數(shù)據(jù)。
方案缺點
當View的邏輯較為復雜,數(shù)據(jù)量較多時,查詢性能較低。
適用場景
數(shù)據(jù)來源于數(shù)據(jù)庫和埋點系統(tǒng),適合對QPS要求不高,對靈活性要求比較高,且計算資源較為充足的場景。
實時要求非常高,要求寫入即可查,更新即反饋。適合有即席查詢需求,且資源較為充足,查詢復雜度較低的場景。
操作流程
示例操作如下:
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表
創(chuàng)建測試的數(shù)據(jù)庫和賬號,具體操作請參見創(chuàng)建數(shù)據(jù)庫和賬號。
創(chuàng)建完數(shù)據(jù)庫和賬號后,需要授權測試賬號的讀寫權限。
說明本文示例中創(chuàng)建的數(shù)據(jù)庫名稱為flink_cdc,賬號為emr_test。
使用創(chuàng)建的測試賬號連接MySQL實例,具體操作請參見通過DMS登錄RDS MySQL。
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders。
CREATE TABLE flink_cdc.orders ( order_id INT NOT NULL AUTO_INCREMENT, order_revenue FLOAT NOT NULL, order_region VARCHAR(40) NOT NULL, customer_id INT NOT NULL, PRIMARY KEY (order_id) );
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers。
CREATE TABLE flink_cdc.customers ( customer_id INT NOT NULL, customer_age INT NOT NULL, customer_name VARCHAR(40) NOT NULL, PRIMARY KEY (customer_id) );
步驟二:創(chuàng)建StarRocks表
使用SSH方式登錄StarRocks集群,詳情請參見登錄集群。
執(zhí)行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)庫。
CREATE DATABASE IF NOT EXISTS `flink_cdc`;
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` ( `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`customer_id`) COMMENT "" DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` ( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,基于ODS表創(chuàng)建DWD視圖。
CREATE VIEW flink_cdc.dwd_order_customer_valid ( order_id, order_revenue, order_region, customer_id, customer_age, customer_name ) AS SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name FROM flink_cdc.customers c JOIN flink_cdc.orders o ON c.customer_id=o.customer_id WHERE c.customer_id != -1;
執(zhí)行以下命令,基于DWD表創(chuàng)建DWS視圖。
CREATE VIEW flink_cdc.dws_agg_by_region ( order_region, order_cnt, order_total_revenue) AS SELECT order_region, count(order_region), sum(order_revenue) FROM flink_cdc.dwd_order_customer_valid GROUP BY order_region;
步驟三:執(zhí)行Flink任務,啟動數(shù)據(jù)流
下載Flink CDC connector和Flink StarRocks Connector,并上傳到DataFlow集群的/opt/apps/FLINK/flink-current/lib目錄下。
使用SSH方式登錄DataFlow集群,詳情請參見登錄集群。
添加端口配置,并修改并行執(zhí)行的任務槽數(shù)量。
執(zhí)行以下命令,編輯文件flink-conf.yaml。
vim /etc/taihao-apps/flink-conf/flink-conf.yaml
添加以下內容至文件最后一行。
rest.port: 8083
在同一個配置文件中,修改參數(shù)
taskmanager.numberOfTaskSlots
的值為3。說明默認情況下,
taskmanager.numberOfTaskSlots
配置項的值為1,這意味著TaskManager只能并行執(zhí)行一個作業(yè)。鑒于后續(xù)demo.sql中包含了兩個作業(yè),為了確保所有作業(yè)能并行運行,建議增加taskmanager.numberOfTaskSlots
的值至少為2。
執(zhí)行以下命令,啟動集群。
重要本文示例僅供測試,如果是生產級別的Flink作業(yè)請使用YARN或Kubernetes方式提交,詳情請參見Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
編寫Flink SQL作業(yè),并保存為demo.sql。
執(zhí)行以下命令,編輯demo.sql文件。
vim demo.sql
文件內容如下所示。
CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`; CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` ( `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`customer_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'customers' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` ( `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`customer_id`) NOT ENFORCED ) with ( 'load-url' = '10.0.**.**:8030', 'database-name' = 'flink_cdc', 'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030', 'sink.buffer-flush.interval-ms' = '15000', 'sink.properties.format' = 'json', 'username' = 'root', 'table-name' = 'customers', 'sink.properties.strip_outer_array' = 'true', 'password' = '', 'sink.max-retries' = '10', 'connector' = 'starrocks' ); INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`customers_src`; CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` ( `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'database-name' = 'flink_cdc', 'table-name' = 'orders', 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` ( `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'sink.properties.strip_outer_array' = 'true', 'password' = '', 'sink.max-retries' = '10', 'connector' = 'starrocks', 'table-name' = 'orders', 'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030', 'sink.buffer-flush.interval-ms' = '15000', 'sink.properties.format' = 'json', 'username' = 'root', 'load-url' = '10.0.**.**:8030', 'database-name' = 'flink_cdc' ); INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`orders_src`;
涉及參數(shù)如下所示:
創(chuàng)建數(shù)據(jù)表customers_src。
參數(shù)
描述
connector
固定值為mysql-cdc。
hostname
RDS的內網地址。
您可以在RDS的數(shù)據(jù)庫連接頁面,單擊內網地址進行復制。例如,rm-2ze8398257383****.mysql.rds.aliyuncs.com。
port
固定值為3306。
username
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號名。本示例為emr_test。
password
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號的密碼。本示例為Yz12****。
database-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫名。本示例為flink_cdc。
table-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。本示例為customers。
創(chuàng)建數(shù)據(jù)表customers_sink和orders_sink。
參數(shù)
描述
load-url
指定FE的IP地址和HTTP端口,格式為
StarRocks集群的內網IP地址:端口
。本文以8030端口為例,實際請根據(jù)您的集群版本選擇訪問的端口:18030:EMR-5.9.0及以上版本、EMR-3.43.0及以上版本。
8030:EMR-5.8.0及以下版本、EMR-3.42.0及以下版本。
說明訪問端口詳情,請參見UI和端口。
database-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫名。本示例為flink_cdc。
jdbc-url
用于在StarRocks中執(zhí)行查詢操作。
例如,jdbc:mysql://10.0.**.**:9030。其中,10.0.**.**為StarRocks集群的內網IP地址。
username
StarRocks連接用戶名。固定為root。
table-name
本示例固定值為customers。
connector
固定值為starrocks。
執(zhí)行以下命令,啟動Flink任務。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
步驟四:場景演示
使用步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的測試賬號連接MySQL實例,具體操作請參見通過DMS登錄RDS MySQL。
在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,向表orders和customers中插入數(shù)據(jù)。
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1); INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1); INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
使用SSH方式登錄StarRocks集群,詳情請參見登錄集群。
執(zhí)行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,查詢ODS層數(shù)據(jù)。
執(zhí)行以下命令,查看orders表信息。
SELECT * FROM flink_cdc.orders;
返回信息如下所示。
+----------+---------------+--------------+-------------+ | order_id | order_revenue | order_region | customer_id | +----------+---------------+--------------+-------------+ | 1 | 10 | beijing | 1 | | 2 | 10 | beijing | 1 | +----------+---------------+--------------+-------------+
執(zhí)行以下命令,查看customers表信息。
SELECT * FROM flink_cdc.customers;
返回信息如下所示。
+-------------+--------------+---------------+ | customer_id | customer_age | customer_name | +-------------+--------------+---------------+ | 1 | 22 | emr_test | +-------------+--------------+---------------+
執(zhí)行以下命令,查詢DWD層數(shù)據(jù)。
SELECT * FROM flink_cdc.dwd_order_customer_valid;
返回信息如下所示。
+----------+---------------+--------------+-------------+--------------+---------------+ | order_id | order_revenue | order_region | customer_id | customer_age | customer_name | +----------+---------------+--------------+-------------+--------------+---------------+ | 1 | 10 | beijing | 1 | 22 | emr_test | | 2 | 10 | beijing | 1 | 22 | emr_test | +----------+---------------+--------------+-------------+--------------+---------------+ 2 rows in set (0.00 sec)
執(zhí)行以下命令,查詢DWS層數(shù)據(jù)。
SELECT * FROM flink_cdc.dws_agg_by_region;
返回信息如下所示。
+--------------+-----------+---------------------+ | order_region | order_cnt | order_total_revenue | +--------------+-----------+---------------------+ | beijing | 2 | 20 | +--------------+-----------+---------------------+ 1 row in set (0.01 sec)