Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 86 additions & 3 deletions mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "herr.h"
#include "hendian.h"
#include "hsocket.h"
#include "hmath.h"

static unsigned short mqtt_next_mid() {
static unsigned short s_mid = 0;
Expand Down Expand Up @@ -64,10 +65,27 @@ static void mqtt_send_disconnect(hio_t* io) {
mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
}

// Skip MQTT v5 properties section
// Returns 1 on success, 0 on malformed data
static int mqtt_v5_skip_properties(unsigned char** pp, unsigned char* end) {
unsigned char* p = *pp;
int bytes = end - p;
if (bytes <= 0) return 0;
int prop_len = (int)varint_decode(p, &bytes);
if (bytes <= 0) return 0;
p += bytes; // skip varint bytes
if (p + prop_len > end) return 0;
p += prop_len; // skip properties data
*pp = p;
return 1;
}

/*
* MQTT_TYPE_CONNECT
* 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive + 2 + [client_id] +
* [2 + will_topic + 2 + will_payload] +
* 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive +
* [v5: properties] +
* 2 + [client_id] +
* [v5: varint + will_properties] + [2 + will_topic + 2 + will_payload] +
* [2 + username] + [2 + password]
*/
static int mqtt_client_login(mqtt_client_t* cli) {
Expand All @@ -81,6 +99,10 @@ static int mqtt_client_login(mqtt_client_t* cli) {

// protocol_name_len
len += cli->protocol_version == MQTT_PROTOCOL_V31 ? 6 : 4;
// MQTT v5: connect properties (empty, 1 byte for property length = 0)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
len += 1;
}
if (*cli->client_id) {
cid_len = strlen(cli->client_id);
} else {
Expand All @@ -102,6 +124,10 @@ static int mqtt_client_login(mqtt_client_t* cli) {
if (cli->will->retain) {
conn_flags |= MQTT_CONN_WILL_RETAIN;
}
// MQTT v5: will properties (empty, 1 byte for property length = 0)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
len += 1;
}
len += 2 + will_topic_len;
len += 2 + will_payload_len;
}
Expand Down Expand Up @@ -131,7 +157,6 @@ static int mqtt_client_login(mqtt_client_t* cli) {
unsigned char* p = buf;
int headlen = mqtt_head_pack(&head, p);
p += headlen;
// TODO: Not implement MQTT_PROTOCOL_V5
if (cli->protocol_version == MQTT_PROTOCOL_V31) {
PUSH16(p, 6);
PUSH_N(p, MQTT_PROTOCOL_NAME_v31, 6);
Expand All @@ -142,11 +167,19 @@ static int mqtt_client_login(mqtt_client_t* cli) {
PUSH8(p, cli->protocol_version);
PUSH8(p, conn_flags);
PUSH16(p, cli->keepalive);
// MQTT v5: connect properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
PUSH8(p, 0);
}
PUSH16(p, cid_len);
if (cid_len > 0) {
PUSH_N(p, cli->client_id, cid_len);
}
if (conn_flags & MQTT_CONN_HAS_WILL) {
// MQTT v5: will properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
PUSH8(p, 0);
}
PUSH16(p, will_topic_len);
PUSH_N(p, cli->will->topic, will_topic_len);
PUSH16(p, will_payload_len);
Expand Down Expand Up @@ -233,6 +266,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
hio_close(io);
return;
}
// MQTT v5: skip connack properties
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
if (!mqtt_v5_skip_properties(&p, end)) {
hloge("MQTT CONNACK v5 properties malformed!");
hio_close(io);
return;
}
}
cli->connected = 1;
if (cli->timer) {
htimer_del(cli->timer);
Expand Down Expand Up @@ -269,6 +310,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
}
POP16(p, cli->mid);
}
// MQTT v5: skip publish properties
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
if (!mqtt_v5_skip_properties(&p, end)) {
hloge("MQTT PUBLISH v5 properties malformed!");
hio_close(io);
return;
}
}
cli->message.payload_len = end - p;
if (cli->message.payload_len > 0) {
// NOTE: Not deep copy
Expand Down Expand Up @@ -312,6 +361,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
return;
}
POP16(p, cli->mid);
// MQTT v5: skip suback properties
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
if (!mqtt_v5_skip_properties(&p, end)) {
hloge("MQTT SUBACK v5 properties malformed!");
hio_close(io);
return;
}
}
}
break;
// case MQTT_TYPE_UNSUBSCRIBE:
Expand All @@ -324,6 +381,14 @@ static void on_packet(hio_t* io, void* buf, int len) {
return;
}
POP16(p, cli->mid);
// MQTT v5: skip unsuback properties
if (cli->protocol_version == MQTT_PROTOCOL_V5 && p < end) {
if (!mqtt_v5_skip_properties(&p, end)) {
hloge("MQTT UNSUBACK v5 properties malformed!");
hio_close(io);
return;
}
}
}
break;
case MQTT_TYPE_PINGREQ:
Expand Down Expand Up @@ -541,6 +606,8 @@ int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
int len = 2 + topic_len + payload_len;
if (msg->qos > 0) len += 2; // mid
// MQTT v5: publish properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) len += 1;
unsigned short mid = 0;

