Skip to content

Commit

Permalink
backport PRs to v1.41.x (#4810)
Browse files Browse the repository at this point in the history
* xds: fix parent balancers to handle Idle children (#4801)
* transport: fix log spam from Server Authentication Handshake errors (#4798)
  • Loading branch information
menghanl committed Sep 23, 2021
1 parent 8c8b55e commit a671967
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 6 deletions.
8 changes: 5 additions & 3 deletions internal/transport/http2_server.go
Expand Up @@ -139,9 +139,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
var err error
conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err == credentials.ErrConnDispatched {
// ErrConnDispatched means that the connection was dispatched away
// from gRPC; those connections should be left open. io.EOF means
// the connection was closed before handshaking completed, which can
// happen naturally from probers. Return these errors directly.
if err == credentials.ErrConnDispatched || err == io.EOF {
return nil, err
}
return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
Expand Down
7 changes: 6 additions & 1 deletion server.go
Expand Up @@ -887,7 +887,12 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
if err != credentials.ErrConnDispatched {
c.Close()
}
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
if err != credentials.ErrConnDispatched {
if err != io.EOF {
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
}
}
return nil
}

Expand Down
Expand Up @@ -183,13 +183,18 @@ func (bsa *balancerStateAggregator) build() balancer.State {
// handling the special connecting after ready, as in UpdateState(). Then a
// function to calculate the aggregated connectivity state as in this
// function.
var readyN, connectingN int
//
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
for _, ps := range bsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
var aggregatedState connectivity.State
Expand All @@ -198,6 +203,8 @@ func (bsa *balancerStateAggregator) build() balancer.State {
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
case idleN > 0:
aggregatedState = connectivity.Idle
default:
aggregatedState = connectivity.TransientFailure
}
Expand Down
65 changes: 65 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Expand Up @@ -565,3 +565,68 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
t.Fatal(err2)
}
}

const initIdleBalancerName = "test-init-Idle-balancer"

var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")

func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}

// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})

configJSON1 := `{
"children": {
"cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] }
}
}`

config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
}

if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}
Expand Up @@ -200,7 +200,9 @@ func (wbsa *Aggregator) BuildAndUpdate() {
func (wbsa *Aggregator) build() balancer.State {
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
m := wbsa.idToPickerState
var readyN, connectingN int
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
readyPickerWithWeights := make([]weightedPickerState, 0, len(m))
for _, ps := range m {
switch ps.stateToAggregate {
Expand All @@ -209,6 +211,8 @@ func (wbsa *Aggregator) build() balancer.State {
readyPickerWithWeights = append(readyPickerWithWeights, *ps)
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
var aggregatedState connectivity.State
Expand All @@ -217,6 +221,8 @@ func (wbsa *Aggregator) build() balancer.State {
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
case idleN > 0:
aggregatedState = connectivity.Idle
default:
aggregatedState = connectivity.TransientFailure
}
Expand Down
61 changes: 61 additions & 0 deletions xds/internal/balancer/weightedtarget/weightedtarget_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -263,3 +264,63 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return scst.SubConn
}
}

const initIdleBalancerName = "test-init-Idle-balancer"

var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")

func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}

// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})

// Start with "cluster_1: round_robin".
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
}

if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}

0 comments on commit a671967

Please sign in to comment.