Skip to content

Commit 5a61bda

Browse files
committed
collapse adapter readloop into a handler system instead of channels
1 parent a8b66c2 commit 5a61bda

16 files changed

Lines changed: 462 additions & 435 deletions

v2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client.go

Lines changed: 28 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,36 @@ func NewGraphQLSubscriptionClient(ctx context.Context, opts ...SubscriptionClien
141141
}
142142

143143
// Subscribe implements GraphQLSubscriptionClient.
144-
// It bridges the channel-based new client API to the callback-based updater interface.
145144
func (c *subscriptionClientV2) Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
146145
opts, req, err := convertToClientOptions(options)
147146
if err != nil {
148147
return err
149148
}
150149

151-
msgCh, cancel, err := c.client.Subscribe(ctx.Context(), req, opts)
150+
handler := func(msg *client.Message) {
151+
switch msg.Type {
152+
case client.MessageTypeConnectionError:
153+
updater.Error(formatUpstreamServiceError(msg.Err))
154+
updater.Done()
155+
case client.MessageTypeError:
156+
data, _ := json.Marshal(msg.Payload)
157+
updater.Error(data)
158+
updater.Done()
159+
case client.MessageTypeData:
160+
data, err := json.Marshal(msg.Payload)
161+
if err != nil {
162+
updater.Error(formatSubscriptionError(err))
163+
updater.Done()
164+
return
165+
}
166+
updater.Update(data)
167+
case client.MessageTypeComplete:
168+
updater.Complete()
169+
updater.Done()
170+
}
171+
}
172+
173+
cancel, err := c.client.Subscribe(ctx.Context(), req, opts, handler)
152174
if err != nil {
153175
if isUpstreamError(err) {
154176
updater.Error(formatUpstreamServiceError(err))
@@ -158,57 +180,14 @@ func (c *subscriptionClientV2) Subscribe(ctx *resolve.Context, options GraphQLSu
158180
return err
159181
}
160182

161-
go c.readLoop(ctx.Context(), msgCh, cancel, updater)
183+
context.AfterFunc(ctx.Context(), func() {
184+
cancel()
185+
updater.Done()
186+
})
162187

163188
return nil
164189
}
165190

166-
// readLoop bridges the channel-based API to the callback-based updater.
167-
func (c *subscriptionClientV2) readLoop(ctx context.Context, msgCh <-chan *client.Message, cancel func(), updater resolve.SubscriptionUpdater) {
168-
defer cancel()
169-
170-
for {
171-
select {
172-
case <-ctx.Done():
173-
updater.Done()
174-
return
175-
176-
case msg, ok := <-msgCh:
177-
if !ok {
178-
updater.Done()
179-
return
180-
}
181-
182-
switch msg.Type {
183-
case client.MessageTypeConnectionError:
184-
updater.Error(formatUpstreamServiceError(msg.Err))
185-
updater.Done()
186-
return
187-
188-
case client.MessageTypeError:
189-
data, _ := json.Marshal(msg.Payload)
190-
updater.Error(data)
191-
updater.Done()
192-
return
193-
194-
case client.MessageTypeData:
195-
data, err := json.Marshal(msg.Payload)
196-
if err != nil {
197-
updater.Error(formatSubscriptionError(err))
198-
updater.Done()
199-
return
200-
}
201-
updater.Update(data)
202-
203-
case client.MessageTypeComplete:
204-
updater.Complete()
205-
updater.Done()
206-
return
207-
}
208-
}
209-
}
210-
}
211-
212191
// isUpstreamError reports whether err is a connection-level upstream error
213192
// that should be reported to the client as an UPSTREAM_SERVICE_ERROR.
214193
func isUpstreamError(err error) bool {

v2/pkg/engine/datasource/graphql_datasource/graphql_subscription_client_test.go

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package graphql_datasource
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"testing"
78

@@ -43,15 +44,37 @@ func (t *testBridgeUpdater) Subscriptions() map[context.Context]resolve.Subscrip
4344
return map[context.Context]resolve.SubscriptionIdentifier{}
4445
}
4546

46-
func TestReadLoopErrorHandling(t *testing.T) {
47+
func TestHandlerDeliversCorrectMessageForEachType(t *testing.T) {
48+
buildHandler := func(updater *testBridgeUpdater) client.Handler {
49+
return func(msg *client.Message) {
50+
switch msg.Type {
51+
case client.MessageTypeConnectionError:
52+
updater.Error(formatUpstreamServiceError(msg.Err))
53+
updater.Done()
54+
case client.MessageTypeError:
55+
data, _ := json.Marshal(msg.Payload)
56+
updater.Error(data)
57+
updater.Done()
58+
case client.MessageTypeData:
59+
data, err := json.Marshal(msg.Payload)
60+
if err != nil {
61+
updater.Error(formatSubscriptionError(err))
62+
updater.Done()
63+
return
64+
}
65+
updater.Update(data)
66+
case client.MessageTypeComplete:
67+
updater.Complete()
68+
updater.Done()
69+
}
70+
}
71+
}
72+
4773
t.Run("connection errors deliver error and done without updates", func(t *testing.T) {
4874
updater := &testBridgeUpdater{}
49-
msgCh := make(chan *client.Message, 1)
50-
msgCh <- &client.Message{Type: client.MessageTypeConnectionError, Err: client.ErrConnectionClosed}
51-
close(msgCh)
75+
handler := buildHandler(updater)
5276

53-
subClient := &subscriptionClientV2{}
54-
subClient.readLoop(context.Background(), msgCh, func() {}, updater)
77+
handler(&client.Message{Type: client.MessageTypeConnectionError, Err: client.ErrConnectionClosed})
5578

5679
require.True(t, updater.done)
5780
require.Len(t, updater.errors, 1)
@@ -61,52 +84,21 @@ func TestReadLoopErrorHandling(t *testing.T) {
6184

6285
t.Run("non-connection errors deliver error and done without updates", func(t *testing.T) {
6386
updater := &testBridgeUpdater{}
64-
msgCh := make(chan *client.Message, 1)
65-
msgCh <- &client.Message{Type: client.MessageTypeConnectionError, Err: errors.New("validation failed")}
66-
close(msgCh)
87+
handler := buildHandler(updater)
6788

68-
subClient := &subscriptionClientV2{}
69-
subClient.readLoop(context.Background(), msgCh, func() {}, updater)
89+
handler(&client.Message{Type: client.MessageTypeConnectionError, Err: errors.New("validation failed")})
7090

7191
require.True(t, updater.done)
7292
require.Len(t, updater.errors, 1)
7393
require.Len(t, updater.updates, 0)
7494
require.False(t, updater.completed)
7595
})
7696

77-
t.Run("context cancellation calls done without complete", func(t *testing.T) {
78-
updater := &testBridgeUpdater{}
79-
ctx, cancel := context.WithCancel(context.Background())
80-
cancel()
81-
msgCh := make(chan *client.Message)
82-
83-
subClient := &subscriptionClientV2{}
84-
subClient.readLoop(ctx, msgCh, func() {}, updater)
85-
86-
require.True(t, updater.done)
87-
require.False(t, updater.completed)
88-
})
89-
90-
t.Run("channel close calls done without complete", func(t *testing.T) {
91-
updater := &testBridgeUpdater{}
92-
msgCh := make(chan *client.Message)
93-
close(msgCh)
94-
95-
subClient := &subscriptionClientV2{}
96-
subClient.readLoop(context.Background(), msgCh, func() {}, updater)
97-
98-
require.True(t, updater.done)
99-
require.False(t, updater.completed)
100-
})
101-
102-
t.Run("done message calls complete then done", func(t *testing.T) {
97+
t.Run("complete message calls complete then done", func(t *testing.T) {
10398
updater := &testBridgeUpdater{}
104-
msgCh := make(chan *client.Message, 1)
105-
msgCh <- &client.Message{Type: client.MessageTypeComplete}
106-
close(msgCh)
99+
handler := buildHandler(updater)
107100

108-
subClient := &subscriptionClientV2{}
109-
subClient.readLoop(context.Background(), msgCh, func() {}, updater)
101+
handler(&client.Message{Type: client.MessageTypeComplete})
110102

111103
require.True(t, updater.done)
112104
require.True(t, updater.completed)

v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,23 +74,15 @@ func New(ctx context.Context, cfg Config) *Client {
7474
}
7575

7676
// Subscribe creates a new upstream via the appropriate transport.
77-
func (c *Client) Subscribe(ctx context.Context, req *common.Request, opts common.Options) (<-chan *common.Message, func(), error) {
77+
func (c *Client) Subscribe(ctx context.Context, req *common.Request, opts common.Options, handler common.Handler) (func(), error) {
7878
if c.ctx.Err() != nil {
79-
return nil, nil, ErrClientClosed
79+
return nil, ErrClientClosed
8080
}
8181

82-
// Route to transport
83-
var source <-chan *common.Message
84-
var cancel func()
85-
var err error
86-
8782
if opts.Transport == common.TransportSSE {
88-
source, cancel, err = c.sse.Subscribe(ctx, req, opts)
89-
} else {
90-
source, cancel, err = c.ws.Subscribe(ctx, req, opts)
83+
return c.sse.Subscribe(ctx, req, opts, handler)
9184
}
92-
93-
return source, cancel, err
85+
return c.ws.Subscribe(ctx, req, opts, handler)
9486
}
9587

9688
// Stats returns client statistics.

v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/client_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func TestClient(t *testing.T) {
3838
c := New(ctx, Config{})
3939
cancel()
4040

41-
_, _, err := c.Subscribe(t.Context(), &Request{Query: "subscription { a }"}, Options{
41+
_, err := c.Subscribe(t.Context(), &Request{Query: "subscription { a }"}, Options{
4242
Endpoint: "ws://localhost/graphql",
43-
})
43+
}, func(_ *common.Message) {})
4444

4545
assert.Equal(t, ErrClientClosed, err)
4646
})
@@ -57,11 +57,14 @@ func TestClient_SubscriberDrain(t *testing.T) {
5757

5858
c := New(t.Context(), Config{})
5959

60-
ch, subCancel, err := c.Subscribe(context.Background(), &common.Request{
60+
ch := make(chan *common.Message, 1)
61+
subCancel, err := c.Subscribe(context.Background(), &common.Request{
6162
Query: "subscription { test }",
6263
}, common.Options{
6364
Endpoint: server.URL,
6465
Transport: common.TransportWS,
66+
}, func(msg *common.Message) {
67+
ch <- msg
6568
})
6669
require.NoError(t, err)
6770

@@ -92,12 +95,15 @@ func TestClient_SubscriberDrain(t *testing.T) {
9295
// Start subscriptions with different headers (forces multiple connections)
9396
for i := range 3 {
9497
headers := http.Header{"X-Request-ID": []string{string(rune('A' + i))}}
95-
ch, subCancel, err := c.Subscribe(context.Background(), &common.Request{
98+
ch := make(chan *common.Message, 1)
99+
subCancel, err := c.Subscribe(context.Background(), &common.Request{
96100
Query: "subscription { test }",
97101
}, common.Options{
98102
Endpoint: server.URL,
99103
Transport: common.TransportWS,
100104
Headers: headers,
105+
}, func(msg *common.Message) {
106+
ch <- msg
101107
})
102108
require.NoError(t, err)
103109
cancels[i] = subCancel
@@ -185,11 +191,14 @@ func TestClient_CancelSendsComplete(t *testing.T) {
185191

186192
c := New(t.Context(), Config{})
187193

188-
ch, cancel, err := c.Subscribe(t.Context(), &common.Request{
194+
ch := make(chan *common.Message, 1)
195+
cancel, err := c.Subscribe(t.Context(), &common.Request{
189196
Query: "subscription { test }",
190197
}, common.Options{
191198
Endpoint: server.URL,
192199
Transport: common.TransportWS,
200+
}, func(msg *common.Message) {
201+
ch <- msg
193202
})
194203
require.NoError(t, err)
195204

v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/common/message.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ type Message struct {
2929
Err error // only set when Type == MessageTypeConnectionError
3030
}
3131

32+
// Handler receives subscription messages. It is called synchronously on the
33+
// transport's read goroutine; a slow handler blocks message delivery.
34+
type Handler func(msg *Message)
35+
3236
type ExecutionResult struct {
3337
Data json.RawMessage `json:"data,omitempty"`
3438
Errors json.RawMessage `json:"errors,omitempty"`

v2/pkg/engine/datasource/graphql_datasource/subscriptionclient/exports.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type (
1414
ExecutionResult = common.ExecutionResult
1515
Request = common.Request
1616
Options = common.Options
17+
Handler = common.Handler
1718
TransportType = common.TransportType
1819
WSSubprotocol = common.WSSubprotocol
1920
SSEMethod = common.SSEMethod

0 commit comments

Comments
 (0)