diff --git a/ChangeLog.md b/ChangeLog.md index 349aa738..fb59a615 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -29,6 +29,26 @@ because [MQTT-3.1.3.5] defines Password as Binary Data, not a UTF-8 string. A binary password containing bytes that are not valid UTF-8 (e.g., `0xC0`, `0xFF`) would otherwise be incorrectly rejected. + - `MqttClient_Publish` / `MqttClient_Publish_ex` now return the new + `MQTT_CODE_ERROR_PUBLISH_REJECTED` (-21) when a v5 broker rejects a + QoS>0 PUBLISH via a PUBACK (QoS 1), PUBREC, or PUBCOMP (QoS 2) reason + code >= 0x80 (e.g. Not authorized, Quota exceeded, Topic Name invalid, + Payload format invalid). Previously these were reported as + `MQTT_CODE_SUCCESS`, so the application proceeded as if the broker had + accepted the message. The specific reason is available in + `MqttPublish.resp.reason_code`. For QoS 2, a PUBREC reason code >= 0x80 + now ends the exchange without sending PUBREL per [MQTT-4.3.3] instead of + timing out. v3.1.1 publishes are unaffected, as is the return value of + the fire-and-forget `MqttClient_Publish_WriteOnly` call itself. Callers + that treat any non-success return as fatal may need to handle this code. + In `WOLFMQTT_MULTITHREAD` builds where a dedicated thread drives reads, + that reading thread now returns `MQTT_CODE_ERROR_PUBLISH_REJECTED` when it + processes a QoS 2 PUBREC rejection (instead of advancing the handshake + with an illegal PUBREL); the originating write-only publish's pending + response is not auto-completed in that case, so it blocks until + `cmd_timeout_ms`. A QoS 1 PUBACK or QoS 2 PUBCOMP rejection is NOT + detected on the write-only path (the publish appears successful), matching + prior behavior; use `MqttClient_Publish`/`_ex` for reliable detection. ### v2.0.0 (03/20/2026) Release 2.0.0 has been developed according to wolfSSL's development and QA diff --git a/src/mqtt_broker.c b/src/mqtt_broker.c index 525e1ca5..65855463 100644 --- a/src/mqtt_broker.c +++ b/src/mqtt_broker.c @@ -5421,7 +5421,36 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len, * PUBLISH is fully received and decoded before we * reach the fan-out); BrokerOutPub_Alloc deep-copies * pub.total_len from that buffer. */ - { + if (sub->client->out_q_count >= + BROKER_MAX_QUEUED_MSGS_PER_SUB) { + /* DoS guard: bound the connected subscriber's outbound + * queue depth. The inflight cap above only limits bytes on + * the wire; a subscriber that stops acking lets QUEUED + * entries accumulate one heap-copied PUBLISH at a time + * until the broker exhausts memory. Disconnect the slow / + * abusive subscriber rather than growing out_q or silently + * dropping accepted QoS 1/2 messages. A persistent session + * is reclaimable on reconnect via the (capped) offline + * queue. Mirrors the static partial-write teardown: tear + * the socket down and clear connected so the match guard + * above skips this client's remaining subscriptions; the + * main loop reaps it on the next read error. */ + WBLOG_ERR(broker, + "broker: out_q full (%d) -> disconnect sock=%d " + "(from sock=%d)", sub->client->out_q_count, + (int)sub->client->sock, (int)bc->sock); + #ifdef WOLFMQTT_V5 + (void)BrokerSend_Disconnect(sub->client, + MQTT_REASON_QUOTA_EXCEEDED); + #endif + if (sub->client->sock != BROKER_SOCKET_INVALID) { + broker->net.close(broker->net.ctx, + sub->client->sock); + sub->client->sock = BROKER_SOCKET_INVALID; + } + sub->client->connected = 0; + } + else { BrokerOutPub* e = BrokerOutPub_Alloc(topic, payload, pub.total_len); if (e == NULL) { diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 7c0ce4dd..9246816f 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1030,6 +1030,28 @@ static int MqttClient_HandlePacket(MqttClient* client, break; } + #ifdef WOLFMQTT_V5 + /* A v5 broker rejects a QoS 2 PUBLISH at the PUBREC stage with a + * reason code >= 0x80 (e.g. not authorized, quota exceeded, topic + * name invalid, payload format invalid). Per [MQTT-4.3.3] the + * exchange is then complete and the sender MUST NOT send a PUBREL. + * Surface the rejection instead of advancing the handshake, which + * would emit an illegal PUBREL and then block waiting for a PUBCOMP + * the broker will never send. The QoS 1 PUBACK and the QoS 2 + * PUBCOMP reason codes are checked by the caller after the wait. + * Note (WOLFMQTT_MULTITHREAD): when a separate thread drives reads + * and processes this PUBREC, it receives this error directly and + * the publishing thread's PUBCOMP pending response is not marked + * done, so that publish blocks until cmd_timeout_ms. This matches + * the pre-existing behavior (which left the publisher waiting on a + * PUBCOMP after an illegal PUBREL) and is not made worse here. */ + if (packet_type == MQTT_PACKET_TYPE_PUBLISH_REC && + client->protocol_level >= MQTT_CONNECT_PROTOCOL_LEVEL_5 && + (((MqttPublishResp*)packet_obj)->reason_code & 0x80)) { + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PUBLISH_REJECTED); + } + #endif + /* Populate information needed for ack */ resp->packet_type = packet_type+1; /* next ack */ resp->packet_id = packet_id; @@ -2324,6 +2346,26 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, /* Wait for publish response packet */ rc = MqttClient_WaitType(client, &publish->resp, resp_type, publish->packet_id, client->cmd_timeout_ms); + + #ifdef WOLFMQTT_V5 + /* A v5 broker can acknowledge a QoS>0 PUBLISH at the + * protocol layer yet still reject the message via a + * PUBACK/PUBCOMP reason code >= 0x80 (e.g. not authorized, + * quota exceeded, topic name invalid, payload format + * invalid). Surface that as an error so the caller does not + * treat a rejected message as delivered. Mirrors the + * CONNECT/SUBSCRIBE/UNSUBSCRIBE rejection handling. The + * protocol_level guard avoids misreading a stale byte for + * v3.1.1 ACKs, which carry no reason code (same guard the + * PUBREC check in MqttClient_HandlePacket uses). */ + if (rc == MQTT_CODE_SUCCESS && + client->protocol_level >= + MQTT_CONNECT_PROTOCOL_LEVEL_5 && + (publish->resp.reason_code & 0x80)) { + rc = MQTT_TRACE_ERROR( + MQTT_CODE_ERROR_PUBLISH_REJECTED); + } + #endif } #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) @@ -3202,6 +3244,8 @@ const char* MqttClient_ReturnCodeToString(int return_code) return "Error (Broker rejected subscription)"; case MQTT_CODE_ERROR_UNSUBSCRIBE_REJECTED: return "Error (Broker rejected unsubscribe)"; + case MQTT_CODE_ERROR_PUBLISH_REJECTED: + return "Error (Broker rejected publish)"; #if defined(ENABLE_MQTT_CURL) case MQTT_CODE_ERROR_CURL: return "Error (libcurl)"; diff --git a/tests/test_broker_connect.c b/tests/test_broker_connect.c index 5f33468a..b0b9d2f6 100644 --- a/tests/test_broker_connect.c +++ b/tests/test_broker_connect.c @@ -782,6 +782,68 @@ static int count_packets_of_type(const byte* buf, size_t len, byte target) return count; } +/* find_broker_client / the out_q-cap tests below inspect dynamic-mode-only + * BrokerClient state (the linked client list, out_q_count); guard the whole + * group so static-memory broker builds (where MqttBroker.clients is an array + * and BrokerClient has no next/out_q_count) still compile. */ +#ifndef WOLFMQTT_STATIC_MEMORY +/* Find the broker-side BrokerClient with the given client_id, walking the + * dynamic-mode client list. Returns NULL if absent. Used to inspect internal + * per-subscriber queue state (out_q_count) that has no wire-visible signal. */ +static BrokerClient* find_broker_client(MqttBroker* broker, const char* id) +{ + BrokerClient* c = broker->clients; + while (c != NULL) { + if (c->client_id != NULL && XSTRCMP(c->client_id, id) == 0) { + return c; + } + c = c->next; + } + return NULL; +} + +#ifdef WOLFMQTT_V5 +/* Return the Reason Code byte of the first DISCONNECT packet in a captured + * stream, or -1 if none carries a reason code. A v5 DISCONNECT with a + * non-success reason and no properties encodes as 0xE0 ; + * the reason is the first byte of the variable header. Only referenced by the + * v5 cap test, so it lives under WOLFMQTT_V5 to avoid an unused-function + * warning (the build runs with -Werror -Wunused). */ +static int first_disconnect_reason(const byte* buf, size_t len) +{ + size_t pos = 0; + while (pos < len) { + byte type = (byte)((buf[pos] >> 4) & 0x0F); + size_t remain = 0; + size_t mult = 1; + size_t hdr_len = 1; + int vbi_complete = 0; + while (pos + hdr_len < len && hdr_len <= 5) { + byte b = buf[pos + hdr_len]; + remain += (size_t)(b & 0x7F) * mult; + hdr_len++; + if ((b & 0x80) == 0) { vbi_complete = 1; break; } + mult *= 128; + } + if (!vbi_complete) { + break; + } + if (type == MQTT_PACKET_TYPE_DISCONNECT) { + if (remain >= 1 && pos + hdr_len < len) { + return buf[pos + hdr_len]; + } + return -1; + } + if (remain > len - pos - hdr_len) { + break; + } + pos += hdr_len + remain; + } + return -1; +} +#endif /* WOLFMQTT_V5 */ +#endif /* !WOLFMQTT_STATIC_MEMORY */ + /* [MQTT-4.3.3] / Method B: when the broker receives a duplicate QoS 2 * PUBLISH carrying a packet ID that's still awaiting PUBREL, it MUST send * another PUBREC to the publisher but MUST NOT re-deliver the application @@ -1029,6 +1091,26 @@ static size_t build_qos2_pub(byte* out, word16 packet_id) return 8; } +/* Build a v3.1.1 QoS 1 PUBLISH to topic "x" (payload "p") with the given + * packet_id. Used to flood a slow subscriber's outbound queue. The v3.1.1 + * wire form has no property field, so the encoding is identical to the QoS 2 + * helper except for the QoS bits in the fixed header. Returns length (8). + * Only used by the dynamic-mode out_q-cap tests, so guarded to avoid an + * unused-function warning under -Werror in static-memory builds. */ +#ifndef WOLFMQTT_STATIC_MEMORY +static size_t build_qos1_pub(byte* out, word16 packet_id) +{ + /* remain = topic_len(2) + topic(1) + packet_id(2) + payload(1) = 6 */ + out[0] = 0x32; /* PUBLISH, QoS 1 */ + out[1] = 0x06; + out[2] = 0x00; out[3] = 0x01; out[4] = 'x'; + out[5] = (byte)(packet_id >> 8); + out[6] = (byte)(packet_id & 0xFF); + out[7] = 'p'; + return 8; +} +#endif /* !WOLFMQTT_STATIC_MEMORY */ + /* The per-client cap on in-flight QoS 2 packet IDs (BROKER_MAX_INBOUND_QOS2, * default 16) MUST disconnect a client that exceeds it. Without the cap, a * misbehaving client could exhaust broker memory by sending many distinct @@ -1350,6 +1432,257 @@ TEST(qos2_publish_then_abrupt_close_offline_subscriber) MqttBroker_Free(&broker); } +/* The out_q-cap tests are dynamic-memory-only (they inspect out_q_count and + * the linked client list); the v5 variant additionally uses the v5-only + * MQTT_REASON_QUOTA_EXCEEDED. Guard accordingly so the non-V5 and + * static-memory broker CI matrix entries compile. */ +#ifndef WOLFMQTT_STATIC_MEMORY +#ifdef WOLFMQTT_V5 +/* Issue 6222: a connected QoS>=1 subscriber that stops acking must NOT be able + * to grow its outbound queue without bound. The inflight cap only limits + * PUBLISHes on the wire; entries beyond it sit QUEUED and, pre-fix, were + * appended to out_q with no depth limit - one heap-copied PUBLISH (topic + + * attacker-sized payload) per matching message, until the broker OOMs. + * + * Repro: subscriber subscribes to "x" at QoS 1 and then never PUBACKs; a + * publisher floods more than BROKER_MAX_QUEUED_MSGS_PER_SUB matching QoS 1 + * PUBLISHes. The fix bounds out_q_count at the cap and disconnects the slow + * subscriber (v5: DISCONNECT reason 0x97 Quota Exceeded). We assert the queue + * stopped growing at the cap, the subscriber's socket was torn down, and at + * most the inflight window ever reached the wire. */ +TEST(online_qos1_flood_disconnects_slow_v5_subscriber) +{ + MqttBroker broker; + MqttBrokerNet net; + int i; + int flood = BROKER_MAX_QUEUED_MSGS_PER_SUB + 8; + int sub_pubs; + int sub_disconnects; + BrokerClient* sub_bc; + + /* Subscriber CONNECT: v5, ClientId "S", clean_session=1. */ + static const byte connect_sub[] = { + 0x10, 0x0E, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x05, 0x02, 0x00, 0x3C, + 0x00, /* properties length = 0 */ + 0x00, 0x01, 'S' + }; + /* SUBSCRIBE (v5): packet_id=1, props_len=0, filter "x", QoS 1. */ + static const byte subscribe_x[] = { + 0x82, 0x07, + 0x00, 0x01, + 0x00, /* properties length = 0 */ + 0x00, 0x01, 'x', + 0x01 + }; + /* Publisher CONNECT: v3.1.1, ClientId "P", clean_session=1. */ + static const byte connect_pub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x02, 0x00, 0x3C, + 0x00, 0x01, 'P' + }; + byte pub_buf[8]; + + install_mock_net(&net); + XMEMSET(&broker, 0, sizeof(broker)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); + + reset_mock_clients(2); + + /* Phase 1: subscriber connects and subscribes (and stays connected). + * Establish the subscription before the flood so every published message + * matches and is fanned out to this subscriber. */ + mock_client_input_append(0, connect_sub, sizeof(connect_sub)); + mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); + for (i = 0; i < 16; i++) { + MqttBroker_Step(&broker); + } + sub_bc = find_broker_client(&broker, "S"); + ASSERT_TRUE(sub_bc != NULL); + ASSERT_TRUE(sub_bc->connected); + + /* Phase 2: publisher connects and floods QoS 1 PUBLISHes to "x". The + * subscriber never PUBACKs, so the inflight window saturates and every + * further message is queued (pre-fix: forever). */ + mock_client_input_append(1, connect_pub, sizeof(connect_pub)); + for (i = 0; i < flood; i++) { + size_t n = build_qos1_pub(pub_buf, (word16)(i + 1)); + mock_client_input_append(1, pub_buf, n); + } + for (i = 0; i < flood + 32; i++) { + MqttBroker_Step(&broker); + } + + /* The subscriber's queue must have stopped growing at the cap rather than + * tracking the flood size, and the broker must have torn the socket down. */ + sub_bc = find_broker_client(&broker, "S"); + ASSERT_TRUE(sub_bc != NULL); + ASSERT_EQ(BROKER_MAX_QUEUED_MSGS_PER_SUB, sub_bc->out_q_count); + ASSERT_FALSE(sub_bc->connected); + ASSERT_TRUE(g_clients[0].closed); + + /* Only the inflight window ever reached the wire - far fewer than the + * flood, proving the cap bounds wire traffic too. */ + sub_pubs = count_packets_of_type(g_clients[0].out_buf, + g_clients[0].out_len, MQTT_PACKET_TYPE_PUBLISH); + ASSERT_TRUE(sub_pubs >= 1); + ASSERT_TRUE(sub_pubs <= BROKER_MAX_INFLIGHT_PER_SUB); + + /* v5 subscriber gets a DISCONNECT with reason 0x97 Quota Exceeded. */ + sub_disconnects = count_packets_of_type(g_clients[0].out_buf, + g_clients[0].out_len, MQTT_PACKET_TYPE_DISCONNECT); + ASSERT_EQ(1, sub_disconnects); + ASSERT_EQ(MQTT_REASON_QUOTA_EXCEEDED, + first_disconnect_reason(g_clients[0].out_buf, g_clients[0].out_len)); + + /* The publisher is a separate, well-behaved client and must be untouched. */ + ASSERT_FALSE(g_clients[1].closed); + + MqttBroker_Stop(&broker); + MqttBroker_Free(&broker); +} + +#endif /* WOLFMQTT_V5 */ + +/* Same overflow scenario with a v3.1.1 subscriber. v3.1.1 has no + * DISCONNECT-with-reason path, so the broker simply closes the socket; the + * queue must still be bounded at the cap. */ +TEST(online_qos1_flood_disconnects_slow_v311_subscriber) +{ + MqttBroker broker; + MqttBrokerNet net; + int i; + int flood = BROKER_MAX_QUEUED_MSGS_PER_SUB + 8; + int sub_disconnects; + BrokerClient* sub_bc; + + /* Subscriber CONNECT: v3.1.1, ClientId "S", clean_session=1. */ + static const byte connect_sub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x02, 0x00, 0x3C, + 0x00, 0x01, 'S' + }; + /* SUBSCRIBE (v3.1.1): packet_id=1, filter "x", QoS 1. */ + static const byte subscribe_x[] = { + 0x82, 0x06, + 0x00, 0x01, + 0x00, 0x01, 'x', + 0x01 + }; + static const byte connect_pub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x02, 0x00, 0x3C, + 0x00, 0x01, 'P' + }; + byte pub_buf[8]; + + install_mock_net(&net); + XMEMSET(&broker, 0, sizeof(broker)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); + + reset_mock_clients(2); + mock_client_input_append(0, connect_sub, sizeof(connect_sub)); + mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); + for (i = 0; i < 16; i++) { + MqttBroker_Step(&broker); + } + sub_bc = find_broker_client(&broker, "S"); + ASSERT_TRUE(sub_bc != NULL); + ASSERT_TRUE(sub_bc->connected); + + mock_client_input_append(1, connect_pub, sizeof(connect_pub)); + for (i = 0; i < flood; i++) { + size_t n = build_qos1_pub(pub_buf, (word16)(i + 1)); + mock_client_input_append(1, pub_buf, n); + } + for (i = 0; i < flood + 32; i++) { + MqttBroker_Step(&broker); + } + + sub_bc = find_broker_client(&broker, "S"); + ASSERT_TRUE(sub_bc != NULL); + ASSERT_EQ(BROKER_MAX_QUEUED_MSGS_PER_SUB, sub_bc->out_q_count); + ASSERT_FALSE(sub_bc->connected); + ASSERT_TRUE(g_clients[0].closed); + + /* v3.1.1 has no DISCONNECT reason code: the broker closes silently. */ + sub_disconnects = count_packets_of_type(g_clients[0].out_buf, + g_clients[0].out_len, MQTT_PACKET_TYPE_DISCONNECT); + ASSERT_EQ(0, sub_disconnects); + + MqttBroker_Stop(&broker); + MqttBroker_Free(&broker); +} + +/* Boundary check: a slow subscriber that reaches but does not exceed the cap + * stays connected. Publishing exactly BROKER_MAX_QUEUED_MSGS_PER_SUB messages + * fills out_q to the cap; only the (cap+1)th would trip the disconnect. Guards + * the >= comparison against an off-by-one that would evict at the cap. */ +TEST(online_qos1_at_cap_keeps_subscriber) +{ + MqttBroker broker; + MqttBrokerNet net; + int i; + int at_cap = BROKER_MAX_QUEUED_MSGS_PER_SUB; + BrokerClient* sub_bc; + + static const byte connect_sub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x02, 0x00, 0x3C, + 0x00, 0x01, 'S' + }; + static const byte subscribe_x[] = { + 0x82, 0x06, 0x00, 0x01, 0x00, 0x01, 'x', 0x01 + }; + static const byte connect_pub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x02, 0x00, 0x3C, + 0x00, 0x01, 'P' + }; + byte pub_buf[8]; + + install_mock_net(&net); + XMEMSET(&broker, 0, sizeof(broker)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); + + reset_mock_clients(2); + mock_client_input_append(0, connect_sub, sizeof(connect_sub)); + mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); + for (i = 0; i < 16; i++) { + MqttBroker_Step(&broker); + } + sub_bc = find_broker_client(&broker, "S"); + ASSERT_TRUE(sub_bc != NULL); + + mock_client_input_append(1, connect_pub, sizeof(connect_pub)); + for (i = 0; i < at_cap; i++) { + size_t n = build_qos1_pub(pub_buf, (word16)(i + 1)); + mock_client_input_append(1, pub_buf, n); + } + for (i = 0; i < at_cap + 32; i++) { + MqttBroker_Step(&broker); + } + + sub_bc = find_broker_client(&broker, "S"); + ASSERT_TRUE(sub_bc != NULL); + ASSERT_EQ(BROKER_MAX_QUEUED_MSGS_PER_SUB, sub_bc->out_q_count); + ASSERT_TRUE(sub_bc->connected); + ASSERT_FALSE(g_clients[0].closed); + + MqttBroker_Stop(&broker); + MqttBroker_Free(&broker); +} +#endif /* !WOLFMQTT_STATIC_MEMORY */ + #ifdef WOLFMQTT_V5 /* v5 variant: same orphan-then-publish-then-disconnect pattern but the * PUBLISH carries a property block. Tests the suspected use-after-free in @@ -2756,6 +3089,13 @@ int main(int argc, char** argv) RUN_TEST(qos2_pubrel_unknown_id_still_pubcomps); RUN_TEST(qos2_publish_with_offline_durable_subscriber); RUN_TEST(qos2_publish_then_abrupt_close_offline_subscriber); +#ifndef WOLFMQTT_STATIC_MEMORY +#ifdef WOLFMQTT_V5 + RUN_TEST(online_qos1_flood_disconnects_slow_v5_subscriber); +#endif /* WOLFMQTT_V5 */ + RUN_TEST(online_qos1_flood_disconnects_slow_v311_subscriber); + RUN_TEST(online_qos1_at_cap_keeps_subscriber); +#endif /* !WOLFMQTT_STATIC_MEMORY */ #ifdef WOLFMQTT_V5 RUN_TEST(qos2_publish_v5_props_with_offline_durable_subscriber); #endif diff --git a/tests/test_mqtt_client.c b/tests/test_mqtt_client.c index 58d73933..447b46f6 100644 --- a/tests/test_mqtt_client.c +++ b/tests/test_mqtt_client.c @@ -298,6 +298,10 @@ TEST(connect_with_mock_network) static int connect_mock_xfer; static byte connect_mock_sent[TEST_TX_BUF_SIZE]; +/* Set when a PUBREL (fixed header type 6) is written, so tests can assert the + * QoS 2 handshake either did or did not emit one. */ +static int g_pubrel_written; + static int mock_net_write_accept(void *context, const byte* buf, int buf_len, int timeout_ms) { @@ -307,6 +311,10 @@ static int mock_net_write_accept(void *context, const byte* buf, int buf_len, XMEMCPY(connect_mock_sent, buf, (size_t)buf_len); connect_mock_xfer = buf_len; } + if (buf != NULL && buf_len > 0 && + (buf[0] & 0xF0) == (MQTT_PACKET_TYPE_PUBLISH_REL << 4)) { + g_pubrel_written = 1; + } /* Pretend the full packet was sent so MqttClient_Connect reaches the * CLIENT_FORCE_ZERO step. */ return buf_len; @@ -634,6 +642,313 @@ TEST(publish_null_publish) ASSERT_EQ(MQTT_CODE_ERROR_BAD_ARG, rc); } +#ifdef WOLFMQTT_V5 +/* Drives one QoS>0 publish to completion against the canned-response mock and + * returns the final MqttClient_Publish result. The caller stages the broker's + * PUBACK/PUBCOMP (plus any intermediate PUBREC for QoS 2) in g_canned_buf. */ +static int run_publish_with_canned_resp(MqttPublish* publish, + const byte* resp, int resp_len, byte proto_level) +{ + int rc; + int i; + + rc = test_init_client(); + if (rc != MQTT_CODE_SUCCESS) { + return rc; + } + test_client.protocol_level = proto_level; + + g_pubrel_written = 0; + test_net.write = mock_net_write_accept; + test_net.read = mock_net_read_canned; + XMEMCPY(g_canned_buf, resp, (size_t)resp_len); + g_canned_len = resp_len; + g_canned_pos = 0; + + rc = MQTT_CODE_CONTINUE; + for (i = 0; i < 20 && rc == MQTT_CODE_CONTINUE; i++) { + rc = MqttClient_Publish(&test_client, publish); + } + return rc; +} + +/* A v5 broker can ACK a QoS 1 PUBLISH at the protocol layer yet still reject + * the message with a PUBACK reason code >= 0x80 (ACL deny, quota, invalid + * topic/payload). MqttClient_Publish must surface that as + * MQTT_CODE_ERROR_PUBLISH_REJECTED rather than MQTT_CODE_SUCCESS, else the + * application proceeds as if the message was delivered. This pins the + * detection that the publish path previously lacked (known issue 3626). */ +TEST(publish_qos1_v5_broker_rejection_returns_publish_rejected) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* v5 PUBACK: type=0x40, remain=3, packet_id=7, reason=0x87 NOT_AUTHORIZED */ + static const byte puback[] = { 0x40, 0x03, 0x00, 0x07, 0x87 }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_1; + publish.packet_id = 7; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = run_publish_with_canned_resp(&publish, puback, (int)sizeof(puback), + MQTT_CONNECT_PROTOCOL_LEVEL_5); + + ASSERT_EQ(MQTT_CODE_ERROR_PUBLISH_REJECTED, rc); + ASSERT_EQ(MQTT_REASON_NOT_AUTHORIZED, publish.resp.reason_code); +} + +/* A v5 PUBACK with reason code Success (0x00) means the broker accepted the + * message; MqttClient_Publish must still return success and not false-trip the + * new rejection check. */ +TEST(publish_qos1_v5_success_returns_success) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* v5 PUBACK: type=0x40, remain=3, packet_id=8, reason=0x00 Success */ + static const byte puback[] = { 0x40, 0x03, 0x00, 0x08, 0x00 }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_1; + publish.packet_id = 8; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = run_publish_with_canned_resp(&publish, puback, (int)sizeof(puback), + MQTT_CONNECT_PROTOCOL_LEVEL_5); + + ASSERT_EQ(MQTT_CODE_SUCCESS, rc); + ASSERT_EQ(MQTT_REASON_SUCCESS, publish.resp.reason_code); +} + +/* Not every non-zero v5 reason code is a rejection: the high bit distinguishes + * error (>= 0x80) from success-class codes. A broker legitimately returns + * 0x10 No matching subscribers for a QoS 1 PUBLISH that matched no + * subscriptions, and the message WAS accepted. The check uses + * (reason_code & 0x80) precisely so 0x10 stays a success; this pins that + * boundary against a regression to e.g. (reason_code != 0). */ +TEST(publish_qos1_v5_no_matching_subscribers_returns_success) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* v5 PUBACK: type=0x40, remain=3, packet_id=12, reason=0x10 No match sub */ + static const byte puback[] = { 0x40, 0x03, 0x00, 0x0C, 0x10 }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_1; + publish.packet_id = 12; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = run_publish_with_canned_resp(&publish, puback, (int)sizeof(puback), + MQTT_CONNECT_PROTOCOL_LEVEL_5); + + ASSERT_EQ(MQTT_CODE_SUCCESS, rc); + ASSERT_EQ(MQTT_REASON_NO_MATCH_SUB, publish.resp.reason_code); +} + +/* QoS 2 completes the PUBLISH -> PUBREC -> PUBREL -> PUBCOMP handshake. A v5 + * broker can reject at the PUBCOMP with a reason code >= 0x80 (e.g. 0x92 + * Packet Identifier not found); MqttClient_Publish must surface that as + * MQTT_CODE_ERROR_PUBLISH_REJECTED. The mock serves a success PUBREC followed + * by the failing PUBCOMP, and accepts the client's PUBREL. */ +TEST(publish_qos2_v5_broker_rejection_returns_publish_rejected) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* PUBREC (success, no reason byte): type=0x50, remain=2, packet_id=9. + * PUBCOMP (reject): type=0x70, remain=3, packet_id=9, reason=0x92. */ + static const byte resp[] = { + 0x50, 0x02, 0x00, 0x09, + 0x70, 0x03, 0x00, 0x09, 0x92 + }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_2; + publish.packet_id = 9; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = run_publish_with_canned_resp(&publish, resp, (int)sizeof(resp), + MQTT_CONNECT_PROTOCOL_LEVEL_5); + + ASSERT_EQ(MQTT_CODE_ERROR_PUBLISH_REJECTED, rc); + ASSERT_EQ(MQTT_REASON_PACKET_ID_NOT_FOUND, publish.resp.reason_code); + /* The PUBREC succeeded, so the handshake must have advanced to PUBREL + * before the PUBCOMP rejection arrived. */ + ASSERT_TRUE(g_pubrel_written); +} + +/* QoS 2 full happy path: success PUBREC -> PUBREL -> success PUBCOMP must + * return MQTT_CODE_SUCCESS. The post-wait rejection check now runs for every + * v5 QoS 2 publish, so this pins that a terminal success PUBCOMP is not + * false-tripped and that the handshake emits a PUBREL. */ +TEST(publish_qos2_v5_success_returns_success) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* PUBREC success (no reason byte): type=0x50, remain=2, packet_id=14. + * PUBCOMP success (no reason byte): type=0x70, remain=2, packet_id=14. */ + static const byte resp[] = { + 0x50, 0x02, 0x00, 0x0E, + 0x70, 0x02, 0x00, 0x0E + }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_2; + publish.packet_id = 14; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = run_publish_with_canned_resp(&publish, resp, (int)sizeof(resp), + MQTT_CONNECT_PROTOCOL_LEVEL_5); + + ASSERT_EQ(MQTT_CODE_SUCCESS, rc); + ASSERT_EQ(MQTT_REASON_SUCCESS, publish.resp.reason_code); + ASSERT_TRUE(g_pubrel_written); +} + +/* The primary QoS 2 rejection point is the PUBREC: a v5 broker reports + * authorization/quota/topic/payload failures there with a reason code >= 0x80. + * Per [MQTT-4.3.3] the sender must not send PUBREL and the exchange is + * complete, so MqttClient_Publish must return MQTT_CODE_ERROR_PUBLISH_REJECTED + * directly from the PUBREC rather than emitting an illegal PUBREL and blocking + * for a PUBCOMP that never arrives. The mock serves only the failing PUBREC. */ +TEST(publish_qos2_v5_pubrec_rejection_returns_publish_rejected) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* v5 PUBREC: type=0x50, remain=3, packet_id=11, reason=0x87 NOT_AUTHORIZED */ + static const byte pubrec[] = { 0x50, 0x03, 0x00, 0x0B, 0x87 }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_2; + publish.packet_id = 11; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = run_publish_with_canned_resp(&publish, pubrec, (int)sizeof(pubrec), + MQTT_CONNECT_PROTOCOL_LEVEL_5); + + ASSERT_EQ(MQTT_CODE_ERROR_PUBLISH_REJECTED, rc); + ASSERT_EQ(MQTT_REASON_NOT_AUTHORIZED, publish.resp.reason_code); + /* Per [MQTT-4.3.3] a PUBREC reason code >= 0x80 ends the exchange: the + * client must NOT emit a PUBREL. Directly pin that no PUBREL was written. */ + ASSERT_FALSE(g_pubrel_written); +} + +/* A v3.1.1 PUBACK carries no reason code, so the rejection check must not run + * for protocol level < 5. Pre-seed resp.reason_code with a failure byte to + * prove the protocol_level guard prevents a stale value from being misread as + * a broker rejection. */ +TEST(publish_v311_ack_not_misread_as_rejected) +{ + int rc; + MqttPublish publish; + static byte payload[] = "hello"; + /* v3.1.1 PUBACK: type=0x40, remain=2, packet_id=10 (no reason code). */ + static const byte puback[] = { 0x40, 0x02, 0x00, 0x0A }; + + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_1; + publish.packet_id = 10; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + /* Stale failure byte that must be ignored for a v3.1.1 ACK. */ + publish.resp.reason_code = MQTT_REASON_NOT_AUTHORIZED; + + rc = run_publish_with_canned_resp(&publish, puback, (int)sizeof(puback), + MQTT_CONNECT_PROTOCOL_LEVEL_4); + + ASSERT_EQ(MQTT_CODE_SUCCESS, rc); +} + +#if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK) +/* Pins the documented WOLFMQTT_MULTITHREAD divergence for a QoS 2 PUBREC + * rejection. A write-only publish registers a pending response for the PUBCOMP + * and returns without reading; a separate "reading thread" (simulated here by + * MqttClient_WaitMessage) then processes the rejecting PUBREC. Expected, per + * the code comment in MqttClient_HandlePacket and the ChangeLog: + * - the reading thread receives MQTT_CODE_ERROR_PUBLISH_REJECTED directly; + * - no PUBREL is emitted ([MQTT-4.3.3]); + * - the PUBREC reason code is decoded into the shared client->msg object, not + * the publisher's MqttPublish, because RespList_Find matches PUBREC against + * the PUBCOMP-keyed pendResp and finds nothing — so the publisher's + * publish.resp is left untouched and it must rely on the reader to observe + * the rejection. This test locks that behavior so a future change can't + * silently alter it. */ +TEST(publish_qos2_v5_pubrec_rejection_multithread_reader) +{ + int rc; + int i; + /* static so the registered pendResp does not point into freed stack after + * the call returns (the client is zeroed by setup() before the next test; + * MqttClient_DeInit does not walk the pending-response list). */ + static MqttPublish publish; + static byte payload[] = "hello"; + /* v5 PUBREC: type=0x50, remain=3, packet_id=13, reason=0x87 NOT_AUTHORIZED */ + static const byte pubrec[] = { 0x50, 0x03, 0x00, 0x0D, 0x87 }; + + rc = test_init_client(); + ASSERT_EQ(MQTT_CODE_SUCCESS, rc); + test_client.protocol_level = MQTT_CONNECT_PROTOCOL_LEVEL_5; + + g_pubrel_written = 0; + test_net.write = mock_net_write_accept; + test_net.read = mock_net_read_canned; + + /* Write-only QoS 2 publish: sends PUBLISH, registers the PUBCOMP pending + * response, returns CONTINUE without reading (response is another thread's + * job). */ + XMEMSET(&publish, 0, sizeof(publish)); + publish.qos = MQTT_QOS_2; + publish.packet_id = 13; + publish.topic_name = "test/topic"; + publish.buffer = payload; + publish.total_len = (word32)(sizeof(payload) - 1); + publish.buffer_len = publish.total_len; + + rc = MqttClient_Publish_WriteOnly(&test_client, &publish, NULL); + ASSERT_EQ(MQTT_CODE_CONTINUE, rc); + + /* Reading thread processes the rejecting PUBREC. */ + XMEMCPY(g_canned_buf, pubrec, sizeof(pubrec)); + g_canned_len = (int)sizeof(pubrec); + g_canned_pos = 0; + + rc = MQTT_CODE_CONTINUE; + for (i = 0; i < 20 && rc == MQTT_CODE_CONTINUE; i++) { + rc = MqttClient_WaitMessage(&test_client, TEST_CMD_TIMEOUT_MS); + } + + ASSERT_EQ(MQTT_CODE_ERROR_PUBLISH_REJECTED, rc); + ASSERT_FALSE(g_pubrel_written); + /* The publisher's own struct is NOT updated on this path. */ + ASSERT_EQ(MQTT_REASON_SUCCESS, publish.resp.reason_code); +} +#endif /* WOLFMQTT_MULTITHREAD && WOLFMQTT_NONBLOCK */ +#endif /* WOLFMQTT_V5 */ + /* Regression test for MQTT Packet Identifier in-use collision check. The * MQTT spec (3.1.1 section 2.3.1, 5.0 section 2.2.1) requires that a new QoS-related * Control Packet use a Packet Identifier that is not currently in use; @@ -906,6 +1221,18 @@ void run_mqtt_client_tests(void) /* MqttClient_Publish tests */ RUN_TEST(publish_null_client); RUN_TEST(publish_null_publish); +#ifdef WOLFMQTT_V5 + RUN_TEST(publish_qos1_v5_broker_rejection_returns_publish_rejected); + RUN_TEST(publish_qos1_v5_success_returns_success); + RUN_TEST(publish_qos1_v5_no_matching_subscribers_returns_success); + RUN_TEST(publish_qos2_v5_broker_rejection_returns_publish_rejected); + RUN_TEST(publish_qos2_v5_success_returns_success); + RUN_TEST(publish_qos2_v5_pubrec_rejection_returns_publish_rejected); + RUN_TEST(publish_v311_ack_not_misread_as_rejected); +#if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK) + RUN_TEST(publish_qos2_v5_pubrec_rejection_multithread_reader); +#endif +#endif #if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK) RUN_TEST(publish_writeonly_rejects_duplicate_in_flight_packet_id); RUN_TEST(subscribe_in_flight_blocks_publish_with_same_packet_id); diff --git a/wolfmqtt/mqtt_broker.h b/wolfmqtt/mqtt_broker.h index a076b150..7f374d8e 100644 --- a/wolfmqtt/mqtt_broker.h +++ b/wolfmqtt/mqtt_broker.h @@ -198,6 +198,31 @@ #define BROKER_MAX_OFFLINE_MSGS_PER_SUB 32 #endif +/* BROKER_MAX_QUEUED_MSGS_PER_SUB bounds the total depth of a *connected* + * subscriber's outbound queue (out_q_count) in dynamic-memory mode. + * + * BROKER_MAX_INFLIGHT_PER_SUB (above) only bounds how many QoS>0 PUBLISHes + * are on the wire awaiting an ack - i.e. bytes in flight. It does NOT bound + * how many entries may sit QUEUED behind the inflight window. A subscriber + * that stops sending PUBACK/PUBREC (a slow consumer, or an attacker keeping + * the session alive with PINGREQ) saturates the inflight window and then + * every subsequent matching PUBLISH still allocates a fresh BrokerOutPub plus + * heap copies of the topic and payload onto out_q. Without a depth cap that + * queue grows without limit until the broker exhausts memory. + * + * When a fan-out would exceed this depth the broker disconnects that + * subscriber (v5: DISCONNECT reason 0x97 Quota Exceeded) rather than growing + * the heap or silently dropping accepted QoS 1/2 messages; a persistent + * session is then reclaimable on reconnect via the (separately capped) + * offline queue. The default leaves room for a full inflight window plus an + * offline-sized backlog of not-yet-sent entries. Override with + * -DBROKER_MAX_QUEUED_MSGS_PER_SUB=N + * Only used in dynamic-memory mode. */ +#ifndef BROKER_MAX_QUEUED_MSGS_PER_SUB + #define BROKER_MAX_QUEUED_MSGS_PER_SUB \ + (BROKER_MAX_INFLIGHT_PER_SUB + BROKER_MAX_OFFLINE_MSGS_PER_SUB) +#endif + /* Schema version stamped on every persisted record. Bump when the * encoding of any namespace changes incompatibly; a startup with stored * records carrying a different version logs a warning, wipes all diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index c03a0464..2188127f 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -317,7 +317,10 @@ WOLFMQTT_API int MqttClient_Connect( with message data * Note: MqttPublish and MqttMessage are same structure. - * \return MQTT_CODE_SUCCESS, MQTT_CODE_CONTINUE (for non-blocking) or + * \return MQTT_CODE_SUCCESS, MQTT_CODE_CONTINUE (for non-blocking), + MQTT_CODE_ERROR_PUBLISH_REJECTED if a v5 broker rejected a + QoS>0 PUBLISH via a PUBACK (QoS 1) or PUBREC/PUBCOMP (QoS 2) + reason code >= 0x80 (see MqttPublish.resp.reason_code), or MQTT_CODE_ERROR_* (see enum MqttPacketResponseCodes) \sa MqttClient_Publish_WriteOnly \sa MqttClient_Publish_ex @@ -341,8 +344,11 @@ WOLFMQTT_API int MqttClient_Publish( * Note: MqttPublish and MqttMessage are same structure. * \param pubCb Function pointer to callback routine - * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* - (see enum MqttPacketResponseCodes) + * \return MQTT_CODE_SUCCESS, MQTT_CODE_CONTINUE (for non-blocking), + MQTT_CODE_ERROR_PUBLISH_REJECTED if a v5 broker rejected a + QoS>0 PUBLISH via a PUBACK (QoS 1) or PUBREC/PUBCOMP (QoS 2) + reason code >= 0x80 (see MqttPublish.resp.reason_code), or + MQTT_CODE_ERROR_* (see enum MqttPacketResponseCodes) */ WOLFMQTT_API int MqttClient_Publish_ex( MqttClient *client, @@ -365,6 +371,14 @@ WOLFMQTT_API int MqttClient_Publish_ex( * Note: MqttPublish and MqttMessage are same structure. * \param pubCb Function pointer to callback routine + * \note Because this call only writes and another thread processes the + ACK, it never returns MQTT_CODE_ERROR_PUBLISH_REJECTED. On the + reading thread only a QoS 2 PUBREC rejection is surfaced (as + MQTT_CODE_ERROR_PUBLISH_REJECTED, returned in order to suppress + an illegal PUBREL per [MQTT-4.3.3]); a QoS 1 PUBACK or QoS 2 + PUBCOMP reason code >= 0x80 is NOT detected on this path and the + publish appears successful. Use MqttClient_Publish/_ex when + reliable v5 broker-rejection detection for QoS>0 is required. * \return MQTT_CODE_SUCCESS, MQTT_CODE_CONTINUE (for non-blocking) or MQTT_CODE_ERROR_* (see enum MqttPacketResponseCodes) \sa MqttClient_Publish diff --git a/wolfmqtt/mqtt_types.h b/wolfmqtt/mqtt_types.h index cfcc4a73..1b66fa13 100644 --- a/wolfmqtt/mqtt_types.h +++ b/wolfmqtt/mqtt_types.h @@ -215,6 +215,12 @@ enum MqttPacketResponseCodes { topic filters in an UNSUBSCRIBE; see each reason code in MqttUnsubscribeAck. */ + MQTT_CODE_ERROR_PUBLISH_REJECTED = -21, /* v5 broker rejected a QoS>0 + PUBLISH via a PUBACK (QoS 1) or + PUBREC/PUBCOMP (QoS 2) reason + code >= 0x80; see + MqttPublish.resp.reason_code for + the specific reason. */ MQTT_CODE_CONTINUE = -101, MQTT_CODE_STDIN_WAKE = -102,