使用Java SDK收發消息
本文為您介紹快速使用云消息隊列 RabbitMQ 版進行消息收發的操作流程。
操作流程
步驟一:(可選)RAM用戶授權
RAM用戶默認沒有云消息隊列 RabbitMQ 版資源的操作權限。如果您的賬號為RAM用戶,必須先為RAM用戶進行授權,若您的賬號為阿里云賬號,則默認擁有云消息隊列 RabbitMQ 版服務的所有權限,無需進行授權操作。
如果您需要為RAM用戶授權,具體操作,請參見為RAM用戶授權。
步驟二:創建資源
創建實例
實例是一個獨立的云消息隊列 RabbitMQ 版資源實體,包含Vhost、Exchange、Queue等基本的資源要素。
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在頂部菜單欄選擇地域,然后在實例列表頁面,單擊創建實例。
在購買頁完成基本配置并勾選服務協議,然后單擊立即購買。
根據提示完成支付。
在實例列表頁面的頂部菜單欄,選擇地域,您可以看到創建的實例。
說明專業版實例和企業版實例購買后,立即進入服務中狀態。
鉑金版實例購買后,首先進入部署中狀態,待集群分配后,再進入服務中狀態。
獲取實例接入點
在收發消息時,您需要為發布端和訂閱端配置該接入點,客戶端通過接入點接入云消息隊列 RabbitMQ 版實例。
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在實例詳情頁面的接入點信息頁簽,將鼠標指針移動到目標類型的接入點,單擊該接入點右側的圖標,復制該接入點。
類型
說明
示例值
公網接入點
公網環境可讀寫。按量付費實例默認支持,預付費實例需在購買時選擇才支持。
XXX.net.mq.amqp.aliyuncs.com
VPC接入點
VPC環境可讀寫。按量付費實例和預付費實例默認都支持。
XXX.vpc.mq.amqp.aliyuncs.com
創建Vhost
Vhost是指虛擬主機,用作邏輯隔離,分別管理各自的Exchange、Queue和Binding,使得應用安全地運行在不同的Vhost上,相互之間不會干擾。一個實例下可以有多個Vhost,一個Vhost里面可以有若干個Exchange和Queue。Producer和Consumer連接云消息隊列 RabbitMQ 版需要指定一個Vhost。
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在左側導航欄,單擊Vhost 列表。
在Vhost 列表頁面,單擊創建 Vhost。
在創建 Vhost面板的Vhost名稱文本框,輸入Vhost名稱,然后單擊確定。
創建Exchange
Producer將消息發送到Exchange,Exchange根據Routing Key將消息路由到一個或多個Queue中(或者丟棄)。不同類型的Exchange的路由規則不同。更多信息,請參見Exchange。
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在左側導航欄,單擊Exchange 列表。
在Exchange 列表頁面,在當前 Vhost右側的切換下拉列表中,選擇Vhost,然后單擊創建 Exchange。
在創建 Exchange面板,輸入Exchange名稱,選擇Exchange類型,設置是否為Internal類型,然后單擊確定。
參數
描述
Exchange 名稱
Exchange名稱。以amq.開頭的為保留字段,因此不能使用。例如:amq.test。
類型
Exchange類型。取值:
direct:該類型的路由規則會將消息路由到Routing Key完全匹配的Queue中。
topic:該類型與direct類型相似。Topic Exchange路由規則沒有Direct Exchange那么嚴格,支持模糊匹配和多條件匹配,即該類型Exchange使用Routing Key模式匹配和字符串比較的方式將消息路由至綁定的Queue中。
fanout:該類型的路由規則非常簡單,會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中,相當于廣播功能。
headers:該類型與direct類型相似。Headers Exchange使用Headers屬性代替Routing Key進行路由匹配,在綁定Headers Exchange和Queue時,設置綁定屬性的鍵值對;在向Headers Exchange發送消息時,設置消息的Headers屬性鍵值對,使用消息Headers屬性鍵值對和綁定屬性鍵值對比較的方式將消息路由至綁定的Queue。
x-delayed-message:通過聲明該類Exchange,您可以自定義消息的Header屬性x-delay來指定消息延時投遞的時間段,單位為毫秒。消息將在x-delay中定義的時間段后,根據路由規則被投遞到對應的Queue。路由規則取決于x-delayed-type中指定的Exchange路由類型。
x-consistent-hash:x-consistent-hash Exchange支持將Routing Key或Header值進行Hash計算,使用一致性哈希算法將消息路由到不同的Queue上。
x-delayed-type
當Exchange類型為x-delayed-message時,需要配置此參數,以指定Exchange的路由類型。
哈希取值
當Exchange類型為x-consistent-hash時,需要配置此參數,以指定Hash計算的輸入值為哪種類型。取值如下:
RoutingKey
Header 值:使用Header方式作為Hash計算輸入值時,您需要定義hash-header參數的取值。
hash-header
當Exchange類型為x-consistent-hash且哈希取值為Header 值時,需要配置此參數,作為指定Hash計算的輸入值。
Internal
是否為Internal類型,默認值為否。取值:
是:內建類型,用于Exchange和Exchange之間的綁定。
否:非內建類型,用于Exchange和Queue之間的綁定。
創建Queue
Queue是指隊列,云消息隊列 RabbitMQ 版的消息都會被投入到一個或多個Queue中。
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在左側導航欄,單擊Queue 列表。
在Queue 列表頁面,在當前 Vhost右側的切換下拉列表中,選擇Vhost,單擊創建 Queue。
在創建 Queue面板,在Queue 名稱文本框輸入Queue的名稱,選擇是否為Auto Delete類型,單擊高級選項,設置Queue的參數,然后單擊確定。
參數
描述
說明
Queue 名稱
Queue的名稱
只能包含字母、數字、短劃線(-)、下劃線(_)、半角句號(.)、井號(#)、正斜線(/)、at符號(@)。
長度限制在1~255字符。
創建后無法修改,只能刪除重建。
以amq.開頭的為保留字段,因此不能使用。例如:amq.test。
Auto Delete
最后一個Consumer取消訂閱后,Queue是否自動刪除。
true:在訂閱該Queue消息的最后一個Consumer取消訂閱該Queue的消息后,自動刪除該Queue。
false:在訂閱該Queue消息的最后一個Consumer取消訂閱該Queue的消息后,不自動刪除該Queue。
高級選項
Queue的參數設置,可用于設置死信Exchange、死信Routing Key和消息存活時間。
DeadLetterExchange:指定死信消息發送的目標Exchange。
DeadLetterRoutingKey:指定死信消息的Routing Key,即死信Exchange會將消息發送至匹配該死信Routing Key所對應的Queue。
MessageTTL:消息存活時間,單位毫秒(ms)。在指定時間內未被成功消費的消息會變成死信消息,該消息將會被發送到死信Exchange。更多信息,請參見消息存活時間。
創建綁定關系
將Queue和指定的Exchange進行綁定,消息將按照對應的轉發規則從Exchange轉發到Queue中。
在Queue 列表頁面,選擇指定Queue,在其操作列單擊詳情。
在Queue 詳情頁面單擊被綁定信息頁簽,單擊添加被綁定。
在添加被綁定面板,選擇源Exchange,在Routing Key文本框輸入Routing Key,然后單擊確定。
說明若被綁定的Exchange的類型為x-consistent-hash時,Routing Key表示綁定的Queue的權重,只能設置為整數,取值范圍為[1~20]。
創建用戶名密碼
開源客戶端訪問云消息隊列 RabbitMQ 版服務端時,需要傳入用戶名和密碼進行權限認證,認證通過才允許訪問服務端。
登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表。
在實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。
在左側導航欄,單擊靜態用戶名密碼。
在靜態用戶名密碼頁面,單擊創建用戶名密碼。
在創建用戶名密碼面板,輸入AccessKey ID和AccessKey Secret,然后單擊確定。
說明AccessKey ID和AccessKey Secret需要在阿里云RAM控制臺獲取,具體獲取方式,請參見創建AccessKey。
靜態用戶名密碼頁面,顯示創建的靜態用戶名與密碼,密碼處于隱藏狀態。
在創建的靜態用戶名密碼的密碼列,單擊顯示密碼,可查看用戶名的密碼。
步驟三:調用SDK收發消息
安裝Java依賴庫
在IDEA中創建一個Java工程。
在pom.xml文件中添加以下依賴引入Java依賴庫。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- 支持開源所有版本 --> </dependency>
生產消息
在已創建的Java工程中,創建消息發送程序,按照SDK參數填寫說明配置相關參數并運行。
示例代碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class ProducerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//設置實例的接入點。
String hostName = "xxx.xxx.aliyuncs.com";
//設置實例的靜態用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設置實例的Vhost。
String virtualHost = "${VirtualHost}";
//在生產環境中,建議提前創建好Connection,并在需要時重復使用,避免頻繁創建和關閉Connection,以提高性能、復用連接資源,以及保證系統的穩定性。
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
Channel channel = connection.createChannel();
//設置Exchange、Queue和綁定關系。
String exchangeName = "${ExchangeName}";
String queueName = "${QueueName}";
String bindingKey = "${BindingKey}";
//設置Exchange類型。
String exchangeType = "${ExchangeType}";
//此處為了體驗流暢,確保了Exchange和Queue的創建過程。
//在生產環境中,建議在控制臺提前創建,盡量避免在代碼中直接聲明,否則可能觸發單API調用的限流。
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, bindingKey);
//開始發送消息。
for (int i = 0; i < 10; i++ ) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish(exchangeName, bindingKey, true, props,
("消息發送示例Body-" + i).getBytes(StandardCharsets.UTF_8));
System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
}
channel.close();
connection.close();
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
//默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
//基于網絡環境合理設置超時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
}
}
云消息隊列 RabbitMQ 版會對單實例的TPS流量峰值進行限流,更多信息,請參見實例限流最佳實踐。
訂閱消息
在已創建的Java工程中,創建消息訂閱程序,按照SDK參數填寫說明配置相關參數并運行。
示例代碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
//設置實例的接入點。
String hostName = "xxx.xxx.aliyuncs.com";
//設置實例的靜態用戶名密碼。
String userName = "${UserName}";
String passWord = "${PassWord}";
//設置實例的Vhost。
String virtualHost = "${VirtualHost}";
Connection connection = createConnection(hostName, userName, passWord, virtualHost);
final Channel channel = connection.createChannel();
//聲明Queue。
String queueName = "${QueueName}";
//創建${QueueName} ,該Queue必須在云消息隊列RabbitMQ版控制臺上已存在。
AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
//開始消費消息。
channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,進行業務邏輯處理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(passWord);
//設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// 默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
factory.setConnectionTimeout(300 * 1000);
factory.setHandshakeTimeout(300 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
return connection;
};
}
SDK參數填寫說明
參數 | 示例值 | 描述 |
hostName | XXX.net.mq.amqp.aliyuncs.com | 云消息隊列 RabbitMQ 版實例接入點。獲取方式,請參見步驟二:創建資源。 |
Port | 5672 | 默認端口。非加密端口為5672,加密端口為5671。 |
userName | MjoxODgwNzcwODY5MD**** | 客戶端接入云消息隊列 RabbitMQ 版服務端用于權限認證的靜態用戶名。 需要提前在云消息隊列 RabbitMQ 版控制臺創建。 具體操作,請參見步驟二:創建資源。 |
passWord | NDAxREVDQzI2MjA0OT**** | 客戶端接入云消息隊列 RabbitMQ 版服務端用于權限認證的靜態用戶密碼。 需要提前在云消息隊列 RabbitMQ 版控制臺創建。 具體操作,請參見步驟二:創建資源。 |
virtualHost | amqp_vhost | 云消息隊列 RabbitMQ 版實例的Vhost。需要提前在云消息隊列 RabbitMQ 版控制臺創建。 具體操作,請參見步驟二:創建資源。 |
exchangeName | ExchangeTest | 云消息隊列 RabbitMQ 版的Exchange。 需要提前在云消息隊列 RabbitMQ 版控制臺創建。 具體操作,請參見步驟二:創建資源。 |
queueName | QueueTest | 云消息隊列 RabbitMQ 版的Queue。 需要提前在云消息隊列 RabbitMQ 版控制臺創建。 具體操作,請參見步驟二:創建資源。 |
bindingKey | BindingKeyTest | 云消息隊列 RabbitMQ 版Exchange與Queue綁定的Binding Key。 需要提前在云消息隊列 RabbitMQ 版控制臺創建綁定關系。 具體操作,請參見步驟二:創建資源。 |
exchangeType | topic | Exchange的類型。云消息隊列 RabbitMQ 版支持的類型如下,更多信息,請參見Exchange。
重要 請確保填寫的Exchange類型和您創建Exchange時選擇的類型一致。 |
步驟四:配置監控告警
當您需要監控云消息隊列 RabbitMQ 版的使用情況時,可以使用云監控創建報警規則。如果資源的監控指標達到報警條件,云監控自動發送報警通知,幫助您及時得知異常監控數據,并快速處理。具體操作請參見監控指標。如果您需要查看基于阿里云ARMS Prometheus監控服務和Grafana的指標信息,請參見Dashboard。