Skip to content

Commit

Permalink
Looks like this has a good chance of fixing the issue related to #85
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed May 24, 2022
1 parent 08699c7 commit 4fa1f1c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
21 changes: 14 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,17 @@ func newChannel(c *Connection, id uint16) *Channel {
}
}

// Signal that from now on, Channel.send() should call Channel.sendClosed()
func (ch *Channel) setClosed() {
atomic.StoreInt32(&ch.closed, 1)
}

// shutdown is called by Connection after the channel has been removed from the
// connection registry.
func (ch *Channel) shutdown(e *Error) {
ch.destructor.Do(func() {
ch.setClosed()

ch.m.Lock()
defer ch.m.Unlock()

Expand All @@ -105,10 +112,6 @@ func (ch *Channel) shutdown(e *Error) {
}
}

// Signal that from now on, Channel.send() should call
// Channel.sendClosed()
atomic.StoreInt32(&ch.closed, 1)

// Notify RPC if we're selecting
if e != nil {
ch.errors <- e
Expand Down Expand Up @@ -154,7 +157,7 @@ func (ch *Channel) shutdown(e *Error) {
// only 'channel.close' is sent to the server.
func (ch *Channel) send(msg message) (err error) {
// If the channel is closed, use Channel.sendClosed()
if atomic.LoadInt32(&ch.closed) == 1 {
if ch.IsClosed() {
return ch.sendClosed(msg)
}

Expand Down Expand Up @@ -231,7 +234,7 @@ func (ch *Channel) sendOpen(msg message) (err error) {
}

// If the channel is closed, use Channel.sendClosed()
if atomic.LoadInt32(&ch.closed) == 1 {
if ch.IsClosed() {
return ch.sendClosed(msg)
}

Expand Down Expand Up @@ -266,7 +269,7 @@ func (ch *Channel) sendOpen(msg message) (err error) {
}
} else {
// If the channel is closed, use Channel.sendClosed()
if atomic.LoadInt32(&ch.closed) == 1 {
if ch.IsClosed() {
return ch.sendClosed(msg)
}

Expand All @@ -284,6 +287,10 @@ func (ch *Channel) sendOpen(msg message) (err error) {
func (ch *Channel) dispatch(msg message) {
switch m := msg.(type) {
case *channelClose:
// Immediately indicate that this channel is closed to prevent
// invalid frames from being sent to the server
ch.setClosed()

// lock before sending connection.close-ok
// to avoid unexpected interleaving with basic.publish frames if
// publishing is happening concurrently
Expand Down
2 changes: 0 additions & 2 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,6 @@ func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
if j == 0 {
t.Fatal("channel should not be closed")
}
// TODO remove this debug log
t.Logf("channel is closed, i: %d j: %d", i, j)
break
}
publishError := ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
Expand Down

0 comments on commit 4fa1f1c

Please sign in to comment.