diff --git a/balancer/balancer.go b/balancer/balancer.go index 6a1b779edc2..34c435d9072 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -75,24 +75,26 @@ func Get(name string) Builder { return nil } -// SubConn represents a gRPC sub connection. -// Each sub connection contains a list of addresses. gRPC will -// try to connect to them (in sequence), and stop trying the -// remainder once one connection is successful. +// A SubConn represents a single connection to a gRPC backend service. // -// The reconnect backoff will be applied on the list, not a single address. -// For example, try_on_all_addresses -> backoff -> try_on_all_addresses. +// Each SubConn contains a list of addresses. // -// All SubConns start in IDLE, and will not try to connect. To trigger -// the connecting, Balancers must call Connect. -// When the connection encounters an error, it will reconnect immediately. -// When the connection becomes IDLE, it will not reconnect unless Connect is -// called. +// All SubConns start in IDLE, and will not try to connect. To trigger the +// connecting, Balancers must call Connect. If a connection re-enters IDLE, +// Balancers must call Connect again to trigger a new connection attempt. // -// This interface is to be implemented by gRPC. Users should not need a -// brand new implementation of this interface. For the situations like -// testing, the new implementation should embed this interface. This allows -// gRPC to add new methods to this interface. +// gRPC will try to connect to the addresses in sequence, and stop trying the +// remainder once the first connection is successful. If an attempt to connect +// to all addresses encounters an error, the SubConn will enter +// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE. +// +// Once established, if a connection is lost, the SubConn will transition +// directly to IDLE. +// +// This interface is to be implemented by gRPC. 
Users should not need their own +// implementation of this interface. For situations like testing, any +// implementations should embed this interface. This allows gRPC to add new +// methods to this interface. type SubConn interface { // UpdateAddresses updates the addresses used in this SubConn. // gRPC checks if currently-connected address is still in the new list. @@ -326,6 +328,20 @@ type Balancer interface { Close() } +// ExitIdler is an optional interface for balancers to implement. If +// implemented, ExitIdle will be called when ClientConn.Connect is called, if +// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause +// all SubConns to connect. +// +// Notice: it will be required for all balancers to implement this in a future +// release. +type ExitIdler interface { + // ExitIdle instructs the LB policy to reconnect to backends / exit the + // IDLE state, if appropriate and possible. Note that SubConns that enter + // the IDLE state will not reconnect until SubConn.Connect is called. + ExitIdle() +} + // SubConnState describes the state of a SubConn. type SubConnState struct { // ConnectivityState is the connectivity state of the SubConn. diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index c883efa0bbf..b1286533e73 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -251,6 +251,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su func (b *baseBalancer) Close() { } +// ExitIdle is a nop because the base balancer attempts to stay connected to +// all SubConns at all times. +func (b *baseBalancer) ExitIdle() { +} + // NewErrPicker returns a Picker that always returns err on Pick(). 
func NewErrPicker(err error) balancer.Picker { return &errPicker{err: err} diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 49d11d0d2e2..adf59611160 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -488,3 +488,5 @@ func (lb *lbBalancer) Close() { } lb.cc.close() } + +func (lb *lbBalancer) ExitIdle() {} diff --git a/balancer/rls/internal/balancer.go b/balancer/rls/internal/balancer.go index 7af97b76faf..b23783bf9da 100644 --- a/balancer/rls/internal/balancer.go +++ b/balancer/rls/internal/balancer.go @@ -129,6 +129,10 @@ func (lb *rlsBalancer) Close() { } } +func (lb *rlsBalancer) ExitIdle() { + // TODO: are we 100% sure this should be a nop? +} + // updateControlChannel updates the RLS client if required. // Caller must hold lb.mu. func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) { diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 0ddb24f375f..f4ea6174682 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -37,15 +37,20 @@ type scStateUpdate struct { err error } +// exitIdle contains no data and is just a signal sent on the updateCh in +// ccBalancerWrapper to instruct the balancer to exit idle. +type exitIdle struct{} + // ccBalancerWrapper is a wrapper on top of cc for balancers. // It implements balancer.ClientConn interface. 
type ccBalancerWrapper struct { - cc *ClientConn - balancerMu sync.Mutex // synchronizes calls to the balancer - balancer balancer.Balancer - updateCh *buffer.Unbounded - closed *grpcsync.Event - done *grpcsync.Event + cc *ClientConn + balancerMu sync.Mutex // synchronizes calls to the balancer + balancer balancer.Balancer + hasExitIdle bool + updateCh *buffer.Unbounded + closed *grpcsync.Event + done *grpcsync.Event mu sync.Mutex subConns map[*acBalancerWrapper]struct{} @@ -61,6 +66,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui } go ccb.watcher() ccb.balancer = b.Build(ccb, bopts) + _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler) return ccb } @@ -86,6 +92,17 @@ func (ccb *ccBalancerWrapper) watcher() { ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain) } ccb.mu.Unlock() + case exitIdle: + if ccb.cc.GetState() == connectivity.Idle { + if ei, ok := ccb.balancer.(balancer.ExitIdler); ok { + // We already checked that the balancer implements + // ExitIdle before pushing the event to updateCh, but + // check conditionally again as defensive programming. 
+ ccb.balancerMu.Lock() + ei.ExitIdle() + ccb.balancerMu.Unlock() + } + } default: logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t) } @@ -118,6 +135,14 @@ func (ccb *ccBalancerWrapper) close() { <-ccb.done.Done() } +func (ccb *ccBalancerWrapper) exitIdle() bool { + if !ccb.hasExitIdle { + return false + } + ccb.updateCh.Put(exitIdle{}) + return true +} + func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { // When updating addresses for a SubConn, if the address in use is not in // the new addresses, the old ac will be tearDown() and a new ac will be @@ -144,8 +169,8 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat func (ccb *ccBalancerWrapper) resolverError(err error) { ccb.balancerMu.Lock() + defer ccb.balancerMu.Unlock() ccb.balancer.ResolverError(err) - ccb.balancerMu.Unlock() } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { diff --git a/balancer_switching_test.go b/balancer_switching_test.go index e9fee87d8f8..5d9a1f9fffc 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -58,6 +58,8 @@ func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error { func (b *magicalLB) Close() {} +func (b *magicalLB) ExitIdle() {} + func init() { balancer.Register(&magicalLB{}) } diff --git a/clientconn.go b/clientconn.go index b9e9eed4681..62dc3bdaf52 100644 --- a/clientconn.go +++ b/clientconn.go @@ -555,13 +555,13 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. func (cc *ClientConn) Connect() { - if cc.GetState() == connectivity.Idle { - cc.mu.Lock() - for ac := range cc.conns { - // TODO: should this be a signal to the LB policy instead? 
- go ac.connect() - } - cc.mu.Unlock() + cc.mu.Lock() + defer cc.mu.Unlock() + if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() { + return + } + for ac := range cc.conns { + go ac.connect() } } diff --git a/internal/balancer/stub/stub.go b/internal/balancer/stub/stub.go index e3757c1a50b..950eaaa0278 100644 --- a/internal/balancer/stub/stub.go +++ b/internal/balancer/stub/stub.go @@ -33,6 +33,7 @@ type BalancerFuncs struct { ResolverError func(*BalancerData, error) UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState) Close func(*BalancerData) + ExitIdle func(*BalancerData) } // BalancerData contains data relevant to a stub balancer. @@ -75,6 +76,12 @@ func (b *bal) Close() { } } +func (b *bal) ExitIdle() { + if b.bf.ExitIdle != nil { + b.bf.ExitIdle(b.bd) + } +} + type bb struct { name string bf BalancerFuncs diff --git a/pickfirst.go b/pickfirst.go index d32161c748d..f194d14a081 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -124,6 +124,16 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S func (b *pickfirstBalancer) Close() { } +// ExitIdle asks the current SubConn to connect when the balancer is idle. +// It is a no-op if no SubConn exists yet; b.sc is presumably nil until the +// first address update arrives (TODO confirm), and calling Connect on a nil +// SubConn would panic. +func (b *pickfirstBalancer) ExitIdle() { + if b.sc != nil && b.state == connectivity.Idle { + b.sc.Connect() + } +} + type picker struct { result balancer.PickResult err error diff --git a/test/balancer_test.go b/test/balancer_test.go index a6a8f726afa..bb87ac834d6 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -116,6 +116,8 @@ func (b *testBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubCon func (b *testBalancer) Close() {} +func (b *testBalancer) ExitIdle() {} + type picker struct { err error sc balancer.SubConn @@ -373,8 +375,9 @@ func (testBalancerKeepAddresses) UpdateSubConnState(sc balancer.SubConn, s balan panic("not used") } -func (testBalancerKeepAddresses) Close() { -} +func (testBalancerKeepAddresses) Close() {} + +func (testBalancerKeepAddresses) ExitIdle() {} // Make sure that non-grpclb balancers don't get grpclb addresses even if 
name // resolver sends them diff --git a/xds/internal/balancer/balancergroup/balancergroup.go b/xds/internal/balancer/balancergroup/balancergroup.go index 6d54728dc91..5798b03ac50 100644 --- a/xds/internal/balancer/balancergroup/balancergroup.go +++ b/xds/internal/balancer/balancergroup/balancergroup.go @@ -104,6 +104,22 @@ func (sbc *subBalancerWrapper) startBalancer() { } } +func (sbc *subBalancerWrapper) exitIdle() { + b := sbc.balancer + if b == nil { + return + } + if ei, ok := b.(balancer.ExitIdler); ok { + ei.ExitIdle() + return + } + for sc, b := range sbc.group.scToSubBalancer { + if b == sbc { + sc.Connect() + } + } +} + func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { b := sbc.balancer if b == nil { @@ -493,6 +509,17 @@ func (bg *BalancerGroup) Close() { bg.outgoingMu.Unlock() } +// ExitIdle should be invoked when the parent LB policy's ExitIdle is invoked. +// It will trigger this on all sub-balancers, or reconnect their subconns if +// not supported. +func (bg *BalancerGroup) ExitIdle() { + bg.outgoingMu.Lock() + for _, config := range bg.idToBalancerConfig { + config.exitIdle() + } + bg.outgoingMu.Unlock() +} + const ( serverLoadCPUName = "cpu_utilization" serverLoadMemoryName = "mem_utilization" diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 9fb3e9bd48a..82d2a96958e 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -141,6 +141,8 @@ type scUpdate struct { state balancer.SubConnState } +type exitIdle struct{} + // cdsBalancer implements a CDS based LB policy. It instantiates a // cluster_resolver balancer to further resolve the serviceName received from // CDS, into localities and endpoints. 
Implements the balancer.Balancer @@ -376,6 +378,18 @@ func (b *cdsBalancer) run() { break } b.childLB.UpdateSubConnState(update.subConn, update.state) + case exitIdle: + if b.childLB == nil { + b.logger.Errorf("xds: received ExitIdle with no child balancer") + break + } + // This implementation assumes the child balancer supports + // ExitIdle (but still checks for the interface's existence to + // avoid a panic if not). If the child does not, no subconns + // will be connected. + if ei, ok := b.childLB.(balancer.ExitIdler); ok { + ei.ExitIdle() + } } case u := <-b.clusterHandler.updateChannel: b.handleWatchUpdate(u) @@ -494,6 +508,10 @@ func (b *cdsBalancer) Close() { <-b.done.Done() } +func (b *cdsBalancer) ExitIdle() { + b.updateCh.Put(exitIdle{}) +} + // ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at // creation and intercepts the NewSubConn() and UpdateAddresses() call from the // child policy to add security configuration required by xDS credentials. diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index fe754a529fa..c5981095802 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -1,3 +1,4 @@ +//go:build go1.12 // +build go1.12 /* @@ -85,7 +86,8 @@ type testEDSBalancer struct { // resolverErrCh is a channel used to signal a resolver error. resolverErrCh *testutils.Channel // closeCh is a channel used to signal the closing of this balancer. - closeCh *testutils.Channel + closeCh *testutils.Channel + exitIdleCh *testutils.Channel // parentCC is the balancer.ClientConn passed to this test balancer as part // of the Build() call. 
parentCC balancer.ClientConn @@ -102,6 +104,7 @@ func newTestEDSBalancer() *testEDSBalancer { scStateCh: testutils.NewChannel(), resolverErrCh: testutils.NewChannel(), closeCh: testutils.NewChannel(), + exitIdleCh: testutils.NewChannel(), } } @@ -122,6 +125,10 @@ func (tb *testEDSBalancer) Close() { tb.closeCh.Send(struct{}{}) } +func (tb *testEDSBalancer) ExitIdle() { + tb.exitIdleCh.Send(struct{}{}) +} + // waitForClientConnUpdate verifies if the testEDSBalancer receives the // provided ClientConnState within a reasonable amount of time. func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS balancer.ClientConnState) error { @@ -705,6 +712,41 @@ func (s) TestClose(t *testing.T) { } } +// TestExitIdle verifies that calling ExitIdle on the CDS balancer is +// propagated to its child balancer. +func (s) TestExitIdle(t *testing.T) { + // This creates a CDS balancer, pushes a ClientConnState update with a fake + // xdsClient, and makes sure that the CDS balancer registers a watch on the + // provided xdsClient. + xdsC, cdsB, edsB, _, cancel := setupWithWatch(t) + defer func() { + cancel() + cdsB.Close() + }() + + // Here we invoke the watch callback registered on the fake xdsClient. This + // will trigger the watch handler on the CDS balancer, which will attempt to + // create a new EDS balancer. The fake EDS balancer created above will be + // returned to the CDS balancer, because we have overridden the + // newChildBalancer function as part of test setup. + cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} + wantCCS := edsCCS(serviceName, nil, false, nil) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + t.Fatal(err) + } + + // Call ExitIdle on the CDS balancer. + cdsB.ExitIdle() + + // Receive reports an error (presumably when ctx expires first); fail + // instead of silently passing when the child never sees ExitIdle. + if _, err := edsB.exitIdleCh.Receive(ctx); err != nil { + t.Fatalf("timeout waiting for ExitIdle on the child balancer: %v", err) + } +} + // TestParseConfig verifies the ParseConfig() method in the CDS balancer. 
func (s) TestParseConfig(t *testing.T) { bb := balancer.Get(cdsName) diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 1b49dccbc63..03d357b1f4e 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -335,6 +335,21 @@ func (b *clusterImplBalancer) Close() { b.logger.Infof("Shutdown") } +func (b *clusterImplBalancer) ExitIdle() { + if b.childLB == nil { + return + } + if ei, ok := b.childLB.(balancer.ExitIdler); ok { + ei.ExitIdle() + return + } + // Fallback for children that don't support ExitIdle -- connect to all + // SubConns. + for _, sc := range b.scWrappers { + sc.Connect() + } +} + // Override methods to accept updates from the child LB. func (b *clusterImplBalancer) UpdateState(state balancer.State) { diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index 211133d384e..318545d79b0 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -136,6 +136,10 @@ func (b *bal) Close() { b.logger.Infof("Shutdown") } +func (b *bal) ExitIdle() { + b.bg.ExitIdle() +} + const prefix = "[xds-cluster-manager-lb %p] " var logger = grpclog.Component("xds") diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index b9568173bad..66a5aab305e 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -115,6 +115,8 @@ type scUpdate struct { state balancer.SubConnState } +type exitIdle struct{} + // clusterResolverBalancer manages xdsClient and the actual EDS balancer implementation that // does load balancing. 
// @@ -279,6 +281,18 @@ func (b *clusterResolverBalancer) run() { break } b.child.UpdateSubConnState(update.subConn, update.state) + case exitIdle: + if b.child == nil { + b.logger.Errorf("xds: received ExitIdle with no child balancer") + break + } + // This implementation assumes the child balancer supports + // ExitIdle (but still checks for the interface's existence to + // avoid a panic if not). If the child does not, no subconns + // will be connected. + if ei, ok := b.child.(balancer.ExitIdler); ok { + ei.ExitIdle() + } } case u := <-b.resourceWatcher.updateChannel: b.handleWatchUpdate(u) @@ -348,6 +362,10 @@ func (b *clusterResolverBalancer) Close() { <-b.done.Done() } +func (b *clusterResolverBalancer) ExitIdle() { + b.updateCh.Put(exitIdle{}) +} + // ccWrapper overrides ResolveNow(), so that re-resolution from the child // policies will trigger the DNS resolver in cluster_resolver balancer. type ccWrapper struct { diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index e7d0cd347cb..fc98fec4550 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -1,3 +1,4 @@ +//go:build go1.12 // +build go1.12 /* @@ -128,6 +129,8 @@ func (f *fakeChildBalancer) UpdateSubConnState(sc balancer.SubConn, state balanc func (f *fakeChildBalancer) Close() {} +func (f *fakeChildBalancer) ExitIdle() {} + func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error { _, err := f.clientConnState.Receive(ctx) if err != nil { diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index 7475145c612..23e8aa77503 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -201,6 +201,10 @@ func (b *priorityBalancer) Close() { b.stopPriorityInitTimer() } +func (b *priorityBalancer) ExitIdle() { + 
b.bg.ExitIdle() +} + // stopPriorityInitTimer stops the priorityInitTimer if it's not nil, and set it // to nil. // diff --git a/xds/internal/balancer/weightedtarget/weightedtarget.go b/xds/internal/balancer/weightedtarget/weightedtarget.go index eb6516af56e..f05e0aca19f 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget.go @@ -172,3 +172,7 @@ func (b *weightedTargetBalancer) Close() { b.stateAggregator.Stop() b.bg.Close() } + +func (b *weightedTargetBalancer) ExitIdle() { + b.bg.ExitIdle() +}