W5500 Aliyun MQTT Porting based on STM32F103

Aliyun MQTT implementation using W5500 and stm32f103
ORIGINAL POST
By wicevi
components
Hardware Components
w5500
X 1
stm32f103
X 1
Software Apps and online services
aliyun
mqtt
details

mqtt.png

一、MQTT协议
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
在这里我们选择使用STM32F103+W5500作为客户端,来实现发布和订阅消息。

二、开发环境
硬件设备:stm32f103c8t6+w5500
开发软件:Keil MDK 525
云平台:阿里云

相关链接:STM32芯片资料 w5500芯片资料 阿里云

三、云端准备
以aliyun账号直接进入IoT控制台,如果还没有开通阿里云物联网套件服务,则申请开通

开通完后,创建一个自定义产品类

之后再添加一个测试设备 名称默认为空就好

添加好之后点进去查看设备,获取该设备的设备证书(ProductKey、DeviceName、DeviceSecret),后面会用到

之后再去产品那边查看自定义Topic列表,默认有一个发布和一个订阅的Topic,将${deviceName}替换成上一个步骤获取的DeviceName然后保存,后面会用到,到这里开发所需的云端就准备好了

四、移植和开发
先去W5500官网下载对应的例程下来,我们只需要提取其MQTT的库文件,至于它那个MQTT_CON_ALI函数实现就不要了,封装可读性太差了,感觉这例程就是赶出来了的( ̄o ̄)

将以上库文件添加到工程之后,就可以开始进行封装了
创建一个新的文件 mqtt_api.c 将MQTT库文件和W5500库文件声明导入

#include “mqtt_api.h”
#include “MQTTPacket.h”
#include “w5500api.h”
#include <stdlib.h>
#include <stdint.h>
#include <string.h>

新建一个函数 来判断过来的数据包类型,传入接收到的数据 返回数据包类型

/******解析收到的ACK报文*********/
int mqtt_decode_msg(unsigned char*buf)
{
int rc = -1;
MQTTHeader header = {0};
header.byte = buf[0];
rc = header.bits.type;
return rc;
}

MQTT包类型声明在MQTTPacket.h 文件里

enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT
};

然后实现Connect函数,W5500有8个SOCKET,函数可以根据需求传入不同的SOCKET建立多个连接(该SOCKET必须处于SOCK_ESTABLISHED状态)
该函数主要是拼接连接报文,MQTTPacket_connectData参数设置好之后通过MQTTSerialize_connect函数拼接
然后通过socket发送给服务器,这边要注意的是 不同云平台验证的字段和方法不同,需根据具体情况更改,这边只介绍阿里云平台的方法:

> data.clientID.cstring=”$clientId|securemode=3,signmethod=hmacsha1,timestamp=789|”;//$clientId为是客户端自表示id,建议mac或sn,64字符内 其它三个参数分别是安全模式【可选值有2(TLS直连模式)、3(TCP直连模式)】、签名算法类型、时间截
> data.keepAliveInterval =180;//保活时间 阿里云规定必须大于60 单位:秒
> data.cleansession=1;//该标志置1服务器必须丢弃之前保持的客户端的信息
> data.username.cstring=”$deviceName&$productKey”;//$deviceName和$productKey为设备证书里的
> data.password.cstring=hmacsha1($deviceSecret,$content);//$deviceSecret为为设备证书里的 $content为productKey,deviceName,timestamp,clientId按照手母顺序排序,然后将参数值依次拼接例如:
> clientId192.168.207.115deviceNameMQTT1productKeyTKKMt4nMF8Utimestamp789

//连接MQTT服务器函数
uint8 connectMqtt(SOCKET Socket){
int len,rc,wait_ack_time=0;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;//配置部分可变头部的值
data.clientID.cstring = mqttClientId; //客户端标识,用于区分每个客户端
data.keepAliveInterval = keepalive; //保活计时器,定义了服务器收到客户端消息的最大时间间隔
data.cleansession = cleansession; //该标志置1服务器必须丢弃之前保持的客户端的信息,将该连接视为“不存在”
data.username.cstring = mqttUsername;
data.password.cstring = password;
memset(buf,0,buflen);
len = MQTTSerialize_connect(buf, buflen, &data); /*1 构造连接报文*/
rc = send(Socket,buf,len); //发送连接请求
if(rc != len)
{
printf(“Send Connect Error: rc=%d len=%dnr”,rc,len);
return 1;
}
//循环获取Connect Ack
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait CONNACK Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);//接收数据 判断是否为Connect Ack
if(mqtt_decode_msg(buf)!=CONNACK){
printf(“Error CONNACK:%snr”,buf);
return 3;
}
printf(“Connect Is Ok:%srn”,buf);
return 0;
}