mqtt_head_t head;
Expand All @@ -563,6 +630,10 @@ int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
mid = mqtt_next_mid();
PUSH16(p, mid);
}
// MQTT v5: publish properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
PUSH8(p, 0);
}

hmutex_lock(&cli->mutex_);
// send head + topic + mid
Expand All @@ -585,6 +656,8 @@ int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
if (!cli->connected) return -2;
int topic_len = strlen(topic);
int len = 2 + 2 + topic_len + 1;
// MQTT v5: subscribe properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) len += 1;

mqtt_head_t head;
memset(&head, 0, sizeof(head));
Expand All @@ -599,6 +672,10 @@ int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
p += headlen;
unsigned short mid = mqtt_next_mid();
PUSH16(p, mid);
// MQTT v5: subscribe properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
PUSH8(p, 0);
}
PUSH16(p, topic_len);
PUSH_N(p, topic, topic_len);
PUSH8(p, qos & 3);
Expand All @@ -613,6 +690,8 @@ int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
if (!cli->connected) return -2;
int topic_len = strlen(topic);
int len = 2 + 2 + topic_len;
// MQTT v5: unsubscribe properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) len += 1;

mqtt_head_t head;
memset(&head, 0, sizeof(head));
Expand All @@ -627,6 +706,10 @@ int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
p += headlen;
unsigned short mid = mqtt_next_mid();
PUSH16(p, mid);
// MQTT v5: unsubscribe properties (empty)
if (cli->protocol_version == MQTT_PROTOCOL_V5) {
PUSH8(p, 0);
}
PUSH16(p, topic_len);
PUSH_N(p, topic, topic_len);
// send head + mid + topic
Expand Down
4 changes: 4 additions & 0 deletions mqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ class MqttClient {
mqtt_client_set_id(client, id);
}

void setProtocolVersion(unsigned char version) {
client->protocol_version = version;
}

void setWill(mqtt_message_t* will) {
mqtt_client_set_will(client, will);
}
Expand Down
3 changes: 2 additions & 1 deletion mqtt/mqtt_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#define MQTT_PROTOCOL_V31 3
#define MQTT_PROTOCOL_V311 4
#define MQTT_PROTOCOL_V5 5 // Not yet supproted
#define MQTT_PROTOCOL_V5 5

#define MQTT_PROTOCOL_NAME "MQTT"
#define MQTT_PROTOCOL_NAME_v31 "MQIsdp"
Expand Down Expand Up @@ -38,6 +38,7 @@ typedef enum {
MQTT_TYPE_PINGREQ = 12,
MQTT_TYPE_PINGRESP = 13,
MQTT_TYPE_DISCONNECT = 14,
MQTT_TYPE_AUTH = 15, // MQTT_PROTOCOL_V5
} mqtt_type_e;

typedef enum {
Expand Down