Skip to content

Commit 9e75822

Browse files
committed
wolfMQTT broker support for wolfIP
1 parent 33f4fa2 commit 9e75822

2 files changed

Lines changed: 208 additions & 17 deletions

File tree

src/mqtt_broker.c

Lines changed: 196 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
#ifdef WOLFMQTT_BROKER
3737

3838
/* -------------------------------------------------------------------------- */
39-
/* Platform includes - only for default POSIX backend */
39+
/* Platform includes */
4040
/* -------------------------------------------------------------------------- */
41-
#ifndef WOLFMQTT_BROKER_CUSTOM_NET
41+
#if defined(WOLFMQTT_WOLFIP)
42+
#include "wolfip.h"
43+
#elif !defined(WOLFMQTT_BROKER_CUSTOM_NET)
4244
#include <errno.h>
4345
#include <arpa/inet.h>
4446
#include <fcntl.h>
@@ -47,21 +49,28 @@
4749
#include <sys/socket.h>
4850
#include <time.h>
4951
#include <unistd.h>
50-
#endif /* !WOLFMQTT_BROKER_CUSTOM_NET */
52+
#endif
5153

5254
/* -------------------------------------------------------------------------- */
5355
/* Default time abstraction */
5456
/* -------------------------------------------------------------------------- */
5557
#ifndef WOLFMQTT_BROKER_GET_TIME_S
56-
#define WOLFMQTT_BROKER_GET_TIME_S() \
57-
((WOLFMQTT_BROKER_TIME_T)time(NULL))
58+
#if defined(WOLFMQTT_WOLFIP)
59+
/* wolfIP: override WOLFMQTT_BROKER_GET_TIME_S in user_settings.h */
60+
#define WOLFMQTT_BROKER_GET_TIME_S() ((WOLFMQTT_BROKER_TIME_T)0)
61+
#else
62+
#define WOLFMQTT_BROKER_GET_TIME_S() \
63+
((WOLFMQTT_BROKER_TIME_T)time(NULL))
64+
#endif
5865
#endif
5966

