feat(Push): add Appwrite Push (MQTT 5) adapter#129
Conversation
Greptile SummaryAdds a new Appwrite Push adapter that publishes MQTT 5 notifications to a self-hosted broker over a pipelined PUBLISH/PUBACK loop, backed by a pure-PHP MQTT 5 codec with no external dependencies.
Confidence Score: 3/5The adapter and codec carry several unfixed correctness gaps from prior review rounds; the core happy path works but the code is not ready to merge as-is. Three issues flagged in earlier review rounds remain unaddressed: the read buffer is never reset between connections, endpoint whitespace is stripped only on the right, and the receive-maximum window shrinks permanently across sends. An additional previously-noted issue in the MQTT codec causes any unrecognised property ID to silently discard all remaining properties in the same packet. The two new findings here — null returns from readVariableByteInteger going unchecked in all four parse methods, and parse_url returning false for malformed URLs with '://' — add to the overall picture of the parser being fragile against non-conforming broker responses. src/Utopia/Messaging/Adapter/Push/Appwrite.php and src/Utopia/Messaging/Helpers/MQTT.php both need attention before merge. Important Files Changed
Reviews (2): Last reviewed commit: "feat(Push): add Appwrite Push (MQTT 5) a..." | Re-trigger Greptile |
| } | ||
|
|
||
| public function getMaxMessagesPerRequest(): int | ||
| { |
There was a problem hiding this comment.
readBuffer not cleared between process() calls
$this->readBuffer is never reset at the start of each connection. If the adapter instance is reused (e.g., send() is called twice), or if the broker sends an extra packet after the last PUBACK (e.g., a PINGREQ that landed in the buffer just before disconnect), that residual data persists into the next call. On the next invocation readPacket() would immediately return the leftover packet as if it were the new connection's CONNACK, causing handshake() to throw "Broker did not respond with CONNACK" even on a healthy connection.
Add $this->readBuffer = ''; at the start of connect() or at the top of process() to isolate each connection's read state.
|
|
||
| private function resolveEndpoint(): string | ||
| { | ||
| $endpoint = \rtrim($this->endpoint); |
There was a problem hiding this comment.
rtrim strips only trailing whitespace, so a leading space in the configured endpoint (e.g., " broker.example.com") would produce a malformed URL like tls:// broker.example.com:8883 that stream_socket_client rejects. Use trim to strip both ends.
| $endpoint = \rtrim($this->endpoint); | |
| $endpoint = \trim($this->endpoint); |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| $packet = MQTT::decodePacket($this->readBuffer); | ||
| if ($packet !== null) { | ||
| return $packet; | ||
| } |
There was a problem hiding this comment.
receiveMaximum decreases monotonically across process() calls
$this->receiveMaximum is instance state that is only ever updated via min() in handshake(). If the adapter is reused across multiple send() calls and the broker advertises a low receiveMaximum (say 10) on the first call, subsequent connections — even to a different broker endpoint — will be throttled to that minimum permanently for the lifetime of the object. Resetting it to the class-default (or to 65535) at the start of each connect() would make each connection's window independent.
Based on PR utopia-php#122 by abnegate. Adds Appwrite Push - a self-hosted MQTT 5 based push notification adapter with minimal MQTT 5 control-packet codec.
7e1b5cd to
2dc61de
Compare
Port of PR #122 by abnegate.
Adds Appwrite Push - a self-hosted, low-power alternative to FCM/APNS that publishes notifications over MQTT 5 to per-device topics.
Changes: