Skip to content

Commit

Permalink
xds: implement RouteAction timeout support (#4116)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Dec 17, 2020
1 parent d79063f commit 15458d2
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 16 deletions.
9 changes: 9 additions & 0 deletions internal/wrr/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package wrr

import (
"fmt"
"sync"

"google.golang.org/grpc/internal/grpcrand"
Expand All @@ -29,6 +30,10 @@ type weightedItem struct {
Weight int64
}

func (w *weightedItem) String() string {
return fmt.Sprint(*w)
}

// randomWRR is a struct that contains weighted items implement weighted random algorithm.
type randomWRR struct {
mu sync.RWMutex
Expand Down Expand Up @@ -68,3 +73,7 @@ func (rw *randomWRR) Add(item interface{}, weight int64) {
rw.items = append(rw.items, rItem)
rw.sumOfWeights += weight
}

func (rw *randomWRR) String() string {
return fmt.Sprint(rw.items)
}
5 changes: 4 additions & 1 deletion xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ type Route struct {
CaseInsensitive bool
Headers []*HeaderMatcher
Fraction *uint32
Action map[string]uint32 // action is weighted clusters.

// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
MaxStreamDuration time.Duration
}

// HeaderMatcher represents header matchers.
Expand Down
92 changes: 92 additions & 0 deletions xds/internal/client/client_rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client

import (
"testing"
"time"

v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
)

func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
Expand Down Expand Up @@ -290,6 +292,96 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
},
},
},
{
name: "good-route-config-with-max-stream-duration",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(time.Second)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
},
},
},
},
{
name: "good-route-config-with-grpc-timeout-header-max",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{GrpcTimeoutHeaderMax: durationpb.New(time.Second)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
},
},
},
},
{
name: "good-route-config-with-both-timeouts",
rc: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{
{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{
Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(2 * time.Second), GrpcTimeoutHeaderMax: durationpb.New(0)},
},
},
},
},
},
},
},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}},
},
},
},
},
}

for _, test := range tests {
Expand Down
10 changes: 9 additions & 1 deletion xds/internal/client/client_xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

clusters := make(map[string]uint32)
switch a := r.GetRoute().GetClusterSpecifier().(type) {
action := r.GetRoute()
switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
clusters[a.Cluster] = 1
case *v3routepb.RouteAction_WeightedClusters:
Expand All @@ -341,6 +342,13 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

route.Action = clusters
msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil {
route.MaxStreamDuration = dur.AsDuration()
} else {
route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration()
}
routesRet = append(routesRet, &route)
}
return routesRet, nil
Expand Down
5 changes: 5 additions & 0 deletions xds/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"
circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
)

var (
Expand All @@ -44,4 +45,8 @@ var (
// enabled, which can be done by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true".
CircuitBreakingSupport = strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "true")
// TimeoutSupport indicates whether support for max_stream_duration in
// route actions is enabled. This can be enabled by setting the
// environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "true".
TimeoutSupport = strings.EqualFold(os.Getenv(timeoutSupportEnv), "true")
)
40 changes: 26 additions & 14 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"encoding/json"
"fmt"
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/env"
)

const (
Expand Down Expand Up @@ -93,12 +95,13 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) {
}

type route struct {
action wrr.WRR
m *compositeMatcher // converted from route matchers
m *compositeMatcher // converted from route matchers
clusters wrr.WRR
maxStreamDuration time.Duration
}

func (r route) String() string {
return r.m.String() + "->" + fmt.Sprint(r.action)
return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
}

type configSelector struct {
Expand All @@ -110,26 +113,27 @@ type configSelector struct {
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")

func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
var action wrr.WRR
var rt *route
// Loop through routes in order and select first match.
for _, rt := range cs.routes {
if rt.m.match(rpcInfo) {
action = rt.action
for _, r := range cs.routes {
if r.m.match(rpcInfo) {
rt = &r
break
}
}
if action == nil {
if rt == nil || rt.clusters == nil {
return nil, errNoMatchedRouteFound
}
cluster, ok := action.Next().(string)
cluster, ok := rt.clusters.Next().(string)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}
// Add a ref to the selected cluster, as this RPC needs this cluster until
// it is committed.
ref := &cs.clusters[cluster].refCount
atomic.AddInt32(ref, 1)
return &iresolver.RPCConfig{

config := &iresolver.RPCConfig{
// Communicate to the LB policy the chosen cluster.
Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster),
OnCommitted: func() {
Expand All @@ -144,7 +148,13 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
}
}
},
}, nil
}

if env.TimeoutSupport && rt.maxStreamDuration != 0 {
config.MethodConfig.Timeout = &rt.maxStreamDuration
}

return config, nil
}

// incRefs increments refs of all clusters referenced by this config selector.
Expand Down Expand Up @@ -196,9 +206,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}

for i, rt := range su.Routes {
action := newWRR()
clusters := newWRR()
for cluster, weight := range rt.Action {
action.Add(cluster, int64(weight))
clusters.Add(cluster, int64(weight))

// Initialize entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Set to zero as they will be
Expand All @@ -210,14 +220,16 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}
cs.clusters[cluster] = ci
}
cs.routes[i].action = action
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = routeToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].maxStreamDuration = rt.MaxStreamDuration
}

return cs, nil
}

Expand Down

0 comments on commit 15458d2

Please sign in to comment.