From b345aee9707c3fabca5711adfe0ab4eb91f4d6b4 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:20:20 +0100 Subject: [PATCH 1/8] fix: use direct map lookup to find subscription --- v2/pkg/engine/resolve/resolve.go | 34 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index f735752ef9..8e8f61844e 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -445,10 +445,10 @@ type trigger struct { updater *subscriptionUpdater } -func (t *trigger) subscriptionIds() map[context.Context]SubscriptionIdentifier { - subs := make(map[context.Context]SubscriptionIdentifier, len(t.subscriptions)) +func (t *trigger) subscriptionIds() map[*Context]SubscriptionIdentifier { + subs := make(map[*Context]SubscriptionIdentifier, len(t.subscriptions)) for ctx, sub := range t.subscriptions { - subs[ctx.Context()] = sub.id + subs[ctx] = sub.id } return subs } @@ -668,7 +668,7 @@ func (r *Resolver) handleEvent(event subscriptionEvent) { case subscriptionEventKindRemoveClient: r.handleRemoveClient(event.id.ConnectionID) case subscriptionEventKindUpdateSubscription: - r.handleUpdateSubscription(event.triggerID, event.data, event.id) + r.handleUpdateSubscription(event.triggerID, event.data, event.id, event.subCtx) case subscriptionEventKindTriggerUpdate: r.handleTriggerUpdate(event.triggerID, event.data) case subscriptionEventKindTriggerComplete: @@ -756,7 +756,7 @@ func (r *Resolver) executeStartupHooks(add *addSubscription, updater *subscripti Updater: func(data []byte) { // Writing on the updater channel is safe but has to happen outside of the event loop // to respect order and not block the event loop - updater.UpdateSubscription(add.id, data) + updater.UpdateSubscription(add.ctx, add.id, data) }, } @@ -994,7 +994,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) { } } -func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier) { +func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier, subCtx *Context) { trig, ok := r.triggers[id] if !ok { return @@ -1004,13 +1004,11 @@ func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifie fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID) } - for c, s := range trig.subscriptions { - if s.id != subIdentifier { - continue - } - r.sendUpdateToSubscription(data, c, s) - break + s, ok := trig.subscriptions[subCtx] + if !ok { + return } + r.sendUpdateToSubscription(data, subCtx, s) } func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) { @@ -1449,7 +1447,7 @@ type subscriptionUpdater struct { triggerID uint64 ch chan subscriptionEvent ctx context.Context - subsFn func() map[context.Context]SubscriptionIdentifier + subsFn func() map[*Context]SubscriptionIdentifier } func (s *subscriptionUpdater) Update(data []byte) { @@ -1469,7 +1467,7 @@ func (s *subscriptionUpdater) Update(data []byte) { } } -func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data []byte) { +func (s *subscriptionUpdater) UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte) { if s.debug { fmt.Printf("resolver:subscription_updater:update:%d\n", s.triggerID) } @@ -1483,11 +1481,12 @@ func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data kind: subscriptionEventKindUpdateSubscription, data: data, id: id, + subCtx: ctx, }: } } -func (s *subscriptionUpdater) Subscriptions() map[context.Context]SubscriptionIdentifier { +func (s *subscriptionUpdater) Subscriptions() map[*Context]SubscriptionIdentifier { return s.subsFn() } @@ -1567,6 +1566,7 @@ type subscriptionEvent struct { data []byte addSubscription *addSubscription closeKind SubscriptionCloseKind + subCtx *Context } type addSubscription struct { @@ -1599,7 +1599,7 @@ type SubscriptionUpdater interface { // Update sends an update to the client. It is not guaranteed that the update is sent immediately. Update(data []byte) // UpdateSubscription sends an update to a single subscription. It is not guaranteed that the update is sent immediately. - UpdateSubscription(id SubscriptionIdentifier, data []byte) + UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte) // Complete also takes care of cleaning up the trigger and all subscriptions. No more updates should be sent after calling Complete. Complete() // Close closes the subscription and cleans up the trigger and all subscriptions. No more updates should be sent after calling Close. @@ -1607,5 +1607,5 @@ type SubscriptionUpdater interface { // CloseSubscription closes a single subscription. No more updates should be sent to that subscription after calling CloseSubscription. CloseSubscription(kind SubscriptionCloseKind, id SubscriptionIdentifier) // Subscriptions return all the subscriptions associated to this Updater - Subscriptions() map[context.Context]SubscriptionIdentifier + Subscriptions() map[*Context]SubscriptionIdentifier } From 54346fa22173e978f75dba8879db4621c7e02013 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:50:26 +0100 Subject: [PATCH 2/8] fix: update mock interface in tests --- .../graphql_datasource/graphql_datasource_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go b/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go index 05b07df9e1..6b4a6c8cf7 100644 --- a/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go +++ b/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go @@ -8404,7 +8404,7 @@ func (t *testSubscriptionUpdaterChan) Update(data []byte) { } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdaterChan) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) { +func (t *testSubscriptionUpdaterChan) UpdateSubscription(ctx *resolve.Context, id resolve.SubscriptionIdentifier, data []byte) { } // empty method to satisfy the interface, not used in this tests @@ -8412,8 +8412,8 @@ func (t *testSubscriptionUpdaterChan) CloseSubscription(kind resolve.Subscriptio } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdaterChan) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier { - return make(map[context.Context]resolve.SubscriptionIdentifier) +func (t *testSubscriptionUpdaterChan) Subscriptions() map[*resolve.Context]resolve.SubscriptionIdentifier { + return make(map[*resolve.Context]resolve.SubscriptionIdentifier) } func (t *testSubscriptionUpdaterChan) Complete() { @@ -8546,12 +8546,12 @@ func (t *testSubscriptionUpdater) CloseSubscription(kind resolve.SubscriptionClo } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier { - return make(map[context.Context]resolve.SubscriptionIdentifier) +func (t *testSubscriptionUpdater) Subscriptions() map[*resolve.Context]resolve.SubscriptionIdentifier { + return make(map[*resolve.Context]resolve.SubscriptionIdentifier) } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdater) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) { +func (t *testSubscriptionUpdater) UpdateSubscription(ctx *resolve.Context, id resolve.SubscriptionIdentifier, data []byte) { } func TestSubscriptionSource_Start(t *testing.T) { From 28f21ed673cdc0e7e01e775671f0ef601cf22a0a Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:08:19 +0100 Subject: [PATCH 3/8] chore: revert all changes --- .../graphql_datasource_test.go | 12 +++---- v2/pkg/engine/resolve/resolve.go | 34 +++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go b/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go index 6b4a6c8cf7..05b07df9e1 100644 --- a/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go +++ b/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go @@ -8404,7 +8404,7 @@ func (t *testSubscriptionUpdaterChan) Update(data []byte) { } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdaterChan) UpdateSubscription(ctx *resolve.Context, id resolve.SubscriptionIdentifier, data []byte) { +func (t *testSubscriptionUpdaterChan) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) { } // empty method to satisfy the interface, not used in this tests @@ -8412,8 +8412,8 @@ func (t *testSubscriptionUpdaterChan) CloseSubscription(kind resolve.Subscriptio } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdaterChan) Subscriptions() map[*resolve.Context]resolve.SubscriptionIdentifier { - return make(map[*resolve.Context]resolve.SubscriptionIdentifier) +func (t *testSubscriptionUpdaterChan) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier { + return make(map[context.Context]resolve.SubscriptionIdentifier) } func (t *testSubscriptionUpdaterChan) Complete() { @@ -8546,12 +8546,12 @@ func (t *testSubscriptionUpdater) CloseSubscription(kind resolve.SubscriptionClo } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdater) Subscriptions() map[*resolve.Context]resolve.SubscriptionIdentifier { - return make(map[*resolve.Context]resolve.SubscriptionIdentifier) +func (t *testSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier { + return make(map[context.Context]resolve.SubscriptionIdentifier) } // empty method to satisfy the interface, not used in this tests -func (t *testSubscriptionUpdater) UpdateSubscription(ctx *resolve.Context, id resolve.SubscriptionIdentifier, data []byte) { +func (t *testSubscriptionUpdater) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) { } func TestSubscriptionSource_Start(t *testing.T) { diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 8e8f61844e..f735752ef9 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -445,10 +445,10 @@ type trigger struct { updater *subscriptionUpdater } -func (t *trigger) subscriptionIds() map[*Context]SubscriptionIdentifier { - subs := make(map[*Context]SubscriptionIdentifier, len(t.subscriptions)) +func (t *trigger) subscriptionIds() map[context.Context]SubscriptionIdentifier { + subs := make(map[context.Context]SubscriptionIdentifier, len(t.subscriptions)) for ctx, sub := range t.subscriptions { - subs[ctx] = sub.id + subs[ctx.Context()] = sub.id } return subs } @@ -668,7 +668,7 @@ func (r *Resolver) handleEvent(event subscriptionEvent) { case subscriptionEventKindRemoveClient: r.handleRemoveClient(event.id.ConnectionID) case subscriptionEventKindUpdateSubscription: - r.handleUpdateSubscription(event.triggerID, event.data, event.id, event.subCtx) + r.handleUpdateSubscription(event.triggerID, event.data, event.id) case subscriptionEventKindTriggerUpdate: r.handleTriggerUpdate(event.triggerID, event.data) case subscriptionEventKindTriggerComplete: @@ -756,7 +756,7 @@ func (r *Resolver) executeStartupHooks(add *addSubscription, updater *subscripti Updater: func(data []byte) { // Writing on the updater channel is safe but has to happen outside of the event loop // to respect order and not block the event loop - updater.UpdateSubscription(add.ctx, add.id, data) + updater.UpdateSubscription(add.id, data) }, } @@ -994,7 +994,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) { } } -func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier, subCtx *Context) { +func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier) { trig, ok := r.triggers[id] if !ok { return @@ -1004,11 +1004,13 @@ func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifie fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID) } - s, ok := trig.subscriptions[subCtx] - if !ok { - return + for c, s := range trig.subscriptions { + if s.id != subIdentifier { + continue + } + r.sendUpdateToSubscription(data, c, s) + break } - r.sendUpdateToSubscription(data, subCtx, s) } func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) { @@ -1447,7 +1449,7 @@ type subscriptionUpdater struct { triggerID uint64 ch chan subscriptionEvent ctx context.Context - subsFn func() map[*Context]SubscriptionIdentifier + subsFn func() map[context.Context]SubscriptionIdentifier } func (s *subscriptionUpdater) Update(data []byte) { @@ -1467,7 +1469,7 @@ func (s *subscriptionUpdater) Update(data []byte) { } } -func (s *subscriptionUpdater) UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte) { +func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data []byte) { if s.debug { fmt.Printf("resolver:subscription_updater:update:%d\n", s.triggerID) } @@ -1481,12 +1483,11 @@ func (s *subscriptionUpdater) UpdateSubscription(ctx *Context, id SubscriptionId kind: subscriptionEventKindUpdateSubscription, data: data, id: id, - subCtx: ctx, }: } } -func (s *subscriptionUpdater) Subscriptions() map[*Context]SubscriptionIdentifier { +func (s *subscriptionUpdater) Subscriptions() map[context.Context]SubscriptionIdentifier { return s.subsFn() } @@ -1566,7 +1567,6 @@ type subscriptionEvent struct { data []byte addSubscription *addSubscription closeKind SubscriptionCloseKind - subCtx *Context } type addSubscription struct { @@ -1599,7 +1599,7 @@ type SubscriptionUpdater interface { // Update sends an update to the client. It is not guaranteed that the update is sent immediately. Update(data []byte) // UpdateSubscription sends an update to a single subscription. It is not guaranteed that the update is sent immediately. - UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte) + UpdateSubscription(id SubscriptionIdentifier, data []byte) // Complete also takes care of cleaning up the trigger and all subscriptions. No more updates should be sent after calling Complete. Complete() // Close closes the subscription and cleans up the trigger and all subscriptions. No more updates should be sent after calling Close. @@ -1607,5 +1607,5 @@ type SubscriptionUpdater interface { // CloseSubscription closes a single subscription. No more updates should be sent to that subscription after calling CloseSubscription. CloseSubscription(kind SubscriptionCloseKind, id SubscriptionIdentifier) // Subscriptions return all the subscriptions associated to this Updater - Subscriptions() map[*Context]SubscriptionIdentifier + Subscriptions() map[context.Context]SubscriptionIdentifier } From 75403337427af6c5520d0f17058c95a873c81723 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:14:28 +0100 Subject: [PATCH 4/8] fix: Use a map to lookup a resolve context --- v2/pkg/engine/resolve/resolve.go | 36 ++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index f735752ef9..68f7139eaa 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -437,9 +437,10 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe } type trigger struct { - id uint64 - cancel context.CancelFunc - subscriptions map[*Context]*sub + id uint64 + cancel context.CancelFunc + subscriptions map[*Context]*sub + subscriptionIdentifiers map[SubscriptionIdentifier]*Context // initialized is set to true when the trigger is started and initialized initialized bool updater *subscriptionUpdater @@ -817,6 +818,7 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) // After the startup hooks are executed, we can add the subscription to the subscriptions registry // so that it can start receive events trig.subscriptions[add.ctx] = s + trig.subscriptionIdentifiers[s.id] = add.ctx return } @@ -832,13 +834,15 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) } cloneCtx := add.ctx.clone(ctx) trig = &trigger{ - id: triggerID, - subscriptions: make(map[*Context]*sub), - cancel: cancel, - updater: updater, + id: triggerID, + subscriptions: make(map[*Context]*sub), + subscriptionIdentifiers: make(map[SubscriptionIdentifier]*Context), + cancel: cancel, + updater: updater, } r.triggers[triggerID] = trig trig.subscriptions[add.ctx] = s + trig.subscriptionIdentifiers[s.id] = add.ctx updater.subsFn = trig.subscriptionIds if r.reporter != nil { @@ -1004,13 +1008,17 @@ func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifie fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID) } - for c, s := range trig.subscriptions { - if s.id != subIdentifier { - continue - } - r.sendUpdateToSubscription(data, c, s) - break + c, exists := trig.subscriptionIdentifiers[subIdentifier] + if !exists { + return } + + s, exists := trig.subscriptions[c] + if !exists { + return + } + + r.sendUpdateToSubscription(data, c, s) } func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) { @@ -1111,6 +1119,7 @@ func (r *Resolver) completeTriggerSubscriptions(id uint64, completeMatcher func( // Important because we remove the subscription from the trigger on the same goroutine // as we send work to the subscription worker. We can ensure that no new work is sent to the worker after this point. delete(trig.subscriptions, c) + delete(trig.subscriptionIdentifiers, s.id) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:closed:%d:%d\n", trig.id, s.id.SubscriptionID) @@ -1142,6 +1151,7 @@ func (r *Resolver) closeTriggerSubscriptions(id uint64, closeKind SubscriptionCl // Important because we remove the subscription from the trigger on the same goroutine // as we send work to the subscription worker. We can ensure that no new work is sent to the worker after this point. delete(trig.subscriptions, c) + delete(trig.subscriptionIdentifiers, s.id) if r.options.Debug { fmt.Printf("resolver:trigger:subscription:closed:%d:%d\n", trig.id, s.id.SubscriptionID) From ec16d988c4b4194811e4cc7b308e47660f0a2211 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 16 Mar 2026 08:42:02 +0100 Subject: [PATCH 5/8] chore: add tests --- v2/pkg/engine/resolve/resolve_test.go | 134 ++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 82a8e1e635..b30917972b 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -6546,6 +6546,140 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { assert.Contains(t, errorMessage, "errors", "Expected error message in GraphQL format") assert.Contains(t, errorMessage, expectedErr.Error(), "Expected actual error message to be included") }) + + t.Run("subscription added to existing trigger can be targeted by UpdateSubscription", func(t *testing.T) { + // Verifies that subscriptionIdentifiers is populated for subscriptions joining an + // already-running trigger (the existing-trigger path in handleAddSubscription), so + // that handleUpdateSubscription can reach them via O(1) map lookup. + c, cancel := context.WithCancel(context.Background()) + defer cancel() + + id2 := SubscriptionIdentifier{ + ConnectionID: 1, + SubscriptionID: 2, + } + + sub1HookDone := make(chan struct{}) + streamCanSend := make(chan struct{}) + + // The startup hook is shared by both subscriptions. + // Sub1 (new-trigger path) closes sub1HookDone and does nothing else. + // Sub2 (existing-trigger path) calls ctx.Updater, sending a targeted update only to sub2. + // Because the test waits for sub1HookDone before registering sub2, sub2's hook always + // finds sub1HookDone already closed, making the branch selection deterministic. + fakeStream := createFakeStream(func(counter int) (message string, done bool) { + <-streamCanSend + return `{"data":{"counter":0}}`, true + }, 0, nil, func(ctx StartupHookContext, input []byte) error { + select { + case <-sub1HookDone: + // sub1HookDone is already closed: this is sub2 (existing-trigger path). + ctx.Updater([]byte(`{"data":{"counter":1000}}`)) + default: + // First call: this is sub1 (new-trigger path). + close(sub1HookDone) + } + return nil + }) + + resolver, plan, recorder, id := setup(c, fakeStream) + + ctx1 := &Context{ctx: context.Background()} + err := resolver.AsyncResolveGraphQLSubscription(ctx1, plan, recorder, id) + assert.NoError(t, err) + + // Wait for sub1's startup hook to complete before adding sub2, + // guaranteeing sub2 joins the existing trigger. + select { + case <-sub1HookDone: + case <-time.After(defaultTimeout): + t.Fatal("timed out waiting for sub1 startup hook") + } + + recorder2 := &SubscriptionRecorder{ + buf: &bytes.Buffer{}, + messages: []string{}, + complete: atomic.Bool{}, + } + recorder2.complete.Store(false) + + ctx2 := &Context{ctx: context.Background()} + err2 := resolver.AsyncResolveGraphQLSubscription(ctx2, plan, recorder2, id2) + assert.NoError(t, err2) + + // Wait for sub2 to receive its targeted update from the startup hook. + recorder2.AwaitAnyMessageCount(t, defaultTimeout) + + // Signal the stream to send its final message and complete both subscriptions. + close(streamCanSend) + + recorder.AwaitComplete(t, defaultTimeout) + recorder2.AwaitComplete(t, defaultTimeout) + + // sub1 receives only the stream message — it was not the target of ctx.Updater. + assert.Len(t, recorder.Messages(), 1) + assert.Equal(t, `{"data":{"counter":0}}`, recorder.Messages()[0]) + + // sub2 receives both: the targeted startup update and the stream message. + assert.Len(t, recorder2.Messages(), 2) + assert.Equal(t, `{"data":{"counter":1000}}`, recorder2.Messages()[0]) + assert.Equal(t, `{"data":{"counter":0}}`, recorder2.Messages()[1]) + }) + + t.Run("subscriptionIdentifiers entry is removed when subscription is unsubscribed", func(t *testing.T) { + // Verifies that the subscriptionIdentifiers map is cleaned up when a subscription is + // removed. Without cleanup, calling ctx.Updater after unsubscription would find a stale + // entry, attempt to send work to the already-closed work channel, and panic. + c, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamCanEnd := make(chan struct{}) + var capturedUpdater func([]byte) + hookDone := make(chan struct{}) + + fakeStream := createFakeStream(func(counter int) (message string, done bool) { + <-streamCanEnd + return "", true + }, 0, nil, func(ctx StartupHookContext, input []byte) error { + capturedUpdater = ctx.Updater + ctx.Updater([]byte(`{"data":{"counter":1000}}`)) + close(hookDone) + return nil + }) + + resolver, plan, recorder, id := setup(c, fakeStream) + + ctx1 := &Context{ctx: context.Background()} + err := resolver.AsyncResolveGraphQLSubscription(ctx1, plan, recorder, id) + assert.NoError(t, err) + + select { + case <-hookDone: + case <-time.After(defaultTimeout): + t.Fatal("timed out waiting for startup hook") + } + recorder.AwaitAnyMessageCount(t, defaultTimeout) + + // Unsubscribe before the stream sends any messages. + err = resolver.AsyncUnsubscribeSubscription(id) + assert.NoError(t, err) + recorder.AwaitClosed(t, defaultTimeout) + + // Unblock the stream so its goroutine can exit cleanly. + close(streamCanEnd) + + // Calling the captured updater after the subscription has been cleaned up must be + // a no-op. If subscriptionIdentifiers was not cleaned up, this would find a stale + // entry, try to send work to the closed work channel, and panic. + capturedUpdater([]byte(`{"data":{"counter":2000}}`)) + + // Give the event loop time to process the update event. + time.Sleep(50 * time.Millisecond) + + // Only the startup update should have been delivered; the post-removal call is dropped. + assert.Len(t, recorder.Messages(), 1) + assert.Equal(t, `{"data":{"counter":1000}}`, recorder.Messages()[0]) + }) } func Test_ResolveGraphQLSubscriptionWithFilter(t *testing.T) { From 0f27265d730caa203495a744bba15b0f45012390 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 16 Mar 2026 09:41:29 +0100 Subject: [PATCH 6/8] chore: add benchmark --- .../resolve_subscription_benchmark_test.go | 163 ++++++++++++++++++ v2/pkg/engine/resolve/resolve_test.go | 8 +- 2 files changed, 167 insertions(+), 4 deletions(-) create mode 100644 v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go diff --git a/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go b/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go new file mode 100644 index 0000000000..25c0783836 --- /dev/null +++ b/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go @@ -0,0 +1,163 @@ +package resolve + +import ( + "bytes" + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func BenchmarkUpdateSubscription(b *testing.B) { + for _, n := range []int{1, 10, 100, 1000} { + b.Run(func() string { + switch n { + case 1: + return "1_subs" + case 10: + return "10_subs" + case 100: + return "100_subs" + default: + return "1000_subs" + } + }(), func(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + b.Cleanup(cancel) + + streamDone := make(chan struct{}) + b.Cleanup(func() { close(streamDone) }) + + updaters := make([]func([]byte), n) + var setupWg sync.WaitGroup + setupWg.Add(n) + + // Each subscription gets its own fakeStream so that its + // subscriptionOnStartFn captures the correct slot index i. + // executeStartupHooks uses add.resolve.Trigger.Source (the + // subscription's own plan source), so the hook fires on the + // right stream regardless of goroutine scheduling order. + // All subscriptions share the same static input, so they all + // land on a single trigger whose subscriptionIdentifiers map + // grows to N entries — the map we want to exercise. + makePlan := func(i int) *GraphQLSubscription { + stream := createFakeStream( + func(counter int) (message string, done bool) { + <-streamDone + return "", true + }, + 0, + nil, + func(hookCtx StartupHookContext, _ []byte) error { + updaters[i] = hookCtx.Updater + setupWg.Done() + return nil + }, + ) + + fetches := Sequence() + fetches.Trigger = &FetchTreeNode{ + Kind: FetchTreeNodeKindTrigger, + Item: &FetchItem{ + Fetch: &SingleFetch{ + FetchDependencies: FetchDependencies{ + FetchID: 0, + }, + Info: &FetchInfo{ + DataSourceID: "0", + DataSourceName: "counter", + QueryPlan: &QueryPlan{ + Query: "subscription {\n counter\n}", + }, + }, + }, + ResponsePath: "counter", + }, + } + + return &GraphQLSubscription{ + Trigger: GraphQLSubscriptionTrigger{ + Source: stream, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + { + SegmentType: StaticSegmentType, + Data: []byte(`{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`), + }, + }, + }, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + SelectResponseErrorsPath: []string{"errors"}, + }, + }, + Response: &GraphQLResponse{ + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("counter"), + Value: &Integer{ + Path: []string{"counter"}, + }, + Info: &FieldInfo{ + Name: "counter", + ExactParentTypeName: "Subscription", + Source: TypeFieldSource{ + IDs: []string{"0"}, + Names: []string{"counter"}, + }, + FetchID: 0, + }, + }, + }, + }, + Fetches: fetches, + }, + } + } + + resolver := newResolver(ctx) + + recorders := make([]*SubscriptionRecorder, n) + for i := 0; i < n; i++ { + recorders[i] = &SubscriptionRecorder{ + buf: &bytes.Buffer{}, + messages: []string{}, + complete: atomic.Bool{}, + } + subCtx := &Context{ctx: context.Background()} + id := SubscriptionIdentifier{ + ConnectionID: 1, + SubscriptionID: int64(i + 1), + } + err := resolver.AsyncResolveGraphQLSubscription(subCtx, makePlan(i), recorders[i], id) + require.NoError(b, err) + } + + // Block until all N startup hooks have fired, guaranteeing all + // entries are in subscriptionIdentifiers before timing starts. + setupWg.Wait() + + b.ResetTimer() + b.ReportAllocs() + + data := []byte(`{"data":{"counter":1}}`) + for i := 0; i < b.N; i++ { + // Update every subscription on the trigger sequentially. + // Each call does an O(1) map lookup in subscriptionIdentifiers + // then delivers to that subscription's worker. + for j := 0; j < n; j++ { + updaters[j](data) + } + } + + // Every recorder must have received exactly b.N messages. + for i := 0; i < n; i++ { + recorders[i].AwaitMessages(b, b.N, 30*time.Second) + } + }) + } +} diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index b30917972b..3b239b2077 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -5351,7 +5351,7 @@ type SubscriptionRecorder struct { var _ SubscriptionResponseWriter = (*SubscriptionRecorder)(nil) -func (s *SubscriptionRecorder) AwaitMessages(t *testing.T, count int, timeout time.Duration) { +func (s *SubscriptionRecorder) AwaitMessages(t testing.TB, count int, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for { @@ -5368,7 +5368,7 @@ func (s *SubscriptionRecorder) AwaitMessages(t *testing.T, count int, timeout ti } } -func (s *SubscriptionRecorder) AwaitAnyMessageCount(t *testing.T, timeout time.Duration) { +func (s *SubscriptionRecorder) AwaitAnyMessageCount(t testing.TB, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for { @@ -5385,7 +5385,7 @@ func (s *SubscriptionRecorder) AwaitAnyMessageCount(t *testing.T, timeout time.D } } -func (s *SubscriptionRecorder) AwaitComplete(t *testing.T, timeout time.Duration) { +func (s *SubscriptionRecorder) AwaitComplete(t testing.TB, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for { @@ -5399,7 +5399,7 @@ func (s *SubscriptionRecorder) AwaitComplete(t *testing.T, timeout time.Duration } } -func (s *SubscriptionRecorder) AwaitClosed(t *testing.T, timeout time.Duration) { +func (s *SubscriptionRecorder) AwaitClosed(t testing.TB, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for { From 2e4d02d8cc94f8e6890b98eb5464a7d60a7e9b2b Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 16 Mar 2026 10:32:22 +0100 Subject: [PATCH 7/8] feat: add fallback --- v2/pkg/engine/resolve/resolve.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 68f7139eaa..7375eb43d3 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -1008,17 +1008,31 @@ func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifie fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID) } - c, exists := trig.subscriptionIdentifiers[subIdentifier] + // Fast path: O(1) lookup of the subscriptions resolver context via map + resolverCtx, exists := trig.subscriptionIdentifiers[subIdentifier] + + // Fallback O(N) lookup in case we couldn't find the resolver context by map: + // Loop through trig.subscriptions and find the corresponding resolver context. + if !exists { + for i := range trig.subscriptions { + if trig.subscriptions[i].id == subIdentifier { + resolverCtx = i + exists = true + break + } + } + } + if !exists { return } - s, exists := trig.subscriptions[c] + subscription, exists := trig.subscriptions[resolverCtx] if !exists { return } - r.sendUpdateToSubscription(data, c, s) + r.sendUpdateToSubscription(data, resolverCtx, subscription) } func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) { From 97f3769be27129835575fb8cba4baa5b35ffe028 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 16 Mar 2026 11:09:02 +0100 Subject: [PATCH 8/8] chore: use newContext to create test contexts --- .../engine/resolve/resolve_subscription_benchmark_test.go | 2 +- v2/pkg/engine/resolve/resolve_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go b/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go index 25c0783836..b943a15921 100644 --- a/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go +++ b/v2/pkg/engine/resolve/resolve_subscription_benchmark_test.go @@ -128,7 +128,7 @@ func BenchmarkUpdateSubscription(b *testing.B) { messages: []string{}, complete: atomic.Bool{}, } - subCtx := &Context{ctx: context.Background()} + subCtx := NewContext(context.Background()) id := SubscriptionIdentifier{ ConnectionID: 1, SubscriptionID: int64(i + 1), diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 3b239b2077..32f73e218a 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -6584,7 +6584,7 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { resolver, plan, recorder, id := setup(c, fakeStream) - ctx1 := &Context{ctx: context.Background()} + ctx1 := NewContext(context.Background()) err := resolver.AsyncResolveGraphQLSubscription(ctx1, plan, recorder, id) assert.NoError(t, err) @@ -6603,7 +6603,7 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { } recorder2.complete.Store(false) - ctx2 := &Context{ctx: context.Background()} + ctx2 := NewContext(context.Background()) err2 := resolver.AsyncResolveGraphQLSubscription(ctx2, plan, recorder2, id2) assert.NoError(t, err2) @@ -6649,7 +6649,7 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { resolver, plan, recorder, id := setup(c, fakeStream) - ctx1 := &Context{ctx: context.Background()} + ctx1 := NewContext(context.Background()) err := resolver.AsyncResolveGraphQLSubscription(ctx1, plan, recorder, id) assert.NoError(t, err)