fix: use map lookup to find subscription instead of looping #1441
Conversation
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
v2/pkg/engine/resolve/resolve.go (1)
997-1011: ⚠️ Potential issue | 🟠 Major

Keep a compatibility fallback when the `subCtx` fast path misses.

The direct lookup only works for the original `add.ctx` pointer stored in `trig.subscriptions`, but `Start`/`AsyncStart` still hands datasource implementations a cloned context. If a caller feeds that start context back into `UpdateSubscription`, this `!ok` path silently drops the update. Please keep the fast path, but fall back to the previous `SubscriptionIdentifier` scan and verify the ID so existing implementations do not break quietly.

🛠 Possible fix

```diff
 func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier, subCtx *Context) {
 	trig, ok := r.triggers[id]
 	if !ok {
 		return
 	}
@@
 	s, ok := trig.subscriptions[subCtx]
 	if !ok {
-		return
+		for ctx, candidate := range trig.subscriptions {
+			if candidate.id == subIdentifier {
+				subCtx, s, ok = ctx, candidate, true
+				break
+			}
+		}
+		if !ok {
+			return
+		}
 	}
+	if s.id != subIdentifier {
+		return
+	}
 	r.sendUpdateToSubscription(data, subCtx, s)
 }
```

Also applies to: 1470-1485, 1601-1602
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve.go` around lines 997 - 1011, The fast-path lookup in Resolver.handleUpdateSubscription currently checks trig.subscriptions[subCtx] and returns on miss, which drops updates when callers pass a cloned start context; keep that fast path but add a fallback that iterates trig.subscriptions (range over trig.subscriptions map) and compares each stored SubscriptionIdentifier to the provided subIdentifier (verify ConnectionID and SubscriptionID) and the subscription's Context identity as needed, then call r.sendUpdateToSubscription(data, matchedCtx, matchedSubscription) for the found entry; apply the same fallback logic to the other similar handlers referenced (near lines 1470-1485 and 1601-1602) to preserve compatibility with Start/AsyncStart cloned contexts.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@v2/pkg/engine/resolve/resolve.go`:
- Around line 448-452: The subscriptionIds() method currently reads the
trigger.subscriptions map directly from caller goroutines (risking concurrent
map read/write against processEvents which mutates it); change the API so
snapshotting is performed on the event-loop goroutine by sending a new
request/reply event over the trigger's events channel (similar to
UpdateSubscription) that asks processEvents to produce and return the map copy;
update any callers of trigger.subscriptionIds()/Subscriptions() to send that
event and wait for the reply rather than accessing trigger.subscriptions
directly, and remove direct unsynchronized reads from functions like
subscriptionIds and public Subscriptions.
📒 Files selected for processing (2)

- v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
- v2/pkg/engine/resolve/resolve.go
Actionable comments posted: 2
🧹 Nitpick comments (2)
v2/pkg/engine/resolve/resolve_test.go (1)
6668-6681: Make the post-unsubscribe assertion deterministic.

The single `time.Sleep(50 * time.Millisecond)` only samples once. If the delayed update lands later, this test still passes. I'd wait for the fake stream to exit and then poll that the recorder never grows past one message for a short window.

Suggested refactor

```diff
-	// Unblock the stream so its goroutine can exit cleanly.
-	close(streamCanEnd)
-
-	// Calling the captured updater after the subscription has been cleaned up must be
-	// a no-op. If subscriptionIdentifiers was not cleaned up, this would find a stale
-	// entry, try to send work to the closed work channel, and panic.
-	capturedUpdater([]byte(`{"data":{"counter":2000}}`))
-
-	// Give the event loop time to process the update event.
-	time.Sleep(50 * time.Millisecond)
-
-	// Only the startup update should have been delivered; the post-removal call is dropped.
-	assert.Len(t, recorder.Messages(), 1)
+	// Unblock the stream and wait for its goroutine to exit cleanly.
+	close(streamCanEnd)
+	fakeStream.AwaitIsDone(t, defaultTimeout)
+
+	// Calling the captured updater after the subscription has been cleaned up must be
+	// a no-op. If subscriptionIdentifiers was not cleaned up, this would find a stale
+	// entry, try to send work to the closed work channel, and panic.
+	capturedUpdater([]byte(`{"data":{"counter":2000}}`))
+
+	deadline := time.Now().Add(200 * time.Millisecond)
+	for time.Now().Before(deadline) {
+		if got := recorder.Messages(); len(got) != 1 {
+			t.Fatalf("unexpected post-removal messages: %v", got)
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	// Only the startup update should have been delivered; the post-removal call is dropped.
+	assert.Len(t, recorder.Messages(), 1)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve_test.go` around lines 6668-6681, the test currently uses a single time.Sleep to wait for the post-unsubscribe update, which is non-deterministic; replace that with a deterministic wait for the fake stream to exit (use the existing streamCanEnd or another dedicated done channel from the fake stream goroutine) and then poll the recorder length for a short window (e.g., repeatedly check recorder.Messages() until time.After timeout) asserting it never grows past 1; update references to capturedUpdater, streamCanEnd, and recorder.Messages() in the test to perform: close(streamCanEnd) -> wait for stream goroutine to signal done -> poll recorder.Messages() for a small duration and assert len==1 throughout/at end.

