Skip to content

Commit a1639db

Browse files
committed
Fixes from review
1 parent dba0ca8 commit a1639db

2 files changed

Lines changed: 79 additions & 15 deletions

File tree

src/mqtt_broker.c

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,14 @@ static int callback_broker_mqtt(struct lws *wsi,
643643
ws = (BrokerWsCtx*)bc->ws_ctx;
644644
if (ws == NULL) return 0;
645645

646+
if (ws->pending_close) {
647+
/* Broker-initiated close: stage the close code here (inside a
648+
* callback) where lws_close_reason() is actually effective, then
649+
* return -1 to trigger the WebSocket close handshake. */
650+
lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
651+
return -1;
652+
}
653+
646654
if (ws->tx_pending != NULL && ws->tx_len > 0) {
647655
int n = lws_write(wsi, ws->tx_pending + LWS_PRE,
648656
ws->tx_len, LWS_WRITE_BINARY);
@@ -672,13 +680,33 @@ static int callback_broker_mqtt(struct lws *wsi,
672680
if (ws != NULL) {
673681
ws->status = 0;
674682
ws->wsi = NULL;
683+
684+
if (ws->pending_close) {
685+
/* Broker-initiated close: BrokerClient_Remove is already on
686+
* the call stack (BrokerWsNetDisconnect's spin loop drove us
687+
* here). Signal completion and return — do NOT call
688+
* BrokerClient_Remove again. */
689+
*bc_ptr = NULL;
690+
return 0;
691+
}
675692
}
676693

677-
/* Publish will on abnormal disconnect */
694+
/* Peer-initiated close: publish will and remove client. */
678695
BrokerClient_PublishWill(broker, bc);
679696
BrokerSubs_RemoveClient(broker, bc);
680-
*bc_ptr = NULL; /* clear user data before removal */
681-
BrokerClient_Remove(broker, bc);
697+
*bc_ptr = NULL;
698+
if (ws != NULL && ws->processing) {
699+
/* bc is on the call stack inside BrokerClient_Process (a packet
700+
* handler triggered a lws_service spin that delivered this CLOSED
701+
* callback). Freeing bc now would leave dangling pointers in the
702+
* packet handler — e.g. the fan-out payload pointing into rx_buf,
703+
* or the post-fan-out PUBACK write into tx_buf. Defer the free;
704+
* BrokerClient_Process will call BrokerClient_Remove on return. */
705+
ws->pending_remove = 1;
706+
}
707+
else {
708+
BrokerClient_Remove(broker, bc);
709+
}
682710
}
683711

684712
return 0;
@@ -800,17 +828,36 @@ static int BrokerWsNetDisconnect(void* context)
800828
}
801829

802830
if (ws->wsi != NULL && ws->status > 0) {
803-
BrokerClient **bc_ptr;
831+
struct lws *wsi_local = (struct lws*)ws->wsi;
832+
struct lws_context *lws_ctx = lws_get_context(wsi_local);
833+
int attempts = 0;
834+
804835
WBLOG_INFO(bc->broker, "broker: ws disconnect (wsi=%p)", (void*)ws->wsi);
805-
/* Clear lws per-session user data so that any later callbacks
806-
* (e.g. LWS_CALLBACK_CLOSED) see NULL and skip processing.
807-
* Without this, the callback would dereference the freed bc. */
808-
bc_ptr = (BrokerClient**)lws_wsi_user(ws->wsi);
809-
if (bc_ptr != NULL) {
810-
*bc_ptr = NULL;
836+
837+
/* Signal the WRITEABLE callback to send a close frame and return -1.
838+
* lws_close_reason() is only effective from inside a callback, so the
839+
* flag + writable-callback + return(-1) pattern is used here to
840+
* properly initiate the WebSocket close handshake. */
841+
ws->pending_close = 1;
842+
lws_callback_on_writable(wsi_local);
843+
844+
/* Spin until LWS_CALLBACK_CLOSED fires and clears ws->wsi. */
845+
while (ws->wsi != NULL && attempts < 100) {
846+
lws_service(lws_ctx, 0);
847+
attempts++;
848+
}
849+
850+
/* Fallback: if the close handshake did not complete in time, null out
851+
* the per-session user data so any future callback cannot dereference
852+
* the about-to-be-freed bc. The wsi is left for lws to reclaim via
853+
* its own keep-alive machinery. */
854+
if (ws->wsi != NULL) {
855+
BrokerClient **bc_ptr = (BrokerClient**)lws_wsi_user(wsi_local);
856+
if (bc_ptr != NULL) {
857+
*bc_ptr = NULL;
858+
}
859+
ws->wsi = NULL;
811860
}
812-
lws_close_reason(ws->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
813-
ws->wsi = NULL;
814861
}
815862

816863
if (ws->tx_pending != NULL) {
@@ -932,9 +979,6 @@ static void BrokerClient_Free(BrokerClient* bc)
932979
}
933980
else
934981
#endif
935-
{
936-
(void)BrokerNetDisconnect(bc);
937-
}
938982

939983
#ifdef ENABLE_MQTT_TLS
940984
if (bc->client.tls.ssl) {
@@ -3070,6 +3114,11 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
30703114
activity = 1;
30713115
WBLOG_DBG(broker, "broker: packet sock=%d type=%u len=%d",
30723116
(int)bc->sock, type, rc);
3117+
#ifdef ENABLE_MQTT_WEBSOCKET
3118+
if (bc->ws_ctx != NULL) {
3119+
((BrokerWsCtx*)bc->ws_ctx)->processing = 1;
3120+
}
3121+
#endif
30733122
switch (type) {
30743123
case MQTT_PACKET_TYPE_CONNECT:
30753124
{
@@ -3125,6 +3174,18 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
31253174
default:
31263175
break;
31273176
}
3177+
#ifdef ENABLE_MQTT_WEBSOCKET
3178+
if (bc->ws_ctx != NULL) {
3179+
BrokerWsCtx *wsc = (BrokerWsCtx*)bc->ws_ctx;
3180+
wsc->processing = 0;
3181+
if (wsc->pending_remove) {
3182+
/* The peer closed bc's connection while we were dispatching a
3183+
* packet (LWS_CALLBACK_CLOSED deferred the free to here). */
3184+
BrokerClient_Remove(broker, bc);
3185+
return 0;
3186+
}
3187+
}
3188+
#endif
31283189
}
31293190

31303191
/* Check keepalive timeout (MQTT spec 3.1.2.10: 1.5x keep alive) */

wolfmqtt/mqtt_broker.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ typedef struct BrokerWsCtx {
182182
byte *tx_pending; /* allocated with LWS_PRE prefix room */
183183
size_t tx_len;
184184
int status; /* 1=established, 0=closed, -1=error */
185+
int pending_close; /* 1 when broker-initiated close is in progress */
186+
int processing; /* 1 while BrokerClient_Process is dispatching a packet */
187+
int pending_remove; /* 1 when peer closed during processing; deferred free */
185188
} BrokerWsCtx;
186189
#endif /* ENABLE_MQTT_WEBSOCKET */
187190

0 commit comments

Comments
 (0)