6067
/* -------------------------------------------------------------------------- */
6168
/* Default sleep abstraction */
6269
/* -------------------------------------------------------------------------- */
6370
#ifndef BROKER_SLEEP_MS
64-
#ifdef USE_WINDOWS_API
71+
#if defined(WOLFMQTT_WOLFIP)
72+
#define BROKER_SLEEP_MS(ms) /* no-op: Step() returns to caller */
73+
#elif defined(USE_WINDOWS_API)
6574
#define BROKER_SLEEP_MS(ms) Sleep(ms)
6675
#else
6776
#define BROKER_SLEEP_MS(ms) usleep((unsigned)(ms) * 1000)
@@ -170,10 +179,153 @@ static void BrokerStore_String(char** dst_ptr,
170179
BrokerStore_String(&(dst), src, len)
171180
#endif
172181

182+
/* -------------------------------------------------------------------------- */
183+
/* wolfIP network backend */
184+
/* -------------------------------------------------------------------------- */
185+
#if defined(WOLFMQTT_WOLFIP)
186+
187+
/* Context passed through MqttBrokerNet.ctx */
188+
#ifndef WOLFMQTT_WOLFIP_CTX_DEFINED
189+
#define WOLFMQTT_WOLFIP_CTX_DEFINED
190+
typedef struct BrokerWolfIP_Ctx {
191+
struct wolfIP *stack;
192+
} BrokerWolfIP_Ctx;
193+
#endif
194+
195+
static BrokerWolfIP_Ctx broker_wolfip_ctx;
196+
197+
static int BrokerWolfIP_Listen(void* ctx, BROKER_SOCKET_T* sock,
198+
word16 port, int backlog)
199+
{
200+
BrokerWolfIP_Ctx* wctx = (BrokerWolfIP_Ctx*)ctx;
201+
struct wolfIP_sockaddr_in addr;
202+
BROKER_SOCKET_T fd;
203+
204+
if (wctx == NULL || wctx->stack == NULL || sock == NULL) {
205+
return MQTT_CODE_ERROR_BAD_ARG;
206+
}
207+
208+
fd = wolfIP_sock_socket(wctx->stack, AF_INET, IPSTACK_SOCK_STREAM, 0);
209+
if (fd < 0) {
210+
return MQTT_CODE_ERROR_NETWORK;
211+
}
212+
213+
XMEMSET(&addr, 0, sizeof(addr));
214+
addr.sin_family = AF_INET;
215+
addr.sin_port = ee16(port);
216+
addr.sin_addr.s_addr = 0; /* INADDR_ANY */
217+
218+
if (wolfIP_sock_bind(wctx->stack, fd,
219+
(struct wolfIP_sockaddr*)&addr, sizeof(addr)) < 0) {
220+
wolfIP_sock_close(wctx->stack, fd);
221+
return MQTT_CODE_ERROR_NETWORK;
222+
}
223+
if (wolfIP_sock_listen(wctx->stack, fd, backlog) < 0) {
224+
wolfIP_sock_close(wctx->stack, fd);
225+
return MQTT_CODE_ERROR_NETWORK;
226+
}
227+
228+
*sock = fd;
229+
return MQTT_CODE_SUCCESS;
230+
}
231+
232+
static int BrokerWolfIP_Accept(void* ctx, BROKER_SOCKET_T listen_sock,
233+
BROKER_SOCKET_T* client_sock)
234+
{
235+
BrokerWolfIP_Ctx* wctx = (BrokerWolfIP_Ctx*)ctx;
236+
BROKER_SOCKET_T fd;
237+
238+
if (wctx == NULL || wctx->stack == NULL || client_sock == NULL) {
239+
return MQTT_CODE_ERROR_BAD_ARG;
240+
}
241+
242+
fd = wolfIP_sock_accept(wctx->stack, listen_sock, NULL, NULL);
243+
if (fd < 0) {
244+
/* EAGAIN / no pending connection */
245+
return MQTT_CODE_CONTINUE;
246+
}
247+
248+
*client_sock = fd;
249+
return MQTT_CODE_SUCCESS;
250+
}
251+
252+
static int BrokerWolfIP_Read(void* ctx, BROKER_SOCKET_T sock,
253+
byte* buf, int buf_len, int timeout_ms)
254+
{
255+
BrokerWolfIP_Ctx* wctx = (BrokerWolfIP_Ctx*)ctx;
256+
int rc;
257+
(void)timeout_ms;
258+
259+
if (wctx == NULL || wctx->stack == NULL || buf == NULL || buf_len <= 0) {
260+
return MQTT_CODE_ERROR_BAD_ARG;
261+
}
262+
263+
rc = wolfIP_sock_recv(wctx->stack, sock, buf, (size_t)buf_len, 0);
264+
if (rc < 0) {
265+
/* EAGAIN / would block */
266+
return MQTT_CODE_CONTINUE;
267+
}
268+
if (rc == 0) {
269+
return MQTT_CODE_ERROR_NETWORK; /* Connection closed */
270+
}
271+
return rc;
272+
}
273+
274+
static int BrokerWolfIP_Write(void* ctx, BROKER_SOCKET_T sock,
275+
const byte* buf, int buf_len, int timeout_ms)
276+
{
277+
BrokerWolfIP_Ctx* wctx = (BrokerWolfIP_Ctx*)ctx;
278+
int rc;
279+
(void)timeout_ms;
280+
281+
if (wctx == NULL || wctx->stack == NULL || buf == NULL || buf_len <= 0) {
282+
return MQTT_CODE_ERROR_BAD_ARG;
283+
}
284+
285+
rc = wolfIP_sock_send(wctx->stack, sock, buf, (size_t)buf_len, 0);
286+
if (rc < 0) {
287+
/* EAGAIN / would block */
288+
return MQTT_CODE_CONTINUE;
289+
}
290+
if (rc == 0) {
291+
return MQTT_CODE_ERROR_NETWORK;
292+
}
293+
return rc;
294+
}
295+
296+
static int BrokerWolfIP_Close(void* ctx, BROKER_SOCKET_T sock)
297+
{
298+
BrokerWolfIP_Ctx* wctx = (BrokerWolfIP_Ctx*)ctx;
299+
300+
if (wctx != NULL && wctx->stack != NULL &&
301+
sock != BROKER_SOCKET_INVALID) {
302+
wolfIP_sock_close(wctx->stack, sock);
303+
}
304+
return MQTT_CODE_SUCCESS;
305+
}
306+
307+
int MqttBrokerNet_wolfIP_Init(MqttBrokerNet* net, void* wolfIP_stack)
308+
{
309+
if (net == NULL || wolfIP_stack == NULL) {
310+
return MQTT_CODE_ERROR_BAD_ARG;
311+
}
312+
XMEMSET(net, 0, sizeof(*net));
313+
XMEMSET(&broker_wolfip_ctx, 0, sizeof(broker_wolfip_ctx));
314+
broker_wolfip_ctx.stack = (struct wolfIP*)wolfIP_stack;
315+
316+
net->listen = BrokerWolfIP_Listen;
317+
net->accept = BrokerWolfIP_Accept;
318+
net->read = BrokerWolfIP_Read;
319+
net->write = BrokerWolfIP_Write;
320+
net->close = BrokerWolfIP_Close;
321+
net->ctx = &broker_wolfip_ctx;
322+
return MQTT_CODE_SUCCESS;
323+
}
324+
173325
/* -------------------------------------------------------------------------- */
174326
/* Default POSIX network backend */
175327
/* -------------------------------------------------------------------------- */
176-
#ifndef WOLFMQTT_BROKER_CUSTOM_NET
328+
#elif !defined(WOLFMQTT_BROKER_CUSTOM_NET)
177329

178330
static int BrokerPosix_SetNonBlocking(BROKER_SOCKET_T fd)
179331
{
@@ -456,7 +608,7 @@ static void BrokerTls_Free(MqttBroker* broker)
456608
}
457609
#endif /* ENABLE_MQTT_TLS */
458610

459-
#endif /* !WOLFMQTT_BROKER_CUSTOM_NET */
611+
#endif /* WOLFMQTT_WOLFIP / !WOLFMQTT_BROKER_CUSTOM_NET */
460612

461613
/* -------------------------------------------------------------------------- */
462614
/* WebSocket server support (libwebsockets) */
@@ -3233,7 +3385,7 @@ int MqttBroker_Init(MqttBroker* broker, MqttBrokerNet* net)
32333385
broker->log_level = BROKER_LOG_LEVEL_DEFAULT;
32343386
broker->next_packet_id = 1;
32353387

3236-
#ifndef WOLFMQTT_BROKER_CUSTOM_NET
3388+
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET)
32373389
/* For the default POSIX backend, the net callbacks expect ctx to be a
32383390
* MqttBroker* for logging via WBLOG_*. If no context was provided,
32393391
* default to using this broker instance to avoid NULL-dereference. */
@@ -3381,7 +3533,7 @@ int MqttBroker_Step(MqttBroker* broker)
33813533
return activity ? MQTT_CODE_SUCCESS : MQTT_CODE_CONTINUE;
33823534
}
33833535

