Skip to content

Commit

Permalink
xdsrouting: handle route fields in xds_client (#3747)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jul 20, 2020
1 parent 266c7b6 commit ca3959a
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 13 deletions.
12 changes: 12 additions & 0 deletions xds/internal/client/client_watchers_rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,20 @@ type HeaderMatcher struct {
PresentMatch *bool `json:"presentMatch,omitempty"`
}

// Route represents route with matchers and action.
type Route struct {
Path, Prefix, Regex *string
Headers []*HeaderMatcher
Fraction *uint32
Action map[string]uint32 // action is weighted clusters.
}

type rdsUpdate struct {
// weightedCluster is only set when routing is disabled (env variable
// GRPC_XDS_EXPERIMENTAL_ROUTING is not true).
weightedCluster map[string]uint32

routes []*Route
}
type rdsCallbackFunc func(rdsUpdate, error)

Expand Down
7 changes: 7 additions & 0 deletions xds/internal/client/client_watchers_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ import (
type ServiceUpdate struct {
// WeightedCluster is a map from cluster names (CDS resource to watch) to
// their weights.
//
// This field is only set when routing is disabled (env variable
// GRPC_XDS_EXPERIMENTAL_ROUTING is not true).
WeightedCluster map[string]uint32

// Routes
Routes []*Route
}

// WatchService uses LDS and RDS to discover information about the provided
Expand Down Expand Up @@ -121,6 +127,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) {
}
w.serviceCb(ServiceUpdate{
WeightedCluster: update.weightedCluster,
Routes: update.routes,
}, nil)
}

Expand Down
21 changes: 19 additions & 2 deletions xds/internal/client/client_watchers_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ var serviceCmpOpts = []cmp.Option{cmp.AllowUnexported(serviceUpdateErr{}), cmpop

// TestServiceWatch covers the cases:
// - an update is received after a watch()
// - an update for another resource name (which doesn't trigger callback)
// - an upate is received after cancel()
// - an update with routes received
func (s) TestServiceWatch(t *testing.T) {
v2ClientCh, cleanup := overrideNewXDSV2Client()
defer cleanup()
Expand Down Expand Up @@ -77,6 +76,24 @@ func (s) TestServiceWatch(t *testing.T) {
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
}

wantUpdate2 := ServiceUpdate{
Routes: []*Route{{
Prefix: newStringP(""),
Action: map[string]uint32{testCDSName: 1},
}},
}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {
routes: []*Route{{
Prefix: newStringP(""),
Action: map[string]uint32{testCDSName: 1},
}},
},
})
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
}
}

// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS
Expand Down
33 changes: 33 additions & 0 deletions xds/internal/client/envconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package client

import (
"os"
"strings"
)

// TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and
// GRPC_XDS_EXPERIMENTAL_V3_SUPPORT, and this. Move all env variables into a
// separate package.
const routingEnabledConfigStr = "GRPC_XDS_EXPERIMENTAL_ROUTING"

// routing is enabled only if env variable is set to true. The default is false.
// We may flip the default later.
var routingEnabled = strings.EqualFold(os.Getenv(routingEnabledConfigStr), "true")
131 changes: 125 additions & 6 deletions xds/internal/client/v2client_rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/internal/grpclog"
)

// handleRDSResponse processes an RDS response received from the xDS server. On
Expand All @@ -48,7 +50,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.logger.Infof("Resource with name: %v, type: %T, contains: %v. Picking routes for current watching hostname %v", rc.GetName(), rc, rc, v2c.hostname)

// Use the hostname (resourceName for LDS) to find the routes.
u, err := generateRDSUpdateFromRouteConfiguration(rc, hostname)
u, err := generateRDSUpdateFromRouteConfiguration(rc, hostname, v2c.logger)
if err != nil {
return fmt.Errorf("xds: received invalid RouteConfiguration in RDS response: %+v with err: %v", rc, err)
}
Expand Down Expand Up @@ -76,7 +78,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
// field must be empty and whose route field must be set. Inside that route
// message, the cluster field will contain the clusterName or weighted clusters
// we are looking for.
func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host string) (rdsUpdate, error) {
func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host string, logger *grpclog.PrefixLogger) (rdsUpdate, error) {
//
// Currently this returns "" on error, and the caller will return an error.
// But the error doesn't contain details of why the response is invalid
Expand All @@ -94,6 +96,16 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host
// should be at least one default route.
return rdsUpdate{}, fmt.Errorf("matched virtual host has no routes")
}

// Keep the old code path for routing disabled.
if routingEnabled {
routes, err := routesProtoToSlice(vh.Routes, logger)
if err != nil {
return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}
return rdsUpdate{routes: routes}, nil
}

