From 5f0e72845ef27d2c2b7bf82dfdac2bcabbdd76ef Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 21 Jul 2020 11:55:49 -0700 Subject: [PATCH] xdsrouting: balancer implementation (#3746) --- xds/internal/balancer/balancer.go | 1 + .../xdsrouting/balancerstateaggregator.go | 227 ++++++ xds/internal/balancer/xdsrouting/logging.go | 34 + xds/internal/balancer/xdsrouting/routing.go | 256 +++++++ .../balancer/xdsrouting/routing_config.go | 12 +- .../xdsrouting/routing_config_test.go | 10 +- .../balancer/xdsrouting/routing_picker.go | 61 ++ .../xdsrouting/routing_picker_test.go | 177 +++++ .../balancer/xdsrouting/routing_test.go | 710 ++++++++++++++++++ 9 files changed, 1477 insertions(+), 11 deletions(-) create mode 100644 xds/internal/balancer/xdsrouting/balancerstateaggregator.go create mode 100644 xds/internal/balancer/xdsrouting/logging.go create mode 100644 xds/internal/balancer/xdsrouting/routing.go create mode 100644 xds/internal/balancer/xdsrouting/routing_picker.go create mode 100644 xds/internal/balancer/xdsrouting/routing_picker_test.go create mode 100644 xds/internal/balancer/xdsrouting/routing_test.go diff --git a/xds/internal/balancer/balancer.go b/xds/internal/balancer/balancer.go index 489def3ce3b..d0b1bb786c7 100644 --- a/xds/internal/balancer/balancer.go +++ b/xds/internal/balancer/balancer.go @@ -23,4 +23,5 @@ import ( _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer _ "google.golang.org/grpc/xds/internal/balancer/edsbalancer" // Register the EDS balancer _ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer + _ "google.golang.org/grpc/xds/internal/balancer/xdsrouting" // Register the xds_routing balancer ) diff --git a/xds/internal/balancer/xdsrouting/balancerstateaggregator.go b/xds/internal/balancer/xdsrouting/balancerstateaggregator.go new file mode 100644 index 00000000000..83dbb7914b9 --- /dev/null +++ b/xds/internal/balancer/xdsrouting/balancerstateaggregator.go @@ -0,0 +1,227 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsrouting + +import ( + "fmt" + "sync" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/xds/internal" +) + +type subBalancerState struct { + state balancer.State + // stateToAggregate is the connectivity state used only for state + // aggregation. It could be different from state.ConnectivityState. For + // example when a sub-balancer transitions from TransientFailure to + // connecting, state.ConnectivityState is Connecting, but stateToAggregate + // is still TransientFailure. 
+	stateToAggregate connectivity.State
+}
+
+func (s *subBalancerState) String() string {
+	return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
+}
+
+type balancerStateAggregator struct {
+	cc     balancer.ClientConn
+	logger *grpclog.PrefixLogger
+
+	mu sync.Mutex
+	// routes, one for each matcher.
+	routes []route
+	// If started is false, no updates should be sent to the parent cc. A closed
+	// sub-balancer could still send pickers to this aggregator. This makes sure
+	// that no updates will be forwarded to the parent when the whole balancer
+	// group and state aggregator are closed.
+	started bool
+	// All balancer IDs exist as keys in this map, even if the balancer group is
+	// not started.
+	//
+	// If an ID is not in the map, it has either been removed or was never added.
+	idToPickerState map[internal.LocalityID]*subBalancerState
+}
+
+func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
+	return &balancerStateAggregator{
+		cc:              cc,
+		logger:          logger,
+		idToPickerState: make(map[internal.LocalityID]*subBalancerState),
+	}
+}
+
+// start starts the aggregator. It can be called after close to restart the
+// aggregator.
+func (rbsa *balancerStateAggregator) start() {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	rbsa.started = true
+}
+
+// close closes the aggregator. When the aggregator is closed, it won't call
+// the parent ClientConn to update balancer state.
+func (rbsa *balancerStateAggregator) close() {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	rbsa.started = false
+	rbsa.clearStates()
+}
+
+// add adds a sub-balancer state. It adds a placeholder, and waits for the real
+// sub-balancer to update its state.
+//
+// This is called when there's a new action.
+func (rbsa *balancerStateAggregator) add(id internal.LocalityID) {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	rbsa.idToPickerState[id] = &subBalancerState{
+		// Start everything in CONNECTING, so if one of the sub-balancers
+		// reports TransientFailure, the RPCs will still wait for the other
+		// sub-balancers.
+		state: balancer.State{
+			ConnectivityState: connectivity.Connecting,
+			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+		},
+		stateToAggregate: connectivity.Connecting,
+	}
+}
+
+// remove removes the sub-balancer state. Future updates from this
+// sub-balancer, if any, will be ignored.
+//
+// This is called when an action is removed.
+func (rbsa *balancerStateAggregator) remove(id internal.LocalityID) {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	if _, ok := rbsa.idToPickerState[id]; !ok {
+		return
+	}
+	// Remove id and picker from the picker map. This also results in future
+	// updates for this ID to be ignored.
+	delete(rbsa.idToPickerState, id)
+}
+
+// updateRoutes updates the routes. Note that it doesn't trigger an update to
+// the parent ClientConn. The caller should decide when that's necessary, and
+// call buildAndUpdate.
+func (rbsa *balancerStateAggregator) updateRoutes(newRoutes []route) {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	rbsa.routes = newRoutes
+}
+
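// ---------------------------------------------------------------------------
// Editor's note: this aside is not part of the patch. It isolates the "sticky
// TransientFailure" rule that UpdateState (below) implements: when a
// sub-balancer reports Connecting right after TransientFailure, the state used
// for aggregation stays TransientFailure, so the aggregated state doesn't flap
// back to CONNECTING while the sub-balancer retries. A minimal sketch, using
// only the public connectivity package:
//
//	func nextStateToAggregate(oldReported, newReported connectivity.State) connectivity.State {
//		if oldReported == connectivity.TransientFailure && newReported == connectivity.Connecting {
//			// Keep aggregating as TransientFailure until something other
//			// than Connecting (e.g. Ready) is reported.
//			return connectivity.TransientFailure
//		}
//		return newReported
//	}
// ---------------------------------------------------------------------------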
+// UpdateState is called to report a balancer state change from a
+// sub-balancer. It's usually called by the balancer group.
+//
+// It calls the parent ClientConn's UpdateState with the new aggregated state.
+func (rbsa *balancerStateAggregator) UpdateState(id internal.LocalityID, state balancer.State) {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	pickerSt, ok := rbsa.idToPickerState[id]
+	if !ok {
+		// All state starts with an entry in idToPickerState. If the ID is not
+		// in the map, it has either been removed or never existed.
+		return
+	}
+	if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
+		// If the old state is TransientFailure, and the new state is
+		// Connecting, don't update the state, to prevent the aggregated state
+		// from being always CONNECTING. Otherwise, stateToAggregate is the
+		// same as state.ConnectivityState.
+		pickerSt.stateToAggregate = state.ConnectivityState
+	}
+	pickerSt.state = state
+
+	if !rbsa.started {
+		return
+	}
+	rbsa.cc.UpdateState(rbsa.build())
+}
+
+// clearStates resets everything to the initial state (Connecting) but keeps
+// the entries in the map.
+//
+// Caller must hold rbsa.mu.
+func (rbsa *balancerStateAggregator) clearStates() {
+	for _, pState := range rbsa.idToPickerState {
+		pState.state = balancer.State{
+			ConnectivityState: connectivity.Connecting,
+			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+		}
+		pState.stateToAggregate = connectivity.Connecting
+	}
+}
+
+// buildAndUpdate combines the sub-state from each sub-balancer into one state,
+// and updates the parent ClientConn with it.
+func (rbsa *balancerStateAggregator) buildAndUpdate() {
+	rbsa.mu.Lock()
+	defer rbsa.mu.Unlock()
+	if !rbsa.started {
+		return
+	}
+	rbsa.cc.UpdateState(rbsa.build())
+}
+
+// build combines the sub-states into one. The returned picker does the routing
+// pick.
+//
+// Caller must hold rbsa.mu.
+func (rbsa *balancerStateAggregator) build() balancer.State {
+	// TODO: the majority of this function (and UpdateState) is exactly the
+	// same as weighted_target's state aggregator. Try to make a general
+	// utility function/struct to handle the logic.
+	//
+	// One option: make a SubBalancerState that handles Update(State),
+	// including handling the special connecting after ready, as in
+	// UpdateState(). Then a function to calculate the aggregated connectivity
+	// state as in this function.
+	var readyN, connectingN int
+	for _, ps := range rbsa.idToPickerState {
+		switch ps.stateToAggregate {
+		case connectivity.Ready:
+			readyN++
+		case connectivity.Connecting:
+			connectingN++
+		}
+	}
+	var aggregatedState connectivity.State
+	switch {
+	case readyN > 0:
+		aggregatedState = connectivity.Ready
+	case connectingN > 0:
+		aggregatedState = connectivity.Connecting
+	default:
+		aggregatedState = connectivity.TransientFailure
+	}
+
+	// The picker's returned error might not be consistent with the
+	// aggregatedState. This is because, for routing, we want to always build
+	// the picker with all sub-pickers (not only the ready sub-pickers), so
+	// even if the overall state is Ready, picks for certain RPCs can behave
+	// like Connecting or TransientFailure.
+	rbsa.logger.Infof("Child pickers with routes: %s, actions: %+v", rbsa.routes, rbsa.idToPickerState)
+	return balancer.State{
+		ConnectivityState: aggregatedState,
+		Picker:            newPickerGroup(rbsa.routes, rbsa.idToPickerState),
+	}
+}
diff --git a/xds/internal/balancer/xdsrouting/logging.go b/xds/internal/balancer/xdsrouting/logging.go
new file mode 100644
index 00000000000..5c4a6b3cb41
--- /dev/null
+++ b/xds/internal/balancer/xdsrouting/logging.go
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsrouting + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[xds-routing-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *routingBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/balancer/xdsrouting/routing.go b/xds/internal/balancer/xdsrouting/routing.go new file mode 100644 index 00000000000..ebbb2b8cf9d --- /dev/null +++ b/xds/internal/balancer/xdsrouting/routing.go @@ -0,0 +1,256 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsrouting + +import ( + "encoding/json" + "fmt" + "regexp" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/hierarchy" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/balancer/balancergroup" +) + +const xdsRoutingName = "xds_routing_experimental" + +func init() { + balancer.Register(&routingBB{}) +} + +type routingBB struct{} + +func (rbb *routingBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { + b := &routingBalancer{} + b.logger = prefixLogger(b) + b.stateAggregator = newBalancerStateAggregator(cc, b.logger) + b.stateAggregator.start() + b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger) + b.bg.Start() + b.logger.Infof("Created") + return b +} + +func (rbb *routingBB) Name() string { + return xdsRoutingName +} + +func (rbb *routingBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return parseConfig(c) +} + +type route struct { + action string + m *compositeMatcher +} + +func (r route) String() string { + return r.m.String() + "->" + r.action +} + +type routingBalancer struct { + logger *grpclog.PrefixLogger + + // TODO: make this package not dependent on xds specific code. Same as for + // weighted target balancer. + bg *balancergroup.BalancerGroup + stateAggregator *balancerStateAggregator + + actions map[string]actionConfig + routes []route +} + +// TODO: remove this and use strings directly as keys for balancer group. 
+func makeLocalityFromName(name string) internal.LocalityID {
+	return internal.LocalityID{Region: name}
+}
+
+// TODO: remove this and use strings directly as keys for balancer group.
+func getNameFromLocality(id internal.LocalityID) string {
+	return id.Region
+}
+
+func (rb *routingBalancer) updateActions(s balancer.ClientConnState, newConfig *lbConfig) (needRebuild bool) {
+	addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
+	var rebuildStateAndPicker bool
+
+	// Remove sub-pickers and sub-balancers that are not in the new action list.
+	for name := range rb.actions {
+		if _, ok := newConfig.actions[name]; !ok {
+			l := makeLocalityFromName(name)
+			rb.stateAggregator.remove(l)
+			rb.bg.Remove(l)
+			// Trigger a state/picker update, because we don't want `ClientConn`
+			// to pick this sub-balancer anymore.
+			rebuildStateAndPicker = true
+		}
+	}
+
+	// For sub-balancers in the new action list,
+	// - add to balancer group if it's new,
+	// - forward the address/balancer config update.
+	for name, newT := range newConfig.actions {
+		l := makeLocalityFromName(name)
+		if _, ok := rb.actions[name]; !ok {
+			// If this is a new sub-balancer, add it to the picker state map.
+			rb.stateAggregator.add(l)
+			// Then add it to the balancer group.
+			rb.bg.Add(l, balancer.Get(newT.ChildPolicy.Name))
+			// Don't trigger a state/picker update. Wait for the new
+			// sub-balancer to send its updates.
+		}
+		// Forward the whole update:
+		// - Addresses are from the map after splitting with hierarchy path,
+		// - Top level service config and attributes are the same,
+		// - Balancer config comes from the actions map.
+		//
+		// TODO: handle error? How to aggregate errors and return?
+		_ = rb.bg.UpdateClientConnState(l, balancer.ClientConnState{
+			ResolverState: resolver.State{
+				Addresses:     addressesSplit[name],
+				ServiceConfig: s.ResolverState.ServiceConfig,
+				Attributes:    s.ResolverState.Attributes,
+			},
+			BalancerConfig: newT.ChildPolicy.Config,
+		})
+	}
+
+	rb.actions = newConfig.actions
+	return rebuildStateAndPicker
+}
+
+func routeToMatcher(r routeConfig) (*compositeMatcher, error) {
+	var pathMatcher pathMatcherInterface
+	switch {
+	case r.regex != "":
+		re, err := regexp.Compile(r.regex)
+		if err != nil {
+			return nil, fmt.Errorf("failed to compile regex %q", r.regex)
+		}
+		pathMatcher = newPathRegexMatcher(re)
+	case r.path != "":
+		pathMatcher = newPathExactMatcher(r.path)
+	default:
+		pathMatcher = newPathPrefixMatcher(r.prefix)
+	}
+
+	var headerMatchers []headerMatcherInterface
+	for _, h := range r.headers {
+		var matcherT headerMatcherInterface
+		switch {
+		case h.exactMatch != "":
+			matcherT = newHeaderExactMatcher(h.name, h.exactMatch)
+		case h.regexMatch != "":
+			re, err := regexp.Compile(h.regexMatch)
+			if err != nil {
+				return nil, fmt.Errorf("failed to compile regex %q", h.regexMatch)
+			}
+			matcherT = newHeaderRegexMatcher(h.name, re)
+		case h.prefixMatch != "":
+			matcherT = newHeaderPrefixMatcher(h.name, h.prefixMatch)
+		case h.suffixMatch != "":
+			matcherT = newHeaderSuffixMatcher(h.name, h.suffixMatch)
+		case h.rangeMatch != nil:
+			matcherT = newHeaderRangeMatcher(h.name, h.rangeMatch.start, h.rangeMatch.end)
+		default:
+			matcherT = newHeaderPresentMatcher(h.name, h.presentMatch)
+		}
+		if h.invertMatch {
+			matcherT = newInvertMatcher(matcherT)
+		}
+		headerMatchers = append(headerMatchers, matcherT)
+	}
+
+	var fractionMatcher *fractionMatcher
+	if r.fraction != nil {
+		fractionMatcher = newFractionMatcher(*r.fraction)
+	}
+	return newCompositeMatcher(pathMatcher,
headerMatchers, fractionMatcher), nil +} + +func routesEqual(a, b []route) bool { + if len(a) != len(b) { + return false + } + for i := range a { + aa := a[i] + bb := b[i] + if aa.action != bb.action { + return false + } + if !aa.m.equal(bb.m) { + return false + } + } + return true +} + +func (rb *routingBalancer) updateRoutes(newConfig *lbConfig) (needRebuild bool, _ error) { + var newRoutes []route + for _, rt := range newConfig.routes { + newMatcher, err := routeToMatcher(rt) + if err != nil { + return false, err + } + newRoutes = append(newRoutes, route{action: rt.action, m: newMatcher}) + } + rebuildStateAndPicker := !routesEqual(newRoutes, rb.routes) + rb.routes = newRoutes + + if rebuildStateAndPicker { + rb.stateAggregator.updateRoutes(rb.routes) + } + return rebuildStateAndPicker, nil +} + +func (rb *routingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + newConfig, ok := s.BalancerConfig.(*lbConfig) + if !ok { + return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) + } + rb.logger.Infof("update with config %+v, resolver state %+v", s.BalancerConfig, s.ResolverState) + + rebuildForActions := rb.updateActions(s, newConfig) + rebuildForRoutes, err := rb.updateRoutes(newConfig) + if err != nil { + return fmt.Errorf("xds_routing balancer: failed to update routes: %v", err) + } + + if rebuildForActions || rebuildForRoutes { + rb.stateAggregator.buildAndUpdate() + } + return nil +} + +func (rb *routingBalancer) ResolverError(err error) { + rb.bg.ResolverError(err) +} + +func (rb *routingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + rb.bg.UpdateSubConnState(sc, state) +} + +func (rb *routingBalancer) Close() { + rb.stateAggregator.close() + rb.bg.Close() +} diff --git a/xds/internal/balancer/xdsrouting/routing_config.go b/xds/internal/balancer/xdsrouting/routing_config.go index c8cc7a18c6d..78716a09813 100644 --- a/xds/internal/balancer/xdsrouting/routing_config.go +++ b/xds/internal/balancer/xdsrouting/routing_config.go @@ -28,7 +28,7 @@ import ( xdsclient "google.golang.org/grpc/xds/internal/client" ) -type action struct { +type actionConfig struct { // ChildPolicy is the child policy and it's config. ChildPolicy *internalserviceconfig.BalancerConfig } @@ -48,7 +48,7 @@ type headerMatcher struct { presentMatch bool } -type route struct { +type routeConfig struct { // Path, Prefix and Regex can have at most one set. This is guaranteed by // config parsing. path, prefix, regex string @@ -63,8 +63,8 @@ type route struct { // lbConfig is the balancer config for xds routing policy. type lbConfig struct { serviceconfig.LoadBalancingConfig - routes []route - actions map[string]action + routes []routeConfig + actions map[string]actionConfig } // The following structs with `JSON` in name are temporary structs to unmarshal @@ -84,13 +84,13 @@ type routeJSON struct { // lbConfigJSON is temporary struct for json unmarshal. 
type lbConfigJSON struct { Route []routeJSON - Action map[string]action + Action map[string]actionConfig } func (jc lbConfigJSON) toLBConfig() *lbConfig { var ret lbConfig for _, r := range jc.Route { - var tempR route + var tempR routeConfig switch { case r.Path != nil: tempR.path = *r.Path diff --git a/xds/internal/balancer/xdsrouting/routing_config_test.go b/xds/internal/balancer/xdsrouting/routing_config_test.go index c1ff4611dac..8cb4d22724e 100644 --- a/xds/internal/balancer/xdsrouting/routing_config_test.go +++ b/xds/internal/balancer/xdsrouting/routing_config_test.go @@ -294,14 +294,14 @@ func Test_parseConfig(t *testing.T) { name: "OK with path matchers only", js: testJSONConfig, want: &lbConfig{ - routes: []route{ + routes: []routeConfig{ {path: "/service_1/method_1", action: "cds:cluster_1"}, {path: "/service_1/method_2", action: "cds:cluster_1"}, {prefix: "/service_2/method_1", action: "weighted:cluster_1_cluster_2_1"}, {prefix: "/service_2", action: "weighted:cluster_1_cluster_2_1"}, {regex: "^/service_2/method_3$", action: "weighted:cluster_1_cluster_3_1"}, }, - actions: map[string]action{ + actions: map[string]actionConfig{ "cds:cluster_1": {ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: cdsName, Config: cdsConfig1}, }, @@ -319,7 +319,7 @@ func Test_parseConfig(t *testing.T) { name: "OK with all matchers", js: testJSONConfigWithAllMatchers, want: &lbConfig{ - routes: []route{ + routes: []routeConfig{ {path: "/service_1/method_1", action: "cds:cluster_1"}, {prefix: "/service_2/method_1", action: "cds:cluster_1"}, {regex: "^/service_2/method_3$", action: "cds:cluster_1"}, @@ -332,7 +332,7 @@ func Test_parseConfig(t *testing.T) { {prefix: "", headers: []headerMatcher{{name: "header-1", suffixMatch: "value-1"}}, action: "cds:cluster_2"}, {prefix: "", fraction: newUInt32P(31415), action: "cds:cluster_3"}, }, - actions: map[string]action{ + actions: map[string]actionConfig{ "cds:cluster_1": {ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: cdsName, Config: cdsConfig1}, }, @@ -348,7 +348,7 @@ func Test_parseConfig(t *testing.T) { }, } - cmpOptions := []cmp.Option{cmp.AllowUnexported(lbConfig{}, route{}, headerMatcher{}, int64Range{})} + cmpOptions := []cmp.Option{cmp.AllowUnexported(lbConfig{}, routeConfig{}, headerMatcher{}, int64Range{})} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/xds/internal/balancer/xdsrouting/routing_picker.go b/xds/internal/balancer/xdsrouting/routing_picker.go new file mode 100644 index 00000000000..36fdebd4ad2 --- /dev/null +++ b/xds/internal/balancer/xdsrouting/routing_picker.go @@ -0,0 +1,61 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsrouting + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal" +) + +// pickerGroup contains a list of route matchers and their corresponding +// pickers. 
For each pick, the first matched picker is used. If the picker isn't +// ready, the pick will be queued. +type pickerGroup struct { + routes []route + pickers map[string]balancer.Picker +} + +func newPickerGroup(routes []route, idToPickerState map[internal.LocalityID]*subBalancerState) *pickerGroup { + pickers := make(map[string]balancer.Picker) + for id, st := range idToPickerState { + pickers[getNameFromLocality(id)] = st.state.Picker + } + return &pickerGroup{ + routes: routes, + pickers: pickers, + } +} + +var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") + +func (pg *pickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + for _, rt := range pg.routes { + if rt.m.match(info) { + // action from route is the ID for the sub-balancer to use. + p, ok := pg.pickers[rt.action] + if !ok { + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + } + return p.Pick(info) + } + } + return balancer.PickResult{}, errNoMatchedRouteFound +} diff --git a/xds/internal/balancer/xdsrouting/routing_picker_test.go b/xds/internal/balancer/xdsrouting/routing_picker_test.go new file mode 100644 index 00000000000..2de40757a4f --- /dev/null +++ b/xds/internal/balancer/xdsrouting/routing_picker_test.go @@ -0,0 +1,177 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package xdsrouting + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/testutils" +) + +var ( + testPickers = []*testutils.TestConstPicker{ + {SC: testutils.TestSubConns[0]}, + {SC: testutils.TestSubConns[1]}, + } +) + +func (s) TestRoutingPickerGroupPick(t *testing.T) { + tests := []struct { + name string + + routes []route + pickers map[internal.LocalityID]*subBalancerState + info balancer.PickInfo + + want balancer.PickResult + wantErr error + }{ + { + name: "empty", + wantErr: errNoMatchedRouteFound, + }, + { + name: "one route no match", + routes: []route{ + {m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"}, + }, + pickers: map[internal.LocalityID]*subBalancerState{ + makeLocalityFromName("action-0"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[0], + }}, + }, + info: balancer.PickInfo{FullMethodName: "/z/y"}, + wantErr: errNoMatchedRouteFound, + }, + { + name: "one route one match", + routes: []route{ + {m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"}, + }, + pickers: map[internal.LocalityID]*subBalancerState{ + makeLocalityFromName("action-0"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[0], + }}, + }, + info: balancer.PickInfo{FullMethodName: "/a/b"}, + want: balancer.PickResult{SubConn: testutils.TestSubConns[0]}, + }, + { + name: "two routes first match", + routes: []route{ + {m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"}, + {m: newCompositeMatcher(newPathPrefixMatcher("/z/"), nil, nil), action: "action-1"}, + }, + pickers: map[internal.LocalityID]*subBalancerState{ + makeLocalityFromName("action-0"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[0], + }}, + makeLocalityFromName("action-1"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[1], + }}, + }, + info: balancer.PickInfo{FullMethodName: "/a/b"}, + want: balancer.PickResult{SubConn: testutils.TestSubConns[0]}, + }, + { + name: "two routes second match", + routes: []route{ + {m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"}, + {m: newCompositeMatcher(newPathPrefixMatcher("/z/"), nil, nil), action: "action-1"}, + }, + pickers: map[internal.LocalityID]*subBalancerState{ + makeLocalityFromName("action-0"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[0], + }}, + makeLocalityFromName("action-1"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[1], + }}, + }, + info: balancer.PickInfo{FullMethodName: "/z/y"}, + want: balancer.PickResult{SubConn: testutils.TestSubConns[1]}, + }, + { + name: "two routes both match former more specific", + routes: []route{ + {m: newCompositeMatcher(newPathExactMatcher("/a/b"), nil, nil), action: "action-0"}, + {m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-1"}, + }, + pickers: map[internal.LocalityID]*subBalancerState{ + makeLocalityFromName("action-0"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[0], + }}, + makeLocalityFromName("action-1"): {state: balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: testPickers[1], + }}, + }, + info: 
balancer.PickInfo{FullMethodName: "/a/b"},
+			// First route is a match, so the first action is picked.
+			want: balancer.PickResult{SubConn: testutils.TestSubConns[0]},
+		},
+		{
+			name: "two routes both match latter more specific",
+			routes: []route{
+				{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"},
+				{m: newCompositeMatcher(newPathExactMatcher("/a/b"), nil, nil), action: "action-1"},
+			},
+			pickers: map[internal.LocalityID]*subBalancerState{
+				makeLocalityFromName("action-0"): {state: balancer.State{
+					ConnectivityState: connectivity.Ready,
+					Picker:            testPickers[0],
+				}},
+				makeLocalityFromName("action-1"): {state: balancer.State{
+					ConnectivityState: connectivity.Ready,
+					Picker:            testPickers[1],
+				}},
+			},
+			info: balancer.PickInfo{FullMethodName: "/a/b"},
+			// First route is a match, so the first action is picked, even
+			// though the second is an exact match.
+			want: balancer.PickResult{SubConn: testutils.TestSubConns[0]},
+		},
+	}
+	cmpOpts := []cmp.Option{cmp.AllowUnexported(testutils.TestSubConn{})}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			pg := newPickerGroup(tt.routes, tt.pickers)
+			got, err := pg.Pick(tt.info)
+			t.Logf("Pick(%+v) = {%+v, %+v}", tt.info, got, err)
+			if err != tt.wantErr {
+				t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !cmp.Equal(got, tt.want, cmpOpts...) {
+				t.Errorf("Pick() got = %v, want %v, diff %s", got, tt.want, cmp.Diff(got, tt.want, cmpOpts...))
+			}
+		})
+	}
+}
diff --git a/xds/internal/balancer/xdsrouting/routing_test.go b/xds/internal/balancer/xdsrouting/routing_test.go
new file mode 100644
index 00000000000..90607bc1860
--- /dev/null
+++ b/xds/internal/balancer/xdsrouting/routing_test.go
@@ -0,0 +1,710 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xdsrouting
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/roundrobin"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/hierarchy"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/xds/internal/balancer/balancergroup"
+	"google.golang.org/grpc/xds/internal/testutils"
+)
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+var (
+	rtBuilder           balancer.Builder
+	rtParser            balancer.ConfigParser
+	testBackendAddrStrs []string
+)
+
+const ignoreAttrsRRName = "ignore_attrs_round_robin"
+
+type ignoreAttrsRRBuilder struct {
+	balancer.Builder
+}
+
+func (trrb *ignoreAttrsRRBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	return &ignoreAttrsRRBalancer{trrb.Builder.Build(cc, opts)}
+}
+
+func (*ignoreAttrsRRBuilder) Name() string {
+	return ignoreAttrsRRName
+}
+
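// ---------------------------------------------------------------------------
// Editor's note: this aside is not part of the patch. The tests below lean on
// the hierarchy package, so here is a minimal sketch of its contract as used
// in this file (it only compiles inside the grpc-go module, since hierarchy is
// an internal package): Set attaches a hierarchy path to an address, Group
// splits addresses by the first path element and consumes it, and Get reads
// what is left.
//
//	addr := hierarchy.Set(resolver.Address{Addr: "1.1.1.1:1"}, []string{"cds:cluster_1"})
//	byChild := hierarchy.Group([]resolver.Address{addr})
//	// byChild["cds:cluster_1"] holds the address, with the consumed path
//	// element stripped:
//	fmt.Println(hierarchy.Get(byChild["cds:cluster_1"][0])) // prints []
// ---------------------------------------------------------------------------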
+// ignoreAttrsRRBalancer clears attributes from all addresses.
+//
+// It's necessary in these tests because hierarchy modifies address.Attributes.
+// Even if rr gets addresses with empty hierarchy, the attributes fields are
+// different. This is a temporary workaround for the tests to ignore attributes.
+// Eventually, we need a way for roundrobin to know that two addresses with
+// empty attributes are equal.
+//
+// TODO: delete this when the issue is resolved:
+// https://github.com/grpc/grpc-go/issues/3611.
+type ignoreAttrsRRBalancer struct {
+	balancer.Balancer
+}
+
+func (trrb *ignoreAttrsRRBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	var newAddrs []resolver.Address
+	for _, a := range s.ResolverState.Addresses {
+		a.Attributes = nil
+		newAddrs = append(newAddrs, a)
+	}
+	s.ResolverState.Addresses = newAddrs
+	return trrb.Balancer.UpdateClientConnState(s)
+}
+
+const testBackendAddrsCount = 12
+
+func init() {
+	for i := 0; i < testBackendAddrsCount; i++ {
+		testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
+	}
+	rtBuilder = balancer.Get(xdsRoutingName)
+	rtParser = rtBuilder.(balancer.ConfigParser)
+
+	balancer.Register(&ignoreAttrsRRBuilder{balancer.Get(roundrobin.Name)})
+
+	balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
+}
+
+func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC balancer.SubConn, wantErr error) {
+	t.Helper()
+	for i := 0; i < 5; i++ {
+		gotSCSt, err := p.Pick(info)
+		if err != wantErr {
+			t.Fatalf("picker.Pick(%+v), got error %v, want %v", info, err, wantErr)
+		}
+		if !cmp.Equal(gotSCSt.SubConn, wantSC, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick(%+v), got %v, want SubConn=%v", info, gotSCSt, wantSC)
+		}
+	}
+}
+
+func TestRouting(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
+
+	configJSON1 := `{
+"Action": {
+  "cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+},
+"Route": [
+  {"prefix":"/a/", "action":"cds:cluster_1"},
+  {"prefix":"", "headers":[{"name":"header-1", "exactMatch":"value-1"}], "action":"cds:cluster_2"}
+]
+}`
+
+	config1, err := rtParser.ParseConfig([]byte(configJSON1))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+
+	// Send the config, and addresses with hierarchy paths ["cds:cluster_1"]
+	// and ["cds:cluster_2"].
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0], Attributes: nil},
+		{Addr: testBackendAddrStrs[1], Attributes: nil},
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		}},
+		BalancerConfig: config1,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	m1 := make(map[resolver.Address]balancer.SubConn)
+	// Verify that a subconn is created with the address, and the hierarchy path
+	// in the address is cleared.
+	for range wantAddrs {
+		addrs := <-cc.NewSubConnAddrsCh
+		if len(hierarchy.Get(addrs[0])) != 0 {
+			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
+		}
+		sc := <-cc.NewSubConnCh
+		// Clear the attributes before adding to map.
+		addrs[0].Attributes = nil
+		m1[addrs[0]] = sc
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	}
+
+	p1 := <-cc.NewPickerCh
+	for _, tt := range []struct {
+		pickInfo balancer.PickInfo
+		wantSC   balancer.SubConn
+		wantErr  error
+	}{
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{
+				FullMethodName: "/z/y",
+				Ctx:            metadata.NewOutgoingContext(context.Background(), metadata.Pairs("header-1", "value-1")),
+			},
+			wantSC:  m1[wantAddrs[1]],
+			wantErr: nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{
+				FullMethodName: "/z/y",
+				Ctx:            metadata.NewOutgoingContext(context.Background(), metadata.Pairs("h", "v")),
+			},
+			wantSC:  nil,
+			wantErr: errNoMatchedRouteFound,
+		},
+	} {
+		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
+	}
+}
+
+// TestRoutingConfigUpdateAddRoute covers the case where the routing balancer
+// receives a config update with an extra route, but the same actions.
+func TestRoutingConfigUpdateAddRoute(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
+
+	configJSON1 := `{
+"Action": {
+  "cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+},
+"Route": [
+  {"prefix":"/a/", "action":"cds:cluster_1"},
+  {"path":"/z/y", "action":"cds:cluster_2"}
+]
+}`
+
+	config1, err := rtParser.ParseConfig([]byte(configJSON1))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+
+	// Send the config, and addresses with hierarchy paths ["cds:cluster_1"]
+	// and ["cds:cluster_2"].
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0], Attributes: nil},
+		{Addr: testBackendAddrStrs[1], Attributes: nil},
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		}},
+		BalancerConfig: config1,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	m1 := make(map[resolver.Address]balancer.SubConn)
+	// Verify that a subconn is created with the address, and the hierarchy path
+	// in the address is cleared.
+	for range wantAddrs {
+		addrs := <-cc.NewSubConnAddrsCh
+		if len(hierarchy.Get(addrs[0])) != 0 {
+			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
+		}
+		sc := <-cc.NewSubConnCh
+		// Clear the attributes before adding to map.
+		addrs[0].Attributes = nil
+		m1[addrs[0]] = sc
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	}
+
+	p1 := <-cc.NewPickerCh
+	for _, tt := range []struct {
+		pickInfo balancer.PickInfo
+		wantSC   balancer.SubConn
+		wantErr  error
+	}{
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
+			wantSC:   m1[wantAddrs[1]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
+			wantSC:   nil,
+			wantErr:  errNoMatchedRouteFound,
+		},
+	} {
+		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
+	}
+
+	// A config update with different routes, but the same actions. Expect a
+	// picker update, but no subconn changes.
+	configJSON2 := `{
+"Action": {
+  "cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+},
+"Route": [
+  {"prefix":"", "headers":[{"name":"header-1", "presentMatch":true}], "action":"cds:cluster_2"},
+  {"prefix":"/a/", "action":"cds:cluster_1"},
+  {"path":"/z/y", "action":"cds:cluster_2"}
+]
+}`
+	config2, err := rtParser.ParseConfig([]byte(configJSON2))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+	// Send update with the same addresses.
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		}},
+		BalancerConfig: config2,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	// No change to the actions, so expect no new SubConn.
+	select {
+	case <-time.After(time.Millisecond * 500):
+	case <-cc.NewSubConnCh:
+		addrs := <-cc.NewSubConnAddrsCh
+		t.Fatalf("unexpected NewSubConn with address %v", addrs)
+	}
+
+	p2 := <-cc.NewPickerCh
+	for _, tt := range []struct {
+		pickInfo balancer.PickInfo
+		wantSC   balancer.SubConn
+		wantErr  error
+	}{
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
+			wantSC:   m1[wantAddrs[1]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{
+				FullMethodName: "/a/z",
+				Ctx:            metadata.NewOutgoingContext(context.Background(), metadata.Pairs("header-1", "value-1")),
+			},
+			wantSC:  m1[wantAddrs[1]],
+			wantErr: nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{
+				FullMethodName: "/c/d",
+				Ctx:            metadata.NewOutgoingContext(context.Background(), metadata.Pairs("h", "v")),
+			},
+			wantSC:  nil,
+			wantErr: errNoMatchedRouteFound,
+		},
+	} {
+		testPick(t, p2, tt.pickInfo, tt.wantSC, tt.wantErr)
+	}
+}
+
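// ---------------------------------------------------------------------------
// Editor's note: this aside is not part of the patch. The config literals in
// these tests go through the builder's ConfigParser; a minimal sketch of doing
// the same by hand (the "round_robin" child policy here is illustrative — any
// registered balancer name works):
//
//	builder := balancer.Get("xds_routing_experimental")
//	parser := builder.(balancer.ConfigParser)
//	cfg, err := parser.ParseConfig([]byte(`{
//	"Action": { "cds:cluster_1": { "childPolicy": [{"round_robin":""}] } },
//	"Route":  [ {"prefix":"/", "action":"cds:cluster_1"} ]
//	}`))
//	if err != nil {
//		// Malformed routes/actions are rejected here.
//	}
//	_ = cfg // ready to hand to UpdateClientConnState as the BalancerConfig
// ---------------------------------------------------------------------------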
+// TestRoutingConfigUpdateAddRouteAndAction covers the case where the routing
+// balancer receives a config update with an extra route and an extra action.
+func TestRoutingConfigUpdateAddRouteAndAction(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
+
+	configJSON1 := `{
+"Action": {
+  "cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+},
+"Route": [
+  {"prefix":"/a/", "action":"cds:cluster_1"},
+  {"path":"/z/y", "action":"cds:cluster_2"}
+]
+}`
+
+	config1, err := rtParser.ParseConfig([]byte(configJSON1))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+
+	// Send the config, and addresses with hierarchy paths ["cds:cluster_1"]
+	// and ["cds:cluster_2"].
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0], Attributes: nil},
+		{Addr: testBackendAddrStrs[1], Attributes: nil},
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		}},
+		BalancerConfig: config1,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	m1 := make(map[resolver.Address]balancer.SubConn)
+	// Verify that a subconn is created with the address, and the hierarchy path
+	// in the address is cleared.
+	for range wantAddrs {
+		addrs := <-cc.NewSubConnAddrsCh
+		if len(hierarchy.Get(addrs[0])) != 0 {
+			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
+		}
+		sc := <-cc.NewSubConnCh
+		// Clear the attributes before adding to map.
+		addrs[0].Attributes = nil
+		m1[addrs[0]] = sc
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	}
+
+	p1 := <-cc.NewPickerCh
+	for _, tt := range []struct {
+		pickInfo balancer.PickInfo
+		wantSC   balancer.SubConn
+		wantErr  error
+	}{
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
+			wantSC:   m1[wantAddrs[1]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
+			wantSC:   nil,
+			wantErr:  errNoMatchedRouteFound,
+		},
+	} {
+		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
+	}
+
+	// A config update with different routes and an additional action. Expect a
+	// new subconn and a picker update.
+	configJSON2 := `{
+"Action": {
+  "cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_3":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+},
+"Route": [
+  {"prefix":"", "headers":[{"name":"header-1", "presentMatch":false, "invertMatch":true}], "action":"cds:cluster_3"},
+  {"prefix":"/a/", "action":"cds:cluster_1"},
+  {"path":"/z/y", "action":"cds:cluster_2"}
+]
+}`
+	config2, err := rtParser.ParseConfig([]byte(configJSON2))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+	wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil})
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+			hierarchy.Set(wantAddrs[2], []string{"cds:cluster_3"}),
+		}},
+		BalancerConfig: config2,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	// Expect exactly one new subconn.
+	addrs := <-cc.NewSubConnAddrsCh
+	if len(hierarchy.Get(addrs[0])) != 0 {
+		t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
+	}
+	sc := <-cc.NewSubConnCh
+	// Clear the attributes before adding to map.
+	addrs[0].Attributes = nil
+	m1[addrs[0]] = sc
+	rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// Should have no more new SubConns.
+	select {
+	case <-time.After(time.Millisecond * 500):
+	case <-cc.NewSubConnCh:
+		addrs := <-cc.NewSubConnAddrsCh
+		t.Fatalf("unexpected NewSubConn with address %v", addrs)
+	}
+
+	p2 := <-cc.NewPickerCh
+	for _, tt := range []struct {
+		pickInfo balancer.PickInfo
+		wantSC   balancer.SubConn
+		wantErr  error
+	}{
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
+			wantSC:   m1[wantAddrs[1]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{
+				FullMethodName: "/a/z",
+				Ctx:            metadata.NewOutgoingContext(context.Background(), metadata.Pairs("header-1", "value-1")),
+			},
+			wantSC:  m1[wantAddrs[2]],
+			wantErr: nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{
+				FullMethodName: "/c/d",
+				Ctx:            metadata.NewOutgoingContext(context.Background(), metadata.Pairs("h", "v")),
+			},
+			wantSC:  nil,
+			wantErr: errNoMatchedRouteFound,
+		},
+	} {
+		testPick(t, p2, tt.pickInfo, tt.wantSC, tt.wantErr)
+	}
+}
+
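// ---------------------------------------------------------------------------
// Editor's note: this aside is not part of the patch. The first route in
// configJSON2 above uses a double negation worth spelling out: with
// presentMatch:false the header matcher matches RPCs where "header-1" is
// absent, and invertMatch:true flips that, so the route matches RPCs that do
// carry "header-1". A hypothetical helper showing the same logic:
//
//	func matchesHeader1(md metadata.MD) bool {
//		const presentMatch, invertMatch = false, true // from the config
//		matched := (len(md.Get("header-1")) > 0) == presentMatch
//		if invertMatch {
//			matched = !matched
//		}
//		return matched
//	}
// ---------------------------------------------------------------------------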
+// TestRoutingConfigUpdateDeleteAll covers the case where the routing balancer
+// receives a config update with no routes. Picks should then fail with details
+// in the error.
+func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
+
+	configJSON1 := `{
+"Action": {
+  "cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
+  "cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+},
+"Route": [
+  {"prefix":"/a/", "action":"cds:cluster_1"},
+  {"path":"/z/y", "action":"cds:cluster_2"}
+]
+}`
+
+	config1, err := rtParser.ParseConfig([]byte(configJSON1))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+
+	// Send the config, and addresses with hierarchy paths ["cds:cluster_1"]
+	// and ["cds:cluster_2"].
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0], Attributes: nil},
+		{Addr: testBackendAddrStrs[1], Attributes: nil},
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		}},
+		BalancerConfig: config1,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	m1 := make(map[resolver.Address]balancer.SubConn)
+	// Verify that a subconn is created with the address, and the hierarchy path
+	// in the address is cleared.
+	for range wantAddrs {
+		addrs := <-cc.NewSubConnAddrsCh
+		if len(hierarchy.Get(addrs[0])) != 0 {
+			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
+		}
+		sc := <-cc.NewSubConnCh
+		// Clear the attributes before adding to map.
+		addrs[0].Attributes = nil
+		m1[addrs[0]] = sc
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+		rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	}
+
+	p1 := <-cc.NewPickerCh
+	for _, tt := range []struct {
+		pickInfo balancer.PickInfo
+		wantSC   balancer.SubConn
+		wantErr  error
+	}{
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
+			wantSC:   m1[wantAddrs[0]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
+			wantSC:   m1[wantAddrs[1]],
+			wantErr:  nil,
+		},
+		{
+			pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
+			wantSC:   nil,
+			wantErr:  errNoMatchedRouteFound,
+		},
+	} {
+		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
+	}
+
+	// A config update with no routes.
+	configJSON2 := `{}`
+	config2, err := rtParser.ParseConfig([]byte(configJSON2))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		BalancerConfig: config2,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	// Expect the two subconns to be removed.
+	for range wantAddrs {
+		select {
+		case <-time.After(time.Millisecond * 500):
+			t.Fatalf("timeout waiting for remove subconn")
+		case <-cc.RemoveSubConnCh:
+		}
+	}
+
+	p2 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		gotSCSt, err := p2.Pick(balancer.PickInfo{})
+		if err != errNoMatchedRouteFound {
+			t.Fatalf("picker.Pick, got %v, %v, want error %v", gotSCSt, err, errNoMatchedRouteFound)
+		}
+	}
+
+	// Resend the previous config with routes and actions.
+ if err := rtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), + hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}), + }}, + BalancerConfig: config1, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + m2 := make(map[resolver.Address]balancer.SubConn) + // Verify that a subconn is created with the address, and the hierarchy path + // in the address is cleared. + for range wantAddrs { + addrs := <-cc.NewSubConnAddrsCh + if len(hierarchy.Get(addrs[0])) != 0 { + t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes) + } + sc := <-cc.NewSubConnCh + // Clear the attributes before adding to map. + addrs[0].Attributes = nil + m2[addrs[0]] = sc + rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + } + + p3 := <-cc.NewPickerCh + for _, tt := range []struct { + pickInfo balancer.PickInfo + wantSC balancer.SubConn + wantErr error + }{ + { + pickInfo: balancer.PickInfo{FullMethodName: "/a/0"}, + wantSC: m2[wantAddrs[0]], + wantErr: nil, + }, + { + pickInfo: balancer.PickInfo{FullMethodName: "/a/1"}, + wantSC: m2[wantAddrs[0]], + wantErr: nil, + }, + { + pickInfo: balancer.PickInfo{FullMethodName: "/z/y"}, + wantSC: m2[wantAddrs[1]], + wantErr: nil, + }, + { + pickInfo: balancer.PickInfo{FullMethodName: "/c/d"}, + wantSC: nil, + wantErr: errNoMatchedRouteFound, + }, + } { + testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr) + } +}
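// ---------------------------------------------------------------------------
// Editor's note: this aside is not part of the patch. For orientation, a
// sketch of the kind of service config that would select the balancer added
// here; the cluster and child-policy names are illustrative only:
//
//	const exampleServiceConfig = `{
//	  "loadBalancingConfig": [{
//	    "xds_routing_experimental": {
//	      "Action": {
//	        "cds:cluster_1": { "childPolicy": [{"round_robin":""}] }
//	      },
//	      "Route": [
//	        { "prefix": "/", "action": "cds:cluster_1" }
//	      ]
//	    }
//	  }]
//	}`
//
// In practice this config is produced by the xds resolver from the received
// xDS routes, rather than written by hand.
// ---------------------------------------------------------------------------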