diff --git a/interop/xds/client/client.go b/interop/xds/client/client.go index 27f954c30d2..7792094d54d 100644 --- a/interop/xds/client/client.go +++ b/interop/xds/client/client.go @@ -27,6 +27,7 @@ import ( "net" "strings" "sync" + "sync/atomic" "time" "google.golang.org/grpc" @@ -73,7 +74,7 @@ func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse { } var ( - failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail") + failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success") numChannels = flag.Int("num_channels", 1, "Num of channels") printResponse = flag.Bool("print_response", false, "Write RPC response to stdout") qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC") @@ -86,12 +87,24 @@ var ( mu sync.Mutex currentRequestID int32 watchers = make(map[statsWatcherKey]*statsWatcher) + + // 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and + // setRPCSucceeded to access in a safe manner. + rpcSucceeded uint32 ) type statsService struct { testpb.UnimplementedLoadBalancerStatsServiceServer } +func hasRPCSucceeded() bool { + return atomic.LoadUint32(&rpcSucceeded) > 0 +} + +func setRPCSucceeded() { + atomic.StoreUint32(&rpcSucceeded, 1) +} + // Wait for the next LoadBalancerStatsRequest.GetNumRpcs to start and complete, // and return the distribution of remote peers. This is essentially a clientside // LB reporting mechanism that is designed to be queried by an external test @@ -291,9 +304,12 @@ func sendRPCs(clients []testpb.TestServiceClient, cfgs []*rpcConfig, ticker *tim // This sends an empty string if the RPC failed. watcher.chanHosts <- info } - if err != nil && *failOnFailedRPC { + if err != nil && *failOnFailedRPC && hasRPCSucceeded() { grpclog.Fatalf("RPC failed: %v", err) } + if err == nil { + setRPCSucceeded() + } if *printResponse { if err == nil { if cfg.typ == unaryCall {