client: implement maxAttempts for retryPolicy #7223

Closed · wants to merge 1 commit
11 changes: 11 additions & 0 deletions clientconn.go
@@ -1841,3 +1841,14 @@ func (cc *ClientConn) determineAuthority() error {
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
return nil
}

// sanitizeRPCConfig sanitizes the RPCConfig supplied by the server by
// enforcing any limits specified in the client's dial options.
func (cc *ClientConn) sanitizeRPCConfig(c *iresolver.RPCConfig) {
// Enforce max retry attempts
if c.MethodConfig.RetryPolicy != nil {
if c.MethodConfig.RetryPolicy.MaxAttempts > cc.dopts.maxRetryAttempts {
c.MethodConfig.RetryPolicy.MaxAttempts = cc.dopts.maxRetryAttempts
}
}
}
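
Not part of the diff: a minimal, self-contained sketch of the clamping rule that sanitizeRPCConfig applies, written against plain ints rather than the internal RPCConfig and dialOptions types (the helper name clampMaxAttempts is hypothetical).

package main

import "fmt"

// clampMaxAttempts mirrors the rule above: a retry policy's maxAttempts that
// exceeds the client's configured limit is reduced to that limit; smaller
// values pass through unchanged.
func clampMaxAttempts(serviceMaxAttempts, clientLimit int) int {
    if serviceMaxAttempts > clientLimit {
        return clientLimit
    }
    return serviceMaxAttempts
}

func main() {
    fmt.Println(clampMaxAttempts(10, 5)) // 5: capped by the client's limit
    fmt.Println(clampMaxAttempts(3, 5))  // 3: service config value kept
}
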
25 changes: 20 additions & 5 deletions dialoptions.go
@@ -80,6 +80,7 @@ type dialOptions struct {
idleTimeout time.Duration
recvBufferPool SharedBufferPool
defaultScheme string
maxRetryAttempts int
}

// DialOption configures how we set up the connection.
@@ -650,11 +651,12 @@ func defaultDialOptions() dialOptions {
UseProxy: true,
UserAgent: grpcUA,
},
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
defaultScheme: "dns",
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
defaultScheme: "dns",
maxRetryAttempts: 5, // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#limits-on-retries-and-hedges
}
}

@@ -712,6 +714,19 @@ func WithIdleTimeout(d time.Duration) DialOption {
})
}

// WithMaxRetryAttempts returns a DialOption that configures the maximum number
// of attempts per RPC that a service config's retry policy may specify for the
// channel. Service owners may request a higher value in the service config,
// but any value above this limit is treated as equal to the limit by the
// client implementation. This mitigates security concerns related to the
// service config being transferred to the client via DNS.
//
// A default value of 5 is used if this dial option is not set.
func WithMaxRetryAttempts(n int) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.maxRetryAttempts = n
})
}

// WithRecvBufferPool returns a DialOption that configures the ClientConn
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
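
Not part of the diff: a sketch of how an application might combine the new option with a default service config, assuming the API lands as shown above. The target, credentials, and service config values are placeholders for illustration.

package main

import (
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // Cap attempts at 3 on the client even if a (possibly DNS-delivered)
    // service config asks for more; maxAttempts includes the original attempt.
    conn, err := grpc.NewClient(
        "dns:///example.com:443", // placeholder target
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithMaxRetryAttempts(3), // option added in this PR
        grpc.WithDefaultServiceConfig(`{
          "methodConfig": [{
            "name": [{"service": "grpc.testing.TestService"}],
            "retryPolicy": {
              "maxAttempts": 10,
              "initialBackoff": "0.1s",
              "maxBackoff": "1s",
              "backoffMultiplier": 2.0,
              "retryableStatusCodes": ["UNAVAILABLE"]
            }
          }]
        }`),
    )
    if err != nil {
        log.Fatalf("grpc.NewClient(...) failed: %v", err)
    }
    defer conn.Close()
    // RPCs on this channel make at most 3 attempts (1 original + 2 retries):
    // the client-side cap overrides the service config's requested 10.
}
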
4 changes: 0 additions & 4 deletions service_config.go
@@ -285,10 +285,6 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol
BackoffMultiplier: jrp.BackoffMultiplier,
RetryableStatusCodes: make(map[codes.Code]bool),
}
if rp.MaxAttempts > 5 {
// TODO(retry): Make the max maxAttempts configurable.
rp.MaxAttempts = 5
}
for _, code := range jrp.RetryableStatusCodes {
rp.RetryableStatusCodes[code] = true
}
3 changes: 3 additions & 0 deletions stream.go
@@ -233,6 +233,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}

if rpcConfig != nil {
// Sanitize the rpcConfig by enforcing limits from the client's dial options.
cc.sanitizeRPCConfig(rpcConfig)

if rpcConfig.Context != nil {
ctx = rpcConfig.Context
}
133 changes: 133 additions & 0 deletions test/retry_test.go
@@ -102,6 +102,64 @@ func (s) TestRetryUnary(t *testing.T) {
}
}

func (s) TestRetryMaxAttemptsUnary(t *testing.T) {
callCount := 0
ss := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
defer func() { t.Logf("server call %v returning err %v", callCount, err) }()
callCount++
return nil, status.New(codes.AlreadyExists, "retryable error").Err()
},
}
testCases := []struct {
serviceMaxAttempts int
clientMaxAttempts int
expectedAttempts int
}{
{serviceMaxAttempts: 9, clientMaxAttempts: 4, expectedAttempts: 4},
{serviceMaxAttempts: 9, clientMaxAttempts: 7, expectedAttempts: 7},
{serviceMaxAttempts: 3, clientMaxAttempts: 10, expectedAttempts: 3},
{serviceMaxAttempts: 8, clientMaxAttempts: -1, expectedAttempts: 5}, // 5 is default max
{serviceMaxAttempts: 3, clientMaxAttempts: 0, expectedAttempts: 1},
}
for num, tc := range testCases {
clientOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": %d,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "ALREADY_EXISTS" ]
}
}]}`, tc.serviceMaxAttempts)),
}
if tc.clientMaxAttempts >= 0 {
clientOpts = append(clientOpts, grpc.WithMaxRetryAttempts(tc.clientMaxAttempts))
}
func() {
callCount = 0
if err := ss.Start([]grpc.ServerOption{}, clientOpts...); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
t.Log("Case", num)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
cancel()
if status.Code(err) != codes.AlreadyExists {
t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, codes.AlreadyExists)
}
if callCount != tc.expectedAttempts {
t.Fatalf("expectedAttempts = %v; want %v", callCount, tc.expectedAttempts)
}
}()
}
}

func (s) TestRetryThrottling(t *testing.T) {
i := -1
ss := &stubserver.StubServer{
@@ -477,6 +535,81 @@ func (s) TestRetryStreaming(t *testing.T) {
}
}

func (s) TestRetryMaxAttemptsStreaming(t *testing.T) {
testCases := []struct {
serviceMaxAttempts int
clientMaxAttempts int
expectedAttempts int
}{
{serviceMaxAttempts: 9, clientMaxAttempts: 4, expectedAttempts: 4},
{serviceMaxAttempts: 9, clientMaxAttempts: 7, expectedAttempts: 7},
{serviceMaxAttempts: 3, clientMaxAttempts: 10, expectedAttempts: 3},
{serviceMaxAttempts: 8, clientMaxAttempts: -1, expectedAttempts: 5}, // 5 is default max
{serviceMaxAttempts: 3, clientMaxAttempts: 0, expectedAttempts: 1},
}

for _, tc := range testCases {
func() {
clientOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": %d,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`, tc.serviceMaxAttempts),
),
}

callCount := 0
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
callCount++
return status.New(codes.Unavailable, "this is a test error").Err()
},
}

if tc.clientMaxAttempts >= 0 {
clientOpts = append(clientOpts, grpc.WithMaxRetryAttempts(tc.clientMaxAttempts))
}
if err := ss.Start([]grpc.ServerOption{}, clientOpts...); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

for {
if ctx.Err() != nil {
t.Fatalf("Timed out waiting for service config update")
}
if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
break
}
time.Sleep(time.Millisecond)
}
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("Error while creating stream: %v", err)
}

if got, err := stream.Recv(); err == nil {
t.Fatalf("client: Recv() = %s, %v; want <nil>, error", got, err)
}

if callCount != tc.expectedAttempts {
t.Fatalf("expectedAttempts = %v; want %v", callCount, tc.expectedAttempts)
}

}()
}
}

type retryStatsHandler struct {
mu sync.Mutex
s []stats.RPCStats