日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

微消息隊列MQTT版推送事件到函數計算

本文介紹如何通過事件總線EventBridge云消息隊列 MQTT 版的數據推送到函數計算。

前提條件

您已完成以下操作:

注意事項

事件總線EventBridge不支持直接從云消息隊列 MQTT 版的Topic拉取事件。您可以通過云消息隊列 MQTT 版的數據流出功能,將數據流轉到云消息隊列 RocketMQ 版的Topic中,并通過添加事件總線EventBridge的自定義事件源云消息隊列 RocketMQ 版,實現云消息隊列 MQTT 版事件總線EventBridge的集成。

步驟一:創建數據流出規則

  1. 登錄云消息隊列 MQTT 版控制臺,并在左側導航欄單擊實例列表
  2. 在頂部菜單欄選擇目標地域,然后在實例列表中單擊實例名稱進入實例詳情頁面。
  3. 在左側導航欄單擊規則管理,然后在頁面左上角,單擊創建規則
  4. 創建規則頁面完成以下操作。
    1. 配置基本信息配置向導頁面,填寫規則的基本信息,然后單擊下一步
      參數取值示例說明
      規則ID111111規則的全局唯一標識,說明如下:
      • 只能包含字母、數字、短劃線(-)和下劃線(_),至少包含一個字母或數字。
      • 名稱長度限制在3~64字符之間,長于64字符將被自動截取。
      • 創建后無法更新。
      描述migrate from rocketmq對規則的描述。
      狀態啟用是否啟用當前規則,取值說明如下:
      • 啟用
      • 停用
      規則類型數據流出創建的規則類型,取值說明如下:
      • 數據流出:用于將云消息隊列 MQTT 版的數據導出至其他阿里云產品。詳細信息,請參見跨云產品的數據流出
      • 數據流入:用于將其他阿里云產品的數據導入至云消息隊列 MQTT 版。詳細信息,請參見跨云產品數據流入
      • 上下線通知:用于將獲取的云消息隊列 MQTT 版客戶端上下線事件數據導出至其他阿里云產品。詳細信息,請參見MQTT客戶端上下線事件數據流出
    2. 配置規則源配置向導頁面,配置數據源,然后單擊下一步
      參數取值示例說明
      TopicTopicA指定您需導出數據的源Topic,即云消息隊列 MQTT 版的Topic。
    3. 配置規則目標配置向導頁面,配置數據的流轉目標,然后單擊創建
      參數取值示例說明
      目標服務類型消息隊列 RocketMQ 版指定您需將源Topic的數據轉發至的目標云產品。
      說明 當前僅支持云消息隊列 RocketMQ 版
      RocketMQ 實例MQ_INST_13801563067*****_BbyOD2jQ指定目標云產品的實例ID,即云消息隊列 RocketMQ 版的實例ID。
      說明 僅支持選擇和云消息隊列 MQTT 版實例為同一地域的云產品實例。
      TopicTopicB指定目標云產品的資源鍵值,即云消息隊列 RocketMQ 版的Topic。源Topic的數據將流轉至TopicB。
    您可以在規則管理的規則列表查看到剛創建的數據流出規則。

步驟二:創建自定義事件源

  1. 登錄事件總線EventBridge控制臺
  2. 在左側導航欄,單擊事件總線
  3. 在頂部菜單欄,選擇地域。
  4. 事件總線頁面,單擊已創建的自定義事件總線。
  5. 在左側導航欄,單擊事件源
  6. 事件源頁面,單擊添加事件源
  7. 添加自定義事件源面板,輸入名稱描述事件提供方選擇消息隊列 RocketMQ 版,并選擇已創建的云消息隊列 RocketMQ 版的資源信息等,然后單擊確定

步驟三:創建事件規則

重要 目標服務和事件規則必須處于同一地域。
  1. 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件總線
  2. 在頂部菜單欄,選擇地域,在事件總線頁面,單擊目標總線名稱。
  3. 在左側導航欄,單擊事件規則,然后單擊創建規則
  4. 創建規則頁面,完成以下操作。
    1. 配置基本信息配置向導,在名稱文本框輸入規則名稱,在描述文本框輸入規則的描述,然后單擊下一步
    2. 配置事件模式配置向導,事件源類型選擇自定義事件源事件源選擇步驟一添加的自定義事件源,在事件模式內容代碼框輸入事件模式,然后單擊下一步

      如需了解更多信息,請參見事件模式

    3. 配置事件目標配置向導,配置事件目標,然后單擊創建
      說明 1個事件規則最多可以添加5個目標。
      • 服務類型:單擊函數計算
      • 服務:選擇已創建的函數計算的服務。
      • 函數:選擇已創建的函數計算的函數。
      • 事件:單擊模板

        以下提供變量和模板的示例。

        變量示例:

        {
          "source":"$.source",
          "type":"$.type"
        }

        模板示例:

        The event comes from ${source},event type is ${type}.

        如需了解更多信息,請參見事件內容轉換

      • 服務版本和別名:選擇服務版本或服務別名。
        • 默認版本:LATEST。
        • 指定版本:選擇服務版本。更多信息,請參見管理版本
        • 指定別名:選擇服務別名。更多信息,請參見管理別名
      • 調用方式:選擇同步調用異步調用。更多信息,請參見同步調用功能概覽

