Skip to content

Commit

Permalink
Updates after rebase and review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Apr 24, 2020
1 parent 6d3e99d commit 236d7a0
Show file tree
Hide file tree
Showing 26 changed files with 79 additions and 2,183 deletions.
23 changes: 14 additions & 9 deletions balancer/balancer.go
Expand Up @@ -27,9 +27,11 @@ import (
"net"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -149,7 +151,7 @@ type ClientConn interface {
// changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call
// pick on the new picker to pick new SubConns.
// Pick on the new Picker to pick new SubConns.
UpdateState(State)

// ResolveNow is called by balancer to notify gRPC to do a name resolving.
Expand Down Expand Up @@ -250,20 +252,23 @@ type PickResult struct {
}

type dropRPCError struct {
error
status *status.Status
*istatus.ErrorT
}

func (e *dropRPCError) DropRPC() bool { return true }

func (e *dropRPCError) GRPCStatus() *status.Status { return e.status }
func (e dropRPCError) DropRPC() bool { return true }

// DropRPCError wraps err in an error implementing DropRPC() bool, returning
// true. If err is not a status error, it will be converted to one with code
// Unknown and the message containing the err.Error() result.
// Unknown and the message containing the err.Error() result. DropRPCError
// should not be called with a nil error.
func DropRPCError(err error) error {
st := status.Convert(err)
return &dropRPCError{error: st.Err(), status: st}
if err == nil {
return dropRPCError{ErrorT: status.Error(codes.Unknown, "nil error passed to DropRPCError").(*istatus.ErrorT)}
}
if se, ok := err.(*istatus.ErrorT); ok {
return dropRPCError{ErrorT: se}
}
return dropRPCError{ErrorT: status.Convert(err).Err().(*istatus.ErrorT)}
}

// TransientFailureError returns e. It exists for backward compatibility and
Expand Down
2 changes: 1 addition & 1 deletion balancer/grpclb/grpclb_remote_balancer.go
Expand Up @@ -192,7 +192,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
lb.cc.RemoveSubConn(sc)
delete(lb.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// The entry will be deleted in UpdateSubConnState.
}
}

Expand Down
4 changes: 2 additions & 2 deletions balancer/rls/internal/cache/cache.go
Expand Up @@ -85,10 +85,10 @@ type Entry struct {
// X-Google-RLS-Data header for matching RPCs.
HeaderData string
// ChildPicker is a very thin wrapper around the child policy wrapper.
// The type is declared as a V2Picker interface since the users of
// The type is declared as a Picker interface since the users of
// the cache only care about the picker provided by the child policy, and
// this makes it easy for testing.
ChildPicker balancer.V2Picker
ChildPicker balancer.Picker

// size stores the size of this cache entry. Uses only a subset of the
// fields. See `entrySize` for this is computed.
Expand Down
5 changes: 1 addition & 4 deletions balancer/rls/internal/picker.go
Expand Up @@ -31,10 +31,7 @@ import (
"google.golang.org/grpc/metadata"
)

var errRLSThrottled = balancer.TransientFailureError(errors.New("RLS call throttled at client side"))

// Compile time assert to ensure we implement V2Picker.
var _ balancer.V2Picker = (*rlsPicker)(nil)
var errRLSThrottled = errors.New("RLS call throttled at client side")

// RLS rlsPicker selects the subConn to be used for a particular RPC. It does
// not manage subConns directly and usually deletegates to pickers provided by
Expand Down
21 changes: 18 additions & 3 deletions clientconn.go
Expand Up @@ -316,7 +316,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
if err = cc.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
Expand All @@ -327,7 +327,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
if err = cc.blockingpicker.connectionError(); err != nil && cc.dopts.returnLastError {
if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
return nil, err
}
return nil, ctx.Err()
Expand Down Expand Up @@ -498,6 +498,9 @@ type ClientConn struct {

channelzID int64 // channelz unique identification number
czData *channelzData

lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}

// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
Expand Down Expand Up @@ -1207,7 +1210,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
if firstConnErr == nil {
firstConnErr = err
}
ac.cc.blockingpicker.updateConnectionError(err)
ac.cc.updateConnectionError(err)
}

// Couldn't connect to any address.
Expand Down Expand Up @@ -1533,3 +1536,15 @@ func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
}
return resolver.Get(scheme)
}

func (cc *ClientConn) updateConnectionError(err error) {
cc.lceMu.Lock()
cc.lastConnectionError = err
cc.lceMu.Unlock()
}

func (cc *ClientConn) connectionError() error {
cc.lceMu.Lock()
defer cc.lceMu.Unlock()
return cc.lastConnectionError
}
3 changes: 1 addition & 2 deletions dialoptions.go
Expand Up @@ -57,8 +57,7 @@ type dialOptions struct {
authority string
copts transport.ConnectOptions
callOptions []CallOption
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
// This is used by WithBalancerName dial option.
balancerBuilder balancer.Builder
channelzParentID int64
disableServiceConfig bool
Expand Down
2 changes: 2 additions & 0 deletions examples/go.sum
Expand Up @@ -63,6 +63,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.30.0-dev.1 h1:UPWdABFs9zu2kdq7GrCUcfnVgCT65hSpvHmy0RiKn0M=
google.golang.org/grpc v1.30.0-dev.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
24 changes: 12 additions & 12 deletions internal/status/status.go
Expand Up @@ -97,7 +97,7 @@ func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return (*Error)(s.Proto())
return (*ErrorT)(s.Proto())
}

// WithDetails returns a new status with the provided details messages appended to the status.
Expand Down Expand Up @@ -136,26 +136,26 @@ func (s *Status) Details() []interface{} {
return details
}

// Error is an alias of a status proto. It implements error and Status,
// and a nil Error should never be returned by this package.
type Error spb.Status
// ErrorT is an alias of a status proto. It implements error and Status,
// and a nil *ErrorT should never be returned by this package.
type ErrorT spb.Status

func (se *Error) Error() string {
p := (*spb.Status)(se)
func (e *ErrorT) Error() string {
p := (*spb.Status)(e)
return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(p.GetCode()), p.GetMessage())
}

// GRPCStatus returns the Status represented by se.
func (se *Error) GRPCStatus() *Status {
return FromProto((*spb.Status)(se))
func (e *ErrorT) GRPCStatus() *Status {
return FromProto((*spb.Status)(e))
}

// Is implements future error.Is functionality.
// A Error is equivalent if the code and message are identical.
func (se *Error) Is(target error) bool {
tse, ok := target.(*Error)
// A ErrorT is equivalent if the code and message are identical.
func (e *ErrorT) Is(target error) bool {
tse, ok := target.(*ErrorT)
if !ok {
return false
}
return proto.Equal((*spb.Status)(se), (*spb.Status)(tse))
return proto.Equal((*spb.Status)(e), (*spb.Status)(tse))
}

0 comments on commit 236d7a0

Please sign in to comment.