日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MQTT數據流出規則的實現

更新時間:

如果您的云端應用需要使用云消息隊列 RocketMQ 版產品的某些功能,例如順序消息特性、事務消息特性等,您可以通過消息流入或流出規則將云消息隊列 MQTT 版云消息隊列 RocketMQ 版數據進行流轉。本文介紹如何將云消息隊列 MQTT 版的數據導出至其他阿里云產品。

背景信息

云消息隊列 MQTT 版支持云端SDK,云上應用可直接通過云端SDK接入云消息隊列 MQTT 版服務端進行消息收發。云端SDK使用,請參見云端開發概述

同時云消息隊列 MQTT 版支持和其他云產品進行互通,當前支持的云產品有云消息隊列 RocketMQ 版

本文以公網環境中的Java SDK為例說明如何將云消息隊列 MQTT 版的數據導出至云消息隊列 RocketMQ 版

此場景下可使用多語言的第三方開源SDK來實現消息收發。更多信息,請參見SDK下載

quick_start_data_outflow

網絡訪問

云消息隊列 MQTT 版同時提供了公網接入點VPC 接入點
  • 公網接入點為本地公網環境訪問的IP地址,一般用于物聯網和移動互聯網場景中;
  • VPC 接入點為云上私網訪問的IP地址,一般用于云端應用接入云消息隊列 MQTT 版
重要 客戶端使用接入點連接服務時務必使用域名接入,不得直接使用域名背后的IP地址直接連接,因為IP地址隨時會變化。在以下使用情況中出現的問題云消息隊列 MQTT 版產品方概不負責:
  • 客戶端不使用域名接入而是使用IP地址接入,產品方更新了域名解析導致原有IP地址失效。
  • 客戶端網絡對IP地址設置網絡防火墻策略,產品方更新了域名解析后新IP地址被您的防火墻策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 下載安裝JDK
  • 已創建云消息隊列 MQTT 版實例、Topic和Group ID,具體操作,請參見創建資源

  • 已創建云消息隊列 RocketMQ 版實例、Topic和Group ID,具體操作,請參見步驟二:創建資源

重要
  • 云消息隊列 MQTT 版數據流出規則僅支持云消息隊列 RocketMQ 版4.x系列實例。

  • 云消息隊列 MQTT 版數據流出規則不能跨地域使用,因此,云消息隊列 MQTT 版云消息隊列 RocketMQ 版的資源都必須創建在同一地域。

1.創建數據流出規則

  1. 登錄云消息隊列 MQTT 版控制臺,并在左側導航欄單擊實例列表

  2. 在頂部菜單欄選擇目標地域,然后在實例列表中單擊實例名稱進入實例詳情頁面。

  3. 在左側導航欄單擊規則管理,然后在頁面左上角,單擊創建規則

  4. 創建規則頁面完成以下操作。

    1. 配置基本信息。輸入規則ID,選擇數據流出的規則類型。

      image

    2. 配置規則源。選擇已經創建好的云消息隊列 MQTT 版的Topic。

      image

    3. 配置規則目標。選擇已經創建好的云消息隊列 RocketMQ 版的實例和Topic。

      image

2.準備測試代碼

2.1下載示例代碼

  1. 下載mqtt-java-demo,并解壓該Demo工程包至您指定的文件夾。

  2. 在解壓的Demo工程中找到lmq-java-demo文件夾,將此文件夾導入IntelliJ IDEA,并確認pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 獲取AccessKey信息。獲取方式,請參見創建AccessKey

    • 配置環境變量。云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環境變量名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關于配置環境變量的方法,請參見配置訪問憑證

2.2收發消息代碼

MQ4IoTSendMessageToRocketMQ.java類中包含了發送MQTT消息和使用RocketMQ消費的代碼,按代碼注釋說明填寫云消息隊列 MQTT 版云消息隊列 RocketMQ 版資源的參數。示例代碼如下。

收發送消息代碼示例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToRocketMQ {
    public static void main(String[] args) throws Exception {
        /**
         * 初始化云消息隊列 RocketMQ 版接收客戶端,實際業務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設置云消息隊列 RocketMQ 版Group ID,在云消息隊列 RocketMQ 版控制臺創建。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        
        /**
         * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創建。
         * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
         * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環境變量為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創建。僅在簽名鑒權模式下需要設置。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
      
         /**
         * 設置TCP接入點,該接入點為云消息隊列 RocketMQ 版實例的接入點。云消息隊列 RocketMQ 版控制臺實例詳情頁面獲取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
        /**
         * 設置云消息隊列 RocketMQ 版的Topic,在云消息隊列 RocketMQ 版控制臺創建。
         * 云消息隊列 RocketMQ 版和微消息隊列MQTT配合使用時,RocketMQ客戶端僅操作一級Topic。
         */
        final String parentTopic = "XXXXX";
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(parentTopic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                System.out.println("recv msg:" + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 初始化云消息隊列 MQTT 版發送客戶端,實際業務中云消息隊列 MQTT 版一般部署在移動終端環境。
         */

        /**
         * 您在控制臺創建的微消息隊列MQTT的實例ID。
         */
        String instanceId = "XXXXX";
         /**
         * 設置接入點,進入微消息隊列MQTT版控制臺實例詳情頁面獲取。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創建。
         * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
         * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環境變量為例說明。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創建。僅在簽名鑒權模式下需要設置。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQTT客戶端ID,由業務系統分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的clientId會導致連接異常斷開。
         * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在云消息隊列 MQTT 版控制臺創建,DeviceID由業務方自己設置,clientId總長度不得超過64個字符。
         */
        String clientId = "GID_XXXX@@@XXXXX";
       /**
         * 云消息隊列 MQTT 版支持子級Topic,用來做自定義的過濾,此處為示例,可以填寫任何字符串。
         * 需要注意的是,完整的Topic長度不得超過128個字符。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS參數代表傳輸質量,可選0,1,2。詳細信息,請參見名詞解釋。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
         /**
         * 客戶端協議和端口。客戶端使用的協議和端口必須匹配,如果是SSL加密則設置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 設置客戶端發送超時時間,防止無限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客戶端連接成功后就需要盡快訂閱需要的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  發送普通消息時,Topic必須和接收方訂閱的Topic一致,或者符合通配符匹配規則。
             */
            mqttClient.publish(mq4IotTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3.結果驗證

執行MQ4IoTSendMessageToRocketMQ.java類中的Main函數運行代碼。可以根據下面操作驗證消息的發送和消費情況。

代碼驗證

如下圖所示,MQTT消息已經成功發送,RocketMQ客戶端也已經成功消費。

image

控制臺驗證

  • 查詢消息發送情況。在云消息隊列 MQTT 版控制臺消息軌跡查詢頁面,根據Group ID和Device ID查詢消息已經成功發送,如下圖所示。

    image

  • 查詢消息消費情況。在云消息隊列 RocketMQ 版控制臺消息查詢頁面,根據Topic查詢到消息已經被轉發到云消息隊列 RocketMQ 版,如下圖所示。

    image

    單擊操作列中的消息軌跡查看,消息已經被消費,如下圖所示。

    image

更多信息