日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MQTT客戶端上下線事件數(shù)據(jù)流出

更新時間:

您可通過配置云消息隊列 MQTT 版的客戶端上下線通知規(guī)則,將獲取的MQTT客戶端上下線事件數(shù)據(jù)導出至其他阿里云產(chǎn)品。該方法為異步上下線通知。本文介紹客戶端上下線通知的原理、應用場景、使用限制以及云消息隊列 MQTT 版與其他阿里云產(chǎn)品的資源映射關(guān)系。

基本原理

在客戶端上線和下線事件觸發(fā)時,MQTT服務器會根據(jù)您配置的客戶端上下線通知規(guī)則,向后端其他云產(chǎn)品推送一條上下線消息。業(yè)務應用一般部署在阿里云的服務器上,業(yè)務應用通過向后端云產(chǎn)品訂閱這條消息來獲取所有客戶端的上下線動作。上下線事件流出

該方式屬于異步感知客戶端的狀態(tài),且感知到的是上下線事件,而非在線狀態(tài),云端應用需要根據(jù)事件發(fā)生的時間序列分析出客戶端的狀態(tài)。

異步上下線通知因為采用消息解耦,狀態(tài)判斷更加復雜,且誤判可能性更大,但該方法可以基于事件分析多個客戶端的運行狀態(tài)軌跡。

應用場景

客戶端上下線通知主要的應用場景為業(yè)務應用需要在客戶端上線或者下線時觸發(fā)一些預定義的動作。

例如,客戶端在線狀態(tài)聚合。此場景中,MQTT客戶端產(chǎn)生上下線等狀態(tài)變更,云消息隊列 MQTT 版會根據(jù)您配置的客戶端狀態(tài)通知規(guī)則,將狀態(tài)變更封裝后轉(zhuǎn)發(fā)到云消息隊列 RocketMQ 版消息的方式來實現(xiàn)客戶端狀態(tài)數(shù)據(jù)的聚合和統(tǒng)計。

說明

針對其他場景,推薦您使用同步查詢接口來獲取客戶端在線狀態(tài)。詳細信息,請參見獲取MQTT客戶端在線狀態(tài)。

使用限制

限制項

限制值

說明

單實例規(guī)則數(shù)量

100

如果默認限制不滿足,請聯(lián)系云消息隊列 MQTT 版技術(shù)支持,釘釘群號:35228338。

規(guī)則去重限制

同一個內(nèi)部資源同種規(guī)則只能創(chuàng)建一個規(guī)則。

例如一個Group ID只能創(chuàng)建一個上下線通知規(guī)則,一個MQTT Topic只能創(chuàng)建一個數(shù)據(jù)流入規(guī)則和一個數(shù)據(jù)流出規(guī)則。

地域限制

不支持跨地域創(chuàng)建規(guī)則,規(guī)則的數(shù)據(jù)源和數(shù)據(jù)目標所屬的實例必須處于同一地域。

例如,創(chuàng)建數(shù)據(jù)流出規(guī)則,數(shù)據(jù)源云消息隊列 MQTT 版實例屬于華東1(杭州)地域,則數(shù)據(jù)目標云消息隊列 RocketMQ 版只能選擇華東1(杭州)地域的實例。

云消息隊列 MQTT 版實例版本

僅新版本的實例支持。

新購的云消息隊列 MQTT 版實例默認為新版本實例,舊版實例已不支持購買。

云消息隊列 RocketMQ 版實例版本

僅4.0系列實例支持

云消息隊列 MQTT 版云消息隊列 RocketMQ 版通過消息流入或消息流出規(guī)則進行數(shù)據(jù)互通時,云消息隊列 RocketMQ 版僅4.0系列實例支持消息流入或流出規(guī)則,5.0系列實例不支持。

資源映射方式

同一個云消息隊列 MQTT 版Group ID下的所有客戶端的狀態(tài)變更通知都會轉(zhuǎn)發(fā)到您配置的同一個其他阿里云產(chǎn)品的資源里。

表 1. 映射關(guān)系

MQTT資源

其他阿里云產(chǎn)品

其他阿里云產(chǎn)品資源

數(shù)據(jù)包定義

MQTT Group ID

云消息隊列 RocketMQ 版

云消息隊列 RocketMQ 版的Topic

MQTT和RocketMQ的消息結(jié)構(gòu)映射

操作流程

