數(shù)據(jù)轉(zhuǎn)發(fā)到AMQP服務端訂閱消費組消費
直接使用AMQP服務端訂閱實時獲取設備數(shù)據(jù),只能從產(chǎn)品維度獲取所有設備的數(shù)據(jù),使用消息轉(zhuǎn)發(fā)的云產(chǎn)品流轉(zhuǎn)功能,可以將全部設備或指定設備發(fā)送到物聯(lián)網(wǎng)平臺的消息,先經(jīng)過解析腳本處理和過濾,再轉(zhuǎn)發(fā)到AMQP服務端訂閱消費組,通過AMQP客戶端消費。本文以物模型數(shù)據(jù)上報Topic為例,介紹流轉(zhuǎn)消息數(shù)據(jù)的完整流程。
工作原理
云產(chǎn)品流轉(zhuǎn)可將同一產(chǎn)品所有設備或指定設備的指定Topic消息,實時轉(zhuǎn)發(fā)到一個或多個消費組中,每個消費組中包括多個消費者即AMQP客戶端。每條消息轉(zhuǎn)發(fā)到消費組時,消費組中隨機一個消費者收到消息,不同消費組通過消費組ID區(qū)分。在上圖中:
消費者:使用AMQP SDK注冊消費組的消費者,用于接收物聯(lián)網(wǎng)平臺轉(zhuǎn)發(fā)到AMQP消費組中的消息。
物聯(lián)網(wǎng)平臺:配置云產(chǎn)品流轉(zhuǎn)的數(shù)據(jù)源Topic、數(shù)據(jù)目的和解析器腳本,并啟動解析器,將設備消息轉(zhuǎn)發(fā)到AMQP服務端訂閱的消費組。
數(shù)據(jù)源:支持的Topic類型消息,請參見數(shù)據(jù)格式(非云網(wǎng)關(guān)產(chǎn)品和設備)、自定義Topic(MQTT云網(wǎng)關(guān))、消息轉(zhuǎn)發(fā)Topic(GB/T 32960云網(wǎng)關(guān))、消息轉(zhuǎn)發(fā)Topic(JT/T 808云網(wǎng)關(guān))、消息轉(zhuǎn)發(fā)Topic(SL 651云網(wǎng)關(guān))。
例如:
數(shù)據(jù)源Topic_產(chǎn)品A:將產(chǎn)品A所有設備的消息轉(zhuǎn)發(fā)到消費組。
數(shù)據(jù)源Topic_設備B:將產(chǎn)品B中指定的一個設備B的消息轉(zhuǎn)發(fā)到消費組。
數(shù)據(jù)目的:接收設備數(shù)據(jù)的AMQP服務端訂閱消費組。具體內(nèi)容,請參見管理AMQP消費組。
解析器腳本:配置通過數(shù)據(jù)流轉(zhuǎn)函數(shù)
writeAmqp(destinationId, payload, tag)
將設備數(shù)據(jù)轉(zhuǎn)發(fā)到AMQP客戶端的消費組中。函數(shù)說明,請參見流轉(zhuǎn)數(shù)據(jù)到數(shù)據(jù)目的函數(shù)。
AMQP客戶端:啟動解析器后,物聯(lián)網(wǎng)平臺會自動將設備消息轉(zhuǎn)發(fā)到AMQP客戶端,不需要在AMQP客戶端代碼中訂閱Topic,只要AMQP客戶端在線就可以接收消息。
云產(chǎn)品流轉(zhuǎn)不支持從AMQP客戶端下發(fā)消息給設備,如果需要下發(fā)指令給設備,請調(diào)用消息通信API。
應用場景
業(yè)務服務器接收設備消息:云產(chǎn)品流轉(zhuǎn)可以靈活地轉(zhuǎn)發(fā)設備消息到AMQP服務端訂閱的消息組。
轉(zhuǎn)發(fā)指定設備的消息。
轉(zhuǎn)發(fā)指定Topic的消息。
消息過濾或處理后再轉(zhuǎn)發(fā)。
AMQP客戶端直接實時獲取指定產(chǎn)品下所有設備的消息,可以直接配置AMQP服務端訂閱。具體內(nèi)容,請參見配置AMQP服務端訂閱。
前提條件
已創(chuàng)建消費組,作為數(shù)據(jù)轉(zhuǎn)發(fā)目的地。您可使用物聯(lián)網(wǎng)平臺默認消費組(DEFAULT_GROUP)或創(chuàng)建消費組。
已添加待轉(zhuǎn)發(fā)的設備Topic數(shù)據(jù)源。例如:創(chuàng)建數(shù)據(jù)源DataSource,添加指定設備的物模型數(shù)據(jù)上報Topic。具體步驟,請參見添加待流轉(zhuǎn)的數(shù)據(jù)源。
使用限制
AMQP客戶端建立連接之后,需要立刻發(fā)送認證請求。如果15秒內(nèi)沒有認證成功,服務器會主動關(guān)閉連接。
AMQP客戶端的一個連接限流1,000 TPS,消息轉(zhuǎn)發(fā)TPS限流由實例的消息轉(zhuǎn)發(fā)TPS規(guī)格決定,消息大小無限制。更多的AMQP服務端訂閱限制,請參見服務端訂閱使用限制。
步驟一:在物聯(lián)網(wǎng)平臺配置數(shù)據(jù)目的和解析器
step1:配置數(shù)據(jù)目的
在實例概覽頁簽的全部環(huán)境下,找到對應的實例,單擊實例卡片。
在左側(cè)導航欄,選擇 。
在云產(chǎn)品流轉(zhuǎn)頁面,單擊右上角體驗新版,進入新版功能頁面。
說明如果您已執(zhí)行過此操作,再次進入云產(chǎn)品流轉(zhuǎn)頁面,會直接進入新版功能頁面。
單擊數(shù)據(jù)目的頁簽,然后單擊創(chuàng)建數(shù)據(jù)目的。
在創(chuàng)建數(shù)據(jù)目的對話框,輸入數(shù)據(jù)目的名稱,例如DataPurpose,按照以下參數(shù)說明,完成配置,然后單擊確定。
參數(shù)
描述
選擇操作
選擇發(fā)布到AMQP服務端訂閱消費組。
消費組
選擇一個已創(chuàng)建的消費組作為數(shù)據(jù)轉(zhuǎn)發(fā)目標。單擊創(chuàng)建消費組可以進行消費組創(chuàng)建。
step2:配置并啟動解析器
創(chuàng)建解析器,例如DataParser。具體操作,請參見創(chuàng)建解析器。
在解析器詳情頁面,關(guān)聯(lián)數(shù)據(jù)源。
在配置向?qū)У?b data-tag="uicontrol" id="uicontrol-f9s-25z-bkv" class="uicontrol">數(shù)據(jù)源下,單擊關(guān)聯(lián)數(shù)據(jù)源。
在彈出的對話框中,單擊數(shù)據(jù)源下拉列表,選擇已創(chuàng)建的數(shù)據(jù)源DataSource,單擊確定。
在解析器詳情頁面,關(guān)聯(lián)數(shù)據(jù)目的。
單擊配置向?qū)У?b data-tag="uicontrol" id="uicontrol-msn-zik-exs" class="uicontrol">數(shù)據(jù)目的,然后單擊數(shù)據(jù)目的列表右上方的關(guān)聯(lián)數(shù)據(jù)目的。
在彈出的對話框中,單擊數(shù)據(jù)目的下拉列表,選擇已創(chuàng)建的數(shù)據(jù)目的DataPurpose,單擊確定。
在數(shù)據(jù)目的列表,查看并保存數(shù)據(jù)目的ID,例如為1000。
后續(xù)解析腳本中,需使用此處的數(shù)據(jù)目的ID。
在解析器詳情頁面,單擊解析器。
在腳本輸入框,輸入解析腳本。
解析腳本類似JavaScript語言,編輯腳本的語法參考JavaScript語法,詳細編輯方法,請參見腳本語法。
轉(zhuǎn)發(fā)數(shù)據(jù)到AMQP服務端訂閱消息組,需要使用函數(shù)
writeAmqp(destinationId, payload, tag)
。函數(shù)參數(shù)說明,請參見函數(shù)列表。無需指定設備,轉(zhuǎn)發(fā)產(chǎn)品下全部設備的數(shù)據(jù):
//通過payload函數(shù),獲取設備上報的消息內(nèi)容,并按照JSON格式轉(zhuǎn)換。 var data = payload("json"); //直接流轉(zhuǎn)物模型上報數(shù)據(jù)。 writeAmqp(1000, data, "調(diào)試");
指定某一個設備,僅轉(zhuǎn)發(fā)該設備的消息:
//通過payload函數(shù),獲取設備上報的消息內(nèi)容,并按照JSON格式轉(zhuǎn)換。 var data = payload("json"); //獲取上報消息的設備名稱。 var dn = deviceName(); //流轉(zhuǎn)指定設備的物模型上報數(shù)據(jù)。 if (dn == 'device01') { writeAmqp(1000, data, "調(diào)試"); }
單擊調(diào)試,根據(jù)頁面提示,選擇產(chǎn)品和設備,輸入Topic和Payload數(shù)據(jù),驗證腳本可執(zhí)行。
參數(shù)示例如下:
運行結(jié)果如下,表示腳本執(zhí)行成功。
單擊發(fā)布。
回到云產(chǎn)品流轉(zhuǎn)頁面的解析器頁簽,單擊解析器DataParser對應的啟動按鈕,啟動解析器。
步驟二:運行AMQP客戶端
建議使用阿里云物聯(lián)網(wǎng)平臺提供的AMQP SDK接入示例。對于您自研的AMQP SDK,阿里云不提供后續(xù)技術(shù)支持服務。
本示例使用Java語言,其他語言的示例請參見AMQP客戶端接入說明。
本示例購買Alibaba Cloud Linux操作系統(tǒng)的ECS實例,作為AMQP客戶端的開發(fā)環(huán)境:
登錄ECS實例。登錄方式,請參見連接方式概述。
執(zhí)行以下命令,下載Demo文件。
wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
執(zhí)行以下命令,解壓demo文件。
unzip amqp-demo.zip
在
src/main/java/com.aliyun.iotx.demo
目錄下AmqpClient.java
文件中,參照下表修改AMQP的接入信息。重要本示例Demo代碼中,添加了結(jié)束程序的代碼(
Thread.sleep(60 * 1000);
),即程序啟動成功,運行一分鐘后會結(jié)束。實際場景中,您可根據(jù)需要自行設置運行時間。參數(shù)
說明
accessKey
阿里云主賬號或RAM用戶的AccessKey ID和AccessKey Secret。
登錄物聯(lián)網(wǎng)平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。
重要為避免將AccessKey硬編碼到業(yè)務代碼中帶來的安全風險,可采用配置環(huán)境變量的方法管理AccessKey。
您需在本地操作系統(tǒng)中添加環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分別寫入已準備好的AccessKey ID和AccessKey Secret。
在示例代碼中可通過以下方法獲取:
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
accessSecret
consumerGroupId
當前物聯(lián)網(wǎng)平臺對應實例中的消費組ID。
登錄物聯(lián)網(wǎng)平臺控制臺,在對應實例的
查看您的消費組ID。iotInstanceId
實例ID。您可在物聯(lián)網(wǎng)平臺控制臺的實例概覽頁面,查看當前實例的ID。
若有ID值,必須傳入該ID值。
若無實例概覽頁面或ID值,傳入空值,即
iotInstanceId = ""
。
clientId
表示客戶端ID,用戶自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。
AMQP客戶端接入并啟動成功后,登錄物聯(lián)網(wǎng)平臺控制臺,在對應實例的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數(shù),方便您識別區(qū)分不同的客戶端。
connectionCount
啟動AMQP客戶端的連接數(shù),最大不超過128個。用于實時消息推送的擴容。
消費組詳情頁面會以
${clientId}+"-"+數(shù)字
形式,顯示連接的客戶端。其中數(shù)字最小值為0。host
AMQP接入域名。
${YourHost}
對應的AMQP接入域名信息,請參見查看和配置實例終端節(jié)點信息(Endpoint)。在
pom.xml
文件中,已添加相關(guān)Maven依賴。在amqp-demo
根目錄執(zhí)行以下命令,重新加載Maven變更,構(gòu)建項目。mvn clean package
在
amqp-demo/target
目錄執(zhí)行以下命令,運行生成的JAR包。java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
運行示例代碼后返回如下信息,表示AMQP客戶端已接入物聯(lián)網(wǎng)平臺并成功接收消息。
重要只有當AMQP客戶端在線時,才能在服務器上收到設備消息。
10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5) 10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6) 10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device01/thing/event/property/post, messageId = 1731508564705******, content = {"temperature":10,"humidity":56}
在相應消費組顯示在線的AMQP客戶端。
amqp-demo
中connectionCount = 4
代表4個客戶端。
后續(xù)操作
如果AMQP客戶端不在線,AMQP服務端訂閱消息會堆積,AMQP客戶端重新上線后,物聯(lián)網(wǎng)平臺重新推送消息。如果不需要消費堆積的消息,可在AMQP客戶端上線前,清空堆積的消息。
所有配置完成,設備上報訂閱數(shù)據(jù)并被AMQP客戶端接收后,您可以登錄物聯(lián)網(wǎng)平臺控制臺,進入對應實例查看消息運行日志。
在查詢云端運行日志。
頁簽,查看設備上報數(shù)據(jù)、物聯(lián)網(wǎng)平臺轉(zhuǎn)發(fā)數(shù)據(jù)到AMQP客戶端和AMQP客戶端返回ACK的日志記錄。具體操作,請參見- 頁簽,單擊目標消費組右側(cè)操作列的查看,在消費組詳情頁面,查看消息消費速率、消息堆積量、消費日志等。具體操作,請參見