日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

快速使用MQTT的Java SDK收發(fā)消息(終端和終端消息收發(fā))

更新時(shí)間:

本文介紹如何快速使用云消息隊(duì)列 MQTT 版的Java SDK實(shí)現(xiàn)MQTT終端與終端消息的收發(fā)。

前提條件

背景信息

云消息隊(duì)列 MQTT 版最簡(jiǎn)單的使用場(chǎng)景即MQTT終端和終端交互,消息生產(chǎn)者和消費(fèi)者均分布在終端設(shè)備。各終端設(shè)備均通過(guò)終端SDK云消息隊(duì)列 MQTT 版云消息隊(duì)列 MQTT 版服務(wù)端連接實(shí)現(xiàn)消息收發(fā)。

終端和終端交互

本文以公網(wǎng)環(huán)境為例,說(shuō)明如何使用Java SDK完成消息收發(fā)。

接入點(diǎn)說(shuō)明

終端和云端服務(wù)與云消息隊(duì)列 MQTT 版通信時(shí),需要在各自的SDK代碼中設(shè)置云消息隊(duì)列 MQTT 版實(shí)例的接入點(diǎn)信息,通過(guò)接入點(diǎn)和云消息隊(duì)列 MQTT 版服務(wù)端連接。

  • 終端SDK接入點(diǎn)格式

    使用終端SDK接入云消息隊(duì)列 MQTT 版時(shí),需要填寫(xiě)的接入點(diǎn)格式如下:

    • 公網(wǎng)接入點(diǎn)MQTT實(shí)例ID.mqtt.aliyuncs.com

    • VPC 接入點(diǎn)MQTT實(shí)例ID-internal-vpc.mqtt.aliyuncs.com

    終端SDK接入點(diǎn)也可以直接在云消息隊(duì)列 MQTT 版控制臺(tái)實(shí)例詳情頁(yè)面的接入點(diǎn)頁(yè)簽中查看。

  • 云端SDK接入點(diǎn)格式

    使用云端SDK接入云消息隊(duì)列 MQTT 版時(shí),需要填寫(xiě)的接入點(diǎn)格式如下:

    重要

    僅實(shí)例地域?qū)儆谥袊?guó)內(nèi)地的實(shí)例支持云端SDK接入。

    • 公網(wǎng)接入點(diǎn)MQTT實(shí)例ID-server-internet.mqtt.aliyuncs.com

    • VPC 接入點(diǎn)MQTT實(shí)例ID-server-internal.mqtt.aliyuncs.com

說(shuō)明

MQTT實(shí)例ID可在云消息隊(duì)列 MQTT 版控制臺(tái)實(shí)例詳情頁(yè)面的基礎(chǔ)信息區(qū)域查看。

終端SDK接入點(diǎn)和云端SDK接入點(diǎn)同時(shí)支持公網(wǎng)接入點(diǎn)VPC 接入點(diǎn)公網(wǎng)接入點(diǎn)為本地公網(wǎng)環(huán)境訪問(wèn)的IP地址,一般用于物聯(lián)網(wǎng)和移動(dòng)互聯(lián)網(wǎng)場(chǎng)景中;VPC 接入點(diǎn)為云上私網(wǎng)訪問(wèn)的IP地址,一般用于云端應(yīng)用接入云消息隊(duì)列 MQTT 版

重要

SDK使用接入點(diǎn)連接服務(wù)時(shí)務(wù)必使用域名接入,不得直接使用域名背后的IP地址直接連接,因?yàn)镮P地址隨時(shí)會(huì)變化。在以下使用情況中出現(xiàn)的問(wèn)題云消息隊(duì)列 MQTT 版產(chǎn)品方概不負(fù)責(zé):

  • 終端或云端不使用域名接入而是使用IP地址接入,產(chǎn)品方更新了域名解析導(dǎo)致原有IP地址失效。

  • 終端或云端網(wǎng)絡(luò)側(cè)對(duì)IP地址設(shè)置網(wǎng)絡(luò)防火墻策略,產(chǎn)品方更新了域名解析后新IP地址被您的防火墻策略攔截。

