Skip to content

Commit

Permalink
client: backoff before reconnecting if an HTTP2 server preface was no…
Browse files Browse the repository at this point in the history
…t received (#1648)
  • Loading branch information
MakMukhi committed Dec 1, 2017
1 parent a4bf341 commit ddbb27e
Show file tree
Hide file tree
Showing 7 changed files with 459 additions and 82 deletions.
245 changes: 176 additions & 69 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ type dialOptions struct {
// This is to support v1 balancer.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
resolverBuilder resolver.Builder
waitForHandshake bool
}

const (
Expand All @@ -109,6 +110,15 @@ const (
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)

// WithWaitForHandshake blocks until the initial settings frame is received from the
// server before assigning RPCs to the connection.
// Experimental API.
func WithWaitForHandshake() DialOption {
return func(o *dialOptions) {
o.waitForHandshake = true
}
}

// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
func WithWriteBufferSize(s int) DialOption {
Expand Down Expand Up @@ -240,7 +250,7 @@ func WithBackoffConfig(b BackoffConfig) DialOption {
return withBackoff(b)
}

// withBackoff sets the backoff strategy used for retries after a
// withBackoff sets the backoff strategy used for connectRetryNum after a
// failed connection attempt.
//
// This can be exported if arbitrary backoff strategies are allowed by gRPC.
Expand Down Expand Up @@ -804,6 +814,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
}

return curAddrFound
Expand Down Expand Up @@ -899,22 +910,31 @@ type addrConn struct {
ctx context.Context
cancel context.CancelFunc

cc *ClientConn
curAddr resolver.Address
addrs []resolver.Address
dopts dialOptions
events trace.EventLog
acbw balancer.SubConn
cc *ClientConn
addrs []resolver.Address
dopts dialOptions
events trace.EventLog
acbw balancer.SubConn

mu sync.Mutex
state connectivity.State
mu sync.Mutex
curAddr resolver.Address
reconnectIdx int // The index in addrs list to start reconnecting from.
state connectivity.State
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
transport transport.ClientTransport

// The reason this addrConn is torn down.
tearDownErr error

connectRetryNum int
// backoffDeadline is the time until which resetTransport needs to
// wait before increasing connectRetryNum count.
backoffDeadline time.Time
// connectDeadline is the time by which all connection
// negotiations must complete.
connectDeadline time.Time
}

// adjustParams updates parameters used to create transports upon
Expand Down Expand Up @@ -949,6 +969,15 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {

// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
// The created transport must receive initial settings frame from the server.
// In case that doesnt happen, transportMonitor will kill the newly created
// transport after connectDeadline has expired.
// In case there was an error on the transport before the settings frame was
// received, resetTransport resumes connecting to backends after the one that
// was previously connected to. In case end of the list is reached, resetTransport
// backs off until the original deadline.
// If the DialOption WithWaitForHandshake was set, resetTrasport returns
// successfully only after server settings are received.
//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
Expand All @@ -962,19 +991,38 @@ func (ac *addrConn) resetTransport() error {
ac.ready = nil
}
ac.transport = nil
ac.curAddr = resolver.Address{}
ridx := ac.reconnectIdx
ac.mu.Unlock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
var backoffDeadline, connectDeadline time.Time
for connectRetryNum := 0; ; connectRetryNum++ {
ac.mu.Lock()
if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
timeout = time.Duration(int(sleepTime) / len(ac.addrs))
if ac.backoffDeadline.IsZero() {
// This means either a successful HTTP2 connection was established
// or this is the first time this addrConn is trying to establish a
// connection.
backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
// This will be the duration that dial gets to finish.
dialDuration := minConnectTimeout
if backoffFor > dialDuration {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time.Now()
backoffDeadline = start.Add(backoffFor)
connectDeadline = start.Add(dialDuration)
ridx = 0 // Start connecting from the beginning.
} else {
// Continue trying to conect with the same deadlines.
connectRetryNum = ac.connectRetryNum
backoffDeadline = ac.backoffDeadline
connectDeadline = ac.connectDeadline
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.connectRetryNum = 0
}
connectTime := time.Now()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
Expand All @@ -989,94 +1037,153 @@ func (ac *addrConn) resetTransport() error {
copy(addrsIter, ac.addrs)
copts := ac.dopts.copts
ac.mu.Unlock()
for _, addr := range addrsIter {
connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
if err != nil {
return err
}
if connected {
return nil
}
}
}

// createTransport creates a connection to one of the backends in addrs.
// It returns true if a connection was established.
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
for i := ridx; i < len(addrs); i++ {
addr := addrs[i]
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
done := make(chan struct{})
onPrefaceReceipt := func() {
close(done)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
if !ac.backoffDeadline.IsZero() {
// If we haven't already started reconnecting to
// other backends.
// Note, this can happen when writer notices an error
// and triggers resetTransport while at the same time
// reader receives the preface and invokes this closure.
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.connectRetryNum = 0
}
ac.mu.Unlock()
sinfo := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
if err != nil {
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
ac.mu.Lock()
if ac.state != connectivity.Shutdown {
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.mu.Unlock()
return err
}
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
}
// Do not cancel in the success path because of
// this issue in Go1.6: https://github.com/golang/go/issues/15078.
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
if err != nil {
cancel()
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
if ac.state != connectivity.Shutdown {
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.mu.Unlock()
continue
return false, err
}
ac.mu.Lock()
ac.printf("ready")
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return errConnClosing
}
ac.state = connectivity.Ready
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
t := ac.transport
ac.transport = newTransport
if t != nil {
t.Close()
}
ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
return false, errConnClosing
}
ac.mu.Unlock()
return nil
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
continue
}
if ac.dopts.waitForHandshake {
select {
case <-done:
case <-connectCtx.Done():
// Didn't receive server preface, must kill this new transport now.
grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
newTr.Close()
break
case <-ac.ctx.Done():
}
}
ac.mu.Lock()
ac.state = connectivity.TransientFailure
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
// ac.tearDonn(...) has been invoked.
newTr.Close()
return false, errConnClosing
}
ac.printf("ready")
ac.state = connectivity.Ready
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.transport = newTr
ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.connectRetryNum = connectRetryNum
ac.backoffDeadline = backoffDeadline
ac.connectDeadline = connectDeadline
ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
ac.mu.Unlock()
timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
case <-timer.C:
case <-ac.ctx.Done():
timer.Stop()
return ac.ctx.Err()
}
return true, nil
}
ac.mu.Lock()
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
select {
case <-timer.C:
case <-ac.ctx.Done():
timer.Stop()
return false, ac.ctx.Err()
}
return false, nil
}

// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
for {
var timer *time.Timer
var cdeadline <-chan time.Time
ac.mu.Lock()
t := ac.transport
if !ac.connectDeadline.IsZero() {
timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
cdeadline = timer.C
}
ac.mu.Unlock()
// Block until we receive a goaway or an error occurs.
select {
case <-t.GoAway():
case <-t.Error():
case <-cdeadline:
ac.mu.Lock()
// This implies that client received server preface.
if ac.backoffDeadline.IsZero() {
ac.mu.Unlock()
continue
}
ac.mu.Unlock()
timer = nil
// No server preface received until deadline.
// Kill the connection.
grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
t.Close()
}
if timer != nil {
timer.Stop()
}
// If a GoAway happened, regardless of error, adjust our keepalive
// parameters as appropriate.
Expand Down

0 comments on commit ddbb27e

Please sign in to comment.