Skip to content

Commit

Permalink
feat(spanner): support of client-level custom retry settings (#2599)
Browse files Browse the repository at this point in the history
* Support of client-level custom retry settings.
* Add a func comment to mergeCallOptions.
* Add an end-to-end test.
  • Loading branch information
hengfengli committed Jul 17, 2020
1 parent a31e954 commit e143982
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 2 deletions.
7 changes: 6 additions & 1 deletion spanner/client.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"cloud.google.com/go/internal/trace"
vkit "cloud.google.com/go/spanner/apiv1"
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
Expand Down Expand Up @@ -104,6 +105,10 @@ type ClientConfig struct {
// QueryOptions is the configuration for executing a sql query.
QueryOptions QueryOptions

// CallOptions is the configuration for providing custom retry settings that
// override the default values.
CallOptions *vkit.CallOptions

// logger is the logger to use for this client. If it is nil, all logging
// will be directed to the standard logger.
logger *log.Logger
Expand Down Expand Up @@ -200,7 +205,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
config.incStep = DefaultSessionPoolConfig.incStep
}
// Create a session client.
sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger)
sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger, config.CallOptions)
// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
Expand Down
73 changes: 73 additions & 0 deletions spanner/client_test.go
Expand Up @@ -27,8 +27,10 @@ import (

"cloud.google.com/go/civil"
itestutil "cloud.google.com/go/internal/testutil"
vkit "cloud.google.com/go/spanner/apiv1"
. "cloud.google.com/go/spanner/internal/testutil"
structpb "github.com/golang/protobuf/ptypes/struct"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
Expand Down Expand Up @@ -1760,6 +1762,77 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T)
}
}

func TestClient_CallOptions(t *testing.T) {
t.Parallel()
co := &vkit.CallOptions{
CreateSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable, codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 200 * time.Millisecond,
Max: 30000 * time.Millisecond,
Multiplier: 1.25,
})
}),
},
}

_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co})
defer teardown()

c, err := client.sc.nextClient()
if err != nil {
t.Fatalf("failed to get a session client: %v", err)
}

cs := &gax.CallSettings{}
// This is the default retry setting.
c.CallOptions.CreateSession[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}

// This is the custom retry setting.
c.CallOptions.CreateSession[1].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{200000000 30000000000 1.25 0} [14 4]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
}

func TestClient_QueryWithCallOptions(t *testing.T) {
t.Parallel()
co := &vkit.CallOptions{
ExecuteSql: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 200 * time.Millisecond,
Max: 30000 * time.Millisecond,
Multiplier: 1.25,
})
}),
},
}
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{CallOptions: co})
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{
Errors: []error{status.Error(codes.DeadlineExceeded, "Deadline exceeded")},
})
defer teardown()
ctx := context.Background()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
}

func TestBatchReadOnlyTransaction_QueryOptions(t *testing.T) {
ctx := context.Background()
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
Expand Down
31 changes: 30 additions & 1 deletion spanner/sessionclient.go
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/spanner/apiv1"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
Expand Down Expand Up @@ -90,10 +92,11 @@ type sessionClient struct {
md metadata.MD
batchTimeout time.Duration
logger *log.Logger
callOptions *vkit.CallOptions
}

// newSessionClient creates a session client to use for a database.
func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabels map[string]string, md metadata.MD, logger *log.Logger) *sessionClient {
func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabels map[string]string, md metadata.MD, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient {
return &sessionClient{
connPool: connPool,
database: database,
Expand All @@ -102,6 +105,7 @@ func newSessionClient(connPool gtransport.ConnPool, database string, sessionLabe
md: md,
batchTimeout: time.Minute,
logger: logger,
callOptions: callOptions,
}
}

Expand Down Expand Up @@ -274,5 +278,30 @@ func (sc *sessionClient) nextClient() (*vkit.Client, error) {
return nil, err
}
client.SetGoogleClientInfo("gccl", version.Repo)
if sc.callOptions != nil {
client.CallOptions = mergeCallOptions(client.CallOptions, sc.callOptions)
}
return client, nil
}

// mergeCallOptions merges two CallOptions into one and the first argument has
// a lower order of precedence than the second one.
func mergeCallOptions(a *vkit.CallOptions, b *vkit.CallOptions) *vkit.CallOptions {
res := &vkit.CallOptions{}
resVal := reflect.ValueOf(res).Elem()
aVal := reflect.ValueOf(a).Elem()
bVal := reflect.ValueOf(b).Elem()

t := aVal.Type()

for i := 0; i < aVal.NumField(); i++ {
fieldName := t.Field(i).Name

aFieldVal := aVal.Field(i).Interface().([]gax.CallOption)
bFieldVal := bVal.Field(i).Interface().([]gax.CallOption)

merged := append(aFieldVal, bFieldVal...)
resVal.FieldByName(fieldName).Set(reflect.ValueOf(merged))
}
return res
}
76 changes: 76 additions & 0 deletions spanner/sessionclient_test.go
Expand Up @@ -18,12 +18,14 @@ package spanner

import (
"context"
"fmt"
"sync"
"testing"
"time"

vkit "cloud.google.com/go/spanner/apiv1"
. "cloud.google.com/go/spanner/internal/testutil"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -373,3 +375,77 @@ func TestClientIDGenerator(t *testing.T) {
}
}
}

func TestMergeCallOptions(t *testing.T) {
a := &vkit.CallOptions{
CreateSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable, codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 16000 * time.Millisecond,
Multiplier: 1.0,
})
}),
},
GetSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable, codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 250 * time.Millisecond,
Max: 32000 * time.Millisecond,
Multiplier: 1.30,
})
}),
},
}
b := &vkit.CallOptions{
CreateSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable,
}, gax.Backoff{
Initial: 250 * time.Millisecond,
Max: 32000 * time.Millisecond,
Multiplier: 1.30,
})
}),
},
BatchCreateSessions: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable,
}, gax.Backoff{
Initial: 250 * time.Millisecond,
Max: 32000 * time.Millisecond,
Multiplier: 1.30,
})
}),
}}

merged := mergeCallOptions(b, a)
cs := &gax.CallSettings{}
// We can't access the fields of Retryer so we have test the result by
// comparing strings.
merged.CreateSession[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}

merged.CreateSession[1].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{100000000 16000000000 1 0} [14 4]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}

merged.GetSession[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14 4]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}

merged.BatchCreateSessions[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
}

0 comments on commit e143982

Please sign in to comment.