云消息隊列 RocketMQ 版通過消費位點管理消費進度,本文為您介紹云消息隊列 RocketMQ 版的消費進度管理機制。
背景信息
云消息隊列 RocketMQ 版的生產者和消費者在進行消息收發時,必然會涉及以下場景,消息先生產后訂閱或先訂閱后生產。這兩種場景下,消費者客戶端啟動后從哪里開始消費?如何標記已消費的消息?這些都是由云消息隊列 RocketMQ 版的消費進度管理機制來定義的。
通過了解云消息隊列 RocketMQ 版的消費進度管理機制,可以幫助您解答以下問題:
消費者啟動后從哪里開始消費消息?
消費者每次消費成功后如何標記消息狀態,確保下次不會再重復處理該消息?
某消息被指定消費者消費過一次后,如果業務出現異常需要做故障恢復,該消息能否被重新消費?
消費進度原理
消息位點(Offset)
參考云消息隊列 RocketMQ 版主題和隊列的定義,消息是按到達服務端的先后順序存儲在指定主題的多個隊列中,每條消息在隊列中都有一個唯一的Long類型坐標,這個坐標被定義為消息位點。
任意一個消息隊列在邏輯上都是無限存儲,即消息位點會從0到Long.MAX無限增加。通過主題、隊列和位點就可以定位任意一條消息的位置,具體關系如下圖所示:
云消息隊列 RocketMQ 版定義隊列中最早一條消息的位點為最小消息位點(MinOffset);最新一條消息的位點為最大消息位點(MaxOffset)。雖然消息隊列邏輯上是無限存儲,但由于服務端物理節點的存儲空間有限,云消息隊列 RocketMQ 版會滾動刪除隊列中存儲最早的消息。因此,消息的最小消費位點和最大消費位點會一直遞增變化。
消費位點(ConsumerOffset)
云消息隊列 RocketMQ 版領域模型為發布訂閱模式,每個主題的隊列都可以被多個消費者分組訂閱。若某條消息被某個消費者消費后直接被刪除,則其他訂閱了該主題的消費者將無法消費該消息。
因此,云消息隊列 RocketMQ 版通過消費位點管理消息的消費進度。每條消息被某個消費者消費完成后不會立即在隊列中刪除,云消息隊列 RocketMQ 版會基于每個消費者分組維護一份消費記錄,該記錄指定消費者分組消費某一個隊列時,消費過的最新一條消息的位點,即消費位點。
當消費者客戶端離線,又再次重新上線時,會嚴格按照服務端保存的消費進度繼續處理消息。如果服務端保存的歷史位點信息已過期被刪除,此時消費位點向前移動至服務端存儲的最小位點。
消費位點的保存和恢復是基于云消息隊列 RocketMQ 版服務端的存儲實現,和任何消費者無關。因此云消息隊列 RocketMQ 版支持跨消費者的消費進度恢復。
隊列中消息位點MinOffset、MaxOffset和每個消費者分組的消費位點ConsumerOffset的關系如下:
ConsumerOffset≤MaxOffset:
當消費速度和生產速度一致,且全部消息都處理完成時,最大消息位點和消費位點相同,即ConsumerOffset=MaxOffset。
當消費速度較慢小于生產速度時,隊列中會有部分消息未消費,此時消費位點小于最大消息位點,即ConsumerOffset<MaxOffset,兩者之差就是該隊列中堆積的消息量。
ConsumerOffset≥MinOffset:正常情況下有效的消費位點ConsumerOffset必然大于等于最小消息位點MinOffset。消費位點小于最小消息位點時是無效的,相當于消費者要消費的消息已經從隊列中刪除了,是無法消費到的,此時服務端會將消費位點強制糾正到合法的消息位點。
消費位點初始值
消費位點初始值指的是消費者分組首次啟動消費者消費消息時,服務端保存的消費位點的初始值。
云消息隊列 RocketMQ 版定義消費位點的初始值為消費者首次獲取消息時,該時刻隊列中的最大消息位點。相當于消費者將從隊列中最新的消息開始消費。
重置消費位點
若消費者分組的初始消費位點或當前消費位點不符合您的業務預期,您可以通過重置消費位點調整您的消費進度。
適用場景
初始消費位點不符合需求:因初始消費位點為當前隊列的最大消息位點,即客戶端會直接從最新消息開始消費。若業務上線時需要消費部分歷史消息,您可以通過重置消費位點功能消費到指定時刻前的消息。
消費堆積快速清理:當下游消費系統性能不足或消費速度小于生產速度時,會產生大量堆積消息。若這部分堆積消息可以丟棄,您可以通過重置消費位點快速將消費位點更新到指定位置,繞過這部分堆積的消息,減少下游處理壓力。
業務回溯,糾正處理:由于業務消費邏輯出現異常,消息被錯誤處理。若您希望重新消費這些已被處理的消息,可以通過重置消費位點快速將消費位點更新到歷史指定位置,實現消費回溯。
重置功能
云消息隊列 RocketMQ 版的重置消費位點提供以下能力:
從最新位點開始消費
Group ID消費指定Topic中的消息時,會跳過當前堆積(未被消費)的所有消息,從重置操作時間后發送的最新消息開始消費。
從指定時間的位點開始消費:
消費者將從重置時間對應的消費位點之后的消息進行消費,不管這些消息是否已被消費過。
可選時間范圍中的起始和終止時間分別是該Topic中儲存的最早的和最晚的一條消息的生產時間,不能選擇超過可選時間范圍的時間點。
重置到某一時刻對應的消費位點,匹配位點時,服務端會根據自動匹配到該時刻最接近的消費位點。
設置方式
控制臺操作入口:
登錄云消息隊列 RocketMQ 版控制臺并在左側導航欄選擇實例列表。
在實例列表中選擇指定實例進入實例詳情頁面,然后在左側導航欄選擇Group 管理。
在Group列表中選擇指定Group進入Group 詳情頁面進行重置消費位點操作。
OpenAPI接口:ResetConsumeOffset - 重置消費位點
使用限制
版本兼容性
關于消費者分組的消費位點初始值,不同的服務端版本中定義如下:
服務端歷史版本(4.x/3.x版本):消息位點初始值受當前隊列消息狀態的影響。
服務端5.x版本:明確定義消費位點初始值為消費者獲取消息時刻隊列中的最大消息位點。
因此,若您將服務端版本從歷史版本升級到最新的5.x版本時,需要自行對消費者首次啟動時的情況做兼容性判斷。
使用建議
嚴格控制消費位點重置的權限
重置消費位點會給系統帶來額外處理壓力,可能會影響新消息的讀寫性能。 因此該操作請在適用場景下謹慎執行,并提前做好合理性和必要性評估。