介紹如何使用表格存儲的主鍵增列功能優化高并發IM系統架構。
背景
在構建社交IM和朋友圈應用時,最基本的需求是將用戶發送的消息和朋友圈的更新及時、準確地更新給該用戶的好友。這需要為用戶發送的每一條消息或者朋友圈更新設置一個序號或者ID,并且保證遞增,這個機制可以確保所有的消息能夠按照正確的順序被接收端處理。
高并發的IM系統通常選擇NoSQL數據庫存儲產品來存儲消息,但常見的NoSQL產品沒有提供自增列的功能,通常要借助外部組件來實現消息序號和ID的遞增,使得整體的架構更加復雜,且影響整條鏈路的延時。
功能需求
- 支持用戶一對一聊天
- 支持用戶群組內聊天
- 支持同一個用戶的多終端消息同步
現有架構
- 消息模型 消息模型的實現順序如下。
- 發送方發送了一條消息后,消息會被客戶端推送給后臺系統。
- 后臺系統會先存儲消息。
- 存儲成功后,會推送消息給接收方的客戶端。
- 后臺架構 后臺架構主要分為兩部分:邏輯層和存儲層。
- 邏輯層包括應用服務器、隊列服務和自增ID生成器,是整個后臺架構的核心,處理消息的接收、推送、通知、群消息寫復制等核心業務邏輯。
- 存儲層主要是用來存儲持久化消息數據和其他一些需要持久化的數據。
對于一對一聊天,發送方發送消息給應用服務器后,應用服務器將消息存儲到接收方為主鍵的表中,同時通知應用服務器中的消息推送服務,消息推送服務會將上次推送給接收方的最后一條消息的ID作為起始主鍵,從存儲系統中讀取之后的所有消息,然后將消息推送給接收方。
對于群組內的聊天,邏輯會更加復雜,需要通過異步隊列來完成消息的擴散寫,即發到群組內的一條消息會給群組內的每個人都保存一份。
下圖展示了省略掉存儲層后的群消息發送過程。
使用擴散寫而非擴散讀,主要由于以下兩點原因:- 群組內成員一般都不多,存儲成本并不高,而且有壓縮,成本更低。
- 消息擴散寫到每個人的存儲表(收件箱)后,為每個接收方推送消息時,只需要檢查自己的收件箱即可,此時群聊和單聊的處理邏輯一樣,實現簡單。
- 存儲系統 存儲系統采用阿里云表格存儲,表格存儲具有以下優勢:
- 表格存儲寫操作不僅支持單行寫,也支持多行批量寫,可滿足大并發寫數據需求。
- 表格存儲支持按范圍讀,消息多時可翻頁。
- 表格存儲支持數據生命周期管理,可對過期數據進行自動清理,節省存儲費用。
- 表格存儲價格便宜,且穩定可靠。
- 表格存儲讀寫性能極佳,對于聊天消息,延遲基本在毫秒,甚至微秒級別。
表格存儲表結構的主鍵列部分請參見下表。主鍵順序 主鍵名稱 主鍵值 說明 1 partition_key md5(receive_id)前4位 分區鍵保證數據均勻分布 2 receive_id receive_id 接收方的用戶ID 3 message_id message_id 消息ID 表格存儲表結構包括主鍵列和屬性列兩部分。- 主鍵列
- 最多支持4個主鍵列,第一個主鍵為分區鍵。
- 通過分區鍵可以讓數據和請求均衡分布、避免熱點。由于最終讀取消息時要按照接收方讀取,所以此處可以使用接收方ID作為分區鍵。為了更加均衡,可以使用接收方ID的md5值的部分區域,例如前4個字符,這樣就可以將數據均衡分布了。
- 第一個主鍵只用了部分接收方ID,為了能定位到接收方的消息,需要保存完整的接收方ID,所以可以將接收方ID作為第二個主鍵。
- 第三個主鍵可以是消息ID,由于需要查詢最新的消息,這個值需要是單調自增的。
說明 主鍵列結構在使用過程中不能修改。 - 屬性列
屬性由多個屬性列組成。每行的屬性列個數沒有限制,即每行的屬性列可不同。一個屬性列在某一行的值可為空。同一個屬性列的值可以有多種數據類型。屬性列可以保存消息內容和元數據等。
- 挑戰
此架構雖可應用于高并發IM系統,但仍面臨挑戰。多個用戶在一個隊列中,隊列串行執行,為了保證消息嚴格遞增,執行過程中要持有鎖,這個過程會有一個風險:如果發送給某個用戶的消息量很大,這個用戶所在的隊列中消息會變多,就有可能堵塞其他用戶的消息,導致同隊列的其他用戶消息出現延遲。
新架構
現有架構存在的挑戰可通過使用表格存儲的主鍵列自增功能輕松解決。
- 與原架構相比,新架構最明顯的區別是減少了隊列服務和自增ID生成器兩個組件,架構更加簡單。
- 應用服務器接收到消息后,直接將消息寫入表格存儲,對于主鍵自增列message_id,在寫數據時不需要填寫具體的值,只需要填充一個特定的占位符即可,此值會在表格存儲系統內部自動生成。
- 新架構中自增操作是在表格存儲系統內部處理的,就算多個應用服務器同時給表格存儲中的同一個接收方寫數據,表格存儲內部也能保證這些消息是串行處理,每個消息都有一個獨立的消息ID,且嚴格遞增。使用主鍵列自增功能,就不再需要隊列服務,這樣也就徹底解決了原架構的問題。
- 原架構只能有一個隊列處理同一個用戶的消息,新架構可以多個隊列并行處理,當一些用戶的消息量突然變大時,也不會立即堵塞其他用戶,而是將壓力均勻分布給了所有隊列。
- 使用主鍵自增列功能后,應用服務器可以直接寫數據到表格存儲,不再需要經過隊列和獲取消息ID,性能表現會更加優秀。
方案實現
本文使用Java SDK實現優化方案。
- 創建數據表 數據表結構的主鍵列部分請參見下表。
主鍵順序 主鍵名稱 主鍵值 說明 1 partition_key hash(receive_id)前4位 分區鍵,保證數據均勻分布,可以使用md5作為hash函數 2 receive_id receive_id 接收方的用戶ID 3 message_id message_id 消息ID 上表中,第三列主鍵(PK)是message_id,這一列是主鍵自增列,建表時指定message_id列的屬性為AUTO_INCREMENT,且類型為INTEGER。private static void createTable(SyncClient client) { TableMeta tableMeta = new TableMeta(“message_table”); //第一列為分區建。 tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING)); //第二列為接收方ID。 tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING)); //第三列為消息ID,自動自增列,類型為INTEGER,屬性為PKO_AUTO_INCREMENT。 tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT)); int timeToLive = -1; //永不過期,也可以設置數據有效期,過期了會自動刪除。 int maxVersions = 1; //只保存一個版本,目前支持多版本。 TableOptions tableOptions = new TableOptions(timeToLive, maxVersions); CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions); client.createTable(request); }
完成以上操作后即創建了一個第三列PK為自動自增的表。
- 寫數據您可以使用PutRow和BatchWriteRow接口寫數據,這兩個接口都支持主鍵列自增功能。寫數據時,第三列message_id是主鍵自增列,這一列不需要填值,只需要填入占位符即可。
private static void putRow(SyncClient client, String receive_id) { //構造主鍵。 PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); //第一列的值為hash(receive_id)前4位。 primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4))); //第二列的值為接收方ID。 primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id)); //第三列是消息ID,主鍵遞增列,這個值是表格存儲產生的,用戶在這里不需要填入真實值,只需要一個占位符AUTO_INCREMENT即可。 primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT); PrimaryKey primaryKey = primaryKeyBuilder.build(); RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey); //此處設置返回類型為RT_PK,表示是在返回結果中包含PK列的值。如果不設置ReturnType,默認不返回。 rowPutChange.setReturnType(ReturnType.RT_PK); //加入屬性列,消息內容。 rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content))); //寫數據到表格存儲。 PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange)); //打印出返回的PK列。 Row returnRow = response.getRow(); if (returnRow != null) { System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString()); } //打印出消耗的CU。 CapacityUnit cu = response.getConsumedCapacity().getCapacityUnit(); System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit()); System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit()); }
- 讀數據您可以通過GetRange接口讀取最近的消息。message_id這一列PK的起始位置是上一條消息的message_id+1,結束位置是INF_MAX,這樣每次都可以讀出最新的消息,然后發送給客戶端。
private static void getRange(SyncClient client, String receive_id, String lastMessageId) { RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(“message_table”); //設置起始主鍵。 PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); //第一列的值為hash(receive_id)前4位。 primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4))); //第二列的值為接收方ID。 primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id)); //第三列的值為消息ID,起始于上一條消息。 primaryKeyBuilder.addPrimaryKeyColumn(“message_id”, PrimaryKeyValue.fromLong(lastMessageId + 1)); rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build()); //設置結束主鍵。 primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); //第一列的值為hash(receive_id)前4位。 primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4))); //第二列的值為接收方ID。 primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id)); //第三列的值為消息ID。 primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX); rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build()); rangeRowQueryCriteria.setMaxVersions(1); System.out.println("GetRange的結果為:"); while (true) { GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria)); for (Row row : getRangeResponse.getRows()) { System.out.println(row); } //如果nextStartPrimaryKey不為null,則繼續讀取。 if (getRangeResponse.getNextStartPrimaryKey() != null) { rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey()); } else { break; } } }
技術支持
表格存儲為您提供專業的免費的技術咨詢服務,歡迎通過釘釘加入相應交流群。
為互聯網應用、大數據、社交應用等開發者提供的最新技術交流群有36165029092(
表格存儲技術交流群-3
)。說明表格存儲用戶群11789671(
表格存儲技術交流群
)和23307953(表格存儲技術交流群-2
)已滿,暫時無法加入。為物聯網和時序模型開發者提供的技術交流群有44327024(
物聯網存儲 IoTstore 開發者交流群
)。