v2/pkg/engine/resolve/resolve.go (1)
1011-1036: Well-structured O(1) optimization with defensive fallback.

The fast-path map lookup followed by an O(N) fallback is a solid approach. Under normal operation, `subscriptionIdentifiers` should always contain the entry, so the fallback is purely defensive.

Consider adding debug logging when the fallback path is triggered to aid in diagnosing any unexpected map inconsistencies during development:

🔧 Optional: Add debug logging for fallback path

```diff
 	// Fallback O(N) lookup in case we couldn't find the resolver context by map:
 	// Loop through trig.subscriptions and find the corresponding resolver context.
 	if !exists {
+		if r.options.Debug {
+			fmt.Printf("resolver:trigger:subscription:update:fallback:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID)
+		}
 		for i := range trig.subscriptions {
 			if trig.subscriptions[i].id == subIdentifier {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/resolve/resolve.go` around lines 1011 - 1036, Add a debug/logging call inside the fallback branch that runs when trig.subscriptionIdentifiers[subIdentifier] is missing: when the loop over trig.subscriptions finds a match (or when it fails to find one), log that the map lookup missed and include the subIdentifier, the resolved resolverCtx index (or a sentinel if not found), and any relevant subscription id/value; update the block that computes resolverCtx (the code referencing trig.subscriptionIdentifiers, trig.subscriptions, subIdentifier, and resolverCtx) to emit this debug message before proceeding to r.sendUpdateToSubscription so fallback occurrences are recorded for diagnostics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go`:
- Around line 131-136: Replace the manual Context literal with the constructor:
do not use &Context{ctx: context.Background()} in the benchmark; instead call
NewContext(...) to create the Context so required fields (like subgraphErrors)
are initialized before calling resolver.AsyncResolveGraphQLSubscription; update
the creation in the loop where id := SubscriptionIdentifier{...} and err :=
resolver.AsyncResolveGraphQLSubscription(subCtx, makePlan(i), recorders[i], id)
to use subCtx := NewContext(context.Background()) (or NewContext() per the
package API) so the benchmark measures subscription lookup and not context
construction errors.
In `@v2/pkg/engine/resolve/resolve_test.go`:
- Around line 6587-6589: Replace raw Context literals with the package
initializer NewContext() to ensure proper internal setup: instead of using
&Context{ctx: context.Background()} in the tests that call
resolver.AsyncResolveGraphQLSubscription (and other tests around the same
areas), call NewContext(context.Background()) (or NewContext() if it defaults)
so the Context.subgraphErrors and other internal fields are initialized; update
all occurrences noted (including the ones around the
AsyncResolveGraphQLSubscription calls at the nearby test blocks) to prevent
context-setup failures.
📒 Files selected for processing (3)

- v2/pkg/engine/resolve/resolve.go
- v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go
- v2/pkg/engine/resolve/resolve_test.go
There are two ways subscriptions get updated, i.e. how new data is delivered to subscription clients. One way is to broadcast data to all subscribers on a trigger. The other is to update individual subscribers on a trigger. Broadcasting is used by normal subscriptions and by Cosmo Streams without hooks. The latter is important for Cosmo Streams with `OnReceiveEvent` hooks: these hooks can modify data for each subscriber individually, so we have to update each subscriber individually.

Currently there is a performance problem when updating individual subscribers. In order to write to a specific subscription, the resolver has to find it on its trigger. This is done by looping through all subscriptions associated with the trigger, which can number in the tens of thousands. When data has to be written to many subscribers, and/or writes occur frequently, we loop through the whole set every time. This causes massive delays in delivering data to subscribers.
It becomes noticeable at around 1k subscribers on a trigger. On a vanilla router with 20k subscriptions it takes around 1300ms from sending an event to the broker until all subscribers are updated. With this patch it takes around 230ms (measured with a custom benchmark tool on my machine). I also added a benchmark to the test code which measures the time it takes to update all subscribers on a trigger this way.
Vanilla engine:
With this patch:
On 1k subscribers this is a 3x speed increase. The difference becomes more dramatic the more subscribers there are on a trigger.
The solution implemented here is to keep a map from subscription identifiers (used by the router to identify a subscription) to the resolver context of a subscription. The resolver context is what maps to the subscription connection details used to write to the subscriber. This lets us do a map lookup to get the context instead of looping. The change is made in the resolver's `handleUpdateSubscription` method. In case we cannot find the resolver context via the map lookup, we fall back to the original loop; this path really shouldn't get triggered and exists only as a defensive mechanism.

The change does not modify the engine's API, so no changes need to be made on the router.