Skip to content

Commit 4f5c2d2

Browse files
committed
Fixes from review
1 parent d5b49be commit 4f5c2d2

3 files changed

Lines changed: 37 additions & 10 deletions

File tree

examples/multithread/multithread.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,13 @@ static void *subscribe_task(void *param)
470470
}
471471
#endif
472472

473+
/* Broker rejected the subscription: signal the other threads to stop
474+
* so waitMessage_task (which will never receive the expected messages)
475+
* does not hang this example. */
476+
if (rc == MQTT_CODE_ERROR_SUBSCRIBE_REJECTED) {
477+
mqtt_stop_set();
478+
}
479+
473480
THREAD_EXIT(0);
474481
}
475482

src/mqtt_broker.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4186,6 +4186,9 @@ int wolfmqtt_broker(int argc, char** argv)
41864186

41874187
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET) && \
41884188
!defined(NO_MAIN_DRIVER)
4189+
/* Reset shutdown flag so this wrapper is reusable across multiple
4190+
* invocations in the same process (tests, embedding). */
4191+
g_broker_shutdown = 0;
41894192
signal(SIGINT, broker_signal_handler);
41904193
signal(SIGTERM, broker_signal_handler);
41914194

src/mqtt_client.c

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -544,10 +544,16 @@ static void Handle_ConnectAck_Props(MqttClient* client, MqttProp* props)
544544

545545
for (prop = props; prop != NULL; prop = prop->next) {
546546
if (prop->type == MQTT_PROP_MAX_QOS) {
547-
client->max_qos = prop->data_byte;
547+
/* MQTT v5 [3.1.2.11.6]: only 0 or 1 are legal. Clamp a
548+
* non-conforming broker value so client-side publish guards
549+
* remain meaningful. */
550+
client->max_qos = (prop->data_byte <= MQTT_QOS_1) ?
551+
prop->data_byte : MQTT_QOS_1;
548552
}
549553
else if (prop->type == MQTT_PROP_RETAIN_AVAIL) {
550-
client->retain_avail = prop->data_byte;
554+
/* MQTT v5 [3.1.2.11.5]: only 0 or 1 are legal. */
555+
client->retain_avail = (prop->data_byte <= 1) ?
556+
prop->data_byte : 1;
551557
}
552558
else if (prop->type == MQTT_PROP_MAX_PACKET_SZ) {
553559
if ((prop->data_int > 0) &&
@@ -653,10 +659,13 @@ static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf,
653659
#ifdef WOLFMQTT_V5
654660
if (rc >= 0 && doProps) {
655661
int tmp;
656-
/* Auto-populate server property fields so that publish and
657-
* packet-size guards are effective even when the
658-
* application has not registered a property callback. */
659-
Handle_ConnectAck_Props(client, p_connect_ack->props);
662+
/* Only latch server-supplied session limits when the broker
663+
* accepted the connection. A refused CONNACK must not
664+
* mutate long-lived MqttClient state. */
665+
if (p_connect_ack->return_code ==
666+
MQTT_CONNECT_ACK_CODE_ACCEPTED) {
667+
Handle_ConnectAck_Props(client, p_connect_ack->props);
668+
}
660669
tmp = Handle_Props(client, p_connect_ack->props,
661670
(packet_obj != NULL), 1);
662671
p_connect_ack->props = NULL;
@@ -1715,6 +1724,13 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect)
17151724
#ifdef WOLFMQTT_V5
17161725
/* Use specified protocol version if set */
17171726
mc_connect->protocol_level = client->protocol_level;
1727+
1728+
/* Reset server-supplied session limits so stale values from a
1729+
* prior broker do not leak across reconnects. An accepted CONNACK
1730+
* will repopulate these in Handle_ConnectAck_Props. */
1731+
client->max_qos = MQTT_QOS_2;
1732+
client->retain_avail = 1;
1733+
client->packet_sz_max = 0;
17181734
#endif
17191735

17201736
/* Encode the connect packet */
@@ -2425,15 +2441,16 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe)
24252441
#endif
24262442

24272443
/* Populate return codes and detect broker rejection. A v3.1.1 SUBACK
2428-
* uses 0x80 to indicate failure; a v5 SUBACK uses any reason code
2429-
* >= 0x80. In either case, any per-topic code with the high bit set
2430-
* means the broker rejected that filter. */
2444+
* uses MQTT_SUBSCRIBE_ACK_CODE_FAILURE (0x80) to indicate failure;
2445+
* a v5 SUBACK uses any reason code >= 0x80. In either case, any
2446+
* per-topic code with the high bit set means the broker rejected
2447+
* that filter. */
24312448
if (rc == MQTT_CODE_SUCCESS) {
24322449
byte any_rejected = 0;
24332450
for (i = 0; i < subscribe->topic_count && i < MAX_MQTT_TOPICS; i++) {
24342451
topic = &subscribe->topics[i];
24352452
topic->return_code = subscribe->ack.return_codes[i];
2436-
if (topic->return_code & 0x80) {
2453+
if (topic->return_code & MQTT_SUBSCRIBE_ACK_CODE_FAILURE) {
24372454
any_rejected = 1;
24382455
}
24392456
}

0 commit comments

Comments
 (0)