接上之后 在keepAliveInterval时间内,我们还需要和服务器之间至少得有一次数据传输,不然时间一到,服务器会将客服端断开。但是此时又没有消息需要发布或者主题需要订阅呢,这时候就需要用到Ping了,由客户端向服务器发送一个空的Ping包 服务器回复PINGRESP,则相互通信一次,服务端的keepAliveInterval时间也就更新了

//PING服务器 保持存活
uint8 pingMqtt(SOCKET Socket){
int len,rc,wait_ack_time=0;
memset(buf,0,buflen);
len=MQTTSerialize_pingreq(buf,buflen);
rc=send(Socket,buf, len);
if(len!=rc)
{
printf(“Send Ping Error: rc=%d len=%dnr”,rc,len);
return 1;
}
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait PINGRESP Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);
if(mqtt_decode_msg(buf) != PINGRESP){
printf(“Error PINGRESP:%snr”,buf);
return 3;
}
printf(“PING Successfully:%snr”,buf);
// if(len>2){
// if(mqtt_decode_msg(buf+2) == PUBLISH){
// dealMqtt(Socket,buf+2,buflen-2);
// }
// }
return 0;
}

然后是发布和订阅函数

//MQTT发布消息函数
uint8 publishMqtt(SOCKET Socket,char *pTopic,char *pMessage)
{
int len,rc,wait_ack_time=0;
MQTTString topicString = MQTTString_initializer;
int msglen = strlen(pMessage);//计算发布消息的长度
memset(buf,0,buflen);
topicString.cstring = pTopic;
len = MQTTSerialize_publish(buf, buflen, 0, 1, 0, 0, topicString, (unsigned char*)pMessage, msglen); /*2 构造发布消息的报文*/
rc = send(Socket,buf,len); //发送消息
if (rc != len)
{
printf(“Send Publish Error: rc=%d len=%dnr”,rc,len);
return 1;
}
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait PUBACK Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);
if(mqtt_decode_msg(buf) != PUBACK){
printf(“error PUBACK:%snr”,buf);
return 3;
}
printf(“Publish Successfully:%snr”,buf);
return 0;
}
//MQTT订阅函数
uint8 subscribMqtt(SOCKET Socket,char *pTopic)
{
int len,rc,wait_ack_time=0;
MQTTString topicString = MQTTString_initializer;
int msgid = 1; //该值为消息标识符
int req_qos = 1;
memset(buf,0,buflen);
topicString.cstring = pTopic;
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);//构造订阅主题报文
rc = send(Socket,buf,len); //发送消息
if (rc != len)
{
printf(“Send Subscrib Error: rc=%d len=%dnr”,rc,len);
return 1;
}
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait SUBACK Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);
if(mqtt_decode_msg(buf) != SUBACK){
printf(“Error SUBACK:%snr”,buf);
return 3;
}
SUB_FLAG=1;
printf(“Subscrib Successfully:%snr”,buf);
return 0;
}

注意:这边发布包构建函数(MQTTSerialize_publish)的几个参数需要注意一下,函数的原型是在MQTTSerializePublish.c 里面

/**
* Serializes the supplied publish data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer – the MQTT dup flag
* @param qos integer – the MQTT QoS value
* @param retained integer – the MQTT retained flag
* @param packetid integer – the MQTT packet identifier
* @param topicName MQTTString – the MQTT topic in the publish
* @param payload byte buffer – the MQTT publish payload
* @param payloadlen integer – the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error
*/
int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid,MQTTString topicName, unsigned char* payload, int payloadlen);

