From e143982c0485e1ff2726f7e2b607c26997dd0d90 Mon Sep 17 00:00:00 2001 From: Hengfeng Li Date: Fri, 17 Jul 2020 10:18:03 +1000 Subject: [PATCH] feat(spanner): support of client-level custom retry settings (#2599) * Support of client-level custom retry settings. * Add a func comment to mergeCallOptions. * Add an end-to-end test. --- spanner/client.go | 7 +++- spanner/client_test.go | 73 +++++++++++++++++++++++++++++++++ spanner/sessionclient.go | 31 +++++++++++++- spanner/sessionclient_test.go | 76 +++++++++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 2 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index e8d6cdebb9b..1b1c4587858 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -25,6 +25,7 @@ import ( "time" "cloud.google.com/go/internal/trace" + vkit "cloud.google.com/go/spanner/apiv1" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" sppb "google.golang.org/genproto/googleapis/spanner/v1" @@ -104,6 +105,10 @@ type ClientConfig struct { // QueryOptions is the configuration for executing a sql query. QueryOptions QueryOptions + // CallOptions is the configuration for providing custom retry settings that + // override the default values. + CallOptions *vkit.CallOptions + // logger is the logger to use for this client. If it is nil, all logging // will be directed to the standard logger. logger *log.Logger @@ -200,7 +205,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf config.incStep = DefaultSessionPoolConfig.incStep } // Create a session client. - sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger) + sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger, config.CallOptions) // Create a session pool. config.SessionPoolConfig.sessionLabels = sessionLabels sp, err := newSessionPool(sc, config.SessionPoolConfig) diff --git a/spanner/client_test.go b/spanner/client_test.go index 5156c622c14..a8299610285 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -27,8 +27,10 @@ import ( "cloud.google.com/go/civil" itestutil "cloud.google.com/go/internal/testutil" + vkit "cloud.google.com/go/spanner/apiv1" . "cloud.google.com/go/spanner/internal/testutil" structpb "github.com/golang/protobuf/ptypes/struct" + gax "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" "google.golang.org/api/option" sppb "google.golang.org/genproto/googleapis/spanner/v1" @@ -1760,6 +1762,77 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T) } } +func TestClient_CallOptions(t *testing.T) { + t.Parallel() + co := &vkit.CallOptions{ + CreateSession: []gax.CallOption{ + gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.Unavailable, codes.DeadlineExceeded, + }, gax.Backoff{ + Initial: 200 * time.Millisecond, + Max: 30000 * time.Millisecond, + Multiplier: 1.25, + }) + }), + }, + } + + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co}) + defer teardown() + + c, err := client.sc.nextClient() + if err != nil { + t.Fatalf("failed to get a session client: %v", err) + } + + cs := &gax.CallSettings{} + // This is the default retry setting. + c.CallOptions.CreateSession[0].Resolve(cs) + if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want { + t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) + } + + // This is the custom retry setting. + c.CallOptions.CreateSession[1].Resolve(cs) + if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{200000000 30000000000 1.25 0} [14 4]}"; got != want { + t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) + } +} + +func TestClient_QueryWithCallOptions(t *testing.T) { + t.Parallel() + co := &vkit.CallOptions{ + ExecuteSql: []gax.CallOption{ + gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.DeadlineExceeded, + }, gax.Backoff{ + Initial: 200 * time.Millisecond, + Max: 30000 * time.Millisecond, + Multiplier: 1.25, + }) + }), + }, + } + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co}) + server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{ + Errors: []error{status.Error(codes.DeadlineExceeded, "Deadline exceeded")}, + }) + defer teardown() + ctx := context.Background() + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + _, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}) + if err != nil { + return err + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + func TestBatchReadOnlyTransaction_QueryOptions(t *testing.T) { ctx := context.Background() qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}} diff --git a/spanner/sessionclient.go b/spanner/sessionclient.go index aac830bd44e..a1c9f66d805 100644 --- a/spanner/sessionclient.go +++ b/spanner/sessionclient.go @@ -20,12 +20,14 @@ import ( "context" "fmt" "log" + "reflect" "sync" "time" "cloud.google.com/go/internal/trace" "cloud.google.com/go/internal/version" vkit "cloud.google.com/go/spanner/apiv1" + "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" sppb "google.golang.org/genproto/googleapis/spanner/v1" @@ -90,10 +92,11 @@ type sessionClient struct { md metadata.MD batchTimeout time.Duration logger *log.Logger + callOptions *vkit.CallOptions } // newSessionClient creates a session client to use for a database. -func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabels map[string]string, md metadata.MD, logger *log.Logger) *sessionClient { +func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabels map[string]string, md metadata.MD, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient { return &sessionClient{ connPool: connPool, database: database, @@ -102,6 +105,7 @@ func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabe md: md, batchTimeout: time.Minute, logger: logger, + callOptions: callOptions, } } @@ -274,5 +278,30 @@ func (sc *sessionClient) nextClient() (*vkit.Client, error) { return nil, err } client.SetGoogleClientInfo("gccl", version.Repo) + if sc.callOptions != nil { + client.CallOptions = mergeCallOptions(client.CallOptions, sc.callOptions) + } return client, nil } + +// mergeCallOptions merges two CallOptions into one and the first argument has +// a lower order of precedence than the second one. +func mergeCallOptions(a *vkit.CallOptions, b *vkit.CallOptions) *vkit.CallOptions { + res := &vkit.CallOptions{} + resVal := reflect.ValueOf(res).Elem() + aVal := reflect.ValueOf(a).Elem() + bVal := reflect.ValueOf(b).Elem() + + t := aVal.Type() + + for i := 0; i < aVal.NumField(); i++ { + fieldName := t.Field(i).Name + + aFieldVal := aVal.Field(i).Interface().([]gax.CallOption) + bFieldVal := bVal.Field(i).Interface().([]gax.CallOption) + + merged := append(aFieldVal, bFieldVal...) + resVal.FieldByName(fieldName).Set(reflect.ValueOf(merged)) + } + return res +} diff --git a/spanner/sessionclient_test.go b/spanner/sessionclient_test.go index cdb3b6f8bb3..f06b175c36e 100644 --- a/spanner/sessionclient_test.go +++ b/spanner/sessionclient_test.go @@ -18,12 +18,14 @@ package spanner import ( "context" + "fmt" "sync" "testing" "time" vkit "cloud.google.com/go/spanner/apiv1" . "cloud.google.com/go/spanner/internal/testutil" + gax "github.com/googleapis/gax-go/v2" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -373,3 +375,77 @@ func TestClientIDGenerator(t *testing.T) { } } } + +func TestMergeCallOptions(t *testing.T) { + a := &vkit.CallOptions{ + CreateSession: []gax.CallOption{ + gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.Unavailable, codes.DeadlineExceeded, + }, gax.Backoff{ + Initial: 100 * time.Millisecond, + Max: 16000 * time.Millisecond, + Multiplier: 1.0, + }) + }), + }, + GetSession: []gax.CallOption{ + gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.Unavailable, codes.DeadlineExceeded, + }, gax.Backoff{ + Initial: 250 * time.Millisecond, + Max: 32000 * time.Millisecond, + Multiplier: 1.30, + }) + }), + }, + } + b := &vkit.CallOptions{ + CreateSession: []gax.CallOption{ + gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.Unavailable, + }, gax.Backoff{ + Initial: 250 * time.Millisecond, + Max: 32000 * time.Millisecond, + Multiplier: 1.30, + }) + }), + }, + BatchCreateSessions: []gax.CallOption{ + gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{ + codes.Unavailable, + }, gax.Backoff{ + Initial: 250 * time.Millisecond, + Max: 32000 * time.Millisecond, + Multiplier: 1.30, + }) + }), + }} + + merged := mergeCallOptions(b, a) + cs := &gax.CallSettings{} + // We can't access the fields of Retryer so we have test the result by + // comparing strings. + merged.CreateSession[0].Resolve(cs) + if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want { + t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) + } + + merged.CreateSession[1].Resolve(cs) + if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{100000000 16000000000 1 0} [14 4]}"; got != want { + t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) + } + + merged.GetSession[0].Resolve(cs) + if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14 4]}"; got != want { + t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) + } + + merged.BatchCreateSessions[0].Resolve(cs) + if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want { + t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want) + } +}