Skip to content

Commit

Permalink
Revert test to demonstrate actual bug (#87)
Browse files Browse the repository at this point in the history
* Revert test to demonstrate actual bug

Follow-up to #85

When this test fails, RabbitMQ logs the following connection exception:

```
2022-05-24 11:00:12.747989+00:00 [error] <0.19502.2> Channel error on connection <0.19347.2> (172.17.0.1:46318 -> 172.17.0.2:5672, vhost: '/', user: 'guest'), channel 20:
2022-05-24 11:00:12.747989+00:00 [error] <0.19502.2> operation basic.publish caused a channel exception not_found: no exchange 'not-existing-exchange' in vhost '/'
2022-05-24 11:00:12.748614+00:00 [error] <0.19347.2> Error on AMQP connection <0.19347.2> (172.17.0.1:46318 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 20:
2022-05-24 11:00:12.748614+00:00 [error] <0.19347.2>  operation basic.publish caused a connection exception channel_error: "expected 'channel.open'"
```

* Extend number of iterations

* Looks like this has a good chance of fixing the issue related to #85

* Indicate that a channel is closed immediately after decoding a channelClose frame

* Close the channel prior to the Once call in the same manner as the Connection

* No need to set closed again, wording

* Convert to the correct error type, thanks to @Gsantomaggio

* Conversion fixes
  • Loading branch information
lukebakken committed May 26, 2022
1 parent c113b33 commit 8b6de9a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 41 deletions.
20 changes: 13 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,16 @@ func newChannel(c *Connection, id uint16) *Channel {
}
}

// Signal that from now on, Channel.send() should call Channel.sendClosed()
func (ch *Channel) setClosed() {
atomic.StoreInt32(&ch.closed, 1)
}

// shutdown is called by Connection after the channel has been removed from the
// connection registry.
func (ch *Channel) shutdown(e *Error) {
ch.setClosed()

ch.destructor.Do(func() {
ch.m.Lock()
defer ch.m.Unlock()
Expand All @@ -105,10 +112,6 @@ func (ch *Channel) shutdown(e *Error) {
}
}

// Signal that from now on, Channel.send() should call
// Channel.sendClosed()
atomic.StoreInt32(&ch.closed, 1)

// Notify RPC if we're selecting
if e != nil {
ch.errors <- e
Expand Down Expand Up @@ -154,7 +157,7 @@ func (ch *Channel) shutdown(e *Error) {
// only 'channel.close' is sent to the server.
func (ch *Channel) send(msg message) (err error) {
// If the channel is closed, use Channel.sendClosed()
if atomic.LoadInt32(&ch.closed) == 1 {
if ch.IsClosed() {
return ch.sendClosed(msg)
}

Expand Down Expand Up @@ -231,7 +234,7 @@ func (ch *Channel) sendOpen(msg message) (err error) {
}

// If the channel is closed, use Channel.sendClosed()
if atomic.LoadInt32(&ch.closed) == 1 {
if ch.IsClosed() {
return ch.sendClosed(msg)
}

Expand Down Expand Up @@ -266,7 +269,7 @@ func (ch *Channel) sendOpen(msg message) (err error) {
}
} else {
// If the channel is closed, use Channel.sendClosed()
if atomic.LoadInt32(&ch.closed) == 1 {
if ch.IsClosed() {
return ch.sendClosed(msg)
}

Expand All @@ -284,6 +287,9 @@ func (ch *Channel) sendOpen(msg message) (err error) {
func (ch *Channel) dispatch(msg message) {
switch m := msg.(type) {
case *channelClose:
// Note: channel state is set to closed immedately after the message is
// decoded by the Connection

// lock before sending connection.close-ok
// to avoid unexpected interleaving with basic.publish frames if
// publishing is happening concurrently
Expand Down
3 changes: 3 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,11 @@ func (c *Connection) dispatch0(f frame) {
func (c *Connection) dispatchN(f frame) {
c.m.Lock()
channel := c.channels[f.channel()]
updateChannel(f, channel)
c.m.Unlock()

// Note: this could result in concurrent dispatch depending on
// how channels are managed in an application
if channel != nil {
channel.recv(channel, f)
} else {
Expand Down
57 changes: 23 additions & 34 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,46 +1539,36 @@ func TestDeadlockConsumerIssue48(t *testing.T) {

// https://github.com/streadway/amqp/issues/46
func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
var conn *Connection = nil
conn := integrationConnection(t, "issue46")
if conn == nil {
t.Fatal("conn is nil")
}

t.Cleanup(func() {
if conn != nil {
conn.Close()
}
conn.Close()
})

for i := 0; i < 100; i++ {
if conn == nil || conn.IsClosed() {
conn = integrationConnection(t, "issue46")
if conn == nil {
t.Fatal("conn is nil")
}
}

ch, err := conn.Channel()
if err, ok := err.(Error); ok {
if err.Code != 504 {
t.Fatalf("expected channel only exception i: %d got: %+v", i, err)
}
if conn.IsClosed() {
t.Fatal("conn is closed")
}

if ch == nil {
continue
ch, channelOpenError := conn.Channel()
if channelOpenError != nil {
t.Fatalf("error opening channel: %d error: %+v", i, channelOpenError)
}

for j := 0; j < 10; j++ {
for j := 0; j < 100; j++ {
if ch.IsClosed() {
if j == 0 {
t.Fatal("channel should not be closed")
}
break
} else {
err = ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
if err, ok := err.(Error); ok {
if err.Code != 504 {
t.Fatalf("expected channel only exception i: %d j: %d got: %+v", i, j, err)
}
if cerr := ch.Close(); cerr != nil {
t.Logf("error on channel close i: %d j: %d got: %+v", i, j, cerr)
}
break
}
err := ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})
if err != nil {
if publishError, ok := err.(*Error); !ok || publishError.Code != 504 {
t.Fatalf("expected channel only exception i: %d j: %d error: %+v", i, j, publishError)
}
}
}
Expand Down Expand Up @@ -1769,13 +1759,12 @@ func TestExchangeDeclarePrecondition(t *testing.T) {

if err == nil {
t.Fatalf("Expected to fail a redeclare with different durability, didn't receive an error")
}

if err, ok := err.(Error); ok {
if err.Code != PreconditionFailed {
} else {
declareErr := err.(*Error)
if declareErr.Code != PreconditionFailed {
t.Fatalf("Expected precondition error")
}
if !err.Recover {
if !declareErr.Recover {
t.Fatalf("Expected to be able to recover")
}
}
Expand Down
12 changes: 12 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,18 @@ type frame interface {
channel() uint16
}

/*
Perform any updates on the channel immediately after the frame is decoded while the
connection mutex is held.
*/
func updateChannel(f frame, channel *Channel) {
if mf, isMethodFrame := f.(*methodFrame); isMethodFrame {
if _, isChannelClose := mf.Method.(*channelClose); isChannelClose {
channel.setClosed()
}
}
}

type reader struct {
r io.Reader
}
Expand Down

0 comments on commit 8b6de9a

Please sign in to comment.