本文介紹云消息隊列 RocketMQ 版中消費者(Consumer)的定義、模型關系、內部屬性、行為約束、版本兼容性及使用建議。

定義

消費者是云消息隊列 RocketMQ 版中用來接收并處理消息的運行實體。

消費者通常被集成在業務系統中,從云消息隊列 RocketMQ 版服務端獲取消息,并將消息轉化成業務可理解的信息,供業務邏輯處理。

在消息消費端,可以定義如下傳輸行為:
  • 消費者身份:消費者必須關聯一個指定的消費者分組,以獲取分組內統一定義的行為配置和消費狀態。
  • 消費者類型:云消息隊列 RocketMQ 版面向不同的開發場景提供了多樣的消費者類型,包括PushConsumer類型、SimpleConsumer類型等。具體信息,請參見消費者分類
  • 消費者本地運行配置:消費者根據不同的消費者類型,控制消費者客戶端本地的運行配置。例如消費者客戶端的線程數,消費并發度等,實現不同的傳輸效果。

模型關系

云消息隊列 RocketMQ 版的領域模型中,消費者的位置和流程如下:消費者
  1. 消息由生產者初始化并發送到云消息隊列 RocketMQ 版服務端。
  2. 消息按照到達云消息隊列 RocketMQ 版服務端的順序存儲到主題的指定隊列中。
  3. 消費者按照指定的訂閱關系從云消息隊列 RocketMQ 版服務端中獲取消息并消費。

內部屬性

消費者分組名稱
  • 定義:當前消費者關聯的消費者分組名稱,消費者必須關聯到指定的消費者分組,通過消費者分組獲取消費行為。更多信息,請參見消費者分組(ConsumerGroup)
  • 取值:消費者分組為云消息隊列 RocketMQ 版的邏輯資源,需要您提前通過控制臺或OpenAPI創建。具體命名格式,請參見使用限制
客戶端ID
  • 定義:消費者客戶端的標識,用于區分不同的消費者。集群內全局唯一。
  • 取值:客戶端ID由云消息隊列 RocketMQ 版的SDK自動生成,主要用于日志查看、問題定位等運維場景,不支持修改。
通信參數
  • 接入點信息(必選):連接服務端的接入地址,用于識別服務端集群。

    接入點必須按格式配置,建議使用域名,避免使用IP地址,防止節點變更無法進行熱點遷移。

  • 身份認證信息(可選):客戶端用于身份驗證的憑證信息。

    僅在服務端開啟身份識別和認證時需要傳輸。

  • 請求超時時間(可選):客戶端網絡請求調用的超時時間。取值范圍和默認值,請參見參數限制
預綁定訂閱關系列表
  • 定義:指定消費者的訂閱關系列表。

    云消息隊列 RocketMQ 版服務端可在消費者初始化階段,根據預綁定的訂閱關系列表對目標主題進行權限及合法性校驗,無需等到應用啟動后才能校驗。

  • 取值:建議在消費者初始化階段明確訂閱關系即要訂閱的主題列表,若未設置,或訂閱的主題動態變更,云消息隊列 RocketMQ 版會對目標主題進行動態補充校驗。
消費監聽器
  • 定義:云消息隊列 RocketMQ 版服務端將消息推送給消費者后,消費者調用消息消費邏輯的監聽器。
  • 取值:由消費者客戶端本地配置。
  • 約束:使用PushConsumer類型的消費者消費消息時,消費者客戶端必須設置消費監聽器。消費者類型的具體信息,請參見消費者分類

行為約束

云消息隊列 RocketMQ 版領域模型中,消費者的管理通過消費者分組實現,同一分組內的消費者共同分攤消息進行消費。因此,為了保證分組內消息的正常負載和消費,云消息隊列 RocketMQ 版要求同一分組下的所有消費者以下消費行為保持一致:
  • 投遞順序
  • 消費重試策略

版本兼容性

如行為約束中所述,同一分組內所有消費者的投遞順序和消費重試策略需要保持一致。
  • 云消息隊列 RocketMQ 版服務端5.x版本:上述消費者的消費行為從關聯的消費者分組中統一獲取,因此,同一分組內所有消費者的消費行為必然是一致的,客戶端無需關注。
  • 云消息隊列 RocketMQ 版服務端3.x/4.x歷史版本:上述消費邏輯由消費者客戶端接口定義,因此,您需要自己在消費者客戶端設置時保證同一分組下的消費者的消費行為一致。

若您使用云消息隊列 RocketMQ 版服務端5.x版本,客戶端使用歷史版本SDK,則消費者的消費邏輯以消費者客戶端接口的設置為準。

使用建議

不建議在單一進程內創建大量消費者

云消息隊列 RocketMQ 版的消費者在通信協議層面支持非阻塞傳輸模式,網絡通信效率較高,并且支持多線程并發訪問。因此,大部分場景下,單一進程內同一個消費分組只需要初始化唯一的一個消費者即可,開發過程中應避免以相同的配置初始化多個消費者。

不建議頻繁創建和銷毀消費者

云消息隊列 RocketMQ 版的消費者是可以重復利用的底層資源,類似數據庫的連接池。因此不需要在每次接收消息時動態創建消費者,且在消費完成后銷毀消費者。這樣頻繁地創建銷毀會在服務端產生大量短連接請求,嚴重影響系統性能。

  • 正確示例
    Consumer c = ConsumerBuilder.build();
      for (int i =0;i<n;i++)
        {
          Message m= c.receive();
          //process message
        }
    c.shutdown();
  • 典型錯誤示例
    for (int i =0;i<n;i++)
      {
        Consumer c = ConsumerBuilder.build();
        Message m= c.receive();
        //process message
        c.shutdown();
      }