數(shù)倉場景:增量數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)
本文通過示例為您介紹如何基于StarRocks構(gòu)建數(shù)倉場景-增量數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)。
前提條件
已創(chuàng)建DataFlow或自定義集群,且集群中已包含F(xiàn)link、Kafka服務(wù),具體操作請參見創(chuàng)建集群。
已創(chuàng)建StarRocks集群,具體操作請參見創(chuàng)建StarRocks集群。
已創(chuàng)建RDS MySQL,具體操作請參見快速創(chuàng)建RDS MySQL實(shí)例。
說明本文示例中DataFlow集群為EMR-3.40.0版本、StarRocks集群為EMR-5.6.0版本,MySQL為5.7版本。
使用限制
DataFlow集群、StarRocks集群和RDS MySQL實(shí)例需要在同一個(gè)VPC下,并且在同一個(gè)可用區(qū)下。
DataFlow集群和StarRocks集群均須開啟公網(wǎng)訪問。
RDS MySQL為5.7及以上版本。
場景介紹
因?yàn)椴糠謭鼍皩?duì)數(shù)據(jù)延遲非常敏感,數(shù)據(jù)產(chǎn)生的時(shí)候必須完成加工,所以此時(shí)您可以通過增量數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)的方式,提前使用Flink將明細(xì)層、匯總層等層數(shù)據(jù)進(jìn)行匯聚,匯聚之后把結(jié)果集存儲(chǔ)下來再對(duì)外提供服務(wù)。
方案架構(gòu)
增量數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)的基本架構(gòu)如下圖所示。
整體數(shù)據(jù)流如下:
直接使用Flink構(gòu)建實(shí)時(shí)數(shù)倉,由Flink進(jìn)行清洗加工轉(zhuǎn)換和聚合匯總,將各層結(jié)果集寫入Kafka中。
StarRocks從Kafka分別訂閱各層數(shù)據(jù),將各層數(shù)據(jù)持久化到StarRocks中,用于之后的查詢分析。
方案特點(diǎn)
該方案主要特點(diǎn)如下:
增量計(jì)算的數(shù)據(jù)由Flink進(jìn)行清洗加工轉(zhuǎn)換和聚合匯總,各層應(yīng)用數(shù)據(jù)通過Kafka分別持久化到StarRocks中。
Flink加工的結(jié)果集可以采取雙寫的方式,一方面繼續(xù)投遞給下一層消息流Topic,一方面Sink到同層的StarRocks中;也可以采用單寫Kafka再通過StarRocks實(shí)時(shí)消費(fèi)Kafka對(duì)應(yīng)Topic上的數(shù)據(jù),方便后續(xù)歷史數(shù)據(jù)的狀態(tài)檢查與刷新。
StarRocks通過表的形式直接對(duì)接上層應(yīng)用,實(shí)現(xiàn)應(yīng)用實(shí)時(shí)查詢。
方案優(yōu)勢
實(shí)時(shí)性強(qiáng),能滿足業(yè)務(wù)對(duì)實(shí)時(shí)性敏感的場景。
指標(biāo)修正簡單,與傳統(tǒng)增量計(jì)算方式不一樣的是,該方案將中間的狀態(tài)也持久存儲(chǔ)在StarRocks中,提升了后續(xù)分析的靈活性,當(dāng)中間數(shù)據(jù)質(zhì)量有問題時(shí),直接對(duì)表修正,重刷數(shù)據(jù)即可。
方案缺點(diǎn)
大部分實(shí)時(shí)增量計(jì)算依賴于Flink,需要使用者有一定的Flink技能。
不適合數(shù)據(jù)頻繁更新,無法進(jìn)行累加計(jì)算的場景。
不適合多流Join等計(jì)算復(fù)雜資源開銷大的場景。
適用場景
實(shí)時(shí)需求簡單,數(shù)據(jù)量不大,以埋點(diǎn)數(shù)據(jù)統(tǒng)計(jì)為主的數(shù)據(jù),實(shí)時(shí)性最強(qiáng)。
操作流程
示例操作如下:
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表
創(chuàng)建測試的數(shù)據(jù)庫和賬號(hào),具體操作請參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)。
創(chuàng)建完數(shù)據(jù)庫和賬號(hào)后,需要授權(quán)測試賬號(hào)的讀寫權(quán)限。
說明本文示例中創(chuàng)建的數(shù)據(jù)庫名稱為flink_cdc,賬號(hào)為emr_test。
使用創(chuàng)建的測試賬號(hào)連接MySQL實(shí)例,具體操作請參見通過DMS登錄RDS MySQL。
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders和customers。
創(chuàng)建orders表。
CREATE TABLE flink_cdc.orders ( order_id INT NOT NULL AUTO_INCREMENT, order_revenue FLOAT NOT NULL, order_region VARCHAR(40) NOT NULL, customer_id INT NOT NULL, PRIMARY KEY ( order_id ) );
創(chuàng)建customers表。
CREATE TABLE flink_cdc.customers ( customer_id INT NOT NULL, customer_age INT NOT NULL, customer_name VARCHAR(40) NOT NULL, PRIMARY KEY ( customer_id ) );
步驟二:創(chuàng)建Kafka的Topic
使用SSH方式登錄DataFlow集群,具體操作請參見登錄集群。
執(zhí)行以下命令, 進(jìn)入Kafka的bin目錄。
cd /opt/apps/FLINK/flink-current/bin
執(zhí)行以下命令,創(chuàng)建對(duì)應(yīng)的Topic。
kafka-topics.sh --create --topic ods_order --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 kafka-topics.sh --create --topic ods_customers --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 kafka-topics.sh --create --topic dwd_order_customer_valid --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092 kafka-topics.sh --create --topic dws_agg_by_region --replication-factor 1 --partitions 1 --bootstrap-server core-1-1:9092
步驟三:創(chuàng)建StarRocks表和導(dǎo)入任務(wù)
使用SSH方式登錄StarRocks集群,具體操作請參見登錄集群。
執(zhí)行以下命令,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)庫。
CREATE DATABASE IF NOT EXISTS `flink_cdc`;
執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers和orders。
創(chuàng)建customers表
CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` ( `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`customer_id`) COMMENT "" DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
創(chuàng)建orders表
CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` ( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,創(chuàng)建DWD表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dwd_order_customer_valid`( `order_id` INT NOT NULL COMMENT "", `order_revenue` FLOAT NOT NULL COMMENT "", `order_region` STRING NOT NULL COMMENT "", `customer_id` INT NOT NULL COMMENT "", `customer_age` FLOAT NOT NULL COMMENT "", `customer_name` STRING NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_id`) COMMENT "" DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,創(chuàng)建DWS表。
CREATE TABLE IF NOT EXISTS `flink_cdc`.`dws_agg_by_region` ( `order_region` STRING NOT NULL COMMENT "", `order_cnt` INT NOT NULL COMMENT "", `order_total_revenue` INT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`order_region`) COMMENT "" DISTRIBUTED BY HASH(`order_region`) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
執(zhí)行以下命令,創(chuàng)建Routine Load導(dǎo)入任務(wù),訂閱Kafka數(shù)據(jù)源的數(shù)據(jù)。
CREATE ROUTINE LOAD flink_cdc.routine_load_orders ON orders COLUMNS (order_id, order_revenue, order_region, customer_id) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "ods_order" ); CREATE ROUTINE LOAD flink_cdc.routine_load_customers ON customers COLUMNS (customer_id, customer_age, customer_name) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "ods_customers" ); CREATE ROUTINE LOAD flink_cdc.routine_load_dwd_order_customer_valid ON dwd_order_customer_valid COLUMNS (order_id, order_revenue, order_region, customer_id, customer_age, customer_name) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_id\",\"$.order_revenue\",\"$.order_region\",\"$.customer_id\",\"$.customer_age\",\"$.customer_name\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "dwd_order_customer_valid" ); CREATE ROUTINE LOAD flink_cdc.routine_load_dws_agg_by_region ON dws_agg_by_region COLUMNS (order_region, order_cnt, order_total_revenue) PROPERTIES ( "format" = "json", "jsonpaths" = "[\"$.order_region\",\"$.order_cnt\",\"$.order_total_revenue\"]" ) FROM KAFKA ( "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "dws_agg_by_region" );
步驟四:執(zhí)行Flink任務(wù),啟動(dòng)數(shù)據(jù)流
下載Flink CDC connector和Flink StarRocks Connector,并上傳至DataFlow集群的/opt/apps/FLINK/flink-current/lib目錄下。
拷貝DataFlow集群的/opt/apps/FLINK/flink-current/opt/connectors/kafka目錄下的JAR包至/opt/apps/FLINK/flink-current/lib目錄下。
使用SSH方式登錄DataFlow集群,具體操作請參見登錄集群。
執(zhí)行以下命令,啟動(dòng)集群。
重要本文示例僅供測試,如果是生產(chǎn)級(jí)別的Flink作業(yè)請使用YARN或Kubernetes方式提交,詳情請參見Apache Hadoop YARN和Native Kubernetes。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
編寫Flink SQL作業(yè),并保存為demo.sql。
執(zhí)行以下命令,編輯demo.sql文件。
vim demo.sql
文件內(nèi)容如下所示。
CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`; --數(shù)據(jù)的訂單源表。 CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src`( `order_id` INT NOT NULL, `order_revenue` FLOAT NOT NULL, `order_region` STRING NOT NULL, `customer_id` INT NOT NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'orders' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` ( `customer_id` INT NOT NULL, `customer_age` FLOAT NOT NULL, `customer_name` STRING NOT NULL, PRIMARY KEY(`customer_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = 'Yz12****', 'database-name' = 'flink_cdc', 'table-name' = 'customers' ); -- create ods dwd and dws tables CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_order_table` ( `order_id` INT, `order_revenue` FLOAT, `order_region` VARCHAR(40), `customer_id` INT, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_order', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`ods_customers_table` ( `customer_id` INT, `customer_age` FLOAT, `customer_name` STRING, PRIMARY KEY (customer_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'ods_customers', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dwd_order_customer_valid` ( `order_id` INT, `order_revenue` FLOAT, `order_region` STRING, `customer_id` INT, `customer_age` FLOAT, `customer_name` STRING, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dwd_order_customer_valid', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`dws_agg_by_region` ( `order_region` VARCHAR(40), `order_cnt` BIGINT, `order_total_revenue` FLOAT, PRIMARY KEY (order_region) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'dws_agg_by_region', 'properties.bootstrap.servers' = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092', 'key.format' = 'json', 'value.format' = 'json' ); USE flink_cdc; BEGIN STATEMENT SET; INSERT INTO ods_order_table SELECT * FROM orders_src; INSERT INTO ods_customers_table SELECT * FROM customers_src; INSERT INTO dwd_order_customer_valid SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name FROM customers_src c JOIN orders_src o ON c.customer_id=o.customer_id WHERE c.customer_id <> -1; INSERT INTO dws_agg_by_region SELECT order_region, count(*) as order_cnt, sum(order_revenue) as order_total_revenue FROM dwd_order_customer_valid GROUP BY order_region; END;
涉及參數(shù)如下所示:
創(chuàng)建數(shù)據(jù)表orders_src和customers_src。
參數(shù)
描述
connector
固定值為mysql-cdc。
hostname
RDS的內(nèi)網(wǎng)地址。
您可以在RDS的數(shù)據(jù)庫連接頁面,單擊內(nèi)網(wǎng)地址進(jìn)行復(fù)制。例如,rm-2ze5h9qnki343****.mysql.rds.aliyuncs.com。
port
固定值為3306。
username
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號(hào)名。本示例為emr_test。
password
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號(hào)的密碼。本示例為Yz12****。
database-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫名。本示例為flink_cdc。
table-name
步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。
orders_src:本示例為orders。
customers_src:本示例為customers。
創(chuàng)建數(shù)據(jù)表ods_order_table、ods_customers_table、dwd_order_customer_valid和dws_agg_by_region。
參數(shù)
描述
connector
固定值為upsert-kafka。
topic
步驟二:創(chuàng)建Kafka的Topic中創(chuàng)建的Topic名稱。
ods_order_table:本示例為ods_order。
ods_customers_table:本示例為ods_customers。
dwd_order_customer_valid:本示例為dwd_order_customer_valid。
dws_agg_by_region:本示例為dws_agg_by_region。
properties.bootstrap.servers
固定格式為
192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092
。
執(zhí)行以下命令,啟動(dòng)Flink任務(wù)。
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql
步驟五:查看數(shù)據(jù)庫和表信息
使用SSH方式登錄StarRocks集群,具體操作請參見登錄集群。
執(zhí)行以下,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,查詢數(shù)據(jù)庫信息。
執(zhí)行以下命令,使用數(shù)據(jù)庫。
use flink_cdc;
執(zhí)行以下命令,查看表信息。
show tables;
返回信息如下所示。
+--------------------------+ | Tables_in_flink_cdc | +--------------------------+ | customers | | dwd_order_customer_valid | | dws_agg_by_region | | orders | +--------------------------+ 4 rows in set (0.01 sec)
步驟六:場景演示,查詢插入后的數(shù)據(jù)
使用步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的測試賬號(hào)連接MySQL實(shí)例,具體操作請參見通過DMS登錄RDS MySQL。
在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,向表orders和customers中插入數(shù)據(jù)。
INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1); INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1); INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
使用SSH方式登錄StarRocks集群,具體操作請參見登錄集群。
執(zhí)行以下命令,連接StarRocks集群。
mysql -h127.0.0.1 -P 9030 -uroot
執(zhí)行以下命令,查詢ODS層數(shù)據(jù)。
執(zhí)行以下命令,使用數(shù)據(jù)庫。
use flink_cdc;
執(zhí)行以下命令,查看orders表信息。
select * from orders;
返回信息如下所示。
+----------+---------------+--------------+-------------+ | order_id | order_revenue | order_region | customer_id | +----------+---------------+--------------+-------------+ | 1 | 10 | beijing | 1 | | 2 | 10 | beijing | 1 | +----------+---------------+--------------+-------------+
執(zhí)行以下命令,查看customers表信息。
select * from customers;
返回信息如下所示。
+-------------+--------------+---------------+ | customer_id | customer_age | customer_name | +-------------+--------------+---------------+ | 1 | 22 | emr_test | +-------------+--------------+---------------+
執(zhí)行以下命令,查詢DWD層數(shù)據(jù)。
執(zhí)行以下命令,使用數(shù)據(jù)庫。
use flink_cdc;
執(zhí)行以下命令,查看orders表信息。
select * from dwd_order_customer_valid;
返回信息如下所示。
+----------+---------------+--------------+-------------+--------------+---------------+ | order_id | order_revenue | order_region | customer_id | customer_age | customer_name | +----------+---------------+--------------+-------------+--------------+---------------+ | 1 | 10 | beijing | 1 | 22 | emr_test | | 2 | 10 | beijing | 1 | 22 | emr_test | +----------+---------------+--------------+-------------+--------------+---------------+ 2 rows in set (0.00 sec)
執(zhí)行以下命令,查詢DWS層數(shù)據(jù)。
執(zhí)行以下命令,使用數(shù)據(jù)庫。
use flink_cdc;
執(zhí)行以下命令,查看orders表信息。
select * from dws_agg_by_region;
返回信息如下所示。
+--------------+-----------+---------------------+ | order_region | order_cnt | order_total_revenue | +--------------+-----------+---------------------+ | beijing | 2 | 20 | +--------------+-----------+---------------------+ 1 row in set (0.01 sec)