Skip to content

Commit 86aff15

Browse files
committed
draft: resolving defer response
1 parent 00807eb commit 86aff15

7 files changed

Lines changed: 241 additions & 5 deletions

File tree

execution/engine/execution_engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (e *ExecutionEngine) Execute(ctx context.Context, operation *graphql.Reques
155155
astnormalization.WithRemoveFragmentDefinitions(),
156156
astnormalization.WithRemoveUnusedVariables(),
157157
astnormalization.WithInlineFragmentSpreads(),
158+
astnormalization.WithInlineDefer(),
158159
)
159160
if err != nil {
160161
return err
@@ -229,6 +230,9 @@ func (e *ExecutionEngine) Execute(ctx context.Context, operation *graphql.Reques
229230
case *plan.SynchronousResponsePlan:
230231
_, err := e.resolver.ResolveGraphQLResponse(execContext.resolveContext, p.Response, nil, writer)
231232
return err
233+
case *plan.DeferResponsePlan:
234+
_, err := e.resolver.ResolveGraphQLDeferResponse(execContext.resolveContext, p.Response, writer)
235+
return err
232236
case *plan.SubscriptionResponsePlan:
233237
return e.resolver.ResolveGraphQLSubscription(execContext.resolveContext, p.Response, writer)
234238
default:

execution/engine/execution_engine_test.go

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ type _executionTestOptions struct {
220220
apolloRouterCompatibilitySubrequestHTTPError bool
221221
propagateFetchReasons bool
222222
validateRequiredExternalFields bool
223+
streamingResponse bool
223224
}
224225

225226
type executionTestOptions func(*_executionTestOptions)
@@ -244,6 +245,12 @@ func validateRequiredExternalFields() executionTestOptions {
244245
}
245246
}
246247

248+
func withStreamingResponse() executionTestOptions {
249+
return func(options *_executionTestOptions) {
250+
options.streamingResponse = true
251+
}
252+
}
253+
247254
func TestExecutionEngine_Execute(t *testing.T) {
248255
run := func(testCase ExecutionEngineTestCase, withError bool, expectedErrorMessage string, options ...executionTestOptions) func(t *testing.T) {
249256
t.Helper()
@@ -289,6 +296,14 @@ func TestExecutionEngine_Execute(t *testing.T) {
289296

290297
operation := testCase.operation(t)
291298
resultWriter := graphql.NewEngineResultWriter()
299+
300+
streamingBuf := bytes.NewBuffer(nil)
301+
if opts.streamingResponse {
302+
resultWriter.SetFlushCallback(func(data []byte) {
303+
streamingBuf.Write(data)
304+
})
305+
}
306+
292307
execCtx, execCtxCancel := context.WithCancel(context.Background())
293308
defer execCtxCancel()
294309
err = engine.Execute(execCtx, &operation, &resultWriter, testCase.engineOptions...)
@@ -311,7 +326,12 @@ func TestExecutionEngine_Execute(t *testing.T) {
311326
}
312327

313328
if testCase.expectedResponse != "" {
314-
assert.Equal(t, testCase.expectedResponse, actualResponse)
329+
if opts.streamingResponse {
330+
streamingResponse := streamingBuf.String()
331+
assert.Equal(t, testCase.expectedResponse, streamingResponse)
332+
} else {
333+
assert.Equal(t, testCase.expectedResponse, actualResponse)
334+
}
315335
}
316336

317337
if withError {
@@ -5603,6 +5623,100 @@ func TestExecutionEngine_Execute(t *testing.T) {
56035623
}, withFetchReasons(), validateRequiredExternalFields()))
56045624
})
56055625
})
5626+
5627+
t.Run("defer", func(t *testing.T) {
5628+
t.Run("simple", func(t *testing.T) {
5629+
5630+
definition := `
5631+
type User {
5632+
id: ID!
5633+
name: String!
5634+
title: String!
5635+
}
5636+
5637+
type Query {
5638+
user: User!
5639+
}
5640+
`
5641+
5642+
makeDataSource := func(t *testing.T, expectFetchReasons bool) []plan.DataSource {
5643+
return []plan.DataSource{
5644+
mustGraphqlDataSourceConfiguration(t,
5645+
"id-1",
5646+
mustFactory(t,
5647+
testConditionalNetHttpClient(t, conditionalTestCase{
5648+
expectedHost: "first",
5649+
expectedPath: "/",
5650+
responses: map[string]sendResponse{
5651+
`{"query":"{user {name}}"}`: {
5652+
statusCode: 200,
5653+
body: `{"data":{"user":{"name":"Black"}}}`,
5654+
},
5655+
`{"query":"{user {title}}"}`: {
5656+
statusCode: 200,
5657+
body: `{"data":{"user":{"title":"Sabbat"}}}`,
5658+
},
5659+
},
5660+
}),
5661+
),
5662+
&plan.DataSourceMetadata{
5663+
RootNodes: []plan.TypeField{
5664+
{
5665+
TypeName: "Query",
5666+
FieldNames: []string{"user"},
5667+
},
5668+
},
5669+
ChildNodes: []plan.TypeField{
5670+
{
5671+
TypeName: "User",
5672+
FieldNames: []string{"id", "title", "name"},
5673+
},
5674+
},
5675+
},
5676+
mustConfiguration(t, graphql_datasource.ConfigurationInput{
5677+
Fetch: &graphql_datasource.FetchConfiguration{
5678+
URL: "https://first/",
5679+
Method: "POST",
5680+
},
5681+
SchemaConfiguration: mustSchemaConfig(
5682+
t,
5683+
&graphql_datasource.FederationConfiguration{
5684+
Enabled: true,
5685+
ServiceSDL: definition,
5686+
},
5687+
definition,
5688+
),
5689+
}),
5690+
),
5691+
}
5692+
}
5693+
5694+
t.Run("run", runWithoutError(ExecutionEngineTestCase{
5695+
schema: func(t *testing.T) *graphql.Schema {
5696+
t.Helper()
5697+
parseSchema, err := graphql.NewSchemaFromString(definition)
5698+
require.NoError(t, err)
5699+
return parseSchema
5700+
}(t),
5701+
operation: func(t *testing.T) graphql.Request {
5702+
return graphql.Request{
5703+
OperationName: "DeferUserTitle",
5704+
Query: `
5705+
query DeferUserTitle {
5706+
user {
5707+
name
5708+
... @defer {
5709+
title
5710+
}
5711+
}
5712+
}`,
5713+
}
5714+
},
5715+
dataSources: makeDataSource(t, false),
5716+
expectedResponse: `{"data":{"user":{"name":"Black"}}}{"name":"Black"}{"data":{{"name":"Black"}}}`,
5717+
}, withStreamingResponse()))
5718+
})
5719+
})
56065720
}
56075721

