Skip to content

Commit a8a7bff

Browse files
committed
draft: resolving defer response
1 parent e4a562b commit a8a7bff

7 files changed

Lines changed: 240 additions & 5 deletions

File tree

execution/engine/execution_engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func (e *ExecutionEngine) Execute(ctx context.Context, operation *graphql.Reques
157157
astnormalization.WithRemoveFragmentDefinitions(),
158158
astnormalization.WithRemoveUnusedVariables(),
159159
astnormalization.WithInlineFragmentSpreads(),
160+
astnormalization.WithInlineDefer(),
160161
)
161162
if err != nil {
162163
return err
@@ -237,6 +238,9 @@ func (e *ExecutionEngine) Execute(ctx context.Context, operation *graphql.Reques
237238
case *plan.SynchronousResponsePlan:
238239
_, err := e.resolver.ResolveGraphQLResponse(execContext.resolveContext, p.Response, nil, writer)
239240
return err
241+
case *plan.DeferResponsePlan:
242+
_, err := e.resolver.ResolveGraphQLDeferResponse(execContext.resolveContext, p.Response, writer)
243+
return err
240244
case *plan.SubscriptionResponsePlan:
241245
return e.resolver.ResolveGraphQLSubscription(execContext.resolveContext, p.Response, writer)
242246
default:

execution/engine/execution_engine_test.go

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ type _executionTestOptions struct {
224224
validateRequiredExternalFields bool
225225
computeStaticCost bool
226226
relaxFieldSelectionMergingNullability bool
227+
streamingResponse bool
227228
}
228229

229230
type executionTestOptions func(*_executionTestOptions)
@@ -257,6 +258,11 @@ func computeStaticCost() executionTestOptions {
257258
func relaxFieldSelectionMergingNullability() executionTestOptions {
258259
return func(options *_executionTestOptions) {
259260
options.relaxFieldSelectionMergingNullability = true
261+
}
262+
263+
func withStreamingResponse() executionTestOptions {
264+
return func(options *_executionTestOptions) {
265+
options.streamingResponse = true
260266
}
261267
}
262268

@@ -309,6 +315,14 @@ func TestExecutionEngine_Execute(t *testing.T) {
309315

310316
operation := testCase.operation(t)
311317
resultWriter := graphql.NewEngineResultWriter()
318+
319+
streamingBuf := bytes.NewBuffer(nil)
320+
if opts.streamingResponse {
321+
resultWriter.SetFlushCallback(func(data []byte) {
322+
streamingBuf.Write(data)
323+
})
324+
}
325+
312326
execCtx, execCtxCancel := context.WithCancel(context.Background())
313327
defer execCtxCancel()
314328
err = engine.Execute(execCtx, &operation, &resultWriter, testCase.engineOptions...)
@@ -340,7 +354,12 @@ func TestExecutionEngine_Execute(t *testing.T) {
340354
}
341355

342356
if testCase.expectedResponse != "" {
343-
assert.Equal(t, testCase.expectedResponse, actualResponse)
357+
if opts.streamingResponse {
358+
streamingResponse := streamingBuf.String()
359+
assert.Equal(t, testCase.expectedResponse, streamingResponse)
360+
} else {
361+
assert.Equal(t, testCase.expectedResponse, actualResponse)
362+
}
344363
}
345364

346365
if testCase.expectedStaticCost != 0 {
@@ -7024,6 +7043,100 @@ func TestExecutionEngine_Execute(t *testing.T) {
70247043
relaxFieldSelectionMergingNullability(),
70257044
))
70267045
})
7046+
7047+
t.Run("defer", func(t *testing.T) {
7048+
t.Run("simple", func(t *testing.T) {
7049+
7050+
definition := `
7051+
type User {
7052+
id: ID!
7053+
name: String!
7054+
title: String!
7055+
}
7056+
7057+
type Query {
7058+
user: User!
7059+
}
7060+
`
7061+
7062+
makeDataSource := func(t *testing.T, expectFetchReasons bool) []plan.DataSource {
7063+
return []plan.DataSource{
7064+
mustGraphqlDataSourceConfiguration(t,
7065+
"id-1",
7066+
mustFactory(t,
7067+
testConditionalNetHttpClient(t, conditionalTestCase{
7068+
expectedHost: "first",
7069+
expectedPath: "/",
7070+
responses: map[string]sendResponse{
7071+
`{"query":"{user {name}}"}`: {
7072+
statusCode: 200,
7073+
body: `{"data":{"user":{"name":"Black"}}}`,
7074+
},
7075+
`{"query":"{user {title}}"}`: {
7076+
statusCode: 200,
7077+
body: `{"data":{"user":{"title":"Sabbat"}}}`,
7078+
},
7079+
},
7080+
}),
7081+
),
7082+
&plan.DataSourceMetadata{
7083+
RootNodes: []plan.TypeField{
7084+
{
7085+
TypeName: "Query",
7086+
FieldNames: []string{"user"},
7087+
},
7088+
},
7089+
ChildNodes: []plan.TypeField{
7090+
{
7091+
TypeName: "User",
7092+
FieldNames: []string{"id", "title", "name"},
7093+
},
7094+
},
7095+
},
7096+
mustConfiguration(t, graphql_datasource.ConfigurationInput{
7097+
Fetch: &graphql_datasource.FetchConfiguration{
7098+
URL: "https://first/",
7099+
Method: "POST",
7100+
},
7101+
SchemaConfiguration: mustSchemaConfig(
7102+
t,
7103+
&graphql_datasource.FederationConfiguration{
7104+
Enabled: true,
7105+
ServiceSDL: definition,
7106+
},
7107+
definition,
7108+
),
7109+
}),
7110+
),
7111+
}
7112+
}
7113+
7114+
t.Run("run", runWithoutError(ExecutionEngineTestCase{
7115+
schema: func(t *testing.T) *graphql.Schema {
7116+
t.Helper()
7117+
parseSchema, err := graphql.NewSchemaFromString(definition)
7118+
require.NoError(t, err)
7119+
return parseSchema
7120+
}(t),
7121+
operation: func(t *testing.T) graphql.Request {
7122+
return graphql.Request{
7123+
OperationName: "DeferUserTitle",
7124+
Query: `
7125+
query DeferUserTitle {
7126+
user {
7127+
name
7128+
... @defer {
7129+
title
7130+
}
7131+
}
7132+
}`,
7133+
}
7134+
},
7135+
dataSources: makeDataSource(t, false),
7136+
expectedResponse: `{"data":{"user":{"name":"Black"}}}{"name":"Black"}{"data":{{"name":"Black"}}}`,
7137+
}, withStreamingResponse()))
7138+
})
7139+
})
70277140
}
70287141

70297142
func testNetHttpClient(t *testing.T, testCase roundTripperTestCase) *http.Client {

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ func (p *Planner[T]) EnterDirective(ref int) {
151151
}
152152

153153
func (p *Planner[T]) addDirectiveToNode(directiveRef int, node ast.Node) {
154+
// do not propagate internal directives to upstream query document
155+
if bytes.Equal(p.visitor.Operation.DirectiveNameBytes(directiveRef), literal.DEFER_INTERNAL) {
156+
return
157+
}
158+
154159
directiveName := p.visitor.Operation.DirectiveNameString(directiveRef)
155160
operationType := ast.OperationTypeQuery
156161
if !p.dataSourcePlannerConfig.IsNested {

v2/pkg/engine/resolve/loader.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,19 @@ func (l *Loader) Free() {
214214
}
215215

216216
func (l *Loader) LoadGraphQLResponseData(ctx *Context, response *GraphQLResponse, resolvable *Resolvable) (err error) {
217+
l.Init(ctx, response.Info, resolvable)
218+
219+
return l.ResolveFetchNode(response.Fetches)
220+
}
221+
222+
func (l *Loader) Init(ctx *Context, responseInfo *GraphQLResponseInfo, resolvable *Resolvable) {
217223
l.resolvable = resolvable
218224
l.ctx = ctx
219-
l.info = response.Info
225+
l.info = responseInfo
220226
l.taintedObjs = make(taintedObjects)
221-
return l.resolveFetchNode(response.Fetches)
222227
}
223228

224-
func (l *Loader) resolveFetchNode(node *FetchTreeNode) error {
229+
func (l *Loader) ResolveFetchNode(node *FetchTreeNode) error {
225230
if node == nil {
226231
return nil
227232
}
@@ -294,7 +299,7 @@ func (l *Loader) resolveParallel(nodes []*FetchTreeNode) error {
294299

295300
func (l *Loader) resolveSerial(nodes []*FetchTreeNode) error {
296301
for i := range nodes {
297-
err := l.resolveFetchNode(nodes[i])
302+
err := l.ResolveFetchNode(nodes[i])
298303
if err != nil {
299304
return errors.WithStack(err)
300305
}
@@ -590,6 +595,7 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson
590595
if responseData.Type() != astjson.TypeObject {
591596
return l.renderErrorsFailedToFetch(fetchItem, res, invalidGraphQLResponseShape)
592597
}
598+
// TODO: unclear why we doing this
593599
l.resolvable.data = responseData
594600
return nil
595601
}

v2/pkg/engine/resolve/resolvable.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type Resolvable struct {
5252
wroteErrors bool
5353
wroteData bool
5454
skipValueCompletion bool
55+
deferMode bool
56+
deferID string
5557

5658
typeNames [][]byte
5759

@@ -272,6 +274,10 @@ func (r *Resolvable) printData(root *Object) {
272274
r.printBytes(colon)
273275
r.printBytes(lBrace)
274276
r.print = true
277+
278+
if r.deferMode && r.deferID != "" {
279+
r.print = false
280+
}
275281
_ = r.walkObject(root, r.data)
276282
r.print = false
277283
r.printBytes(rBrace)
@@ -625,6 +631,21 @@ func (r *Resolvable) walkObject(obj *Object, parent *astjson.Value) bool {
625631
}
626632
}
627633

634+
hasDeferredFields := false
635+
for i := range obj.Fields {
636+
if obj.Fields[i].Defer != nil && obj.Fields[i].Defer.DeferID == r.deferID {
637+
hasDeferredFields = true
638+
}
639+
}
640+
641+
if hasDeferredFields {
642+
// enable printing
643+
if !r.print {
644+
r.print = true
645+
defer func() { r.print = false }()
646+
}
647+
}
648+
628649
if r.print && !isRoot {
629650
r.printBytes(lBrace)
630651
}
@@ -635,6 +656,22 @@ func (r *Resolvable) walkObject(obj *Object, parent *astjson.Value) bool {
635656
r.typeNames = r.typeNames[:len(r.typeNames)-1]
636657
}()
637658
for i := range obj.Fields {
659+
if r.deferMode {
660+
661+
// for initial response render only fields without a defer id
662+
if r.deferID == "" && obj.Fields[i].Defer != nil {
663+
continue
664+
}
665+
666+
// skip fields with different defer id in deferred response
667+
if r.deferID != "" && obj.Fields[i].Defer != nil && obj.Fields[i].Defer.DeferID == r.deferID {
668+
continue
669+
}
670+
671+
// when defer id matches render field - render it
672+
// walk objects to find other defers
673+
}
674+
638675
if obj.Fields[i].ParentOnTypeNames != nil {
639676
if r.skipFieldOnParentTypeNames(obj.Fields[i]) {
640677
continue

v2/pkg/engine/resolve/resolve.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,71 @@ func (r *Resolver) ArenaResolveGraphQLResponse(ctx *Context, response *GraphQLRe
419419
return resp, err
420420
}
421421

422+
func (r *Resolver) ResolveGraphQLDeferResponse(ctx *Context, response *GraphQLDeferResponse, writer DeferResponseWriter) (*GraphQLResolveInfo, error) {
423+
resp := &GraphQLResolveInfo{}
424+
425+
start := time.Now()
426+
<-r.maxConcurrency
427+
resp.ResolveAcquireWaitTime = time.Since(start)
428+
defer func() {
429+
r.maxConcurrency <- struct{}{}
430+
}()
431+
432+
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields, r.subgraphRequestSingleFlight, nil)
433+
434+
err := t.resolvable.Init(ctx, nil, response.Response.Info.OperationType)
435+
if err != nil {
436+
return nil, err
437+
}
438+
439+
if !ctx.ExecutionOptions.SkipLoader {
440+
t.loader.Init(ctx, response.Response.Info, t.resolvable)
441+
442+
// fetch initial response
443+
if err := t.loader.ResolveFetchNode(response.Response.Fetches); err != nil {
444+
return nil, err
445+
}
446+
447+
t.resolvable.deferMode = true
448+
t.resolvable.deferID = ""
449+
450+
// render initial response
451+
err = t.resolvable.Resolve(ctx.ctx, response.Response.Data, response.Response.Fetches, writer)
452+
if err != nil {
453+
return nil, err
454+
}
455+
456+
err = writer.Flush()
457+
if err != nil {
458+
return nil, err
459+
}
460+
461+
// fetch deferred responses
462+
463+
for _, deferGroup := range response.Defers {
464+
if err := t.loader.ResolveFetchNode(deferGroup.Fetches); err != nil {
465+
return nil, err
466+
}
467+
468+
// render deferred response
469+
t.resolvable.deferID = deferGroup.DeferID
470+
err = t.resolvable.Resolve(ctx.ctx, response.Response.Data, deferGroup.Fetches, writer)
471+
if err != nil {
472+
return nil, err
473+
}
474+
475+
// flush after each deferred response
476+
477+
err = writer.Flush()
478+
if err != nil {
479+
return nil, err
480+
}
481+
}
482+
}
483+
484+
return resp, err
485+
}
486+
422487
type trigger struct {
423488
id uint64
424489
cancel context.CancelFunc

v2/pkg/engine/resolve/response.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ type ResponseWriter interface {
7878
io.Writer
7979
}
8080

81+
type DeferResponseWriter interface {
82+
ResponseWriter
83+
Flush() error
84+
}
85+
8186
type SubscriptionCloseKind struct {
8287
WSCode ws.StatusCode
8388
Reason string

0 commit comments

Comments
 (0)