Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: implement RouteAction timeout support #4116

Merged
merged 4 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,11 @@ type Route struct {
Path, Prefix, Regex *string
// Indicates if prefix/path matching should be case insensitive. The default
// is false (case sensitive).
CaseInsensitive bool
Headers []*HeaderMatcher
Fraction *uint32
Action map[string]uint32 // action is weighted clusters.
CaseInsensitive bool
Headers []*HeaderMatcher
Fraction *uint32
Action map[string]uint32 // action is weighted clusters.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
MaxStreamDuration time.Duration
}

// HeaderMatcher represents header matchers.
Expand Down
92 changes: 92 additions & 0 deletions xds/internal/client/client_rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client

import (
"testing"
"time"

v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
)

func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
Expand Down Expand Up @@ -290,6 +292,96 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
},
},
},
{
name: "good-route-config-with-max-stream-duration",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(time.Second)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
},
},
},
},
{
name: "good-route-config-with-grpc-timeout-header-max",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{GrpcTimeoutHeaderMax: durationpb.New(time.Second)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
},
},
},
},
{
name: "good-route-config-with-both-timeouts",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(2 * time.Second), GrpcTimeoutHeaderMax: durationpb.New(0)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}},
},
},
},
},
}

for _, test := range tests {
Expand Down
10 changes: 9 additions & 1 deletion xds/internal/client/client_xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

clusters := make(map[string]uint32)
switch a := r.GetRoute().GetClusterSpecifier().(type) {
action := r.GetRoute()
switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
clusters[a.Cluster] = 1
case *v3routepb.RouteAction_WeightedClusters:
Expand All @@ -341,6 +342,13 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

route.Action = clusters
msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil {
route.MaxStreamDuration = dur.AsDuration()
} else {
route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration()
}
routesRet = append(routesRet, &route)
}
return routesRet, nil
Expand Down
5 changes: 5 additions & 0 deletions xds/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"
circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
)

var (
Expand All @@ -44,4 +45,8 @@ var (
// enabled, which can be done by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true".
CircuitBreakingSupport = strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "true")
// TimeoutSupport indicates whether support for max_stream_duration in
// route actions is enabled. This can be enabled by setting the
// environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "true".
TimeoutSupport = strings.EqualFold(os.Getenv(timeoutSupportEnv), "true")
)
40 changes: 26 additions & 14 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"encoding/json"
"fmt"
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/env"
)

const (
Expand Down Expand Up @@ -93,12 +95,13 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) {
}

type route struct {
action wrr.WRR
m *compositeMatcher // converted from route matchers
m *compositeMatcher // converted from route matchers
clusters wrr.WRR
maxStreamDuration time.Duration
}

func (r route) String() string {
return r.m.String() + "->" + fmt.Sprint(r.action)
return r.m.String() + "->" + fmt.Sprint(r.clusters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also print the duration? Will be hard to format though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I thought about this earlier but forgot to do it.

I think the formatting looks good:

pathPrefix:/foo -> { clusters: [{A 1}], maxStreamDuration: 5s }

pathPrefix: -> { clusters: [{cluster_1 75} {cluster_2 25}], maxStreamDuration: 0s }

I needed to go into the WRRs to add String methods to clean this up (I broke the formatting in my previous PR since it was a map and became a WRR instance).

}

type configSelector struct {
Expand All @@ -110,26 +113,27 @@ type configSelector struct {
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")

func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
var action wrr.WRR
var rt *route
// Loop through routes in order and select first match.
for _, rt := range cs.routes {
if rt.m.match(rpcInfo) {
action = rt.action
for _, r := range cs.routes {
if r.m.match(rpcInfo) {
rt = &r
break
}
}
if action == nil {
if rt == nil || rt.clusters == nil {
return nil, errNoMatchedRouteFound
}
cluster, ok := action.Next().(string)
cluster, ok := rt.clusters.Next().(string)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}
// Add a ref to the selected cluster, as this RPC needs this cluster until
// it is committed.
ref := &cs.clusters[cluster].refCount
atomic.AddInt32(ref, 1)
return &iresolver.RPCConfig{

config := &iresolver.RPCConfig{
// Communicate to the LB policy the chosen cluster.
Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster),
OnCommitted: func() {
Expand All @@ -144,7 +148,13 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
}
}
},
}, nil
}

if env.TimeoutSupport && rt.maxStreamDuration != 0 {
config.MethodConfig.Timeout = &rt.maxStreamDuration
}

return config, nil
}

// incRefs increments refs of all clusters referenced by this config selector.
Expand Down Expand Up @@ -196,9 +206,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}

for i, rt := range su.Routes {
action := newWRR()
clusters := newWRR()
for cluster, weight := range rt.Action {
action.Add(cluster, int64(weight))
clusters.Add(cluster, int64(weight))

// Initialize entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Set to zero as they will be
Expand All @@ -210,14 +220,16 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}
cs.clusters[cluster] = ci
}
cs.routes[i].action = action
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = routeToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].maxStreamDuration = rt.MaxStreamDuration
}

return cs, nil
}

Expand Down