56085722
func testNetHttpClient(t *testing.T, testCase roundTripperTestCase) *http.Client {

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ func (p *Planner[T]) EnterDirective(ref int) {
150150
}
151151

152152
func (p *Planner[T]) addDirectiveToNode(directiveRef int, node ast.Node) {
153+
// do not propagate internal directives to upstream query document
154+
if bytes.Equal(p.visitor.Operation.DirectiveNameBytes(directiveRef), literal.DEFER_INTERNAL) {
155+
return
156+
}
157+
153158
directiveName := p.visitor.Operation.DirectiveNameString(directiveRef)
154159
operationType := ast.OperationTypeQuery
155160
if !p.dataSourcePlannerConfig.IsNested {

v2/pkg/engine/resolve/loader.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,19 @@ func (l *Loader) Free() {
190190
}
191191

192192
func (l *Loader) LoadGraphQLResponseData(ctx *Context, response *GraphQLResponse, resolvable *Resolvable) (err error) {
193+
l.Init(ctx, response.Info, resolvable)
194+
195+
return l.ResolveFetchNode(response.Fetches)
196+
}
197+
198+
func (l *Loader) Init(ctx *Context, responseInfo *GraphQLResponseInfo, resolvable *Resolvable) {
193199
l.resolvable = resolvable
194200
l.ctx = ctx
195-
l.info = response.Info
201+
l.info = responseInfo
196202
l.taintedObjs = make(taintedObjects)
197-
return l.resolveFetchNode(response.Fetches)
198203
}
199204

200-
func (l *Loader) resolveFetchNode(node *FetchTreeNode) error {
205+
func (l *Loader) ResolveFetchNode(node *FetchTreeNode) error {
201206
if node == nil {
202207
return nil
203208
}
@@ -264,7 +269,7 @@ func (l *Loader) resolveParallel(nodes []*FetchTreeNode) error {
264269

265270
func (l *Loader) resolveSerial(nodes []*FetchTreeNode) error {
266271
for i := range nodes {
267-
err := l.resolveFetchNode(nodes[i])
272+
err := l.ResolveFetchNode(nodes[i])
268273
if err != nil {
269274
return errors.WithStack(err)
270275
}
@@ -629,6 +634,7 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson
629634
if responseData.Type() != astjson.TypeObject {
630635
return l.renderErrorsFailedToFetch(fetchItem, res, invalidGraphQLResponseShape)
631636
}
637+
// TODO: unclear why we doing this
632638
l.resolvable.data = responseData
633639
return nil
634640
}

v2/pkg/engine/resolve/resolvable.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type Resolvable struct {
5050
wroteErrors bool
5151
wroteData bool
5252
skipValueCompletion bool
53+
deferMode bool
54+
deferID string
5355

5456
typeNames [][]byte
5557

@@ -262,6 +264,10 @@ func (r *Resolvable) printData(root *Object) {
262264
r.printBytes(colon)
263265
r.printBytes(lBrace)
264266
r.print = true
267+
268+
if r.deferMode && r.deferID != "" {
269+
r.print = false
270+
}
265271
_ = r.walkObject(root, r.data)
266272
r.print = false
267273
r.printBytes(rBrace)
@@ -615,6 +621,21 @@ func (r *Resolvable) walkObject(obj *Object, parent *astjson.Value) bool {
615621
}
616622
}
617623

624+
hasDeferredFields := false
625+
for i := range obj.Fields {
626+
if obj.Fields[i].Defer != nil && obj.Fields[i].Defer.DeferID == r.deferID {
627+
hasDeferredFields = true
628+
}
629+
}
630+
631+
if hasDeferredFields {
632+
// enable printing
633+
if !r.print {
634+
r.print = true
635+
defer func() { r.print = false }()
636+
}
637+
}
638+
618639
if r.print && !isRoot {
619640
r.printBytes(lBrace)
620641
}
@@ -625,6 +646,22 @@ func (r *Resolvable) walkObject(obj *Object, parent *astjson.Value) bool {
625646
r.typeNames = r.typeNames[:len(r.typeNames)-1]
626647
}()
627648
for i := range obj.Fields {
649+
if r.deferMode {
650+
651+
// for initial response render only fields without a defer id
652+
if r.deferID == "" && obj.Fields[i].Defer != nil {
653+
continue
654+
}
655+
656+
// skip fields with different defer id in deferred response
657+
if r.deferID != "" && obj.Fields[i].Defer != nil && obj.Fields[i].Defer.DeferID == r.deferID {
658+
continue
659+
}
660+
661+
// when defer id matches render field - render it
662+
// walk objects to find other defers
663+
}
664+
628665
if obj.Fields[i].ParentOnTypeNames != nil {
629666
if r.skipFieldOnParentTypeNames(obj.Fields[i]) {
630667
continue

v2/pkg/engine/resolve/resolve.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,71 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
291291
return resp, err
292292
}
293293

294+
func (r *Resolver) ResolveGraphQLDeferResponse(ctx *Context, response *GraphQLDeferResponse, writer DeferResponseWriter) (*GraphQLResolveInfo, error) {
295+
resp := &GraphQLResolveInfo{}
296+
297+
start := time.Now()
298+
<-r.maxConcurrency
299+
resp.ResolveAcquireWaitTime = time.Since(start)
300+
defer func() {
301+
r.maxConcurrency <- struct{}{}
302+
}()
303+
304+
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)
305+
306+
err := t.resolvable.Init(ctx, nil, response.Response.Info.OperationType)
307+
if err != nil {
308+
return nil, err
309+
}
310+
311+
if !ctx.ExecutionOptions.SkipLoader {
312+
t.loader.Init(ctx, response.Response.Info, t.resolvable)
313+
314+
// fetch initial response
315+
if err := t.loader.ResolveFetchNode(response.Response.Fetches); err != nil {
316+
return nil, err
317+
}
318+
319+
t.resolvable.deferMode = true
320+
t.resolvable.deferID = ""
321+
322+
// render initial response
323+
err = t.resolvable.Resolve(ctx.ctx, response.Response.Data, response.Response.Fetches, writer)
324+
if err != nil {
325+
return nil, err
326+
}
327+
328+
err = writer.Flush()
329+
if err != nil {
330+
return nil, err
331+
}
332+
333+
// fetch deferred responses
334+
335+
for _, deferGroup := range response.Defers {
336+
if err := t.loader.ResolveFetchNode(deferGroup.Fetches); err != nil {
337+
return nil, err
338+
}
339+
340+
// render deferred response
341+
t.resolvable.deferID = deferGroup.DeferID
342+
err = t.resolvable.Resolve(ctx.ctx, response.Response.Data, deferGroup.Fetches, writer)
343+
if err != nil {
344+
return nil, err
345+
}
346+
347+
// flush after each deferred response
348+
349+
err = writer.Flush()
350+
if err != nil {
351+
return nil, err
352+
}
353+
}
354+
}
355+
356+
return resp, err
357+
}
358+
294359
type trigger struct {
295360
id uint64
296361
cancel context.CancelFunc

v2/pkg/engine/resolve/response.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ type ResponseWriter interface {
6363
io.Writer
6464
}
6565

66+
type DeferResponseWriter interface {
67+
ResponseWriter
68+
Flush() error
69+
}
70+
6671
type SubscriptionCloseKind struct {
6772
WSCode ws.StatusCode
6873
Reason string

0 commit comments

Comments
 (0)