日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

開源客戶端首次連接配置

更新時間:

本文以Java SDK為例介紹開源MQTT客戶端首次連接服務端時如何初始化客戶端和配置自動重連功能。

開源客戶端Java SDK下載地址

paho.mqtt.java

SDK版本

SDK依賴如下,建議使用最新版本。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

初始化客戶端

創建MqttClient

final String brokerUrl = properties.getProperty("brokerUrl");
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final String topic = properties.getProperty("topic");
final int qosLevel = Integer.parseInt(properties.getProperty("qos"));
final MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
mqttClient.setTimeToWait(3000L);

mqttClient.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        //連接成功回調,可以開始訂閱Topic。
    }

    @Override
    public void connectionLost(Throwable throwable) {
        // 連接斷開回調,建議打印日志方便定位問題。
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // 收到消息的回調,此次不要阻塞,且不能在這里同步發送消息,否則可能會導致死鎖卡住,以及心跳無法發送導致斷鏈。
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // 成功發送消息到服務端。
    }
});

初始化連接項

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
.....
connOpts.setCleanSession(cleanSession);
connOpts.setKeepAliveInterval(60); // 心跳間隔時間,單位:秒。
connOpts.setAutomaticReconnect(true); // 務必開啟自動重連。
connOpts.setMaxInflight(1000);
.....

首次連接

初始化mqttClient,開始連接服務端。

mqttClient.connect(mqttConnectOptions);

首次連接不重連問題

問題現象

如果由于網絡抖動或延時等其他原因導致客戶端連接服務端失敗,則首次連接不能自動重連。具體報錯如下:

Caused by: java.net.ConnectException: Network is unreachable (connect failed)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:74)

可能原因

客戶端首次連接,wasConnected=false,所以進不了connectionLost的重連處理。

說明

以下代碼為SDK源碼,您無需在代碼中添加該內容。

if (wasConnected && callback != null) {
    // Let the user know client has disconnected either normally or abnormally.
    callback.connectionLost(reason);
}

解決方案

首次連接建議盡可能進行嘗試,直至連接成功。

for(;;) {
    try {
        mqttClient.connect(mqttConnectOptions);
        break;
    } catch (Throwable e) {
        log.error("",e); // 建議打印日志,方便定位問題。
        Thread.sleep(5000L);
    }
}

客戶端自動重連

異常示例

客戶端連接服務端后,如果后續由于其他原因導致連接斷開,會觸發connectionLost

2023-06-20 17:10:26:972	 connectionLost clientId=XXX
已斷開連接 (32109) - java.net.SocketException: Operation timed out (Read failed)
	at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)

異常原因源碼分析

說明

以下代碼為SDK源碼,您無需在代碼中添加該內容。

public void connectionLost(MqttException cause) {
    final String methodName = "connectionLost";
    // If there was a problem and a client callback has been set inform
    // the connection lost listener of the problem.
    try {
        if (mqttCallback != null && cause != null) {
            // @TRACE 708=call connectionLost
            log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
            mqttCallback.connectionLost(cause);
        }
        if(reconnectInternalCallback != null && cause != null){
            reconnectInternalCallback.connectionLost(cause);
        }
    } catch (java.lang.Throwable t) {
        // Just log the fact that a throwable has caught connection lost 
        // is called during shutdown processing so no need to do anything else
        // @TRACE 720=exception from connectionLost {0}
        log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
    }
}

觸發重連

說明

以下代碼為SDK源碼,您無需在代碼中添加該內容。

MqttReconnectCallback

public void connectionLost(Throwable cause) {
    if (automaticReconnect) {
        // Automatic reconnect is set so make sure comms is in resting
        // state
        comms.setRestingState(true);
        reconnecting = true;
        startReconnectCycle();
    }
}

MqttReconnectActionListener

重試間隔為1 s、2 s、4 s、8 s......128 s,默認最大間隔時間為128 s。

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    // @Trace 502=Automatic Reconnect failed, rescheduling: {0}
    log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
    if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
        reconnectDelay = reconnectDelay * 2;
    }
    rescheduleReconnectCycle(reconnectDelay);
}

重連成功

connectComplete回調

2023-06-20 17:12:36:764	connect success to: tcp://xxxxxx.mqtt.aliyuncs.com:1883,reconnect=true