本文介紹如何應用事件總線EventBridge的事件流功能實現云消息隊列 RocketMQ 版的消息路由。
前提條件
您已購買并部署云消息隊列 RocketMQ 版實例,且實例處于服務中狀態。具體步驟,請參見創建實例。
背景信息
事件流作為更輕量、實時端到端的流式事件通道,提供輕量流式數據的過濾和轉換的能力,在不同的數據倉庫之間、數據處理程序之間、數據分析和處理系統之間進行數據同步。源端云消息隊列 RocketMQ 版生產的消息可以通過事件流這個通道被路由到目標端的云消息隊列 RocketMQ 版,無需定義事件總線。更多信息,請參見事件流概述。
步驟一:在目標端創建事件流
事件流需要在目標端創建,例如如果需要把華北2(北京)的RocketMQ消息路由到華東1(杭州),那么需要在華東1(杭州)創建事件流任務。
- 登錄事件總線EventBridge控制臺。
- 在頂部菜單欄,選擇地域。
- 在左側導航欄,單擊事件流。
在事件流頁面,單擊創建事件流。
在創建事件流面板,設置任務名稱和描述,配置以下參數,然后單擊保存。
任務創建
在Source(源)配置向導,選擇數據提供方為消息隊列 RocketMQ 版,設置以下參數,然后單擊下一步。
參數
說明
示例
地域
選擇消息隊列RocketMQ版源實例所在的地域。
華東1(杭州)
版本
選擇RocketMQ實例版本。
RocketMQ 4.x
RocketMQ 實例
選擇生產消息隊列RocketMQ版消息的源實例。
MQ_INST_115964845466****_ByBehioo
Topic
選擇生產消息隊列RocketMQ版消息的Topic。
topic
Tag
配置源實例中用于過濾消息的Tag。
test
Group ID
選擇源實例中的消費組名稱。請使用獨立的消費組來創建事件源,不要和已有的業務混用消費組,以免影響已有的消息收發。
GID_http_1
消費位點
選擇開始消費消息的位點。
最新位點
數據格式
數據格式是針對支持二進制傳遞的數據源端推出的指定內容格式的編碼能力。針對消息路由場景,這里配置參數為Binary。
Binary
批量推送
批量推送可幫您批量聚合多個事件,當批量推送條數和批量推送間隔(單位:秒)兩者條件達到其一時即會觸發批量推送。
例如:您設置的推送條數為100 條,間隔時間為15 s,在10 s內消息條數已達到100條,那么該次推送則不會等15 s后再推送。
開啟
批量推送條數
調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為 [1,10000]。
100
批量推送間隔(單位:秒)
調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)、Transform(轉換)配置向導,設置事件過濾、轉換規則,單擊下一步。事件轉換的配置說明,請參見使用函數計算實現消息數據清洗。
在Sink(目標)配置向導,選擇服務類型為消息隊列RocketMQ版,配置以下參數,單擊保存。
參數
說明
示例
版本
選擇云消息隊列 RocketMQ 版實例版本。
RocketMQ 4.x
實例ID
選擇已創建的云消息隊列 RocketMQ 版實例。
test
Topic
選擇已創建的Topic。
test
消息體(body)
事件總線EventBridge通過二進制提取獲取指定的事件中的數據,Base64解碼后路由到事件目標。
二進制提取
$.data.body
自定義屬性(Properties)
選擇模板。您可以自定義一個模板,定義模板里需要的變量,事件總線EventBridge可以提取事件中的字段,按照模板定義的形式進行轉換。
說明如果需要全量傳遞源端的RocketMQ消息的屬性,推薦使用示例中的配置。
變量:
{ "userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY" }
模板:
{ "EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}" }
消息索引(Keys)
事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。
部分事件
$.data.systemProperties.KEYS
消息標簽(Tags)
事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。
部分事件
$.data.systemProperties.TAGS
任務屬性
設置事件流的重試策略及死信隊列。更多信息,請參見重試和死信。
返回事件流頁面,找到創建好的事件流,在其右側操作欄,單擊啟用。
在提示對話框,閱讀提示信息,然后單擊確認。
啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
步驟二:測試驗證
在頂部菜單欄,選擇步驟一:在目標端創建事件流中源實例所在的地域。
在左側導航欄,單擊實例列表。
在實例列表頁面,找到步驟一:在目標端創建事件流中配置的源實例,在其操作列,單擊詳情。
在左側導航欄,單擊Topic 管理。
在Topic列表,單擊步驟一:在目標端創建事件流中配置的源實例的Topic名稱。
在Topic詳情頁面,單擊右上角的快速體驗。
在快速體驗的消息生產和消費面板,選擇發送方式為控制臺,然后配置消息內容、消息 Key和消息 Tag,單擊確定。
消息發送成功后,界面會提示消息發送成功!,并顯示Message ID。
在源實例完成生產消息后,返回實例列表頁面。
在實例列表頁面,找到步驟一:在目標端創建事件流中配置的目標實例,在其操作列,單擊詳情。
在左側導航欄,單擊Topic 管理。
在Topic列表,單擊步驟一:在目標端創建事件流中配置的目標實例的Topic名稱。
在Topic詳情頁面,單擊消息查詢。
配置查詢方式和查詢范圍,單擊查詢。
查看查詢到的Message ID、Tag和Key值是否與生產的消息一致。