From 1b7bf449d7bdd3ca89f930bc9c3e4c45d447cf0a Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 28 Nov 2017 11:00:48 -0800 Subject: [PATCH] Add withbalancername dial option service config will not override dial option balancer fix comments Add const roundrobin.Name and PickFirstBalancerName. remove WithBalancerBuilder fix tests delete withBalancerBuilder --- balancer/roundrobin/roundrobin.go | 5 ++++- balancer/roundrobin/roundrobin_test.go | 18 +++++++-------- balancer_switching_test.go | 29 ++++++++++++++++++++++++ clientconn.go | 31 +++++++++++++++++--------- grpclb.go | 9 +++++--- grpclb/grpclb_test.go | 31 +++++++++++++++++++++----- grpclb_remote_balancer.go | 2 +- pickfirst.go | 5 ++++- pickfirst_test.go | 14 ++++++------ test/end2end_test.go | 9 ++------ 10 files changed, 107 insertions(+), 46 deletions(-) diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index 8ef14894801c..2eda0a1c2107 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -31,9 +31,12 @@ import ( "google.golang.org/grpc/resolver" ) +// Name is the name of round_robin balancer. +const Name = "round_robin" + // newBuilder creates a new roundrobin balancer builder. func newBuilder() balancer.Builder { - return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{}) + return base.NewBalancerBuilder(Name, &rrPickerBuilder{}) } func init() { diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index 7f953ff002f2..d65889b27587 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -27,7 +27,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/peer" @@ -37,8 +37,6 @@ import ( "google.golang.org/grpc/test/leakcheck" ) -var rr = balancer.Get("round_robin") - type testServer struct { testpb.TestServiceServer } @@ -102,7 +100,7 @@ func TestOneBackend(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -134,7 +132,7 @@ func TestBackendsRoundRobin(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -193,7 +191,7 @@ func TestAddressesRemoved(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -235,7 +233,7 @@ func TestCloseWithPendingRPC(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -269,7 +267,7 @@ func TestNewAddressWhileBlocking(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -318,7 +316,7 @@ func TestOneServerDown(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -415,7 +413,7 @@ func TestAllServersDown(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 92c196d388e1..6eef6a1b8ce2 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -25,6 +25,7 @@ import ( "time" "golang.org/x/net/context" + "google.golang.org/grpc/balancer/roundrobin" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -131,3 +132,31 @@ func TestSwitchBalancer(t *testing.T) { t.Fatalf("check pickfirst returned non-nil error: %v", err) } } + +// Test that balancer specified by dial option will not be overridden. +func TestBalancerDialOption(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + numServers := 2 + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) + defer scleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name)) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}) + // The init balancer is roundrobin. + if err := checkRoundRobin(cc, servers); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } + // Switch to pickfirst. + cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`) + // Balancer is still roundrobin. + if err := checkRoundRobin(cc, servers); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } +} diff --git a/clientconn.go b/clientconn.go index 4a98265a046f..08746ae38e02 100644 --- a/clientconn.go +++ b/clientconn.go @@ -95,8 +95,10 @@ type dialOptions struct { scChan <-chan ServiceConfig copts transport.ConnectOptions callOptions []CallOption - // This is to support v1 balancer. + // This is used by v1 balancer dial option WithBalancer to support v1 balancer. balancerBuilder balancer.Builder + // The balancer to be used. Can not be overridden by service config. + balancerName string // This is to support grpclb. resolverBuilder resolver.Builder waitForHandshake bool @@ -198,7 +200,8 @@ func WithDecompressor(dc Decompressor) DialOption { // WithBalancer returns a DialOption which sets a load balancer with the v1 API. // Name resolver will be ignored if this DialOption is specified. -// Deprecated: use the new balancer APIs in balancer package instead. +// +// Deprecated: use the new balancer APIs in balancer package and WithBalancerName. func WithBalancer(b Balancer) DialOption { return func(o *dialOptions) { o.balancerBuilder = &balancerWrapperBuilder{ @@ -207,12 +210,17 @@ func WithBalancer(b Balancer) DialOption { } } -// WithBalancerBuilder is for testing only. Users using custom balancers should -// register their balancer and use service config to choose the balancer to use. -func WithBalancerBuilder(b balancer.Builder) DialOption { - // TODO(bar) remove this when switching balancer is done. +// WithBalancerName sets the balancer that the ClientConn will be initialized +// with. Balancer registered with balancerName will be used. If no balancer was +// registered by balancerName, pick_first will be used. +// +// The balancer cannot be overridden by balancer option specified by service +// config. +// +// This is an EXPERIMENTAL API. +func WithBalancerName(balancerName string) DialOption { return func(o *dialOptions) { - o.balancerBuilder = b + o.balancerName = balancerName } } @@ -661,7 +669,10 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { if cc.balancerWrapper == nil { // First time handling resolved addresses. Build a balancer use either // the builder specified by dial option, or pickfirst. - builder := cc.dopts.balancerBuilder + builder := balancer.Get(cc.dopts.balancerName) + if builder == nil && cc.dopts.balancerBuilder != nil { + builder = cc.dopts.balancerBuilder + } if builder == nil { // No customBalancer was specified by DialOption, and this is the first // time handling resolved addresses, create a pickfirst balancer. @@ -684,8 +695,8 @@ func (cc *ClientConn) switchBalancer(name string) { } grpclog.Infof("ClientConn switching balancer to %q", name) - if cc.dopts.balancerBuilder != nil { - grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead") + if cc.dopts.balancerName != "" || cc.dopts.balancerBuilder != nil { + grpclog.Infoln("ignoring service config balancer configuration: Balancer DialOption used instead") return } diff --git a/grpclb.go b/grpclb.go index 8489c690b6a2..6dce06deae7d 100644 --- a/grpclb.go +++ b/grpclb.go @@ -82,9 +82,12 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { return m, nil } -// NewLBBuilder creates a builder for grpclb. For testing only. -func NewLBBuilder() balancer.Builder { - // TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb. +func init() { + balancer.Register(newLBBuilder()) +} + +// newLBBuilder creates a builder for grpclb. For testing only. +func newLBBuilder() balancer.Builder { return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout) } diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 9bab731c6f53..10abe20f4c47 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -37,6 +37,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" @@ -344,7 +345,7 @@ func TestGRPCLB(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), + grpc.WithBalancerName("grpclb"), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -396,7 +397,7 @@ func TestGRPCLBWeighted(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), + grpc.WithBalancerName("grpclb"), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -462,7 +463,7 @@ func TestDropRequest(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), + grpc.WithBalancerName("grpclb"), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -537,7 +538,7 @@ func TestBalancerDisconnects(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), + grpc.WithBalancerName("grpclb"), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -578,6 +579,24 @@ func TestBalancerDisconnects(t *testing.T) { t.Fatalf("No RPC sent to second backend after 1 second") } +type customGRPCLBBuilder struct { + balancer.Builder + name string +} + +func (b *customGRPCLBBuilder) Name() string { + return b.name +} + +const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout" + +func init() { + balancer.Register(&customGRPCLBBuilder{ + Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond), + name: grpclbCustomFallbackName, + }) +} + func TestFallback(t *testing.T) { defer leakcheck.Check(t) @@ -616,7 +635,7 @@ func TestFallback(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)), + grpc.WithBalancerName(grpclbCustomFallbackName), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -710,7 +729,7 @@ func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), + grpc.WithBalancerName("grpclb"), grpc.WithTransportCredentials(&creds), grpc.WithPerRPCCredentials(failPreRPCCred{}), grpc.WithDialer(fakeNameDialer)) diff --git a/grpclb_remote_balancer.go b/grpclb_remote_balancer.go index 50d4ccd3a3a9..1b580df26dd1 100644 --- a/grpclb_remote_balancer.go +++ b/grpclb_remote_balancer.go @@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { dopts = append(dopts, withContextDialer(lb.opt.Dialer)) } // Explicitly set pickfirst as the balancer. - dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder())) + dopts = append(dopts, WithBalancerName(PickFirstBalancerName)) dopts = append(dopts, withResolverBuilder(lb.manualResolver)) // Dial using manualResolver.Scheme, which is a random scheme generated // when init grpclb. The target name is not important. diff --git a/pickfirst.go b/pickfirst.go index e83ca2b0db6d..bf659d49d2f1 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -26,6 +26,9 @@ import ( "google.golang.org/grpc/resolver" ) +// PickFirstBalancerName is the name of the pick_first balancer. +const PickFirstBalancerName = "pick_first" + func newPickfirstBuilder() balancer.Builder { return &pickfirstBuilder{} } @@ -37,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions } func (*pickfirstBuilder) Name() string { - return "pick_first" + return PickFirstBalancerName } type pickfirstBalancer struct { diff --git a/pickfirst_test.go b/pickfirst_test.go index e58b3422c149..33d7ead91366 100644 --- a/pickfirst_test.go +++ b/pickfirst_test.go @@ -40,7 +40,7 @@ func TestOneBackendPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -74,7 +74,7 @@ func TestBackendsPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -108,7 +108,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -145,7 +145,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) { _, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -182,7 +182,7 @@ func TestOneServerDownPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -224,7 +224,7 @@ func TestAllServersDownPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -268,7 +268,7 @@ func TestAddressesRemovedPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index 43a53a19ba82..1da3fc0bcb76 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -46,8 +46,7 @@ import ( "golang.org/x/net/http2" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" - _ "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -693,11 +692,7 @@ func (te *test) clientConn() *grpc.ClientConn { case "v1": opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) case "round_robin": - rr := balancer.Get("round_robin") - if rr == nil { - te.t.Fatalf("got nil when trying to get roundrobin balancer builder") - } - opts = append(opts, grpc.WithBalancerBuilder(rr)) + opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) } if te.clientInitialWindowSize > 0 { opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))