Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: grpc/grpc-go
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.5.0
Choose a base ref
...
head repository: grpc/grpc-go
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.5.1
Choose a head ref
  • 3 commits
  • 2 files changed
  • 1 contributor

Commits on Jul 19, 2017

  1. Change version to 1.5.1-dev

    menghanl authored Jul 19, 2017
    Copy the full SHA
    284c78b View commit details

Commits on Jul 27, 2017

  1. Copy the full SHA
    5ad7775 View commit details
  2. Change version to 1.5.1

    menghanl committed Jul 27, 2017
    Copy the full SHA
    b8669c3 View commit details
Showing with 61 additions and 33 deletions.
  1. +60 −32 clientconn.go
  2. +1 −1 rpc_util.go
92 changes: 60 additions & 32 deletions clientconn.go
Original file line number Diff line number Diff line change
@@ -559,15 +559,16 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
//
// We should never need to replace an addrConn with a new one. This function is only used
// as newAddrConn to create new addrConn.
// TODO rename this function and clean up the code.
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = cc.mkp
cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
@@ -598,10 +599,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
// 1) a buggy Balancer notifies duplicated Addresses;
// 2) goaway was received, a new ac will replace the old ac.
// The old ac should be deleted from cc.conns, but the
// underlying transport should drain rather than close.
// a buggy Balancer that reports duplicated Addresses.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
@@ -828,26 +826,44 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
return ac.state, nil
}

func (ac *addrConn) resetTransport(closeTransport bool) error {
// resetTransport recreates a transport to the address for ac.
// For the old transport:
// - if drain is true, it will be gracefully closed.
// - otherwise, it will be closed.
func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
if ac.state == Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.printf("connecting")
if ac.down != nil {
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
ac.state = Connecting
ac.stateCV.Broadcast()
t := ac.transport
ac.transport = nil
ac.mu.Unlock()
if t != nil {
if drain {
t.GracefulClose()
} else {
t.Close()
}
}
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
ac.printf("connecting")
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
if ac.down != nil {
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
ac.state = Connecting
ac.stateCV.Broadcast()
t := ac.transport
ac.mu.Unlock()
if closeTransport && t != nil {
t.Close()
}
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
if timeout < sleepTime {
@@ -883,7 +899,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.ready = nil
}
ac.mu.Unlock()
closeTransport = false
timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
case <-timer.C:
@@ -936,28 +951,40 @@ func (ac *addrConn) transportMonitor() {
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
// are closed.
// In both cases, a new ac is created.
// If GoAway happens without any network I/O error, the underlying transport
// will be gracefully closed, and a new transport will be created.
// (The transport will be closed when all the pending RPCs finished or failed.)
// If GoAway and some network I/O error happen concurrently, the underlying transport
// will be closed, and a new transport will be created.
var drain bool
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
drain = true
}
if err := ac.resetTransport(drain); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
return
case <-t.Error():
select {
case <-ac.ctx.Done():
t.Close()
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
default:
}
ac.mu.Lock()
@@ -969,7 +996,8 @@ func (ac *addrConn) transportMonitor() {
ac.state = TransientFailure
ac.stateCV.Broadcast()
ac.mu.Unlock()
if err := ac.resetTransport(true); err != nil {
if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
2 changes: 1 addition & 1 deletion rpc_util.go
Original file line number Diff line number Diff line change
@@ -519,6 +519,6 @@ const SupportPackageIsVersion3 = true
const SupportPackageIsVersion4 = true

// Version is the current grpc version.
const Version = "1.5.0"
const Version = "1.5.1"

const grpcUA = "grpc-go/" + Version