云消息隊列 RocketMQ 版提供多種語言的SDK用于收發不同類型的消息,本文以Java SDK為例,說明如何調用SDK連接云消息隊列 RocketMQ 版服務端,完成普通消息的收發流程。
前提條件
您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate為例。
安裝Java依賴庫
在IDEA中創建一個Java工程。
在pom.xml文件中添加以下依賴引入Java依賴庫。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.7</version> </dependency>
重要如果您使用的實例類型為Serverless,在公網訪問的時候需要注意SDK的版本等信息,詳情請參見Serverless版實例公網訪問版本說明。
生產消息
在已創建的Java工程中,創建發送普通消息程序并運行,示例代碼如下:
package doc;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內網訪問,建議填寫VPC接入點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網接入點。使用公網接入點訪問,必須開啟實例的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//消息發送的目標Topic名稱,需要提前在控制臺創建,如果不創建直接使用會返回報錯。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網接入點訪問,configuration還需要設置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內網訪問,無需填寫該配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
/**
* 初始化Producer時直接配置需要使用的Topic列表(這個參數可以配置多個Topic),實現提前檢查錯誤配置、攔截非法配置啟動。
* 針對非事務消息 Topic,也可以不配置,服務端會動態檢查消息的Topic是否合法。
* 注意!!!事務消息Topic必須提前配置,以免事務消息回查接口失敗,具體原理請參見事務消息。
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//普通消息發送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
//設置消息索引鍵,可根據關鍵字精確查找某條消息。
.setKeys("messageKey")
//設置消息Tag,用于消費端根據指定Tag過濾消息。
.setTag("messageTag")
//消息體。
.setBody("messageBody".getBytes())
.build();
try {
//發送消息,需要關注發送結果,并捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
消費消息
在已創建的Java工程中,創建訂閱普通消息程序并運行。云消息隊列 RocketMQ 版支持SimpleConsumer和PushConsumer兩種消費者類型,您可以選擇任意一種方式訂閱消息,具體的消費者類型的差異如下:
對比項 | PushConsumer | SimpleConsumer |
接口方式 | 使用監聽器回調接口返回消費結果,消費者僅允許在監聽器范圍內處理消費邏輯。 | 業務方自行實現消息處理,并主動調用接口返回消費結果。 |
消費并發度管理 | 由SDK管理消費并發度。 | 由業務方消費邏輯自行管理消費線程。 |
接口靈活度 | 高度封裝,不夠靈活。 | 原子接口,可靈活自定義。 |
適用場景 | 適用于無自定義流程的開發場景。 | 適用于需要高度自定義業務流程的開發場景。 |
PushConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內網訪問,建議填寫VPC接入點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網接入點。使用公網接入點訪問,必須開啟實例的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制臺創建,如果不創建直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制臺創建,如果不創建直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網接入點訪問,configuration還需要設置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內網訪問,無需填寫該配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
//訂閱消息的過濾規則,表示訂閱所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通信參數以及訂閱關系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設置消費者分組。
.setConsumerGroup(consumerGroup)
//設置預綁定的訂閱關系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//設置消費監聽器。
.setMessageListener(messageView -> {
//處理消息并返回消費結果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可關閉該進程。
//pushConsumer.close();
}
}
SimpleConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* 實例接入點,從控制臺實例詳情頁的接入點頁簽中獲取。
* 如果是在阿里云ECS內網訪問,建議填寫VPC接入點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網接入點。使用公網接入點訪問,必須開啟實例的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制臺創建,如果不創建直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制臺創建,如果不創建直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網接入點訪問,configuration還需要設置實例的用戶名和密碼。用戶名和密碼在控制臺訪問控制的智能身份識別頁簽中獲取。
* 如果是在阿里云ECS內網訪問,無需填寫該配置,服務端會根據內網VPC信息智能獲取。
* 如果實例類型為Serverless實例,公網訪問必須設置實例的用戶名密碼,當開啟內網免身份識別時,內網訪問可以不設置用戶名和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
//訂閱消息的過濾規則,表示訂閱所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通信參數以及訂閱關系。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設置消費者分組。
.setConsumerGroup(consumerGroup)
//設置長輪詢超時時間。
.setAwaitDuration(awaitDuration)
//設置預綁定的訂閱關系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
//設置本次拉取的最大消息條數。
int maxMessageNum = 16;
//設置消息的不可見時間。
Duration invisibleDuration = Duration.ofSeconds(10);
//SimpleConsumer需要客戶端一直主動循環獲取消息,并進行消費處理。
//如果需要提高消費實時性,建議多線程并發拉取。
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
//消費處理完成后,需要主動調用ACK向服務端提交消費結果。
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// 如果不需要再使用SimpleConsumer,可關閉該進程。
// consumer.close();
}
}
Serverless版實例公網訪問版本說明
Serverless版實例的公網訪問功能,僅部分版本的SDK客戶端支持,具體限制如下:
Java 5.x SDK
Serverless版實例使用公網訪問接入云消息隊列 RocketMQ 版時,需要保證使用的SDK版本滿足以下要求,并在消息收發代碼中補充如下內容:
其中,InstanceId
需要替換為您實際使用的實例ID。
SDK版本:rocketmq-client ≥ 5.2.0
消息發送代碼補充:
producer.setNamespaceV2("InstanceId");
消息消費代碼補充:
consumer.setNamespaceV2("InstanceId");
SDK版本:rocketmq-client-java ≥ 5.0.6
消息發送和消息消費代碼補充:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();
Java Ons 1.x SDK
Serverless版實例使用公網訪問接入云消息隊列 RocketMQ 版時,需要保證使用的Java ONS 1.x SDK版本為1.9.0.Final及以上版本,并在消息收發代碼中補充如下內容:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
其中,InstanceId
需要替換為您實際使用的實例ID。
SDK參數填寫說明
參數 | 示例值 | 描述 |
endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | 云消息隊列 RocketMQ 版實例的接入點。獲取方式,請參見獲取實例接入點。
|
topic | normal_test | 云消息隊列 RocketMQ 版的Topic,用于指定生產者將消息發送到哪個Topic,或者指定消費者要消費哪個Topic的消息。 Topic需要提前在云消息隊列 RocketMQ 版實例下創建。具體操作,請參見創建Topic。 |
group | GID_test | 云消息隊列 RocketMQ 版的ConsumerGroup,用于指定消費者使用哪個消費者分組消費消息。 Group需要提前在云消息隊列 RocketMQ 版實例下創建。具體操作,請參見創建ConsumerGroup。 |
Instance UserName | 1XVg0hzgKm****** | 云消息隊列 RocketMQ 版實例的用戶名。使用公網訪問時需要填寫,VPC訪問時Serverless實例只有開啟內網免身份識別可以不用填寫,其他實例無需填寫。 獲取方式,請參見獲取實例用戶名密碼。 |
Instance Password | ijSt8rEc45****** | 云消息隊列 RocketMQ 版實例的用戶密碼。使用公網訪問時需要填寫,VPC訪問時Serverless實例只有開啟內網免身份識別可以不用填寫,其他實例無需填寫。 獲取方式,請參見獲取實例用戶名密碼。 |
驗證消息
消息收發完成后,您可以通過控制臺查看消息消費情況。
登錄控制臺,在實例列表頁面單擊目標實例名稱。
在左側導航欄單擊消息軌跡。
SDK參考
本文以Java SDK為例介紹收發普通消息流程,其他語言SDK和其他類型消息的示例代碼,請參見SDK參考概述。