Skip to content

Commit

Permalink
client: implement maxAttempts for retryPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
imoore76 committed May 11, 2024
1 parent 59954c8 commit cab3751
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 9 deletions.
15 changes: 15 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,3 +1841,18 @@ func (cc *ClientConn) determineAuthority() error {
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
return nil
}

// sanitizeRPCConfig sanitizes the RPCConfig supplied by the server by
// enforcing any limits specified in the client's dial options
func (cc *ClientConn) sanitizeRPCConfig(c *iresolver.RPCConfig) error {
if c == nil {
return nil
}

Check warning on line 1850 in clientconn.go

View check run for this annotation

Codecov / codecov/patch

clientconn.go#L1849-L1850

Added lines #L1849 - L1850 were not covered by tests
// Enforce max retry attempts
if c.MethodConfig.RetryPolicy != nil {
if c.MethodConfig.RetryPolicy.MaxAttempts > cc.dopts.maxRetryAttempts {
c.MethodConfig.RetryPolicy.MaxAttempts = cc.dopts.maxRetryAttempts
}
}
return nil
}
25 changes: 20 additions & 5 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type dialOptions struct {
idleTimeout time.Duration
recvBufferPool SharedBufferPool
defaultScheme string
maxRetryAttempts int
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -650,11 +651,12 @@ func defaultDialOptions() dialOptions {
UseProxy: true,
UserAgent: grpcUA,
},
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
defaultScheme: "dns",
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
defaultScheme: "dns",
maxRetryAttempts: 5, // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#limits-on-retries-and-hedges
}
}

Expand Down Expand Up @@ -712,6 +714,19 @@ func WithIdleTimeout(d time.Duration) DialOption {
})
}

// WithMaxRetryAttempts returns a DialOption that configures the maximum number
// of retry attempts for the channel. Service owners may specify a higher value
// for these parameters, but higher values will be treated as equal to the
// maximum value by the client implementation. This mitigates security concerns
// related to the service config being transferred to the client via DNS.
//
// A default value of 5 will be used if this dial option is not set.
func WithMaxRetryAttempts(n int) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.maxRetryAttempts = n
})
}

// WithRecvBufferPool returns a DialOption that configures the ClientConn
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
Expand Down
4 changes: 0 additions & 4 deletions service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,6 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol
BackoffMultiplier: jrp.BackoffMultiplier,
RetryableStatusCodes: make(map[codes.Code]bool),
}
if rp.MaxAttempts > 5 {
// TODO(retry): Make the max maxAttempts configurable.
rp.MaxAttempts = 5
}
for _, code := range jrp.RetryableStatusCodes {
rp.RetryableStatusCodes[code] = true
}
Expand Down
5 changes: 5 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}

if rpcConfig != nil {
// sanitize the rpcConfig based on the client's dial options
if err = cc.sanitizeRPCConfig(rpcConfig); err != nil {
return nil, err
}

Check warning on line 239 in stream.go

View check run for this annotation

Codecov / codecov/patch

stream.go#L238-L239

Added lines #L238 - L239 were not covered by tests

if rpcConfig.Context != nil {
ctx = rpcConfig.Context
}
Expand Down
133 changes: 133 additions & 0 deletions test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,64 @@ func (s) TestRetryUnary(t *testing.T) {
}
}

func (s) TestRetryMaxAttemptsUnary(t *testing.T) {
callCount := 0
ss := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
defer func() { t.Logf("server call %v returning err %v", callCount, err) }()
callCount++
return nil, status.New(codes.AlreadyExists, "retryable error").Err()
},
}
testCases := []struct {
serviceMaxAttempts int
clientMaxAttempts int
expectedAttempts int
}{
{serviceMaxAttempts: 9, clientMaxAttempts: 4, expectedAttempts: 4},
{serviceMaxAttempts: 9, clientMaxAttempts: 7, expectedAttempts: 7},
{serviceMaxAttempts: 3, clientMaxAttempts: 10, expectedAttempts: 3},
{serviceMaxAttempts: 8, clientMaxAttempts: -1, expectedAttempts: 5}, // 5 is default max
{serviceMaxAttempts: 3, clientMaxAttempts: 0, expectedAttempts: 1},
}
for num, tc := range testCases {
clientOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": %d,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "ALREADY_EXISTS" ]
}
}]}`, tc.serviceMaxAttempts)),
}
if tc.clientMaxAttempts >= 0 {
clientOpts = append(clientOpts, grpc.WithMaxRetryAttempts(tc.clientMaxAttempts))
}
func() {
callCount = 0
if err := ss.Start([]grpc.ServerOption{}, clientOpts...); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
t.Log("Case", num)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
cancel()
if status.Code(err) != codes.AlreadyExists {
t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, codes.AlreadyExists)
}
if callCount != tc.expectedAttempts {
t.Fatalf("expectedAttempts = %v; want %v", callCount, tc.expectedAttempts)
}
}()
}
}

func (s) TestRetryThrottling(t *testing.T) {
i := -1
ss := &stubserver.StubServer{
Expand Down Expand Up @@ -477,6 +535,81 @@ func (s) TestRetryStreaming(t *testing.T) {
}
}

func (s) TestRetryMaxAttemptsStreaming(t *testing.T) {
testCases := []struct {
serviceMaxAttempts int
clientMaxAttempts int
expectedAttempts int
}{
{serviceMaxAttempts: 9, clientMaxAttempts: 4, expectedAttempts: 4},
{serviceMaxAttempts: 9, clientMaxAttempts: 7, expectedAttempts: 7},
{serviceMaxAttempts: 3, clientMaxAttempts: 10, expectedAttempts: 3},
{serviceMaxAttempts: 8, clientMaxAttempts: -1, expectedAttempts: 5}, // 5 is default max
{serviceMaxAttempts: 3, clientMaxAttempts: 0, expectedAttempts: 1},
}

for _, tc := range testCases {
func() {
clientOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": %d,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`, tc.serviceMaxAttempts),
),
}

callCount := 0
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
callCount++
return status.New(codes.Unavailable, "this is a test error").Err()
},
}

if tc.clientMaxAttempts >= 0 {
clientOpts = append(clientOpts, grpc.WithMaxRetryAttempts(tc.clientMaxAttempts))
}
if err := ss.Start([]grpc.ServerOption{}, clientOpts...); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

for {
if ctx.Err() != nil {
t.Fatalf("Timed out waiting for service config update")
}
if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
break
}
time.Sleep(time.Millisecond)
}
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("Error while creating stream: %v", err)
}

if got, err := stream.Recv(); err == nil {
t.Fatalf("client: Recv() = %s, %v; want <nil>, error", got, err)
}

if callCount != tc.expectedAttempts {
t.Fatalf("expectedAttempts = %v; want %v", callCount, tc.expectedAttempts)
}

}()
}
}

type retryStatsHandler struct {
mu sync.Mutex
s []stats.RPCStats
Expand Down

0 comments on commit cab3751

Please sign in to comment.