From bedcbb91375baa9d854e15acb5651c80701a8966 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 28 Nov 2017 11:00:48 -0800 Subject: [PATCH] Add withbalancername dial option service config will not override dial option balancer fix comments Add const roundrobin.Name and PickFirstBalancerName. remove WithBalancerBuilder fix tests delete withBalancerBuilder --- balancer/roundrobin/roundrobin.go | 5 ++- balancer/roundrobin/roundrobin_test.go | 18 ++++----- balancer_switching_test.go | 37 +++++++++++++++++-- clientconn.go | 51 +++++++++++++++++++------- grpclb.go | 4 ++ grpclb/grpclb_test.go | 21 ++++++++++- grpclb_remote_balancer.go | 2 +- pickfirst.go | 5 ++- pickfirst_test.go | 14 +++---- test/end2end_test.go | 9 +---- 10 files changed, 120 insertions(+), 46 deletions(-) diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index 8ef14894801..2eda0a1c210 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -31,9 +31,12 @@ import ( "google.golang.org/grpc/resolver" ) +// Name is the name of round_robin balancer. +const Name = "round_robin" + // newBuilder creates a new roundrobin balancer builder. func newBuilder() balancer.Builder { - return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{}) + return base.NewBalancerBuilder(Name, &rrPickerBuilder{}) } func init() { diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index 7f953ff002f..d65889b2758 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -27,7 +27,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/peer" @@ -37,8 +37,6 @@ import ( "google.golang.org/grpc/test/leakcheck" ) -var rr = balancer.Get("round_robin") - type testServer struct { testpb.TestServiceServer } @@ -102,7 +100,7 @@ func TestOneBackend(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -134,7 +132,7 @@ func TestBackendsRoundRobin(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -193,7 +191,7 @@ func TestAddressesRemoved(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -235,7 +233,7 @@ func TestCloseWithPendingRPC(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -269,7 +267,7 @@ func TestNewAddressWhileBlocking(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -318,7 +316,7 @@ func TestOneServerDown(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -415,7 +413,7 @@ func TestAllServersDown(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 1fd2f564c03..fb9f0f57069 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -25,6 +25,7 @@ import ( "time" "golang.org/x/net/context" + "google.golang.org/grpc/balancer/roundrobin" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -132,6 +133,34 @@ func TestSwitchBalancer(t *testing.T) { } } +// Test that balancer specified by dial option will not be overridden. +func TestBalancerDialOption(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + numServers := 2 + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) + defer scleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name)) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}) + // The init balancer is roundrobin. + if err := checkRoundRobin(cc, servers); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } + // Switch to pickfirst. + cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`) + // Balancer is still roundrobin. + if err := checkRoundRobin(cc, servers); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } +} + // First addr update contains grpclb. func TestSwitchBalancerGRPCLBFirst(t *testing.T) { defer leakcheck.Check(t) @@ -182,7 +211,7 @@ func TestSwitchBalancerGRPCLBFirst(t *testing.T) { r.NewAddress([]resolver.Address{{Addr: "backend"}}) for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break @@ -210,7 +239,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) { var isPickFirst bool for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break @@ -258,7 +287,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) { r.NewAddress([]resolver.Address{{Addr: "backend"}}) for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break @@ -352,7 +381,7 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { var isPickFirst bool for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break diff --git a/clientconn.go b/clientconn.go index a047d19434c..b31f8632991 100644 --- a/clientconn.go +++ b/clientconn.go @@ -95,8 +95,10 @@ type dialOptions struct { scChan <-chan ServiceConfig copts transport.ConnectOptions callOptions []CallOption - // This is to support v1 balancer. + // This is used by v1 balancer dial option WithBalancer to support v1 balancer. balancerBuilder balancer.Builder + // The balancer to be used. Can not be overridden by service config. + balancerName string // This is to support grpclb. resolverBuilder resolver.Builder // Custom user options for resolver.Build. @@ -200,7 +202,8 @@ func WithDecompressor(dc Decompressor) DialOption { // WithBalancer returns a DialOption which sets a load balancer with the v1 API. // Name resolver will be ignored if this DialOption is specified. -// Deprecated: use the new balancer APIs in balancer package instead. +// +// Deprecated: use the new balancer APIs in balancer package and WithBalancerName. func WithBalancer(b Balancer) DialOption { return func(o *dialOptions) { o.balancerBuilder = &balancerWrapperBuilder{ @@ -209,12 +212,17 @@ func WithBalancer(b Balancer) DialOption { } } -// WithBalancerBuilder is for testing only. Users using custom balancers should -// register their balancer and use service config to choose the balancer to use. -func WithBalancerBuilder(b balancer.Builder) DialOption { - // TODO(bar) remove this when switching balancer is done. +// WithBalancerName sets the balancer that the ClientConn will be initialized +// with. Balancer registered with balancerName will be used. If no balancer was +// registered by balancerName, pick_first will be used. +// +// The balancer cannot be overridden by balancer option specified by service +// config. +// +// This is an EXPERIMENTAL API. +func WithBalancerName(balancerName string) DialOption { return func(o *dialOptions) { - o.balancerBuilder = b + o.balancerName = balancerName } } @@ -670,9 +678,9 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { cc.curAddresses = addrs - if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil { - cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) - } else { + if cc.dopts.balancerName == "" && cc.dopts.balancerBuilder == nil { + // Only look at balancer types ans switch balancer if balancer dial + // option is not set. var isGRPCLB bool for _, a := range addrs { if a.Type == resolver.GRPCLB { @@ -697,11 +705,28 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { // - the first time handling non-grpclb addresses // (curBalancerName="grpclb", preBalancerName="") if newBalancerName == "" { - newBalancerName = pickfirstName + newBalancerName = PickFirstBalancerName } } cc.switchBalancer(newBalancerName) + } else if cc.balancerWrapper == nil { + // Balancer dial option was set, and this is the first time handling + // resolved addresses. Build a balancer use the first non-nil: + // - the builder with dopts.balancerName + // - the builder from dopts.balancerBuilder + // - pickfirst + builder := balancer.Get(cc.dopts.balancerName) + if builder == nil && cc.dopts.balancerBuilder != nil { + builder = cc.dopts.balancerBuilder + } + if builder == nil { + // No customBalancer was specified by DialOption, and this is the first + // time handling resolved addresses, create a pickfirst balancer. + builder = newPickfirstBuilder() + } + cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) } + cc.balancerWrapper.handleResolvedAddrs(addrs, nil) } @@ -723,8 +748,8 @@ func (cc *ClientConn) switchBalancer(name string) { } grpclog.Infof("ClientConn switching balancer to %q", name) - if cc.dopts.balancerBuilder != nil { - grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead") + if cc.dopts.balancerName != "" || cc.dopts.balancerBuilder != nil { + grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") return } // TODO(bar switching) change this to two steps: drain and close. diff --git a/grpclb.go b/grpclb.go index 0750fc1ecea..90564f4e675 100644 --- a/grpclb.go +++ b/grpclb.go @@ -83,6 +83,10 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { return m, nil } +func init() { + balancer.Register(newLBBuilder()) +} + // newLBBuilder creates a builder for grpclb. func newLBBuilder() balancer.Builder { return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout) diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 8c4b01328b6..532ff817ded 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -37,6 +37,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" @@ -574,6 +575,24 @@ func TestBalancerDisconnects(t *testing.T) { t.Fatalf("No RPC sent to second backend after 1 second") } +type customGRPCLBBuilder struct { + balancer.Builder + name string +} + +func (b *customGRPCLBBuilder) Name() string { + return b.name +} + +const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout" + +func init() { + balancer.Register(&customGRPCLBBuilder{ + Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond), + name: grpclbCustomFallbackName, + }) +} + func TestFallback(t *testing.T) { defer leakcheck.Check(t) @@ -612,7 +631,7 @@ func TestFallback(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)), + grpc.WithBalancerName(grpclbCustomFallbackName), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) diff --git a/grpclb_remote_balancer.go b/grpclb_remote_balancer.go index 50d4ccd3a3a..1b580df26dd 100644 --- a/grpclb_remote_balancer.go +++ b/grpclb_remote_balancer.go @@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { dopts = append(dopts, withContextDialer(lb.opt.Dialer)) } // Explicitly set pickfirst as the balancer. - dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder())) + dopts = append(dopts, WithBalancerName(PickFirstBalancerName)) dopts = append(dopts, withResolverBuilder(lb.manualResolver)) // Dial using manualResolver.Scheme, which is a random scheme generated // when init grpclb. The target name is not important. diff --git a/pickfirst.go b/pickfirst.go index 69623c4b1c8..bf659d49d2f 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -26,7 +26,8 @@ import ( "google.golang.org/grpc/resolver" ) -const pickfirstName = "pick_first" +// PickFirstBalancerName is the name of the pick_first balancer. +const PickFirstBalancerName = "pick_first" func newPickfirstBuilder() balancer.Builder { return &pickfirstBuilder{} @@ -39,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions } func (*pickfirstBuilder) Name() string { - return pickfirstName + return PickFirstBalancerName } type pickfirstBalancer struct { diff --git a/pickfirst_test.go b/pickfirst_test.go index e58b3422c14..33d7ead9136 100644 --- a/pickfirst_test.go +++ b/pickfirst_test.go @@ -40,7 +40,7 @@ func TestOneBackendPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -74,7 +74,7 @@ func TestBackendsPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -108,7 +108,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -145,7 +145,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) { _, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -182,7 +182,7 @@ func TestOneServerDownPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -224,7 +224,7 @@ func TestAllServersDownPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -268,7 +268,7 @@ func TestAddressesRemovedPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index 43a53a19ba8..1da3fc0bcb7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -46,8 +46,7 @@ import ( "golang.org/x/net/http2" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" - _ "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -693,11 +692,7 @@ func (te *test) clientConn() *grpc.ClientConn { case "v1": opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) case "round_robin": - rr := balancer.Get("round_robin") - if rr == nil { - te.t.Fatalf("got nil when trying to get roundrobin balancer builder") - } - opts = append(opts, grpc.WithBalancerBuilder(rr)) + opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) } if te.clientInitialWindowSize > 0 { opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))