@@ -737,22 +737,16 @@ mqttcurl_connect(SocketContext* sock, const char* host, word16 port,
737737
738738 if (timeout_ms != 0 ) {
739739 res = curl_easy_setopt (sock -> curl , CURLOPT_CONNECTTIMEOUT_MS ,
740- timeout_ms );
740+ ( long ) timeout_ms );
741741
742742 if (res != CURLE_OK ) {
743743 PRINTF ("error: curl_easy_setopt(CONNECTTIMEOUT_MS, %d) "
744744 "returned %d" , timeout_ms , res );
745745 return MQTT_CODE_ERROR_CURL ;
746746 }
747-
748- res = curl_easy_setopt (sock -> curl , CURLOPT_TIMEOUT_MS ,
749- timeout_ms );
750-
751- if (res != CURLE_OK ) {
752- PRINTF ("error: curl_easy_setopt(TIMEOUT_MS, %d) "
753- "returned %d" , timeout_ms , res );
754- return MQTT_CODE_ERROR_CURL ;
755- }
747+ /* Note: CURLOPT_TIMEOUT_MS is not used here because it sets a total
748+ * transfer timeout, which is not applicable with CURLOPT_CONNECT_ONLY
749+ * mode where we use curl_easy_send/recv manually after connect. */
756750 }
757751
758752 res = curl_easy_setopt (sock -> curl , CURLOPT_URL , host );
@@ -763,7 +757,7 @@ mqttcurl_connect(SocketContext* sock, const char* host, word16 port,
763757 return MQTT_CODE_ERROR_CURL ;
764758 }
765759
766- res = curl_easy_setopt (sock -> curl , CURLOPT_PORT , port );
760+ res = curl_easy_setopt (sock -> curl , CURLOPT_PORT , ( long ) port );
767761
768762 if (res != CURLE_OK ) {
769763 PRINTF ("error: curl_easy_setopt(PORT, %d) returned: %d" ,
@@ -845,7 +839,7 @@ mqttcurl_connect(SocketContext* sock, const char* host, word16 port,
845839 */
846840
847841 /* Set peer and host verification. */
848- res = curl_easy_setopt (sock -> curl , CURLOPT_SSL_VERIFYPEER , 1 );
842+ res = curl_easy_setopt (sock -> curl , CURLOPT_SSL_VERIFYPEER , 1L );
849843
850844 if (res != CURLE_OK ) {
851845 PRINTF ("error: curl_easy_setopt(SSL_VERIFYPEER) returned: %d" ,
@@ -856,10 +850,10 @@ mqttcurl_connect(SocketContext* sock, const char* host, word16 port,
856850 /* Only do server host verification when not running against
857851 * localhost broker. */
858852 if (XSTRCMP (host , "localhost" ) == 0 ) {
859- res = curl_easy_setopt (sock -> curl , CURLOPT_SSL_VERIFYHOST , 0 );
853+ res = curl_easy_setopt (sock -> curl , CURLOPT_SSL_VERIFYHOST , 0L );
860854 }
861855 else {
862- res = curl_easy_setopt (sock -> curl , CURLOPT_SSL_VERIFYHOST , 2 );
856+ res = curl_easy_setopt (sock -> curl , CURLOPT_SSL_VERIFYHOST , 2L );
863857 }
864858
865859 if (res != CURLE_OK ) {
@@ -898,7 +892,7 @@ mqttcurl_connect(SocketContext* sock, const char* host, word16 port,
898892 }
899893 #endif
900894
901- res = curl_easy_setopt (sock -> curl , CURLOPT_CONNECT_ONLY , 1 );
895+ res = curl_easy_setopt (sock -> curl , CURLOPT_CONNECT_ONLY , 1L );
902896
903897 if (res != CURLE_OK ) {
904898 PRINTF ("error: curl_easy_setopt(CONNECT_ONLY, 1) returned: %d" ,
@@ -952,6 +946,7 @@ static int NetConnect(void *context, const char* host, word16 port,
952946 return rc ;
953947 }
954948
949+ sock -> bytes = 0 ;
955950 sock -> stat = SOCK_CONN ;
956951 return MQTT_CODE_SUCCESS ;
957952}
@@ -995,59 +990,44 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
995990 PRINTF ("sock->curl = %p, sockfd = %d" , (void * )sock -> curl , sockfd );
996991#endif
997992
998- /* A very simple retry with timeout example. This assumes the entire
999- * payload will be transferred in a single shot without buffering.
1000- * todo: add buffering? */
1001- for (size_t i = 0 ; i < MQTT_CURL_NUM_RETRY ; ++ i ) {
1002- #ifdef WOLFMQTT_MULTITHREAD
993+ #ifdef WOLFMQTT_MULTITHREAD
994+ {
1003995 int rc = wm_SemLock (& sock -> mqttCtx -> client .lockCURL );
1004996 if (rc != 0 ) {
1005997 return rc ;
1006998 }
1007- #endif
1008-
1009- res = curl_easy_send (sock -> curl , buf , buf_len , & sent );
1010-
1011- #ifdef WOLFMQTT_MULTITHREAD
1012- wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1013- #endif
1014-
1015- if (res == CURLE_OK ) {
1016- #if defined(WOLFMQTT_DEBUG_SOCKET )
1017- PRINTF ("info: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1018- curl_easy_strerror (res ));
1019- #endif
1020- break ;
1021- }
999+ }
1000+ #endif
10221001
1023- if (res == CURLE_AGAIN ) {
1024- #if defined(WOLFMQTT_DEBUG_SOCKET )
1025- PRINTF ("info: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1026- curl_easy_strerror (res ));
1027- #endif
1002+ res = curl_easy_send (sock -> curl , & buf [sock -> bytes ],
1003+ buf_len - sock -> bytes , & sent );
10281004
1029- wait_rc = mqttcurl_wait (sockfd , 0 , timeout_ms ,
1030- sock -> mqttCtx -> test_mode );
1005+ #ifdef WOLFMQTT_MULTITHREAD
1006+ wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1007+ #endif
10311008
1032- if (wait_rc == MQTT_CODE_CONTINUE ) {
1033- continue ;
1034- }
1035- else {
1036- return wait_rc ;
1037- }
1009+ if (res == CURLE_OK ) {
1010+ sock -> bytes += (int )sent ;
1011+ if (sock -> bytes < buf_len ) {
1012+ /* Partial write, return continue to retry */
1013+ return MQTT_CODE_CONTINUE ;
10381014 }
1039-
1040- PRINTF ( "error: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1041- curl_easy_strerror ( res )) ;
1042- return MQTT_CODE_ERROR_CURL ;
1015+ /* Complete, reset for next operation */
1016+ sent = sock -> bytes ;
1017+ sock -> bytes = 0 ;
1018+ return ( int ) sent ;
10431019 }
10441020
1045- if ((int ) sent != buf_len ) {
1046- PRINTF ("error: sent %d bytes, expected %d" , (int )sent , buf_len );
1047- return MQTT_CODE_ERROR_CURL ;
1021+ if (res == CURLE_AGAIN ) {
1022+ wait_rc = mqttcurl_wait (sockfd , 0 , timeout_ms ,
1023+ sock -> mqttCtx -> test_mode );
1024+ return wait_rc ;
10481025 }
10491026
1050- return buf_len ;
1027+ PRINTF ("error: curl_easy_send(%d) returned: %d, %s" , buf_len , res ,
1028+ curl_easy_strerror (res ));
1029+ sock -> bytes = 0 ;
1030+ return MQTT_CODE_ERROR_CURL ;
10511031}
10521032
10531033static int NetRead (void * context , byte * buf , int buf_len ,
@@ -1089,59 +1069,50 @@ static int NetRead(void *context, byte* buf, int buf_len,
10891069 PRINTF ("sock->curl = %p, sockfd = %d" , (void * )sock -> curl , sockfd );
10901070#endif
10911071
1092- /* A very simple retry with timeout example. This assumes the entire
1093- * payload will be transferred in a single shot without buffering.
1094- * todo: add buffering? */
1095- for (size_t i = 0 ; i < MQTT_CURL_NUM_RETRY ; ++ i ) {
1096- #ifdef WOLFMQTT_MULTITHREAD
1072+ #ifdef WOLFMQTT_MULTITHREAD
1073+ {
10971074 int rc = wm_SemLock (& sock -> mqttCtx -> client .lockCURL );
10981075 if (rc != 0 ) {
10991076 return rc ;
11001077 }
1101- #endif
1078+ }
1079+ #endif
11021080
1103- res = curl_easy_recv (sock -> curl , buf , buf_len , & recvd );
1081+ res = curl_easy_recv (sock -> curl , & buf [sock -> bytes ],
1082+ buf_len - sock -> bytes , & recvd );
11041083
1105- #ifdef WOLFMQTT_MULTITHREAD
1106- wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1107- #endif
1084+ #ifdef WOLFMQTT_MULTITHREAD
1085+ wm_SemUnlock (& sock -> mqttCtx -> client .lockCURL );
1086+ #endif
11081087
1109- if (res == CURLE_OK ) {
1110- # if defined( WOLFMQTT_DEBUG_SOCKET )
1111- PRINTF ( "info: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1112- curl_easy_strerror ( res ) );
1113- #endif
1114- break ;
1088+ if (res == CURLE_OK ) {
1089+ if ( recvd == 0 ) {
1090+ /* Connection closed */
1091+ PRINTF ( "error: connection closed by peer" );
1092+ sock -> bytes = 0 ;
1093+ return MQTT_CODE_ERROR_NETWORK ;
11151094 }
1116-
1117- if (res == CURLE_AGAIN ) {
1118- #if defined(WOLFMQTT_DEBUG_SOCKET )
1119- PRINTF ("info: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1120- curl_easy_strerror (res ));
1121- #endif
1122-
1123- wait_rc = mqttcurl_wait (sockfd , 1 , timeout_ms ,
1124- sock -> mqttCtx -> test_mode );
1125-
1126- if (wait_rc == MQTT_CODE_CONTINUE ) {
1127- continue ;
1128- }
1129- else {
1130- return wait_rc ;
1131- }
1095+ sock -> bytes += (int )recvd ;
1096+ if (sock -> bytes < buf_len ) {
1097+ /* Partial read, return continue to retry */
1098+ return MQTT_CODE_CONTINUE ;
11321099 }
1133-
1134- PRINTF ( "error: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1135- curl_easy_strerror ( res )) ;
1136- return MQTT_CODE_ERROR_CURL ;
1100+ /* Complete, reset for next operation */
1101+ recvd = sock -> bytes ;
1102+ sock -> bytes = 0 ;
1103+ return ( int ) recvd ;
11371104 }
11381105
1139- if ((int ) recvd != buf_len ) {
1140- PRINTF ("error: recvd %d bytes, expected %d" , (int )recvd , buf_len );
1141- return MQTT_CODE_ERROR_CURL ;
1106+ if (res == CURLE_AGAIN ) {
1107+ wait_rc = mqttcurl_wait (sockfd , 1 , timeout_ms ,
1108+ sock -> mqttCtx -> test_mode );
1109+ return wait_rc ;
11421110 }
11431111
1144- return buf_len ;
1112+ PRINTF ("error: curl_easy_recv(%d) returned: %d, %s" , buf_len , res ,
1113+ curl_easy_strerror (res ));
1114+ sock -> bytes = 0 ;
1115+ return MQTT_CODE_ERROR_CURL ;
11451116}
11461117
11471118static int NetDisconnect (void * context )
@@ -1159,6 +1130,7 @@ static int NetDisconnect(void *context)
11591130 curl_easy_cleanup (sock -> curl );
11601131 sock -> curl = NULL ;
11611132 }
1133+ sock -> bytes = 0 ;
11621134
11631135 return 0 ;
11641136}
0 commit comments