步驟四:發布事件

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 本代碼提供簽名鑒權模式下MQ4IOT客戶端發送消息到MQ4IOT客戶端的示例,其中初始化參數請根據實際情況修改。
 * 簽名模式即使用阿里云賬號系統提供的AccessKey和SecretKey對每個客戶端計算出一個獨立的簽名供客戶端識別使用。
 * 對于實際業務場景使用過程中,考慮到私鑰SecretKey的隱私性,可以將簽名過程放在受信任的環境完成。
 *
 * 完整demo工程,參考https://github.com/AliwareMQ/lmq-demo。
 */
public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT實例ID,購買后控制臺獲取。
         */
        String instanceId = "XXXXX";
        /**
         * 接入點地址,購買MQ4IOT實例,且配置完成后即可獲取,接入點地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會導致客戶端異常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 賬號accesskey,從賬號系統控制臺獲取。
         */
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        /**
         * 賬號secretKey,從賬號系統控制臺獲取,僅在Signature鑒權模式下需要設置。
         */
        String secretKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        /**
         * MQ4IOT Client Id,由業務系統分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的Client Id會導致連接異常斷開。
         * Client Id 由兩部分組成,格式為GroupID@@@DeviceId,其中groupId在MQ4IOT控制臺申請,DeviceId由業務方自己設置,Client Id總長度不得超過64個字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX";
        /**
         * MQ4IOT 消息的一級Topic,需要在控制臺申請才能使用。
         * 如果使用了沒有申請或者沒有被授權的Topic會導致鑒權失敗,服務端會斷開客戶端連接。
         */
        final String parentTopic = "XXXXX";
        /**
         * MQ4IOT支持子級Topic,用來做自定義的過濾,此處為示意,可以填寫任何字符串。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS參數代表傳輸質量,可選0,1,2。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客戶端使用的協議和端口必須匹配。
         * 如果SSL加密則設置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客戶端設置好發送超時時間,防止無限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客戶端連接成功后就需要盡快訂閱需要的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
                 * 消費消息需要保證在規定時間內完成,如果消費耗時超過服務端約定的超時時間,對于可靠傳輸的模式,服務端可能會重試推送,業務需要做好冪等去重處理。超時時間約定參考限制。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  發送普通消息時,Topic必須和接收方訂閱的Topic一致,或者符合通配符匹配規則。
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持點對點消息,即如果發送方明確知道該消息只需要給特定的一個設備接收,且知道對端的Client Id,則可以直接發送點對點消息。
             * 點對點消息不需要經過訂閱關系匹配,可以簡化訂閱方的邏輯。點對點消息的Topic格式規范是{{parentTopic}}/p2p/{{targetClientId}}。
             */
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

結果驗證

您可以在函數計算控制臺使用表盤解讀數據指標。

  1. 登錄函數計算控制臺
  2. 在左側導航欄,單擊服務及函數
  3. 在頂部菜單欄,選擇地域。
  4. 服務列表頁面,找到目標服務,在其右側操作列單擊函數管理
  5. 函數管理頁面,找到目標函數,單擊目標函數名稱。
  6. 函數詳情頁面,單擊調用日志頁簽,查看日志。
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****

常見問題

事件發布失敗,我該如何定位問題?

如果事件發布失敗,您可以查看事件軌跡,在事件軌跡頁面的事件投遞區域查看投遞詳情,獲取投遞響應。針對不同投遞響應提示,采取相應的解決措施。

發布到函數計算的事件發布失敗,且投遞響應為[500]ConnectErrorconnectiontimedout,我該如何處理?

您可以按照以下步驟處理:
  1. 登錄函數計算控制臺,執行目標函數并觀察執行時間。
  2. 如果函數執行時間大于15s,請排查網絡問題;如果函數執行時間小于15s,請確認您是否可以訪問函數計算服務所屬地域的Endpoint。
  3. 如您不能訪問當前函數計算服務所屬地域的Endpoint,請聯系函數計算工程師處理。