Skip to content

Commit 4105082

Browse files
jensneuse and claude authored
fix(resolve): fix flaky singleflight deduplication tests (#1393)
<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Refactor**
  * Improved request deduplication and follower tracking by switching to a low‑contention atomic counter for more reliable concurrency and streamlined lifecycle handling.
* **Tests**
  * Updated test coordination to poll for follower registration with time‑based deadlines and added gating for deterministic ordering.
  * Converted several previously concurrent test actions to synchronous calls to stabilize timing.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

The inbound request singleflight tests used a channel-based synchronization pattern that signalled "entered" before followers actually called `GetOrCreate`. Under `-race`, the leader could finish and delete the singleflight key before followers registered, causing them to start fresh requests instead of deduplicating — making `TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication` and two related tests flaky.

Replace `HasFollowers bool` + `Mu sync.Mutex` on `InflightRequest` with a single `followerCount atomic.Int32`, incremented inside `GetOrCreate` at the exact point a follower joins. Tests now poll this counter to guarantee all followers have registered before the leader is unblocked, eliminating the race entirely.

## Checklist

- [ ] I have discussed my proposed changes in an issue and have received approval to proceed.
- [ ] I have followed the coding standards of the project.
- [x] Tests or benchmarks have been added or updated.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f79d071 commit 4105082

4 files changed

Lines changed: 78 additions & 54 deletions

File tree

execution/subscription/legacy_handler_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -417,7 +417,7 @@ func TestHandler_Handle(t *testing.T) {
417417
time.Sleep(10 * time.Millisecond)
418418
cancelFunc()
419419

420-
go sendChatMutation(t, chatServer.URL)
420+
sendChatMutation(t, chatServer.URL)
421421

422422
require.Eventually(t, func() bool {
423423
return client.hasMoreMessagesThan(0)
@@ -481,7 +481,7 @@ func TestHandler_Handle(t *testing.T) {
481481
time.Sleep(10 * time.Millisecond)
482482
cancelFunc()
483483

484-
go sendChatMutation(t, chatServer.URL)
484+
sendChatMutation(t, chatServer.URL)
485485

486486
require.Eventually(t, func() bool {
487487
return client.hasMoreMessagesThan(0)

v2/pkg/engine/resolve/inbound_request_singleflight.go

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package resolve
33
import (
44
"encoding/binary"
55
"sync"
6+
"sync/atomic"
67

78
"github.com/wundergraph/graphql-go-tools/v2/pkg/pool"
89
)
@@ -46,8 +47,15 @@ type InflightRequest struct {
4647
Err error
4748
ID uint64
4849

49-
HasFollowers bool
50-
Mu sync.Mutex
50+
followerCount atomic.Int32
51+
}
52+
53+
func (r *InflightRequest) AddFollower() {
54+
r.followerCount.Add(1)
55+
}
56+
57+
func (r *InflightRequest) HasFollowers() bool {
58+
return r.followerCount.Load() > 0
5159
}
5260

5361
// GetOrCreate creates a new InflightRequest or returns an existing (shared) one
@@ -90,9 +98,7 @@ func (r *InboundRequestSingleFlight) GetOrCreate(ctx *Context, response *GraphQL
9098
inflight, shared := shard.m.LoadOrStore(key, request)
9199
if shared {
92100
request = inflight.(*InflightRequest)
93-
request.Mu.Lock()
94-
request.HasFollowers = true
95-
request.Mu.Unlock()
101+
request.AddFollower()
96102
select {
97103
case <-request.Done:
98104
if request.Err != nil {
@@ -113,10 +119,7 @@ func (r *InboundRequestSingleFlight) FinishOk(req *InflightRequest, data []byte)
113119
}
114120
shard := r.shardFor(req.ID)
115121
shard.m.Delete(req.ID)
116-
req.Mu.Lock()
117-
hasFollowers := req.HasFollowers
118-
req.Mu.Unlock()
119-
if hasFollowers {
122+
if req.HasFollowers() {
120123
// optimization to only copy when we actually have to
121124
req.Data = make([]byte, len(data))
122125
copy(req.Data, data)

v2/pkg/engine/resolve/inbound_request_singleflight_test.go

Lines changed: 10 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"sync"
66
"testing"
7+
"time"
78

89
"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
910
)
@@ -75,8 +76,7 @@ func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) {
7576
}
7677

7778
// The follower calls GetOrCreate which blocks on inflight.Done.
78-
// We wait for HasFollowers to be set before calling FinishErr.
79-
followerReady := make(chan struct{})
79+
// We wait for followerCount to confirm it has entered before calling FinishErr.
8080
var wg sync.WaitGroup
8181
wg.Add(1)
8282

@@ -85,25 +85,20 @@ func TestInboundSingleFlight_FollowerReceivesLeaderError(t *testing.T) {
8585
followerCtx := NewContext(context.Background())
8686
followerCtx.Request.ID = 2
8787

88-
// Signal that we're about to enter GetOrCreate. HasFollowers will be
89-
// set inside GetOrCreate before the select blocks, so closing
90-
// followerReady here is slightly early, but we poll HasFollowers below.
91-
close(followerReady)
92-
9388
_, followerErr := sf.GetOrCreate(followerCtx, response)
9489
if followerErr == nil {
9590
t.Error("expected error from follower after leader FinishErr")
9691
}
9792
}()
9893

99-
<-followerReady
100-
// Spin until the follower has actually registered (set HasFollowers)
101-
for {
102-
inflight.Mu.Lock()
103-
ready := inflight.HasFollowers
104-
inflight.Mu.Unlock()
105-
if ready {
106-
break
94+
// Poll until the follower has actually registered inside GetOrCreate.
95+
deadline := time.After(3 * time.Second)
96+
for !inflight.HasFollowers() {
97+
select {
98+
case <-deadline:
99+
t.Fatal("timeout waiting for follower to enter singleflight")
100+
default:
101+
time.Sleep(10 * time.Millisecond)
107102
}
108103
}
109104

v2/pkg/engine/resolve/resolve_test.go

Lines changed: 54 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,40 @@ func (w *blockingWriter) String() string {
149149
return w.buf.String()
150150
}
151151

152+
// findAnyInflight iterates through all singleflight shards and returns
153+
// the first inflight request found. Used in tests to poll followerCount.
154+
func findAnyInflight(r *Resolver) *InflightRequest {
155+
for i := range r.inboundRequestSingleFlight.shards {
156+
var found *InflightRequest
157+
r.inboundRequestSingleFlight.shards[i].m.Range(func(_, value any) bool {
158+
found = value.(*InflightRequest)
159+
return false
160+
})
161+
if found != nil {
162+
return found
163+
}
164+
}
165+
return nil
166+
}
167+
168+
// waitForFollowerCount polls until the inflight request has at least count followers registered.
169+
func waitForFollowerCount(t *testing.T, r *Resolver, count int32) {
170+
t.Helper()
171+
deadline := time.After(3 * time.Second)
172+
for {
173+
inflight := findAnyInflight(r)
174+
if inflight != nil && inflight.followerCount.Load() >= count {
175+
return
176+
}
177+
select {
178+
case <-deadline:
179+
t.Fatal("timeout waiting for followers to enter singleflight")
180+
default:
181+
time.Sleep(10 * time.Millisecond)
182+
}
183+
}
184+
}
185+
152186
type TestErrorWriter struct {
153187
}
154188

@@ -4694,30 +4728,20 @@ func TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication(t *testing.T)
46944728
t.Fatalf("timeout waiting for leader data source load")
46954729
}
46964730

4697-
startFollowers := make(chan struct{})
4698-
followersEntered := make(chan struct{}, requestCount-1)
4699-
47004731
for i := 1; i < requestCount; i++ {
47014732
go func(i int) {
47024733
defer wg.Done()
47034734
ctx := ctxTemplate
4704-
<-startFollowers
4705-
followersEntered <- struct{}{}
47064735
buf := &bytes.Buffer{}
47074736
info, err := r.ArenaResolveGraphQLResponse(&ctx, response, buf)
47084737
results[i] = result{info: info, output: buf.String(), err: err}
47094738
}(i)
47104739
}
47114740

4712-
close(startFollowers)
4713-
4714-
for i := 1; i < requestCount; i++ {
4715-
select {
4716-
case <-followersEntered:
4717-
case <-time.After(time.Second):
4718-
t.Fatalf("timeout waiting for follower %d to start", i)
4719-
}
4720-
}
4741+
// Wait until all followers have entered the singleflight (called LoadOrStore)
4742+
// before releasing the data source. This guarantees they join the leader's
4743+
// inflight request rather than creating their own.
4744+
waitForFollowerCount(t, r, int32(requestCount-1))
47214745

47224746
ds.Release()
47234747

@@ -4823,9 +4847,6 @@ func TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication_SharedData(t
48234847
t.Fatalf("timeout waiting for leader data source load")
48244848
}
48254849

4826-
startFollowers := make(chan struct{})
4827-
followersEntered := make(chan struct{}, requestCount-1)
4828-
48294850
for i := 1; i < requestCount; i++ {
48304851
go func(i int) {
48314852
defer wg.Done()
@@ -4838,23 +4859,16 @@ func TestResolver_ArenaResolveGraphQLResponse_RequestDeduplication_SharedData(t
48384859
followerData.Store(i, data)
48394860
},
48404861
)
4841-
<-startFollowers
4842-
followersEntered <- struct{}{}
48434862
buf := &bytes.Buffer{}
48444863
info, err := r.ArenaResolveGraphQLResponse(&ctx, response, buf)
48454864
results[i] = result{info: info, output: buf.String(), err: err}
48464865
}(i)
48474866
}
48484867

4849-
close(startFollowers)
4850-
4851-
for i := 1; i < requestCount; i++ {
4852-
select {
4853-
case <-followersEntered:
4854-
case <-time.After(time.Second):
4855-
t.Fatalf("timeout waiting for follower %d to start", i)
4856-
}
4857-
}
4868+
// Wait until all followers have entered the singleflight (called LoadOrStore)
4869+
// before releasing the data source. This guarantees they join the leader's
4870+
// inflight request rather than creating their own.
4871+
waitForFollowerCount(t, r, int32(requestCount-1))
48584872

48594873
ds.Release()
48604874

@@ -6426,10 +6440,21 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) {
64266440
c, cancel := context.WithCancel(context.Background())
64276441
defer cancel()
64286442

6443+
// sub2Ready gates the data source goroutine so that it doesn't start
6444+
// emitting before sub2 has been registered on the trigger. Without this,
6445+
// the emitting goroutine's first triggerUpdate can race sub2's
6446+
// addSubscription on the unbuffered events channel, causing sub2 to
6447+
// miss counter=0.
6448+
sub2Ready := make(chan struct{})
64296449
fakeStream := createFakeStream(func(counter int) (message string, done bool) {
64306450
return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 100
64316451
}, 1*time.Millisecond, func(input []byte) {
64326452
assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
6453+
// Block the data source goroutine until sub2 is registered.
6454+
// onStart runs inside the goroutine that calls Start(), not the
6455+
// event loop, so blocking here is safe — the event loop remains
6456+
// free to process sub2's addSubscription event.
6457+
<-sub2Ready
64336458
}, func(ctx StartupHookContext, input []byte) (err error) {
64346459
return nil
64356460
})
@@ -6451,6 +6476,7 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) {
64516476

64526477
err2 := resolver1.AsyncResolveGraphQLSubscription(ctx2, plan1, recorder2, id2)
64536478
assert.NoError(t, err2)
6479+
close(sub2Ready)
64546480

64556481
// complete is called only on the last recorder
64566482
recorder1.AwaitComplete(t, defaultTimeout)

0 commit comments

Comments
 (0)