@@ -952,6 +952,7 @@ static int NetConnect(void *context, const char* host, word16 port,
952952 return rc ;
953953 }
954954
955+ sock -> bytes = 0 ;
955956 sock -> stat = SOCK_CONN ;
956957 return MQTT_CODE_SUCCESS ;
957958}
@@ -995,59 +996,44 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
995996 PRINTF ("sock->curl = %p, sockfd = %d" , (void * )sock -> curl , sockfd );
996997#endif
997998
998- /* A very simple retry with timeout example. This assumes the entire
999- * payload will be transferred in a single shot without buffering.
1000- * todo: add buffering? */
1001- for (size_t i = 0 ; i < MQTT_CURL_NUM_RETRY ; ++ i ) {
1002- #ifdef WOLFMQTT_MULTITHREAD
999+ #ifdef WOLFMQTT_MULTITHREAD
1000+ {
10031001 int rc = wm_SemLock (& sock -> mqttCtx -> client .lockCURL );
10041002 if (rc != 0 ) {
10051003 return rc ;
10061004 }
1007- #endif
1008-
1009- res = curl_easy_send (sock -> curl , buf , buf_len , & sent );
1010-
1011- #ifdef WOLFMQTT_MULTITHREAD
1012- wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1013- #endif
1014-
1015- if (res == CURLE_OK ) {
1016- #if defined(WOLFMQTT_DEBUG_SOCKET )
1017- PRINTF ("info: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1018- curl_easy_strerror (res ));
1019- #endif
1020- break ;
1021- }
1005+ }
1006+ #endif
10221007
1023- if (res == CURLE_AGAIN ) {
1024- #if defined(WOLFMQTT_DEBUG_SOCKET )
1025- PRINTF ("info: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1026- curl_easy_strerror (res ));
1027- #endif
1008+ res = curl_easy_send (sock -> curl , & buf [sock -> bytes ],
1009+ buf_len - sock -> bytes , & sent );
10281010
1029- wait_rc = mqttcurl_wait (sockfd , 0 , timeout_ms ,
1030- sock -> mqttCtx -> test_mode );
1011+ #ifdef WOLFMQTT_MULTITHREAD
1012+ wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1013+ #endif
10311014
1032- if (wait_rc == MQTT_CODE_CONTINUE ) {
1033- continue ;
1034- }
1035- else {
1036- return wait_rc ;
1037- }
1015+ if (res == CURLE_OK ) {
1016+ sock -> bytes += (int )sent ;
1017+ if (sock -> bytes < buf_len ) {
1018+ /* Partial write, return continue to retry */
1019+ return MQTT_CODE_CONTINUE ;
10381020 }
1039-
1040- PRINTF ( "error: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1041- curl_easy_strerror ( res )) ;
1042- return MQTT_CODE_ERROR_CURL ;
1021+ /* Complete, reset for next operation */
1022+ sent = sock -> bytes ;
1023+ sock -> bytes = 0 ;
1024+ return ( int ) sent ;
10431025 }
10441026
1045- if ((int ) sent != buf_len ) {
1046- PRINTF ("error: sent %d bytes, expected %d" , (int )sent , buf_len );
1047- return MQTT_CODE_ERROR_CURL ;
1027+ if (res == CURLE_AGAIN ) {
1028+ wait_rc = mqttcurl_wait (sockfd , 0 , timeout_ms ,
1029+ sock -> mqttCtx -> test_mode );
1030+ return wait_rc ;
10481031 }
10491032
1050- return buf_len ;
1033+ PRINTF ("error: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1034+ curl_easy_strerror (res ));
1035+ sock -> bytes = 0 ;
1036+ return MQTT_CODE_ERROR_CURL ;
10511037}
10521038
10531039static int NetRead (void * context , byte * buf , int buf_len ,
@@ -1089,59 +1075,50 @@ static int NetRead(void *context, byte* buf, int buf_len,
10891075 PRINTF ("sock->curl = %p, sockfd = %d" , (void * )sock -> curl , sockfd );
10901076#endif
10911077
1092- /* A very simple retry with timeout example. This assumes the entire
1093- * payload will be transferred in a single shot without buffering.
1094- * todo: add buffering? */
1095- for (size_t i = 0 ; i < MQTT_CURL_NUM_RETRY ; ++ i ) {
1096- #ifdef WOLFMQTT_MULTITHREAD
1078+ #ifdef WOLFMQTT_MULTITHREAD
1079+ {
10971080 int rc = wm_SemLock (& sock -> mqttCtx -> client .lockCURL );
10981081 if (rc != 0 ) {
10991082 return rc ;
11001083 }
1101- #endif
1084+ }
1085+ #endif
11021086
1103- res = curl_easy_recv (sock -> curl , buf , buf_len , & recvd );
1087+ res = curl_easy_recv (sock -> curl , & buf [sock -> bytes ],
1088+ buf_len - sock -> bytes , & recvd );
11041089
1105- #ifdef WOLFMQTT_MULTITHREAD
1106- wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1107- #endif
1090+ #ifdef WOLFMQTT_MULTITHREAD
1091+ wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1092+ #endif
11081093
1109- if (res == CURLE_OK ) {
1110- # if defined( WOLFMQTT_DEBUG_SOCKET )
1111- PRINTF ( "info: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1112- curl_easy_strerror ( res ) );
1113- #endif
1114- break ;
1094+ if (res == CURLE_OK ) {
1095+ if ( recvd == 0 ) {
1096+ /* Connection closed */
1097+ PRINTF ( "error: connection closed by peer" );
1098+ sock -> bytes = 0 ;
1099+ return MQTT_CODE_ERROR_NETWORK ;
11151100 }
1116-
1117- if (res == CURLE_AGAIN ) {
1118- #if defined(WOLFMQTT_DEBUG_SOCKET )
1119- PRINTF ("info: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1120- curl_easy_strerror (res ));
1121- #endif
1122-
1123- wait_rc = mqttcurl_wait (sockfd , 1 , timeout_ms ,
1124- sock -> mqttCtx -> test_mode );
1125-
1126- if (wait_rc == MQTT_CODE_CONTINUE ) {
1127- continue ;
1128- }
1129- else {
1130- return wait_rc ;
1131- }
1101+ sock -> bytes += (int )recvd ;
1102+ if (sock -> bytes < buf_len ) {
1103+ /* Partial read, return continue to retry */
1104+ return MQTT_CODE_CONTINUE ;
11321105 }
1133-
1134- PRINTF ( "error: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1135- curl_easy_strerror ( res )) ;
1136- return MQTT_CODE_ERROR_CURL ;
1106+ /* Complete, reset for next operation */
1107+ recvd = sock -> bytes ;
1108+ sock -> bytes = 0 ;
1109+ return ( int ) recvd ;
11371110 }
11381111
1139- if ((int ) recvd != buf_len ) {
1140- PRINTF ("error: recvd %d bytes, expected %d" , (int )recvd , buf_len );
1141- return MQTT_CODE_ERROR_CURL ;
1112+ if (res == CURLE_AGAIN ) {
1113+ wait_rc = mqttcurl_wait (sockfd , 1 , timeout_ms ,
1114+ sock -> mqttCtx -> test_mode );
1115+ return wait_rc ;
11421116 }
11431117
1144- return buf_len ;
1118+ PRINTF ("error: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1119+ curl_easy_strerror (res ));
1120+ sock -> bytes = 0 ;
1121+ return MQTT_CODE_ERROR_CURL ;
11451122}
11461123
11471124static int NetDisconnect (void * context )
@@ -1159,6 +1136,7 @@ static int NetDisconnect(void *context)
11591136 curl_easy_cleanup (sock -> curl );
11601137 sock -> curl = NULL ;
11611138 }
1139+ sock -> bytes = 0 ;
11621140
11631141 return 0 ;
11641142}
0 commit comments