使用示例
本文以C Link SDK中的Demo文件為例,介紹如何調(diào)用Link SDK的API,將MQTT協(xié)議的設(shè)備接入物聯(lián)網(wǎng)平臺并進(jìn)行消息收發(fā)。
背景信息
MQTT接入的更多信息,請參見MQTT接入概述。
物聯(lián)網(wǎng)平臺提供的C Link SDK中,非云網(wǎng)關(guān)設(shè)備接入的Demo文件為./mqtt_basic_demo.c
,云網(wǎng)關(guān)設(shè)備接入的Demo文件為./mqtt_userdefine_demo.c
。
步驟一:初始化
添加頭文件。
#include "aiot_state_api.h" #include "aiot_sysdep_api.h" #include "aiot_mqtt_api.h"
配置底層依賴和日志輸出。
aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile); aiot_state_set_logcb(demo_state_logcb);
調(diào)用aiot_mqtt_init,創(chuàng)建MQTT客戶端實例,并初始化默認(rèn)參數(shù)。
mqtt_handle = aiot_mqtt_init(); if (mqtt_handle == NULL) { printf("aiot_mqtt_init failed\n"); return -1; }
步驟二:配置功能
調(diào)用aiot_mqtt_setopt,配置以下功能。
更多功能的配置項,請參見aiot_mqtt_option_t。
配置連接參數(shù)。
非云網(wǎng)關(guān)設(shè)備
示例代碼:
/* TODO: 替換為自己設(shè)備的認(rèn)證信息。 */ char *product_key = "a18wP******"; char *device_name = "LightSwitch"; char *device_secret = "uwMTmVAMnGGHaAkqmeDY6cHxxB******"; /* TODO: 替換為自己設(shè)備的接入域名。 */ char *mqtt_host = "iot-06z00ax1o******.mqtt.iothub.aliyuncs.com"; ... ... /* 以下參數(shù)為可選,您可使用SDK文件中默認(rèn)內(nèi)容。 */ /* 配置MQTT服務(wù)器地址。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)url); /* 配置MQTT服務(wù)器端口。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port); /* 配置設(shè)備ProductKey。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* 配置設(shè)備DeviceName。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name); /* 配置設(shè)備DeviceSecret。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret); /* 配置網(wǎng)絡(luò)連接的安全憑據(jù)。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
相關(guān)參數(shù):
參數(shù)
說明
mqtt_host
設(shè)備的接入域名。
企業(yè)版實例和新版公共實例:在實例詳情頁面的開發(fā)配置面板,查看接入域名。
舊版公共實例:接入域名格式為
${YourProductKey}.iot-as-mqtt.${YourRegionId}.aliyuncs.com
。
新舊版公共實例和企業(yè)版實例、以及接入域名的更多信息,請參見查看實例終端節(jié)點。
product_key
設(shè)備認(rèn)證信息。具體信息,請參見獲取設(shè)備認(rèn)證信息。
本示例代碼的身份認(rèn)證方式為一機(jī)一密。
device_name
device_secret
MQTT保活說明:
重要設(shè)備端在保活時間間隔內(nèi),至少需要發(fā)送一次報文,包括ping請求。
從物聯(lián)網(wǎng)平臺發(fā)送CONNACK響應(yīng)CONNECT消息時,開始心跳計時。收到PUBLISH、SUBSCRIBE、PING或 PUBACK消息時,會重置計時器。物聯(lián)網(wǎng)平臺每隔30秒定時檢測一次設(shè)備的保活心跳,設(shè)備上線時間點距離最新定時檢測時間點的時間,是定時檢測的等待時間。定義最大超時時間為:
保活心跳時間*1.5+定時檢測的等待時間
。超過最大超時時間未收到設(shè)備消息,服務(wù)器會自動斷開連接。
C Link SDK具備保活能力,您可以設(shè)置以下配置項,自定義設(shè)備連接的保活心跳。如果不配置,則取默認(rèn)值。
配置項
默認(rèn)值
說明
AIOT_MQTTOPT_HEARTBEAT_MAX_LOST
2
可容忍的心跳丟失閾值。即:心跳請求報文達(dá)到設(shè)置的次數(shù)后,發(fā)起重連。
AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS
25,000
每次發(fā)起重連之間的間隔時間。單位毫秒, 取值范圍:1,000~1,200,000。
AIOT_MQTTOPT_KEEPALIVE_SEC
1,200
可容忍的心跳丟失時間閾值。即:失去心跳后,設(shè)置的時間內(nèi),允許發(fā)起重連。單位秒,取值范圍:30~1,200。建議取值大于300。
云網(wǎng)關(guān)設(shè)備
示例代碼:
/* TODO: 替換為自己設(shè)備的設(shè)備信息、接入地址(mqtt_host)、端口(port)、證書(user_ca_cert) */ char *username = "LightSwitch"; char *password = "*******"; char *client_id = "client_*******"; char *mqtt_host = "iot-******.igw.iothub.aliyuncs.com"; char *product_key = "*******"; uint16_t port = 1883; const char *user_ca_cert = \ { "-----BEGIN CERTIFICATE-----\r\n" \ "MIIC4jCCAco*************************************MQswCQYDVQQGEwJD\r\n" \ "TjETMBEGA1U*************************************IENBMB4XDTIyMTIw\r\n" \ "MjE0NDk1Mlo*************************************Q04xEzARBgNVBAoM\r\n" \ "CkFsaXl1biB*************************************KoZIhvcNAQEBBQAD\r\n" \ "ggEPADCCAQo*************************************2VaEUrnXNoO40w71\r\n" \ "i3l4Alchs1M*************************************VVxRGEtybsIH8CYO\r\n" \ "kyzGgOKbx7M*************************************49l3opCIfg9LOwjF\r\n" \ "R6x+ZY6yGdv*************************************CDxW2mILl+VwYd9s\r\n" \ "2udrJ7riJ5i*************************************/P9s+2UaBX89TTUd\r\n" \ "lYWKe3tHRg+*************************************BgkqhkiG9w0BAQsF\r\n" \ "AAOCAQEAhr8*************************************odRVrUaVBBLguSAH\r\n" \ "OZmtwUy0ZUf*************************************aJ27G4prMD2DMoby\r\n" \ "uTbXKOPYCvT*************************************5smSmfXIrBrbGrG6\r\n" \ "0CL1Y8DTNlF*************************************TjixfpGS3C/Ogu0H\r\n" \ "uMMbA4RCFhF*************************************X0VprxMrDarUJAa1\r\n" \ "tJ************************Iw==\r\n" \ "-----END CERTIFICATE-----\r\n" \ }; ... ... /* 配置MQTT服務(wù)器地址 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)mqtt_host); /* 配置MQTT服務(wù)器端口 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port); /* 配置設(shè)備username */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERNAME, (void *)username); /* 配置設(shè)備password */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PASSWORD, (void *)password); /* 配置設(shè)備client_id */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_CLIENTID, (void *)client_id); /* 配置設(shè)備productKey */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* 配置設(shè)備deviceName */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)username); /* 配置網(wǎng)絡(luò)連接的安全憑據(jù), 上面已經(jīng)創(chuàng)建好了 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred); /* 配置MQTT默認(rèn)消息接收回調(diào)函數(shù) */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler); /* 配置MQTT事件回調(diào)函數(shù) */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler); /* 關(guān)閉對Topic的以'/'為開頭的校驗 */ uint8_t topic_check = 0; aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_TOPIC_HEADER_CHECK, (void *)&topic_check);
相關(guān)參數(shù):
參數(shù)
說明
product_key
設(shè)備所屬云網(wǎng)關(guān)產(chǎn)品的ProductKey。
username
設(shè)備認(rèn)證信息。具體信息,請參見獲取云網(wǎng)關(guān)設(shè)備認(rèn)證信息。
password
client_id
客戶端ID,需自定義,長度不可超過64個字符。建議使用設(shè)備的MAC地址或SN碼,方便您識別區(qū)分不同的客戶端。
mqtt_host
MQTT云網(wǎng)關(guān)設(shè)備接入地址和端口號(默認(rèn)為1883)。獲取方法,請參見創(chuàng)建云網(wǎng)關(guān)產(chǎn)品(MQTT)。
port
user_ca_cert
設(shè)備根證書
root-ca.crt
的內(nèi)容。取消通信Topic以
/
開頭的校驗(AIOT_MQTTOPT_TOPIC_HEADER_CHECK
):/* 關(guān)閉對Topic的以'/'為開頭的校驗 */ uint8_t topic_check = 0; aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_TOPIC_HEADER_CHECK, (void *)&topic_check);
物聯(lián)網(wǎng)平臺與云網(wǎng)關(guān)設(shè)備之間通過MQTT協(xié)議的Topic實現(xiàn)消息通信,通信Topic符合標(biāo)準(zhǔn)MQTT協(xié)議的Topic規(guī)范即可,無需以
/
開頭。MQTT協(xié)議云網(wǎng)關(guān)設(shè)備通信的更多內(nèi)容,請參見消息通信說明。
配置狀態(tài)監(jiān)控和消息回調(diào)。
配置狀態(tài)監(jiān)控回調(diào)函數(shù)。
示例代碼:
int main(int argc, char *argv[]) { ... ... /* 配置MQTT默認(rèn)消息接收回調(diào)函數(shù)。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler); /* 配置MQTT事件回調(diào)函數(shù)。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler); ... ... }
相關(guān)參數(shù):
配置項
示例值
說明
AIOT_MQTTOPT_RECV_HANDLER
demo_mqtt_default_recv_handler
當(dāng)接收消息時,根據(jù)該回調(diào)函數(shù)定義的處理邏輯,執(zhí)行對應(yīng)的處理。
AIOT_MQTTOPT_EVENT_HANDLER
demo_mqtt_event_handler
當(dāng)設(shè)備連接狀態(tài)發(fā)生變化時,根據(jù)該回調(diào)函數(shù)定義的處理邏輯,執(zhí)行對應(yīng)的處理。
定義狀態(tài)監(jiān)控的回調(diào)函數(shù)。
重要避免定義過于耗時的事件處理邏輯,以免阻塞收包線程。
連接狀態(tài)的變化包括網(wǎng)絡(luò)異常、自動重連已成功、已斷開連接等。
如果要根據(jù)連接狀態(tài)的變化做應(yīng)對處理,可在
TODO
處,按照需要修改代碼。
/* MQTT事件回調(diào)函數(shù), 當(dāng)網(wǎng)絡(luò)連接、重連或斷開時,觸發(fā)該函數(shù), 事件定義見core/aiot_mqtt_api.h。 */ void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata) { switch (event->type) { /* 調(diào)用了aiot_mqtt_connect()接口, 與MQTT服務(wù)器建立連接。 */ case AIOT_MQTTEVT_CONNECT: { printf("AIOT_MQTTEVT_CONNECT\n"); /* TODO: 處理SDK建立連接成功, 不可在此調(diào)用耗時較長的阻塞函數(shù)。 */ } break; /* SDK因網(wǎng)絡(luò)狀況被動斷開連接后, 成功自動發(fā)起重連。 */ case AIOT_MQTTEVT_RECONNECT: { printf("AIOT_MQTTEVT_RECONNECT\n"); /* TODO: 處理SDK重連成功, 不可在此調(diào)用耗時較長的阻塞函數(shù)。 */ } break; /* SDK因網(wǎng)絡(luò)狀況被動斷開了連接, network底層讀寫失敗, heartbeat沒有按預(yù)期得到服務(wù)端心跳應(yīng)答。 */ case AIOT_MQTTEVT_DISCONNECT: { char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") : ("heartbeat disconnect"); printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause); /* TODO: 處理SDK被動斷開連接, 不可在此調(diào)用耗時較長的阻塞函數(shù)。 */ } break; default: { } } }
定義消息接收的回調(diào)函數(shù)。
重要避免定義過于耗時的事件處理邏輯,以免阻塞收包線程。
如果您要根據(jù)接收的消息做應(yīng)對處理,可在
TODO
處,按照需要修改代碼。
/* MQTT默認(rèn)消息處理回調(diào), 當(dāng)SDK從服務(wù)器收到MQTT消息時, 且您未設(shè)置對應(yīng)回調(diào)的處理時,以下接口被調(diào)用。 */ void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata) { switch (packet->type) { case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: { printf("heartbeat response\n"); /* TODO: 處理服務(wù)器對心跳的回應(yīng), 一般不處理。 */ } break; case AIOT_MQTTRECV_SUB_ACK: { printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n", -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos); /* TODO: 處理服務(wù)器對訂閱請求的回應(yīng), 一般不處理。 */ } break; case AIOT_MQTTRECV_PUB: { printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic); printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload); /* TODO: 處理服務(wù)器下發(fā)的業(yè)務(wù)報文。 */ } break; case AIOT_MQTTRECV_PUB_ACK: { printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id); /* TODO: 處理服務(wù)器對QoS=1上報消息的回應(yīng), 一般不處理。 */ } break; default: { } } }
步驟三:請求連接
調(diào)用aiot_mqtt_connect,根據(jù)配置連接的參數(shù),向物聯(lián)網(wǎng)平臺,發(fā)起連接認(rèn)證請求。
/* 與服務(wù)器建立MQTT連接。 */
res = aiot_mqtt_connect(mqtt_handle);
if (res < STATE_SUCCESS) {
/* 嘗試建立連接失敗, 銷毀MQTT實例, 回收資源。 */
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
printf("please check variables like mqtt_host, produt_key, device_name, device_secret in demo\r\n");
return -1;
}
步驟四:開啟保活線程
調(diào)用aiot_mqtt_process,向服務(wù)器發(fā)送心跳報文,使設(shè)備保持長連接狀態(tài),并重發(fā)QoS=1
的未應(yīng)答報文。
開啟保活線程。
res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_process_thread failed: %d\n", res); return -1; }
設(shè)置保活線程處理函數(shù)。
void *demo_mqtt_process_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_process_thread_running) { res = aiot_mqtt_process(args); if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } return NULL; }
步驟五:開啟接收線程
調(diào)用aiot_mqtt_recv,收取服務(wù)器下發(fā)的MQTT消息,根據(jù)消息回調(diào)函數(shù),執(zhí)行對應(yīng)處理。在斷線時自動重連,根據(jù)事件回調(diào)函數(shù),執(zhí)行對應(yīng)處理。
開啟接收線程。
res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res); return -1; }
設(shè)置接收線程處理函數(shù)。
void *demo_mqtt_recv_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_recv_thread_running) { res = aiot_mqtt_recv(args); if (res < STATE_SUCCESS) { if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } } return NULL; }
步驟六:訂閱Topic
調(diào)用aiot_mqtt_sub,訂閱指定Topic。
示例代碼:
{ char *sub_topic = "/a18wP******/LightSwitch/user/get"; res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); return -1; } }
說明完成配置后,請刪除相關(guān)代碼兩邊的注釋符號。
相關(guān)參數(shù):
參數(shù)
示例
說明
sub_topic
/a18wP******/LightSwitch/user/get
擁有訂閱權(quán)限的Topic。
a18wP******
為設(shè)備的ProductKey。LightSwitch
為設(shè)備的DeviceName。
本示例為默認(rèn)的自定義Topic。
設(shè)備通過該Topic,可接收物聯(lián)網(wǎng)平臺的消息。
關(guān)于Topic的更多信息,請參見什么是Topic。
步驟七:發(fā)送消息
調(diào)用aiot_mqtt_pub,向指定Topic發(fā)送消息。
示例代碼:
{ char *pub_topic = "/a18wP******/LightSwitch/user/update"; char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}"; res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); return -1; } }
說明完成配置后,請刪除相關(guān)代碼兩邊的注釋符號。
相關(guān)參數(shù):
參數(shù)
示例
說明
pub_topic
/a18wP******/LightSwitch/user/update
擁有發(fā)布權(quán)限的Topic。
a18wP******
為設(shè)備的ProductKey。LightSwitch
為設(shè)備的DeviceName。
設(shè)備通過該Topic向物聯(lián)網(wǎng)平臺發(fā)送消息。
關(guān)于Topic的更多信息,請參見什么是Topic。
pub_payload
{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}
上報至物聯(lián)網(wǎng)平臺的消息內(nèi)容。
由于示例消息的Topic類型為自定義,因此數(shù)據(jù)格式可自定義。
關(guān)于數(shù)據(jù)格式的更多信息,請參見數(shù)據(jù)格式。
設(shè)備與物聯(lián)網(wǎng)平臺建立MQTT通信后,請確保通信量不超過閾值。
通信限制的更多信息,請參見使用限制。
如果通信超過閾值,請登錄物聯(lián)網(wǎng)平臺查看堆積消息。更多信息,請參見查看和監(jiān)控消費組。
步驟八:斷開連接
MQTT接入常應(yīng)用于長連接的設(shè)備,程序通常不會運(yùn)行至此。
例程的主線程任務(wù)為配置參數(shù)并成功建立連接。連接建立后,主線程可進(jìn)入休眠。
調(diào)用aiot_mqtt_disconnect,向物聯(lián)網(wǎng)平臺發(fā)送斷開連接的報文,然后斷開網(wǎng)絡(luò)連接。
res = aiot_mqtt_disconnect(mqtt_handle);
if (res < STATE_SUCCESS) {
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res);
return -1;
}
步驟九:退出程序
調(diào)用aiot_mqtt_deinit,銷毀MQTT客戶端實例,釋放資源。
res = aiot_mqtt_deinit(&mqtt_handle);
if (res < STATE_SUCCESS) {
printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
return -1;
}
后續(xù)步驟
例程文件配置完成后,需進(jìn)行編譯,生成可執(zhí)行文件:
非云網(wǎng)關(guān)設(shè)備:
./output/mqtt-basic-demo
。云網(wǎng)關(guān)設(shè)備:
./output/mqtt-userdefine-demo
。更多信息,請參見編譯與運(yùn)行。
關(guān)于運(yùn)行結(jié)果的詳細(xì)說明,請參見運(yùn)行日志。