dup:其是用来在保证消息传输可靠的,如果设置为1,则在下面的变长头部里多加MessageId,并需要回复确认,保证消息传输完成,但不能用于检测消息重复发送。
qos:主要用于PUBLISH(发布态)消息的,保证消息传递的次数。0:至多一次 1:至少一次,如果设置为0,则服务器收到之后不会返回PUBACK
retained:主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了。
packetid:也就是MessageId

发送的函数基本都封装完了,接下来就是接收订阅消息的函数了,这边是在Socket有消息来的时候对消息类型进行判断,如果是PUBLISH数据包,调用订阅消息解析处理

//MQTT处理订阅消息函数
void dealPublish(SOCKET Socket,uint8 *msgbuf,uint16 msglen){
//获取订阅消息参数//
unsigned char dup;
int qos;
unsigned char retained;
unsigned short mssageid;
int payloadlen_in;
unsigned char* payload_in;
MQTTString receivedTopic;
/
int len,rc;
MQTTDeserialize_publish(&dup, &qos, &retained, &mssageid, &receivedTopic,
&payload_in, &payloadlen_in, msgbuf, msglen);
printf(“message id: %dnr”,mssageid);
printf(“message qos: %dnr”,qos);
printf(“message receivedTopic: %snr”,receivedTopic.lenstring.data);
printf(“message arrived[%d]: %snr”, payloadlen_in, payload_in);
if(qos>0){//需要回复
len = MQTTSerialize_puback(buf, buflen, mssageid);//构造ack报文
rc = send(Socket,buf,len); //发送消息
if(len!=rc)printf(“Send PUBACK Error: rc=%d len=%dnr”,rc,len);
}
}
//运行函数 需循环调用
uint8 do_mqtt(SOCKET Socket,uint8 *sip,uint16 sport,uint16 lport){
static uint8 CONNECT_FLAG = 0;
uint16 rlen;
switch(getSn_SR(Socket)){
case SOCK_INIT:
connect(Socket,sip,sport);
break;
case SOCK_ESTABLISHED:
if(getSn_IR(Socket)&Sn_IR_CON)setSn_IR(Socket,Sn_IR_CON);
if(!CONNECT_FLAG)if(!connectMqtt(Socket))CONNECT_FLAG=1;
rlen=getSn_RX_RSR(Socket);
if(rlen>0){
recv(Socket,buf,rlen);
switch(mqtt_decode_msg(buf)){
case PUBLISH:
dealPublish(Socket,buf,rlen);
break;
default:
printf(“recv:%s”,buf);
break;
}
}
break;
case SOCK_CLOSE_WAIT:
disconnect(Socket);
close(Socket);
break;
case SOCK_CLOSED:
CONNECT_FLAG=0;
socket(Socket,Sn_MR_TCP,lport,Sn_MR_ND);
break;
}
return CONNECT_FLAG;
}

到此就封装全封装好了,可以同时八个socket都订阅或者发布不同的主题,只需要改变一下传参即可,注意:阿里云平台的话 端口是1883 域名每个地区不同 具体官网查看

while(1){
cnt_flag=do_mqtt(MQTT_SOCK,NET_CONFIG.rip,1883,5500);
if(cnt_flag==0){sub_flag=0;sup_flag=0;time=0;}
if(cnt_flag&&sub_flag==0)sub_flag=!subscribMqtt(MQTT_SOCK,subTopic);
if(cnt_flag&&sub_flag&&sup_flag==0)sup_flag=!publishMqtt(MQTT_SOCK,pubTopic,”连接上的一条测试发布消息”);
if(cnt_flag)time++;
if(time==5000){
time=0;
pingMqtt(MQTT_SOCK);
}
delay_ms(10);
}

开始测试

在调试输出中可以看到设备连接并订阅和发布成功了,看看阿里云平台

成功上线并发布了一条消息和一个主题,接下来往订阅的主题推送消息

成功送达!!
严格来讲这不算是移植最多就是封装个接口,官方例程那个是在太乱了,所以这边给它整理了一下,也顺便理一理代码和通信流程,收获还是蛮多的的(╹ڡ╹ ),Dome后面整理完给出。

Dome:https://download.csdn.net/download/wicevi/12719521

mqtt.png

