Skip to content

Commit 4423d60

Browse files
authored
feat: improved subscription heartbeats (#1269)
<!-- Important: Before developing new features, please open an issue to discuss your ideas with the maintainers. This ensures project alignment and helps avoid unnecessary work for you. Thank you for your contribution! Please provide a detailed description below and ensure you've met all the requirements. Squashed commit messages must follow the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) standard to facilitate changelog generation. Please ensure your PR title follows the Conventional Commits specification, using the appropriate type (e.g., feat, fix, docs) and scope. Examples of good PR titles: - 💥feat!: change implementation in an non-backward compatible way - ✨feat(auth): add support for OAuth2 login - 🐞fix(router): add support for custom metrics - 📚docs(README): update installation instructions - 🧹chore(deps): bump dependencies to latest versions --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added explicit subscription heartbeat signaling support to response writers. * **Bug Fixes** * Heartbeats now stop promptly on client disconnect or resolver shutdown, preventing stray heartbeat errors. * **Performance** * Heartbeat emission streamlined to a single writer call, reducing overhead and improving idle stability. * **Refactor** * Consolidated and renamed subscription heartbeat configuration for clearer setup and consistent behavior. * **Tests** * Subscription tests updated to record and verify heartbeat events; test writers now support heartbeat signaling. <!-- end of auto-generated comment: release notes by coderabbit.ai --> ## Checklist - [ ] I have discussed my proposed changes in an issue and have received approval to proceed. - [ ] I have followed the coding standards of the project. - [ ] Tests or benchmarks have been added or updated. <!-- Please add any additional information or context regarding your changes here. -->
1 parent e8bcf31 commit 4423d60

5 files changed

Lines changed: 37 additions & 29 deletions

File tree

execution/graphql/result_writer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ func (e *EngineResultWriter) Complete() {
3535

3636
}
3737

38+
func (e *EngineResultWriter) Heartbeat() error {
39+
return nil
40+
}
41+
3842
func (e *EngineResultWriter) Close(_ resolve.SubscriptionCloseKind) {
3943

4044
}

v2/pkg/engine/resolve/event_loop_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ func (f *FakeSubscriptionWriter) Complete() {
5151
f.messageCountOnComplete = len(f.writtenMessages)
5252
}
5353

54+
// Heartbeat writes directly to the writtenMessages slice, as the real implementations implicitly flush
55+
func (f *FakeSubscriptionWriter) Heartbeat() error {
56+
f.mu.Lock()
57+
defer f.mu.Unlock()
58+
f.writtenMessages = append(f.writtenMessages, string("heartbeat"))
59+
return nil
60+
}
61+
5462
func (f *FakeSubscriptionWriter) Close(SubscriptionCloseKind) {
5563
f.mu.Lock()
5664
defer f.mu.Unlock()
@@ -123,7 +131,7 @@ func TestEventLoop(t *testing.T) {
123131
SubgraphErrorPropagationMode: SubgraphErrorPropagationModePassThrough,
124132
DefaultErrorExtensionCode: "TEST",
125133
MaxRecyclableParserSize: 1024 * 1024,
126-
MultipartSubHeartbeatInterval: DefaultHeartbeatInterval,
134+
SubscriptionHeartbeatInterval: DefaultHeartbeatInterval,
127135
Reporter: testReporter,
128136
})
129137

v2/pkg/engine/resolve/resolve.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ const (
2121
DefaultHeartbeatInterval = 5 * time.Second
2222
)
2323

24-
var (
25-
multipartHeartbeat = []byte("{}")
26-
)
27-
2824
// ConnectionIDs is used to create unique connection IDs for each subscription
2925
// Whenever a new connection is created, use this to generate a new ID
3026
// It is public because it can be used in more high level packages to instantiate a new connection
@@ -69,7 +65,7 @@ type Resolver struct {
6965

7066
propagateSubgraphErrors bool
7167
propagateSubgraphStatusCodes bool
72-
// Multipart heartbeat interval
68+
// Subscription heartbeat interval
7369
heartbeatInterval time.Duration
7470
// maxSubscriptionFetchTimeout defines the maximum time a subscription fetch can take before it is considered timed out
7571
maxSubscriptionFetchTimeout time.Duration
@@ -143,8 +139,8 @@ type ResolverOptions struct {
143139
ResolvableOptions ResolvableOptions
144140
// AllowedCustomSubgraphErrorFields defines which fields are allowed in the subgraph error when in passthrough mode
145141
AllowedSubgraphErrorFields []string
146-
// MultipartSubHeartbeatInterval defines the interval in which a heartbeat is sent to all multipart subscriptions
147-
MultipartSubHeartbeatInterval time.Duration
142+
// SubscriptionHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
143+
SubscriptionHeartbeatInterval time.Duration
148144
// MaxSubscriptionFetchTimeout defines the maximum time a subscription fetch can take before it is considered timed out
149145
MaxSubscriptionFetchTimeout time.Duration
150146
// ApolloRouterCompatibilitySubrequestHTTPError is a compatibility flag for Apollo Router, it is used to handle HTTP errors in subrequests differently
@@ -158,8 +154,8 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
158154
options.MaxConcurrency = 32
159155
}
160156

161-
if options.MultipartSubHeartbeatInterval <= 0 {
162-
options.MultipartSubHeartbeatInterval = DefaultHeartbeatInterval
157+
if options.SubscriptionHeartbeatInterval <= 0 {
158+
options.SubscriptionHeartbeatInterval = DefaultHeartbeatInterval
163159
}
164160

165161
// We transform the allowed fields into a map for faster lookups
@@ -202,7 +198,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
202198
triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
203199
allowedErrorExtensionFields: allowedExtensionFields,
204200
allowedErrorFields: allowedErrorFields,
205-
heartbeatInterval: options.MultipartSubHeartbeatInterval,
201+
heartbeatInterval: options.SubscriptionHeartbeatInterval,
206202
maxSubscriptionFetchTimeout: options.MaxSubscriptionFetchTimeout,
207203
}
208204
resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency)
@@ -310,8 +306,8 @@ func (s *sub) startWorker() {
310306
s.startWorkerWithoutHeartbeat()
311307
}
312308

313-
// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when
314-
// subscription over multipart is used. It sends a heartbeat to the client every heartbeatInterval.
309+
// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when enabled.
310+
// It sends a heartbeat to the client every heartbeatInterval. Heartbeats are handled by the SubscriptionResponseWriter interface.
315311
// TODO: Implement a shared timer implementation to avoid creating a new ticker for each subscription.
316312
func (s *sub) startWorkerWithHeartbeat() {
317313
heartbeatTicker := time.NewTicker(s.resolver.heartbeatInterval)
@@ -330,7 +326,7 @@ func (s *sub) startWorkerWithHeartbeat() {
330326

331327
return
332328
case <-heartbeatTicker.C:
333-
s.resolver.handleHeartbeat(s, multipartHeartbeat)
329+
s.resolver.handleHeartbeat(s)
334330
case work := <-s.workChan:
335331
work.fn()
336332

@@ -501,7 +497,7 @@ func (r *Resolver) handleEvent(event subscriptionEvent) {
501497
}
502498

503499
// handleHeartbeat sends a heartbeat to the client. It needs to be executed on the same goroutine as the writer.
504-
func (r *Resolver) handleHeartbeat(sub *sub, data []byte) {
500+
func (r *Resolver) handleHeartbeat(sub *sub) {
505501
if r.options.Debug {
506502
fmt.Printf("resolver:heartbeat\n")
507503
}
@@ -518,24 +514,16 @@ func (r *Resolver) handleHeartbeat(sub *sub, data []byte) {
518514
fmt.Printf("resolver:heartbeat:subscription:%d\n", sub.id.SubscriptionID)
519515
}
520516

521-
if _, err := sub.writer.Write(data); err != nil {
522-
if errors.Is(err, context.Canceled) {
523-
// If Write fails (e.g. client disconnected), remove the subscription.
524-
_ = r.AsyncUnsubscribeSubscription(sub.id)
525-
return
526-
}
527-
r.asyncErrorWriter.WriteError(sub.ctx, err, nil, sub.writer)
528-
}
529-
err := sub.writer.Flush()
530-
if err != nil {
531-
// If flush fails (e.g. client disconnected), remove the subscription.
517+
if err := sub.writer.Heartbeat(); err != nil {
518+
// If heartbeat fails (e.g. client disconnected), remove the subscription.
532519
_ = r.AsyncUnsubscribeSubscription(sub.id)
533520
return
534521
}
535522

536523
if r.options.Debug {
537-
fmt.Printf("resolver:heartbeat:subscription:flushed:%d\n", sub.id.SubscriptionID)
524+
fmt.Printf("resolver:heartbeat:subscription:done:%d\n", sub.id.SubscriptionID)
538525
}
526+
539527
if r.reporter != nil {
540528
r.reporter.SubscriptionUpdateSent()
541529
}

v2/pkg/engine/resolve/resolve_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (t *TestErrorWriter) WriteError(ctx *Context, err error, res *GraphQLRespon
8686
}
8787
}
8888

89-
var multipartSubHeartbeatInterval = 100 * time.Millisecond
89+
var subscriptionHeartbeatInterval = 100 * time.Millisecond
9090

9191
func newResolver(ctx context.Context) *Resolver {
9292
return New(ctx, ResolverOptions{
@@ -95,7 +95,7 @@ func newResolver(ctx context.Context) *Resolver {
9595
PropagateSubgraphErrors: true,
9696
PropagateSubgraphStatusCodes: true,
9797
AsyncErrorWriter: &TestErrorWriter{},
98-
MultipartSubHeartbeatInterval: multipartSubHeartbeatInterval,
98+
SubscriptionHeartbeatInterval: subscriptionHeartbeatInterval,
9999
})
100100
}
101101

@@ -4777,6 +4777,13 @@ func (s *SubscriptionRecorder) Complete() {
47774777
s.complete.Store(true)
47784778
}
47794779

4780+
func (s *SubscriptionRecorder) Heartbeat() error {
4781+
s.mux.Lock()
4782+
defer s.mux.Unlock()
4783+
s.messages = append(s.messages, "heartbeat")
4784+
return nil
4785+
}
4786+
47804787
func (s *SubscriptionRecorder) Close(_ SubscriptionCloseKind) {
47814788
s.closed.Store(true)
47824789
}

v2/pkg/engine/resolve/response.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type SubscriptionResponseWriter interface {
6767
ResponseWriter
6868
Flush() error
6969
Complete()
70+
Heartbeat() error
7071
Close(kind SubscriptionCloseKind)
7172
}
7273

0 commit comments

Comments
 (0)