云消息隊(duì)列 MQTT 版提供同步查詢和異步上下線事件通知兩種方式,來獲取MQTT客戶端在線狀態(tài)。本文介紹這兩種方式的基本原理、應(yīng)用場景、具體差異以及實(shí)現(xiàn)方式。
基本原理
云消息隊(duì)列 MQTT 版服務(wù)端(下文簡稱為MQTT服務(wù)端)提供以下方式獲取客戶端在線狀態(tài):
- 同步查詢
該方式相對簡單,即通過開放的接入點(diǎn)地址調(diào)用HTTP/HTTPS方式的OpenAPI查詢某個(gè)特定客戶端的當(dāng)前實(shí)時(shí)狀態(tài),適用于對單個(gè)或多個(gè)客戶端的狀態(tài)判斷。
- 異步上下線事件通知
該方式使用消息通知,在客戶端上線和下線事件觸發(fā)時(shí),MQTT服務(wù)端支持將上下線消息直接推送到部署在阿里云服務(wù)器上的云端應(yīng)用。
該方式屬于異步感知客戶端的狀態(tài),且感知到的是上下線事件,而非在線狀態(tài),云端應(yīng)用需要根據(jù)事件發(fā)生的時(shí)間序列分析出客戶端的狀態(tài)。
應(yīng)用場景
兩種獲取MQTT客戶端在線狀態(tài)的方式分別應(yīng)用于以下場景:
- 同步查詢
- 主業(yè)務(wù)流程中需要根據(jù)客戶端是否在線來決定后續(xù)運(yùn)行邏輯。
- 運(yùn)維過程需要判斷特定客戶端當(dāng)前是否在線。
- 異步上下線事件通知
- 服務(wù)端需要在客戶端上線或者下線時(shí)觸發(fā)一些預(yù)定義的動(dòng)作。
- 服務(wù)端需要對客戶端的上下線數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,并根據(jù)客戶端的在線狀態(tài)推送消息。
同步查詢與異步事件通知的差異
兩種查詢方式的區(qū)別如下:
- 同步查詢是查詢當(dāng)前客戶端的實(shí)時(shí)狀態(tài),理論上比異步通知的方式更精確。
- 異步上下線通知因?yàn)椴捎孟⒔怦睿瑺顟B(tài)判斷更加復(fù)雜,且誤判可能性更大,但該方法可以基于事件分析多個(gè)客戶端的運(yùn)行狀態(tài)軌跡。異步通知雖然存在一定復(fù)雜度和誤判,但更加適合大規(guī)模的客戶端的狀態(tài)統(tǒng)計(jì)。
實(shí)現(xiàn)方式
- 同步查詢
同步查詢方式可以通過調(diào)用以下API接口獲取客戶端狀態(tài):
- 異步上下線事件通知
異步上下線事件通知方式下,云端服務(wù)可通過調(diào)用云消息隊(duì)列 MQTT 版提供的云端SDK獲取客戶端上下線消息。SDK下載,請參見版本說明。
獲取客戶端上下線事件的示例代碼如下:說明 在使用示例代碼前,需要配置環(huán)境變量,通過環(huán)境變量讀取訪問憑證。關(guān)于配置環(huán)境變量的方法,請參見配置訪問憑證。云消息隊(duì)列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.mqtt.server.ServerConsumer; import com.alibaba.mqtt.server.callback.StatusListener; import com.alibaba.mqtt.server.config.ChannelConfig; import com.alibaba.mqtt.server.config.ConsumerConfig; import com.alibaba.mqtt.server.model.StatusNotice; public class MQTTClientStatusNoticeProcessDemo { public static void main(String[] args) throws Exception { /** * 您創(chuàng)建的云消息隊(duì)列 MQTT 版的實(shí)例接入點(diǎn)。 * 獲取云端SDK的接入點(diǎn),請聯(lián)系云消息隊(duì)列 MQTT 版技術(shù)支持,釘釘群號:35228338。。 * 接入點(diǎn)地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會(huì)導(dǎo)致服務(wù)端異常。 */ String domain = "domain"; /** * 使用的協(xié)議和端口必須匹配,該參數(shù)值固定為5672。 */ int port = 5672; /** * 您創(chuàng)建的云消息隊(duì)列 MQTT 版的實(shí)例ID。 */ String instanceId = "instanceId"; /** * AccessKey ID,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。 * 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維。 * 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。 * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * 您在云消息隊(duì)列 MQTT 版控制臺創(chuàng)建的Group的ID。 * */ String mqttGroupId = "mqttGroupId"; ChannelConfig channelConfig = new ChannelConfig(); channelConfig.setDomain(domain); channelConfig.setPort(port); channelConfig.setInstanceId(instanceId); channelConfig.setAccessKey(accessKey); channelConfig.setSecretKey(secretKey); ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig()); serverConsumer.start(); serverConsumer.subscribeStatus(mqttGroupId, new StatusListener() { @Override public void process(StatusNotice statusNotice) { System.out.println(JSONObject.toJSONString(statusNotice)); } }); } }