一、MQTT协议
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
在这里我们选择使用STM32F103+W5500作为客户端,来实现发布和订阅消息。

二、开发环境
硬件设备:stm32f103c8t6+w5500
开发软件:Keil MDK 525
云平台:阿里云

相关链接:STM32芯片资料 w5500芯片资料 阿里云

三、云端准备
以aliyun账号直接进入IoT控制台,如果还没有开通阿里云物联网套件服务,则申请开通

开通完后,创建一个自定义产品类

之后再添加一个测试设备 名称默认为空就好

添加好之后点进去查看设备,获取该设备的设备证书(ProductKey、DeviceName、DeviceSecret),后面会用到

之后再去产品那边查看自定义Topic列表,默认有一个发布和一个订阅的Topic,将${deviceName}替换成上一个步骤获取的DeviceName然后保存,后面会用到,到这里开发所需的云端就准备好了

四、移植和开发
先去W5500官网下载对应的例程下来,我们只需要提取其MQTT的库文件,至于它那个MQTT_CON_ALI函数实现就不要了,封装可读性太差了,感觉这例程就是赶出来了的( ̄o ̄)

将以上库文件添加到工程之后,就可以开始进行封装了
创建一个新的文件 mqtt_api.c 将MQTT库文件和W5500库文件声明导入

#include “mqtt_api.h”
#include “MQTTPacket.h”
#include “w5500api.h”
#include <stdlib.h>
#include <stdint.h>
#include <string.h>

新建一个函数 来判断过来的数据包类型,传入接收到的数据 返回数据包类型

/******解析收到的ACK报文*********/
int mqtt_decode_msg(unsigned char*buf)
{
int rc = -1;
MQTTHeader header = {0};
header.byte = buf[0];
rc = header.bits.type;
return rc;
}

MQTT包类型声明在MQTTPacket.h 文件里

enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT
};

然后实现Connect函数,W5500有8个SOCKET,函数可以根据需求传入不同的SOCKET建立多个连接(该SOCKET必须处于SOCK_ESTABLISHED状态)
该函数主要是拼接连接报文,MQTTPacket_connectData参数设置好之后通过MQTTSerialize_connect函数拼接
然后通过socket发送给服务器,这边要注意的是 不同云平台验证的字段和方法不同,需根据具体情况更改,这边只介绍阿里云平台的方法:

> data.clientID.cstring=”$clientId|securemode=3,signmethod=hmacsha1,timestamp=789|”;//$clientId为是客户端自表示id,建议mac或sn,64字符内 其它三个参数分别是安全模式【可选值有2(TLS直连模式)、3(TCP直连模式)】、签名算法类型、时间截
> data.keepAliveInterval =180;//保活时间 阿里云规定必须大于60 单位:秒
> data.cleansession=1;//该标志置1服务器必须丢弃之前保持的客户端的信息
> data.username.cstring=”$deviceName&$productKey”;//$deviceName和$productKey为设备证书里的
> data.password.cstring=hmacsha1($deviceSecret,$content);//$deviceSecret为为设备证书里的 $content为productKey,deviceName,timestamp,clientId按照手母顺序排序,然后将参数值依次拼接例如:
> clientId192.168.207.115deviceNameMQTT1productKeyTKKMt4nMF8Utimestamp789

//连接MQTT服务器函数
uint8 connectMqtt(SOCKET Socket){
int len,rc,wait_ack_time=0;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;//配置部分可变头部的值
data.clientID.cstring = mqttClientId; //客户端标识,用于区分每个客户端
data.keepAliveInterval = keepalive; //保活计时器,定义了服务器收到客户端消息的最大时间间隔
data.cleansession = cleansession; //该标志置1服务器必须丢弃之前保持的客户端的信息,将该连接视为“不存在”
data.username.cstring = mqttUsername;
data.password.cstring = password;
memset(buf,0,buflen);
len = MQTTSerialize_connect(buf, buflen, &data); /*1 构造连接报文*/
rc = send(Socket,buf,len); //发送连接请求
if(rc != len)
{
printf(“Send Connect Error: rc=%d len=%dnr”,rc,len);
return 1;
}
//循环获取Connect Ack
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait CONNACK Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);//接收数据 判断是否为Connect Ack
if(mqtt_decode_msg(buf)!=CONNACK){
printf(“Error CONNACK:%snr”,buf);
return 3;
}
printf(“Connect Is Ok:%srn”,buf);
return 0;
}

