本文介紹冷數據的特點和適應場景,通過表格存儲Tablestore和Delta Lake結合示例,演示數據的冷熱分層。冷熱分層可以充分利用計算和存儲資源,以低成本承載更優質服務。
背景信息
在海量大數據場景下,隨著業務和數據量的不斷增長,性能和成本的權衡成為大數據系統設計面臨的關鍵挑戰。
Delta Lake是新型數據湖方案,推出了數據流入、數據組織管理、數據查詢和數據流出等特性,同時提供了數據的ACID和CRUD操作。通過結合Delta Lake和上下游組件,您可以搭建出一個便捷、易用、安全的數據湖架構。在數據湖架構設計中,通常會應用HTAP(Hybrid Transaction and Analytical Process)體系結構,通過合理地選擇分層存儲組件和計算引擎,既能支持海量數據分析和快速的事務更新寫入,又能有效地降低冷熱數據分離的成本。
更多介紹請參見結構化大數據分析平臺設計、面向海量數據的極致成本優化-云HBase的一體化冷熱分離和云上如何做冷熱數據分離。
冷熱數據
數據按照實際訪問的頻率可以分為熱數據、溫數據和冷數據。其中冷數據的數據量較大,很少被訪問,甚至整個生命周期都不會被訪問。
冷熱數據的區分方式如下:
按照數據的創建時間:通常,數據寫入初期,用戶的關注度較高且訪問頻繁,此時的數據為熱數據。但隨著時間的推移,舊數據訪問頻率會越來越低,僅存在少量查詢,甚至完全不查詢,此時數據為冷數據。
常見于交易類數據、時序監控和IM聊天等場景。
按照訪問熱度:采用業務打標或系統自動識別等方式,按照數據的訪問熱度來區分冷熱數據。
例如,某舊博客突然被大量訪問。此時不應該按照時間區分,而是應該按照具體的業務和數據分布規律來區分冷熱數據。
說明本文主要討論按照數據創建時間的冷熱數據分層。
冷數據特點
數據量大:相對于熱數據,冷數據通常需要保存較長時間,甚至永久保存。
成本管控:數據量大且訪問頻率較低,不宜投入過多成本。
性能要求低:相較于普通的TP請求查詢,無需在毫秒級別返回。冷數據的查詢可以接受數十秒甚至更長時間返回結果,或者可以進行異步處理。
操作簡單:通常,冷數據都是執行批量寫入和刪除操作,沒有更新操作。
當查詢數據時,您只需要讀取指定條件的數據,且查詢條件不會過于復雜。
適用場景
時序類數據場景:時序類數據天然具備時間屬性,數據量大,且僅執行追加操作。示例如下:
IM場景:通常用戶會查詢最近若干條聊天記錄,只有在特殊需求的時候才會查詢歷史數據。例如釘釘。
監控場景:通常用戶只會查看近期的監控,只有在調查問題或者制定報表時才會查詢歷史數據。例如云監控。
賬單場景:通常用戶只會查詢最近幾天或者一個月內的賬單,不會查詢超過一年以上的賬單。例如支付寶。
物聯網場景:通常設備近期上報的數據是熱點數據,會經常被分析,而歷史數據的分析頻率都較低。例如IoT。
歸檔類場景:對于讀寫簡單,查詢復雜的數據,您可以定期歸檔數據至成本更低的存儲組件或更高壓縮比的存儲介質中,以達到降低成本的目的。
海量結構化數據Delta Lake架構
針對結構化冷熱分層的數據場景,阿里巴巴集團推出了海量結構化數據的Delta Lake架構。
基于Tablestore的通道服務,原始數據可以利用變更數據捕獲CDC(Change Data Capture)技術寫入多種存儲組件中。
示例
本示例結合Tablestore和Delta Lake,進行數據的冷熱分層。
實時流式投遞。
創建數據源表。
數據源表是原始訂單表OrderSource,有兩個主鍵UserId(用戶ID)和OrderId(訂單ID),兩個屬性列price(價格)和timestamp(訂單時間)。使用Tablestore SDK的BatchWrite接口寫入訂單數據,訂單的時間戳的時間范圍為最近90天(本示例的模擬時間范圍為2020-02-26~2020-05-26),共計寫入3112400條。
在模擬訂單寫入時,對應Tablestore表屬性列的版本號也會被設置為相應的時間戳。通過配置表上的TTL屬性,當寫入數據的保留時長超過設置的TTL時,系統會自動清理對應版本號的數據。
在Tablestore控制臺上創建增量通道。
利用增量通道提供的CDC技術,同步新增的主表數據至Delta。通道ID用于后續的SQL配置。
在EMR集群的Header節點,啟動
streaming-sql
交互式命令行。streaming-sql --master yarn --use-emr-datasource --num-executors 16 --executor-memory 4g --executor-cores 4
執行以下命令,創建源表和目的表。
// 1. 創建源表。 DROP TABLE IF EXISTS order_source; CREATE TABLE order_source USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderSource", catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"col": "price", "type": "double"}, "timestamp": {"col": "timestamp", "type": "long"}}}' ); // 2. 創建Delta Lake Sink: delta_orders DROP TABLE IF EXISTS delta_orders; CREATE TABLE delta_orders( UserId string, OrderId string, price double, timestamp long ) USING delta LOCATION '/delta/orders'; // 3. 在源表上創建增量SCAN視圖。 CREATE SCAN incremental_orders ON order_source USING STREAM OPTIONS( tunnel.id="324c6bee-b10d-4265-9858-b829a1b71b4b", maxoffsetsperchannel="10000"); // 4. 執行Stream作業,實時同步Tablestore CDC數據至Delta Lake。 CREATE STREAM orders_job OPTIONS ( checkpointLocation='/delta/orders_checkpoint', triggerIntervalMs='3000' ) MERGE INTO delta_orders USING incremental_orders AS delta_source ON delta_orders.UserId=delta_source.UserId AND delta_orders.OrderId=delta_source.OrderId WHEN MATCHED AND delta_source.__ots_record_type__='DELETE' THEN DELETE WHEN MATCHED AND delta_source.__ots_record_type__='UPDATE' THEN UPDATE SET UserId=delta_source.UserId, OrderId=delta_source.OrderId, price=delta_source.price, timestamp=delta_source.timestamp WHEN NOT MATCHED AND delta_source.__ots_record_type__='PUT' THEN INSERT (UserId, OrderId, price, timestamp) values (delta_source.UserId, delta_source.OrderId, delta_source.price, delta_source.timestamp);
各操作含義如下。
操作
描述
創建Tablestore源表
創建order_source源表。
OPTIONS參數中的
catalog
是表字段的Schema定義(本示例對應UserId、OrderId、price和timestamp四列)。創建Delta Lake Sink表
創建delta_orders目的表。
LOCATION中指定的是Delta文件存儲的位置。
在Tablestore源表上創建增量SCAN視圖
創建incremental_orders的流式視圖。
tunnel.id
:步驟1.b中創建的增量通道ID。maxoffsetsperchannel
:通道每個分區可以寫入數據的最大數據量。
啟動Stream作業進行實時投遞
根據Tablestore的主鍵列(UserId和OrderId)進行聚合,同時根據CDC日志的操作類型(PUT,UPDATE,DELETE),轉化為對應的Delta操作。
__ots_record_type__
是Tablestore流式Source提供的預定義列,表示行操作類型。
查詢冷熱數據。
通常,您可以保存熱數據至Tablestore表中進行高效的TP查詢,保存冷數據或是全量數據至Delta中。通過配置Tablestore表的生命周期(TTL),您可以靈活地控制熱數據量。
配置主表的TTL前,查詢源表(order_source)和目的表(delta_orders)。
此時兩邊的查詢結果一致。
配置Tablestore的TTL為最近30天。
Tablestore中的熱數據只有最近30天的數據,而Delta中依舊保留的是全量數據,以達到冷熱分層的目的。
冷熱分層后,再次查詢源表(order_source)和目的表(delta_orders)。
冷熱分層后熱數據為1017004條,冷數據(全量數據)保持不變,仍然為3112400條。