Skip to content

Commit 88b8966

Browse files
embhornclaude
andcommitted
curl: use internal loops for partial read/write handling
Change NetRead and NetWrite to loop internally until the entire transfer completes rather than returning MQTT_CODE_CONTINUE for partial transfers. This reduces round-trips and prevents timeout issues while still correctly handling partial data. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2f47748 commit 88b8966

1 file changed

Lines changed: 79 additions & 71 deletions

File tree

examples/mqttnet.c

Lines changed: 79 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -990,44 +990,48 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
990990
PRINTF("sock->curl = %p, sockfd = %d", (void *)sock->curl, sockfd);
991991
#endif
992992

993-
#ifdef WOLFMQTT_MULTITHREAD
994-
{
995-
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
996-
if (rc != 0) {
997-
return rc;
993+
/* Loop until all data is sent or error */
994+
do {
995+
#ifdef WOLFMQTT_MULTITHREAD
996+
{
997+
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
998+
if (rc != 0) {
999+
return rc;
1000+
}
9981001
}
999-
}
1000-
#endif
1002+
#endif
10011003

1002-
res = curl_easy_send(sock->curl, &buf[sock->bytes],
1003-
buf_len - sock->bytes, &sent);
1004+
res = curl_easy_send(sock->curl, &buf[sock->bytes],
1005+
buf_len - sock->bytes, &sent);
10041006

1005-
#ifdef WOLFMQTT_MULTITHREAD
1006-
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
1007-
#endif
1007+
#ifdef WOLFMQTT_MULTITHREAD
1008+
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
1009+
#endif
10081010

1009-
if (res == CURLE_OK) {
1010-
sock->bytes += (int)sent;
1011-
if (sock->bytes < buf_len) {
1012-
/* Partial write, return continue to retry */
1013-
return MQTT_CODE_CONTINUE;
1011+
if (res == CURLE_OK) {
1012+
sock->bytes += (int)sent;
1013+
if (sock->bytes == buf_len) {
1014+
/* Complete, reset for next operation */
1015+
sent = sock->bytes;
1016+
sock->bytes = 0;
1017+
return (int)sent;
1018+
}
1019+
/* Partial write, continue loop */
10141020
}
1015-
/* Complete, reset for next operation */
1016-
sent = sock->bytes;
1017-
sock->bytes = 0;
1018-
return (int)sent;
1019-
}
1020-
1021-
if (res == CURLE_AGAIN) {
1022-
wait_rc = mqttcurl_wait(sockfd, 0, timeout_ms,
1023-
sock->mqttCtx->test_mode);
1024-
return wait_rc;
1025-
}
1026-
1027-
PRINTF("error: curl_easy_send(%d) returned: %d, %s", buf_len, res,
1028-
curl_easy_strerror(res));
1029-
sock->bytes = 0;
1030-
return MQTT_CODE_ERROR_CURL;
1021+
else if (res == CURLE_AGAIN) {
1022+
wait_rc = mqttcurl_wait(sockfd, 0, timeout_ms,
1023+
sock->mqttCtx->test_mode);
1024+
if (wait_rc != MQTT_CODE_CONTINUE) {
1025+
return wait_rc;
1026+
}
1027+
}
1028+
else {
1029+
PRINTF("error: curl_easy_send(%d) returned: %d, %s", buf_len, res,
1030+
curl_easy_strerror(res));
1031+
sock->bytes = 0;
1032+
return MQTT_CODE_ERROR_CURL;
1033+
}
1034+
} while (1);
10311035
}
10321036

10331037
static int NetRead(void *context, byte* buf, int buf_len,
@@ -1069,50 +1073,54 @@ static int NetRead(void *context, byte* buf, int buf_len,
10691073
PRINTF("sock->curl = %p, sockfd = %d", (void *)sock->curl, sockfd);
10701074
#endif
10711075

1072-
#ifdef WOLFMQTT_MULTITHREAD
1073-
{
1074-
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
1075-
if (rc != 0) {
1076-
return rc;
1076+
/* Loop until all data is received or error */
1077+
do {
1078+
#ifdef WOLFMQTT_MULTITHREAD
1079+
{
1080+
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
1081+
if (rc != 0) {
1082+
return rc;
1083+
}
10771084
}
1078-
}
1079-
#endif
1085+
#endif
10801086

1081-
res = curl_easy_recv(sock->curl, &buf[sock->bytes],
1082-
buf_len - sock->bytes, &recvd);
1087+
res = curl_easy_recv(sock->curl, &buf[sock->bytes],
1088+
buf_len - sock->bytes, &recvd);
10831089

1084-
#ifdef WOLFMQTT_MULTITHREAD
1085-
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
1086-
#endif
1090+
#ifdef WOLFMQTT_MULTITHREAD
1091+
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
1092+
#endif
10871093

1088-
if (res == CURLE_OK) {
1089-
if (recvd == 0) {
1090-
/* Connection closed */
1091-
PRINTF("error: connection closed by peer");
1092-
sock->bytes = 0;
1093-
return MQTT_CODE_ERROR_NETWORK;
1094+
if (res == CURLE_OK) {
1095+
if (recvd == 0) {
1096+
/* Connection closed */
1097+
PRINTF("error: connection closed by peer");
1098+
sock->bytes = 0;
1099+
return MQTT_CODE_ERROR_NETWORK;
1100+
}
1101+
sock->bytes += (int)recvd;
1102+
if (sock->bytes == buf_len) {
1103+
/* Complete, reset for next operation */
1104+
recvd = sock->bytes;
1105+
sock->bytes = 0;
1106+
return (int)recvd;
1107+
}
1108+
/* Partial read, continue loop */
10941109
}
1095-
sock->bytes += (int)recvd;
1096-
if (sock->bytes < buf_len) {
1097-
/* Partial read, return continue to retry */
1098-
return MQTT_CODE_CONTINUE;
1110+
else if (res == CURLE_AGAIN) {
1111+
wait_rc = mqttcurl_wait(sockfd, 1, timeout_ms,
1112+
sock->mqttCtx->test_mode);
1113+
if (wait_rc != MQTT_CODE_CONTINUE) {
1114+
return wait_rc;
1115+
}
10991116
}
1100-
/* Complete, reset for next operation */
1101-
recvd = sock->bytes;
1102-
sock->bytes = 0;
1103-
return (int)recvd;
1104-
}
1105-
1106-
if (res == CURLE_AGAIN) {
1107-
wait_rc = mqttcurl_wait(sockfd, 1, timeout_ms,
1108-
sock->mqttCtx->test_mode);
1109-
return wait_rc;
1110-
}
1111-
1112-
PRINTF("error: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
1113-
curl_easy_strerror(res));
1114-
sock->bytes = 0;
1115-
return MQTT_CODE_ERROR_CURL;
1117+
else {
1118+
PRINTF("error: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
1119+
curl_easy_strerror(res));
1120+
sock->bytes = 0;
1121+
return MQTT_CODE_ERROR_CURL;
1122+
}
1123+
} while (1);
11161124
}
11171125

11181126
static int NetDisconnect(void *context)

0 commit comments

Comments
 (0)