Skip to content

Commit 0f27265

Browse files
committed
chore: add benchmark
1 parent 9192199 commit 0f27265

2 files changed

Lines changed: 167 additions & 4 deletions

File tree

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package resolve
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func BenchmarkUpdateSubscription(b *testing.B) {
15+
for _, n := range []int{1, 10, 100, 1000} {
16+
b.Run(func() string {
17+
switch n {
18+
case 1:
19+
return "1_subs"
20+
case 10:
21+
return "10_subs"
22+
case 100:
23+
return "100_subs"
24+
default:
25+
return "1000_subs"
26+
}
27+
}(), func(b *testing.B) {
28+
ctx, cancel := context.WithCancel(context.Background())
29+
b.Cleanup(cancel)
30+
31+
streamDone := make(chan struct{})
32+
b.Cleanup(func() { close(streamDone) })
33+
34+
updaters := make([]func([]byte), n)
35+
var setupWg sync.WaitGroup
36+
setupWg.Add(n)
37+
38+
// Each subscription gets its own fakeStream so that its
39+
// subscriptionOnStartFn captures the correct slot index i.
40+
// executeStartupHooks uses add.resolve.Trigger.Source (the
41+
// subscription's own plan source), so the hook fires on the
42+
// right stream regardless of goroutine scheduling order.
43+
// All subscriptions share the same static input, so they all
44+
// land on a single trigger whose subscriptionIdentifiers map
45+
// grows to N entries — the map we want to exercise.
46+
makePlan := func(i int) *GraphQLSubscription {
47+
stream := createFakeStream(
48+
func(counter int) (message string, done bool) {
49+
<-streamDone
50+
return "", true
51+
},
52+
0,
53+
nil,
54+
func(hookCtx StartupHookContext, _ []byte) error {
55+
updaters[i] = hookCtx.Updater
56+
setupWg.Done()
57+
return nil
58+
},
59+
)
60+
61+
fetches := Sequence()
62+
fetches.Trigger = &FetchTreeNode{
63+
Kind: FetchTreeNodeKindTrigger,
64+
Item: &FetchItem{
65+
Fetch: &SingleFetch{
66+
FetchDependencies: FetchDependencies{
67+
FetchID: 0,
68+
},
69+
Info: &FetchInfo{
70+
DataSourceID: "0",
71+
DataSourceName: "counter",
72+
QueryPlan: &QueryPlan{
73+
Query: "subscription {\n counter\n}",
74+
},
75+
},
76+
},
77+
ResponsePath: "counter",
78+
},
79+
}
80+
81+
return &GraphQLSubscription{
82+
Trigger: GraphQLSubscriptionTrigger{
83+
Source: stream,
84+
InputTemplate: InputTemplate{
85+
Segments: []TemplateSegment{
86+
{
87+
SegmentType: StaticSegmentType,
88+
Data: []byte(`{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`),
89+
},
90+
},
91+
},
92+
PostProcessing: PostProcessingConfiguration{
93+
SelectResponseDataPath: []string{"data"},
94+
SelectResponseErrorsPath: []string{"errors"},
95+
},
96+
},
97+
Response: &GraphQLResponse{
98+
Data: &Object{
99+
Fields: []*Field{
100+
{
101+
Name: []byte("counter"),
102+
Value: &Integer{
103+
Path: []string{"counter"},
104+
},
105+
Info: &FieldInfo{
106+
Name: "counter",
107+
ExactParentTypeName: "Subscription",
108+
Source: TypeFieldSource{
109+
IDs: []string{"0"},
110+
Names: []string{"counter"},
111+
},
112+
FetchID: 0,
113+
},
114+
},
115+
},
116+
},
117+
Fetches: fetches,
118+
},
119+
}
120+
}
121+
122+
resolver := newResolver(ctx)
123+
124+
recorders := make([]*SubscriptionRecorder, n)
125+
for i := 0; i < n; i++ {
126+
recorders[i] = &SubscriptionRecorder{
127+
buf: &bytes.Buffer{},
128+
messages: []string{},
129+
complete: atomic.Bool{},
130+
}
131+
subCtx := &Context{ctx: context.Background()}
132+
id := SubscriptionIdentifier{
133+
ConnectionID: 1,
134+
SubscriptionID: int64(i + 1),
135+
}
136+
err := resolver.AsyncResolveGraphQLSubscription(subCtx, makePlan(i), recorders[i], id)
137+
require.NoError(b, err)
138+
}
139+
140+
// Block until all N startup hooks have fired, guaranteeing all
141+
// entries are in subscriptionIdentifiers before timing starts.
142+
setupWg.Wait()
143+
144+
b.ResetTimer()
145+
b.ReportAllocs()
146+
147+
data := []byte(`{"data":{"counter":1}}`)
148+
for i := 0; i < b.N; i++ {
149+
// Update every subscription on the trigger sequentially.
150+
// Each call does an O(1) map lookup in subscriptionIdentifiers
151+
// then delivers to that subscription's worker.
152+
for j := 0; j < n; j++ {
153+
updaters[j](data)
154+
}
155+
}
156+
157+
// Every recorder must have received exactly b.N messages.
158+
for i := 0; i < n; i++ {
159+
recorders[i].AwaitMessages(b, b.N, 30*time.Second)
160+
}
161+
})
162+
}
163+
}

v2/pkg/engine/resolve/resolve_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5351,7 +5351,7 @@ type SubscriptionRecorder struct {
53515351

53525352
var _ SubscriptionResponseWriter = (*SubscriptionRecorder)(nil)
53535353

5354-
func (s *SubscriptionRecorder) AwaitMessages(t *testing.T, count int, timeout time.Duration) {
5354+
func (s *SubscriptionRecorder) AwaitMessages(t testing.TB, count int, timeout time.Duration) {
53555355
t.Helper()
53565356
deadline := time.Now().Add(timeout)
53575357
for {
@@ -5368,7 +5368,7 @@ func (s *SubscriptionRecorder) AwaitMessages(t *testing.T, count int, timeout ti
53685368
}
53695369
}
53705370

5371-
func (s *SubscriptionRecorder) AwaitAnyMessageCount(t *testing.T, timeout time.Duration) {
5371+
func (s *SubscriptionRecorder) AwaitAnyMessageCount(t testing.TB, timeout time.Duration) {
53725372
t.Helper()
53735373
deadline := time.Now().Add(timeout)
53745374
for {
@@ -5385,7 +5385,7 @@ func (s *SubscriptionRecorder) AwaitAnyMessageCount(t *testing.T, timeout time.D
53855385
}
53865386
}
53875387

5388-
func (s *SubscriptionRecorder) AwaitComplete(t *testing.T, timeout time.Duration) {
5388+
func (s *SubscriptionRecorder) AwaitComplete(t testing.TB, timeout time.Duration) {
53895389
t.Helper()
53905390
deadline := time.Now().Add(timeout)
53915391
for {
@@ -5399,7 +5399,7 @@ func (s *SubscriptionRecorder) AwaitComplete(t *testing.T, timeout time.Duration
53995399
}
54005400
}
54015401

5402-
func (s *SubscriptionRecorder) AwaitClosed(t *testing.T, timeout time.Duration) {
5402+
func (s *SubscriptionRecorder) AwaitClosed(t testing.TB, timeout time.Duration) {
54035403
t.Helper()
54045404
deadline := time.Now().Add(timeout)
54055405
for {

0 commit comments

Comments
 (0)