接上之后 在keepAliveInterval时间内,我们还需要和服务器之间至少得有一次数据传输,不然时间一到,服务器会将客服端断开。但是此时又没有消息需要发布或者主题需要订阅呢,这时候就需要用到Ping了,由客户端向服务器发送一个空的Ping包 服务器回复PINGRESP,则相互通信一次,服务端的keepAliveInterval时间也就更新了

//PING服务器 保持存活
uint8 pingMqtt(SOCKET Socket){
int len,rc,wait_ack_time=0;
memset(buf,0,buflen);
len=MQTTSerialize_pingreq(buf,buflen);
rc=send(Socket,buf, len);
if(len!=rc)
{
printf(“Send Ping Error: rc=%d len=%dnr”,rc,len);
return 1;
}
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait PINGRESP Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);
if(mqtt_decode_msg(buf) != PINGRESP){
printf(“Error PINGRESP:%snr”,buf);
return 3;
}
printf(“PING Successfully:%snr”,buf);
// if(len>2){
// if(mqtt_decode_msg(buf+2) == PUBLISH){
// dealMqtt(Socket,buf+2,buflen-2);
// }
// }
return 0;
}

然后是发布和订阅函数

//MQTT发布消息函数
uint8 publishMqtt(SOCKET Socket,char *pTopic,char *pMessage)
{
int len,rc,wait_ack_time=0;
MQTTString topicString = MQTTString_initializer;
int msglen = strlen(pMessage);//计算发布消息的长度
memset(buf,0,buflen);
topicString.cstring = pTopic;
len = MQTTSerialize_publish(buf, buflen, 0, 1, 0, 0, topicString, (unsigned char*)pMessage, msglen); /*2 构造发布消息的报文*/
rc = send(Socket,buf,len); //发送消息
if (rc != len)
{
printf(“Send Publish Error: rc=%d len=%dnr”,rc,len);
return 1;
}
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait PUBACK Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);
if(mqtt_decode_msg(buf) != PUBACK){
printf(“error PUBACK:%snr”,buf);
return 3;
}
printf(“Publish Successfully:%snr”,buf);
return 0;
}
//MQTT订阅函数
uint8 subscribMqtt(SOCKET Socket,char *pTopic)
{
int len,rc,wait_ack_time=0;
MQTTString topicString = MQTTString_initializer;
int msgid = 1; //该值为消息标识符
int req_qos = 1;
memset(buf,0,buflen);
topicString.cstring = pTopic;
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);//构造订阅主题报文
rc = send(Socket,buf,len); //发送消息
if (rc != len)
{
printf(“Send Subscrib Error: rc=%d len=%dnr”,rc,len);
return 1;
}
do{
delay_ms(10);
len=getSn_RX_RSR(Socket);
wait_ack_time++;
if(wait_ack_time>MAX_OVERTIME)
{
printf(“Wait SUBACK Overtimenr”);
return 2;
}
}while(len<=0);
recv(Socket,buf,len);
if(mqtt_decode_msg(buf) != SUBACK){
printf(“Error SUBACK:%snr”,buf);
return 3;
}
SUB_FLAG=1;
printf(“Subscrib Successfully:%snr”,buf);
return 0;
}

注意:这边发布包构建函数(MQTTSerialize_publish)的几个参数需要注意一下,函数的原型是在MQTTSerializePublish.c 里面

/**
* Serializes the supplied publish data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer – the MQTT dup flag
* @param qos integer – the MQTT QoS value
* @param retained integer – the MQTT retained flag
* @param packetid integer – the MQTT packet identifier
* @param topicName MQTTString – the MQTT topic in the publish
* @param payload byte buffer – the MQTT publish payload
* @param payloadlen integer – the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error
*/
int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid,MQTTString topicName, unsigned char* payload, int payloadlen);

