Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: implement RouteAction timeout support #4116

Merged
merged 4 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/wrr/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package wrr

import (
"fmt"
"sync"

"google.golang.org/grpc/internal/grpcrand"
Expand All @@ -29,6 +30,10 @@ type weightedItem struct {
Weight int64
}

func (w *weightedItem) String() string {
return fmt.Sprint(*w)
}

// randomWRR is a struct that contains weighted items implement weighted random algorithm.
type randomWRR struct {
mu sync.RWMutex
Expand Down Expand Up @@ -68,3 +73,7 @@ func (rw *randomWRR) Add(item interface{}, weight int64) {
rw.items = append(rw.items, rItem)
rw.sumOfWeights += weight
}

func (rw *randomWRR) String() string {
return fmt.Sprint(rw.items)
}
5 changes: 4 additions & 1 deletion xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ type Route struct {
CaseInsensitive bool
Headers []*HeaderMatcher
Fraction *uint32
Action map[string]uint32 // action is weighted clusters.

// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
MaxStreamDuration time.Duration
}

// HeaderMatcher represents header matchers.
Expand Down
92 changes: 92 additions & 0 deletions xds/internal/client/client_rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client

import (
"testing"
"time"

v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
)

func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
Expand Down Expand Up @@ -290,6 +292,96 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
},
},
},
{
name: "good-route-config-with-max-stream-duration",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(time.Second)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
},
},
},
},
{
name: "good-route-config-with-grpc-timeout-header-max",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{GrpcTimeoutHeaderMax: durationpb.New(time.Second)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
},
},
},
},
{
name: "good-route-config-with-both-timeouts",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(2 * time.Second), GrpcTimeoutHeaderMax: durationpb.New(0)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}},
},
},
},
},
}

for _, test := range tests {
Expand Down
10 changes: 9 additions & 1 deletion xds/internal/client/client_xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

clusters := make(map[string]uint32)
switch a := r.GetRoute().GetClusterSpecifier().(type) {
action := r.GetRoute()
switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
clusters[a.Cluster] = 1
case *v3routepb.RouteAction_WeightedClusters:
Expand All @@ -341,6 +342,13 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

route.Action = clusters
msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil {
route.MaxStreamDuration = dur.AsDuration()
} else {
route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration()
}
routesRet = append(routesRet, &route)
}
return routesRet, nil
Expand Down
5 changes: 5 additions & 0 deletions xds/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"
circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
)

var (
Expand All @@ -44,4 +45,8 @@ var (
// enabled, which can be done by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true".
CircuitBreakingSupport = strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "true")
// TimeoutSupport indicates whether support for max_stream_duration in
// route actions is enabled. This can be enabled by setting the
// environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "true".
TimeoutSupport = strings.EqualFold(os.Getenv(timeoutSupportEnv), "true")
)
40 changes: 26 additions & 14 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"encoding/json"
"fmt"
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/env"
)

const (
Expand Down Expand Up @@ -93,12 +95,13 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) {
}

type route struct {
action wrr.WRR
m *compositeMatcher // converted from route matchers
m *compositeMatcher // converted from route matchers
clusters wrr.WRR
maxStreamDuration time.Duration
}

func (r route) String() string {
return r.m.String() + "->" + fmt.Sprint(r.action)
return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the String method to the wrr.WRR interface.

Actually, why does this work since String is not part of the interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface is fmt.Stringer. I don't think we need to explicitly add Stringer everywhere we want to print things; it's a magic part of standard library stuff.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry. I thought you were calling String() on clusters.

}

type configSelector struct {
Expand All @@ -110,26 +113,27 @@ type configSelector struct {
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")

func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
var action wrr.WRR
var rt *route
// Loop through routes in order and select first match.
for _, rt := range cs.routes {
if rt.m.match(rpcInfo) {
action = rt.action
for _, r := range cs.routes {
if r.m.match(rpcInfo) {
rt = &r
break
}
}
if action == nil {
if rt == nil || rt.clusters == nil {
return nil, errNoMatchedRouteFound
}
cluster, ok := action.Next().(string)
cluster, ok := rt.clusters.Next().(string)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}
// Add a ref to the selected cluster, as this RPC needs this cluster until
// it is committed.
ref := &cs.clusters[cluster].refCount
atomic.AddInt32(ref, 1)
return &iresolver.RPCConfig{

config := &iresolver.RPCConfig{
// Communicate to the LB policy the chosen cluster.
Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster),
OnCommitted: func() {
Expand All @@ -144,7 +148,13 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
}
}
},
}, nil
}

if env.TimeoutSupport && rt.maxStreamDuration != 0 {
config.MethodConfig.Timeout = &rt.maxStreamDuration
}

return config, nil
}

// incRefs increments refs of all clusters referenced by this config selector.
Expand Down Expand Up @@ -196,9 +206,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}

for i, rt := range su.Routes {
action := newWRR()
clusters := newWRR()
for cluster, weight := range rt.Action {
action.Add(cluster, int64(weight))
clusters.Add(cluster, int64(weight))

// Initialize entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Set to zero as they will be
Expand All @@ -210,14 +220,16 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}
cs.clusters[cluster] = ci
}
cs.routes[i].action = action
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = routeToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].maxStreamDuration = rt.MaxStreamDuration
}

return cs, nil
}

Expand Down