您可以使用規則引擎,將物聯網平臺數據轉發到消息隊列(Kafka)中存儲,從而實現消息從設備、物聯網平臺、Kafka到應用服務器之間的全鏈路高可靠傳輸能力。本文以物模型數據上報Topic為例,介紹流轉消息數據的完整流程。
前提條件
背景信息
轉發的數據目的配置完成后,會自動完成以下配置,實現設備數據通過物聯網平臺的規則引擎轉發到消息隊列(Kafka)。
- 物聯網平臺占用Kafka實例所在虛擬交換機的2個IP地址。
- 在Kafka實例所在的VPC網絡下創建托管安全組,安全組名稱默認以sg-nsm-開頭。
創建數據目的
配置并啟動解析器
- 創建解析器,例如DataParser。具體操作,請參見創建解析器。
- 在解析器詳情頁面,關聯數據源。
- 在配置向導的數據源下,單擊關聯數據源。
- 在彈出的對話框中,單擊數據源下拉列表,選擇已創建的數據源DataSource,單擊確定。
- 在解析器詳情頁面,關聯數據目的。
- 在解析器詳情頁面,單擊解析器。
- 在腳本輸入框,輸入解析腳本。腳本編輯方法,請參見腳本示例。函數參數說明,請參見函數列表。
//通過payload函數,獲取設備上報的消息內容,并按照JSON格式轉換。 var data = payload("json"); //直接流轉物模型上報數據。 writeKafka(1000, data, "調試");
- 單擊調試,根據頁面提示,選擇產品和設備,輸入Topic和Payload數據,驗證腳本可執行。參數示例如下:
運行結果如下,表示腳本執行成功。
action: transmit to kafka[destinationId=1000], data:{"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}} variables: data : {"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
- 單擊發布。
- 回到云產品流轉頁面的解析器頁簽,單擊解析器DataParser對應的啟動按鈕,啟動解析器。
- 在消息隊列Kafka版控制臺對應實例的Topic詳情頁面,查詢流轉的消息。