@@ -561,7 +561,7 @@ static void MqttSNClient_PacketReset(SN_MsgType packet_type, void* packet_obj)
561561static int SN_Client_WaitType (MqttClient * client , void * packet_obj ,
562562 byte wait_type , word16 wait_packet_id , int timeout_ms )
563563{
564- int rc ;
564+ int rc = MQTT_CODE_SUCCESS ;
565565 word16 packet_id ;
566566 SN_MsgType packet_type ;
567567#ifdef WOLFMQTT_MULTITHREAD
@@ -593,9 +593,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
593593 if (client -> lastRc != MQTT_CODE_CONTINUE )
594594 #endif
595595 {
596- PRINTF ("SN_Client_WaitType: Type %s (%d), ID %d" ,
596+ PRINTF ("SN_Client_WaitType: Type %s (%d), ID %d, State %d-%d " ,
597597 SN_Packet_TypeDesc ((SN_MsgType )wait_type ),
598- wait_type , wait_packet_id );
598+ wait_type , wait_packet_id , mms_stat -> read , mms_stat -> write );
599599 }
600600#endif
601601
@@ -604,59 +604,44 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
604604 case MQTT_MSG_BEGIN :
605605 {
606606 #ifdef WOLFMQTT_MULTITHREAD
607- /* Lock recv socket mutex */
608- rc = wm_SemLock (& client -> lockRecv );
609- if (rc != 0 ) {
610- PRINTF ("SN_Client_WaitType recv lock error" );
607+ /* Check to see if packet type and id have already completed */
608+ rc = MqttClient_CheckPendResp (client , wait_type , wait_packet_id );
609+ if (rc != MQTT_CODE_ERROR_NOT_FOUND && rc != MQTT_CODE_CONTINUE ) {
611610 return rc ;
612611 }
613- mms_stat -> isReadActive = 1 ;
614- MQTT_TRACE_MSG ("SN lockRecv" );
615612 #endif
616613
617- /* reset the packet state used by SN_Packet_Read */
618- client -> packet .stat = MQTT_PK_BEGIN ;
614+ if ((rc = MqttReadStart (client , mms_stat )) != 0 ) {
615+ return rc ;
616+ }
617+
618+ mms_stat -> read = MQTT_MSG_WAIT ;
619619 }
620620 FALL_THROUGH ;
621621
622622 case MQTT_MSG_WAIT :
623+ case MQTT_MSG_HEADER :
623624 {
624- #ifdef WOLFMQTT_MULTITHREAD
625- /* Check to see if packet type and id have already completed */
626- pendResp = NULL ;
627- rc = wm_SemLock (& client -> lockClient );
628- if (rc == 0 ) {
629- if (MqttClient_RespList_Find (client , (MqttPacketType )wait_type ,
630- wait_packet_id , & pendResp )) {
631- if (pendResp -> packetDone ) {
632- /* pending response is already done, so return */
633- rc = pendResp -> packet_ret ;
634- #ifdef WOLFMQTT_DEBUG_CLIENT
635- PRINTF ("PendResp already Done %p: Rc %d" , pendResp , rc );
636- #endif
637- MqttClient_RespList_Remove (client , pendResp );
638- wm_SemUnlock (& client -> lockClient );
639- MQTT_TRACE_MSG ("SN unlockRecv" );
640- wm_SemUnlock (& client -> lockRecv );
641- return rc ;
642- }
643- }
644- wm_SemUnlock (& client -> lockClient );
645- }
646- else {
647- break ; /* error */
648- }
649- #endif /* WOLFMQTT_MULTITHREAD */
650-
651- mms_stat -> read = MQTT_MSG_WAIT ;
652-
653625 /* Wait for packet */
654626 rc = SN_Packet_Read (client , client -> rx_buf , client -> rx_buf_len ,
655627 timeout_ms );
628+ /* handle failure */
656629 if (rc <= 0 ) {
630+ #ifdef WOLFMQTT_NONBLOCK
631+ if (rc == MQTT_CODE_CONTINUE &&
632+ (client -> packet .stat > MQTT_PK_BEGIN ||
633+ client -> read .total > 0 )
634+ ) {
635+ /* advance state, since we received some data */
636+ mms_stat -> read = MQTT_MSG_HEADER ;
637+ }
638+ #endif
657639 break ;
658640 }
659641
642+ /* advance state, since we received some data */
643+ mms_stat -> read = MQTT_MSG_HEADER ;
644+
660645 client -> packet .buf_len = rc ;
661646
662647 /* Decode header */
@@ -674,11 +659,10 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
674659 client -> packet .buf_len , packet_type , packet_id );
675660 #endif
676661
677- mms_stat -> read = MQTT_MSG_HEADER ;
662+ mms_stat -> read = MQTT_MSG_PAYLOAD ;
678663 }
679664 FALL_THROUGH ;
680665
681- case MQTT_MSG_HEADER :
682666 case MQTT_MSG_PAYLOAD :
683667 case MQTT_MSG_PAYLOAD2 :
684668 {
@@ -689,6 +673,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
689673 (wait_packet_id == 0 || wait_packet_id == packet_id ))
690674 {
691675 use_packet_obj = packet_obj ;
676+ #ifdef WOLFMQTT_DEBUG_CLIENT
677+ PRINTF ("Using INCOMING packet_obj %p" , use_packet_obj );
678+ #endif
692679 waitMatchFound = 1 ;
693680 }
694681 else {
@@ -748,6 +735,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
748735 }
749736 }
750737 #endif /* WOLFMQTT_MULTITHREAD */
738+
739+ /* done reading */
740+ MqttReadStop (client , mms_stat );
751741 break ;
752742 }
753743
@@ -756,35 +746,40 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
756746 default :
757747 {
758748 #ifdef WOLFMQTT_DEBUG_CLIENT
759- PRINTF ("SN_Client_WaitType: Invalid state %d!" , mms_stat -> read );
749+ PRINTF ("SN_Client_WaitType: Invalid read state %d!" ,
750+ mms_stat -> read );
760751 #endif
761752 rc = MQTT_TRACE_ERROR (MQTT_CODE_ERROR_STAT );
762753 break ;
763754 }
764- } /* switch (msg->stat) */
765-
766- #ifdef WOLFMQTT_DEBUG_CLIENT
767- if (rc != MQTT_CODE_CONTINUE ) {
768- PRINTF ("SN_Client_WaitType: rc %d, state %d" , rc , mms_stat -> read );
769- }
770- #endif
755+ } /* switch (mms_stat->read) */
771756
772- if ( mms_stat -> read == MQTT_MSG_WAIT || rc != MQTT_CODE_CONTINUE ) {
773- /* reset state */
757+ /* no data read, then reset state */
758+ if ( mms_stat -> read == MQTT_MSG_WAIT ) {
774759 mms_stat -> read = MQTT_MSG_BEGIN ;
760+ }
775761
776- #ifdef WOLFMQTT_MULTITHREAD
777- if (mms_stat -> isReadActive ) {
778- mms_stat -> isReadActive = 0 ;
779- wm_SemUnlock (& client -> lockRecv );
780- }
781- #endif
762+ #ifdef WOLFMQTT_NONBLOCK
763+ /* if nonblocking and some data has been read, do not release read lock */
764+ if (rc == MQTT_CODE_CONTINUE && mms_stat -> read > MQTT_MSG_WAIT ) {
765+ return rc ;
782766 }
767+ #endif
768+
769+ MqttReadStop (client , mms_stat );
783770
784771#ifdef WOLFMQTT_NONBLOCK
785772 #ifdef WOLFMQTT_DEBUG_CLIENT
786- client -> lastRc = rc ;
773+ #ifdef WOLFMQTT_MULTITHREAD
774+ if (wm_SemLock (& client -> lockClient ) == 0 )
775+ #endif
776+ {
777+ client -> lastRc = rc ;
778+ #ifdef WOLFMQTT_MULTITHREAD
779+ wm_SemUnlock (& client -> lockClient );
787780 #endif
781+ }
782+ #endif /* WOLFMQTT_DEBUG_CLIENT */
788783 if (rc == MQTT_CODE_CONTINUE ) {
789784 return rc ;
790785 }
@@ -802,8 +797,23 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
802797
803798 if (!waitMatchFound ) {
804799 /* if we get here, then the we are still waiting for a packet */
800+ mms_stat -> read = MQTT_MSG_BEGIN ;
801+ #ifdef WOLFMQTT_NONBLOCK
802+ /* for non-blocking return with code continue instead of waiting again
803+ * if called with packet type and id of 'any' */
804+ if (wait_type == SN_MSG_TYPE_ANY && wait_packet_id == 0 ) {
805+ return MQTT_CODE_CONTINUE ;
806+ }
807+ #endif
808+ MQTT_TRACE_MSG ("Wait Again" );
805809 goto wait_again ;
806810 }
811+ #ifdef WOLFMQTT_DEBUG_CLIENT
812+ if (rc != MQTT_CODE_CONTINUE ) {
813+ PRINTF ("SN_Client_WaitType: rc %d, state %d-%d" ,
814+ rc , mms_stat -> read , mms_stat -> write );
815+ }
816+ #endif
807817
808818 return rc ;
809819}
0 commit comments