From 2d5ce7066508cf5f498609dc697da9b33f3b1a53 Mon Sep 17 00:00:00 2001 From: bigdrum Date: Sat, 23 Jul 2016 23:34:21 -0400 Subject: [PATCH 1/6] Allow user customizable executor to resolve object fields and list items. And an example to demonstrate how it can be used. --- examples/dataloader/example.go | 194 ++++++++++++++++++++++++++++ examples/dataloader/example_test.go | 37 ++++++ executor.go | 55 ++++++-- graphql.go | 5 + 4 files changed, 281 insertions(+), 10 deletions(-) create mode 100644 examples/dataloader/example.go create mode 100644 examples/dataloader/example_test.go diff --git a/examples/dataloader/example.go b/examples/dataloader/example.go new file mode 100644 index 00000000..62414165 --- /dev/null +++ b/examples/dataloader/example.go @@ -0,0 +1,194 @@ +package dataloaderexample + +import ( + "fmt" + "log" + "time" + + "golang.org/x/net/context" + + "github.com/bigdrum/godataloader" + "github.com/graphql-go/graphql" +) + +var postDB = map[string]*post{ + "1": &post{ + ID: "1", + Content: "Hello 1", + AuthorID: "1", + }, + "2": &post{ + ID: "2", + Content: "Hello 2", + AuthorID: "1", + }, + "3": &post{ + ID: "3", + Content: "Hello 3", + AuthorID: "2", + }, + "4": &post{ + ID: "4", + Content: "Hello 4", + AuthorID: "2", + }, +} + +var userDB = map[string]*user{ + "1": &user{ + ID: "1", + Name: "Mike", + }, + "2": &user{ + ID: "2", + Name: "John", + }, +} + +var loaderKey = struct{}{} + +type loader struct { + postLoader *dataloader.DataLoader + userLoader *dataloader.DataLoader +} + +func newLoader(sch *dataloader.Scheduler) *loader { + return &loader{ + postLoader: dataloader.New(sch, dataloader.Parallel(func(key interface{}) dataloader.Value { + // In practice, we will make remote request (e.g. SQL) to fetch post. + // Here we just fake it. + log.Print("Load post ", key) + time.Sleep(time.Second) + id := key.(string) + return dataloader.NewValue(postDB[id], nil) + })), + userLoader: dataloader.New(sch, dataloader.Parallel(func(key interface{}) dataloader.Value { + // In practice, we will make remote request (e.g. SQL) to fetch post. + // Here we just fake it. + log.Print("Load user ", key) + time.Sleep(time.Second) + id := key.(string) + return dataloader.NewValue(userDB[id], nil) + })), + } +} + +type post struct { + ID string `json:"id"` + Content string `json:"content"` + AuthorID string `json:"author_id"` +} + +type user struct { + ID string `json:"id"` + Name string `json:"name"` +} + +func attachNewDataLoader(parent context.Context, sch *dataloader.Scheduler) context.Context { + dl := newLoader(sch) + return context.WithValue(parent, loaderKey, dl) +} + +func getDataLoader(ctx context.Context) *loader { + return ctx.Value(loaderKey).(*loader) +} + +func CreateSchema() graphql.Schema { + userType := graphql.NewObject(graphql.ObjectConfig{ + Name: "User", + Fields: graphql.Fields{ + "id": &graphql.Field{ + Type: graphql.String, + }, + "name": &graphql.Field{ + Type: graphql.String, + }, + }, + }) + + postType := graphql.NewObject(graphql.ObjectConfig{ + Name: "Post", + Fields: graphql.Fields{ + "id": &graphql.Field{ + Type: graphql.String, + }, + "content": &graphql.Field{ + Type: graphql.String, + }, + "author": &graphql.Field{ + Type: userType, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + post := p.Source.(*post) + id := post.AuthorID + dl := getDataLoader(p.Context) + return dl.userLoader.Load(id).Unbox() + }, + }, + }, + }) + + rootQuery := graphql.NewObject(graphql.ObjectConfig{ + Name: "RootQuery", + Fields: graphql.Fields{ + "post": &graphql.Field{ + Type: postType, + Args: graphql.FieldConfigArgument{ + "id": &graphql.ArgumentConfig{ + Type: graphql.String, + }, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + id, ok := p.Args["id"].(string) + if !ok { + return nil, nil + } + dl := getDataLoader(p.Context) + return dl.postLoader.Load(id).Unbox() + }, + }, + }}) + + schema, err := graphql.NewSchema(graphql.SchemaConfig{ + Query: rootQuery, + }) + if err != nil { + panic(err) + } + return schema +} + +type dataloaderExecutor struct { + sch *dataloader.Scheduler +} + +func (e *dataloaderExecutor) RunMany(fs []func()) { + wg := dataloader.NewWaitGroup(e.sch) + for i := range fs { + f := fs[i] + wg.Add(1) + e.sch.Spawn(func() { + defer wg.Done() + f() + }) + } + wg.Wait() +} + +func RunQuery(query string, schema graphql.Schema) *graphql.Result { + var result *graphql.Result + dataloader.RunWithScheduler(func(sch *dataloader.Scheduler) { + executor := dataloaderExecutor{sch} + ctx := attachNewDataLoader(context.Background(), sch) + result = graphql.Do(graphql.Params{ + Schema: schema, + RequestString: query, + Context: ctx, + Executor: &executor, + }) + if len(result.Errors) > 0 { + fmt.Printf("wrong result, unexpected errors: %v", result.Errors) + } + }) + + return result +} diff --git a/examples/dataloader/example_test.go b/examples/dataloader/example_test.go new file mode 100644 index 00000000..5014554f --- /dev/null +++ b/examples/dataloader/example_test.go @@ -0,0 +1,37 @@ +package dataloaderexample_test + +import ( + "testing" + + "github.com/graphql-go/graphql/examples/dataloader" +) + +func TestQuery(t *testing.T) { + schema := dataloaderexample.CreateSchema() + r := dataloaderexample.RunQuery(`{ + p1_0: post(id: "1") { id author { name }} + p1_1: post(id: "1") { id author { name }} + p1_2: post(id: "1") { id author { name }} + p1_3: post(id: "1") { id author { name }} + p1_4: post(id: "1") { id author { name }} + p1_5: post(id: "1") { id author { name }} + p2_1: post(id: "2") { id author { name }} + p2_2: post(id: "2") { id author { name }} + p2_3: post(id: "2") { id author { name }} + p3_1: post(id: "3") { id author { name }} + p3_2: post(id: "3") { id author { name }} + p3_3: post(id: "3") { id author { name }} + }`, schema) + if len(r.Errors) != 0 { + t.Error(r.Errors) + } + // The above query would produce log like this: + // 2016/07/23 23:28:05 Load post 1 + // 2016/07/23 23:28:05 Load post 3 + // 2016/07/23 23:28:05 Load post 2 + // 2016/07/23 23:28:06 Load user 1 + // 2016/07/23 23:28:06 Load user 2 + // Notice the first level post loading is done concurrently without duplicate. + // And the second level user loading is also done in the same fashion. + // TODO: Make test actually verify that. +} diff --git a/executor.go b/executor.go index b03c8a01..2bb6a16d 100644 --- a/executor.go +++ b/executor.go @@ -11,6 +11,20 @@ import ( "golang.org/x/net/context" ) +type Executor interface { + RunMany(f []func()) +} + +type SerialExecutor struct{} + +func (e *SerialExecutor) RunMany(fs []func()) { + for _, f := range fs { + f() + } +} + +var defaultExecutor = &SerialExecutor{} + type ExecuteParams struct { Schema Schema Root interface{} @@ -21,10 +35,15 @@ type ExecuteParams struct { // Context may be provided to pass application-specific per-request // information to resolve functions. Context context.Context + + Executor Executor } func Execute(p ExecuteParams) (result *Result) { result = &Result{} + if p.Executor == nil { + p.Executor = defaultExecutor + } exeContext, err := buildExecutionContext(BuildExecutionCtxParams{ Schema: p.Schema, @@ -35,6 +54,7 @@ func Execute(p ExecuteParams) (result *Result) { Errors: nil, Result: result, Context: p.Context, + Executor: p.Executor, }) if err != nil { @@ -69,6 +89,7 @@ type BuildExecutionCtxParams struct { Errors []gqlerrors.FormattedError Result *Result Context context.Context + Executor Executor } type ExecutionContext struct { Schema Schema @@ -78,6 +99,7 @@ type ExecutionContext struct { VariableValues map[string]interface{} Errors []gqlerrors.FormattedError Context context.Context + Executor Executor } func buildExecutionContext(p BuildExecutionCtxParams) (*ExecutionContext, error) { @@ -124,6 +146,7 @@ func buildExecutionContext(p BuildExecutionCtxParams) (*ExecutionContext, error) eCtx.VariableValues = variableValues eCtx.Errors = p.Errors eCtx.Context = p.Context + eCtx.Executor = p.Executor return eCtx, nil } @@ -247,13 +270,20 @@ func executeFields(p ExecuteFieldsParams) *Result { } finalResults := map[string]interface{}{} + fs := make([]func(), 0, len(p.Fields)) for responseName, fieldASTs := range p.Fields { - resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs) - if state.hasNoFieldDefs { - continue - } - finalResults[responseName] = resolved + responseName := responseName + fieldASTs := fieldASTs + finalResults[responseName] = nil // preallocate to avoid race. + fs = append(fs, func() { + resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs) + if state.hasNoFieldDefs { + return + } + finalResults[responseName] = resolved + }) } + p.ExecutionContext.Executor.RunMany(fs) return &Result{ Data: finalResults, @@ -756,12 +786,17 @@ func completeListValue(eCtx *ExecutionContext, returnType *List, fieldASTs []*as } itemType := returnType.OfType - completedResults := []interface{}{} + completedResults := make([]interface{}, resultVal.Len()) + fs := make([]func(), 0, resultVal.Len()) for i := 0; i < resultVal.Len(); i++ { - val := resultVal.Index(i).Interface() - completedItem := completeValueCatchingError(eCtx, itemType, fieldASTs, info, val) - completedResults = append(completedResults, completedItem) - } + i := i + fs = append(fs, func() { + val := resultVal.Index(i).Interface() + completedItem := completeValueCatchingError(eCtx, itemType, fieldASTs, info, val) + completedResults[i] = completedItem + }) + } + eCtx.Executor.RunMany(fs) return completedResults } diff --git a/graphql.go b/graphql.go index af9dd65a..f9a1cd87 100644 --- a/graphql.go +++ b/graphql.go @@ -30,6 +30,10 @@ type Params struct { // Context may be provided to pass application-specific per-request // information to resolve functions. Context context.Context + + // Executor allows to control the behavior of how to perform resolving function that + // can be run concurrently. If not given, they will be executed serially. + Executor Executor } func Do(p Params) *Result { @@ -58,5 +62,6 @@ func Do(p Params) *Result { OperationName: p.OperationName, Args: p.VariableValues, Context: p.Context, + Executor: p.Executor, }) } From 916f8c4cbe267f46ab56b8b571c36e54dfce4ebd Mon Sep 17 00:00:00 2001 From: bigdrum Date: Sun, 24 Jul 2016 14:30:18 -0400 Subject: [PATCH 2/6] Batch user loading. --- examples/dataloader/example.go | 36 ++++++++++++++++++++++++----- examples/dataloader/example_test.go | 17 +++++++++----- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/examples/dataloader/example.go b/examples/dataloader/example.go index 62414165..9bdbeddb 100644 --- a/examples/dataloader/example.go +++ b/examples/dataloader/example.go @@ -43,6 +43,10 @@ var userDB = map[string]*user{ ID: "2", Name: "John", }, + "3": &user{ + ID: "3", + Name: "Kate", + }, } var loaderKey = struct{}{} @@ -62,14 +66,18 @@ func newLoader(sch *dataloader.Scheduler) *loader { id := key.(string) return dataloader.NewValue(postDB[id], nil) })), - userLoader: dataloader.New(sch, dataloader.Parallel(func(key interface{}) dataloader.Value { - // In practice, we will make remote request (e.g. SQL) to fetch post. + userLoader: dataloader.New(sch, func(keys []interface{}) []dataloader.Value { + // In practice, we will make remote request (e.g. SQL) to fetch multiple users. // Here we just fake it. - log.Print("Load user ", key) + log.Print("Batch load users ", keys) time.Sleep(time.Second) - id := key.(string) - return dataloader.NewValue(userDB[id], nil) - })), + var ret []dataloader.Value + for _, key := range keys { + id := key.(string) + ret = append(ret, dataloader.NewValue(userDB[id], nil)) + } + return ret + }), } } @@ -146,6 +154,22 @@ func CreateSchema() graphql.Schema { return dl.postLoader.Load(id).Unbox() }, }, + "user": &graphql.Field{ + Type: userType, + Args: graphql.FieldConfigArgument{ + "id": &graphql.ArgumentConfig{ + Type: graphql.String, + }, + }, + Resolve: func(p graphql.ResolveParams) (interface{}, error) { + id, ok := p.Args["id"].(string) + if !ok { + return nil, nil + } + dl := getDataLoader(p.Context) + return dl.userLoader.Load(id).Unbox() + }, + }, }}) schema, err := graphql.NewSchema(graphql.SchemaConfig{ diff --git a/examples/dataloader/example_test.go b/examples/dataloader/example_test.go index 5014554f..3b111685 100644 --- a/examples/dataloader/example_test.go +++ b/examples/dataloader/example_test.go @@ -21,17 +21,22 @@ func TestQuery(t *testing.T) { p3_1: post(id: "3") { id author { name }} p3_2: post(id: "3") { id author { name }} p3_3: post(id: "3") { id author { name }} + u1_1: user(id: "1") { name } + u1_2: user(id: "1") { name } + u1_3: user(id: "1") { name } + u2_1: user(id: "3") { name } + u2_2: user(id: "3") { name } + u2_3: user(id: "3") { name } }`, schema) if len(r.Errors) != 0 { t.Error(r.Errors) } // The above query would produce log like this: - // 2016/07/23 23:28:05 Load post 1 - // 2016/07/23 23:28:05 Load post 3 - // 2016/07/23 23:28:05 Load post 2 - // 2016/07/23 23:28:06 Load user 1 - // 2016/07/23 23:28:06 Load user 2 + // 2016/07/23 23:49:31 Load post 3 + // 2016/07/23 23:49:31 Load post 1 + // 2016/07/23 23:49:31 Load post 2 + // 2016/07/23 23:49:32 Batch load users [3 1 2] // Notice the first level post loading is done concurrently without duplicate. - // And the second level user loading is also done in the same fashion. + // And the second level user loading is also done in the same fashion, but batched fetch is used instead. // TODO: Make test actually verify that. } From b7944de9981ea60886cc2414e1c828f368c59fe4 Mon Sep 17 00:00:00 2001 From: bigdrum Date: Sun, 24 Jul 2016 15:57:43 -0400 Subject: [PATCH 3/6] Fix test. --- executor.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/executor.go b/executor.go index 2bb6a16d..9e6227fe 100644 --- a/executor.go +++ b/executor.go @@ -271,10 +271,11 @@ func executeFields(p ExecuteFieldsParams) *Result { finalResults := map[string]interface{}{} fs := make([]func(), 0, len(p.Fields)) + undefined := struct{}{} for responseName, fieldASTs := range p.Fields { responseName := responseName fieldASTs := fieldASTs - finalResults[responseName] = nil // preallocate to avoid race. + finalResults[responseName] = undefined // This is to avoid using lock. fs = append(fs, func() { resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs) if state.hasNoFieldDefs { @@ -285,6 +286,12 @@ func executeFields(p ExecuteFieldsParams) *Result { } p.ExecutionContext.Executor.RunMany(fs) + // Remove undefined keys. + for k, v := range finalResults { + if v == undefined { + delete(finalResults, k) + } + } return &Result{ Data: finalResults, Errors: p.ExecutionContext.Errors, From fd2a5d656cd430032472b7f170851c0cdba2d403 Mon Sep 17 00:00:00 2001 From: bigdrum Date: Sun, 24 Jul 2016 16:08:00 -0400 Subject: [PATCH 4/6] Minor code improvement on handling of the undefined keys. --- executor.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/executor.go b/executor.go index 9e6227fe..e53602fa 100644 --- a/executor.go +++ b/executor.go @@ -271,14 +271,15 @@ func executeFields(p ExecuteFieldsParams) *Result { finalResults := map[string]interface{}{} fs := make([]func(), 0, len(p.Fields)) - undefined := struct{}{} + var undefinedKeys []string for responseName, fieldASTs := range p.Fields { responseName := responseName fieldASTs := fieldASTs - finalResults[responseName] = undefined // This is to avoid using lock. + finalResults[responseName] = nil // This is to avoid using lock. fs = append(fs, func() { resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs) if state.hasNoFieldDefs { + undefinedKeys = append(undefinedKeys, responseName) return } finalResults[responseName] = resolved @@ -287,10 +288,8 @@ func executeFields(p ExecuteFieldsParams) *Result { p.ExecutionContext.Executor.RunMany(fs) // Remove undefined keys. - for k, v := range finalResults { - if v == undefined { - delete(finalResults, k) - } + for _, key := range undefinedKeys { + delete(finalResults, key) } return &Result{ Data: finalResults, From d5145660dd377f5bd4dfa7509d5b6ef3d37c3795 Mon Sep 17 00:00:00 2001 From: bigdrum Date: Sun, 24 Jul 2016 17:40:27 -0400 Subject: [PATCH 5/6] We should just use lock, no need the premature optimization. --- executor.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/executor.go b/executor.go index e53602fa..e99047af 100644 --- a/executor.go +++ b/executor.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "sync" "github.com/graphql-go/graphql/gqlerrors" "github.com/graphql-go/graphql/language/ast" @@ -271,26 +272,21 @@ func executeFields(p ExecuteFieldsParams) *Result { finalResults := map[string]interface{}{} fs := make([]func(), 0, len(p.Fields)) - var undefinedKeys []string + var resultsMu sync.Mutex for responseName, fieldASTs := range p.Fields { responseName := responseName fieldASTs := fieldASTs - finalResults[responseName] = nil // This is to avoid using lock. fs = append(fs, func() { resolved, state := resolveField(p.ExecutionContext, p.ParentType, p.Source, fieldASTs) if state.hasNoFieldDefs { - undefinedKeys = append(undefinedKeys, responseName) return } + resultsMu.Lock() finalResults[responseName] = resolved + resultsMu.Unlock() }) } p.ExecutionContext.Executor.RunMany(fs) - - // Remove undefined keys. - for _, key := range undefinedKeys { - delete(finalResults, key) - } return &Result{ Data: finalResults, Errors: p.ExecutionContext.Errors, From 61de32e8cbd344df1c6779e7b3002f8b0ee31a86 Mon Sep 17 00:00:00 2001 From: bigdrum Date: Sun, 24 Jul 2016 19:26:59 -0400 Subject: [PATCH 6/6] Some special case optmization. --- examples/dataloader/example.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/examples/dataloader/example.go b/examples/dataloader/example.go index 9bdbeddb..089e1b39 100644 --- a/examples/dataloader/example.go +++ b/examples/dataloader/example.go @@ -186,6 +186,14 @@ type dataloaderExecutor struct { } func (e *dataloaderExecutor) RunMany(fs []func()) { + if len(fs) == 1 { + fs[0]() + return + } + if len(fs) == 0 { + return + } + wg := dataloader.NewWaitGroup(e.sch) for i := range fs { f := fs[i]