本文將引導您快速體驗 SOFAStack 消息隊列,從創建資源、配置接入點到使用 SDK 收發消息。
具體操作步驟如下:
創建資源
注意事項
在使用 SOFAStack 消息隊列時,請注意以下網絡訪問限制:
Topic 和 Group ID 需創建在同一個地域(Region)下的同一個工作空間中才能互通。例如,當某 Topic 創建在華東 1(杭州)下的工作空間 A 中,那么該 Topic 只能被在華東 1(杭州)下的工作空間 A 中創建的 Group ID 對應的生產端和消費端訪問。
目前不支持公網訪問,生產端和消費端需要部署在相同地域的 ECS 上,或者保證網絡聯通。
創建工作空間
要使用消息隊列,您需要確保 SOFAStack 控制臺已創建至少一個工作空間。如 SOFAStack 未創建工作空間或您需要創建一個新的工作空間,可參見 添加工作空間。創建好工作空間后,將為您自動創建一個消息隊列實例。
創建 Topic
Topic 是消息隊列里對消息的一級歸類。消息生產者將消息發送到一個 Topic,而消息消費者則通過訂閱該 Topic 來獲取和消費消息。
登錄 SOFAStack 控制臺。
在左側導航欄,選擇 中間件 > 消息隊列 > Topic 管理。
單擊 創建 Topic,然后在 創建 Topic 對話框配置 Topic 信息:
參數
是否必填
說明
Topic
必填
Topic 格式要求如下:
Topic 只能包含英文、數字、 短橫線(-)和下劃線(_),其中英文和數字必須要有一種, 短橫線(-)和下劃線(_)可選。
長度需控制在 3~64 個字符之間。
命名不能以“CID”和“GID”開頭。
消息類型
必填
支持的消息類型有普通消息、分區順序消息、事務消息和定時消息。詳細消息類型的說明,可參見 消息類型。
描述
選填
對該 Topic 的備注信息,長度限制在 256 個字符以內。
單擊 確定。
創建 Group ID
創建完 Topic 后,您需要為消息的消費者(或生產者)創建客戶端 ID ,即 Group ID 作為標識。
Group ID 和 Topic 的關系是 N:N,即一個消費者可以訂閱多個 Topic,同一個 Topic 也可以被多個消費者訂閱;一個生產者可以向多個 Topic 發送消息,同一個 Topic 也可以接收來自多個生產者的消息。
消費者必須有對應的 Group ID,生產者不做強制要求。
登錄 SOFAStack 控制臺。
在左側導航欄,選擇 中間件 > 消息隊列 > Group 管理。
單擊 創建 Group ID,然后在 創建 Group ID 對話框配置 Group ID 信息:
參數
是否必填
說明
Group ID
必填
Group ID 格式要求如下:
命名以 “GID_” 或者 “GID-” 開頭。
只能包含字母、數字、短橫線和下劃線。
長度限制在 7~64 字符之間。
說明Group ID 一旦創建,則無法修改。
描述
選填
對該 Group ID 的備注信息,長度限制在 256 個字符以內。
單擊 確定。
獲取 AK(AccessKey ID)和 SK(AccessKey Secret)
阿里云 AccessKey 用于收發消息時進行賬戶鑒權。
在調用 SDK 發送和訂閱消息的時候,除了需要指定創建的 Topic 和 Group ID 以外,還需輸入您在 RAM 控制臺創建的身份驗證信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AccessKeySecret。
由于 RAM 是阿里云產品,非阿里云飛天底座輸出的環境中可使用螞蟻 IAM 創建訪問密鑰(AccessKey)做身份驗證。
創建 AccessKey 的具體步驟,參見 創建 AccessKey。
獲取接入配置
在控制臺創建好資源后,您需通過控制臺獲取工作空間的接入點。在收發消息時,您需要為生產端和消費端配置該接入點,以此接入某個具體工作空間或地域的服務。
在左側導航欄,選擇 中間件 > 消息隊列 > 概覽。
在頁面底部的 接入配置 找到 實例 ID 及 內網接入點。
將 內網接入點 配置到客戶端的 SDK 代碼的 ENDPOINT 參數。
將 實例 ID 配置到客戶端的 SDK 代碼的 INSTANCE_ID 參數。
發送消息
您可以通過控制臺發送測試消息或通過調用 TCP Java SDK 發送消息。
發送測試消息
用于快速驗證 Topic 資源的可用性,主要用作測試。
在左側導航欄,選擇 中間件 > 消息隊列 > Topic 管理。
在 Topic 管理頁面,找到您剛剛創建的 Topic,單擊右側操作列的 發送測試消息。
在 發送測試消息 對話框中的 消息體 一欄,輸入消息的具體內容,單擊 確定。控制臺即會返回消息發送成功通知以及相應的 Message ID。
調用 SDK 發送消息
通過 Maven 方式引入依賴。Java SDK 的最新版本號,可參見 SDK 版本說明。
<dependency> <groupId>com.alipay.sofa</groupId> <artifactId>sofamq-client-all</artifactId> <version>"XXX"</version> //設置為 Java SDK 的最新版本號 </dependency> <repositories> <repository> <id>antcloudrelease</id> <name>Ant Cloud</name> <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url> </repository> </repositories>
根據以下說明設置相關參數,運行示例代碼:
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; public class Main{ public static void main(String... args){ Properties credentials =new Properties(); // 阿里云賬號 AccessKey 擁有所有 API 的訪問權限,風險很高。強烈建議您創建并使用 RAM 用戶進行 API 訪問或日常運維,請登錄 RAM 控制臺創建 RAM 用戶。 // 此處以把 AccessKey 和 AccessKeySecret 保存在環境變量為例說明。 // 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險 credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 設置 TCP 接入域名,進入控制臺的概覽頁面查看接入點配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build(); Properties properties =new Properties(); // 設置用戶實例,進入控制臺的概覽頁面查看接入點配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId"); // 您在控制臺創建的 Group ID properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP"); Producer producer = accessPoint.createProducer(properties); producer.start(); Message message =new Message("$topic","YOUR_TAG","hello world".getBytes()); SendResult sendResult = producer.send(message); System.out.println(sendResult); } }
消息發送后,您可以在控制臺查看消息發送狀態,步驟如下:
在左側導航欄,選擇 中間件 > 消息隊列 > 消息查詢。
單擊 按 Message ID 查詢,在搜索框中輸入發送消息后返回的 Message ID,單擊 搜索 查詢消息發送狀態。
存儲時間 表示消息隊列服務端存儲這條消息的時間。如果查詢到此消息,表示消息已經成功發送到服務端。
訂閱消息
消息發送成功后,需要啟動消費者來訂閱消息。
調用 TCP Java SDK 訂閱消息。您可以運行以下示例代碼來啟動消費者,并測試訂閱消息的功能。請按照說明正確設置相關參數。
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Action; import io.openmessaging.api.ConsumeContext; import io.openmessaging.api.Consumer; import io.openmessaging.api.Message; import io.openmessaging.api.MessageListener; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; public class Main{ public static void main(String... args){ Properties credentials =new Properties(); // 阿里云賬號AccessKey擁有所有API的訪問權限,風險很高。強烈建議您創建并使用RAM用戶進行API訪問或日常運維,請登錄RAM控制臺創建RAM用戶。 // 此處以把AccessKey和AccessKeySecret保存在環境變量為例說明。 // 強烈建議不要把AccessKey和AccessKeySecret保存到代碼里,會存在密鑰泄漏風險 credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 設置 TCP 接入域名,進入控制臺的概覽頁面查看接入點配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build(); Properties properties =new Properties(); // 設置用戶實例,進入控制臺的概覽頁面查看接入點配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId"); // 您在控制臺創建的 Group ID properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP"); Consumer consumer = accessPoint.createConsumer(properties); consumer.subscribe("YOUR_TOPIC","YOUR_TAG",new MessageListener(){ @Override public Action consume(Message message,ConsumeContext context){ System.out.println(new String(message.getBody())); return Action.CommitMessage; } }); consumer.start(); } }
完成上述步驟后,您可以在控制臺查看消費者是否啟動成功,即消息訂閱是否成功。
在左側導航欄,選擇 中間件 > 消息隊列 > Group 管理。
單擊目標 Group ID 名稱進入詳情頁。
單擊 訂閱關系。
如果 是否在線 顯示為 在線,且訂閱關系一致,則說明訂閱成功。否則說明訂閱失敗。