Skip to content

Commit dba0ca8

Browse files
embhornclaude
andcommitted
Add WebSocket broker configurations to CI workflow
Add two new matrix entries to broker-check.yml for testing the broker with WebSocket support: plain WebSocket and WebSocket + TLS. Installs libwebsockets-dev and configures wolfSSL with --enable-opensslcoexist for system lws compatibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a4299c5 commit dba0ca8

5 files changed

Lines changed: 68 additions & 67 deletions

File tree

.github/workflows/broker-check.yml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,23 @@ jobs:
3737
- name: "Broker TLS-only (no insecure)"
3838
cflags: ""
3939
wolfmqtt_opts: "--enable-broker --enable-tls --disable-broker-insecure"
40+
- name: "Broker with WebSocket"
41+
cflags: ""
42+
wolfmqtt_opts: "--enable-broker --enable-websocket"
43+
extra_deps: "libwebsockets-dev"
44+
wolfssl_opts: "--enable-opensslcoexist"
45+
- name: "Broker with WebSocket + TLS"
46+
cflags: ""
47+
wolfmqtt_opts: "--enable-broker --enable-tls --enable-websocket"
48+
extra_deps: "libwebsockets-dev"
49+
wolfssl_opts: "--enable-opensslcoexist --enable-enckeys"
4050

4151
steps:
4252
- name: Install dependencies
4353
run: |
4454
export DEBIAN_FRONTEND=noninteractive
4555
sudo apt-get update
46-
sudo apt-get install -y mosquitto-clients
56+
sudo apt-get install -y mosquitto-clients ${{ matrix.extra_deps }}
4757
4858
- uses: actions/checkout@master
4959
with:
@@ -54,7 +64,7 @@ jobs:
5464
run: ./autogen.sh
5565
- name: wolfssl configure
5666
working-directory: ./wolfssl
57-
run: ./configure --enable-enckeys
67+
run: ./configure ${{ matrix.wolfssl_opts || '--enable-enckeys' }}
5868
- name: wolfssl make
5969
working-directory: ./wolfssl
6070
run: make

examples/websocket/net_libwebsockets.c

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333

3434
#include <libwebsockets.h>
3535

36+
/* Compatibility for older libwebsockets versions (pre-4.1) */
37+
#ifndef LWS_PROTOCOL_LIST_TERM
38+
#define LWS_PROTOCOL_LIST_TERM { NULL, NULL, 0, 0, 0, NULL, 0 }
39+
#endif
40+
3641
/* Network context for libwebsockets */
3742
typedef struct _LibwebsockContext {
3843
struct lws_context *context;
@@ -74,37 +79,14 @@ static int callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
7479
XMEMCPY(net->rx_buffer + net->rx_len, in, len);
7580
net->rx_len += len;
7681
} else {
77-
/* Buffer overflow - handle error */
78-
lwsl_err("WebSocket receive buffer overflow"
79-
"- dropping oldest data\n");
80-
81-
/* Simple approach: If new data is larger than buffer,
82-
* just keep newest data */
83-
if (len >= sizeof(net->rx_buffer)) {
84-
/* New data is larger than entire buffer,
85-
* keep only what fits */
86-
/* Cast to byte pointer to allow pointer arithmetic */
87-
const byte* in_bytes = (const byte*)in;
88-
XMEMCPY(net->rx_buffer,
89-
&in_bytes[len - sizeof(net->rx_buffer)],
90-
sizeof(net->rx_buffer));
91-
net->rx_len = sizeof(net->rx_buffer);
92-
} else {
93-
/* Keep as much new data as possible */
94-
size_t keep_bytes = sizeof(net->rx_buffer) - len;
95-
96-
/* Move the portion of old data we want to
97-
* keep to the beginning */
98-
if (keep_bytes > 0 && net->rx_len > 0) {
99-
XMEMMOVE(net->rx_buffer,
100-
net->rx_buffer + (net->rx_len - keep_bytes),
101-
keep_bytes);
102-
}
103-
104-
/* Append all new data */
105-
XMEMCPY(net->rx_buffer + keep_bytes, in, len);
106-
net->rx_len = keep_bytes + len;
107-
}
82+
/* Dropping bytes would desynchronize MQTT packet framing,
83+
* so treat overflow as a fatal protocol error. */
84+
lwsl_err("WebSocket receive buffer overflow "
85+
"(have=%d, need=%d, max=%d)\n",
86+
(int)net->rx_len, (int)len,
87+
(int)sizeof(net->rx_buffer));
88+
net->status = -1;
89+
return -1; /* close connection */
10890
}
10991
}
11092
}

