Skip to content

Commit

Permalink
fix: potential deadlock on channel shutdown (#278)
Browse files (browse the repository at this point in the history)
  • Loading branch information
dirkmc committed Oct 27, 2021
1 parent ebbea58 commit d9377ea
Showing 1 changed file with 91 additions and 38 deletions.
129 changes: 91 additions & 38 deletions transport/graphsync/graphsync.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -952,15 +952,22 @@ func (c *dtChannel) open(
if c.gsKey != nil {
// Cancel the existing graphsync request
completed := c.completed
err := c.cancelRequest(ctx)
errch := c.cancelRequest(ctx)

// Wait for the complete callback to be called
err := waitForCompleteHook(ctx, completed)
if err != nil {
return nil, xerrors.Errorf("restarting graphsync request: %w", err)
return nil, xerrors.Errorf("%s: waiting for cancelled graphsync request to complete: %w", chid, err)
}

// Wait for the cancel to go through
err = waitForCancelComplete(ctx, completed)
// Wait for the cancel request method to complete
select {
case err = <-errch:
case <-ctx.Done():
err = xerrors.Errorf("timed out waiting for graphsync request to be cancelled")
}
if err != nil {
return nil, xerrors.Errorf("waiting for cancelled graphsync request to complete: %w", err)
return nil, xerrors.Errorf("%s: restarting graphsync request: %w", chid, err)
}
}

Expand Down Expand Up @@ -1003,7 +1010,7 @@ func (c *dtChannel) open(
}, nil
}

func waitForCancelComplete(ctx context.Context, completed chan struct{}) error {
func waitForCompleteHook(ctx context.Context, completed chan struct{}) error {
// Wait for the cancel to propagate through to graphsync, and for
// the graphsync request to complete
select {
Expand Down Expand Up @@ -1139,30 +1146,30 @@ func (c *dtChannel) resume(msg datatransfer.Message) error {
}

func (c *dtChannel) close(ctx context.Context) error {
var errch chan error
c.lk.Lock()
defer c.lk.Unlock()

// Check if the channel was already cancelled
if c.gsKey == nil {
return nil
}

// If it's a graphsync request
if c.gsKey.p == c.peerID {
// Cancel the request
return c.cancelRequest(ctx)
{
// Check if the channel was already cancelled
if c.gsKey != nil {
// Check whether it's a graphsync request or response
if c.gsKey.p == c.peerID {
// Cancel the request
errch = c.cancelRequest(ctx)
} else {
// Cancel the response
errch = c.cancelResponse()
}
}
}
c.lk.Unlock()

// It's a graphsync response

// If the requester already cancelled, bail out
if c.requesterCancelled {
return nil
// Wait for the cancel message to complete
select {
case err := <-errch:
return err
case <-ctx.Done():
return ctx.Err()
}

// Cancel the response
log.Debugf("%s: cancelling response", c.channelID)
return c.gs.CancelResponse(c.gsKey.p, c.gsKey.requestID)
}

// Called when the responder gets a cancel message from the requester
Expand Down Expand Up @@ -1217,34 +1224,80 @@ func (c *dtChannel) cleanup() {
}

func (c *dtChannel) shutdown(ctx context.Context) error {
// Cancel the graphsync request
c.lk.Lock()
defer c.lk.Unlock()
errch := c.cancelRequest(ctx)
c.lk.Unlock()

// Cancel the graphsync request
return c.cancelRequest(ctx)
// Wait for the cancel message to complete
select {
case err := <-errch:
return err
case <-ctx.Done():
return ctx.Err()
}
}

// Cancel the graphsync request.
// Note: must be called under the lock.
func (c *dtChannel) cancelRequest(ctx context.Context) error {
func (c *dtChannel) cancelRequest(ctx context.Context) chan error {
errch := make(chan error, 1)

// Check that the request has not already been cancelled
if c.gsKey == nil {
return nil
errch <- nil
return errch
}

log.Debugf("%s: cancelling request", c.channelID)
err := c.gs.CancelRequest(ctx, c.gsKey.requestID)
if err != nil {
// Clear the graphsync key to indicate that the request has been cancelled
gsKey := c.gsKey
c.gsKey = nil

go func() {
log.Debugf("%s: cancelling request", c.channelID)
err := c.gs.CancelRequest(ctx, gsKey.requestID)

// Ignore "request not found" errors
if !xerrors.Is(graphsync.RequestNotFoundErr{}, err) {
return xerrors.Errorf("cancelling graphsync request for channel %s: %w", c.channelID, err)
if err != nil && !xerrors.Is(graphsync.RequestNotFoundErr{}, err) {
errch <- xerrors.Errorf("cancelling graphsync request for channel %s: %w", c.channelID, err)
} else {
errch <- nil
}
}()

return errch
}

func (c *dtChannel) cancelResponse() chan error {
errch := make(chan error, 1)

// Check if the requester already sent a cancel message,
// or the response has already been cancelled
if c.requesterCancelled || c.gsKey == nil {
errch <- nil
return errch
}

// Clear the graphsync key to indicate that the request has been cancelled
// Clear the graphsync key to indicate that the response has been cancelled
gsKey := c.gsKey
c.gsKey = nil

return nil
// Cancel the response in a goroutine so that the channel lock is not held
// while the channel's event queue is drained (draining may call hooks that
// take the channel lock, which would otherwise deadlock)
go func() {
log.Debugf("%s: cancelling response", c.channelID)
err := c.gs.CancelResponse(gsKey.p, gsKey.requestID)

// Ignore "request not found" errors
if err != nil && !xerrors.Is(graphsync.RequestNotFoundErr{}, err) {
errch <- xerrors.Errorf("%s: cancelling response: %w", c.channelID, err)
} else {
errch <- nil
}
}()

return errch
}

// Used in graphsync callbacks to map from graphsync request to the
Expand Down

0 comments on commit d9377ea

Please sign in to comment.