本文介紹如何應用事件總線EventBridge的事件流功能實現云消息隊列 RabbitMQ 版的消息路由。
前提條件
您已購買并部署云消息隊列 RabbitMQ 版實例,且實例處于服務中狀態。具體步驟,請參見創建資源。
背景信息
事件流作為更輕量、實時端到端的流式事件通道,提供輕量流式數據的過濾和轉換的能力,在不同的數據倉庫之間、數據處理程序之間、數據分析和處理系統之間進行數據同步。源端云消息隊列 RabbitMQ 版生產的消息可以通過事件流這個通道被路由到目標端的云消息隊列 RabbitMQ 版,無需定義事件總線。更多信息,請參見事件流概述。
步驟一:在目標端創建事件流
事件總線EventBridge暫不支持跨地域創建云消息隊列 RabbitMQ 版的事件流。
- 登錄事件總線EventBridge控制臺。
- 在頂部菜單欄,選擇地域。
- 在左側導航欄,單擊事件流。
在事件流頁面,單擊創建事件流。
在創建事件流面板,設置任務名稱和描述,配置以下參數,然后單擊保存。
任務創建
在Source(源)配置向導,選擇數據提供方為消息隊列 RabbitMQ 版,設置以下參數,然后單擊下一步。
參數
說明
示例
地域
選擇云消息隊列 RabbitMQ 版源實例所在的地域。
華東1(杭州)
RabbitMQ 實例
選擇生產云消息隊列 RabbitMQ 版消息的源實例。
amqp-cn-7pp2mwbc****
Vhost
選擇源實例中的Vhost。
test
Queue
選擇存儲消息的隊列。
test
批量推送
批量推送可幫您批量聚合多個事件,當批量推送條數和批量推送間隔(單位:秒)兩者條件達到其一時即會觸發批量推送。
例如:您設置的推送條數為100 條,間隔時間為15 s,在10 s內消息條數已達到100條,那么該次推送則不會等15 s后再推送。
開啟
批量推送條數
調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為 [1,10000]。
100
批量推送間隔(單位:秒)
調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)、Transform(轉換)配置向導,設置事件過濾、轉換規則,單擊下一步。事件轉換的配置說明,請參見使用函數計算實現消息數據清洗。
在Sink(目標)配置向導,選擇服務類型為消息隊列RabbitMQ版,配置以下參數,單擊保存。
Exchange:生產者將消息發送到Exchange,由Exchange將消息路由到一個或多個Queue中。
Queue:每個消息都會被投入到一個或多個Queue里。
參數
說明
示例
實例ID
選擇已創建的云消息隊列 RabbitMQ 版實例。
amqp-cn-zvp2pny6****
Vhost
選擇已創建的Vhost。
test
目標類型
Queue 模式
Exchange
當目標類型為Exchange時,選擇云消息隊列 RabbitMQ 版中的Exchange。
exchange
Queue
當目標類型為Queue時,選擇云消息隊列 RabbitMQ 版中的選擇接收消息的隊列。
queue
消息路由規則(Routing Key)
事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。當目標類型為Exchange時需要配置。
部分事件
$.data.key
消息體(body)
事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。
部分事件
$.data.body
MessageId
事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。
部分事件
$.data.props.messageId
自定義屬性(Properties)
事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。
部分事件
任務屬性
設置事件流的重試策略及死信隊列。更多信息,請參見重試和死信。
返回事件流頁面,找到創建好的事件流,在其右側操作欄,單擊啟用。
在提示對話框,閱讀提示信息,然后單擊確認。
啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
步驟二:調用SDK發送消息
獲取接入點。您需要在云消息隊列 RabbitMQ 版控制臺獲取實例的接入點。在發送消息時,您需要為發布端配置該接入點,通過接入點接入云消息隊列 RabbitMQ 版實例。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在實例詳情頁面的接入點信息頁簽,將鼠標指針移動到目標類型的接入點,單擊該接入點右側的圖標,復制該接入點。
類型
說明
示例值
公網接入點
公網環境可讀寫。按量付費實例默認支持,預付費實例需在購買時選擇才支持。
XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com
VPC接入點
VPC環境可讀寫。按量付費實例和預付費實例默認都支持。
XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com
安裝Java依賴庫。在pom.xml添加以下依賴。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持開源所有版本 --> </dependency>
生成用戶名密碼。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊靜態用戶名密碼。
在靜態用戶名密碼頁面,單擊創建用戶名密碼。
在創建用戶名密碼面板,輸入AccessKey ID,輸入AccessKey Secret,單擊確定。
靜態用戶名密碼頁面,顯示創建的靜態用戶名與密碼,密碼處于隱藏狀態。
在創建的靜態用戶名密碼的密碼列,單擊顯示密碼,可查看用戶名的密碼。
生產消息。創建并編譯運行Qava。
重要編譯運行ProducerTest.java生產消息之前,您需要根據代碼提示信息配置參數列表中所列舉的參數。
表 3. 參數列表 參數
示例值
描述
hostName
1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com
云消息隊列 RabbitMQ 版實例接入點。
Port
5672
默認端口。非加密端口為5672,加密端口為5671。
userName
MjoxODgwNzcwODY5MD****
在云消息隊列 RabbitMQ 版控制臺將阿里云賬號或RAM用戶的AccessKey ID、AccessKey Secret和云消息隊列 RabbitMQ 版實例ID通過Base64編碼后生成的靜態用戶名。您可以在云消息隊列 RabbitMQ 版控制臺的靜態用戶名密碼頁面獲取。
passWord
NDAxREVDQzI2MjA0OT****
在云消息隊列 RabbitMQ 版控制臺將阿里云賬號或RAM用戶的AccessKey Secret和timestamp參數(系統當前時間)通過HMAC-SHA1生成一個簽名后,再將這個簽名和timestamp參數(系統當前時間)通過Base64編碼后生成的靜態密碼。您可以在云消息隊列 RabbitMQ 版控制臺的靜態用戶名密碼獲取。
virtualHost
Test
云消息隊列 RabbitMQ 版實例的Vhost。您可以在云消息隊列 RabbitMQ 版控制臺的Vhost 詳情頁面查看。如何查看Vhost,請參見查看Vhost連接詳情。
ExchangeName
ExchangeTest
云消息隊列 RabbitMQ 版的Exchange。您可以在云消息隊列 RabbitMQ 版控制臺的Exchange 列表頁面,結合實例ID與Vhost模糊搜索已創建的Exchange。
BindingKey
BindingKeyTest
云消息隊列 RabbitMQ 版Exchange與Queue的Binding Key。您可以在云消息隊列 RabbitMQ 版控制臺的Exchange 列表頁面查看Exchange的綁定關系,獲取Binding Key。
QueueName
QueueTest
云消息隊列 RabbitMQ 版的Queue。僅在訂閱消息時候需要配置,您可以在云消息隊列 RabbitMQ 版控制臺的Exchange 列表頁面,查看Exchange的綁定關系,獲取Exchange綁定的Queue。
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; import java.util.HashMap; import java.util.UUID; public class ProducerTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 設置接入點,在消息隊列RabbitMQ版控制臺實例詳情頁面查看。 factory.setHost("xxx.xxx.aliyuncs.com"); // 用戶名,在消息隊列RabbitMQ版控制臺靜態用戶名密碼頁面查看。 factory.setUsername("${UserName}"); // 密碼,在消息隊列RabbitMQ版控制臺靜態用戶名密碼頁面查看。 factory.setPassword("${PassWord}"); //設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); // 設置Vhost名稱,請確保已在消息隊列RabbitMQ版控制臺上創建完成。 factory.setVirtualHost("${VhostName}"); // 默認端口,非加密端口5672,加密端口5671。 factory.setPort(5672); // 基于網絡環境合理設置超時時間。 factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null); channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>()); channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}"); // 開始發送消息。 for (int i = 0; i < 100; i++ ) { // ${ExchangeName}必須在消息隊列RabbitMQ版控制臺上已存在,并且Exchange的類型與控制臺上的類型一致。 // BindingKey根據業務需求填入相應的BindingKey。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props, ("消息發送Body" + i).getBytes(StandardCharsets.UTF_8)); } connection.close(); } }
步驟三:驗證事件流
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在實例列表頁面,單擊步驟一:在目標端創建事件流中配置的目標實例名稱。
在實例詳情頁面的基本信息區域,單擊消息查詢。
設置查詢方式為按 Message ID 查詢或按 Queue 查詢,然后設置時間范圍,單擊查詢。
查看查詢到的消息內容是否與步驟二:調用SDK發送消息中發送的消息一致。