scripts/broker.test

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ if [ "$has_websocket" = "yes" ] && [ -x ./$ws_client_bin ]; then
887887
broker_pid=$no_pid
888888
fi
889889
./$broker_bin -p $tcp_port -w $ws_port \
890-
>"${TMP_DIR}/t17_broker.log" 2>&1 &
890+
>"${TMP_DIR}/t22_broker.log" 2>&1 &
891891
broker_pid=$!
892892
check_broker $tcp_port
893893
check_broker $ws_port
@@ -899,29 +899,29 @@ if [ "$has_websocket" = "yes" ] && [ -x ./$ws_client_bin ]; then
899899
# Use stdbuf to force line-buffered stdout (otherwise printf output
900900
# is fully buffered when redirected to a file and lost on kill).
901901
timeout 15 stdbuf -oL ./$ws_client_bin -h 127.0.0.1 -p $ws_port \
902-
>"${TMP_DIR}/t17.log" 2>&1 &
903-
T17_WS_PID=$!
904-
TEST_PIDS+=($T17_WS_PID)
902+
>"${TMP_DIR}/t22.log" 2>&1 &
903+
T22_WS_PID=$!
904+
TEST_PIDS+=($T22_WS_PID)
905905
sleep 3
906906
# Verify WS client connected and subscribed before publishing.
907907
# Check both client log and broker log (broker always prints via
908908
# PRINTF which is unbuffered).
909909
ws_connected=no
910-
if grep -q "MQTT Connected" "${TMP_DIR}/t17.log" 2>/dev/null; then
910+
if grep -q "MQTT Connected" "${TMP_DIR}/t22.log" 2>/dev/null; then
911911
ws_connected=yes
912-
elif grep -q "SUBACK sock=-1" "${TMP_DIR}/t17_broker.log" 2>/dev/null; then
912+
elif grep -q "SUBACK sock=-1" "${TMP_DIR}/t22_broker.log" 2>/dev/null; then
913913
ws_connected=yes
914914
fi
915915
if [ "$ws_connected" = "yes" ]; then
916916
# Publish from TCP client to the WS client's subscribed topic
917917
./$pub_bin -T -h 127.0.0.1 -p $tcp_port -n "test/topic" -m "ws_hello" \
918-
>"${TMP_DIR}/t17_pub.log" 2>&1
918+
>"${TMP_DIR}/t22_pub.log" 2>&1
919919
sleep 2
920920
fi
921-
kill $T17_WS_PID 2>/dev/null
922-
wait $T17_WS_PID 2>/dev/null || true
921+
kill $T22_WS_PID 2>/dev/null
922+
wait $T22_WS_PID 2>/dev/null || true
923923
TEST_PIDS=()
924-
if grep -q "ws_hello" "${TMP_DIR}/t17.log" 2>/dev/null; then
924+
if grep -q "ws_hello" "${TMP_DIR}/t22.log" 2>/dev/null; then
925925
echo "PASS: WebSocket connect and subscribe (message received)"
926926
elif [ "$ws_connected" = "yes" ]; then
927927
echo "PASS: WebSocket connect and subscribe (connected ok)"

src/mqtt_broker.c

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,11 @@ static void BrokerTls_Free(MqttBroker* broker)
465465

466466
#include <libwebsockets.h>
467467