3384-
int MqttBroker_Run(MqttBroker* broker)
3536+
int MqttBroker_Start(MqttBroker* broker)
33853537
{
33863538
int rc;
33873539

@@ -3475,6 +3627,18 @@ int MqttBroker_Run(MqttBroker* broker)
34753627
#endif
34763628

34773629
broker->running = 1;
3630+
return MQTT_CODE_SUCCESS;
3631+
}
3632+
3633+
int MqttBroker_Run(MqttBroker* broker)
3634+
{
3635+
int rc;
3636+
3637+
rc = MqttBroker_Start(broker);
3638+
if (rc != MQTT_CODE_SUCCESS) {
3639+
return rc;
3640+
}
3641+
34783642
while (broker->running) {
34793643
rc = MqttBroker_Step(broker);
34803644
if (rc == MQTT_CODE_CONTINUE) {
@@ -3526,8 +3690,17 @@ int MqttBroker_Free(MqttBroker* broker)
35263690
BrokerPendingWill_FreeAll(broker);
35273691
BrokerRetained_FreeAll(broker);
35283692

3529-
#if defined(ENABLE_MQTT_TLS) && !defined(WOLFMQTT_BROKER_CUSTOM_NET)
3530-
BrokerTls_Free(broker);
3693+
#ifdef ENABLE_MQTT_TLS
3694+
if (broker->tls_ctx != NULL) {
3695+
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET)
3696+
BrokerTls_Free(broker);
3697+
#else
3698+
/* Application-provided TLS context: free ctx but skip
3699+
* wolfSSL_Cleanup() since wolfSSL may be shared */
3700+
wolfSSL_CTX_free(broker->tls_ctx);
3701+
broker->tls_ctx = NULL;
3702+
#endif
3703+
}
35313704
#endif
35323705

35333706
/* Close listen sockets */
@@ -3605,7 +3778,8 @@ static void BrokerUsage(const char* prog)
36053778

36063779
static MqttBroker* g_broker = NULL;
36073780

3608-
#if !defined(WOLFMQTT_BROKER_CUSTOM_NET) && !defined(NO_MAIN_DRIVER)
3781+
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET) && \
3782+
!defined(NO_MAIN_DRIVER)
36093783
#include <signal.h>
36103784
static void broker_signal_handler(int signo)
36113785
{
@@ -3628,7 +3802,11 @@ int wolfmqtt_broker(int argc, char** argv)
36283802
setvbuf(stdout, NULL, _IONBF, 0);
36293803
#endif
36303804

3631-
#ifndef WOLFMQTT_BROKER_CUSTOM_NET
3805+
#if defined(WOLFMQTT_WOLFIP)
3806+
XMEMSET(&net, 0, sizeof(net));
3807+
PRINTF("broker: use MqttBrokerNet_wolfIP_Init() for wolfIP");
3808+
return MQTT_CODE_ERROR_BAD_ARG;
3809+
#elif !defined(WOLFMQTT_BROKER_CUSTOM_NET)
36323810
rc = MqttBrokerNet_Init(&net);
36333811
if (rc != MQTT_CODE_SUCCESS) {
36343812
return rc;
@@ -3696,15 +3874,17 @@ int wolfmqtt_broker(int argc, char** argv)
36963874
}
36973875
}
36983876

3699-
#if !defined(WOLFMQTT_BROKER_CUSTOM_NET) && !defined(NO_MAIN_DRIVER)
3877+
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET) && \
3878+
!defined(NO_MAIN_DRIVER)
37003879
g_broker = &broker;
37013880
signal(SIGINT, broker_signal_handler);
37023881
signal(SIGTERM, broker_signal_handler);
37033882
#endif
37043883

