您可以使用規則引擎,將物聯網平臺數據轉發到消息隊列(Kafka)中存儲,從而實現消息從設備、物聯網平臺、Kafka到應用服務器之間的全鏈路高可靠傳輸能力。本文以物模型數據上報Topic為例,介紹流轉消息數據的完整流程。

前提條件

  • 已創建數據源DataSource,并添加物模型數據上報Topic。具體步驟,請參見添加待流轉的數據源
  • 已創建消息隊列(Kafka)實例和用于接收數據的Topic。Kafka使用方法,請參見Kafka快速入門
    重要 Kafka實例所在地域必須與物聯網平臺服務的當前實例所在地域一致。
  • 當前阿里云賬號已添加白名單權限,支持將數據轉發到消息隊列Kafka。您可提交工單申請開通白名單權限。

背景信息

轉發的數據目的配置完成后,會自動完成以下配置,實現設備數據通過物聯網平臺的規則引擎轉發到消息隊列(Kafka)。

  • 物聯網平臺占用Kafka實例所在虛擬交換機的2個IP地址。
  • 在Kafka實例所在的VPC網絡下創建托管安全組,安全組名稱默認以sg-nsm-開頭。

創建數據目的

  1. 登錄物聯網平臺控制臺
  2. 實例概覽頁面,選擇目標環境,找到對應的實例,單擊實例ID或備注名稱。
    重要 目前僅開通企業版實例服務的地域下,執行此步驟。其他地域,請跳過此步驟。地域及實例的支持說明,請參見實例概述
    實例概覽
  3. 在左側導航欄,選擇消息轉發 > 云產品流轉
  4. 可選:云產品流轉頁面,單擊右上角體驗新版,進入新版功能頁面。
    說明 如果您已執行過此操作,再次進入云產品流轉頁面,會直接進入新版功能頁面。
  5. 單擊數據目的頁簽,然后單擊創建數據目的
  6. 創建數據目的對話框,輸入數據目的名稱,例如DataPurpose,按照以下參數說明,完成配置,然后單擊確定
    參數描述
    選擇操作選擇發送數據到消息隊列(Kafka)中
    角色授權物聯網平臺將數據寫入Kafka。

    如您還未創建相關角色,單擊創建RAM角色,跳轉到RAM控制臺,創建角色和授權策略,請參見創建RAM角色

    地域固定為您物聯網平臺實例所在地域。
    實例選擇Kafka實例。

    您可以單擊創建實例,跳轉到消息隊列控制臺,創建Kafka實例。具體操作,請參見創建實例

    Topic選擇用于接收物聯網平臺數據的Kafka Topic。

    您可以單擊創建Topic,跳轉到消息隊列控制臺,創建Kafka Topic。具體操作,請參見創建Topic

配置并啟動解析器

  1. 創建解析器,例如DataParser。具體操作,請參見創建解析器
  2. 解析器詳情頁面,關聯數據源。
    1. 在配置向導的數據源下,單擊關聯數據源
    2. 在彈出的對話框中,單擊數據源下拉列表,選擇已創建的數據源DataSource,單擊確定
  3. 解析器詳情頁面,關聯數據目的。
    1. 單擊配置向導的數據目的,然后單擊數據目的列表右上方的關聯數據目的
    2. 在彈出的對話框中,單擊數據目的下拉列表,選擇已創建的數據目的DataPurpose,單擊確定
    3. 在數據目的列表,查看并保存數據目的ID,例如為1000
      后續解析腳本中,需使用此處的數據目的ID
  4. 解析器詳情頁面,單擊解析器
  5. 在腳本輸入框,輸入解析腳本。腳本編輯方法,請參見腳本示例
    函數參數說明,請參見函數列表
    //通過payload函數,獲取設備上報的消息內容,并按照JSON格式轉換。
    var data = payload("json");
    //直接流轉物模型上報數據。
    writeKafka(1000, data, "調試");
  6. 單擊調試,根據頁面提示,選擇產品和設備,輸入Topic和Payload數據,驗證腳本可執行。
    參數示例如下:調試示例

    運行結果如下,表示腳本執行成功。

    action:
        transmit to kafka[destinationId=1000], data:{"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
    variables:
        data : {"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
  7. 單擊發布
  8. 回到云產品流轉頁面的解析器頁簽,單擊解析器DataParser對應的啟動按鈕,啟動解析器。
  9. 消息隊列Kafka版控制臺對應實例的Topic詳情頁面,查詢流轉的消息。
    消息