云消息隊(duì)列 MQTT 版提供同步查詢和異步上下線事件通知兩種方式,來獲取MQTT客戶端在線狀態(tài)。本文介紹這兩種方式的基本原理、應(yīng)用場景、具體差異以及實(shí)現(xiàn)方式。

基本原理

云消息隊(duì)列 MQTT 版服務(wù)端(下文簡稱為MQTT服務(wù)端)提供以下方式獲取客戶端在線狀態(tài):
  • 同步查詢

    該方式相對簡單,即通過開放的接入點(diǎn)地址調(diào)用HTTP/HTTPS方式的OpenAPI查詢某個(gè)特定客戶端的當(dāng)前實(shí)時(shí)狀態(tài),適用于對單個(gè)或多個(gè)客戶端的狀態(tài)判斷。

  • 異步上下線事件通知

    該方式使用消息通知,在客戶端上線和下線事件觸發(fā)時(shí),MQTT服務(wù)端支持將上下線消息直接推送到部署在阿里云服務(wù)器上的云端應(yīng)用。

    該方式屬于異步感知客戶端的狀態(tài),且感知到的是上下線事件,而非在線狀態(tài),云端應(yīng)用需要根據(jù)事件發(fā)生的時(shí)間序列分析出客戶端的狀態(tài)。

應(yīng)用場景

兩種獲取MQTT客戶端在線狀態(tài)的方式分別應(yīng)用于以下場景:

  • 同步查詢
    • 主業(yè)務(wù)流程中需要根據(jù)客戶端是否在線來決定后續(xù)運(yùn)行邏輯。
    • 運(yùn)維過程需要判斷特定客戶端當(dāng)前是否在線。
  • 異步上下線事件通知
    • 服務(wù)端需要在客戶端上線或者下線時(shí)觸發(fā)一些預(yù)定義的動(dòng)作。
    • 服務(wù)端需要對客戶端的上下線數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,并根據(jù)客戶端的在線狀態(tài)推送消息。

同步查詢與異步事件通知的差異

兩種查詢方式的區(qū)別如下:
  • 同步查詢是查詢當(dāng)前客戶端的實(shí)時(shí)狀態(tài),理論上比異步通知的方式更精確。
  • 異步上下線通知因?yàn)椴捎孟⒔怦睿瑺顟B(tài)判斷更加復(fù)雜,且誤判可能性更大,但該方法可以基于事件分析多個(gè)客戶端的運(yùn)行狀態(tài)軌跡。異步通知雖然存在一定復(fù)雜度和誤判,但更加適合大規(guī)模的客戶端的狀態(tài)統(tǒng)計(jì)。

實(shí)現(xiàn)方式

  • 同步查詢

    同步查詢方式可以通過調(diào)用以下API接口獲取客戶端狀態(tài):

  • 異步上下線事件通知

    異步上下線事件通知方式下,云端服務(wù)可通過調(diào)用云消息隊(duì)列 MQTT 版提供的云端SDK獲取客戶端上下線消息。SDK下載,請參見版本說明。

    獲取客戶端上下線事件的示例代碼如下:
    說明 在使用示例代碼前,需要配置環(huán)境變量,通過環(huán)境變量讀取訪問憑證。關(guān)于配置環(huán)境變量的方法,請參見配置訪問憑證。

    云消息隊(duì)列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENVMQTT_SK_ENV。

    package com.aliyun.openservices.lmq.example;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.mqtt.server.ServerConsumer;
    import com.alibaba.mqtt.server.callback.StatusListener;
    import com.alibaba.mqtt.server.config.ChannelConfig;
    import com.alibaba.mqtt.server.config.ConsumerConfig;
    import com.alibaba.mqtt.server.model.StatusNotice;
    
    public class MQTTClientStatusNoticeProcessDemo {
        public static void main(String[] args) throws Exception {
             /**
             * 您創(chuàng)建的云消息隊(duì)列 MQTT 版的實(shí)例接入點(diǎn)。
             * 獲取云端SDK的接入點(diǎn),請聯(lián)系云消息隊(duì)列 MQTT 版技術(shù)支持,釘釘群號:35228338。。
             * 接入點(diǎn)地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會(huì)導(dǎo)致服務(wù)端異常。
             */
            String domain = "domain";
    
             /**
             * 使用的協(xié)議和端口必須匹配,該參數(shù)值固定為5672。
             */
            int port = 5672;
    
             /**
             * 您創(chuàng)建的云消息隊(duì)列 MQTT 版的實(shí)例ID。
             */
            String instanceId = "instanceId";
    
            /**
             * AccessKey ID,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。
             * 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維。
             * 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
             * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
             
             /**
             * 您在云消息隊(duì)列 MQTT 版控制臺創(chuàng)建的Group的ID。
             * */
            String mqttGroupId = "mqttGroupId";
    
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.setDomain(domain);
            channelConfig.setPort(port);
            channelConfig.setInstanceId(instanceId);
            channelConfig.setAccessKey(accessKey);
            channelConfig.setSecretKey(secretKey);
    
            ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
            serverConsumer.start();
            serverConsumer.subscribeStatus(mqttGroupId, new StatusListener() {
                @Override
                public void process(StatusNotice statusNotice) {
                    System.out.println(JSONObject.toJSONString(statusNotice));
                }
            });
        }
    
    }

更多信息

MQTT客戶端上下線事件數(shù)據(jù)流出