468+
/* Compatibility for older libwebsockets versions (pre-4.1) */
469+
#ifndef LWS_PROTOCOL_LIST_TERM
470+
#define LWS_PROTOCOL_LIST_TERM { NULL, NULL, 0, 0, 0, NULL, 0 }
471+
#endif
472+
468473
/* Forward declaration for the no-op connect callback (defined after WS section) */
469474
static int BrokerNetConnect(void* context, const char* host, word16 port,
470475
int timeout_ms);
@@ -622,25 +627,13 @@ static int callback_broker_mqtt(struct lws *wsi,
622627
ws->rx_len += len;
623628
}
624629
else {
625-
WBLOG_ERR(broker, "broker: ws rx buffer overflow (wsi=%p)",
626-
(void*)wsi);
627-
/* Drop oldest data to make room */
628-
if (len >= sizeof(ws->rx_buffer)) {
629-
const byte* in_bytes = (const byte*)in;
630-
XMEMCPY(ws->rx_buffer,
631-
&in_bytes[len - sizeof(ws->rx_buffer)],
632-
sizeof(ws->rx_buffer));
633-
ws->rx_len = sizeof(ws->rx_buffer);
634-
}
635-
else {
636-
size_t keep = sizeof(ws->rx_buffer) - len;
637-
if (keep > 0 && ws->rx_len > 0) {
638-
XMEMMOVE(ws->rx_buffer,
639-
ws->rx_buffer + (ws->rx_len - keep), keep);
640-
}
641-
XMEMCPY(ws->rx_buffer + keep, in, len);
642-
ws->rx_len = keep + len;
643-
}
630+
/* Dropping bytes would desynchronize MQTT packet framing,
631+
* so treat overflow as a fatal protocol error. */
632+
WBLOG_ERR(broker, "broker: ws rx buffer overflow "
633+
"(wsi=%p, have=%d, need=%d, max=%d)",
634+
(void*)wsi, (int)ws->rx_len, (int)len,
635+
(int)sizeof(ws->rx_buffer));
636+
return -1; /* close connection */
644637
}
645638
}
646639
else if (reason == LWS_CALLBACK_SERVER_WRITEABLE) {
@@ -653,14 +646,17 @@ static int callback_broker_mqtt(struct lws *wsi,
653646
if (ws->tx_pending != NULL && ws->tx_len > 0) {
654647
int n = lws_write(wsi, ws->tx_pending + LWS_PRE,
655648
ws->tx_len, LWS_WRITE_BINARY);
649+
if (n < (int)ws->tx_len) {
650+
WBLOG_ERR(broker, "broker: ws write failed (wsi=%p, "
651+
"n=%d, len=%d)", (void*)wsi, n, (int)ws->tx_len);
652+
WOLFMQTT_FREE(ws->tx_pending);
653+
ws->tx_pending = NULL;
654+
ws->tx_len = 0;
655+
return -1;
656+
}
656657
WOLFMQTT_FREE(ws->tx_pending);
657658
ws->tx_pending = NULL;
658659
ws->tx_len = 0;
659-
if (n < 0) {
660-
WBLOG_ERR(broker, "broker: ws write failed (wsi=%p)",
661-
(void*)wsi);
662-
return -1;
663-
}
664660
}
665661
}
666662
else if (reason == LWS_CALLBACK_CLOSED) {
@@ -804,7 +800,15 @@ static int BrokerWsNetDisconnect(void* context)
804800
}
805801

806802
if (ws->wsi != NULL && ws->status > 0) {
803+
BrokerClient **bc_ptr;
807804
WBLOG_INFO(bc->broker, "broker: ws disconnect (wsi=%p)", (void*)ws->wsi);
805+
/* Clear lws per-session user data so that any later callbacks
806+
* (e.g. LWS_CALLBACK_CLOSED) see NULL and skip processing.
807+
* Without this, the callback would dereference the freed bc. */
808+
bc_ptr = (BrokerClient**)lws_wsi_user(ws->wsi);
809+
if (bc_ptr != NULL) {
810+
*bc_ptr = NULL;
811+
}
808812
lws_close_reason(ws->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
809813
ws->wsi = NULL;
810814
}

wolfmqtt/mqtt_broker.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ typedef struct MqttBrokerNet {
167167
/* WebSocket per-client context */
168168
/* -------------------------------------------------------------------------- */
169169
#ifdef ENABLE_MQTT_WEBSOCKET
170+
#ifdef WOLFMQTT_STATIC_MEMORY
171+
#error "WebSocket support (ENABLE_MQTT_WEBSOCKET) is incompatible with " \
172+
"static memory mode (WOLFMQTT_STATIC_MEMORY). libwebsockets " \
173+
"requires dynamic allocation internally."
174+
#endif
170175
#ifndef BROKER_WS_RX_BUF_SZ
171176
#define BROKER_WS_RX_BUF_SZ BROKER_RX_BUF_SZ
172177
#endif

0 commit comments

Comments
 (0)