調(diào)用Java SDK收發(fā)消息

  1. 下載第三方的開(kāi)源Java SDK。下載地址為Eclipse Paho Java Client
  2. 下載阿里云云消息隊(duì)列 MQTT 版的Java SDK的Demo示例作為您代碼開(kāi)發(fā)的參考。下載地址為mqtt-java-demo
  3. 解壓該Demo工程包至您指定的文件夾。
  4. 在IntelliJ IDEA中,導(dǎo)入解壓后的文件以創(chuàng)建相應(yīng)的工程,并確認(rèn)pom.xml中已包含以下依賴。

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類(lèi)中,按代碼注釋說(shuō)明填寫(xiě)相應(yīng)參數(shù),主要涉及您已在創(chuàng)建資源中所創(chuàng)建的MQTT資源信息。然后執(zhí)行Main函數(shù)運(yùn)行代碼實(shí)現(xiàn)消息收發(fā)。

    示例代碼如下。

    說(shuō)明

    在使用示例代碼前,需要配置環(huán)境變量,通過(guò)環(huán)境變量讀取訪問(wèn)憑證。關(guān)于配置環(huán)境變量的方法,請(qǐng)參見(jiàn)配置訪問(wèn)憑證

    云消息隊(duì)列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENVMQTT_SK_ENV

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
        public static void main(String[] args) throws Exception {
            /**
             * 您在控制臺(tái)創(chuàng)建的云消息隊(duì)列 MQTT 版的實(shí)例ID。
             */
            String instanceId = "XXXXX";
            /**
             * 設(shè)置接入點(diǎn),進(jìn)入云消息隊(duì)列 MQTT 版控制臺(tái)實(shí)例詳情頁(yè)面獲取。
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * AccessKey ID,阿里云身份驗(yàn)證,在阿里云RAM控制臺(tái)創(chuàng)建。
             * 阿里云賬號(hào)AccessKey擁有所有API的訪問(wèn)權(quán)限,建議您使用RAM用戶進(jìn)行API訪問(wèn)或日常運(yùn)維。
             * 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號(hào)下所有資源的安全。
             * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說(shuō)明。
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * AccessKey Secret,阿里云身份驗(yàn)證,在阿里云RAM控制臺(tái)創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * MQTT客戶端ID,由業(yè)務(wù)系統(tǒng)分配,需要保證每個(gè)TCP連接都不一樣,保證全局唯一,如果不同的客戶端對(duì)象(TCP連接)使用了相同的clientId會(huì)導(dǎo)致連接異常斷開(kāi)。
             * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在云消息隊(duì)列 MQTT 版控制臺(tái)創(chuàng)建,DeviceID由業(yè)務(wù)方自己設(shè)置,clientId總長(zhǎng)度不得超過(guò)64個(gè)字符。
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * 云消息隊(duì)列 MQTT 版消息的一級(jí)Topic,需要在控制臺(tái)創(chuàng)建才能使用。
             * 如果使用了沒(méi)有創(chuàng)建或者沒(méi)有被授權(quán)的Topic會(huì)導(dǎo)致鑒權(quán)失敗,服務(wù)端會(huì)斷開(kāi)客戶端連接。
             */
            final String parentTopic = "XXXXX";
            /**
             * 云消息隊(duì)列 MQTT 版支持子級(jí)Topic,用來(lái)做自定義的過(guò)濾,此處為示例,可以填寫(xiě)任意字符串。
             * 需要注意的是,完整的Topic長(zhǎng)度不得超過(guò)128個(gè)字符。
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * QoS參數(shù)代表傳輸質(zhì)量,可選0,1,2。詳細(xì)信息,請(qǐng)參見(jiàn)名詞解釋。
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * 客戶端協(xié)議和端口。客戶端使用的協(xié)議和端口必須匹配,如果是SSL加密則設(shè)置ssl://endpoint:8883。
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * 設(shè)置客戶端發(fā)送超時(shí)時(shí)間,防止無(wú)限阻塞。
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * 客戶端連接成功后就需要盡快訂閱需要的Topic。
                     */
                    System.out.println("connect success");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String topicFilter[] = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * 消費(fèi)消息的回調(diào)接口,需要確保該接口不拋異常,該接口運(yùn)行返回即代表消息消費(fèi)成功。
                     * 消費(fèi)消息需要保證在規(guī)定時(shí)間內(nèi)完成,如果消費(fèi)耗時(shí)超過(guò)服務(wù)端約定的超時(shí)時(shí)間,對(duì)于可靠傳輸?shù)哪J剑?wù)端可能會(huì)重試推送,業(yè)務(wù)需要做好冪等去重處理。
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * 發(fā)送普通消息時(shí),Topic必須和接收方訂閱的Topic一致,或者符合通配符匹配規(guī)則。
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * 云消息隊(duì)列 MQTT 版支持點(diǎn)對(duì)點(diǎn)消息,即如果發(fā)送方明確知道該消息只需要給特定的一個(gè)設(shè)備接收,且知道對(duì)端的clientId,則可以直接發(fā)送點(diǎn)對(duì)點(diǎn)消息。
                 * 點(diǎn)對(duì)點(diǎn)消息不需要經(jīng)過(guò)訂閱關(guān)系匹配,可以簡(jiǎn)化訂閱方的邏輯。點(diǎn)對(duì)點(diǎn)消息的Topic格式規(guī)范是 {{parentTopic}}/p2p/{{targetClientId}}。
                 */
                final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

結(jié)果驗(yàn)證

完成消息收發(fā)后,您可在云消息隊(duì)列 MQTT 版控制臺(tái)查詢軌跡以驗(yàn)證消息是否發(fā)送并接收成功。詳細(xì)信息,請(qǐng)參見(jiàn)消息軌跡查詢

更多信息

快速使用MQTT的Java SDK收發(fā)消息(終端和云端消息收發(fā))