如上文所述,如果使用異步上下線通知的方式,您需創(chuàng)建客戶端上下線事件通知規(guī)則,將上下線通知的消息導出至后端云產(chǎn)品中。下文以使用云消息隊列 RocketMQ 版后端云產(chǎn)品為例進行說明。

  1. 創(chuàng)建上下線通知規(guī)則。

    您需關(guān)注哪些Group ID分組的設備,就在云消息隊列 MQTT 版控制臺創(chuàng)建規(guī)則時,選定相應的Group ID。創(chuàng)建規(guī)則的詳細步驟,請參見創(chuàng)建上下線通知規(guī)則。

  2. 業(yè)務應用訂閱該類通知消息。

    通過步驟1中創(chuàng)建的規(guī)則,即可收到關(guān)注的客戶端的上下線事件。云消息隊列 RocketMQ 版的接收程序,請參見訂閱消息。示例代碼詳細信息,請參見MQTTClientStatusNoticeProcessDemo.java。

    事件類型放在云消息隊列 RocketMQ 版的Tag中,代表上線或下線。數(shù)據(jù)格式如下:

    MQ Tag:connect/disconnect/tcpclean

    其中:

    • connect事件代表客戶端上線動作。

    • disconnect事件代表客戶端主動斷開連接。按照MQTT協(xié)議,客戶端主動斷開TCP連接之前應該發(fā)送disconnect 報文,MQTT服務器在收到disconnect 報文后觸發(fā)該類型消息。如果某些客戶端SDK沒有按照協(xié)議發(fā)送disconnect 報文,MQTT服務器相應無法收到該消息。

    • tcpclean事件代表實際的TCP連接斷開。無論客戶端是否顯示發(fā)送過disconnect 報文,只要當前TCP連接斷開就會觸發(fā)tcpclean事件。

    說明

    tcpclean消息代表客戶端網(wǎng)絡層連接的真實斷開。對應的,disconnect消息僅僅代表客戶端是主動發(fā)送了下線報文。受限于客戶端的實現(xiàn),有時候客戶端異常退出會導致disconnect消息并沒有正常發(fā)送。因此判斷客戶端下線請使用tcpclean事件。

    數(shù)據(jù)內(nèi)容為JSON類型,相關(guān)的Key說明如下:

    • clientId代表具體設備。

    • time代表本次事件的時間。

    • eventType代表事件類型,供客戶端區(qū)分事件類型。

    • channelId代表每個TCP連接的唯一標識。

    • clientIp代表客戶端使用的公網(wǎng)出口IP地址。

    • certificateChainSn代表客戶端設備的證書鏈的SN序列號。格式為{設備證書SN,CA證書SN}。

      僅當客戶端接入時使用證書認證功能,才包含該參數(shù)。更多信息,請參見客戶端證書認證。

    示例如下:

    clientId:GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.XX.XX:133XX  
    certificateChainSn:1254685.....2458655,154689536.....25655

    判斷客戶端當前是否在線不能僅僅根據(jù)收到的最后一條消息的狀態(tài),而需要結(jié)合上下線消息的前后關(guān)聯(lián)來判斷。

    具體判斷規(guī)則如下:

    • 同一個clientId的客戶端,產(chǎn)生上下線事件的先后順序以時間為準,基本原則為時間戳越大則越新。

    • 同一個clientId的客戶端,可能存在多次閃斷,因此,當收到下線消息時,一定要根據(jù)channelId字段判斷是否是當前的TCP連接。簡而言之,下線消息只能覆蓋channelId相同的下線消息,如果下線消息的channelId不一樣,盡管time較新,也不能覆蓋。一個channelId代表一個TCP連接,只會存在一個connect事件和一個close事件。

異步上下線通知Java示例代碼

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息隊列RocketMQ版接收客戶端,實際業(yè)務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設置RocketMQ客戶端的Group ID,注意此處的groupId和MQTT實例中的groupId是兩個概念,請按照各自產(chǎn)品的說明申請?zhí)顚憽?         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * 賬號AccessKey ID,從控制臺獲取。
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * 賬號AccessKey Secret,從控制臺獲取,僅在Signature鑒權(quán)模式下需要設置。
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * 設置TCP接入域名。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * 使用RocketMQ消費端來處理MQTT客戶端的上下線通知時,訂閱的Topic為上下線通知Topic,請遵循控制臺文檔提前創(chuàng)建。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客戶端狀態(tài)數(shù)據(jù),實際生產(chǎn)環(huán)境中建議使用數(shù)據(jù)庫或者Redis等外部持久化存儲來保存該信息,避免應用重啟丟失狀態(tài),本Demo以單機內(nèi)存版實現(xiàn)做演示。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此處僅處理客戶端是否在線,因此只需要關(guān)注connect事件和tcpclean事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 處理上下線通知的邏輯。
     * 實際部署過程中,消費上下線通知的應用可能部署多臺機器,因此客戶端在線狀態(tài)的數(shù)據(jù)可以使用數(shù)據(jù)庫或者Redis等外部共享存儲來維護。
     * 其次需要單獨做消息冪等處理,以免重復接收消息導致狀態(tài)機判斷錯誤。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存儲新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 讀取當前channel的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上線和下線事件都已經(jīng)收到,則當前channel已經(jīng)掉線,可以清理掉這個channel的數(shù)據(jù)。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根據(jù)狀態(tài)表判斷一個clientId是否有活躍的TCP連接。
     * 1.如果沒有channel表,則一定不在線。
     * 2.如果channel表非空,檢查一下channel數(shù)據(jù)中是否僅包含上線事件,如果有則代表有活躍連接在線。
     * 如果全部的channel都有掉線斷開事件則一定是不在線。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

更多信息

如需了解控制臺上的操作,請參見上下線通知規(guī)則管理。