Skip to content

Commit eeae5f5

Browse files
committed
Progress with multiple clients and tracking topics and directing publish messages.
1 parent add06ed commit eeae5f5

6 files changed

Lines changed: 301 additions & 134 deletions

File tree

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
#!/usr/bin/env bash
2+
set -euo pipefail
3+
4+
HOST="localhost"
5+
PORT="1883"
6+
USER=""
7+
PASS=""
8+
NUM_CLIENTS="2"
9+
READY_TIMEOUT="30"
10+
RUN_SECS="5"
11+
KEEP_LOGS="${KEEP_LOGS:-0}"
12+
13+
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
14+
PUB_BIN="${ROOT_DIR}/examples/pub-sub/mqtt-pub"
15+
SUB_BIN="${ROOT_DIR}/examples/pub-sub/mqtt-sub"
16+
17+
usage() {
18+
cat <<EOF
19+
Usage: $(basename "$0") [-h host] [-p port] [-u user] [-w pass]
20+
[-n num_clients] [-t ready_timeout] [-R run_secs]
21+
22+
Runs clients in pairs. Each pair (P0, P1, P2, ...) has two clients:
23+
pair P client A: subscribes to "pair/P/a", publishes to "pair/P/b"
24+
pair P client B: subscribes to "pair/P/b", publishes to "pair/P/a"
25+
26+
Each subscriber verifies it received the message from its partner.
27+
28+
Options:
29+
-h host Broker host (default: localhost)
30+
-p port Broker port (default: 1883)
31+
-u user Username for auth
32+
-w pass Password for auth
33+
-n num_clients Total number of clients, must be even (default: 2)
34+
-t ready_timeout Max seconds to wait for subscribers to be ready (default: 30)
35+
-R run_secs Seconds to wait after publish for delivery (default: 5)
36+
37+
Environment:
38+
KEEP_LOGS=1 Preserve log directory after exit
39+
40+
Examples:
41+
$(basename "$0") -n 2 # 1 pair, 2 clients
42+
$(basename "$0") -n 100 # 50 pairs, 100 clients
43+
$(basename "$0") -n 200 -R 10 # 100 pairs, longer delivery wait
44+
EOF
45+
}
46+
47+
while getopts "h:p:u:w:n:t:R:?" opt; do
48+
case "${opt}" in
49+
h) HOST="${OPTARG}" ;;
50+
p) PORT="${OPTARG}" ;;
51+
u) USER="${OPTARG}" ;;
52+
w) PASS="${OPTARG}" ;;
53+
n) NUM_CLIENTS="${OPTARG}" ;;
54+
t) READY_TIMEOUT="${OPTARG}" ;;
55+
R) RUN_SECS="${OPTARG}" ;;
56+
?) usage; exit 0 ;;
57+
*) usage; exit 1 ;;
58+
esac
59+
done
60+
61+
# Validate num_clients is even and >= 2
62+
if (( NUM_CLIENTS < 2 || NUM_CLIENTS % 2 != 0 )); then
63+
echo "error: -n must be an even number >= 2 (got ${NUM_CLIENTS})"
64+
exit 1
65+
fi
66+
67+
NUM_PAIRS=$(( NUM_CLIENTS / 2 ))
68+
69+
if [[ ! -x "${PUB_BIN}" || ! -x "${SUB_BIN}" ]]; then
70+
echo "error: mqtt-pub or mqtt-sub not found. Build examples first."
71+
exit 1
72+
fi
73+
74+
TMP_DIR="$(mktemp -d)"
75+
PIDS=()
76+
77+
cleanup() {
78+
for pid in "${PIDS[@]:-}"; do
79+
kill "${pid}" 2>/dev/null || true
80+
done
81+
wait 2>/dev/null || true
82+
if [[ "${KEEP_LOGS}" == "1" ]]; then
83+
echo "Logs preserved in ${TMP_DIR}"
84+
else
85+
rm -rf "${TMP_DIR}"
86+
fi
87+
}
88+
trap cleanup EXIT
89+
90+
auth_args=()
91+
if [[ -n "${USER}" ]]; then auth_args+=(-u "${USER}"); fi
92+
if [[ -n "${PASS}" ]]; then auth_args+=(-w "${PASS}"); fi
93+
94+
# All subscriber log files (for readiness polling)
95+
SUB_LOGS=()
96+
97+
start_sub() {
98+
local client_id="$1"
99+
local topic="$2"
100+
local log="$3"
101+
# -T = test mode (disables STDIN capture so background processes work)
102+
# -d = debug output (needed to detect readiness via log polling)
103+
# -i = unique client ID
104+
# stdbuf -oL = line-buffered stdout so grep can detect readiness in logs
105+
stdbuf -oL "${SUB_BIN}" -T -h "${HOST}" -p "${PORT}" -i "${client_id}" \
106+
-n "${topic}" -q 0 -d "${auth_args[@]}" >"${log}" 2>&1 &
107+
PIDS+=("$!")
108+
SUB_LOGS+=("${log}")
109+
}
110+
111+
run_pub() {
112+
local client_id="$1"
113+
local topic="$2"
114+
local msg="$3"
115+
# -T = test mode (disables STDIN capture)
116+
# -i = unique client ID
117+
"${PUB_BIN}" -T -h "${HOST}" -p "${PORT}" -i "${client_id}" -n "${topic}" \
118+
-m "${msg}" -q 0 "${auth_args[@]}" >/dev/null 2>&1
119+
}
120+
121+
# Wait until all subscriber logs contain the ready marker
122+
wait_for_subscribers() {
123+
local total="${#SUB_LOGS[@]}"
124+
local elapsed=0
125+
while (( elapsed < READY_TIMEOUT )); do
126+
local ready_count=0
127+
for log in "${SUB_LOGS[@]}"; do
128+
if grep -q "MQTT Waiting for message" "${log}" 2>/dev/null; then
129+
(( ready_count++ )) || true
130+
fi
131+
done
132+
if (( ready_count == total )); then
133+
echo "All ${total} subscriber(s) ready after ${elapsed}s"
134+
return 0
135+
fi
136+
if (( elapsed > 0 && elapsed % 5 == 0 )); then
137+
echo " ... ${ready_count}/${total} subscribers ready (${elapsed}s elapsed)"
138+
fi
139+
sleep 1
140+
(( elapsed++ )) || true
141+
done
142+
echo "WARNING: Not all subscribers ready after ${READY_TIMEOUT}s (continuing anyway)"
143+
return 0
144+
}
145+
146+
echo "Starting ${NUM_CLIENTS} clients (${NUM_PAIRS} pairs)..."
147+
148+
# --- Start all subscribers ---
149+
for p in $(seq 0 $(( NUM_PAIRS - 1 ))); do
150+
start_sub "sub_${p}_a" "pair/${p}/a" "${TMP_DIR}/sub_${p}_a.log"
151+
start_sub "sub_${p}_b" "pair/${p}/b" "${TMP_DIR}/sub_${p}_b.log"
152+
done
153+
154+
echo "Waiting for subscribers to connect and subscribe..."
155+
wait_for_subscribers
156+
157+
# --- Publish: each client publishes to its partner's topic ---
158+
echo "Publishing ${NUM_CLIENTS} messages..."
159+
for p in $(seq 0 $(( NUM_PAIRS - 1 ))); do
160+
# Client A publishes to pair/P/b (B's topic)
161+
run_pub "pub_${p}_a" "pair/${p}/b" "hello_from_${p}_a"
162+
# Client B publishes to pair/P/a (A's topic)
163+
run_pub "pub_${p}_b" "pair/${p}/a" "hello_from_${p}_b"
164+
done
165+
166+
# --- Wait for message delivery ---
167+
echo "Waiting ${RUN_SECS}s for message delivery..."
168+
sleep "${RUN_SECS}"
169+
170+
# --- Check results ---
171+
PASS_COUNT=0
172+
FAIL_COUNT=0
173+
174+
check_result() {
175+
local description="$1"
176+
local pattern="$2"
177+
local log="$3"
178+
if grep -q "${pattern}" "${log}" 2>/dev/null; then
179+
(( PASS_COUNT++ )) || true
180+
else
181+
echo "FAIL: ${description}"
182+
(( FAIL_COUNT++ )) || true
183+
fi
184+
}
185+
186+
echo ""
187+
echo "== Results =="
188+
for p in $(seq 0 $(( NUM_PAIRS - 1 ))); do
189+
# A subscribed to pair/P/a, should have received hello_from_P_b
190+
check_result "pair ${p} A (sub pair/${p}/a) did not receive hello_from_${p}_b" \
191+
"hello_from_${p}_b" "${TMP_DIR}/sub_${p}_a.log"
192+
# B subscribed to pair/P/b, should have received hello_from_P_a
193+
check_result "pair ${p} B (sub pair/${p}/b) did not receive hello_from_${p}_a" \
194+
"hello_from_${p}_a" "${TMP_DIR}/sub_${p}_b.log"
195+
done
196+
197+
echo "${PASS_COUNT} passed, ${FAIL_COUNT} failed (${NUM_CLIENTS} clients, ${NUM_PAIRS} pairs)"
198+
199+
if (( FAIL_COUNT > 0 )); then
200+
# Preserve logs on failure regardless of KEEP_LOGS setting
201+
KEEP_LOGS=1
202+
exit 1
203+
fi
204+
exit 0

src/include.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ EXTRA_DIST +=
2323
if BUILD_BROKER
2424
bin_PROGRAMS += src/mqtt_broker
2525
src_mqtt_broker_SOURCES = src/mqtt_broker.c
26-
src_mqtt_broker_CFLAGS =
26+
src_mqtt_broker_CFLAGS = $(AM_CFLAGS)
27+
src_mqtt_broker_CPPFLAGS = $(AM_CPPFLAGS)
2728
src_mqtt_broker_LDFLAGS = -Lsrc
2829
src_mqtt_broker_LDADD = src/libwolfmqtt.la $(LTLIBEVENT) $(LIB_STATIC_ADD)
2930
src_mqtt_broker_DEPENDENCIES = src/libwolfmqtt.la

0 commit comments

Comments
 (0)