Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughReplaces the legacy subscription stack with a new context-first, option-configurable subscription client and transport layer (WS/SSE); removes legacy handlers/tests; adds protocol/transport/client packages and tests; and refactors resolver/subscription writer/updater APIs to an Error/Done synchronous model. Changes
Sequence Diagram(s)sequenceDiagram
participant App as Application
participant Resolver as Resolver
participant Factory as SubscriptionClientFactory
participant Client as SubscriptionClient
participant Transport as Transport (WS/SSE)
participant Protocol as Protocol (GQLTWS/GQLWS or SSE)
participant Updater as SubscriptionUpdater
App->>Resolver: Request subscription
Resolver->>Factory: NewSubscriptionClient(ctx, opts...)
Factory->>Client: returns client
Resolver->>Client: Subscribe(ctx, req, options, handler)
Client->>Transport: Subscribe(ctx, req, options, handler)
Transport->>Protocol: Dial/Init + Subscribe(conn,id,req)
Protocol-->>Transport: Message(Data/Error/Complete) / ConnectionError
Transport->>Client: deliver via handler
Client->>Updater: Error(data) / Update(data) / Done()
App->>Resolver: CompleteSubscription(id)
Resolver->>Client: cancel()
Client->>Transport: Unsubscribe(ctx, conn, id)
Transport->>Protocol: Unsubscribe(...)
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
cbc71f0 to
a8c13ce
Compare
97535ff to
8e00408
Compare
d99382b to
a48f331
Compare
6d3d274 to
5a61bda
Compare
|
@claude review |
There was a problem hiding this comment.
Actionable comments posted: 12
🧹 Nitpick comments (12)
v2/pkg/engine/resolve/resolve.go (3)
698-732: Document lock precondition forhandleAddSubscription.The function modifies
r.subscriptionsByIDviaaddSubscriptionIndex, which requiresr.muto be held. While the callers do hold the lock, adding a comment would make this precondition explicit.📝 Suggested documentation
-// handleAddSubscription must be called with r.mu held. +// handleAddSubscription must be called with r.mu held. +// It modifies r.subscriptionsByID and r.subscriptionsByConnection. func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve.go` around lines 698 - 732, Add an explicit precondition comment to the top of the handleAddSubscription function documenting that r.mu must be held when calling it (since it mutates r.subscriptionsByID via addSubscriptionIndex); reference handleAddSubscription and addSubscriptionIndex in the comment. Keep it brief and in the function header (same style as the existing "// handleAddSubscription must be called with r.mu held.") so future maintainers see the lock requirement; optionally add a debug-mode runtime check/assert that panics or logs if r.mu is not held to catch violations early.
491-497: Consider adding a guard indone()against double-close.While the current code path ensures
done()is only called once per subscription (via theCompareAndSwapguard when creating finalizers), adding a defensive check would make this more robust against future refactoring.🛡️ Defensive guard suggestion
func (s *subscriptionState) done() { s.writeMu.Lock() defer s.writeMu.Unlock() + // Guard against accidental double-close (should not happen with current CAS-based finalization) + select { + case <-s.completed: + return + default: + } close(s.completed) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve.go` around lines 491 - 497, The done() method on subscriptionState calls close(s.completed) without guarding against a double-close; add a defensive guard to ensure completed is only closed once by checking that s.completed != nil (or using a select with a done flag) while holding s.writeMu, and set s.completed = nil after closing; update the subscriptionState.done() implementation to lock writeMu, verify completed is non-nil, close it, then nil it to prevent future double-close attempts.
564-593: Unlock ordering in error paths needs attention.In
executeSubscriptionUpdate, when the resolve step fails (line 571-582), the arena is released before unlockingwriteMu. This is fine, but the early return at lines 565-569 correctly releases the lock before returning. The flow is consistent.However, consider extracting the lock/unlock pattern into a helper or using
defermore consistently to reduce the risk of future modifications introducing unlock ordering bugs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve.go` around lines 564 - 593, The unlock/release ordering around sub.writeMu and r.resolveArenaPool.Release(resolveArena) in executeSubscriptionUpdate is fragile; change the function to centralize the lock/unlock semantics (e.g., wrap the critical section in a helper like withWriteLock(sub, func() { ... }) or acquire sub.writeMu and immediately defer sub.writeMu.Unlock() after validating removed) and similarly ensure resolveArena is released via a single deferred r.resolveArenaPool.Release(resolveArena) at the start of the resolve path so all early returns and error paths consistently release the arena and unlock the mutex (refer to sub.writeMu, sub.removed.Load(), r.resolveArenaPool.Release(resolveArena), r.asyncErrorWriter.WriteError and the resolve call).v2/pkg/engine/resolve/resolve_test.go (3)
5781-5781: Post-assertion sleeps are effectively no-op for these assertions.Both sleeps happen after completion/snapshot points and don’t improve determinism; they only add latency/flakiness surface.
♻️ Proposed cleanup
- time.Sleep(10 * time.Millisecond)Also applies to: 5818-5818
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve_test.go` at line 5781, Remove the redundant post-assertion sleeps — the time.Sleep(10 * time.Millisecond) calls in resolve_test.go (the instance at the shown diff and the one at the other noted location) occur after completion/snapshot points and do not improve determinism, they only add latency and flakiness; delete these sleep calls from the test (search for the exact symbol time.Sleep(10 * time.Millisecond) in resolve_test.go and remove those lines).
5950-5968: Test name is misleading (with close reason) but no reason behavior is validated.This currently duplicates the unsubscribe-client path without asserting any close-reason semantics. Consider renaming or adding explicit reason assertions if that behavior still exists.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve_test.go` around lines 5950 - 5968, The test "should stop stream on unsubscribe client with close reason" does not verify any close-reason semantics; update it to either rename the test to remove "with close reason" or add assertions to validate the close reason is propagated. Locate the test body (the t.Run block using createFakeStream, setup, AsyncResolveGraphQLSubscription, recorder, and resolver.UnsubscribeClient) and either change the test name string to "should stop stream on unsubscribe client" or, if close-reason behavior exists, capture the close message from recorder (or the fake stream) after calling resolver.UnsubscribeClient(id.ConnectionID) and assert the expected reason is present (use recorder.AwaitAnyMessageCount or equivalent to read the final message and assert its reason field).
5959-5962: PreferNewContext()over newContext{}literals in newly added test paths.Using
NewContext()keeps resolver context initialization consistent and avoids partial-context construction.Based on learnings: Ensure all Context instances in v2/pkg/engine/resolve are created with NewContext() instead of Context{} literals.🧩 Proposed fix
- ctx := Context{ - ctx: context.Background(), - } + ctx := *NewContext(context.Background())- subCtx := &Context{ - ctx: context.Background(), - ExecutionOptions: ExecutionOptions{ - SendHeartbeat: true, - }, - } + subCtx := NewContext(context.Background()) + subCtx.ExecutionOptions = ExecutionOptions{ + SendHeartbeat: true, + }Also applies to: 6055-6060, 6071-6071
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve_test.go` around lines 5959 - 5962, Replace literal Context struct initializations with the constructor NewContext to ensure full initialization: find places creating Context via a composite literal (e.g., Context{ ctx: context.Background() }) in resolve_test.go and change them to use NewContext(...) (or NewContext() if it takes no args) so the resolver context is consistently and fully initialized; update the occurrences around the recently added test paths (the Context literals reported in the comment) and any other similar instances in the same file (including the other two locations noted) to use NewContext instead.v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/common/message.go (1)
13-24: Clarify thatMessageTypeErroris the terminal protocol error frame.
IsTerminal()makes this end the stream, but the comment at Lines 16-18 reads like any GraphQL payload that containserrors. Subscriptionnextpayloads can legitimately carry partial errors without terminating, so the doc/name should make that distinction explicit.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/common/message.go` around lines 13 - 24, The comment for MessageTypeError is ambiguous about whether it represents a terminal protocol error or just a GraphQL payload with errors; update the comment on MessageTypeError to explicitly state that it represents the terminal protocol-level error frame (distinct from GraphQL partial errors carried in MessageTypeData), and ensure the IsTerminal() method's behavior (returning true for MessageTypeError, MessageTypeComplete, MessageTypeConnectionError) is documented accordingly so readers know only protocol error frames (MessageTypeError/MessageTypeConnectionError) and completion signal termination while MessageTypeData may contain non-terminal GraphQL errors.execution/engine/execution_engine_test.go (1)
57-58: Use onectxfor both constructors inmustFactory.This helper now builds the factory and subscription client from separate background contexts. Reusing a single
ctxwill keep test lifecycle semantics aligned with the new context-first API.♻️ Small consistency cleanup
func mustFactory(t testing.TB, httpClient *http.Client) plan.PlannerFactory[graphql_datasource.Configuration] { t.Helper() + ctx := context.Background() - factory, err := graphql_datasource.NewFactory(context.Background(), httpClient, graphql_datasource.NewGraphQLSubscriptionClient(context.Background(), + factory, err := graphql_datasource.NewFactory(ctx, httpClient, graphql_datasource.NewGraphQLSubscriptionClient(ctx, graphql_datasource.WithUpgradeClient(httpClient), graphql_datasource.WithStreamingClient(httpClient))) require.NoError(t, err)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@execution/engine/execution_engine_test.go` around lines 57 - 58, In mustFactory, replace the two separate context.Background() calls with a single shared ctx variable and pass that ctx to both graphql_datasource.NewFactory and graphql_datasource.NewGraphQLSubscriptionClient so they share the same lifecycle; i.e., declare ctx := context.Background() and use ctx in the NewFactory call and inside the NewGraphQLSubscriptionClient invocation (the functions to update are graphql_datasource.NewFactory and graphql_datasource.NewGraphQLSubscriptionClient).v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)
8667-8669: Use semantic JSON comparison for GraphQL error payload assertions.These assertions currently depend on exact serialized JSON text. They can become flaky on harmless formatting/key-order differences. Prefer JSON-structure equality for stability.
Also applies to: 8772-8774
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go` around lines 8667 - 8669, The test currently compares GraphQL error payloads as exact JSON strings using assert.Equal on updater.errors[0]; change this to a semantic JSON comparison by parsing both the expected JSON and updater.errors[0] into generic structures (e.g., map[string]interface{} / interface{}) and comparing those (or use testify's assert.JSONEq) so key order/formatting won't flake; update the two failing assertions around updater.AwaitErrors / updater.errors and replace assert.Equal with the unmarshall + assert (or assert.JSONEq) plus require.NoError checks for the unmarshalling operations.execution/engine/config_factory_federation.go (1)
458-472: Consider consolidating the redundant switch branches.Both the
SubscriptionTypeGraphQLTransportWScase (lines 460-464) and thedefaultcase (lines 467-471) now execute identical code. If the protocol differentiation is no longer needed, the switch statement could be removed entirely. If it's intentional to keep the structure for future differentiation, a brief comment would help clarify.♻️ Suggested simplification
func (f *FederationEngineConfigFactory) subscriptionClient( httpClient *http.Client, streamingClient *http.Client, subscriptionType SubscriptionType, subscriptionClientFactory graphql_datasource.GraphQLSubscriptionClientFactory, ) (graphql_datasource.GraphQLSubscriptionClient, error) { - var graphqlSubscriptionClient graphql_datasource.GraphQLSubscriptionClient - switch subscriptionType { - case SubscriptionTypeGraphQLTransportWS: - graphqlSubscriptionClient = subscriptionClientFactory.NewSubscriptionClient( - f.engineCtx, - graphql_datasource.WithUpgradeClient(httpClient), - graphql_datasource.WithStreamingClient(streamingClient), - ) - default: - // for compatibility reasons we fall back to graphql-ws protocol - graphqlSubscriptionClient = subscriptionClientFactory.NewSubscriptionClient( - f.engineCtx, - graphql_datasource.WithUpgradeClient(httpClient), - graphql_datasource.WithStreamingClient(streamingClient), - ) - } + graphqlSubscriptionClient := subscriptionClientFactory.NewSubscriptionClient( + f.engineCtx, + graphql_datasource.WithUpgradeClient(httpClient), + graphql_datasource.WithStreamingClient(streamingClient), + ) ok := graphql_datasource.IsDefaultGraphQLSubscriptionClient(graphqlSubscriptionClient)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@execution/engine/config_factory_federation.go` around lines 458 - 472, The switch on subscriptionType contains identical branches; remove the switch and always instantiate graphqlSubscriptionClient by calling subscriptionClientFactory.NewSubscriptionClient with f.engineCtx, graphql_datasource.WithUpgradeClient(httpClient) and graphql_datasource.WithStreamingClient(streamingClient); if you want to keep the switch shape for future protocol differences instead, replace the duplicated branches with a single default assignment and add a short comment near subscriptionType/SubscriptionTypeGraphQLTransportWS explaining why the switch is retained for future use.v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client_test.go (1)
282-293: Periodic messages lack subscription ID.The ticker sends
nextmessages without anidfield (lines 288-291). Per the graphql-transport-ws spec,nextmessages must include the subscription ID to route data to the correct subscriber. However, since this helper is only used in drain tests that don't rely on periodic messages (they use the per-subscription message sent in the subscribe handler at lines 268-274), this is functionally harmless but could cause confusion if the helper is reused.♻️ Consider removing the periodic sender or documenting its limitation
If the periodic ticker isn't needed for current tests, consider removing it to simplify the helper:
- // Send periodic messages until context cancelled - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - // Ignore write errors (connection may be closed) - _ = wsjson.Write(ctx, conn, map[string]any{ - "type": "next", - "payload": map[string]any{"data": map[string]any{"value": 1}}, - }) - } - } + <-ctx.Done()Alternatively, track active subscription IDs and include them in periodic messages.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client_test.go` around lines 282 - 293, The periodic ticker loop sends "next" messages without an "id" which violates graphql-transport-ws; update the loop inside the goroutine that calls wsjson.Write to include the subscription id (e.g., add "id": id) or, if the id isn't in scope, maintain a small activeSubscriptions map when handling "subscribe" (in the subscribe handler that currently writes the per-subscription response) and use that id from the map in the periodic sender; alternatively remove the ticker entirely if periodic messages are unnecessary—modify the ticker block that calls wsjson.Write so every "next" payload includes the correct subscription id.v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_transport_test.go (1)
598-607: Consider documenting header precedence behavior.The
headerTransportusesmaps.Copywhich will overwrite any existing headers with the same key. If this is intentional (transport headers take precedence over request headers), consider adding a brief comment. If request headers should take precedence, the copy direction should be reversed.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_transport_test.go` around lines 598 - 607, The headerTransport.RoundTrip currently calls maps.Copy(req.Header, t.headers) which overwrites request headers with transport headers; either add a brief comment on header precedence clarifying that transport headers intentionally take priority (e.g., "transport headers override request headers"), or if request headers should win, change the merge direction so request headers are preserved (i.e., merge transport headers into req.Header without overwriting existing keys). Update the headerTransport (and its RoundTrip method) accordingly and ensure the comment reflects the chosen precedence behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@v2/go.mod`:
- Line 75: The examples directory still directly imports go.uber.org/zap (the
zap import in the example's main function/initialization), which conflicts with
the v2 core's use of the abstractlogger abstraction; update the example's main
(the main function and any logger initialization/variable named like logger) to
obtain and use the abstractlogger API used across core (replace direct
zap.New/Config usage with the abstractlogger constructor/wrapper), or if you
intentionally want the example to remain independent, add a short comment in the
example explaining why a direct zap import is required and remove zap from
module indirect dependencies; ensure all references to zap.New, zap.Config, or
zap.SugarLogger in the example are replaced or documented accordingly.
In `@v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go`:
- Around line 8395-8396: The testSubscriptionUpdaterChan can block or panic when
multiple callbacks occur; update its implementation (methods Error() and Done())
to be resilient: make the errors channel non-blocking (e.g., increase the buffer
or use a non-blocking send/select in Error()) so multiple Error() calls don't
block, and make Done() idempotent (use a sync.Once, a closed-check, or
safe-close pattern) so calling Done() twice does not panic; locate these changes
in the testSubscriptionUpdaterChan type and its Error and Done methods to harden
tests against repeated callbacks.
In `@v2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client.go`:
- Around line 68-72: WithLogger currently assigns cfg.Logger directly allowing
callers to set it to nil; update WithLogger (the SubscriptionClientOption
constructor for SubscriptionClientConfig) to check for nil before assignment
(similar to WithUpgradeClient and WithStreamingClient) so the default NoopLogger
isn't overwritten when WithLogger(nil) is called—only set cfg.Logger = log when
log != nil.
- Around line 32-42: defaultSubscriptionClientConfig() currently omits
initializing SubscriptionClientConfig.WriteTimeout and ReadLimit even though
WithWriteTimeout and WithReadLimit doc-comments state defaults; update
defaultSubscriptionClientConfig() to set WriteTimeout = 5 * time.Second and
ReadLimit = 1 * 1024 * 1024 (1MB) on the returned SubscriptionClientConfig so
the inner client receives the documented defaults (referencing the
SubscriptionClientConfig type and the WithWriteTimeout/WithReadLimit options).
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_conn_test.go`:
- Around line 89-115: The tests currently don't verify closeConn() behavior
because trackingCloser.Close doesn't forward to the wrapped io.PipeReader and
the idempotency case uses io.NopCloser; update the test helpers so
trackingCloser.Close delegates to the wrapped Closer (call the underlying Close)
and add/replace the nop closer in the idempotency subtest with a custom closer
that records/counts/closes and returns an error on a second Close to assert
deduplication; ensure tests use that counting/erroring closer (and adjust the
instances created for newSSEConnection and the pipe case) so readLoop unblocks
from the Close call and the second close would fail if not deduplicated, proving
both close and idempotency behaviors (apply same changes to the other occurrence
around lines 127-136).
- Around line 60-85: The test "stops on complete event" currently only checks
the messages slice length, which can miss a late third event; after receiving
msg2 (the complete) from receive(t, 1*time.Second) add a short no-more-messages
assertion (e.g., call receive with a small timeout like 50-100ms and assert it
returns nil or times out) to ensure newSSEConnection/conn.readLoop truly stopped
and the third "next" is not delivered; use the existing collectingHandler(),
newSSEConnection(resp, handler), conn.readLoop() and receive helper to perform
this additional negative check.
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_transport.go`:
- Around line 73-79: The SSE transport must always run the same cleanup when a
request fails or the read loop exits: create a single cleanup closure guarded by
a sync.Once (e.g., cleanup := func(){...} wrapped by var once sync.Once) that
calls requestCancel, removes the connection from t.conns (removeConn or
equivalent), and unregisters any context.AfterFunc registrations; use that
cleanup in the returned cancelFn, on every error return path in Do, and when
conn.readLoop() exits (instead of relying on callers to call cancelFn) so stale
callbacks and dead conn entries cannot remain; update all locations referenced
(requestCtx, requestCancel, context.AfterFunc registrations, cancelFn, Do,
conn.readLoop, t.conns/removeConn) to call once.Do(cleanup).
- Around line 103-111: The failure path currently calls io.ReadAll(resp.Body)
unbounded; change Subscribe in sseTransport to read with a cap using
io.LimitReader(resp.Body, <MAX_BYTES>) (choose a reasonable constant like 64KB
or 1MB), still closing resp.Body after reading, and treat/truncate the returned
body if it hits the limit so you don't allocate arbitrarily large memory for
remote error pages; update the error return to include the truncated snippet.
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/ws_transport_test.go`:
- Around line 413-421: The goroutine currently returns early on error so failing
Subscribe calls are ignored; change the subtest to capture subscription errors
and assert them after the concurrent group finishes by creating a buffered error
channel (e.g., errCh := make(chan error, 10)), have each goroutine send the
Subscribe error (or nil) into errCh instead of returning, then after wg.Wait
close(errCh) and iterate the channel to fail the test if any non-nil error is
present. Update the block around wg.Go, Subscribe, collectingHandler, cancel and
receive to write errors into errCh so Subscribe failures are asserted rather
than silently ignored.
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/ws_transport.go`:
- Around line 211-216: The ping loop currently uses defaultWriteTimeout when
calling conn.sendPing, ignoring any configured timeout set via WithWriteTimeout;
change the call to use the effective write timeout (e.g. pass conn.writeTimeout
or t.opts.writeTimeout instead of defaultWriteTimeout), falling back to
defaultWriteTimeout only if the configured value is zero/unset, so heartbeat
writes use the same deadline as the rest of the transport (look for the sendPing
call and the WithWriteTimeout/opts/writeTimeout field to update).
- Around line 203-208: The readLoop handling for websocket ping frames currently
replies with a pong but doesn't refresh liveness, so update the connection's
last-pong timestamp when a MessagePing (including legacy "ka" keep-alives) is
received: inside readLoop's PingMessage handling, after sending the pong (or
before), set/refresh conn.lastPongAt (or call any existing helper like
conn.touchPong()/conn.setLastPongAt(now)) so that
conn.pongOverdue(t.opts.pingTimeout) reflects the recent keep-alive;
alternatively, if you prefer legacy behavior, skip the PingTimeout/pongOverdue
check for legacy protocol connections, but ensure the change is applied where
readLoop processes MessagePing and references conn and lastPongAt/pongOverdue.
In `@v2/pkg/engine/resolve/resolve_test.go`:
- Around line 5341-5342: AwaitDone never observes completion because s.done is
never set, and Error(data []byte) stores the caller's slice by reference which
can be mutated later; fix by setting s.done.Store(true) when the recorder
reaches a terminal state (e.g., inside Error and any success/Done-like methods
that signify completion) so AwaitDone can return, and change Error(data []byte)
to append a copy of the input (e.g., store append([]byte(nil), data...)) into
errors [][]byte to avoid retaining a borrowed slice; update references to
s.done, AwaitDone, and Error(data []byte) accordingly.
---
Nitpick comments:
In `@execution/engine/config_factory_federation.go`:
- Around line 458-472: The switch on subscriptionType contains identical
branches; remove the switch and always instantiate graphqlSubscriptionClient by
calling subscriptionClientFactory.NewSubscriptionClient with f.engineCtx,
graphql_datasource.WithUpgradeClient(httpClient) and
graphql_datasource.WithStreamingClient(streamingClient); if you want to keep the
switch shape for future protocol differences instead, replace the duplicated
branches with a single default assignment and add a short comment near
subscriptionType/SubscriptionTypeGraphQLTransportWS explaining why the switch is
retained for future use.
In `@execution/engine/execution_engine_test.go`:
- Around line 57-58: In mustFactory, replace the two separate
context.Background() calls with a single shared ctx variable and pass that ctx
to both graphql_datasource.NewFactory and
graphql_datasource.NewGraphQLSubscriptionClient so they share the same
lifecycle; i.e., declare ctx := context.Background() and use ctx in the
NewFactory call and inside the NewGraphQLSubscriptionClient invocation (the
functions to update are graphql_datasource.NewFactory and
graphql_datasource.NewGraphQLSubscriptionClient).
In `@v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go`:
- Around line 8667-8669: The test currently compares GraphQL error payloads as
exact JSON strings using assert.Equal on updater.errors[0]; change this to a
semantic JSON comparison by parsing both the expected JSON and updater.errors[0]
into generic structures (e.g., map[string]interface{} / interface{}) and
comparing those (or use testify's assert.JSONEq) so key order/formatting won't
flake; update the two failing assertions around updater.AwaitErrors /
updater.errors and replace assert.Equal with the unmarshall + assert (or
assert.JSONEq) plus require.NoError checks for the unmarshalling operations.
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client_test.go`:
- Around line 282-293: The periodic ticker loop sends "next" messages without an
"id" which violates graphql-transport-ws; update the loop inside the goroutine
that calls wsjson.Write to include the subscription id (e.g., add "id": id) or,
if the id isn't in scope, maintain a small activeSubscriptions map when handling
"subscribe" (in the subscribe handler that currently writes the per-subscription
response) and use that id from the map in the periodic sender; alternatively
remove the ticker entirely if periodic messages are unnecessary—modify the
ticker block that calls wsjson.Write so every "next" payload includes the
correct subscription id.
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/common/message.go`:
- Around line 13-24: The comment for MessageTypeError is ambiguous about whether
it represents a terminal protocol error or just a GraphQL payload with errors;
update the comment on MessageTypeError to explicitly state that it represents
the terminal protocol-level error frame (distinct from GraphQL partial errors
carried in MessageTypeData), and ensure the IsTerminal() method's behavior
(returning true for MessageTypeError, MessageTypeComplete,
MessageTypeConnectionError) is documented accordingly so readers know only
protocol error frames (MessageTypeError/MessageTypeConnectionError) and
completion signal termination while MessageTypeData may contain non-terminal
GraphQL errors.
In
`@v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_transport_test.go`:
- Around line 598-607: The headerTransport.RoundTrip currently calls
maps.Copy(req.Header, t.headers) which overwrites request headers with transport
headers; either add a brief comment on header precedence clarifying that
transport headers intentionally take priority (e.g., "transport headers override
request headers"), or if request headers should win, change the merge direction
so request headers are preserved (i.e., merge transport headers into req.Header
without overwriting existing keys). Update the headerTransport (and its
RoundTrip method) accordingly and ensure the comment reflects the chosen
precedence behavior.
In `@v2/pkg/engine/resolve/resolve_test.go`:
- Line 5781: Remove the redundant post-assertion sleeps — the time.Sleep(10 *
time.Millisecond) calls in resolve_test.go (the instance at the shown diff and
the one at the other noted location) occur after completion/snapshot points and
do not improve determinism, they only add latency and flakiness; delete these
sleep calls from the test (search for the exact symbol time.Sleep(10 *
time.Millisecond) in resolve_test.go and remove those lines).
- Around line 5950-5968: The test "should stop stream on unsubscribe client with
close reason" does not verify any close-reason semantics; update it to either
rename the test to remove "with close reason" or add assertions to validate the
close reason is propagated. Locate the test body (the t.Run block using
createFakeStream, setup, AsyncResolveGraphQLSubscription, recorder, and
resolver.UnsubscribeClient) and either change the test name string to "should
stop stream on unsubscribe client" or, if close-reason behavior exists, capture
the close message from recorder (or the fake stream) after calling
resolver.UnsubscribeClient(id.ConnectionID) and assert the expected reason is
present (use recorder.AwaitAnyMessageCount or equivalent to read the final
message and assert its reason field).
- Around line 5959-5962: Replace literal Context struct initializations with the
constructor NewContext to ensure full initialization: find places creating
Context via a composite literal (e.g., Context{ ctx: context.Background() }) in
resolve_test.go and change them to use NewContext(...) (or NewContext() if it
takes no args) so the resolver context is consistently and fully initialized;
update the occurrences around the recently added test paths (the Context
literals reported in the comment) and any other similar instances in the same
file (including the other two locations noted) to use NewContext instead.
In `@v2/pkg/engine/resolve/resolve.go`:
- Around line 698-732: Add an explicit precondition comment to the top of the
handleAddSubscription function documenting that r.mu must be held when calling
it (since it mutates r.subscriptionsByID via addSubscriptionIndex); reference
handleAddSubscription and addSubscriptionIndex in the comment. Keep it brief and
in the function header (same style as the existing "// handleAddSubscription
must be called with r.mu held.") so future maintainers see the lock requirement;
optionally add a debug-mode runtime check/assert that panics or logs if r.mu is
not held to catch violations early.
- Around line 491-497: The done() method on subscriptionState calls
close(s.completed) without guarding against a double-close; add a defensive
guard to ensure completed is only closed once by checking that s.completed !=
nil (or using a select with a done flag) while holding s.writeMu, and set
s.completed = nil after closing; update the subscriptionState.done()
implementation to lock writeMu, verify completed is non-nil, close it, then nil
it to prevent future double-close attempts.
- Around line 564-593: The unlock/release ordering around sub.writeMu and
r.resolveArenaPool.Release(resolveArena) in executeSubscriptionUpdate is
fragile; change the function to centralize the lock/unlock semantics (e.g., wrap
the critical section in a helper like withWriteLock(sub, func() { ... }) or
acquire sub.writeMu and immediately defer sub.writeMu.Unlock() after validating
removed) and similarly ensure resolveArena is released via a single deferred
r.resolveArenaPool.Release(resolveArena) at the start of the resolve path so all
early returns and error paths consistently release the arena and unlock the
mutex (refer to sub.writeMu, sub.removed.Load(),
r.resolveArenaPool.Release(resolveArena), r.asyncErrorWriter.WriteError and the
resolve call).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cd013384-8670-412d-9e00-b5660d7f1356
⛔ Files ignored due to path filters (4)
examples/federation/go.sumis excluded by!**/*.sumexecution/go.sumis excluded by!**/*.sumgo.work.sumis excluded by!**/*.sumv2/go.sumis excluded by!**/*.sum
📒 Files selected for processing (46)
examples/federation/go.modexecution/engine/config_factory_federation.goexecution/engine/engine_config.goexecution/engine/engine_config_test.goexecution/engine/execution_engine_test.goexecution/go.modexecution/graphql/result_writer.goexecution/subscription/legacy_handler_test.goexecution/subscription/websocket/handler_test.gov2/go.modv2/pkg/engine/datasource/graphql_datasource/graphql_datasource.gov2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.gov2/pkg/engine/datasource/graphql_datasource/graphql_sse_handler.gov2/pkg/engine/datasource/graphql_datasource/graphql_sse_handler_test.gov2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client.gov2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client_test.gov2/pkg/engine/datasource/graphql_datasource/graphql_tws_handler.gov2/pkg/engine/datasource/graphql_datasource/graphql_tws_handler_test.gov2/pkg/engine/datasource/graphql_datasource/graphql_ws_handler.gov2/pkg/engine/datasource/graphql_datasource/graphql_ws_handler_test.gov2/pkg/engine/datasource/graphql_datasource/graphql_ws_proto_types.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/common/message.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/common/options.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/exports.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/protocol/graphql_transport_ws.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/protocol/graphql_transport_ws_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/protocol/graphql_ws.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/protocol/graphql_ws_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/protocol/protocol.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_conn.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_conn_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_transport.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/sse_transport_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/transport.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/transport_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/ws_conn.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/ws_conn_test.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/ws_transport.gov2/pkg/engine/datasource/graphql_datasource/subscriptionclient/transport/ws_transport_test.gov2/pkg/engine/resolve/datasource.gov2/pkg/engine/resolve/resolve.gov2/pkg/engine/resolve/resolve_test.gov2/pkg/engine/resolve/resolver_subscription_test.gov2/pkg/engine/resolve/response.go
💤 Files with no reviewable changes (8)
- v2/pkg/engine/resolve/datasource.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_tws_handler_test.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_sse_handler_test.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_ws_handler.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_sse_handler.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_ws_proto_types.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_tws_handler.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_ws_handler_test.go
There was a problem hiding this comment.
Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit in Settings → Usage.
Once credits are available, comment @claude review on this pull request to trigger a review.
|
@claude review |
1 similar comment
|
@claude review |
5a61bda to
a5e12fb
Compare
ysmolski
left a comment
There was a problem hiding this comment.
This looks good to me, except for minor nits. Those two big tests need verifying - I did not have time to check them.
Noroth
left a comment
There was a problem hiding this comment.
Just some nits. Looking really good!
There was a problem hiding this comment.
Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit at claude.ai/admin-settings/claude-code.
Once credits are available, reopen this pull request to trigger a review.
| c.subsMu.RUnlock() | ||
|
|
||
| if !exists { | ||
| return |
There was a problem hiding this comment.
Tempted to say we should log a debug message here because it might be useful information for a user to know that their subgraph sent messages to with a subscription id unknown to us. On the other hand in rare occassions it could happen that we just deleted a subscriber from the map and received a message from upstream at the same time...
| } | ||
|
|
||
| // Split by newlines (normalize CR/LF) | ||
| for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) { |
There was a problem hiding this comment.
Why not
for line := range bytes.Lines(msg) {
line = bytes.TrimRight(line, "\r\n")
// ...
}? Looks more intuitive to me
There was a problem hiding this comment.
SSE spec technically allows just \r as a line separator which is cursed. I'll go with bytes.Lines for now though
| streamingClient *http.Client | ||
| httpClient *http.Client | ||
| // SubscriptionClientConfig holds the subscription client configuration. | ||
| type SubscriptionClientConfig struct { |
There was a problem hiding this comment.
Can be an unexported type
There was a problem hiding this comment.
Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit at claude.ai/admin-settings/claude-code.
Once credits are available, reopen this pull request to trigger a review.
ysmolski
left a comment
There was a problem hiding this comment.
Gave it another go. LGTM from my side.
There was a problem hiding this comment.
Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit at claude.ai/admin-settings/claude-code.
Once credits are available, reopen this pull request to trigger a review.
could possibly cause issues with weird servers that split with just \r
Summary
Rewrites the subscription client from scratch with the following goals:
Changes
New
subscriptionclientpackagev2/pkg/engine/datasource/graphql_datasource/subscriptionclient/Separates protocols, transports, and client logic:
protocol/:graphql-wsandgraphql-transport-wsbehind a sharedProtocolinterfacetransport/: WebSocket and SSE behind a sharedTransportinterfaceclient.go: entry point that selects the appropriate transport and protocolgraphql_subscription_client.gois now a thin adapter over this package.Resolver simplification
v2/pkg/engine/resolve/The old clients used netpoll/event-loop patterns that required separate
AsyncSubscriptionDataSourceandSubscriptionDataSourceinterfaces. The new client is goroutine-based, so the two interfaces collapse into one.AsyncSubscriptionDataSourceis removed along with its channels and workers, replaced by mutex-guarded state and a single heartbeat loop.Close(kind)replaced withError(data)on response writerv2/pkg/engine/resolve/response.goThe resolver no longer closes downstream connections directly. It forwards failures as-is and lets the downstream handler decide what to do.
Summary by CodeRabbit
Chores
New Features
Refactor
Tests
Bug Fixes
Checklist