Skip to content

Commit 62b0e51

Browse files
committed
Fix SUBSCRIBE Fixed-Header Flags Do Not Trigger Malformed Close (#486)
1 parent 03667b4 commit 62b0e51

4 files changed

Lines changed: 358 additions & 16 deletions

File tree

src/mqtt_broker.c

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3228,15 +3228,16 @@ static int BrokerHandle_Publish(BrokerClient* bc, int rx_len,
32283228
return rc;
32293229
}
32303230

3231-
/* [MQTT-3.3.2-2] PUBLISH topic must not contain wildcard characters */
3231+
/* [MQTT-3.3.2-2] PUBLISH topic must not contain wildcard characters.
3232+
* Return MALFORMED_DATA so dispatch closes the connection. */
32323233
if (pub.topic_name && pub.topic_name_len > 0) {
32333234
word16 i;
32343235
for (i = 0; i < pub.topic_name_len; i++) {
32353236
if (pub.topic_name[i] == '+' || pub.topic_name[i] == '#') {
32363237
WBLOG_ERR(broker,
32373238
"broker: PUBLISH topic contains wildcard sock=%d",
32383239
(int)bc->sock);
3239-
return MQTT_CODE_ERROR_BAD_ARG;
3240+
return MQTT_CODE_ERROR_MALFORMED_DATA;
32403241
}
32413242
}
32423243
}
@@ -3450,6 +3451,28 @@ static int BrokerHandle_PublishRec(BrokerClient* bc, int rx_len)
34503451
return rc;
34513452
}
34523453

3454+
/* [MQTT-2.2.2-2] / [MQTT-3.8.1-1] etc.: a malformed packet MUST cause the
3455+
* server to close the network connection. Mirrors the read-failure close
3456+
* path: publish will, honor session persistence, then remove the client. */
3457+
static void BrokerClient_AbnormalClose(MqttBroker* broker, BrokerClient* bc)
3458+
{
3459+
BrokerClient_PublishWill(broker, bc);
3460+
if (bc->clean_session) {
3461+
BrokerSubs_RemoveClient(broker, bc);
3462+
}
3463+
else {
3464+
BrokerSubs_OrphanClient(broker, bc);
3465+
}
3466+
BrokerClient_Remove(broker, bc);
3467+
}
3468+
3469+
/* Decode-level errors that indicate a malformed packet on the wire. */
3470+
static int BrokerRcIsMalformed(int rc)
3471+
{
3472+
return (rc == MQTT_CODE_ERROR_MALFORMED_DATA ||
3473+
rc == MQTT_CODE_ERROR_PACKET_TYPE);
3474+
}
3475+
34533476
/* -------------------------------------------------------------------------- */
34543477
/* Per-client processing (called from Step) */
34553478
/* -------------------------------------------------------------------------- */
@@ -3511,15 +3534,7 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
35113534
}
35123535
else if (rc < 0) {
35133536
WBLOG_ERR(broker, "broker: read failed sock=%d rc=%d", (int)bc->sock, rc);
3514-
BrokerClient_PublishWill(broker, bc); /* abnormal disconnect */
3515-
/* Session persistence: keep subs if clean_session=0 */
3516-
if (bc->clean_session) {
3517-
BrokerSubs_RemoveClient(broker, bc);
3518-
}
3519-
else {
3520-
BrokerSubs_OrphanClient(broker, bc);
3521-
}
3522-
BrokerClient_Remove(broker, bc);
3537+
BrokerClient_AbnormalClose(broker, bc);
35233538
return 0;
35243539
}
35253540

@@ -3534,6 +3549,24 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
35343549
((BrokerWsCtx*)bc->ws_ctx)->processing = 1;
35353550
}
35363551
#endif
3552+
/* [MQTT-2.2.2-2] Reject malformed fixed-header reserved flags. The
3553+
* per-type decoders also enforce this (see MqttDecode_FixedHeader),
3554+
* but PUBACK / PUBCOMP / PINGREQ / DISCONNECT are not run through a
3555+
* decoder here, so the broker enforces it directly before dispatch. */
3556+
if (!MqttPacket_FixedHeaderFlagsValid(bc->rx_buf[0])) {
3557+
WBLOG_ERR(broker,
3558+
"broker: invalid fixed-header flags type=%u byte=0x%02X "
3559+
"sock=%d [MQTT-2.2.2-2]",
3560+
type, bc->rx_buf[0], (int)bc->sock);
3561+
if (bc->connected) {
3562+
BrokerClient_AbnormalClose(broker, bc);
3563+
}
3564+
else {
3565+
BrokerSubs_RemoveClient(broker, bc);
3566+
BrokerClient_Remove(broker, bc);
3567+
}
3568+
return 0;
3569+
}
35373570
/* [MQTT-3.1.0-1] First packet must be CONNECT */
35383571
if (type != MQTT_PACKET_TYPE_CONNECT && !bc->connected) {
35393572
WBLOG_ERR(broker,
@@ -3566,31 +3599,61 @@ static int BrokerClient_Process(MqttBroker* broker, BrokerClient* bc)
35663599
break;
35673600
}
35683601
case MQTT_PACKET_TYPE_PUBLISH:
3569-
(void)BrokerHandle_Publish(bc, rc, broker);
3602+
{
3603+
int p_rc = BrokerHandle_Publish(bc, rc, broker);
3604+
if (BrokerRcIsMalformed(p_rc)) {
3605+
BrokerClient_AbnormalClose(broker, bc);
3606+
return 0;
3607+
}
35703608
break;
3609+
}
35713610
case MQTT_PACKET_TYPE_PUBLISH_ACK:
35723611
/* QoS 1 ack from subscriber - delivery complete */
35733612
break;
35743613
case MQTT_PACKET_TYPE_PUBLISH_REC:
3614+
{
35753615
/* QoS 2 step 2: subscriber sends PUBREC, broker
35763616
* responds with PUBREL */
3577-
(void)BrokerHandle_PublishRec(bc, rc);
3617+
int p_rc = BrokerHandle_PublishRec(bc, rc);
3618+
if (BrokerRcIsMalformed(p_rc)) {
3619+
BrokerClient_AbnormalClose(broker, bc);
3620+
return 0;
3621+
}
35783622
break;
3623+
}
35793624
case MQTT_PACKET_TYPE_PUBLISH_REL:
3625+
{
35803626
/* QoS 2 step 3: publisher sends PUBREL, broker
35813627
* responds with PUBCOMP */
3582-
(void)BrokerHandle_PublishRel(bc, rc);
3628+
int p_rc = BrokerHandle_PublishRel(bc, rc);
3629+
if (BrokerRcIsMalformed(p_rc)) {
3630+
BrokerClient_AbnormalClose(broker, bc);
3631+
return 0;
3632+
}
35833633
break;
3634+
}
35843635
case MQTT_PACKET_TYPE_PUBLISH_COMP:
35853636
/* QoS 2 step 4: subscriber sends PUBCOMP - delivery
35863637
* complete */
35873638
break;
35883639
case MQTT_PACKET_TYPE_SUBSCRIBE:
3589-
(void)BrokerHandle_Subscribe(bc, rc, broker);
3640+
{
3641+
int s_rc = BrokerHandle_Subscribe(bc, rc, broker);
3642+
if (BrokerRcIsMalformed(s_rc)) {
3643+
BrokerClient_AbnormalClose(broker, bc);
3644+
return 0;
3645+
}
35903646
break;
3647+
}
35913648
case MQTT_PACKET_TYPE_UNSUBSCRIBE:
3592-
(void)BrokerHandle_Unsubscribe(bc, rc, broker);
3649+
{
3650+
int u_rc = BrokerHandle_Unsubscribe(bc, rc, broker);
3651+
if (BrokerRcIsMalformed(u_rc)) {
3652+
BrokerClient_AbnormalClose(broker, bc);
3653+
return 0;
3654+
}
35933655
break;
3656+
}
35943657
case MQTT_PACKET_TYPE_PING_REQ:
35953658
(void)BrokerSend_PingResp(bc);
35963659
break;

src/mqtt_packet.c

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,59 @@ static int MqttEncode_FixedHeader(byte *tx_buf, int tx_buf_len, int remain_len,
182182
return header_len;
183183
}
184184

185+
/* [MQTT-2.2.2-1] Required fixed-header reserved-flag values per packet type.
186+
* PUBLISH (type 3) carries DUP/QoS/RETAIN and is validated separately. */
187+
static int FixedHeaderFlagsExpected(byte type, byte *expected)
188+
{
189+
switch (type) {
190+
case MQTT_PACKET_TYPE_CONNECT:
191+
case MQTT_PACKET_TYPE_CONNECT_ACK:
192+
case MQTT_PACKET_TYPE_PUBLISH_ACK:
193+
case MQTT_PACKET_TYPE_PUBLISH_REC:
194+
case MQTT_PACKET_TYPE_PUBLISH_COMP:
195+
case MQTT_PACKET_TYPE_SUBSCRIBE_ACK:
196+
case MQTT_PACKET_TYPE_UNSUBSCRIBE_ACK:
197+
case MQTT_PACKET_TYPE_PING_REQ:
198+
case MQTT_PACKET_TYPE_PING_RESP:
199+
case MQTT_PACKET_TYPE_DISCONNECT:
200+
case MQTT_PACKET_TYPE_AUTH:
201+
*expected = 0x0;
202+
return 1;
203+
case MQTT_PACKET_TYPE_PUBLISH_REL:
204+
case MQTT_PACKET_TYPE_SUBSCRIBE:
205+
case MQTT_PACKET_TYPE_UNSUBSCRIBE:
206+
*expected = 0x2;
207+
return 1;
208+
default:
209+
return 0;
210+
}
211+
}
212+
213+
int MqttPacket_FixedHeaderFlagsValid(byte type_flags)
214+
{
215+
byte type = (byte)MQTT_PACKET_TYPE_GET(type_flags);
216+
byte flags = (byte)MQTT_PACKET_FLAGS_GET(type_flags);
217+
byte expected;
218+
219+
if (type == MQTT_PACKET_TYPE_PUBLISH) {
220+
byte qos = (byte)MQTT_PACKET_FLAGS_GET_QOS(type_flags);
221+
byte dup = (flags & MQTT_PACKET_FLAG_DUPLICATE) ? 1 : 0;
222+
if (qos > MQTT_QOS_2) {
223+
return 0;
224+
}
225+
if (qos == MQTT_QOS_0 && dup) {
226+
return 0;
227+
}
228+
return 1;
229+
}
230+
if (FixedHeaderFlagsExpected(type, &expected)) {
231+
return (flags == expected) ? 1 : 0;
232+
}
233+
/* Unknown/reserved type: this helper validates the flag nibble only.
234+
* Callers are responsible for rejecting unknown packet types. */
235+
return 1;
236+
}
237+
185238
static int MqttDecode_FixedHeader(byte *rx_buf, int rx_buf_len, int *remain_len,
186239
byte type, MqttQoS *p_qos, byte *p_retain, byte *p_duplicate)
187240
{
@@ -199,6 +252,11 @@ static int MqttDecode_FixedHeader(byte *rx_buf, int rx_buf_len, int *remain_len,
199252
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_PACKET_TYPE);
200253
}
201254

255+
/* [MQTT-2.2.2-2] Reject invalid fixed-header reserved flags. */
256+
if (!MqttPacket_FixedHeaderFlagsValid(header->type_flags)) {
257+
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_MALFORMED_DATA);
258+
}
259+
202260
/* Extract header flags */
203261
if (p_qos) {
204262
*p_qos = (MqttQoS)MQTT_PACKET_FLAGS_GET_QOS(header->type_flags);

0 commit comments

Comments
 (0)