dup:其是用来在保证消息传输可靠的,如果设置为1,则在下面的变长头部里多加MessageId,并需要回复确认,保证消息传输完成,但不能用于检测消息重复发送。
qos:主要用于PUBLISH(发布态)消息的,保证消息传递的次数。0:至多一次 1:至少一次,如果设置为0,则服务器收到之后不会返回PUBACK
retained:主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了。
packetid:也就是MessageId

发送的函数基本都封装完了,接下来就是接收订阅消息的函数了,这边是在Socket有消息来的时候对消息类型进行判断,如果是PUBLISH数据包,调用订阅消息解析处理

//MQTT处理订阅消息函数
void dealPublish(SOCKET Socket,uint8 *msgbuf,uint16 msglen){
//获取订阅消息参数//
unsigned char dup;
int qos;
unsigned char retained;
unsigned short mssageid;
int payloadlen_in;
unsigned char* payload_in;
MQTTString receivedTopic;
/
int len,rc;
MQTTDeserialize_publish(&dup, &qos, &retained, &mssageid, &receivedTopic,
&payload_in, &payloadlen_in, msgbuf, msglen);
printf(“message id: %dnr”,mssageid);
printf(“message qos: %dnr”,qos);
printf(“message receivedTopic: %snr”,receivedTopic.lenstring.data);
printf(“message arrived[%d]: %snr”, payloadlen_in, payload_in);
if(qos>0){//需要回复
len = MQTTSerialize_puback(buf, buflen, mssageid);//构造ack报文
rc = send(Socket,buf,len); //发送消息
if(len!=rc)printf(“Send PUBACK Error: rc=%d len=%dnr”,rc,len);
}
}
//运行函数 需循环调用
uint8 do_mqtt(SOCKET Socket,uint8 *sip,uint16 sport,uint16 lport){
static uint8 CONNECT_FLAG = 0;
uint16 rlen;
switch(getSn_SR(Socket)){
case SOCK_INIT:
connect(Socket,sip,sport);
break;
case SOCK_ESTABLISHED:
if(getSn_IR(Socket)&Sn_IR_CON)setSn_IR(Socket,Sn_IR_CON);
if(!CONNECT_FLAG)if(!connectMqtt(Socket))CONNECT_FLAG=1;
rlen=getSn_RX_RSR(Socket);
if(rlen>0){
recv(Socket,buf,rlen);
switch(mqtt_decode_msg(buf)){
case PUBLISH:
dealPublish(Socket,buf,rlen);
break;
default:
printf(“recv:%s”,buf);
break;
}
}
break;
case SOCK_CLOSE_WAIT:
disconnect(Socket);
close(Socket);
break;
case SOCK_CLOSED:
CONNECT_FLAG=0;
socket(Socket,Sn_MR_TCP,lport,Sn_MR_ND);
break;
}
return CONNECT_FLAG;
}

到此就封装全封装好了,可以同时八个socket都订阅或者发布不同的主题,只需要改变一下传参即可,注意:阿里云平台的话 端口是1883 域名每个地区不同 具体官网查看

while(1){
cnt_flag=do_mqtt(MQTT_SOCK,NET_CONFIG.rip,1883,5500);
if(cnt_flag==0){sub_flag=0;sup_flag=0;time=0;}
if(cnt_flag&&sub_flag==0)sub_flag=!subscribMqtt(MQTT_SOCK,subTopic);
if(cnt_flag&&sub_flag&&sup_flag==0)sup_flag=!publishMqtt(MQTT_SOCK,pubTopic,”连接上的一条测试发布消息”);
if(cnt_flag)time++;
if(time==5000){
time=0;
pingMqtt(MQTT_SOCK);
}
delay_ms(10);
}

开始测试

在调试输出中可以看到设备连接并订阅和发布成功了,看看阿里云平台

成功上线并发布了一条消息和一个主题,接下来往订阅的主题推送消息

成功送达!!
严格来讲这不算是移植最多就是封装个接口,官方例程那个是在太乱了,所以这边给它整理了一下,也顺便理一理代码和通信流程,收获还是蛮多的的(╹ڡ╹ ),Dome后面整理完给出。

Dome:https://download.csdn.net/download/wicevi/12719521

documents
Code
demo code

COMMENTS

Please Login to comment
  Subscribe  
Notify of
POSTED BY
Reusable S/W