dr := vh.Routes[len(vh.Routes)-1]
match := dr.GetMatch()
if match == nil {
Expand All @@ -108,12 +120,12 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host
// valid.
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route set case-sensitive to false")
}
route := dr.GetRoute()
if route == nil {
routeAction := dr.GetRoute()
if routeAction == nil {
return rdsUpdate{}, fmt.Errorf("matched route is nil")
}

if wc := route.GetWeightedClusters(); wc != nil {
if wc := routeAction.GetWeightedClusters(); wc != nil {
m, err := weightedClustersProtoToMap(wc)
if err != nil {
return rdsUpdate{}, fmt.Errorf("matched weighted cluster is invalid: %v", err)
Expand All @@ -129,7 +141,114 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host
// and CDS. In case when the action changes between one cluster and multiple
// clusters, changing top level policy means recreating TCP connection every
// time.
return rdsUpdate{weightedCluster: map[string]uint32{route.GetCluster(): 1}}, nil
return rdsUpdate{weightedCluster: map[string]uint32{routeAction.GetCluster(): 1}}, nil
}

func routesProtoToSlice(routes []*routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) {
var routesRet []*Route

for _, r := range routes {
match := r.GetMatch()
if match == nil {
return nil, fmt.Errorf("route %+v doesn't have a match", r)
}

if len(match.GetQueryParameters()) != 0 {
// Ignore route with query parameters.
logger.Warningf("route %+v has query parameter matchers, the route will be ignored", r)
continue
}

if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil && !caseSensitive.Value {
return nil, fmt.Errorf("route %+v has case-sensitive false", r)
}

pathSp := match.GetPathSpecifier()
if pathSp == nil {
return nil, fmt.Errorf("route %+v doesn't have a path specifier", r)
}

var route Route
switch pt := pathSp.(type) {
case *routepb.RouteMatch_Prefix:
route.Prefix = &pt.Prefix
case *routepb.RouteMatch_Path:
route.Path = &pt.Path
case *routepb.RouteMatch_SafeRegex:
route.Regex = &pt.SafeRegex.Regex
case *routepb.RouteMatch_Regex:
return nil, fmt.Errorf("route %+v has Regex, expected SafeRegex instead", r)
default:
logger.Warningf("route %+v has an unrecognized path specifier: %+v", r, pt)
continue
}

for _, h := range match.GetHeaders() {
var header HeaderMatcher
switch ht := h.GetHeaderMatchSpecifier().(type) {
case *routepb.HeaderMatcher_ExactMatch:
header.ExactMatch = &ht.ExactMatch
case *routepb.HeaderMatcher_SafeRegexMatch:
header.RegexMatch = &ht.SafeRegexMatch.Regex
case *routepb.HeaderMatcher_RangeMatch:
header.RangeMatch = &Int64Range{
Start: ht.RangeMatch.Start,
End: ht.RangeMatch.End,
}
case *routepb.HeaderMatcher_PresentMatch:
header.PresentMatch = &ht.PresentMatch
case *routepb.HeaderMatcher_PrefixMatch:
header.PrefixMatch = &ht.PrefixMatch
case *routepb.HeaderMatcher_SuffixMatch:
header.SuffixMatch = &ht.SuffixMatch
case *routepb.HeaderMatcher_RegexMatch:
return nil, fmt.Errorf("route %+v has a header matcher with Regex, expected SafeRegex instead", r)
default:
logger.Warningf("route %+v has an unrecognized header matcher: %+v", r, ht)
continue
}
header.Name = h.GetName()
invert := h.GetInvertMatch()
header.InvertMatch = &invert
route.Headers = append(route.Headers, &header)
}

if fr := match.GetRuntimeFraction(); fr != nil {
d := fr.GetDefaultValue()
n := d.GetNumerator()
switch d.GetDenominator() {
case typepb.FractionalPercent_HUNDRED:
n *= 10000
case typepb.FractionalPercent_TEN_THOUSAND:
n *= 100
case typepb.FractionalPercent_MILLION:
}
route.Fraction = &n
}

clusters := make(map[string]uint32)
switch a := r.GetRoute().GetClusterSpecifier().(type) {
case *routepb.RouteAction_Cluster:
clusters[a.Cluster] = 1
case *routepb.RouteAction_WeightedClusters:
wcs := a.WeightedClusters
var totalWeight uint32
for _, c := range wcs.Clusters {
w := c.GetWeight().GetValue()
clusters[c.GetName()] = w
totalWeight += w
}
if totalWeight != wcs.GetTotalWeight().GetValue() {
return nil, fmt.Errorf("route %+v, action %+v, weights of clusters do not add up to total total weight, got: %v, want %v", r, a, wcs.GetTotalWeight().GetValue(), totalWeight)
}
case *routepb.RouteAction_ClusterHeader:
continue
}

route.Action = clusters
routesRet = append(routesRet, &route)
}
return routesRet, nil
}

func weightedClustersProtoToMap(wc *routepb.WeightedCluster) (map[string]uint32, error) {
Expand Down

0 comments on commit ca3959a

Please sign in to comment.