獲取MQTT客戶端在線狀態
您可通過調用同步查詢接口或使用異步上下線通知的方式獲取云消息隊列 MQTT 版客戶端(下文簡稱為客戶端)當前在線情況。
應用場景
云消息隊列 MQTT 版需結合后端存儲消息隊列產品,如云消息隊列 RocketMQ 版等使用,來和部署在云端服務器上的應用(下文簡稱為業務應用)完成業務流程。
獲取客戶端在線狀態主要的應用場景如下:
主業務流程中需要根據客戶端是否在線來決定后續運行邏輯;
運維過程需要判斷特定客戶端當前是否在線;
業務應用需要在客戶端上線或者下線時觸發一些預定義的動作。
基本原理
云消息隊列 MQTT 版服務器(下文簡稱為MQTT服務器)使用異步上下線通知方式獲取客戶端在線狀態。
該方式使用消息通知,在客戶端上線和下線事件觸發時,MQTT服務器會向后端存儲消息隊列推送一條上下線消息。業務應用一般部署在阿里云的服務器上,業務應用通過向后端存儲消息隊列訂閱這條消息來獲取所有客戶端的上下線動作。
該方式屬于異步感知客戶端的狀態,且感知到的是上下線事件,而非在線狀態,云端應用需要根據事件發生的時間序列分析出客戶端的狀態。
異步上下線通知
如上文所述,使用異步上下線通知的方式,上下線事件會映射到后端存儲消息隊列中。
下文以云消息隊列 RocketMQ 版作為后端存儲消息隊列的情況為例說明。
操作步驟
創建上下線事件對應的Topic。
您需關注哪些Group ID分組的設備,就在云消息隊列 MQTT 版控制臺創建對應的Topic。創建Topic的步驟請參見MQTT快速入門。
例如您需要關注Group ID為GID_XXX類型的所有客戶端,那么這類客戶端對應的Client ID和Topic分別是GID_XXX@@@YYYYY和GID_XXX_MQTT。
其中:
GID_XXX是在云消息隊列 MQTT 版控制臺上創建的Group ID。
YYYYY是設備ID,與Group ID以“<GroupID>@@@<DeviceID>” 模式構成Client ID。
_MQTT是該類事件通知消息的Topic命名所必需的固定后綴。
更多信息請參見名詞解釋。
業務應用訂閱該類通知消息。
使用步驟1中創建的Topic,即可收到關注的客戶端的上下線事件。云消息隊列 RocketMQ 版的接收程序請參見訂閱消息。示例代碼詳細信息,請參見MQTTClientStatusNoticeProcessDemo.java。
事件類型放在云消息隊列 RocketMQ 版的Tag中,代表上線或下線。數據格式如下:
MQ Tag:connect/disconnect/tcpclean
其中:
connect事件代表客戶端上線動作。
disconnect事件代表客戶端主動斷開連接。按照MQTT協議,客戶端主動斷開TCP連接之前應該發送disconnect 報文,MQTT服務器在收到disconnect 報文后觸發該類型消息。如果某些客戶端SDK沒有按照協議發送disconnect 報文,MQTT服務器相應無法收到該消息。
tcpclean事件代表實際的TCP連接斷開。無論客戶端是否顯示發送過disconnect 報文,只要當前TCP連接斷開就會觸發tcpclean事件。
說明tcpclean消息代表客戶端網絡層連接的真實斷開。對應的,disconnect消息僅僅代表客戶端是主動發送了下線報文。受限于客戶端的實現,有時候客戶端異常退出會導致disconnect消息并沒有正常發送。因此判斷客戶端下線請使用tcpclean事件。
數據內容為JSON類型,相關的Key說明如下:
clientId代表具體設備;
time代表本次事件的時間;
eventType代表事件類型,供客戶端區分事件類型;
channelId代表每個TCP連接的唯一標識;
clientIp代表客戶端使用的公網出口IP地址。
示例:
clientId:GID_XXX@@@YYYYY time:1212121212 eventType:connect/disconnect/tcpclean channelId:2b9b1281046046faafe5e0b458e4XXXX clientIp:192.168.XX.XX:133XX
判斷客戶端當前是否在線不能僅僅根據收到的最后一條消息的狀態,而需要結合上下線消息的前后關聯來判斷。
具體判斷規則如下:
同一個clientId的客戶端,產生上下線事件的先后順序以時間為準,基本原則為時間戳越大則越新。
同一個clientId的客戶端,可能存在多次閃斷,因此,當收到下線消息時,一定要根據channelId字段判斷是否是當前的TCP連接。簡而言之,下線消息只能覆蓋channelId相同的下線消息,如果下線消息的channelId不一樣,盡管time較新,也不能覆蓋。一個channelId代表一個TCP連接,只會存在一個connect事件和一個close事件。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化消息隊列RocketMQ版接收客戶端,實際業務中一般部署在服務端應用中。
*/
Properties properties = new Properties();
/**
* 設置RocketMQ客戶端的Group ID,注意此處的groupId和MQTT實例中的groupId是兩個概念,請按照各自產品的說明申請填寫。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* 賬號AccessKey ID,從賬號系統控制臺獲取。
*/
properties.put(PropertyKeyConst.AccessKey, "XXXX");
/**
* 賬號AccessKey Secret,從賬號系統控制臺獲取,僅在Signature鑒權模式下需要設置。
*/
properties.put(PropertyKeyConst.SecretKey, "XXXX");
/**
* 設置TCP接入域名。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
/**
* 使用RocketMQ消費端來處理MQTT客戶端的上下線通知時,訂閱的Topic為上下線通知Topic,請遵循控制臺文檔提前創建。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 客戶端狀態數據,實際生產環境中建議使用數據庫或者Redis等外部持久化存儲來保存該信息,避免應用重啟丟失狀態,本Demo以單機內存版實現做演示。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此處僅處理客戶端是否在線,因此只需要關注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXxXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 處理上下線通知的邏輯。
* 實際部署過程中,消費上下線通知的應用可能部署多臺機器,因此客戶端在線狀態的數據可以使用數據庫或者Redis等外部共享存儲來維護。
* 其次需要單獨做消息冪等處理,以免重復接收消息導致狀態機判斷錯誤。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先存儲新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 讀取當前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表里上線和下線事件都已經收到,則當前channel已經掉線,可以清理掉這個channel的數據。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根據狀態表判斷一個clientId是否有活躍的TCP連接。
* 1.如果沒有channel表,則一定不在線。
* 2.如果channel表非空,檢查一下channel數據中是否僅包含上線事件,如果有則代表有活躍連接在線。
* 如果全部的channel都有掉線斷開事件則一定是不在線。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}