@@ -443,16 +443,19 @@ static void *subscribe_task(void *param)
443443 rc = check_response (mqttCtx , rc , & startSec , MQTT_PACKET_TYPE_SUBSCRIBE ,
444444 mqttCtx -> cmd_timeout_ms );
445445 } while (rc == MQTT_CODE_CONTINUE );
446- if (rc != MQTT_CODE_SUCCESS ) {
446+ /* SUBSCRIBE_REJECTED means the SUBACK completed normally (some/all
447+ * filters rejected by broker) - no message to cancel in that case. */
448+ if (rc != MQTT_CODE_SUCCESS && rc != MQTT_CODE_ERROR_SUBSCRIBE_REJECTED ) {
447449 MqttClient_CancelMessage (& mqttCtx -> client ,
448450 (MqttObject * )& mqttCtx -> subscribe );
449451 }
450452
451453 PRINTF ("MQTT Subscribe: %s (%d)" ,
452454 MqttClient_ReturnCodeToString (rc ), rc );
453455
454- if (rc == MQTT_CODE_SUCCESS ) {
455- /* show subscribe results */
456+ if (rc == MQTT_CODE_SUCCESS || rc == MQTT_CODE_ERROR_SUBSCRIBE_REJECTED ) {
457+ /* show subscribe results (also reveals which filters the broker
458+ * rejected when rc == MQTT_CODE_ERROR_SUBSCRIBE_REJECTED) */
456459 for (i = 0 ; i < mqttCtx -> subscribe .topic_count ; i ++ ) {
457460 MqttTopic * topic = & mqttCtx -> subscribe .topics [i ];
458461 PRINTF (" Topic %s, Qos %u, Return Code %u" ,
@@ -467,6 +470,13 @@ static void *subscribe_task(void *param)
467470 }
468471#endif
469472
473+ /* Broker rejected the subscription: signal the other threads to stop
474+ * so waitMessage_task (which will never receive the expected messages)
475+ * does not hang this example. */
476+ if (rc == MQTT_CODE_ERROR_SUBSCRIBE_REJECTED ) {
477+ mqtt_stop_set ();
478+ }
479+
470480 THREAD_EXIT (0 );
471481}
472482
0 commit comments