37053884
rc = MqttBroker_Run(&broker);
37063885

3707-
#if !defined(WOLFMQTT_BROKER_CUSTOM_NET) && !defined(NO_MAIN_DRIVER)
3886+
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET) && \
3887+
!defined(NO_MAIN_DRIVER)
37083888
g_broker = NULL;
37093889
#endif
37103890

wolfmqtt/mqtt_broker.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,20 @@ WOLFMQTT_API int MqttBroker_Stop(MqttBroker* broker);
376376
/* Clean up broker resources */
377377
WOLFMQTT_API int MqttBroker_Free(MqttBroker* broker);
378378

379+
/* Start the broker (listen + TLS init). Call once before MqttBroker_Step().
380+
* For embedded systems that use a cooperative main loop with Step(). */
381+
WOLFMQTT_API int MqttBroker_Start(MqttBroker* broker);
382+
383+
/* wolfIP backend initializer.
384+
* wolfIP_stack is a (struct wolfIP*) pointer to the wolfIP stack instance. */
385+
#ifdef WOLFMQTT_WOLFIP
386+
WOLFMQTT_API int MqttBrokerNet_wolfIP_Init(MqttBrokerNet* net,
387+
void* wolfIP_stack);
388+
#endif
389+
379390
/* Default POSIX backend initializer.
380391
* Only available when WOLFMQTT_BROKER_CUSTOM_NET is NOT defined. */
381-
#ifndef WOLFMQTT_BROKER_CUSTOM_NET
392+
#if !defined(WOLFMQTT_WOLFIP) && !defined(WOLFMQTT_BROKER_CUSTOM_NET)
382393
WOLFMQTT_API int MqttBrokerNet_Init(MqttBrokerNet* net);
383394
#endif
384395

0 commit comments

Comments
 (0)